API documentation

MessageObject

class valkka.multiprocess.base.MessageObject(command, **kwargs)[source]

A generic MessageObject for intercommunication between the frontend (the main python process) and the backend (the forked multiprocess).

Encapsulates a command and parameters

Parameters:
  • command – the command

  • kwargs – kwargs

Example:

msg = MessageObject("a-command", par1=1, par2=2)
msg() # returns "a-command"
msg["par1"] # returns 1

MessageProcess

The MessageProcess class implements seamless intercommunication between the forked multiprocess (aka "backend") and the main/current python process (aka "frontend").

Like this:

from valkka.multiprocess import MessageProcess, MessageObject

class MyProcess(MessageProcess):

    def c__myStuff(self, parameter=None):
        print("Regards from other side of the fork!  Got parameter", parameter)

    def myStuff(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "myStuff", parameter=parameter))

p = MyProcess(name="my-process")
pipe = p.getPipe() # returns a multiprocessing.Pipe instance
# ..that can be used as-is with select.select
p.start()
p.myStuff(parameter="gotcha!")
...
p.stop()

In MessageProcess's run method, there is an event loop that listens for MessageObjects, which are mapped to the corresponding backend methods (to the method c__myStuff in this case).

Within MessageProcess you can use the logger self.logger, whose name is classname.name; in this example it is MyProcess.my-process
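The command-to-method mapping can be illustrated without forking at all. The sketch below uses assumed names (not valkka's actual internals) to show how a command string is resolved to a c__-prefixed handler with getattr:

```python
class DispatchSketch:
    """Illustrates the 'myStuff' -> c__myStuff mapping done by the run() event loop."""

    def c__myStuff(self, parameter=None):
        # this would run in the backend (forked) process
        return f"backend got {parameter}"

    def handle(self, command, **kwargs):
        # the backend event loop looks up 'c__' + command and calls it
        method = getattr(self, "c__" + command, None)
        if method is None:
            raise AttributeError(f"no backend method c__{command}")
        return method(**kwargs)

d = DispatchSketch()
print(d.handle("myStuff", parameter="gotcha!"))  # backend got gotcha!
```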

class valkka.multiprocess.base.MessageProcess(name='MessageProcess')[source]

Encapsulates:

  • Frontend methods (in the current main process)

  • Backend methods (that run in the background/forked process)

  • Intercom pipes that communicate (seamlessly) between the multiprocessing front- and backend

  • All intercom is encapsulated in MessageObjects

When you send a MessageObject with command myStuff, the forked multiprocess (aka backend) tries to find and execute the method c__myStuff in the backend.

Parameters:

name – name of the multiprocess

NOTE: when overriding __init__ in a subclass, remember to call super().__init__() in it

c__ping(lis=[])[source]

A demo multiprocessing backend method: triggered when the frontend calls the method sendPing; sends a reply back to the frontend

classmethod formatLogger(level=20)[source]

A helper to set up the logger formatter

Sets the log level of the automatically created logger self.logger (which has the name classname.name)

Parameters:

level – loglevel. Default: logging.INFO.

getPipe() Pipe[source]

Multiprocessing frontend method: returns the pipe you can use to listen to messages sent by the multiprocessing backend.

returns a multiprocessing.Pipe instance

go()[source]

Multiprocessing frontend method: a synonym for multiprocessing's start()

ignoreSIGINT()[source]

Multiprocessing frontend method: call before start (or go), so that the multiprocess ignores all SIGINT signals

postRun__()[source]

Multiprocessing backend method: subclass if needed

Everything that needs to be done after the fork (i.e. in the backend), right after the multiprocess has exited its main listening & execution loop, i.e. just before the multiprocess exits and dies.

For example: releasing heavy resources, e.g. deallocating deep neural net detectors

preRun__()[source]

Multiprocessing backend method: subclass if needed

Everything that needs to be done after the fork (i.e. in the backend), but before the multiprocess's main listening & execution loop starts running.

For example: import heavy libraries and instantiate deep neural net detectors

readPipes__(timeout)[source]

Multiprocessing backend method: listens simultaneously (i.e. "multiplexes") to all intercom pipes.

If you need to listen to anything in addition to the normal intercom pipe, subclass this method.

Parameters:

timeout – listening i/o timeout in seconds

requestStop()[source]

Multiprocessing frontend method: send a request to the multiprocess (backend) to stop

run()[source]

Multiprocessing backend method: the main listening & execution loop. Normally you would not subclass this one.

sendMessageToBack(message: MessageObject)[source]

Multiprocessing frontend method: send a MessageObject to multiprocessing backend

sendPing(lis)[source]

A demo multiprocessing frontend method that sends the following MessageObject to the multiprocessing backend:

MessageObject(
    "ping",
    lis = lis
)

In the backend this is mapped seamlessly to the backend method c__ping

send_out__(obj)[source]

Multiprocessing backend method: send an object from the backend to the frontend. It’s recommended to use the MessageObject class.

stop()[source]

Multiprocessing frontend method: request backend multiprocess to stop and wait until it has finished

waitStop()[source]

Multiprocessing frontend method: an alias for multiprocessing's join()

AsyncBackMessageProcess

Warning: understanding of asyncio required

Identical to MessageProcess class, but now the forked process (aka backend) runs asyncio:

from valkka.multiprocess import AsyncBackMessageProcess, MessageObject, exclog

class MyProcess(AsyncBackMessageProcess):

    async def c__myStuff(self, parameter=None):
        # NOTE: this is a coroutine
        # so, here call other coroutine with await
        # send asyncio tasks, etc. etc.
        print("Regards from other side of the fork!  Got parameter", parameter)

    def myStuff(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "myStuff", parameter=parameter))

p = MyProcess(name="my-process")
pipe = p.getPipe() # returns a custom Duplex instance
fd=pipe.getReadFd() # this can be used with select.select
p.start()
p.myStuff(parameter="gotcha!")
...
p.stop()

The whole point of using asyncio is to handle simultaneous, blocking i/o operations. In the example above we don't yet achieve that i/o concurrency: if you called myStuff twice in a row, the asyncio backend would await the completion of the first c__myStuff before executing and awaiting the second.

To achieve non-blocking behaviour for i/o operations, you should use (background) tasks instead. Here is one way to do that:

import asyncio

class MyProcess(AsyncBackMessageProcess):

    @exclog
    async def myStuff__(self, parameter=None):
        print("Regards from other side of the fork!  Got parameter", parameter)
        # DO SOME BLOCKING IO HERE
        print("Did that blocking io wait")

    async def c__myStuff(self, parameter=None):
        asyncio.get_event_loop().create_task(self.myStuff__(parameter=parameter))

    def myStuff(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "myStuff", parameter=parameter))
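The gain from spawning tasks can be demonstrated with plain asyncio, independent of valkka (a stdlib-only sketch): two direct awaits run serially, while two tasks overlap their waits.

```python
import asyncio
import time

async def io_op():
    await asyncio.sleep(0.1)  # stands in for a blocking i/o wait

async def serial():
    # awaiting directly: the second call starts only after the first completes
    t0 = time.monotonic()
    await io_op()
    await io_op()
    return time.monotonic() - t0

async def concurrent():
    # spawning tasks: both sleeps run overlapped
    t0 = time.monotonic()
    t1 = asyncio.create_task(io_op())
    t2 = asyncio.create_task(io_op())
    await t1
    await t2
    return time.monotonic() - t0

serial_t = asyncio.run(serial())          # roughly 0.2 s
concurrent_t = asyncio.run(concurrent())  # roughly 0.1 s
print(f"serial: {serial_t:.2f}s, with tasks: {concurrent_t:.2f}s")
```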

For a handy way to achieve asyncio concurrency (without asyncio.gather etc. techniques), please see TaskThread.

Finally, note the small "glitch" in the API: to get the file descriptor for reading, you need to call getReadFd.

class valkka.multiprocess.base.AsyncBackMessageProcess(name='AsyncMessageProcess')[source]

A subclass of MessageProcess, but now the backend runs asyncio

Parameters:

name – multiprocess name

NOTE: when overriding __init__ in a subclass, remember to call super().__init__() in it

async asyncPost__()[source]

Multiprocessing backend coroutine: subclass if needed

Everything that needs to be done after the fork (i.e. in the backend), immediately before exiting the main asyncio event loop

In addition to this, you can still subclass postRun__, which is executed after exiting the main asyncio event loop

async asyncPre__()[source]

Multiprocessing backend coroutine: subclass if needed

Everything that needs to be done after the fork (i.e. in the backend), but before the multiprocess's main asyncio event loop starts running.

In addition to this, you can still subclass preRun__, which is executed after the fork but before the asyncio event loop starts

async c__ping(lis=[])[source]

A demo backend coroutine: triggered when the frontend calls the method sendPing; sends a reply back to the frontend

In this coroutine it's all asyncio, i.e. awaiting other coroutines and creating tasks.

getPipe() Duplex[source]

Returns a Duplex object, instead of multiprocessing.Pipe object.

Duplex.fileno() returns the read file descriptor number

getReadFd()[source]

Returns the read file descriptor number of the frontend Duplex object

getWriteFd()[source]

Returns the write file descriptor number of the frontend Duplex object

async send_out__(obj)[source]

Multiprocessing backend coroutine: pickle obj & send to main python process. It’s recommended to use the MessageObject class.

MainContext

class valkka.multiprocess.base.MainContext[source]

A convenience class to organize your python main process in the context of multiprocessing

You should subclass this. In subclassed __init__, you should always call the superclass constructor:

def __init__(self):
    # do custom initializations
    # call superclass ctor in the last line of your
    # custom init
    super().__init__()

This will have the effect of calling startProcesses and startThreads (see below).

Remember to always call the superclass constructor in the last line of your customized __init__

Please see tutorial, part II for practical subclassing examples

MainContext has a logger self.logger with the name classname.

__call__()[source]

Mandatory. Your main process loop.

close()[source]

Mandatory. Terminate all multiprocesses and threads. Should be called in the __call__ method after exiting the main loop.

classmethod formatLogger(level=20)[source]

A helper to setup logger formatter

Sets loglevel to the automatically created logger self.logger (that has the name classname)

Parameters:

level – loglevel. Default: logging.INFO.

runAsThread()[source]

Run the class as a thread. Only for testing/debugging purposes

startProcesses()[source]

Mandatory. Create, cache and start your MessageProcesses here.

startThreads()[source]

Mandatory. Create, cache and start any python multithreads here if you have them.

stopThread()[source]

If launched with runAsThread, use this method to stop.

EventGroup

With this class you can:

  • reserve and release (i.e. recycle) events

  • access a certain reserved event, based on its index

The motivation for this:

When doing multiprocessing, the synchronization primitives (events, in this case) must be reserved before forking: after forking you can't create an event and expect the forked process to see it.

However, if you have created an event before forking, the forked multiprocesses can see the event and its state. For this reason we create the events before anything else and then reuse them (instead of creating new ones) on demand.

Furthermore, when communicating between the multiprocessing front- and backend, we can't expect an Event object to be serialized correctly. However, since the events were created and cached before forking, we can send the index of the event (just a simple integer) across the multiprocessing front- and backend: now both the front- and backend know which event is being used to synchronize.

An example:

...
from multiprocessing import Event

class YourProcess(MessageProcess):

    def __init__(self, name):
        ...
        self.event_group = EventGroup(10, Event) # create 10 multiprocessing.Event instances
        ...

    # multiprocessing backend methods

    def c__ping(self, event_index=None):
        # do something, then trigger the event to indicate that something's done
        self.event_group.set(event_index)

    # multiprocessing frontend methods

    def ping(self):
        i, event = self.event_group.reserve()
        self.sendMessageToBack(MessageObject(
            "ping",
            event_index = i
        ))
        event.wait() # wait until c__ping sets the event
        self.event_group.release(event)
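The reserve/release recycling shown above can be sketched with a stdlib-only stand-in (hypothetical names, not valkka's implementation): events are pre-created in a pool, handed out with their index, and returned for reuse.

```python
from multiprocessing import Event

class EventGroupSketch:
    """Pre-creates events (before any fork) and recycles them by index."""

    def __init__(self, n=10, event_class=Event):
        self.events = [event_class() for _ in range(n)]
        self.free = list(range(n))  # indices of unreserved events

    def reserve(self):
        i = self.free.pop(0)
        return i, self.events[i]

    def fromIndex(self, i):
        return self.events[i]

    def set(self, i):
        self.events[i].set()

    def release(self, event):
        i = self.events.index(event)
        event.clear()               # recycle in a clean state
        self.free.append(i)

eg = EventGroupSketch(3)
i, ev = eg.reserve()
eg.set(i)                 # "backend" triggers the event by index only
assert ev.is_set()        # "frontend" sees it on the same cached object
eg.release(ev)
print("free events after release:", len(eg.free))
```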

class valkka.multiprocess.sync.EventGroup(n=10, event_class=<bound method BaseContext.Event of <multiprocessing.context.DefaultContext object>>)[source]

Creates a group of multiprocessing events

Parameters:
  • n – number of events to be instantiated and cached

  • event_class – a multiprocessing event class that has set and clear methods. default: python multiprocessing.Event. Can also be EventFd from libValkka.

asIndex(event)[source]

Return index corresponding to an event

fromIndex(i)[source]

Get an event, based on the event index. Typically used in the multiprocessing backend to get the same event that was reserved in the frontend.

release(event)[source]

Release an EventFd sync primitive. Typically used in the process frontend / main python process

Parameters:

event – event to be released / returned

release_ind(index: int)[source]

Release an EventFd sync primitive, based on its index. Typically used in the process frontend / main python process

Parameters:

index – event’s index

reserve() tuple[source]

Reserve and return an Event instance together with its index: index, Event

Typically used in the process frontend / main python process

set(i)[source]

Set / trigger an event at index i. Typically used in the multiprocessing backend.

SyncIndex

A context manager that reserves an event from an EventGroup, waits until it has been set, and finally returns the event to the EventGroup.

An example:

...
from multiprocessing import Event

class YourProcess(MessageProcess):

    def __init__(self, name):
        ...
        self.event_group = EventGroup(10, Event) # create 10 multiprocessing.Event instances
        ...

    # multiprocessing backend methods

    def c__ping(self, event_index=None):
        # do something, then trigger the event to indicate that something's done
        self.event_group.set(event_index)

    # multiprocessing frontend methods

    def ping(self):
        with SyncIndex(self.event_group) as i:
            self.sendMessageToBack(MessageObject(
                "ping",
                event_index = i
            ))
            # SyncIndex context manager:
            # - reserves event from self.event_group
            # - waits until the event has been set
            # - releases event back to self.event_group

class valkka.multiprocess.sync.SyncIndex(event_group: EventGroup)[source]

A context manager for synchronizing between multiprocessing front- and backend.

Parameters:

event_group – an EventGroup instance

Waits on the event and releases it at context-manager exit
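The context manager's behaviour can be sketched as follows (a stdlib-only stand-in with hypothetical names, not the actual valkka code): reserve an event on enter, then wait on it and release it on exit.

```python
import threading

class TinyEventGroup:
    """Minimal reserve/release pool for this demo (hypothetical, stdlib-only)."""

    def __init__(self, n=2):
        self.events = [threading.Event() for _ in range(n)]
        self.free = list(range(n))

    def reserve(self):
        i = self.free.pop(0)
        return i, self.events[i]

    def release(self, event):
        i = self.events.index(event)
        event.clear()
        self.free.append(i)

class SyncIndexSketch:
    """Reserves an event on enter; waits on it and releases it on exit."""

    def __init__(self, event_group):
        self.event_group = event_group

    def __enter__(self):
        self.i, self.event = self.event_group.reserve()
        return self.i          # the with-block only sees the index

    def __exit__(self, exc_type, exc_value, tb):
        self.event.wait()      # block until the "backend" sets the event
        self.event_group.release(self.event)
        return False

eg = TinyEventGroup()

def backend(index):
    # stands in for c__ping setting the event by index in the backend
    eg.events[index].set()

with SyncIndexSketch(eg) as i:
    threading.Thread(target=backend, args=(i,)).start()
# exiting the with-block waited for the event and released it
print("free events:", len(eg.free))
```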

Other

valkka.multiprocess.base.safe_select(l1, l2, l3, timeout=None)[source]

Like select.select, but ignores EINTR
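A sketch of how such a wrapper is typically written (an illustration, not necessarily valkka's exact implementation): retry the select call whenever it is interrupted by a signal.

```python
import errno
import os
import select

def safe_select_sketch(l1, l2, l3, timeout=None):
    """select.select, but retried when interrupted by a signal (EINTR)."""
    while True:
        try:
            return select.select(l1, l2, l3, timeout)
        except OSError as e:
            if e.errno == errno.EINTR:
                continue  # interrupted by a signal: just retry
            raise

# quick demonstration with an os.pipe file descriptor pair
r, w = os.pipe()
os.write(w, b"x")
readable, _, _ = safe_select_sketch([r], [], [], 1.0)
print(readable == [r])  # True
```

Note that since Python 3.5 (PEP 475), select.select itself automatically retries when interrupted by a signal unless the signal handler raises an exception, so a wrapper like this matters mainly on older interpreters.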