August'24: Kamaelia is in maintenance mode and will receive periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)

Kamaelia.Util.Splitter

Simple Fanout of messages

A component that splits a data source, fanning it out to multiple destinations.

Example Usage

Component connecting to a Splitter:

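import Axon
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Splitter import Splitter, addsink

class Consumer(Axon.Component.component):
    Outboxes = [ "outbox", "signal", "splitter_config" ]

    def main(self):
        # Ask the splitter to start forwarding data to our "inbox" inbox
        self.send( addsink( self, "inbox", "control" ), "splitter_config")
        yield 1
        # ... do stuff when data is received on "inbox" inbox

# producer() stands for any component that emits data on its "outbox"
mysplitter = Splitter()
Pipeline( producer(), mysplitter ).activate()

myconsumer = Consumer().activate()
# Link to the splitter object itself, not to a string naming it
myconsumer.link( (myconsumer, "splitter_config"), (mysplitter, "configuration") )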

How does it work?

Any data sent to the component's "inbox" inbox is sent out to multiple destinations (but not to the "outbox" outbox).

Add a destination by sending an addsink(...) message to the "configuration" inbox of the component. Splitter will then wire up to the 'sinkbox' inbox specified in the message, and send it any data sent to its "inbox" inbox.

NOTE: Splitter only does this for the 'sinkbox' inbox, not for the 'sinkcontrol' inbox. If a 'sinkcontrol' inbox is specified in the message, it is ignored.

There is no limit on the number of 'sinks' that can be connected to the splitter. The same component can add itself as a sink multiple times, provided different named inboxes are used each time.

NOTE: The data is not duplicated - the same item is sent to all destinations. Care must therefore be taken if the data item is mutable.
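The sharing hazard can be shown without Kamaelia at all. The following is a plain-Python sketch, not Splitter's code: the two lists are hypothetical stand-ins for destination inboxes, and copy.deepcopy is just one possible defence, assumed here for illustration.

```python
import copy

# Two hypothetical destination inboxes, modelled as plain lists.
inbox_a, inbox_b = [], []

item = {"count": 1}

# Fan-out without copying: both inboxes hold the *same* object.
for inbox in (inbox_a, inbox_b):
    inbox.append(item)

inbox_a[0]["count"] = 99                 # consumer A mutates what it received
shared_seen_by_b = inbox_b[0]["count"]   # consumer B observes the change too

# One defensive option: a consumer copies the item before mutating it.
private = copy.deepcopy(inbox_b[0])
private["count"] = 0
still_seen_by_b = inbox_b[0]["count"]    # untouched by edits to the copy
```

Sending immutable items (tuples, strings, frozen dataclasses) avoids the problem without any copying.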

If one or more destinations cause a noSpaceInBox exception, the data item will be queued, and Splitter will attempt to resend it to the destinations in question until successful. It will stop forwarding any new incoming data until it has succeeded, thereby ensuring the order of data is not altered.
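The retry behaviour can be pictured with a small model. This is a sketch under assumptions, not the component's implementation: BoundedInbox, step and the pending list are hypothetical names, and a full inbox is modelled by a False return value rather than a noSpaceInBox exception.

```python
from collections import deque

class BoundedInbox:
    """Hypothetical stand-in for an inbox with a size limit."""
    def __init__(self, limit):
        self.limit = limit
        self.items = []

    def deliver(self, item):
        """Return True if accepted, False if the box is full."""
        if len(self.items) >= self.limit:
            return False
        self.items.append(item)
        return True

def step(incoming, sinks, pending):
    """One scheduling step: retry stalled deliveries, else forward one new item.

    `pending` holds (sink, item) pairs that were refused on an earlier step.
    """
    if pending:
        # Retry only the destinations that refused; keep those still full.
        pending[:] = [(s, i) for (s, i) in pending if not s.deliver(i)]
        return
    if incoming:
        item = incoming.popleft()
        pending.extend((s, item) for s in sinks if not s.deliver(item))

fast, slow = BoundedInbox(limit=10), BoundedInbox(limit=1)
incoming = deque(["a", "b", "c"])
pending = []

step(incoming, [fast, slow], pending)   # "a" reaches both destinations
step(incoming, [fast, slow], pending)   # "b" reaches fast; slow is full
step(incoming, [fast, slow], pending)   # retry fails, so "c" is held back
slow.items.pop(0)                       # the slow consumer takes "a"
step(incoming, [fast, slow], pending)   # retry succeeds; "c" is still waiting
```

Because no new item is taken from the source while a delivery is pending, every destination sees items in the order they arrived.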

Stop data being sent to a destination by sending a removesink(...) message to the "configuration" inbox of the Splitter component. Splitter will then cease sending messages to the 'sinkbox' inbox specified in the message and will unwire from it.
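The add/remove bookkeeping described above can be sketched as a toy model. ToyComponent and ToySplitter are hypothetical classes written for illustration and do not use Kamaelia's API. Ignoring a repeated (component, inbox) pair is an assumption of the model, since the text only covers the case of different inbox names.

```python
class ToyComponent:
    """Hypothetical component: a named bag of inboxes."""
    def __init__(self, *inbox_names):
        self.inboxes = {name: [] for name in inbox_names}

class ToySplitter:
    """Plain-Python model of sink registration (not Kamaelia code)."""
    def __init__(self):
        self.sinks = []                      # list of (component, inbox name)

    def add_sink(self, component, sinkbox="inbox"):
        key = (component, sinkbox)
        if key not in self.sinks:            # assumption: repeats are ignored
            self.sinks.append(key)

    def remove_sink(self, component, sinkbox="inbox"):
        self.sinks.remove((component, sinkbox))

    def fan_out(self, item):
        # The same item object goes to every registered destination.
        for component, sinkbox in self.sinks:
            component.inboxes[sinkbox].append(item)

consumer = ToyComponent("inbox", "audit")
splitter = ToySplitter()
splitter.add_sink(consumer, "inbox")
splitter.add_sink(consumer, "audit")         # same component, different inbox
splitter.fan_out("first")
splitter.remove_sink(consumer, "audit")      # "audit" stops receiving
splitter.fan_out("second")
```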

Any messages sent to the "control" inbox are ignored. The "outbox" and "signal" outboxes are not used.

This component does not terminate.

Pluggable Fanout of messages

The PlugSplitter component splits a data source, fanning it out to multiple destinations. The Plug component allows you to easily 'plug' a destination into the splitter.

Example Usage

Two consumers receiving the same data from a single producer. Producer and consumers are encapsulated by PlugSplitter and Plug components respectively:

mysplitter = PlugSplitter( producer() ).activate()

Plug(mysplitter, consumer() ).activate()
Plug(mysplitter, consumer() ).activate()

The same, but the producer and consumers are not encapsulated:

mysplitter = PlugSplitter()
Pipeline( producer(), mysplitter ).activate()

Pipeline( Plug(mysplitter), consumer() ).activate()
Pipeline( Plug(mysplitter), consumer() ).activate()

How does it work?

Any data sent to the component's "inbox" and "control" inboxes is sent out to multiple destinations. It is also sent on to the component's "outbox" and "signal" outboxes, respectively.

Alternatively, at initialisation you can specify a 'source' component. If you do, then data to be sent out to multiple destinations is instead received from that component's "outbox" and "signal" outboxes, respectively. Any data sent to the "inbox" and "control" inboxes of the PlugSplitter component will be forwarded to the "inbox" and "control" inboxes of the 'source' component, respectively.

This source component is encapsulated as a child within the PlugSplitter component, and so must not be separately activated. Activating PlugSplitter will also activate this child component.
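The routing with a 'source' child can be sketched as follows. This is a toy model with hypothetical names (ToySource, route), covering only the data path. It deliberately leaves out the "control"/"signal" path and makes no claim about what PlugSplitter does with its own outboxes in this mode.

```python
class ToySource:
    """Hypothetical child component with one inbox and one outbox."""
    def __init__(self):
        self.inbox = []
        self.outbox = []

def route(splitter_inbox, source, sinks):
    """Move messages one hop, as a splitter with a source child would."""
    # Messages sent to the splitter itself are handed on to the child.
    while splitter_inbox:
        source.inbox.append(splitter_inbox.pop(0))
    # Only what the child emits is fanned out to the destinations.
    while source.outbox:
        item = source.outbox.pop(0)
        for sink in sinks:
            sink.append(item)

source = ToySource()
splitter_inbox, sink_a, sink_b = ["request"], [], []
source.outbox.append("reply")
route(splitter_inbox, source, [sink_a, sink_b])
```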

Add a destination by making a Plug component, specifying the PlugSplitter component to 'plug into'. See documentation for the Plug component for more information.

Alternatively, you can add and remove destinations manually:

  • Add a destination by sending an addsink(...) message to the "configuration" inbox of the component.

    If a 'sinkbox' inbox is specified in the message, then PlugSplitter will wire up to it and forward to it any 'inbox'/'outbox' data. If a 'sinkcontrol' inbox is specified, then PlugSplitter will wire up to it and forward to it any 'control'/'signal' data.

  • Stop data being sent to a destination by sending a removesink(...) message to the "configuration" inbox of the PlugSplitter component.

    PlugSplitter will then cease sending messages to the 'sinkbox' inbox specified in the message and will unwire from it.
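The two-channel routing for manually added destinations can be modelled in a few lines. ToyPlugSplitter is a hypothetical class for illustration only. Lists stand in for the 'sinkbox' and 'sinkcontrol' inboxes, and removal is matched by object identity, which is an assumption of the model.

```python
class ToyPlugSplitter:
    """Plain-Python model of data/control fan-out (not Kamaelia code)."""
    def __init__(self):
        self.data_sinks = []      # stand-ins for 'sinkbox' inboxes
        self.control_sinks = []   # stand-ins for 'sinkcontrol' inboxes
        self.outbox = []
        self.signal = []

    def add_sink(self, sinkbox=None, sinkcontrol=None):
        if sinkbox is not None:
            self.data_sinks.append(sinkbox)
        if sinkcontrol is not None:
            self.control_sinks.append(sinkcontrol)

    def remove_sink(self, sinkbox=None, sinkcontrol=None):
        # Compare by identity: two empty lists are equal but not the same box.
        self.data_sinks = [b for b in self.data_sinks if b is not sinkbox]
        self.control_sinks = [b for b in self.control_sinks if b is not sinkcontrol]

    def on_inbox(self, item):
        self.outbox.append(item)              # passed through...
        for box in self.data_sinks:           # ...and fanned out
            box.append(item)

    def on_control(self, msg):
        self.signal.append(msg)
        for box in self.control_sinks:
            box.append(msg)

splitter = ToyPlugSplitter()
a_inbox, a_control, b_inbox = [], [], []
splitter.add_sink(sinkbox=a_inbox, sinkcontrol=a_control)
splitter.add_sink(sinkbox=b_inbox)            # data only, no control wiring
splitter.on_inbox("frame 1")
splitter.on_control("pause")
splitter.remove_sink(sinkbox=a_inbox, sinkcontrol=a_control)
splitter.on_inbox("frame 2")
```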

There is no limit on the number of 'sinks' that can be connected to the splitter. The same component can add itself as a sink multiple times, provided different named inboxes are used each time.

NOTE: The data is not duplicated - the same item is sent to all destinations. Care must therefore be taken if the data item is mutable.

If a shutdownMicroprocess or producerFinished message is received on the "control" inbox and there is NO 'source' child component, then the message is forwarded onto all 'control' destinations and the 'signal' outbox. The component then immediately terminates, unwiring from all destinations.
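The shutdown path when there is no 'source' child can be sketched as a generator main loop. This is a simplified model, not PlugSplitter's code: ShutdownMicroprocess and ProducerFinished are local stand-ins for the Axon messages named above, and plain lists stand in for inboxes and outboxes.

```python
class ShutdownMicroprocess:
    """Stand-in for Axon's shutdownMicroprocess message."""

class ProducerFinished:
    """Stand-in for Axon's producerFinished message."""

def main(control, control_sinks, signal):
    """Toy main loop for a splitter with no 'source' child."""
    while True:
        while control:
            msg = control.pop(0)
            # Every control message is fanned out and passed through.
            for sink in control_sinks:
                sink.append(msg)
            signal.append(msg)
            if isinstance(msg, (ShutdownMicroprocess, ProducerFinished)):
                control_sinks.clear()    # "unwire" from all destinations
                return                   # terminate immediately
        yield 1

control, signal, sink_a, sink_b = [], [], [], []
sinks = [sink_a, sink_b]
loop = main(control, sinks, signal)

control.append("status?")
next(loop)                               # ordinary message: forwarded, keeps running
control.append(ProducerFinished())
terminated = next(loop, "done") == "done"   # the generator returned
```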

If there is a child component then PlugSplitter will terminate when the child component terminates, unwiring from all destinations.

Plug for PlugSplitter

The Plug component 'plugs into' a PlugSplitter as a destination to which the source data is split.

Example Usage

See PlugSplitter documentation.

How does it work?

Initialise the Plug component by specifying a PlugSplitter component to connect to and the component that wants to receive the data from the PlugSplitter.

The destination/sink component is encapsulated as a child component, and is therefore activated by the Plug component when it is activated. Do not activate it yourself.

The Plug component connects to the PlugSplitter component by wiring its "splitter_config" outbox to the "configuration" inbox of the PlugSplitter component and sending it an addsink(...) message. This causes PlugSplitter to wire up to the Plug's "inbox" and "control" inboxes.

The Plug's "inbox" and "control" inboxes are forwarded to the "inbox" and "control" inboxes of the child component respectively. The "outbox" and "signal" outboxes of the child component are forwarded to the "outbox" and "signal" outboxes of the Plug component respectively.

When the child component terminates, the Plug component sends a removesink(...) message to the PlugSplitter, causing PlugSplitter to unwire from it. It then terminates.
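The Plug lifecycle (register, run while the child runs, deregister) can be sketched with generators. This is a deliberately simplified model: plug_main and child are hypothetical, the tuples stand in for addsink(...) and removesink(...) messages, and the plug steps the child directly, whereas the real component leaves scheduling to Axon.

```python
def child():
    """Hypothetical child component that runs for two steps."""
    yield 1
    yield 1

def plug_main(config_outbox, child_gen):
    """Toy Plug: register, run until the child finishes, then deregister."""
    # On start-up, ask the splitter to wire up to our inbox and control.
    config_outbox.append(("addsink", "inbox", "control"))
    for _ in child_gen:
        yield 1                  # stay alive while the child is running
    # The child has terminated: ask the splitter to unwire, then finish.
    config_outbox.append(("removesink", "inbox", "control"))

splitter_config = []             # stands in for the "splitter_config" outbox
steps = sum(1 for _ in plug_main(splitter_config, child()))
```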

Thoughts

PlugSplitter is probably more reliable than Splitter, although it feels too complex. Even so, the "Splitter" class in this file is not the preferable option.


Kamaelia.Util.Splitter.Plug

class Plug(Axon.Component.component)

Plug(splitter,component) -> new Plug component.

A component that 'plugs' the specified component into the specified splitter as a destination for data.

Keyword arguments:

  • splitter -- splitter component to plug into (any component that accepts addsink(...) and removesink(...) messages on a 'configuration' inbox)
  • component -- component to receive data from the splitter

Inboxes

  • control : Incoming control data for child component, and shutdown signalling
  • inbox : Incoming data for child component

Outboxes

  • outbox : Outgoing data from child component
  • signal : Outgoing control data from child component, and shutdown signalling
  • splitter_config : Used to communicate with the target splitter

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap to their functionality for maintainers and new component developers.

__init__(self, splitter, component)

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

childrenDone(self)

Unplugs any children that have terminated, and returns true if there are no running child components left (i.e. their microprocesses have finished)

main(self)

Main loop.

Kamaelia.Util.Splitter.PlugSplitter

class PlugSplitter(Axon.AdaptiveCommsComponent.AdaptiveCommsComponent)

PlugSplitter([sourceComponent]) -> new PlugSplitter component.

Splits incoming data out to multiple destinations. Send addsink(...) and removesink(...) messages to the 'configuration' inbox to add and remove destinations.

Keyword arguments:

  • sourceComponent -- None, or component to act as data source

Inboxes

  • control : Shutdown signalling, and signalling to be fanned out.
  • _control : Internal inbox for receiving from the child source component (if it exists)
  • configuration : addsink(...) and removesink(...) request messages
  • inbox : Data items to be fanned out.
  • _inbox : Internal inbox for receiving from the child source component (if it exists)

Outboxes

  • outbox : Data items received on 'inbox' inbox.
  • signal : Shutdown signalling, and data items received on 'control' inbox.

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap to their functionality for maintainers and new component developers.

__init__(self[, sourceComponent])

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

_addSink(self, sink[, sinkinbox][, sinkcontrol])

Add a new destination for data.

Specify target component (sink), and target inbox (sinkinbox) and/or target shutdown signalling inbox (sinkcontrol).

_delSink(self, sink[, sinkinbox][, sinkcontrol])

Remove a destination for data.

Specify target component (sink), and target inbox (sinkinbox) and/or target shutdown signalling inbox (sinkcontrol).

childrenDone(self)

Unplugs any children that have terminated, and returns true if there are no running child components left (i.e. their microprocesses have finished)

main(self)

Main loop.

Kamaelia.Util.Splitter.Splitter

class Splitter(Axon.AdaptiveCommsComponent.AdaptiveCommsComponent)

Splitter() -> new Splitter component.

Splits incoming data out to multiple destinations. Send addsink(...) and removesink(...) messages to the 'configuration' inbox to add and remove destinations.

Inboxes

  • control : NOT USED
  • configuration : addsink(...) and removesink(...) request messages
  • inbox : Source of data items

Outboxes

  • outbox : NOT USED
  • signal : NOT USED

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap to their functionality for maintainers and new component developers.

__init__(self)

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

createsink(self, sink[, sinkbox][, passthrough])

Set up a new destination for data.

Creates an outbox, links it to the target (component,inbox) and records it in self.outlist.

deletesink(self, oldsink)

Removes the specified (component, inbox) as a destination for data where (component, inbox) = (oldsink.sink, oldsink.sinkbox).

Unlinks the target, destroys the corresponding outbox, and removes the corresponding record from self.outlist.

mainBody(self)

Main loop body.

-- Automatic documentation generator, 05 Jun 2009 at 03:01:38 UTC/GMT