API documentation
MessageObject
- class valkka.multiprocess.base.MessageObject(command, **kwargs)[source]
A generic MessageObject for intercommunication between the frontend (main Python process) and the backend (forked multiprocess).
Encapsulates a command and its parameters
- Parameters:
command – the command name (a string)
kwargs – the command's parameters as keyword arguments
Example:
msg = MessageObject("a-command", par1=1, par2=2)
msg()        # returns "a-command"
msg["par1"]  # returns 1
MessageProcess
The MessageProcess class implements seamless intercommunication between the forked multiprocess (aka “backend”) and the main/current Python process (aka “frontend”).
Like this:
from valkka.multiprocess import MessageProcess, MessageObject

class MyProcess(MessageProcess):

    def c__myStuff(self, parameter=None):
        print("Regards from other side of the fork! Got parameter", parameter)

    def myStuff(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "myStuff", parameter=parameter))

p = MyProcess(name="my-process")
pipe = p.getPipe()  # returns a multiprocessing.Pipe instance
# ..that can be used as-is with select.select
p.start()
p.myStuff(parameter="gotcha!")
...
p.stop()
In MessageProcess's run method there is an event loop that listens to MessageObjects and maps them to the corresponding backend methods (to method c__myStuff in this case).
Within MessageProcess you can use the logger self.logger, which has the name classname.name; in this example it is MyProcess.my-process.
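For instance, a sketch of using that logger from a backend method (the log call and the formatLogger invocation are illustrative):
from valkka.multiprocess import MessageProcess

class MyProcess(MessageProcess):

    def c__myStuff(self, parameter=None):
        # with name="my-process", this logger is "MyProcess.my-process"
        self.logger.info("got parameter %s", parameter)

MyProcess.formatLogger()  # classmethod: sets formatter; loglevel defaults to logging.INFO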
- class valkka.multiprocess.base.MessageProcess(name='MessageProcess')[source]
Encapsulates:
Frontend methods (in the current main process)
Backend methods (that run in the background/forked process)
Intercom pipes that communicate (seamlessly) between the multiprocessing front- and backend
All intercom is encapsulated in MessageObjects.
When you send a MessageObject with command myStuff, the forked multiprocess (aka backend) tries to find and execute the method c__myStuff in the backend.
- Parameters:
name – name of the multiprocess
NOTE: when subclassing __init__, remember to call super().__init__() therein
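For example, a minimal sketch of such a subclassed __init__ (the extra member is illustrative):
from valkka.multiprocess import MessageProcess

class MyProcess(MessageProcess):

    def __init__(self, name):
        self.my_parameter = 123  # your custom initializations
        super().__init__(name=name)  # remember this call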
- c__ping(lis=[])[source]
A demo multiprocessing backend method: triggered when the frontend calls the method sendPing; sends a reply to the frontend
- classmethod formatLogger(level=20)[source]
A helper to set up the logger formatter
Sets the loglevel of the automatically created logger self.logger (that has the name classname.name)
- Parameters:
level – loglevel. Default: logging.INFO.
- getPipe() Pipe [source]
Multiprocessing frontend method: returns the pipe you can use to listen to messages sent by the multiprocessing backend.
Returns a multiprocessing.Pipe instance
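For example, a sketch of multiplexing the pipe with select (the one-second timeout and the message handling are illustrative):
import select

p = MyProcess(name="my-process")  # the tutorial class from above
pipe = p.getPipe()
p.start()
rlis, _, _ = select.select([pipe], [], [], 1.0)  # wait up to 1 sec
if pipe in rlis:
    msg = pipe.recv()  # a MessageObject sent by the backend
p.stop()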
- ignoreSIGINT()[source]
Multiprocessing frontend method: call before start (or go), so that the multiprocess ignores all SIGINT signals
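For example:
p = MyProcess(name="my-process")
p.ignoreSIGINT()  # must be called before start
p.start()         # the backend now ignores SIGINT (ctrl-c)
...
p.stop()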
- postRun__()[source]
Multiprocessing backend method: subclass if needed
Everything that needs to be done after the fork (i.e. in the backend), right after the multiprocess has exited its main listening & execution loop, i.e. just before the multiprocess exits and dies.
For example: release heavy libraries and the deep neural net detectors instantiated in preRun__
- preRun__()[source]
Multiprocessing backend method: subclass if needed
Everything that needs to be done after the fork (i.e. in the backend), but before the multiprocess's main listening & execution loop starts running.
For example: import heavy libraries and instantiate deep neural net detectors
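A sketch of subclassing both hooks (keeping the base class behaviour with super() calls; the cv2 detector is purely illustrative):
from valkka.multiprocess import MessageProcess

class MyDetectorProcess(MessageProcess):

    def preRun__(self):
        super().preRun__()
        # backend: import heavy libraries & instantiate detectors here
        import cv2
        self.detector = cv2.QRCodeDetector()

    def postRun__(self):
        # backend: release resources just before the process exits
        self.detector = None
        super().postRun__()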
- readPipes__(timeout)[source]
Multiprocessing backend method: listen simultaneously (i.e. “multiplex”) to all intercom pipes.
If you need to listen to anything other than the normal intercom pipe, subclass this method.
- Parameters:
timeout – listening i/o timeout in seconds
- requestStop()[source]
Multiprocessing frontend method: send a request to the multiprocess (backend) to stop
- run()[source]
Multiprocessing backend method: the main listening & execution loop. Normally you would not subclass this one.
- sendMessageToBack(message: MessageObject)[source]
Multiprocessing frontend method: send a MessageObject to the multiprocessing backend
- sendPing(lis)[source]
A demo multiprocessing frontend method: sends the following MessageObject to the multiprocessing backend:
MessageObject("ping", lis=lis)
In the backend this is mapped seamlessly to the backend method c__ping
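A usage sketch (assuming the reply from c__ping arrives in the pipe returned by getPipe):
from valkka.multiprocess import MessageProcess

p = MessageProcess(name="demo")
pipe = p.getPipe()
p.start()
p.sendPing([1, 2, 3])  # backend executes c__ping(lis=[1, 2, 3])
reply = pipe.recv()    # blocks until c__ping's reply arrives
p.stop()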
- send_out__(obj)[source]
Multiprocessing backend method: send an object from the backend to the frontend. It's recommended to use the MessageObject class.
AsyncBackMessageProcess
Warning: understanding of asyncio required
Identical to the MessageProcess class, but now the forked process (aka backend) runs asyncio:
from valkka.multiprocess import AsyncBackMessageProcess, MessageObject, exclog

class MyProcess(AsyncBackMessageProcess):

    async def c__myStuff(self, parameter=None):
        # NOTE: this is a coroutine, so here you can
        # await other coroutines, create asyncio tasks, etc.
        print("Regards from other side of the fork! Got parameter", parameter)

    def myStuff(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "myStuff", parameter=parameter))

p = MyProcess(name="my-process")
pipe = p.getPipe()     # returns a custom Duplex instance
fd = pipe.getReadFd()  # this can be used with select.select
p.start()
p.myStuff(parameter="gotcha!")
...
p.stop()
The whole idea of asyncio Python is to solve the problem of simultaneous, blocking i/o operations. The above example does not yet achieve that i/o concurrency: if you call myStuff twice in a row, the asyncio backend will await the completion of c__myStuff before executing and awaiting it a second time.
To achieve non-blocking behaviour for i/o operations, you should use (background) tasks instead. Here is one way to do that:
import asyncio
from valkka.multiprocess import AsyncBackMessageProcess, MessageObject, exclog

class MyProcess(AsyncBackMessageProcess):

    @exclog
    async def myStuff__(self, parameter=None):
        print("Regards from other side of the fork! Got parameter", parameter)
        # DO SOME BLOCKING IO HERE
        print("Did that blocking io wait")

    async def c__myStuff(self, parameter=None):
        asyncio.get_event_loop().create_task(self.myStuff__(parameter=parameter))

    def myStuff(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "myStuff", parameter=parameter))
For a handy way to achieve asyncio concurrency (without asyncio.gather etc. techniques), please see TaskThread.
Finally, note the small “glitch” in the API when getting the file descriptor for reading: you need to call getReadFd to get the file descriptor.
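A sketch of listening to the asyncio backend from the frontend (assumption: the Duplex object also exposes a recv method for reading the actual message):
import select

p = MyProcess(name="my-process")
pipe = p.getPipe()     # a custom Duplex instance
fd = pipe.getReadFd()  # raw file descriptor for select
p.start()
p.myStuff(parameter="gotcha!")
rlis, _, _ = select.select([fd], [], [], 1.0)  # wait up to 1 sec
if fd in rlis:
    msg = pipe.recv()  # read the message itself (assumed API)
p.stop()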
- class valkka.multiprocess.base.AsyncBackMessageProcess(name='AsyncMessageProcess')[source]
A subclass of MessageProcess, but now the backend runs asyncio
- Parameters:
name – multiprocess name
NOTE: when subclassing __init__, remember to call super().__init__() therein
- async asyncPost__()[source]
Multiprocessing backend coroutine: subclass if needed
Everything that needs to be done after the fork (i.e. in the backend), immediately before exiting the main asyncio event loop
In addition to this, you can still subclass postRun__, which is executed after exiting the main asyncio event loop
- async asyncPre__()[source]
Multiprocessing backend coroutine: subclass if needed
Everything that needs to be done after the fork (i.e. in the backend), but before the multiprocess's main asyncio event loop starts running.
In addition to this, you can still subclass preRun__, which is executed after the fork but before the asyncio event loop
- async c__ping(lis=[])[source]
A demo backend coroutine: triggered when the frontend calls the method sendPing; sends a reply to the frontend.
So, in this coroutine it's all asyncio, i.e. awaiting and creating tasks.
MainContext
- class valkka.multiprocess.base.MainContext[source]
A convenience class to organize your Python main process in the context of multiprocessing
You should subclass this. In the subclassed __init__, you should always call the superclass constructor:
def __init__(self):
    # do custom initializations
    # call superclass ctor in the last line of your
    # custom init
    super().__init__()
This will have the effect of calling startProcesses and startThreads (see below).
Remember to always call the superclass constructor in the last line of your customized __init__.
Please see tutorial, part II for practical subclassing examples.
MainContext has a logger self.logger with the name classname.
- close()[source]
Mandatory. Terminate all multiprocesses and threads. Should be called in the __call__ method after exiting the main loop.
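A minimal subclassing sketch (the method bodies are illustrative; see tutorial, part II for real-world examples):
from valkka.multiprocess import MainContext, MessageProcess

class MyContext(MainContext):

    def __init__(self):
        # custom initializations first..
        self.n_procs = 1
        # ..superclass ctor last: it calls startProcesses & startThreads
        super().__init__()

    def startProcesses(self):
        self.p = MessageProcess(name="demo")
        self.p.start()

    def startThreads(self):
        pass  # no threads in this sketch

    def __call__(self):
        self.p.sendPing([1, 2, 3])  # ..your main loop goes here
        self.close()

    def close(self):
        self.p.stop()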
- classmethod formatLogger(level=20)[source]
A helper to set up the logger formatter
Sets the loglevel of the automatically created logger self.logger (that has the name classname)
- Parameters:
level – loglevel. Default: logging.INFO.
EventGroup
With this class you can:
reserve and release (i.e. recycle) events
access a certain reserved event, based on its index
The motivation:
When doing multiprocessing, the synchronization primitives (events in this case) must be created before forking: after forking you can't create an event and then expect the forked process to see it.
However, if you have created an event before forking, the forked multiprocess can see the event and its state. For this reason we create the events before anything else and then reuse them (instead of creating new ones) on demand.
Furthermore, when communicating between the multiprocessing front- and backend, we can't expect an Event object to be serialized correctly. However, since the events were created and cached before forking, we can send the index of the event (just a simple integer) across the multiprocessing front- and backend: now both frontend and backend know which event is being used to synchronize.
An example:
...
from multiprocessing import Event

class YourProcess(MessageProcess):

    def __init__(self, name):
        ...
        self.event_group = EventGroup(10, Event)  # create 10 multiprocessing.Event instances
        ...

    # multiprocessing backend methods

    def c__ping(self, event_index=None):
        # do something, then trigger the event to indicate that something's done
        self.event_group.set(event_index)

    # multiprocessing frontend methods

    def ping(self):
        i, event = self.event_group.reserve()
        self.sendMessageToBack(MessageObject(
            "ping",
            event_index=i
        ))
        event.wait()  # wait until c__ping sets the event
        self.event_group.release(event)
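Frontend usage then looks like this:
p = YourProcess(name="your-process")
p.start()
p.ping()  # blocks until c__ping has set the event in the backend
p.stop()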
- class valkka.multiprocess.sync.EventGroup(n=10, event_class=<bound method BaseContext.Event of <multiprocessing.context.DefaultContext object>>)[source]
Creates a group of multiprocessing events
- Parameters:
n – number of events to be instantiated and cached
event_class – a multiprocessing event class that has set and clear methods. Default: Python multiprocessing.Event. Can also be EventFd from libValkka.
- fromIndex(i)[source]
Get an event, based on the event index. Typically used in the multiprocessing backend to get the same event as reserved in the frontend.
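For example, in the backend (a sketch based on the YourProcess example above):
def c__ping(self, event_index=None):
    # look up the very same event the frontend reserved
    event = self.event_group.fromIndex(event_index)
    # ..do the work, then:
    event.set()  # the frontend's event.wait() returns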
- release(event)[source]
Release an EventFd sync primitive. Typically used at the process frontend / Python main process
- Parameters:
event – event to be released / returned
- release_ind(index: int)[source]
Release an EventFd sync primitive, based on the index. Typically used at the process frontend / Python main process
- Parameters:
index – event’s index
SyncIndex
A context manager that reserves an event from an EventGroup, waits until it has been set, and finally returns the event back to the EventGroup.
An example:
...
from multiprocessing import Event

class YourProcess(MessageProcess):

    def __init__(self, name):
        ...
        self.event_group = EventGroup(10, Event)  # create 10 multiprocessing.Event instances
        ...

    # multiprocessing backend methods

    def c__ping(self, event_index=None):
        # do something, then trigger the event to indicate that something's done
        self.event_group.set(event_index)

    # multiprocessing frontend methods

    def ping(self):
        with SyncIndex(self.event_group) as i:
            self.sendMessageToBack(MessageObject(
                "ping",
                event_index=i
            ))
        # SyncIndex context manager:
        # - reserves an event from self.event_group
        # - waits until the event has been set
        # - releases the event back to self.event_group
- class valkka.multiprocess.sync.SyncIndex(event_group: EventGroup)[source]
A context manager for synchronizing between multiprocessing front- and backend.
- Parameters:
event_group – an EventGroup instance
Wait’s and releases an event at context manager exit