August'24: Kamaelia is in maintenance mode and will recieve periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)

Kamaelia.Util.RateFilter

These components limit the rate of data flow, either by buffering or by taking charge and requesting data at a given rate.

Message Rate limiting

This component buffers incoming messages and limits the rate at which they are sent on.

Example Usage

Regulating video to a constant framerate, buffering 2 seconds of data before starting to emit frames:

Pipeline( RateControlledFileReader(...),
          DiracDecoder(),
          MessageRateLimit(messages_per_second=framerate, buffer=2*framerate),
          VideoOverlay(),
        ).activate()

How does it work?

Data items sent to this component's "inbox" inbox are buffered. Once the buffer is full, the component starts to emit items at the specified rate to its "outbox" outbox.

If there is a shortage of data in the buffer, then the specified rate of output will, obviously, not be sustained. Items will be output when they are available.

The specified rate serves as a ceiling limit - items will never be emitted faster than that rate, though they may be emitted slower.

Make sure you choose a sufficient buffer size to handle any expected jitter/temporary shortages of data.

If a producerFinished or shutdownMicroprocess message is received on the components' "control" inbox, it is sent on out of the "signal" outbox. The component will then immediately terminate.

Rate Control

These components control the rate of a system by requesting data at a given rate. The 'variable' version allows this rate to the changed whilst running.

Example Usage

Reading from a file at a fixed rate:

Graphline( ctrl   = ByteRate_RequestControl(rate=1000, chunksize=32),
           reader = PromptedFileReader(filename="myfile", readmode="bytes"),
           linkages = {
                ("ctrl", "outbox") : ("reader","inbox"),
                ("reader", "outbox") : ("self", "outbox"),

                ("self", "control") : ("reader", "control"),
                ("reader", "signal") : ("ctrl", "control"),
                ("ctrl, "signal") : ("self", "signal"),
              }

Note that the "signal"-"control" path goes in the opposite direction so that when the file is finished reading, the ByteRate_RequestControl component receives a shutdown message.

Reading from a file at a varying rate (send new rates to the "inbox" inbox):

Graphline( ctrl   = VariableByteRate_RequestControl(rate=1000, chunksize=32),
           reader = PromptedFileReader(filename="myfile", readmode="bytes"),
           linkages = {
                  ("self", "inbox") : ("ctrl", "inbox"),
                  ("ctrl", "outbox") : ("reader","inbox"),
                  ("reader", "outbox") : ("self", "outbox"),

                  ("self", "control") : ("reader", "control"),
                  ("reader", "signal") : ("ctrl", "control"),
                  ("ctrl, "signal") : ("self", "signal"),
              }
         ).activate()

Note that the "signal"-"control" path goes in the opposite direction so that when the file is finished reading, the VariableByteRate_RequestControl component receives a shutdown message.

How does it work?

These components emit from their "outbox" outboxes, requests for data at the specified rate. Each request is an integer specifying the number of items.

Rates are in no particular units (eg. bitrate, framerate) - you can use it for whatever purpose you wish. Just ensure your values fit the units you are working in.

At initialisation, you specify not only the rate, but also the chunk size or chunk rate. For example, a rate of 12 and chunksize of 4 will result in 3 requests per second, each for 4 items. Conversely, specifying a rate of 12 and a chunkrate of 2 will result in 2 requests per second, each for 6 items.

The rate and chunk size or chunk rate you specify does not have to be integer or divide into integers. For example, you can specify a rate of 10 and a chunksize of 3. Requests will then be emitted every 0.3 seconds, each for 3 items.

When requests are emitted, they will always be for an integer number of items. Rounding errors are averaged out over time, and should not accumulate. Rounding will occur if chunksize, either specified, or calculated from chunkrate, is non-integer.

At initialisation, you can also specify that chunk 'aggregation' is permitted. If permitted, then the component can choose to exceed the specified chunksize. For example if, for some reason, the component gets behind, it might aggregate two requests together - the next request will be for twice as many items.

Another example would be if you, for example, specify a rate of 100 and chunkrate of 3. The 3 requests emitted every second will then be for 33, 33 and 34 items.

The VariableByteRate_RequestControl component allows the rate to be changed on-the-fly. Send a new rate to the component's "inbox" inbox and it will be adopted immediately. You cannot change the chunkrate or chunksize.

The new rate is adopted at the instant it is received. There will be no glitches in the apparent rate of requests due to your changing the rate.

If a producerFinished or shutdownMicroprocess message is received on the components' "control" inbox, it is sent on out of the "signal" outbox. The component will then immediately terminate.

Flow limiting by request

This component buffers incoming data and emits it one item at a time, whenever a "NEXT" request is received.

Example Usage

An app that reads data items from a file, then does something with then one at a time when the user clicks a visual button in pygame:

Graphline( source   = RateControlledFileReader(..., readmode="lines"),
           limiter  = OnDemandLimit(),
           trigger  = Button(caption="Click for next",msg="NEXT"),
           dest     = consumer(...),
           linkages = {
                   ("source", "outbox") : ("limiter", "inbox"),
                   ("limiter", "outbox") : ("dest", "inbox"),
                   ("trigger", "outbox") : ("limiter", "slidecontrol")
               }
         ).activate()

How does it work?

Data items sent to the component's "inbox" inbox are buffered in a queue. Whenever a "NEXT" message is received on the component's "slidecontrol" inbox, an item is taken out of the queue and sent out of the "outbox" outbox.

Items come out in the same order they go in.

If a "NEXT" message is received but there are no items waiting in the queue, the "NEXT" message is discarded and nothing is emitted.

If a producerFinished message is received on the components' "control" inbox, it is sent on out of the "signal" outbox. The component will then immediately terminate.


Kamaelia.Util.RateFilter.ByteRate_RequestControl

class ByteRate_RequestControl(Axon.Component.component)

ByteRate_RequestControl([rate][,chunksize][,chunkrate][,allowchunkaggregation]) -> new ByteRate_RequestControl component.

Controls rate of a data source by, at a controlled rate, emitting integers saying how much data to emit.

Keyword arguments:

  • rate -- qty of data items per second (default=100000)
  • chunksize -- None or qty of items per 'chunk' (default=None)
  • chunkrate -- None or number of chunks per second (default=10)
  • allowchunkaggregation -- if True, chunksize will be enlarged if 'catching up' is necessary (default=False)

Specify either chunksize or chunkrate, but not both.

Inboxes

  • control : Shutdown signalling
  • inbox : NOT USED

Outboxes

  • outbox : requests for 'n' items
  • signal : Shutdown signalling

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

__init__(self[, rate][, chunksize][, chunkrate][, allowchunkaggregation])

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

getChunksToSend(self)

Generator. Returns the size of chunks to be requested (if any) to 'catch up' since last time this method was called.

main(self)

Main loop.

resetTiming(self)

Resets the timing variable used to determine when the next time to send a request is.

shutdown(self)

Returns True if shutdown message received.

Kamaelia.Util.RateFilter.MessageRateLimit

class MessageRateLimit(Axon.Component.component)

MessageRateLimit(messages_per_second[, buffer]) -> new MessageRateLimit component.

Buffers messages and outputs them at a rate limited by the specified rate once the buffer is full.

Keyword arguments:

  • messages_per_second -- maximum output rate
  • buffer -- size of buffer (0 or greater) (default=60)

Inboxes

  • control : NOT USED
  • inbox : Incoming items/messages

Outboxes

  • outbox : Items/messages limited to specified maximum output rate
  • signal : NOT USED

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

__init__(self, messages_per_second, buffer, **argd)

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

main(self)

Main loop.

Kamaelia.Util.RateFilter.OnDemandLimit

class OnDemandLimit(Axon.Component.component)

OnDemandLimit() -> new OnDemandLimit component.

A component that receives data items, but only emits them on demand, one at a time, when "NEXT" messages are received on the "slidecontrol" inbox.

Inboxes

  • control : Shutdown signalling
  • inbox : Data items to be passed on, on demand.
  • slidecontrol : 'NEXT' requests to emit a data item.

Outboxes

  • outbox : Data items, when requested.
  • signal : Shutdown signalling

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

main(self)

Main loop.

Kamaelia.Util.RateFilter.VariableByteRate_RequestControl

class VariableByteRate_RequestControl(Axon.Component.component)

ByteRate_RequestControl([rate][,chunksize][,chunkrate][,allowchunkaggregation]) -> new ByteRate_RequestControl component.

Controls rate of a data source by, at a controlled rate, emitting integers saying how much data to emit. Rate can be changed at runtime.

Keyword arguments: - rate -- qty of data items per second (default=100000) - chunksize -- None or qty of items per 'chunk' (default=None) - chunkrate -- None or number of chunks per second (default=10) - allowchunkaggregation -- if True, chunksize will be enlarged if 'catching up' is necessary (default=False)

Specify either chunksize or chunkrate, but not both.

Inboxes

  • control : Shutdown signalling
  • inbox : New rate

Outboxes

  • outbox : requests for 'n' items
  • signal : Shutdown signalling

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

__init__(self[, rate][, chunksize][, chunkrate][, allowchunkaggregation])

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

changeRate(self, newRate, now)

Change the rate.

Guaranteed to not cause a glitch in the rate of output.

getChunksToSend(self, now)

Generator. Returns the size of chunks to be requested (if any) to 'catch up' since last time this method was called.

main(self)

Main loop.

resetTiming(self, now)

Resets the timing variable used to determine when the next time to send a request is.

shutdown(self)

Returns True if shutdown message received.

Feedback

Got a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording!

Please leave you feedback here in reply to the documentation thread in the Kamaelia blog.

-- Automatic documentation generator, 05 Jun 2009 at 03:01:38 UTC/GMT