Part I: Managing a multiprocess

In this part you will create your first multiprocess and learn how to communicate and share resources with it.

1. Send message to a forked process

[download code]

In this example, we call a method in the backend (the forked multiprocess) seamlessly, simply by calling a method in the frontend (the main python process context).

This is achieved as follows:

MessageProcess's backend has an internal execution loop that continuously listens for messages from the frontend.

When a MessageObject is sent with the command ping, the backend automagically looks for a method named c__ping and executes it: the execution of c__ping happens in the multiprocessing backend, i.e. in the forked process.
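Under the hood, this works by mapping the command string to a method name. The following toy snippet illustrates the idea only; it is not valkka's actual implementation:

class ToyBackend:

    def c__ping(self, parameter=None):
        print("got", parameter)

    def handleCommand__(self, command: str, **kwargs):
        # map the command string "ping" to the method name "c__ping":
        method = getattr(self, "c__" + command)
        method(**kwargs)

b = ToyBackend()
b.handleCommand__("ping", parameter="hello") # prints: got hello

Now, to the actual example: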

import time
from valkka.multiprocess import MessageProcess, MessageObject

class MyProcess(MessageProcess):

    def c__ping(self, parameter=None):
        print("Regards from other side of the fork!  Got parameter", parameter)

    def ping(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "ping", parameter=parameter))

p = MyProcess(name="my-process")

Calling start forks the process and creates the backend

p.start()

Let the multiprocessing backend run on its own for one second

time.sleep(1)

Call the frontend method with a parameter: it seamlessly sends a message to the backend, which then executes c__ping:

p.ping(parameter="gotcha!")

Again, let the multiprocessing backend run on its own for a while

time.sleep(1)

Finally, tell the backend to stop and exit

p.stop()

2. Send message and synchronize

[download code]

So, we can call backend methods, i.e. tell the forked process to do something, simply by calling a frontend method.

Many times we need to synchronize between the main python process and the forked multiprocess: i.e., the main process needs to wait until the forked process has finished doing something.

import time
import logging
from valkka.multiprocess import MessageProcess, MessageObject, \
    EventGroup, SyncIndex

class MyProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)
        self.eg = EventGroup(10)

Here we have created a group of multiprocessing Event objects in the self.eg member.

As the Events are created before the fork (i.e. before calling start), they are visible both in the multiprocessing backend and in the frontend.
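If this pattern is new to you, here is the same principle with a plain multiprocessing Event, independent of valkka (a minimal sketch):

from multiprocessing import Process, Event

e = Event() # created before the fork

def work(ev):
    ev.set() # child process: signal that the work is done

p = Process(target=work, args=(e,))
p.start() # fork happens here
e.wait()  # parent process: block until the child calls set()
p.join()

Back in MyProcess, the backend method does the work and then sets the Event: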

    def c__ping(self, parameter=None, sync_index: int =None):
        self.logger.info("c__ping: got parameter %s - will do some", parameter)
        time.sleep(1.0) # do something time consuming..
        self.logger.info("c__ping: ..did that!")
        self.eg.set(sync_index) # tell frontend that it's done

An index number is communicated to the multiprocessing backend, so that the forked process knows which Event it needs to set in order to sync with the frontend:

    def ping(self, parameter=None):
        self.logger.info("ping: callin backend to do some stuff")
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "ping",
                parameter=parameter,
                sync_index = i
            ))
        self.logger.info("ping: backend seems to be ready")

p = MyProcess(name="my-process")

A helper method to quick-format the logger for MyProcess:

p.formatLogger()

Calling start forks and creates the backend

p.start()
time.sleep(1)
p.ping(parameter="gotcha!")

Tell the backend to stop and exit

p.stop()

We get this output:

MyProcess.my-process - INFO - ping: calling backend to do some stuff
MyProcess.my-process - INFO - c__ping: got parameter gotcha! - will do some
MyProcess.my-process - INFO - c__ping: ..did that!
MyProcess.my-process - INFO - ping: backend seems to be ready

3. Send and receive message

[download code]

Next, we’re going to call a backend method and then get some results from the backend in the form of a message.

import time
import logging
from valkka.multiprocess import MessageProcess, MessageObject

We are going to make quite some use of the select module:

import select

so you might want to read a tutorial on it first.
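In a nutshell: select.select blocks until at least one object in the first argument list is ready for reading. Pipe connection objects have a fileno() method, so select accepts them directly. A minimal sketch:

import select
from multiprocessing import Pipe

recv_end, send_end = Pipe(duplex=False)
send_end.send("hello")
# blocks (here for max 1 sec) until there is something to read:
readable, writable, exceptional = select.select([recv_end], [], [], 1.0)
if recv_end in readable:
    print(recv_end.recv()) # -> hello

Now, to the example itself: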

class MyProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)

We're sending a message, this time from the backend to the frontend:

    def c__ping(self, parameter=None):
        self.logger.info("c__ping: got parameter %s - will do some", parameter)
        time.sleep(1.0) # do something time consuming..
        self.logger.info("c__ping: ..did that!")
        self.send_out__(MessageObject(
            "pong", result="pudding"
        ))

    def ping(self, parameter=None):
        self.logger.info("ping: sending ping to backend")
        self.sendMessageToBack(MessageObject(
            "ping",
            parameter=parameter
        ))

p = MyProcess(name="my-process")
p.formatLogger()

Get the pipe (multiprocessing.Pipe) for listening to messages from MyProcess p. Remember that you should use pipe communication only for small amounts of data: typically just short messages and maybe some tiny (numpy) arrays if you have to.

pipe = p.getPipe()

Calling start, forks and creates the backend

p.start()
time.sleep(1)
p.ping(parameter="gotcha!")
print("main: waiting reply from multiprocessing backend")

Use select to wait for the message

select.select([pipe], [], [])
msg = pipe.recv()
print("main: got", msg(), "with some", msg["result"])

Tell the backend to stop and exit

p.stop()

We get this output:

MyProcess.my-process - INFO - ping: sending ping to backend
main: waiting reply from multiprocessing backend
MyProcess.my-process - INFO - c__ping: got parameter gotcha! - will do some
MyProcess.my-process - INFO - c__ping: ..did that!
main: got pong with some pudding

4. Using shared memory and forked resources

[download code]

NOTE: for the following examples, you need numpy installed.

Shared memory is a powerful tool in multiprocessing: it allows you to share large blocks of data between the main python process (aka frontend) and the forked multiprocess (aka backend).

Shared memory is used as follows:

Shared memory blocks are created first in the frontend and after that again in the backend, using the same unique name.

You can think of the shared memory in the frontend as “server-side” and in the backend as “client-side”.

Shared memory is just memory, i.e. raw bytes, and is typically wrapped into a numpy array.
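To make the server/client pairing concrete, here is a minimal sketch using only the standard library and numpy; in real use the "client-side" lines would run in the forked process (the name demo-shm is just an example):

import numpy as np
from multiprocessing import shared_memory

# "server side": create the block with a unique name:
shmem = shared_memory.SharedMemory(name="demo-shm", create=True, size=10*8)
a = np.ndarray((10,), dtype=np.float64, buffer=shmem.buf)
a[:] = 42.0

# "client side": attach to the existing block by the same name:
shmem_ = shared_memory.SharedMemory(name="demo-shm", create=False)
b = np.ndarray((10,), dtype=np.float64, buffer=shmem_.buf)
print(b[0]) # -> 42.0 : the same underlying memory

shmem_.close()
shmem.close()
shmem.unlink() # destroy the block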

In the other examples you learned how to subclass MessageProcess. Here we also define the (virtual) methods preRun__ and postRun__ from the MessageProcess class: both of them are executed after the fork, i.e. in the backend, but outside MessageProcess's main execution loop: they are "startup" and "shutdown" functions in the forked process.

import time
import logging
import numpy as np
from valkka.multiprocess import MessageProcess, MessageObject, \
    EventGroup, SyncIndex
from multiprocessing import shared_memory

class MyProcess(MessageProcess):

Create “server-side” shared memory first in the frontend with the unique name my-shm-1

    def __init__(self, name):
        super().__init__(name)
        self.eg = EventGroup(10)
        self.shmem_name = "my-shm-1"
        # create a dummy model array;
        self.model = np.zeros((100,100), dtype=float)
        # create frontend-side shared memory array:
        self.shmem = shared_memory.SharedMemory(
            name=self.shmem_name, create=True, size=self.model.nbytes
        )
        self.array = np.ndarray(
            self.model.shape, dtype=self.model.dtype, buffer=self.shmem.buf
        )
        self.array[:,:] = 1.0

Close the “server-side” shared memory, for example, at garbage collection of MyProcess

    def __del__(self):
        self.shmem.close()

Backend methods are: preRun__, postRun__ and c__readWrite.

preRun__ is executed in the forked process (backend) before the main execution loop kicks in: create “client-side” shared memory in the backend with the correct unique name my-shm-1.

    def preRun__(self):
        self.logger.info("preRun__ : ")
        # create backend-side shared memory array:
        self.shmem__ = shared_memory.SharedMemory(
            name=self.shmem_name, create=False, size=self.model.nbytes
        )
        self.array__ = np.ndarray(
            self.model.shape, dtype=self.model.dtype, buffer=self.shmem__.buf
        )

postRun__ is executed in the forked process (backend) just before the process exits and shuts down: close the “client-side” shared memory:

    def postRun__(self):
        self.logger.info("postRun__ : ")
        # release backend-side shared memory array:
        self.shmem__.unlink()

    def c__readWrite(self, sync_index: int =None):
        self.logger.info("c__readWrite: will change array values from %s to %s",
            self.array__[0,0], 3.0)
        self.array__[:,:] = 3.0
        self.logger.info("c__readWrite: ..did that!")
        self.eg.set(sync_index) # tell frontend that it's done

Frontend methods: readWrite

    def readWrite(self):
        self.logger.info("readWrite : calling backend")
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "readWrite",
                sync_index = i
            ))
        self.logger.info("readWrite: array values now %s", self.array[0,0])

p = MyProcess(name="my-process")
p.formatLogger()

Calling start, forks and creates the backend

p.start()
time.sleep(1)
p.readWrite()

Tell the backend to stop and exit

p.stop()

Resulting output:

MyProcess.my-process - INFO - preRun__ :
MyProcess.my-process - INFO - readWrite : calling backend
MyProcess.my-process - INFO - c__readWrite: will change array values from 1.0 to 3.0
MyProcess.my-process - INFO - c__readWrite: ..did that!
MyProcess.my-process - INFO - readWrite: array values now 3.0
MyProcess.my-process - INFO - postRun__ :

Some additional observations are in order:

Now that you have learned how to use preRun__: that is the typical place to import heavy libraries that utilize threading (remember: threads should be spawned only after the fork).

It is also the place to instantiate your heavy neural net objects: there is no need to fork all that stuff (and it might not like forking at all). In general, preRun__ is the place for all libraries and object instances that might be sensitive to forking, as sketched below.

Similarly, shutting down / releasing resources should be done in postRun__.
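To illustrate, such a fork-sensitive setup might look like this. This is only a sketch: torch is an assumed example of a heavy, threading-based library, and MyNeuralNetProcess is a hypothetical subclass.

class MyNeuralNetProcess(MessageProcess):

    def preRun__(self):
        # heavy import happens only in the forked process
        # (assumption: torch is installed):
        import torch
        # a stand-in for instantiating your real model:
        self.model__ = torch.nn.Linear(100, 10)

    def postRun__(self):
        # release the backend resources at shutdown:
        del self.model__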

5. Syncing server/client resources

[download code]

In the previous example we played around with shared memory. As discussed, there is “server-side” and “client-side” shared memory.

In multiprocessing programming, the same goes for any resource: the main python process (again, the “frontend”) instantiates a resource first, and the forked process (the “backend”) then instantiates its “client-side” / mirror image of the same resource.

After this, the resource (in this case shared memory) can be used to interchange data between front- and backend.

Let’s demonstrate all this by instantiating shared memory “on-demand”:

import time, math
import logging
import numpy as np
from valkka.multiprocess import MessageProcess, MessageObject, \
    EventGroup, SyncIndex
from multiprocessing import shared_memory

class MyProcess(MessageProcess):

This time the constructor creates no shared memory, only the group of sync events: the shared memory will be instantiated on-demand later

    def __init__(self, name):
        super().__init__(name)
        self.eg = EventGroup(10)

Backend methods: create, modify and close shared memory in the forked process

    def c__create(self, name: str = None,
            dtype: np.dtype = None,
            shape: tuple = None,
            sync_index: int = None):
        # create backend-side shared memory array "on-demand"
        n_bytes = len(np.array(1, dtype=dtype).tobytes())*math.prod(shape)
        self.shmem_name = name
        self.shmem__ = shared_memory.SharedMemory(
            name=self.shmem_name, create=False, size=n_bytes
        )
        self.array__ = np.ndarray(
            shape, dtype=dtype, buffer=self.shmem__.buf
        )
        self.eg.set(sync_index)

    def c__close(self, sync_index = None):
        self.shmem__.unlink()
        self.logger.info("c__close: client side shmem unlinked")
        self.eg.set(sync_index)

    def c__readWrite(self, sync_index: int =None):
        self.logger.info("c__readWrite: will change array values from %s to %s",
            self.array__[0,0], 3.0)
        self.array__[:,:] = 3.0
        self.logger.info("c__readWrite: ..did that!")
        self.eg.set(sync_index) # tell frontend that it's done

Frontend methods: create and close shared memory in the main python process and tell the backend to do the same.

    def create(self, shape: tuple = (100, 100)):
        name = "my-shm-1"
        dtype = np.float16
        n_bytes = len(np.array(1, dtype=dtype).tobytes())*math.prod(shape)
        self.shmem = shared_memory.SharedMemory(
            name=name, create=True, size=n_bytes
        )
        self.array = np.ndarray(
            shape, dtype=dtype, buffer=self.shmem.buf
        )
        self.array[:,:] = 0.0
        # send data to backend so that it can create the client side shmem
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "create",
                name = name,
                dtype = dtype,
                shape = shape,
                sync_index = i
            ))

    def getArray(self):
        return self.array

    def readWrite(self):
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "readWrite",
                sync_index = i
            ))
        self.logger.info("readWrite: array values now %s", self.array[0,0])

    def close(self):
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "close",
                sync_index = i
            ))
        self.logger.info("close: closing server side shmem")
        self.shmem.close()

Run the process:

p = MyProcess(name="my-process")
p.formatLogger()
p.start()
time.sleep(1)
p.create(shape=(200,200))
p.readWrite()
print("main: array values now", p.getArray()[0,0])
p.close()
time.sleep(0.1)
p.stop()
print("main: bye!")

Resulting output:

MyProcess.my-process - INFO - c__readWrite: will change array values from 0.0 to 3.0
MyProcess.my-process - INFO - c__readWrite: ..did that!
MyProcess.my-process - INFO - readWrite: array values now 3.0
main: array values now 3.0
MyProcess.my-process - INFO - c__close: client side shmem unlinked
MyProcess.my-process - INFO - close: closing server side shmem
main: bye!

Part II: Organizing workers

Time to ramp things up and fire up hundreds or even thousands of multiprocessing workers!

How do you manage and organize something like that? You are about to find out.

A word of warning: if you're trying to "optimize" and skip Part I of the tutorial, don't do that: you'll miss important concepts and will not understand what's going on (mainly, the concepts of "frontend" and "backend" methods).

6. Planning it

Our goal is to create a main process and N subprocess workers that do some (heavy) calculation and return results to the main process via a sharedmem numpy array.

When we have a main process and workers, we have a hierarchy, and it is always a good idea to write it down as a hierarchical list, so we will do just that:

Manager (main process)
    - IN:
        - MessageObject (from SUB WorkerProcess-N)
    - SUB:
        WorkerProcess-1
            - UP:
                - MessageObject
                - method: getArray()
            - IN:
                - method doWork()
        WorkerProcess-2
            - UP:
                - MessageObject
                - method: getArray()
            - IN:
                - method doWork()
        WorkerProcess-3
            ...
        WorkerProcess-4
            ...
    ...

In this notation, there is a Manager (the main process) that gets input messages in the form of MessageObjects from the individual workers, which are WorkerProcess-1, WorkerProcess-2, etc.

Each WorkerProcess-N is commanded by the upper-level object (Manager) through the method doWork, which tells the WorkerProcess to launch a calculation.

Information is passed from the lower-level objects (WorkerProcess) to the upper level (Manager) with a MessageObject and via the method getArray().

You seldom need to create any deeper nested hierarchies. If you have to, then you're probably doing something wrong.

7. Implementation

[download code]

import time, sys
import numpy as np
import logging
from random import randint
from multiprocessing import shared_memory
from valkka.multiprocess import MessageProcess, MessageObject, \
    MainContext, safe_select

First define the worker process that will do some calculation on a shared numpy array.

This is similar to examples 3 and 4:

class WorkerProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)
        self.shmem_name = "shm-"+self.name
        # create a dummy model array;
        self.model = np.zeros((100,100), dtype=float)
        # create frontend-side shared memory array:
        self.shmem = shared_memory.SharedMemory(
            name=self.shmem_name, create=True, size=self.model.nbytes
        )
        self.array = np.ndarray(
            self.model.shape, dtype=self.model.dtype, buffer=self.shmem.buf
        )
        self.array[:,:] = 0.0

    def __del__(self):
        self.shmem.close()

    # Backend methods
    def preRun__(self):
        self.logger.info("preRun__ : ")
        # create backend-side shared memory array:
        self.shmem__ = shared_memory.SharedMemory(
            name=self.shmem_name, create=False, size=self.model.nbytes
        )
        self.array__ = np.ndarray(
            self.model.shape, dtype=self.model.dtype, buffer=self.shmem__.buf
        )

    def postRun__(self):
        self.logger.info("postRun__ : ")
        # release backend-side shared memory array:
        self.shmem__.unlink()

    def c__doWork(self):
        r = randint(1,5)
        self.logger.info("c__doWork: will change array values from %s to %s",
            self.array__[0,0], r)
        self.array__[:,:] = r
        time.sleep(r) # wait random secs
        self.logger.info("c__doWork: ..did that!")
        # send message when ready
        self.send_out__(MessageObject(
            "ready"
        ))

    # Frontend methods
    def doWork(self):
        self.logger.info("doWork: calling backend")
        self.sendMessageToBack(MessageObject(
            "doWork"
        ))

    def getArray(self):
        return self.array

Next we create a convenience class that helps us manage the main process systematically. It is subclassed from MainContext, which is a pretty simple base class: you only need to (re)define the __init__(), startProcesses(), startThreads(), close() and __call__ methods.

super().__init__() inits the logger and calls first startProcesses and then startThreads (i.e. it forks before creating any threads):

class Manager(MainContext):

    def __init__(self, n_workers = 10):
        self.n_workers = n_workers
        self.timeout = 2.0
        super().__init__()

startProcesses (mandatory) creates and caches processes.

Here we cache the processes into the list self.cache and also into the dictionary self.process_by_pipe, where key = multiprocessing.Pipe and value = the running multiprocess. The reason for this will become obvious in the method __call__ (see below).

All the pipes we are going to listen to are also cached, into self.read_pipes. That list should always include self.aux_pipe_read (which is created in the base class super().__init__()):

    def startProcesses(self):
        self.logger.debug("startProcesses:")
        self.cache = [] # all multiprocesses
        self.avail_cache = [] # available multiprocesses for work
        self.process_by_pipe = {} # ditto
        self.read_pipes = [self.aux_pipe_read] # pipes / file descriptors to listen to
        for i in range(self.n_workers):
            p = WorkerProcess(name="worker-"+str(i))
            p.ignoreSIGINT()
            pipe = p.getPipe()
            self.cache.append(p)
            self.avail_cache.append(p)
            self.process_by_pipe[pipe] = p
            self.read_pipes.append(pipe)
        self.logger.info("startProcesses: starting multiprocesses")
        # forks!
        for p in self.cache:
            p.start()
        self.logger.info("startProcesses: all multiprocesses running")

One more thing worth noticing: each multiprocess's ignoreSIGINT() method is called: this makes them ignore SIGINT, as that signal should be caught only by the main process and processed accordingly (i.e. when receiving SIGINT, the main process should shut down the multiprocesses in a clean way).
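In essence, ignoring SIGINT in a forked process boils down to something like this (a sketch of the underlying idea, assuming ignoreSIGINT amounts to installing SIG_IGN in the child via the standard signal module):

import signal
# executed in the child process, e.g. at its startup:
signal.signal(signal.SIGINT, signal.SIG_IGN)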

startThreads (mandatory) creates and starts threads (if any):

    def startThreads(self):
        self.logger.debug("startThreads:")
        pass # no threads in this app

close (mandatory) stops all multiprocesses and threads in parallel:

    def close(self):
        self.logger.debug("close: stopping processes")
        for p in self.cache:
            p.requestStop()
        for p in self.cache:
            p.waitStop()
        self.logger.debug("close: processes stopped")
        self.closed = True

In __call__ (mandatory) we have the main process startup and loop.

First, we tell three multiprocesses from the cache to do some work and then start the execution loop:

    def __call__(self):
        p1 = self.avail_cache.pop(0)
        p2 = self.avail_cache.pop(0)
        p3 = self.avail_cache.pop(0)
        p1.doWork()
        p2.doWork()
        p3.doWork()
        self.loop = True
        self.logger.debug("starting main loop")
        while self.loop:
            try:
                reads, writes, others = safe_select(
                    self.read_pipes, [], [], timeout=self.timeout)
            except KeyboardInterrupt:
                self.logger.warning("SIGTERM or CTRL-C: will exit asap")
                self.loop = False
                continue
            if len(reads) < 1: # reading operation timeout
                self.logger.debug("still alive")
                continue
            for pipe in reads:
                if pipe is self.aux_pipe_read:
                    self.logger.critical("debug mode exit")
                    self.loop = False
                    continue
                else:
                    try:
                        p = self.process_by_pipe[pipe]
                    except KeyError:
                        self.logger.critical("unknown pipe %s", pipe)
                        continue
                    obj = pipe.recv()
                    self.logger.debug("got message %s", obj)
                    self.handleMessage__(p, obj)
        self.close()
        self.logger.debug("bye!")

So, we are listening simultaneously to all the running multiprocesses. When any of them sends a message, safe_select triggers and the message is processed by handleMessage__.

In addition to just the multiprocesses, as in this example, we could add to that select call any other file descriptors to listen to, say, tcp sockets, unix named pipes and the like, so that they would interact with our program and the multiprocesses.

File descriptors can also be instantiated in C++ code (say, like EventFd in libValkka) and listened to here on the Python side. This creates nice opportunities for interfacing i/o devices with this multiprocessing scheme.
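For example, a tcp server socket can be watched in the very same select call, since sockets have a fileno() just like pipes. A hypothetical sketch (the socket, port and timeout are for illustration only):

import socket
from valkka.multiprocess import safe_select

server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(("localhost", 8888))
server_sock.listen(1)

# in Manager.startProcesses you would do: self.read_pipes.append(server_sock)
read_pipes = [server_sock]
# ..and in Manager.__call__:
reads, writes, others = safe_select(read_pipes, [], [], timeout=2.0)
for r in reads:
    if r is server_sock:
        conn, addr = server_sock.accept() # handle the new client..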

Messages coming from the worker processes are handled like this:

    def handleMessage__(self, p: WorkerProcess, msg: MessageObject):
        # read the sharedmem:
        vals = p.getArray()[0, 0:5]
        self.logger.info("handleMessage__ : subprocess %s returned %s with values %s",
            p, msg(), vals)
        # return the process back to cache
        self.avail_cache.append(p)
        # take a new process from the cache
        try:
            p_ = self.avail_cache.pop(0)
        except IndexError:
            self.logger.fatal("handleMessage__ : no more processes available!")
        else:
            # tell it to do some work
            p_.doWork()

So, in this particular example, once a worker has finished its calculation, another worker is launched to do the next one.

If you have many different kinds of multiprocesses running under your main process, it’s a good idea to create dedicated MessageObject classes and handleMessage__ methods for each one of them to keep the code readable.
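For example, the dispatch could branch on the process type. A sketch (OtherProcess and the per-type handler methods are hypothetical names):

    def handleMessage__(self, p, msg: MessageObject):
        if isinstance(p, WorkerProcess):
            self.handleWorkerMessage__(p, msg)
        elif isinstance(p, OtherProcess):
            self.handleOtherMessage__(p, msg)
        else:
            self.logger.critical("handleMessage__: unknown process %s", p)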

Behold: this is your app's main entry point.

When running interactively, just press CTRL-C to exit.

def main():
    Manager.formatLogger(logging.DEBUG) # of course in a real app, in some other way
    WorkerProcess.formatLogger(logging.DEBUG) # of course in a real app, in some other way
    manager = Manager(n_workers=5)
    manager()

For testing purposes, you can run the manager in a background thread, while in the main code you interact with it in any way you want, say, create files for it to see, or somehow simulate the environment it is running in.

Here the only “interaction” we do is to sleep for 10 secs before terminating it.

def test():
    Manager.formatLogger(logging.DEBUG)
    WorkerProcess.formatLogger(logging.DEBUG)
    manager = Manager(n_workers=5)
    manager.runAsThread()
    print("running manager for 10 secs")
    time.sleep(10)
    # .. or alternatively, interact with the manager somehow
    # as it runs in the background thread
    print("stopping manager")
    manager.stopThread()
    print("bye!")

Please run this file interactively, with the command line argument “main” or “test”, to run main() or test() above:

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("please give 'main' or 'test'")
    elif sys.argv[1] == "main":
        main()
    elif sys.argv[1] == "test":
        test()
    else:
        print("please give 'main' or 'test'")

Part III: Miscellaneous

Development cleanup

When you are developing your multiprocessing program, it might and will crash. Many times, this has the practical effect of leaving “dangling” multiprocesses running in your system. So remember to kill them manually with

killall -9 python3

Or whatever name they might use.

Forgetting this will have all kinds of weird effects, as the “zombie” processes are still running in the background and continue meddling with the same shmem arrays, files, etc. while you start your program again.

If you're using sharedmem, you might sometimes need to manually clean up the memory-mapped files from:

/dev/shm/
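For example, to remove the blocks created in this tutorial (named my-shm-1 and shm-worker-N):

rm -f /dev/shm/my-shm-1 /dev/shm/shm-worker-*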

Process debug

Use the setproctitle python module to name your python multiprocesses. This way you can find them easily using standard linux monitoring tools, such as htop and smem.

Setting the name of the process should, of course, happen after the multiprocessing fork (i.e. in preRun__).
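A minimal sketch (NamedProcess and the title string are just examples):

from setproctitle import setproctitle
from valkka.multiprocess import MessageProcess

class NamedProcess(MessageProcess):

    def preRun__(self):
        # the name shows up in htop, smem, ps, etc.:
        setproctitle("my-app-worker")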

Install smem and htop:

sudo apt-get install smem htop

In htop, remember to go to setup => display options and enable “Hide userland process threads” to make the output more readable.

Streaming data

You learned how to exchange numpy arrays between processes using shared memory.

If you're wondering how to exchange streaming data in the same manner, the correct way to do that is synchronized sharedmem ringbuffers. You might want to google that, or use the particular implementation done in libValkka.

Interfacing with C++

We also mentioned the possibility of creating data on the C++ side (say, from specific industrial equipment, etc.) and then passing it to the python side. In such a case, you would simply add one more file descriptor (which is managed & set on the C++ side) to the list that select listens to in your manager implementation.

For C++ / python numpy interfacing, you might want to take a look at skeleton. A more serious example will be provided as well (TBA).

Custom MessageProcess

On some occasions, when you need the MessageProcess class to do something more complex than just routing frontend method calls to backend methods, say, you want it to create a shmem server and push streaming data to another process, you need to subclass more methods than were considered in this tutorial.

Typically, you need to subclass the run, readPipes__, etc. methods. This is fairly easy, since the whole MessageProcess class is very slim. See here.

Asyncio

If your multiprocess needs to run asyncio, that is ok. Please take a look at the AsyncBackMessageProcess class.

For full-blown asyncio programs, using the same kind of hierarchical scheme as considered in this module’s tutorial for master/worker processes, you might want to take a look at TaskThread.