Lead Image © liu liming, 123RF.com

Lead Image © liu liming, 123RF.com

mpi4py – high-performance distributed Python

Parallel Paths

Article from ADMIN 60/2020
By
Tap into the power of MPI to run distributed Python code on your laptop at scale.

Python, I think it's safe to say, is the most prevalent language for machine learning and deep learning and has become a popular language for technical computation, as well. Part of the reason for its increasing popularity is the availability of common high-performance computing (HPC) tools and libraries. One of these libraries is mpi4py . If you are familiar with the prevalent naming schemes in Python, you can likely figure out that this is a Message Passing Interface (MPI) library for Python.

MPI is the key library and protocol for writing parallel applications that expand beyond a single system. At first, it was primarily a tool for HPC in the early 1990s, with version 1.0 released in June 1994. In the early days of MPI, it was used almost exclusively with C/C++ and Fortran. Over time, versions of MPI or MPI bindings were released for other languages, such as Java, C#, Matlab, OCaml, R, and, of course, Python. These bindings were created because MPI became the standard for exchanging data between processes on shared or distributed architectures in the HPC world.

With the rise of machine learning, as well as Python in general, developing standard computational tools for Python seemed logical. MPI for Python (mpi4py ) [1] was developed for Python with the C++ bindings in the MPI-2 standard. The 1.0 release was on March 20, 2020, and the current release as of this writing is 3.0.3 (July 27, 2020). Mpi4py is designed to be fairly Pythonic, so if you know Python and understand the basic concepts of MPI, then mpi4py shouldn't be a problem.

Python data structures can be used to create more complex data structures. Because it's an object-oriented language, you can create classes and objects with these data structures. Libraries such as NumPy [2] can create new data types (e.g., N -dimensional arrays). To pass these complex data structures and objects between MPI ranks, Python serializes the data structures with the pickle standard library module [3], supporting both ASCII and binary formats. The marshal module also can be used to serialize built-in Python objects into a binary format that is specific to Python.

Mpi4py takes advantage of Python features to maximize the performance of serializing and de-serializing objects as part of data transmission. These efforts have been taken to the point that the mpi4py documentation claims it "… enables the implementation of many algorithms involving multidimensional numeric arrays (e.g., image processing, fast Fourier transforms, finite difference schemes on structured Cartesian grids) directly in Python, with negligible overhead, and almost as fast as compiled Fortran, C, or C++ codes" [4].

Simple mpi4py Examples

For the example programs, I used the mpi4py install for Anaconda [5] and built it with MPICH2 [6]. Running mpi4py Python codes follows the MPICH2 standards. For example, a command might be:

$ mpirun -np 4 -f ./hosts python3 ./script.py

where script.py is the Python code that uses mpi4py .

The first example is just a simple "hello world" (Listing 1) from Jörg Bornschein's GitHub page [7]. The MPI.COMM_WORLD variable (line 3) is how the world communicator is defined in mpi4py to access a global "communicator," which is a default group of all processes (i.e., collective function) [8] It is created when the mpi4py module is imported. The import also causes the MPI.Init() and MPI.init_thread() functions to be called [9].

Listing 1

Hello World

01 from mpi4py import MPI
02
03 comm = MPI.COMM_WORLD
04
05 print("Hello! I'm rank %d from %d running in total..." % (comm.rank, comm.size))
06
07 comm.Barrier()   # wait for everybody to synchronize _here_

Contrast this with C or Fortran the mpi_init() is explicitly called and the world communicator, MPI_COMM_WORLD is defined. Note the difference between MPI.COMM_WORLD for mpi4py and MPI_COMM_WORLD for C and Fortran.

Line 3 in the sample code sets the MPI.COMM_WORLD communicator to the variable comm. The code then uses the rank of the specific process, comm.rank, and the total number of processes, comm.size. The rank of a specific process is very similar to C, MPI_Comm_rank, and Fortran, MPI_COMM_RANK. Although not exactly the same, if you understand the basics of MPI, it won't be difficult to use mpi4py .

The code ends by calling the Barrier() function. Although not strictly necessary, it's a nice way of ending the code to make sure all of the processes reach that point.

To run the code, I used:

$ mpirun -np 4 -f ./hosts python3 ./hello-world.py

The output is shown in Listing 2.

Listing 2

Hello World Output

Hello! I'm rank 0 from 4 running in total...
Hello! I'm rank 1 from 4 running in total...
Hello! I'm rank 2 from 4 running in total...
Hello! I'm rank 3 from 4 running in total...

A second example is just a simple broadcast from Ahmed Azridev's GitHub page [10] (Listing 3). [Note: I cosmetically modified the code to appeal to my sensibilities and to update it to Python 3.] The code first defines a dictionary, data, but only for the rank 0 process. All other processes define data with the keyword None.

Listing 3

Broadcast Example

01 from numpy import array
02 from mpi4py import MPI
03
04 comm = MPI.COMM_WORLD
05 rank = comm.Get_rank()
06
07 if rank == 0:
08   data = { 'key1' : [10,10.1,10+11j],
09            'key2' : ('mpi4py' , 'python'),
10            'key3' : array([1, 2, 3]) }
11 else:
12   data = None
13 # end if
14
15 data = comm.bcast(data, root=0)
16
17 if rank == 0:
18   print("bcast finished")
19 # end if
20
21 print("data on rank %d is: "%comm.rank, data)

The data is broadcast with the bcast function, which is part of communicator comm, which happens to be MPI.COMM_WORLD. In Fortran and C, the function is usually named MPI_Bcast or MPI_BCAST. For the specific function, the content of variable data is broadcast to all ranks, from the rank 0 process (root=0). Again, all of this is very similar to C and Fortran, but not exactly the same. Listing 4 shows the output from the code.

Listing 4

Broadcast Output

bcast finished
data on rank 0 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 1 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 2 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 3 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}

Notice that the data is defined on the rank 0 process, but each rank printed the same data. Perhaps more importantly, the data is a dictionary, not something that is defined by default in Fortran or C.

The third example is point-to-point code [11] (Listing 5). Again, I modified the code to fit my sensibilities and to port to Python 3. I also changed the pprint() functions to print(). The code is pretty simple. The rank 0 process creates some data in the NumPy data array and sends the first part of that array to the rank 1 process and the second part of the array to the rank 2 process. To make even more sure the data goes to the correct process, it uses tags for the send() and recv() functions (i.e., tag=13 and tag=14) to distinguish the destinations (dest) and make sure the sender and receiver processes are matched correctly. It's not the same as MPI_Send and MPI_Recv, but it's fairly easy to figure out if you know some MPI. The output when running the code on four processes is shown in Listing 6. Notice that processes 3 and 4 didn't contribute or do anything.

Listing 5

Point-to-Point

01 import numpy
02 from mpi4py import MPI
03
04
05 comm = MPI.COMM_WORLD
06 rank = comm.Get_rank()
07 size = comm.Get_size()
08
09 if (rank == 0):
10   data = numpy.arange(10)
11   comm.send(data[:5], dest=1, tag=13)
12   comm.send(data[5:], dest=2, tag=14)
13   print("Rank %d data is: "%rank, data)
14 elif (rank == 1):
15   data = comm.recv(source=0, tag=13)
16   print("Rank %d Message Received, data is: "%rank, data)
17 elif (rank == 2):
18   data = comm.recv(source=0, tag=14)
19   print("Rank %d Message Received, data is: "%rank, data)
20 # end if

Listing 6

Point-to-Point Output

output:
Rank 0 data is:  [0 1 2 3 4 5 6 7 8 9]
Rank 1 Message Received, data is:  [0 1 2 3 4]
Rank 2 Message Received, data is:  [5 6 7 8 9]

More Complex Examples

From these three simple examples, you've learned that mpi4py works exactly as MPI code written in Fortran or C with a Python twist. Mpi4py can work with Python data types by serializing and de-serializing them. The next step is to try some more complicated examples.

The first example (Listing 7) is simple parallel trapezoid integration code derived from a presentation by Pawel Pomorski [12]. The code is fairly simple. If you remember how to do numerical integration by the trapezoidal approach, this code should make sense.

Listing 7

Parallel Trapezoid Method

01 from mpi4py import MPI
02 import math
03
04
05 def f(x):
06   return x*x
07 # end def
08
09
10 def trapezoidal(a, b, n, h):
11
12   s = 0.0
13   s += h * f(a)
14   for i in range(1, n):
15     s += 2.0 * h * f(a + i*h)
16   # end for
17   s += h * f(b)
18   return (s/2.)
19 # end def
20
21
22 # Main section
23 comm = MPI.COMM_WORLD
24 my_rank = comm.Get_rank()
25
26 p = comm.Get_size()
27
28 if (my_rank == 0):
29   print("Number of ranks = ",p)
30 # end if
31 print('my_rank = ',my_rank)
32
33
34 # Integral parameters
35 a=0.0        # Left endpoint
36 b=2.0        # Right endpoint
37 n=1024       # Number of trapezoids
38 dest=0       # Destination for messages (rank 0 process)
39 total=-1.0   # Initialize to negative number
40
41 h = (b-a)/n         #  h = Trapezoid Base Length - the same for all processes
42 local_n = int(n/p)  #  Number of trapezoids for each process
43
44 #   Length of each process' interval of
45 #   integration = local_n*h.
46 local_a = a + my_rank*local_n*h   # local a (specific to each process)
47 local_b = local_a + local_n*h     # local b (specific to each process)
48
49 # Each rank performs its own integration
50 integral = trapezoidal(local_a, local_b, local_n, h)
51
52 # Add up the integrals calculated by each process
53 if (my_rank == 0):
54   total=integral
55   for source in range(1,p):
56     integral=comm.recv(source=source)
57     print("PE ",my_rank,"<-",source,",",integral,"\n")
58     total=total+integral
59   # end for
60 else:
61   print("PE ",my_rank,"->",dest,",",integral,"\n")
62   comm.send(integral, dest=0)
63 # end if
64
65
66 #  Print the result
67 if (my_rank==0):
68   print("**With n=",n,", trapezoids,")
69   print("** Final integral from",a,"to",b,"=",total,"\n")
70 # end if
71
72 MPI.Finalize()

To begin, the code breaks up the integration beginning and end points into n trapezoids, where n is also the number of processes. Then the number of intervals are divided across the number of processes, and each process performs its own integration. When finished, each process sends its result to the rank 0 process, which adds the contribution of each process to get the final answer.

The command to run the code is straightforward:

$ mpirun -n 4 -f ./hosts python3 trap-mpi4py.py

Another sample problem (Listing 8) integrates x ^2 over the interval from 0.0 to 2.0. The output contains information about what each process is doing, showing when a process sends data to the rank 0 process (e.g., PE 1 -> 0). The rank 0 process shows which process sent the data (e.g., PE 0 <- 1).

Listing 8

Integration Example

01 Number of ranks =  4
02 my_rank =  0
03 PE  0 <- 1 , 0.29166698455810547
04
05 PE  0 <- 2 , 0.7916669845581055
06
07 PE  0 <- 3 , 1.5416669845581055
08
09 **With n= 1024 , trapezoids,
10 ** Final integral from 0.0 to 2.0 = 2.666667938232422
11
12 my_rank =  1
13 PE  1 -> 0 , 0.29166698455810547
14
15 my_rank =  3
16 PE  3 -> 0 , 1.5416669845581055
17
18 my_rank =  2
19 PE  2 -> 0 , 0.7916669845581055

The next example is a simple matrix-matrix multiply [13] that breaks the matrices into a grid of blocks. It then handles communication between the blocks in a classic east, west, north, south layout. I won't put the code here because it is a little long. I only changed the function calls to pprint from print to get it to work. The program uses four processes, and the output is just the timings of the code run (Listing 9).

Listing 9

Matrix-Matrix Product Output

Creating a 2 x 2 processor grid...
=================================================================
Computed (serial) 3000 x 3000 x 3000 in       1.68 seconds
 ... expecting parallel computation to take   3.36 seconds
Computed (parallel) 6000 x 6000 x 6000 in     3.69 seconds

The final example I tried, just for fun, is a parallel implementation of Conway's Game of Life [14] that I got from Kevin Moore's GitHub site [15]. Again, I won't show the lengthy code here. Because it's old Python 2.x style, I had to fix the print functions.

If you've never played Game of Life, give it a try. There is really nothing to do: just watch the screen. The grid is initialized randomly. Because the output is animated, I can't show anything here, but give it a try and be sure to take a look at the code.

Summary

Mpi4py is a very powerful Python tool that uses wrappers around standard MPI implementations such as MPICH2 or OpenMPI. If you have used MPI before, either in Fortran or C, then using it in Python is not difficult. It holds true to Python's object-oriented nature but still uses the familiar MPI terminology.

In this article, I showed a few simple programs, from Hello World, to broadcast, to point-to-point examples, which all illustrate that coding MPI is not difficult in Python.

The final few "real" examples were more than just instructional: The simple trapezoidal integration example shows how easy it is to parallelize code; the matrix-matrix multiplication example breaks matrices into a grid of blocks (sometimes called tiles) and is a bit more complicated, because you need to know how to do parallel matrix multiplication; a parallel version of Conway's Game of Life is a diverting visual example, with a bit more difficult code. You probably need to know something about this algorithm to better understand the parallel code, but it's still fun to run, and you can always change the initialization from random to something more deliberate.

The Author

Jeff Layton has been in the HPC business for almost 25 years (starting when he was 4 years old). He can be found lounging around at a nearby Frys enjoying the coffee and waiting for sales.

Buy this article as PDF

Express-Checkout as PDF
Price $2.95
(incl. VAT)

Buy ADMIN Magazine

SINGLE ISSUES
 
SUBSCRIPTIONS
 
TABLET & SMARTPHONE APPS
Get it on Google Play

US / Canada

Get it on Google Play

UK / Australia

Related content

comments powered by Disqus