Part I: Managing a multiprocess
In this part, we create our first multiprocess and learn how to communicate and share resources with it.
1. Send message to a forked process
In this example, we call a method in the backend (the forked multiprocess) seamlessly, by simply calling a method in the frontend (the main python process context).
This is achieved as follows: MessageProcess's backend has an internal execution loop that continuously listens for messages from the frontend. When a MessageObject is sent with the command ping, the backend automagically looks for a method named c__ping and executes it. Execution of c__ping happens in the multiprocessing backend, i.e. in the forked process.
import time
from valkka.multiprocess import MessageProcess, MessageObject

class MyProcess(MessageProcess):

    def c__ping(self, parameter=None):
        print("Regards from other side of the fork! Got parameter", parameter)

    def ping(self, parameter=None):
        self.sendMessageToBack(MessageObject(
            "ping", parameter=parameter))
p = MyProcess(name="my-process")
Calling start forks and creates the backend:
p.start()
Let the multiprocessing backend run on its own for one second:
time.sleep(1)
Call the frontend method with a parameter: it seamlessly sends a message to the backend, which executes c__ping in the forked process:
p.ping(parameter="gotcha!")
Again, let the multiprocessing backend run on its own for a while:
time.sleep(1)
Finally, tell the backend to stop and exit
p.stop()
2. Send message and synchronize
So, we can call backend methods, i.e. tell the forked process to do something, by simply calling a frontend method.
Many times we need to synchronize between the main python process and the forked multiprocess: i.e., the main process needs to wait until the forked process has completed doing something.
import time
import logging
from valkka.multiprocess import MessageProcess, MessageObject, \
    EventGroup, SyncIndex

class MyProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)
        self.eg = EventGroup(10)
Here we have created a group of multiprocessing Event objects in the self.eg member. As the Events are created before the fork (i.e. before calling start), they are visible both in the multiprocessing backend and frontend.
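To see why this works, here is a minimal, self-contained illustration using plain multiprocessing (independent of valkka): an Event created before the fork can be set in the child process and observed in the parent.

from multiprocessing import Process, Event

def child(ev):
    ev.set()  # set the event in the forked process

if __name__ == "__main__":
    e = Event()  # created before the fork..
    p = Process(target=child, args=(e,))
    p.start()
    p.join()
    print(e.is_set())  # ..its state is visible in the parent: True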
    def c__ping(self, parameter=None, sync_index: int = None):
        self.logger.info("c__ping: got parameter %s - will do some", parameter)
        time.sleep(1.0)  # do something time consuming..
        self.logger.info("c__ping: ..did that!")
        self.eg.set(sync_index)  # tell frontend that it's done
An index number is communicated to the multiprocessing backend, so that the forked process knows which Event it needs to set in order to sync with the frontend:
    def ping(self, parameter=None):
        self.logger.info("ping: calling backend to do some stuff")
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "ping",
                parameter=parameter,
                sync_index=i
            ))
        self.logger.info("ping: backend seems to be ready")
p = MyProcess(name="my-process")
A helper method to quick-format the logger for MyProcess:
p.formatLogger()
Calling start forks and creates the backend:
p.start()
time.sleep(1)
p.ping(parameter="gotcha!")
Tell the backend to stop and exit
p.stop()
We get this output:
MyProcess.my-process - INFO - ping: calling backend to do some stuff
MyProcess.my-process - INFO - c__ping: got parameter gotcha! - will do some
MyProcess.my-process - INFO - c__ping: ..did that!
MyProcess.my-process - INFO - ping: backend seems to be ready
3. Send and receive message
Next, we’re going to call a backend method and then get some results from the backend in the form of a message.
import time
import logging
from valkka.multiprocess import MessageProcess, MessageObject
We are going to use the select module quite a bit, so you might want to read a tutorial about it:

import select
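As a quick refresher, here is a minimal, self-contained sketch (not part of the example below) of how select waits on multiprocessing pipes; on linux, a multiprocessing Connection can be passed to select directly:

import select
from multiprocessing import Pipe

parent_conn, child_conn = Pipe()
child_conn.send("hello")
# wait up to 1.0 sec for the parent end to become readable:
readable, _, _ = select.select([parent_conn], [], [], 1.0)
if readable:
    print(parent_conn.recv())  # -> "hello"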
class MyProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)
We're sending a message, this time from the backend to the frontend:
    def c__ping(self, parameter=None):
        self.logger.info("c__ping: got parameter %s - will do some", parameter)
        time.sleep(1.0)  # do something time consuming..
        self.logger.info("c__ping: ..did that!")
        self.send_out__(MessageObject(
            "pong", result="pudding"
        ))

    def ping(self, parameter=None):
        self.logger.info("ping: sending ping to backend")
        self.sendMessageToBack(MessageObject(
            "ping",
            parameter=parameter
        ))
p = MyProcess(name="my-process")
p.formatLogger()
Get the pipe (multiprocessing.Pipe) for listening to messages from MyProcess. Remember that you should use pipe communication only for small amounts of data: typically just short messages and maybe some tiny (numpy) arrays if you have to.
pipe = p.getPipe()
Calling start forks and creates the backend:
p.start()
time.sleep(1)
p.ping(parameter="gotcha!")
print("main: waiting reply from multiprocessing backend")
Use select to wait for the message
select.select([pipe], [], [])
msg = pipe.recv()
print("main: got", msg(), "with some", msg["result"])
Tell the backend to stop and exit
p.stop()
We get this output:
MyProcess.my-process - INFO - ping: sending ping to backend
main: waiting reply from multiprocessing backend
MyProcess.my-process - INFO - c__ping: got parameter gotcha! - will do some
MyProcess.my-process - INFO - c__ping: ..did that!
main: got pong with some pudding
5. Syncing server/client resources
In the previous example we played around with shared memory. As discussed, there is "server-side" and "client-side" shared memory.
In multiprocessing programming, the same goes for any resource: the main python process (again, the "frontend") instantiates a resource first, and the forked process (the "backend") instantiates its "client-side" mirror image of the same resource.
After this, the resource (in this case shared memory) can be used to exchange data between the front- and backend.
Let’s demonstrate all this by instantiating shared memory “on-demand”:
import time, math
import logging
import numpy as np
from valkka.multiprocess import MessageProcess, MessageObject, \
    EventGroup, SyncIndex
from multiprocessing import shared_memory
"Server-side" shared memory will be created first in the frontend, with the unique name my-shm-1 (see the frontend method create below):

class MyProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)
        self.eg = EventGroup(10)
Backend methods: create, modify and close shared memory in the forked process:
    def c__create(self, name: str = None,
                  dtype: np.dtype = None,
                  shape: tuple = None,
                  sync_index: int = None):
        # create backend-side shared memory array "on-demand"
        n_bytes = len(np.array(1, dtype=dtype).tobytes()) * math.prod(shape)
        self.shmem_name = name
        self.shmem__ = shared_memory.SharedMemory(
            name=self.shmem_name, create=False, size=n_bytes
        )
        self.array__ = np.ndarray(
            shape, dtype=dtype, buffer=self.shmem__.buf
        )
        self.eg.set(sync_index)

    def c__close(self, sync_index=None):
        self.shmem__.unlink()
        self.logger.info("c__close: client side shmem unlinked")
        self.eg.set(sync_index)

    def c__readWrite(self, sync_index: int = None):
        self.logger.info("c__readWrite: will change array values from %s to %s",
                         self.array__[0, 0], 3.0)
        self.array__[:, :] = 3.0
        self.logger.info("c__readWrite: ..did that!")
        self.eg.set(sync_index)  # tell frontend that it's done
Frontend methods: create and close shared memory in the main python process and tell the backend to do the same:
    def create(self, shape: tuple = (100, 100)):
        name = "my-shm-1"
        dtype = np.float16
        n_bytes = len(np.array(1, dtype=dtype).tobytes()) * math.prod(shape)
        self.shmem = shared_memory.SharedMemory(
            name=name, create=True, size=n_bytes
        )
        self.array = np.ndarray(
            shape, dtype=dtype, buffer=self.shmem.buf
        )
        self.array[:, :] = 0.0
        # send data to backend so that it can create the client side shmem
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "create",
                name=name,
                dtype=dtype,
                shape=shape,
                sync_index=i
            ))

    def getArray(self):
        return self.array

    def readWrite(self):
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "readWrite",
                sync_index=i
            ))
        self.logger.info("readWrite: array values now %s", self.array[0, 0])

    def close(self):
        with SyncIndex(self.eg) as i:
            self.sendMessageToBack(MessageObject(
                "close",
                sync_index=i
            ))
        self.logger.info("close: closing server side shmem")
        self.shmem.close()
Run the process:
p = MyProcess(name="my-process")
p.formatLogger()
p.start()
time.sleep(1)
p.create(shape=(200,200))
p.readWrite()
print("main: array values now", p.getArray()[0,0])
p.close()
time.sleep(0.1)
p.stop()
print("main: bye!")
Resulting output:
MyProcess.my-process - INFO - c__readWrite: will change array values from 0.0 to 3.0
MyProcess.my-process - INFO - c__readWrite: ..did that!
MyProcess.my-process - INFO - readWrite: array values now 3.0
main: array values now 3.0
MyProcess.my-process - INFO - c__close: client side shmem unlinked
MyProcess.my-process - INFO - close: closing server side shmem
main: bye!
Part II: Organizing workers
Time to ramp things up and fire up hundreds or even thousands of multiprocessing workers!
How do you manage and organize something like that? You are about to find out.
A word of warning: if you're trying to "optimize" by skipping Part I of this tutorial, don't do that: you'll miss important concepts and will not understand what's going on (mainly, the concept of "frontend" and "backend" methods).
6. Planning it
Our goal is to create a main process and N subprocess workers that do some (heavy) calculation and return results to the main process via a sharedmem numpy array.
When we have a main process and workers, we have a hierarchy, and it is always a good idea to write it down as a hierarchical list, so we will do just that:
Manager (main process)
    - IN:
        - MessageObject (from SUB WorkerProcess-N)
    - SUB:
        WorkerProcess-1
            - UP:
                - MessageObject
                - method: getArray()
            - IN:
                - method: doWork()
        WorkerProcess-2
            - UP:
                - MessageObject
                - method: getArray()
            - IN:
                - method: doWork()
        WorkerProcess-3
            ...
        WorkerProcess-4
            ...
        ...
In this notation, there is a Manager (the main process) that gets input (IN) messages in the form of MessageObjects from the individual workers, which are WorkerProcess-1, WorkerProcess-2, etc.
Each WorkerProcess-N is commanded (IN) by the upper-level object (Manager) through the method doWork, which tells the WorkerProcess to launch a calculation.
Information is passed from the lower-level objects (WorkerProcess) to the upper level (UP, the Manager) with a MessageObject and through the method getArray().
You seldom need to create any deeper nested hierarchies. If you have to, you're probably doing something wrong.
7. Implementation
import time, sys
import numpy as np
import logging
from random import randint
from multiprocessing import shared_memory
from valkka.multiprocess import MessageProcess, MessageObject, \
    MainContext, safe_select
First define the worker process that will do some calculation on a shared numpy array.
This is similar to examples 3 and 4:
class WorkerProcess(MessageProcess):

    def __init__(self, name):
        super().__init__(name)
        self.shmem_name = "shm-" + self.name
        # create a dummy model array:
        self.model = np.zeros((100, 100), dtype=float)
        # create frontend-side shared memory array:
        self.shmem = shared_memory.SharedMemory(
            name=self.shmem_name, create=True, size=self.model.nbytes
        )
        self.array = np.ndarray(
            self.model.shape, dtype=self.model.dtype, buffer=self.shmem.buf
        )
        self.array[:, :] = 0.0

    def __del__(self):
        self.shmem.close()

    # Backend methods

    def preRun__(self):
        self.logger.info("preRun__ : ")
        # create backend-side shared memory array:
        self.shmem__ = shared_memory.SharedMemory(
            name=self.shmem_name, create=False, size=self.model.nbytes
        )
        self.array__ = np.ndarray(
            self.model.shape, dtype=self.model.dtype, buffer=self.shmem__.buf
        )

    def postRun__(self):
        self.logger.info("postRun__ : ")
        # release backend-side shared memory array:
        self.shmem__.unlink()

    def c__doWork(self):
        r = randint(1, 5)
        self.logger.info("c__doWork: will change array values from %s to %s",
                         self.array__[0, 0], r)
        self.array__[:, :] = r
        time.sleep(r)  # wait random secs
        self.logger.info("c__doWork: ..did that!")
        # send message when ready
        self.send_out__(MessageObject(
            "ready"
        ))

    # Frontend methods

    def doWork(self):
        self.logger.info("doWork : calling backend")
        self.sendMessageToBack(MessageObject(
            "doWork"
        ))

    def getArray(self):
        return self.array
Next we create a convenience class that helps us manage the main process systematically. It is subclassed from MainContext, which is a pretty simple base class: you only need to (re)define the __init__(), startProcesses(), startThreads(), close() and __call__ methods.
super().__init__() inits the logger and calls first startProcesses and then startThreads (i.e. it forks before creating any threads):
class Manager(MainContext):

    def __init__(self, n_workers=10):
        self.n_workers = n_workers
        self.timeout = 2.0
        super().__init__()
startProcesses (mandatory) creates and caches processes. Here we cache the processes into the list self.cache and also into the dictionary self.process_by_pipe, where key = multiprocessing.Pipe and value = the running multiprocess. The reason for this will become obvious in the method __call__ (see below).
All the pipes we're going to listen to are also cached into self.read_pipes. That list should always include self.aux_pipe_read (which is created in the base class by super().__init__()):
    def startProcesses(self):
        self.logger.debug("startProcesses:")
        self.cache = []  # all multiprocesses
        self.avail_cache = []  # available multiprocesses for work
        self.process_by_pipe = {}  # ditto
        self.read_pipes = [self.aux_pipe_read]  # pipes / file descriptors to listen to
        for i in range(self.n_workers):
            p = WorkerProcess(name="worker-" + str(i))
            p.ignoreSIGINT()
            pipe = p.getPipe()
            self.cache.append(p)
            self.avail_cache.append(p)
            self.process_by_pipe[pipe] = p
            self.read_pipes.append(pipe)
        self.logger.info("startProcesses: starting multiprocesses")
        # forks!
        for p in self.cache:
            p.start()
        self.logger.info("startProcesses: all multiprocesses running")
One more thing worth noticing: each multiprocess's ignoreSIGINT() method is called. This makes them ignore SIGINT, as that signal should be caught only by the main process and processed accordingly (i.e. when receiving SIGINT, the main process should shut down the multiprocesses in a clean way).
startThreads (mandatory) creates and starts threads (if any):
    def startThreads(self):
        self.logger.debug("startThreads:")
        pass  # no threads in this app
close (mandatory) stops all multiprocesses and threads in parallel:
    def close(self):
        self.logger.debug("close: stopping processes")
        for p in self.cache:
            p.requestStop()
        for p in self.cache:
            p.waitStop()
        self.logger.debug("close: processes stopped")
        self.closed = True
In __call__ (mandatory) we have the main process startup and loop. First we tell three multiprocesses from the cache to do some work, and then we start the execution loop:
    def __call__(self):
        p1 = self.avail_cache.pop(0)
        p2 = self.avail_cache.pop(0)
        p3 = self.avail_cache.pop(0)
        p1.doWork()
        p2.doWork()
        p3.doWork()
        self.loop = True
        self.logger.debug("starting main loop")
        while self.loop:
            try:
                reads, writes, others = safe_select(
                    self.read_pipes, [], [], timeout=self.timeout)
            except KeyboardInterrupt:
                self.logger.warning("SIGTERM or CTRL-C: will exit asap")
                self.loop = False
                continue
            if len(reads) < 1:  # reading operation timeout
                self.logger.debug("still alive")
                continue
            for pipe in reads:
                if pipe is self.aux_pipe_read:
                    self.logger.critical("debug mode exit")
                    self.loop = False
                    continue
                else:
                    try:
                        p = self.process_by_pipe[pipe]
                    except KeyError:
                        self.logger.critical("unknown pipe %s", pipe)
                        continue
                    obj = pipe.recv()
                    self.logger.debug("got message %s", obj)
                    self.handleMessage__(p, obj)
        self.close()
        self.logger.debug("bye!")
So, we're listening simultaneously to all running multiprocesses. When any of them sends a message, safe_select is triggered and the message is processed by handleMessage__.
In addition to the multiprocesses, as in this example, we could add to that select call any other file descriptor to listen to, say, tcp sockets, unix named pipes and the like, so that they would interact with our program and the multiprocesses (see the sketch below).
File descriptors can also be instantiated in C++ code (say, like EventFd in libValkka) and listened to here on the Python side. This creates nice opportunities for interfacing i/o devices with this multiprocessing scheme.
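For instance, here is a minimal sketch of that idea (a hypothetical Manager subclass; the socket and port are made up for illustration): since sockets have a fileno() just like the multiprocessing pipes, safe_select can wait on both at once.

import socket

class ManagerWithSocket(Manager):
    # a hypothetical Manager variant that also listens to a TCP socket

    def startProcesses(self):
        super().startProcesses()  # creates self.read_pipes & the workers
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.bind(("localhost", 8089))
        self.server_sock.listen(1)
        # select will now report pending connections alongside worker messages:
        self.read_pipes.append(self.server_sock)
        # __call__ would then need an extra branch in the read loop, e.g.
        # "if pipe is self.server_sock: conn, addr = self.server_sock.accept()"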
Messages coming from the worker processes are handled like this:
    def handleMessage__(self, p: WorkerProcess, msg: MessageObject):
        # read the sharedmem:
        vals = p.getArray()[0, 0:5]
        self.logger.info("handleMessage__ : subprocess %s returned %s with values %s",
                         p, msg(), vals)
        # return the process back to cache
        self.avail_cache.append(p)
        # take a new process from the cache
        try:
            p_ = self.avail_cache.pop(0)
        except IndexError:
            self.logger.fatal("handleMessage__ : no more processes available!")
        else:
            # tell it to do some work
            p_.doWork()
So, in this particular example, once a worker has finished its calculation, another worker is launched to do another one.
If you have many different kinds of multiprocesses running under your main process, it's a good idea to create dedicated MessageObject classes and handleMessage__ methods for each one of them to keep the code readable.
Behold: this is your app's main entry point. When running interactively, just press CTRL-C to exit.
def main():
    Manager.formatLogger(logging.DEBUG)  # of course in a real app, in some other way
    WorkerProcess.formatLogger(logging.DEBUG)  # of course in a real app, in some other way
    manager = Manager(n_workers=5)
    manager()
For testing purposes, you can run the manager in a background thread, while in the main code you interact with it in any way you want, say, create files for it to see, or otherwise simulate the environment where it is running.
Here the only "interaction" we do is to sleep for 10 secs before terminating it.
def test():
    Manager.formatLogger(logging.DEBUG)
    WorkerProcess.formatLogger(logging.DEBUG)
    manager = Manager(n_workers=5)
    manager.runAsThread()
    print("running manager for 10 secs")
    time.sleep(10)
    # .. or alternatively, interact with the manager somehow
    # as it runs in the background thread
    print("stopping manager")
    manager.stopThread()
    print("bye!")
Please do run this file interactively with the command line argument "main" or "test" to run main() or test() above:
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("please give 'main' or 'test'")
    elif sys.argv[1] == "main":
        main()
    elif sys.argv[1] == "test":
        test()
    else:
        print("please give 'main' or 'test'")
Part III: Miscellaneous
Development cleanup
When you are developing your multiprocessing program, it might and will crash. Many times this has the practical effect of leaving "dangling" multiprocesses running in your system, so remember to kill them manually with
killall -9 python3
or whatever name they might use.
Forgetting this will have all kinds of weird effects, as the "zombie" processes are still running in the background and continue meddling with the same shmem arrays and files etc. while you start your program again.
If you're using sharedmem, sometimes you might need to manually clean up memory-mapped files from:
/dev/shm/
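For example, assuming your shared memory segments were named my-shm-* as in this tutorial's examples:
rm -f /dev/shm/my-shm-*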
Process debug
Use the setproctitle python module to name your python multiprocesses. This way you can find them easily using standard linux monitoring tools, such as htop and smem.
Setting the name of the process should, of course, happen after the multiprocessing fork (i.e. in preRun__).
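A minimal sketch of what that could look like (the class and process title here are made up for illustration):

from setproctitle import setproctitle
from valkka.multiprocess import MessageProcess

class NamedProcess(MessageProcess):

    def preRun__(self):
        # runs in the forked process, just before its execution loop:
        setproctitle("my-app-worker")

After this, the worker shows up as my-app-worker in htop and smem.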
Install smem and htop:
sudo apt-get install smem htop
In htop, remember to go to setup => display options and enable "Hide userland process threads" to make the output more readable.
Streaming data
You learned how to exchange numpy arrays between processes using shared memory.
If you're wondering how to exchange streaming data in the same manner, the correct way to do that is with synchronized sharedmem ringbuffers. You might want to google that, or use the particular implementation in libValkka.
Interfacing with C++
We also mentioned the possibility of creating data on the C++ side (say, from specific industrial equipment, etc.) and then passing it to the python side. In such a case, you would simply add one more file descriptor (which is managed and set on the C++ side) to the list that select listens to in your manager implementation.
For C++ / python numpy interfacing, you might want to take a look at skeleton. A more serious example will be provided as well (TBA).
Custom MessageProcess
On some occasions, when you need the MessageProcess class to do something more complex than just rerouting frontend calls to backend methods, say, you want it to create a shmem server and push streaming data to another process, you need to subclass more methods than were considered in this tutorial.
Typically, you need to subclass the run, readPipes__, etc. methods. This is fairly easy, since the whole MessageProcess class is very slim: see the MessageProcess source code.
Asyncio
If your multiprocess needs to run asyncio, that is ok. Please take a look at the AsyncBackMessageProcess class.
For full-blown asyncio programs using the same kind of hierarchical master/worker scheme as considered in this tutorial, you might want to take a look at TaskThread.