Framing the problem
“Some people, when confronted with a problem, think ‘I know, I’ll use multithreading’. Nothhw tpe yawrve o oblems.” (Eiríkr Åsheim, 2012)
If multithreading is so problematic, though, how do we take advantage of systems with 8, 16, 32, and even thousands, of separate CPUs? When presented with large Data Science and HPC data sets, how to you use all of that lovely CPU power without getting in your own way? How do you tightly coordinate the use of resources and processing power needed by servers, monitors, and Internet of Things applications - where there can be a lot of waiting for I/O, many distinct but interrelated operations, and non-sharable resources - and you still have to crank through the preprocessing of data before you send it somewhere?
An excellent solution is to use multiprocessing, rather than multithreading, where work is split across separate processes, allowing the operating system to manage access to shared resources. This also gets around one of the notorious Achilles Heels in Python: the Global Interpreter Lock (aka theGIL). This lock constrains all Python code to run on only one processor at a time so that Python multi-threaded applications are largely only useful when there is a lot of waiting for IO. If your application is I/O bound and doesn’t require large blocks of CPU time, then, as of Python version 3.4, the asyncio system is the preferred approach.
Python ships with the multiprocessing module which provides a number of useful functions and classes to manage subprocesses and the communications between them. One interface the module provides is the
map() workflow, allowing one to take a large set of data that can be broken into chunks that are then mapped to a single function. This is extremely simple and efficient for doing conversion, analysis, or other situations where the same operation will be applied to each data. But
map() aren’t suited to situations that need to maintain state over time or, especially, situations where there are two or more different operations that need to run and interact with each other in some way. For these kinds of problems, one needs to make use of somewhat more complex features of the multiprocessing module, such as
Event. Using these features introduces a number of complicated issues that now need to be managed, especially with regard to cleanly starting and stopping subprocesses, as well as coordinating between them.
Since Python multiprocessing is best for complex problems, we’ll discuss these tips using a sketched out example that emulates an IoT monitoring device. This example is based on an implementation of an HVAC system that I worked on in 2018.
The application consists of a “Main Process” - which manages initialization, shutdown and event loop handling - and four subprocesses.
- “Observation Subprocess” runs every 10 seconds and collects data about the HVAC systems being monitored, queuing an “Observation” event message to the Event Queue. In this example, it is assumed that “Observation Subprocess” is CPU intensive as it performs a significant amount of data processing on the observation before sending it to the server.
- “Status Subprocess” also runs every 10 seconds, and collects data about the IoT system, including such things as uptime, network settings and status, and storage usage. It also queues a “Status” event message to the Event Queue.
- “Send Subprocess” accepts “Send” event messages from the Send Queue, and then sends those messages over the network to a central server that will store, analyze, present, and automatically act upon the data it receives,
- “Listen Subprocess” listens on a network port for incoming requests from the central command servers, queues up a “Command” event message to the Event Queue, and then waits for a “Reply” message from the Main Process. Such commands are largely focused on managing the HVAC system and obtaining detailed IoT system operating data. Note that, in the real world, this interface is much more complex, and relies upon a security layer.
- “Main Process”, besides managing the startup and shutdown of the entire system, also manages the Event Queue, routing messages to the proper subprocess: “Status” and “Observation” messages result in a “Send” message to the Send Queue, and “Command” events are handled and then a “Reply” message is queued to the Reply Queue.
Note on code examples: These examples use the mptools Python library that I developed while writing this blog post. This is the only example where the library interface is directly referenced.
Don’t Share Resources, Pass Messages
At first thought, it might seem like a good idea to have some sort of shared data structures that would be protected by locks. When there is only one shared structure, you can easily run into issues with blocking and contention. As such structures proliferate, however, the complexity and unexpected interactions multiply, potentially leading to deadlocks, and very likely leading to code that is difficult to maintain and test. The better option is to pass messages using
multiprocessing.Queue objects. Queues should be used to pass all data between subprocesses. This leads to designs that “chunkify” the data into messages to be passed and handled, so that subprocesses can be more isolated and functional/task oriented. The Python
Queue class is implemented on unix-like systems as a PIPE - where data that gets sent to the queue is serialized using the Python standard library
pickle module. Queues are usually initialized by the main process and passed to the subprocess as part of their initialization.
Always clean up after yourself
Subprocesses can hang or fail to shutdown cleanly, potentially leaving some system resources unavailable, and, potentially worse, leaving some messages un-processed. For this reason, a significant percentage of one’s code needs to be devoted to cleanly stopping subprocesses.
The first part of this problem is telling subprocesses to stop. Since we’re using Queues and messages, the first, and most common, case is to use “END” messages. When it’s time to stop the application, one queues up “END” messages to each queue in the system, equal to the number of subprocesses reading from that Queue. Each of the subprocess should be looping on messages from the queue, and once it receives an “END” message, it will break out of the loop and cleanly shut itself down.
“END” messages are very useful, but they aren’t the only method available, and they don’t help if a subprocess isn’t reading from a queue. I recommend using a
multiprocessing.Event object. An
Event object is a
False flag, initialized as
False, that can be safely set to
True in a multiprocess environment while other processes can check it with
is-set() and wait on for it to change to
True. Best practice is to have only one “shutdown-requested” Event object in an application which is passed to all subprocesses. Subprocesses should then loop using that
Event as their boolean check - so that if the
Event is set to
True, the loop terminated.
Once a subprocess needs to end - be it via “END” message,
shutdown_event Event flag, or an exception of some sort - it is the subprocess’s duty to clean up after itself by releasing any resources it owns. Python does a pretty great job of cleaning things up during garbage collection, but some resources need to be closed cleanly (pipes, files), and some will hang for some unknown timeout period, thus preventing a clean exit of the process. Network resources can not only tie up local resources, they can also tie up resources on the remote server systems while they wait for timeouts. So final cleanup is vital.
Not only do subprocesses need to clean up after themselves, the main process also needs to clean up the subprocesses,
Queue objects, and other resources that it might have control over. Cleaning up the subprocesses involves making sure that each subprocess gets whatever termination messaging that it might need, and that the subproceses are actually terminated. Otherwise, stopping the main process could result in either a hang, or an orphaned zombie subprocess. The normal procedure involves setting the shutdown flag, waiting for all the processes to stop normally within some reasonable amount of time, and then terminating any that haven’t stopped yet.
Python’s Queue objects also need a bit of special handling to be fully cleaned up: they need to be drained of any items that have been left there (and those items may or may not need to be dealt with somehow - perhaps by saving them to disk), closed, and, importantly, have
Queue.join_thread() called so that the associated monitoring thread gets cleaned up and doesn’t generate a misleading exception message during final Python garbage collection.
Always handle TERM and INT signals
We discussed how to be sure to end subprocesses, but how does one determine when to end them? Applications will often have a way to determine that they have nothing left to process, but server processes usually receive a TERM signal to inform them that it’s time to stop. Also, especially during testing, one often finds oneself using the INT signal (aka
KeyboardInterrupt) to stop a runaway test. More often than not, one desires the same behavior from TERM and INT signals, though INT might also always want to generate a stack trace so the user can see more precisely what they interrupted.
The approach I recommend is to have signal handlers set the
shutdown_event Event flag the first two times they get called, and then raise an exception each time they’re called thereafter. This allows one to hit control-C twice and be sure to stop code that’s been hung up waiting or in a loop, while allowing “normal” shutdown processing to properly clean up. The example below uses a common signal handler function, using functools.partial to create the two functions, differing only in which exception they will raise, that get passed as the signal handlers. An important detail is that signals need to be set up separately for each subprocess. Linux/Unix systems automatically propagate signals to all child processes, so those subprocesses also need to capture and handle the signals as well. An advantage to this is that subprocess signal handlers can potentially operate on resources specific to that subprocess. For example, I have written a signal handler that changed a ZeroMQ blocking socket into a non-blocking one. This allowed me to write code for a
get() call that didn’t have a timeout, and didn’t really need one.
Don’t Wait Forever
Proper shutdown behavior requires that every process in the system be resilient against getting “stuck”. This means that not only will loops have terminating conditions, but that other system calls that could block and wait will need to use timeouts if at all possible:
Queue objects allow for timeouts when doing both
get() calls, sockets can be configured to time out, etc.. This ends up being a form of polling where there are 2 events being checked: the system call, and the termination event (i.e.
True). You’ll need to determine how long you can afford to wait on the system call before checking if the loop needs to terminate. The goal is to check for termination frequently enough that the system will respond promptly to a termination/shutdown request, while spending most of the process’s time waiting on the resource (queue, event, socket, etc) It’s important to not wait very long because for server processes started with systemd, systemd will eventually (90 seconds by default) decide that your application isn’t stopping and send SIGKILL signal, and you no longer have a chance to clean up.
Here are some examples of waiting:
Polling against queues:
get() from the
Queue with block set to
True and a short timeout. If
queue.Empty is raised, go back to the top of the loop and try again, otherwise, process the returned item.
Poll against sockets: Call
settimeout() on the socket with a short timeout. If the
socket.timeout is raised, go back to the top of the loop, check for
shutdown_event, and try again, otherwise, process handle the accepted client connection (which will also need to have
settimeout() called on it, so its operations don’t hang)
Poll while waiting for a timer: Loop as long as
shutdown_event is not set, firing a timer every
INTERVAL_SECS seconds. Each pass through the loop sleeps for the time remaining until
next_time, up to the max of
MAX_SLEEP_SECS (0.02) seconds (which, of course, means that it usually sleeps 0.02 seconds). If the code comes out of the
next_time, go back to the top of the loop and try again, otherwise do something (in this case, put “TIMER EVENT” on the
event_queue), and re-calculate the time for the next timer event.
Tip # 5
Report exceptions, and have a time based, shared logging system.
Logging in an application is vitally important, and even more so in a multiprocessing app, where a combined log shines at reporting events in time-based order. Happily, Python provides good logging facilities. Sadly, Python doesn’t really provide a great way to sync subprocess log messages. Because there are so many moving parts, each log message needs 2 key pieces of data: which process is generating the log message, and how long it’s been since the application started. I generally name my processes. If there are multiple copies of the same process, then they’ll take the name “WORKER-1”, “WORKER-2”, etc.
Besides logging, each subprocess can send Error and Shutdown messages to the main event queue. Allowing the event handler to recognize and deal with unexpected events, such as retrying failed sends, or starting a new subprocess after one has failed.
Below is the log output from a sample run. Note how the timing being based on application start time provides a clearer picture of what’s going on during the all important startup process.
$ python multiproc_example.py
DEBUG:root: 0:00.008 SEND Worker Proc.__init__ starting : SEND DEBUG:root: 0:00.013 SEND Worker Entering QueueProcWorker.init_args : (<multiprocessing.queues.Queue object at 0x105a7cd30>,) DEBUG:root: 0:00.013 SEND Worker Entering init_signals DEBUG:root: 0:00.014 SEND Worker Entering QueueProcWorker.main_loop DEBUG:root: 0:00.014 SEND Worker Proc.__init__ starting : SEND got True DEBUG:root: 0:00.015 LISTEN Worker Proc.__init__ starting : LISTEN DEBUG:root: 0:00.019 LISTEN Worker Entering init_signals DEBUG:root: 0:00.022 LISTEN Worker Entering main_loop DEBUG:root: 0:00.022 LISTEN Worker Proc.__init__ starting : LISTEN got True DEBUG:root: 0:00.024 STATUS Worker Proc.__init__ starting : STATUS DEBUG:root: 0:00.029 STATUS Worker Entering init_signals DEBUG:root: 0:00.033 STATUS Worker Entering TimerProcWorker.main_loop DEBUG:root: 0:00.033 STATUS Worker Proc.__init__ starting : STATUS got True DEBUG:root: 0:00.035 OBSERVATION Worker Proc.__init__ starting : OBSERVATION DEBUG:root: 0:00.039 OBSERVATION Worker Entering init_signals DEBUG:root: 0:00.040 OBSERVATION Worker Entering startup DEBUG:root: 0:00.042 OBSERVATION Worker Entering TimerProcWorker.main_loop DEBUG:root: 0:00.043 OBSERVATION Worker Proc.__init__ starting : OBSERVATION got True ^C (ed: user hit Control-C) INFO:root: 0:03.128 OBSERVATION Worker Normal Shutdown INFO:root: 0:03.128 STATUS Worker Normal Shutdown DEBUG:root: 0:03.130 OBSERVATION Worker Entering shutdown DEBUG:root: 0:03.130 STATUS Worker Entering shutdown ERROR:root: 0:03.131 MAIN Unknown Event: OBSERVATION - SHUTDOWN : Normal DEBUG:root: 0:03.132 SEND Worker QueueProcWorker.main_loop received 'END' message INFO:root: 0:03.133 SEND Worker Normal Shutdown INFO:root: 0:04.025 LISTEN Worker Normal Shutdown DEBUG:root: 0:04.028 MAIN Process OBSERVATION ended with exitcode 0 DEBUG:root: 0:04.028 MAIN Process STATUS ended with exitcode 0 DEBUG:root: 0:04.028 MAIN Process LISTEN ended with exitcode 0 DEBUG:root: 0:04.028 MAIN Process SEND ended with exitcode 0
mutliprocessing module allows you to take advantage of the CPU power available on modern systems, but writing and maintaining robust multiprocessing apps requires avoiding certain patterns that can lead to unexpected difficulties, while also spending a fair amount of time and energy focusing on details that aren’t the primary focus of the application. For these reasons, deciding to use multiprocessing in an application is not something to take on lightly, but when you do, these tips will make your work go more smoothly, and will allow you to focus on your core problems.