August'24: Kamaelia is in maintenance mode and will recieve periodic updates, about twice a year, primarily targeted around Python 3 and ecosystem compatibility. PRs are always welcome. Latest Release: 1.14.32 (2024/3/24)
The Scheduler runs active microprocesses - giving a regular timeslice to each. It also provides the ability to pause and wake them; allowing an Axon based system to play nicely and relinquish the cpu when idle.
The simplest way is to just use the default scheduler scheduler.run
. Simply activate
components or microprocesses then call the runThreads() method of the
scheduler:
from Axon.Scheduler import scheduler
from MyComponents import MyComponent, AnotherComponent
c1 = MyComponent().activate()
c2 = MyComponent().activate()
c3 = AnotherComponent().activate()
scheduler.run.runThreads()
Alternatively you can create a specific scheduler instance, and activate them using that specific scheduler:
mySched = scheduler()
c1 = MyComponent().activate(Scheduler=mySched)
c2 = MyComponent().activate(Scheduler=mySched)
c3 = AnotherComponent().activate(Scheduler=mySched)
mySched.runThreads()
The runThreads() method is the way of bootstrapping the scheduler. Being a microprocess, it needs something to schedule it! The runThreads() method does exactly that.
The activate() method is fully thread-safe. It can handle multiple simultaneous callers from different threads to the one the scheduler is running in.
The Scheduler supports the ability to, in a thread safe manner, pause and wake individual microprocesses under its control. Because it is thread safe, any thread of execution can issue pause and wake requests for any scheduled microprocess.
The pauseThread() and wakeThread() methods submit requests to pause or wake microprocesses. The scheduler will process these when it is next able to - the requests are queued rather than processed immediately. This is done to ensure thread safety. It can handle multiple simultaneous callers from different threads to the one the scheduler is running in.
Pausing a microprocess means the scheduler removes it from its 'run queue'. This means that it no longer executes that microprocess. Waking it puts it back into the 'run queue'.
If no microprocesses are awake then the scheduler relinquishes cpu usage by blocking.
If however this scheduler is itself being scheduled by another microprocess then it does not block. Ideally it should ask its scheduler to pause it, but instead it busy-waits - self pausing functionality is not yet implemented.
In general, the main() generator in a microprocess (its thread of
execution) can return any values it likes when it uses the yield
statement. It is recommended to
not yield zeros or other kinds of 'false' value as these are reserved
for possible future special meaning.
However, this scheduler does understand certain values that can be yielded:
Axon.Ipc.newComponent - a microprocess can yield this to ask the scheduler to activate a new component or microprocess:
def main(self):
...
x=MyComponent()
yield Axon.Ipc.newComponent(x)
...
This is simply an alternative to calling x.activate().
Axon.Ipc.WaitComplete - this is a way for a microprocess to substitute itself (temporarily) with another one that uses a new generator. For example:
def main(self):
...
yield Axon.Ipc.WaitComplete(self.waitOneSecond())
...
def waitOneSecond(self):
t=time.time()
while time.time() < t+1.0:
yield 1
This is a convenient way to modularise parts of your main() code. But there is an important limitation with the current implementation:
The scheduler will stop running it! It will call the microprocess's stop() method. It will also call the _closeDownMicroprocess() method and will act on the return value if it is one of the following:
The listAllThreads() method returns a list of all activated microprocesses - both paused and awake.
The isThreadPaused() method lets you determine if an individual microprocess is paused. Note that the result returned by this method is conservative (the default assumption is that a thread is probably awake). the result will vary depending on the exact moment it is called!
Both these methods are thread safe.
It also has a slow motion mode designed to help with debugging & testing. Call runThreads() with the slowmo argument set to the number of seconds the scheduler should pause after each cycle of executing all microprocesses. For example, to wait half a second after each cycle of execution:
scheduler.run.runThreads(slowmo=0.5)
The scheduler keeps the following internal state:
The scheduler uses a simple round robin approach - it walks through its run queue and calls the next() method of each microprocess in turn. As it goes, it builds a new run queue, ready for the next cycle. If a microprocess terminates (raises a StopIteration exception) then it is not included in the next cycle's run queue.
After it has gone through all microprocesses, the scheduler then processes messages in its wakeRequests and sleepRequests queues. Sleep requests are processed first; then wake requests second. Suppose there is a sleep and wake request queued for the same microprocess; should it be left awake or put to sleep? By processing wake requests last, the scheduler can err on the side of caution and prefer to leave it awake.
Microprocesses are all in one of three possible states (recorded in
the threads
dictionary):
A request to put a microprocess to sleep is handled as follows:
If the microprocess is already sleeping, then nothing needs to happen.
If the microprocess is active, then it is changed to "going to sleep". It is not removed from the run queue immediately. Instead, what happens is:
- on the next cycle of execution, as the scheduler goes through items in the run queue, it doesn't execute any that are "going to sleep" and doesn't include them in the next run queue it is building. It also sets them to the "sleeping" state,
Wake requests are used to both wake up sleeping microprocesses and also to activate new ones. A request to wake a microprocess is handled like this:
If the request contains a flag indicating that this is actually an activation request, then this also happens:
threads
dictionary then it is added to
both the run queue and threads
. It
is set to be active.This three state system is a performance optimisation: it means that the scheduler does not need to waste time searching through the next run queue to remove items - they simply get removed on the next cycle of execution.
Wake requests and sleep requests are handled through thread-safe queues. This enables other threads of execution (eg. threaded components) to safely make requests to wake or pause components.
Tests passed:
Scheduler - runs microthreads of control.
Creates a scheduler object. If scheduler.run has not been set, sets it. Class initialisation ensures that this object/class attribute is initialised - client modules always have access to a standalone scheduler. Internal attributes:
- time = time when this object was last active.
- threads = set of threads to be run, including their state - whether active or sleeping(paused)
Whilst there can be more than one scheduler active in the general case you will NOT want to create a custom scheduler.
A Microprocess adds itself to the runqueue using this method, using the mannerism scheduler.run._addThread(). Generally component writers should not use this method to activate a component - use the component's own activate() method instead.
Returns True if the specified microprocess is sleeping, or the scheduler does not know about it.
Returns a list of all microprocesses (both active and sleeping)
main([slowmo][,canblock]) - Scheduler main loop generator
Each cycle through this generator does two things: * one pass through all active microprocesses, giving executing them. * processing of wake/sleep requests
You can optionally slow down execution to aid debugging. You can also allow the scheduler to block if there are no active, awake microprocesses.
Keyword arguments:
slowmo specifies a delay (in seconds) before the main loop is run. slowmo defaults to 0.
If canblock is True, this generator will briefly) block if there are no active microprocesses, otherwise it will return immediately (default).
This generator terminates when there are no microprocesses left (either sleeping or awake) because they've all terminated. (or because there were none to begin with!)
pauseThread(mprocess) - request to put a mprocess to sleep.
If active, or already sleeping, the specified microprocess will be put to leep on the next cycle through the scheduler.
Runs the scheduler until there are no activated microprocesses left (they've all terminated).
Think of this as bootstrapping the scheduler - after all it is a microprocess like any other, so needs something to run it!
Keyword arguments:
Request to wake a sleeping mprocess, or activate a new one.
If sleeping or already active, the specified microprocess will be ensured to be active on the next cycle through the scheduler.
If the microprocess is not running yet then it will be woken if (and only if) canActivate is set to True (the default is False).
Got a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording!
Please leave you feedback here in reply to the documentation thread in the Kamaelia blog.
-- Automatic documentation generator, 09 Dec 2009 at 04:00:25 UTC/GMT