mpi4py – high-performance distributed Python
Parallel Paths
Python, I think it's safe to say, is the most prevalent language for machine learning and deep learning and has become a popular language for technical computing as well. Part of the reason for its increasing popularity is the availability of common high-performance computing (HPC) tools and libraries. One of these libraries is mpi4py. If you are familiar with common naming schemes in Python, you can likely figure out that this is a Message Passing Interface (MPI) library for Python.
MPI is the key library and protocol for writing parallel applications that expand beyond a single system. It began in the early 1990s primarily as a tool for HPC, with version 1.0 of the standard released in June 1994. In the early days of MPI, it was used almost exclusively with C/C++ and Fortran. Over time, versions of MPI or MPI bindings were released for other languages, such as Java, C#, Matlab, OCaml, R, and, of course, Python. These bindings were created because MPI became the standard for exchanging data between processes on shared or distributed architectures in the HPC world.
With the rise of machine learning, and of Python in general, developing standard computational tools for Python seemed logical. MPI for Python (mpi4py) [1] was designed after the C++ bindings in the MPI-2 standard. The 1.0 release was in 2008, and the current release as of this writing is 3.0.3 (July 27, 2020). Mpi4py is designed to be fairly Pythonic, so if you know Python and understand the basic concepts of MPI, then mpi4py shouldn't be a problem.
Python's built-in data structures can be combined to create more complex ones, and because Python is object oriented, you can build classes and objects around them. Libraries such as NumPy [2] add new data types (e.g., N-dimensional arrays). To pass these complex data structures and objects between MPI ranks, mpi4py serializes them with the pickle standard library module [3], which supports both ASCII and binary formats. The marshal module can also be used to serialize built-in Python objects into a binary format that is specific to Python.
Mpi4py takes advantage of Python features to maximize the performance of serializing and de-serializing objects as part of data transmission. These efforts have been taken to the point that the mpi4py documentation claims it "… enables the implementation of many algorithms involving multidimensional numeric arrays (e.g., image processing, fast Fourier transforms, finite difference schemes on structured Cartesian grids) directly in Python, with negligible overhead, and almost as fast as compiled Fortran, C, or C++ codes" [4].
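A big part of that performance comes from mpi4py offering two families of communication methods: the lowercase methods (send, recv, bcast, and so on) accept any picklable Python object, whereas the uppercase methods (Send, Recv, Bcast) communicate buffer-like objects such as NumPy arrays directly, without pickling. The following minimal sketch, which is not one of the listings below and assumes at least two MPI ranks, illustrates the difference:

# Sketch: pickle-based vs. buffer-based communication in mpi4py.
# Assumes at least two ranks.
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Lowercase send()/recv(): any picklable Python object (convenient, but serialized)
if rank == 0:
    comm.send({'step': 1, 'note': 'a pickled dict'}, dest=1, tag=0)
elif rank == 1:
    obj = comm.recv(source=0, tag=0)
    print("rank 1 received:", obj)

# Uppercase Send()/Recv(): buffer-like objects such as NumPy arrays (no pickling)
if rank == 0:
    a = numpy.arange(10, dtype='d')
    comm.Send([a, MPI.DOUBLE], dest=1, tag=1)
elif rank == 1:
    a = numpy.empty(10, dtype='d')
    comm.Recv([a, MPI.DOUBLE], source=0, tag=1)
    print("rank 1 received array:", a)

The examples in this article stick to the lowercase, object-based methods, which are the most convenient starting point.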
Simple mpi4py Examples
For the example programs, I used the mpi4py install for Anaconda [5], built against MPICH [6]. Running mpi4py code follows the usual MPICH conventions. For example, a command might be:
$ mpirun -np 4 -f ./hosts python3 ./script.py
where script.py is the Python code that uses mpi4py.
The first example is just a simple "hello world" (Listing 1) from Jörg Bornschein's GitHub page [7]. The MPI.COMM_WORLD variable (line 3) is how mpi4py defines the world communicator, the default group containing all processes [8]. It is created when the mpi4py module is imported; the import also causes the MPI.Init() or MPI.Init_thread() function to be called [9].
Listing 1
Hello World
01 from mpi4py import MPI
02
03 comm = MPI.COMM_WORLD
04
05 print("Hello! I'm rank %d from %d running in total..." % (comm.rank, comm.size))
06
07 comm.Barrier()   # wait for everybody to synchronize _here_
Contrast this with C or Fortran, in which MPI_Init() is called explicitly and the world communicator is defined as MPI_COMM_WORLD. Note the difference between MPI.COMM_WORLD for mpi4py and MPI_COMM_WORLD for C and Fortran.
Line 3 in the sample code assigns the MPI.COMM_WORLD communicator to the variable comm. The code then uses the rank of the specific process, comm.rank, and the total number of processes, comm.size. Getting the rank is very similar to C, with MPI_Comm_rank, and Fortran, with MPI_COMM_RANK. Although not exactly the same, if you understand the basics of MPI, it won't be difficult to use mpi4py.
The code ends by calling the Barrier() function. Although not strictly necessary, it's a nice way of ending the code to make sure all of the processes reach that point.
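As noted above, importing the MPI module initializes MPI behind the scenes (and arranges for finalization at exit), so you normally never call MPI.Init() or MPI.Finalize() yourself. If you want the explicit C/Fortran-style behavior, the mpi4py.rc configuration object lets you turn off the automatic handling. The short sketch below assumes the documented initialize and finalize options:

# Sketch: controlling mpi4py's automatic initialization with mpi4py.rc.
# Normally you can simply "from mpi4py import MPI" and skip all of this.
import mpi4py
mpi4py.rc.initialize = False   # don't call MPI_Init_thread() on import
mpi4py.rc.finalize = False     # don't call MPI_Finalize() at exit

from mpi4py import MPI         # no automatic initialization now

MPI.Init()                     # explicit initialization, as in C or Fortran
comm = MPI.COMM_WORLD
print("Hello! I'm rank %d from %d running in total..." % (comm.rank, comm.size))
comm.Barrier()
MPI.Finalize()                 # explicit shutdown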
To run the code, I used:
$ mpirun -np 4 -f ./hosts python3 ./hello-world.py
The output is shown in Listing 2.
Listing 2
Hello World Output
Hello! I'm rank 0 from 4 running in total...
Hello! I'm rank 1 from 4 running in total...
Hello! I'm rank 2 from 4 running in total...
Hello! I'm rank 3 from 4 running in total...
A second example is just a simple broadcast from Ahmed Azridev's GitHub page [10] (Listing 3). [Note: I cosmetically modified the code to appeal to my sensibilities and to update it to Python 3.] The code first defines a dictionary, data, but only on the rank 0 process. All other processes set data to None.
Listing 3
Broadcast Example
01 from numpy import array
02 from mpi4py import MPI
03
04 comm = MPI.COMM_WORLD
05 rank = comm.Get_rank()
06
07 if rank == 0:
08     data = { 'key1' : [10,10.1,10+11j],
09              'key2' : ('mpi4py' , 'python'),
10              'key3' : array([1, 2, 3]) }
11 else:
12     data = None
13 # end if
14
15 data = comm.bcast(data, root=0)
16
17 if rank == 0:
18     print("bcast finished")
19 # end if
20
21 print("data on rank %d is: "%comm.rank, data)
The data is broadcast with the bcast function, which is a method of the communicator comm (in this case, MPI.COMM_WORLD). In Fortran and C, the function is usually named MPI_BCAST or MPI_Bcast. Here, the content of the variable data is broadcast to all ranks from the rank 0 process (root=0). Again, all of this is very similar to C and Fortran, but not exactly the same. Listing 4 shows the output from the code.
Listing 4
Broadcast Output
bcast finished
data on rank 0 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 1 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 2 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 3 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
Notice that the data was defined only on the rank 0 process, yet each rank printed the same data. Perhaps more importantly, the data is a dictionary, which is not a data type that exists natively in Fortran or C.
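The lowercase bcast() pickles the dictionary before sending it. For large NumPy arrays, mpi4py also provides the uppercase Bcast(), which transmits the array buffer directly and skips the pickling step. The following is a minimal sketch, assuming every rank allocates a matching receive buffer:

# Sketch: buffer-based broadcast of a NumPy array with the uppercase Bcast().
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Every rank allocates the buffer; only the root fills it before the broadcast
data = numpy.zeros(5, dtype='i')
if rank == 0:
    data[:] = numpy.arange(5, dtype='i')

comm.Bcast([data, MPI.INT], root=0)   # buffer goes out directly, no pickling
print("rank %d has:" % rank, data)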
The third example is point-to-point code [11] (Listing 5). Again, I modified the code to fit my sensibilities and ported it to Python 3. I also changed the pprint() functions to print(). The code is pretty simple. The rank 0 process creates some data in the NumPy data array, sends the first part of that array to the rank 1 process, and sends the second part to the rank 2 process. To make even more sure the data goes to the correct process, it uses tags for the send() and recv() functions (i.e., tag=13 and tag=14) to distinguish the destinations (dest) and make sure the sender and receiver processes are matched correctly. It's not the same as MPI_Send and MPI_Recv, but it's fairly easy to figure out if you know some MPI. The output when running the code on four processes is shown in Listing 6. Notice that rank 3 didn't contribute or do anything.
Listing 5
Point-to-Point
01 import numpy
02 from mpi4py import MPI
03
04
05 comm = MPI.COMM_WORLD
06 rank = comm.Get_rank()
07 size = comm.Get_size()
08
09 if (rank == 0):
10     data = numpy.arange(10)
11     comm.send(data[:5], dest=1, tag=13)
12     comm.send(data[5:], dest=2, tag=14)
13     print("Rank %d data is: "%rank, data)
14 elif (rank == 1):
15     data = comm.recv(source=0, tag=13)
16     print("Rank %d Message Received, data is: "%rank, data)
17 elif (rank == 2):
18     data = comm.recv(source=0, tag=14)
19     print("Rank %d Message Received, data is: "%rank, data)
20 # end if
Listing 6
Point-to-Point Output
Rank 0 data is:  [0 1 2 3 4 5 6 7 8 9]
Rank 1 Message Received, data is:  [0 1 2 3 4]
Rank 2 Message Received, data is:  [5 6 7 8 9]
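Before moving on, it's worth noting that mpi4py also has non-blocking counterparts, isend() and irecv(), roughly analogous to MPI_Isend and MPI_Irecv. They return request objects that are completed with wait(). The sketch below is a non-blocking rework of part of Listing 5 (it is not the original code from [11]) and assumes at least two ranks:

# Sketch: non-blocking point-to-point exchange with isend()/irecv().
import numpy
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = numpy.arange(10)
    req = comm.isend(data[:5], dest=1, tag=13)   # returns a request immediately
    req.wait()                                   # complete the send
    print("Rank 0 sent:", data[:5])
elif rank == 1:
    req = comm.irecv(source=0, tag=13)
    data = req.wait()                            # wait() returns the received object
    print("Rank 1 received:", data)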
More Complex Examples
From these three simple examples, you've learned that mpi4py works very much like MPI code written in Fortran or C, but with a Python twist. Mpi4py can work with Python data types by serializing and de-serializing them. The next step is to try some more complicated examples.
The first example (Listing 7) is parallel trapezoid integration code derived from a presentation by Pawel Pomorski [12]. If you remember how to do numerical integration by the trapezoidal approach, this code should make sense.
Listing 7
Parallel Trapezoid Method
01 from mpi4py import MPI
02 import math
03
04
05 def f(x):
06     return x*x
07 # end def
08
09
10 def trapezoidal(a, b, n, h):
11
12     s = 0.0
13     s += h * f(a)
14     for i in range(1, n):
15         s += 2.0 * h * f(a + i*h)
16     # end for
17     s += h * f(b)
18     return (s/2.)
19 # end def
20
21
22 # Main section
23 comm = MPI.COMM_WORLD
24 my_rank = comm.Get_rank()
25
26 p = comm.Get_size()
27
28 if (my_rank == 0):
29     print("Number of ranks = ",p)
30 # end if
31 print('my_rank = ',my_rank)
32
33
34 # Integral parameters
35 a=0.0        # Left endpoint
36 b=2.0        # Right endpoint
37 n=1024       # Number of trapezoids
38 dest=0       # Destination for messages (rank 0 process)
39 total=-1.0   # Initialize to negative number
40
41 h = (b-a)/n          # h = Trapezoid Base Length - the same for all processes
42 local_n = int(n/p)   # Number of trapezoids for each process
43
44 # Length of each process' interval of
45 # integration = local_n*h.
46 local_a = a + my_rank*local_n*h   # local a (specific to each process)
47 local_b = local_a + local_n*h     # local b (specific to each process)
48
49 # Each rank performs its own integration
50 integral = trapezoidal(local_a, local_b, local_n, h)
51
52 # Add up the integrals calculated by each process
53 if (my_rank == 0):
54     total=integral
55     for source in range(1,p):
56         integral=comm.recv(source=source)
57         print("PE ",my_rank,"<-",source,",",integral,"\n")
58         total=total+integral
59     # end for
60 else:
61     print("PE ",my_rank,"->",dest,",",integral,"\n")
62     comm.send(integral, dest=0)
63 # end if
64
65
66 # Print the result
67 if (my_rank==0):
68     print("**With n=",n,", trapezoids,")
69     print("** Final integral from",a,"to",b,"=",total,"\n")
70 # end if
71
72 MPI.Finalize()
To begin, the code breaks the integration interval, from a to b, into n trapezoids. These trapezoids are then divided evenly across the p processes, and each process performs its own integration over its portion of the interval. When finished, each process sends its result to the rank 0 process, which adds the contribution of each process to get the final answer.
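The explicit receive loop on the rank 0 process mirrors the classic send/receive pattern, but the same sum could also be computed with a collective reduction, comm.reduce(), the mpi4py counterpart to MPI_Reduce. The following is a minimal, self-contained sketch; the per-rank value is just a stand-in for the integral computed by trapezoidal() in Listing 7:

# Sketch: summing per-rank partial results with a collective reduction
# instead of the explicit send/recv loop.
from mpi4py import MPI

comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()

# Stand-in for the per-rank partial integral from trapezoidal()
integral = float(my_rank + 1)

# MPI.SUM adds all contributions; only the root (rank 0) receives the total
total = comm.reduce(integral, op=MPI.SUM, root=0)

if my_rank == 0:
    print("Sum of all partial results =", total)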
The command to run the code is straightforward:
$ mpirun -n 4 -f ./hosts python3 trap-mpi4py.py
This sample problem integrates x^2 over the interval from 0.0 to 2.0; the output is shown in Listing 8. The output contains information about what each process is doing, showing when a process sends data to the rank 0 process (e.g., PE 1 -> 0). The rank 0 process shows which process sent the data (e.g., PE 0 <- 1).
Listing 8
Integration Example
01 Number of ranks = 4
02 my_rank = 0
03 PE 0 <- 1 , 0.29166698455810547
04
05 PE 0 <- 2 , 0.7916669845581055
06
07 PE 0 <- 3 , 1.5416669845581055
08
09 **With n= 1024 , trapezoids,
10 ** Final integral from 0.0 to 2.0 = 2.666667938232422
11
12 my_rank = 1
13 PE 1 -> 0 , 0.29166698455810547
14
15 my_rank = 3
16 PE 3 -> 0 , 1.5416669845581055
17
18 my_rank = 2
19 PE 2 -> 0 , 0.7916669845581055
The next example is a simple matrix-matrix multiply [13] that breaks the matrices into a grid of blocks and then handles communication between the blocks in a classic east, west, north, south layout. I won't put the code here because it is a little long; I only changed the pprint function calls to print to get it to work. The program uses four processes, and the output is just the timings of the code run (Listing 9).
Listing 9
Matrix-Matrix Product Output
Creating a 2 x 2 processor grid...
=================================================================
Computed (serial) 3000 x 3000 x 3000 in 1.68 seconds
... expecting parallel computation to take 3.36 seconds
Computed (parallel) 6000 x 6000 x 6000 in 3.69 seconds
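Although the matrix-matrix code [13] is too long to reproduce, the processor grid it reports is the kind of two-dimensional decomposition that MPI's Cartesian topology routines handle, and mpi4py exposes them as Create_cart() and Shift(). The sketch below is not the code from [13]; it only shows how a 2x2 grid and its north/south/east/west neighbors could be set up (run it with exactly four ranks):

# Sketch: a 2x2 periodic process grid and its neighbor ranks.
from mpi4py import MPI

comm = MPI.COMM_WORLD

# Build the Cartesian grid; requires four ranks for a 2x2 layout
cart = comm.Create_cart(dims=[2, 2], periods=[True, True], reorder=True)
row, col = cart.Get_coords(cart.rank)

# Shift(direction, displacement) returns the (source, destination) rank pair
north, south = cart.Shift(0, 1)   # neighbors along the first grid dimension
west, east = cart.Shift(1, 1)     # neighbors along the second grid dimension

print("rank %d at (%d,%d): N=%d S=%d E=%d W=%d"
      % (cart.rank, row, col, north, south, east, west))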
The final example I tried, just for fun, is a parallel implementation of Conway's Game of Life [14] that I got from Kevin Moore's GitHub site [15]. Again, I won't show the lengthy code here. Because it's written in the old Python 2.x style, I had to fix the print functions.
If you've never played Game of Life, give it a try. There is really nothing to do: just watch the screen. The grid is initialized randomly. Because the output is animated, I can't show anything here, but give it a try and be sure to take a look at the code.
Summary
Mpi4py is a very powerful Python tool that uses wrappers around standard MPI implementations such as MPICH or Open MPI. If you have used MPI before, either in Fortran or C, then using it in Python is not difficult. It holds true to Python's object-oriented nature but still uses the familiar MPI terminology.
In this article, I showed a few simple programs, from Hello World, to broadcast, to point-to-point examples, which all illustrate that coding MPI is not difficult in Python.
The final few "real" examples were more than just instructional. The simple trapezoidal integration example shows how easy it is to parallelize code. The matrix-matrix multiplication example, which breaks the matrices into a grid of blocks (sometimes called tiles), is a bit more complicated, because you need to know how to do parallel matrix multiplication. The parallel version of Conway's Game of Life is a diverting visual example with somewhat more difficult code. You probably need to know something about the algorithm to better understand the parallel code, but it's still fun to run, and you can always change the initialization from random to something more deliberate.
Infos
[1] MPI for Python: https://mpi4py.readthedocs.io/en/stable/intro.html
[2] NumPy: https://numpy.org/
[3] pickle: https://en.wikipedia.org/wiki/Serialization#Pickle
[4] Release 3.0.3 docs: https://buildmedia.readthedocs.org/media/pdf/mpi4py/stable/mpi4py.pdf
[5] Anaconda: https://www.anaconda.com/
[6] MPICH: https://www.mpich.org/
[7] mpi4py hello world: https://github.com/jbornschein/mpi4py-examples/blob/master/01-hello-world
[8] MPI_COMM_WORLD: https://computing.llnl.gov/tutorials/mpi/#Getting_Started
[9] MPI.Init(): https://mpi4py.readthedocs.io/en/stable/overview.html#initialization-and-exit
[10] Simple broadcast code: https://github.com/azridev/mpi4py/blob/master/bcast.py
[11] Point-to-point code: https://github.com/azridev/mpi4py/blob/master/p2p.py
[12] Pomorski, Pawel. Introduction to Parallel Programming with MPI and Python. University of Waterloo, SHARCNET, 2019: https://www.sharcnet.ca/help/images/4/4b/Python_mpi_gis.pdf
[13] Matrix-matrix product code: https://github.com/jbornschein/mpi4py-examples/blob/master/08-matrix-matrix-product.py
[14] Conway's Game of Life: https://en.wikipedia.org/wiki/Conway%27s_Game_of_Life
[15] Game of Life parallel code: https://github.com/Jauntbox/paralife