thanks for attending compu-salon episode 8, multiprocessing. Here's a summary of our discussion.
We're not meeting today (Mar 23) in observance of PCGM. Next week (Mar 30, at 4pm) we'll talk about using file formats (and even databases!) in Python.
I'll send out a reminder next week. Until then!
Here's what we discussed and programmed together.
Parallel programming is difficult, there's no way around it. It's especially difficult to do it in the transparent, zen-like way that we pursue in this salon. This may be because of the immaturity of tools and concepts, so things may be improving in the future. (Maybe, but things sometimes get worse: GPU programming can be more painful than cluster programming.) So be warned, pain lies this way. But if you still want to tread this path, here's the gentlest, quickest introduction I could come up with.
This is a big topic with lots of theory behind it, and many treatments. We only mentioned a few important themes. Paraphrasing Konrad Hinsen, we define
Generally in scientific computing we are happy with parallelism alone, but some pitfalls of concurrency do appear in its implementation. The main architectural constraint in managing parallelism in scientific applications is decomposition (e.g., of a computational domain) and distribution (e.g., of tasks). These are tackled through a model (or a combination of models):
Another model worth mentioning is task farming (e.g., repeating a simulation for many different sets of parameters). Here there is no communication between processes, and the main job is dispatching tasks efficiently and collecting results.
The main concerns in parallel programming are
Correctness: are you sure that your code is really doing what it should? Explicit message passing introduces synchronization problems such as deadlocks (multiple processes waiting on each other in non-solvable fashion) and race conditions (code that has different results depending on the contingent order of execution of processes).
While not necessarily incorrect, the result of a computation can be nondeterministic. At the salon, Nick brought up the example of the self-optimizing library FFTW.
All threads within a process share the same data, but execute instructions at different points within the code. The operating system can switch between threads without warning (and on multicore CPU, threads may truly be running in parallel), so threads must be careful in writing and reading variables, because these could be changed under their feet (as it were) if a thread gets suspended in the middle of writing or reading. These problems are managed with synchronization mechanisms such as locks, semaphores, and more.
In Python the benefits of multithreading are limited: because of the complexity of managing memory access in a high-level, interpreted language, the standard Python interpreter restricts access to all Python objects to one thread at a time. This is known as the Global Interpreter Lock. Thus, the preferred multi-core parallel-computing model in Python is multiprocessing, supported by both message passing and shared memory. The Python
multiprocessing module deals with this case, but multiprocessing can be achieved with a similar interface on networked clusters.
multiprocessing module is a relatively recent addition to the Python canon (version 2.6) that standardizes a previous third party package.
multiprocessing—in general, all of Doug's Python Module of the Week series is excellent.
threadingmodule documentation, since
threadinginterface as much as possible.
Note: on OS X (at least on Snow Leopard) the
multiprocessing distributed with python 2.6 is broken. The best way to get it forward may be to update python (
brew install python; then make sure your
/usr/local/share/python). You will need to rebuild your
virtualenvs after updating Python.
In our salon, we realtime-coded a very simple example from GW data analysis: a search for an inspiral signal using matched filtering. Here's a cleaned-up version. We start by defining the signal and the matched-filtering products between signals; we'll put these in a separate module,
# --- module filtering.py --- # integer division should return a float, so we avoid the classic # "C" bug that 3/5 = 0; this is standard in Python 3, by the way from __future__ import division import math, numpy as N # we'll need a couple of constants Msun = 4.9257e-6 # Mass of the Sun in seconds Mpcs = 1.02927e14 # megaparsec in seconds # we work with a basic array of frequencies from 0 to fmax; # we take one more element than a power of two, so that the inverse real FFT # has 2^N elements; this is consistent with the standard arrangements # of frequencies in numerical FFTs, which goes as # [0,df,2*df,...,f_Nyquist,-(f_Nyquist-df),..,-df] fmax = 1024 f = N.linspace(0,fmax,2**16 + 1) # enough samples to cover chirp mass # between 0.8 and 1.75 Msun df = f - f # frequency spacing fmin = 40 def template(Mc): """Return a 'Newtonian' chirp with chirp mass Mc [Msun], as defined in the frequency domain by the stationary-phase approximation. See, e.g., Maggiore (2007).""" # by operating on f[1:] avoid taking "difficult" powers at f = 0 phasing = (3/128) * (math.pi * Mc * Msun * f[1:])**(-5/3) # the last factor (a Boolean) sets to 0 all elements with f < fmin amplitude = math.sqrt(5/24) * math.pi**(-2/3) * (Mc * Msun)**(5/6) * f[1:]**(-7/6) * (f[1:] > fmin) # but we include f = 0 in the final resulting array waveform = N.zeros(len(f),'complex128') waveform[1:] = amplitude * N.exp(-1j * phasing) return waveform # to see these waveforms in the time domain you could do # import matplotlib.pyplot as P # P.plot(N.fft.irfft(template(0.8))) # P.plot(N.fft.irfft(template(1.74)),'r') def normsq(signal): """Squared "noise" norm of a signal (with noise variance = 1).""" return (4/df) * N.sum(N.abs(signal)**2) def normtemplate(Mc): """Return a normalized template by calling template and normsq.""" t = template(Mc) return t / math.sqrt(normsq(t)) def maxsnr(signal,temp): """Signal--template correlation, maximized over time shifts with the 'fft trick'. Again, see Maggiore (2007).""" return N.max((4/df) * N.real(N.fft.irfft(N.conj(signal) * temp)))
We then set up a parallel code to evaluate the signal-to-noise ratio (SNR) of a fiducial signal after filtering by all templates in a bank. We will do so by using the
Poolinitializes the desired number of worker processes, which are spawned with the fork function of the operating system. Each spawned process gets a complete copy of the memory space of the current process, including all the function and variable definitions. The copy is done efficiently, and blocks of memory will be actually copied only when they are first modified in the new process. (This is not quite true on Windows, but we shall not worry about that.)
Poolare then asked (with the method
map) to evaluate a function (in this case
getsnr) on all the elements of a list (
Mc); the list elements are spread among the workers, which operate in parallel; but the results are returned in the correct order. All of this happens transparently, but you can bet that it requires quite a bit of message passing under the hood.
Here's a script that does all this:
# --- script search.py --- import time, multiprocessing as mp import numpy as N, matplotlib.pyplot as P import filtering as F # our own module with data-analysis definitions # we're going to evaluate SNRs against this fiducial signal... signal = F.template(1.2) # ...for normalized templates in this range of chirp masses Mcs = N.arange(0.8,1.74,0.01) def getsnr(Mc): """Evaluate SNR of fiducial signal after filtering by a normalized template with chirp mass Mc.""" return F.maxsnr(signal,F.normtemplate(Mc)) # we first do it (and time it) on a single CPU t0 = time.time() snrs = map(getsnr,Mcs) # equivalent to [getsnr(Mc) for Mc in Mcs] print "Monoprocessing:", time.time() - t0 # next, we create a multiprocessing _pool_ of processes # each is _forked_ with a copy of all variables and functions defined so far pool = mp.Pool(processes=4) # and now we ask the pool to evaluate getsnr() on all elements of Mcs, # by distributing them among the workers using multiprocessing.map() t0 = time.time() snrs = pool.map(getsnr,Mcs) print "Multiprocessing:", time.time() - t0 # gracefully close all worker processes pool.close() # let's see what they look like, using matplotlib P.plot(Mcs,snrs) P.show()
Pool objects support a few other methods, including
apply_async(func,args), which assigns a single task to the first available worker. The function is
asyncronous, since we don't want to wait for the worker to complete before we can assign more tasks, so we get back an
AsyncResult object that can be queried for the actual
func result with
True, or after calling
multiprocessing module supports also operation at a lower level, with explicit data passing. To show this we'll modify our example, and set up standing worker processes that wait for input (signals) from a
Pipe and return output (SNRs) on a
Queue. Pipes and queues are examples of shared
multiprocessing objects that can be used for communication. (Remember that once a process is forked, all changes to its memory space remain local and are not propagated to other processes. The
multiprocessing objects are actually proxies that achieve propagation by hidden message passing.)
A convenient feature of
multiprocessing is that messages can be any types of Python objects (whereas message-passing libraries such as MPI are limit to arrays of basic types such as integers and floats). Internally, this is achieved with serialization.
multiprocessing communication objects include
Conditions (which act as signals between processes),
Locks (used to control exclusive access to resources),
Semaphores (used to control shared but limited access to resources), as well as shared, synchronized versions of Python lists, dictionaries, and even namespaces, all obtained through
multiprocessing.Manager. Updates to these objects are seen in all processes. Shared memory objects are also possible, specifically
Array (these are statically-typed
If the last paragraph was a little scary, don't worry! As I hinted above, parallel programming is nice if you can stay within high-level abstractions, but gets "dirty" pretty soon if you have to worry about the details of concurrency. So I hope you don't!
Here's a script that implements the cached evaluation of SNRs on a set of worker processes:
# --- script cachedsearch.py --- import time, multiprocessing as mp, random import numpy as N, matplotlib.pyplot as P import filtering as F # our own module with data-analysis definitions # this time we're going to evaluate the SNR of multiple fiducial signals # against the same bank of templates, nsignals = 10 Mcs = N.arange(0.8,1.74,0.01) # to speed up the computation, we're going to cache subsets # of pre-computed templates within each worker process def worker(Mcs,pipe,results): """Worker function for parallel processes: takes as input a list 'Mcs' of chirp masses, a Pipe to receive fiducial signals from the master, and a Queue 'results' to return maximum SNRs and the corresponding chirp mass.""" # make a list of cached templates templates = [F.normtemplate(Mc) for Mc in Mcs] # loop forever! while True: # receive a signal from the master process signal = pipe.recv() # exit if it is a poison pill if signal is None: return # evaluate all the SNRs snrs = [F.maxsnr(signal,t) for t in templates] # put into queue the location and value of the max SNR maxt = N.argmax(snrs) results.put((Mcs[maxt],snrs[maxt])) # we will run 4 worker processes cpus = 4 # we create shared, synchronized objects to send signals # and to receive results pipes = [mp.Pipe() for i in range(cpus)] # a Pipe for every worker results = mp.Queue() # a single Queue for all # (that's almost poetry) # we will not use the multiprocessing.Pool abstraction, # but instead create a list of processes and have each execute # the function 'worker' with arguments given by a subset of chirp masses, # the worker "end" of the appropriate Pipe, and the Queue procs = [mp.Process(name = 'worker-%s' % i, target = worker, args = (Mcs[i::cpus],pipes[i],results) ) for i in range(cpus)] # start all the processes for p in procs: p.start() # repeat 'nsignals' times for j in range(nsignals): # create a signal with random chirp mass trueMc = random.uniform(0.8,1.74) signal = F.template(trueMc) # get maximum SNRs from workers t0 = time.time() # start timing for p in pipes: # send signal to all workers... p.send(signal) # ...using the master "end" of the Pipe snrs = N.array([results.get() for i in range(cpus)]) # collect 'cpus' results from the Queue, # place them into an array print "This iteration:", time.time() - t0 # compare the Mc with the maximum SNRs with the correct one print "True: %.3f Msun -> best fit: %.3f Msun" % (trueMc,snrs[N.argmax(snrs[:,1]),0]) # send poison pills to all workers for p in pipes: p.send(None) # wait for worker processes to terminate for p in procs: p.join()
Cool! That's where we left things, but I should give you some
In order of wonkishness:
Poolon networked clusters, but requires a little work to set these up.
Arrayobjects to pass
numpyarrays economically. Not pretty, but see also numpy-sharedmem.