High-Performance Python – Distributed Python
As the last few articles have pointed out, Python is becoming an important language in HPC and is arguably the most important language in data science. The previous three articles have covered tools for improving the performance of Python with C, Fortran, and GPUs on a single node.
In this article, I present tools for scaling Python GPU code to distributed systems. Some of the tools are Python variations of classic HPC tools, such as MPI for Python (mpi4py), a Python binding to the Message Passing Interface (MPI). Tools such as Dask focus on keeping code Pythonic, and other tools support the best performance possible.
MPI for Python
The HPC world has been using MPI since the early 1990s. In the beginning, it was used almost exclusively with C/C++ and Fortran. Over time, versions of MPI or MPI bindings were released for other languages such as Java, C#, Matlab, OCaml, R, and, of course, Python. MPI was used because it is the standard in the HPC world for exchanging data between processes on shared or distributed architectures. Python is arguably the number one language for Deep Learning and is very popular in Machine Learning and HPC. Combining the two seemed like a natural step toward improving Python performance, while making code writing for science and engineering similar to the current language and tools already in use.
MPI for Python (mpi4py) was developed for Python by using the C++ bindings on the MPI-2 standard. It has been designed to be fairly Pythonic, so if you know Python and understand the basic concepts of MPI, mpi4py shouldn't be a problem. I am a self-admitted Fortran coder and using mpi4py, even though it is based on the C++ MPI bindings, is no more difficult than with Fortran. In fact, I think it's a bit easier, because Python has data structures that can be used to create complex data structures and objects. To pass these structures between MPI ranks, Python serializes the data structures using pickle, with both ASCII and binary formats supported. Python can also use the marshal module to serialize built-in objects into a binary format specific to Python.
Mpi4py takes advantage of Python features to maximize the performance of serializing and de-serializing objects as part of data transmission. These efforts have been taken to the point that the mpi4py documentation claims these features enable "… the implementation of many algorithms involving multidimensional numeric arrays (e.g., image processing, Fast Fourier Transforms, finite difference schemes on structured Cartesian grids) directly in Python, with negligible overhead, and almost as fast as compiled Fortran, C, or C++ codes."
Mpi4py has all of the communications and functions that you expect from MPI:
- Blocking and non-blocking send/receive
- Collective communications
- Scatter/gather
- Dynamic process management (MPI-2 standard)
- One-sided communications
- Parallel input/output: (MPI.File class)
- Initialization
- Timers
- Error handling
All of these are accomplished very Pythonically but, at the heart, are still MPI.
mpi4py Example
Running mpi4py code is about the same as running classic C/Fortran code with MPI. For example, using Open MPI, the command for running MPI code would be,
$ mpiexec -n 4 python script.py
where script.py is the Python code. Depending on how Python is installed or built on your system, you might have to define the fully qualified path to the Python executable itself, the script, or both. Here is a quick example of a Hello World mpi4py code.
from __future__ import print_function from mpi4py import MPI comm = MPI.COMM_WORLD print("Hello! I'm rank %d from %d running in total..." % (comm.rank, comm.size)) comm.Barrier() # wait for everybody to synchronize _here_
It looks like a classic MPI program that has been "pythonized." A second example from an mpi4py tutorial illustrates how to communicative NumPy arrays using MPI:
from mpi4py import MPI import numpy comm = MPI.COMM_WORLD rank = comm.Get_rank() # passing MPI datatypes explicitly if rank == 0: data = numpy.arange(1000, dtype='i') comm.Send([data, MPI.INT], dest=1, tag=77) elif rank == 1: data = numpy.empty(1000, dtype='i') comm.Recv([data, MPI.INT], source=0, tag=77) # automatic MPI datatype discovery if rank == 0: data = numpy.arange(100, dtype=numpy.float64) comm.Send(data, dest=1, tag=13) elif rank == 1: data = numpy.empty(100, dtype=numpy.float64) comm.Recv(data, source=0, tag=13)
Don't forget that mpi4py will serialize the data when passing it to other ranks, but the good news is that it works very hard to make this as efficient as possible. The third and last example is a simple broadcast (bcast) of a Python dictionary:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: data = {'key1' : [7, 2.72, 2+3j], 'key2' : ( 'abc', 'xyz')} else: data = None #endif data = comm.bcast(data, root=0)
I threw in my Fortran habit of putting in a comment to mark the end of an if/then/else statement or loop (sometimes the indentation gets a little crazy).
UCX
HPC developers, which can include MPI developers and HPC system vendors, seem to create communication frameworks (communication middleware) for new concepts, which limits interoperability and means users have to "port" their applications to the frameworks if they switch software stacks. To reduce this problem, Unified Communication X (UCX) was created by a multivendor consortium. UCX is an "… open-source, product grade, communication framework focusing on data-centric and high performance applications." The goal of UCX is to provide the functionality needed for efficient high-performance communication, eliminating the need to "roll your own" and reducing the porting time when different software stacks are involved.
UCX has a set of interfaces to various libraries that support networks such as InfiniBand and NVLink. Many tools can be built on top of UCX taking advantage of the commonality of UCX as well as the performance and portability. For example, UCX is currently a framework used within Open MPI. You can also create bindings on top of UCX to provide fast, low-latency data communications for various networks. UCX has three parts to its framework: (1) UC-P for Protocols, (2) UC-T for Transport, and (3) UC-S for Services. UC-P is a high-level API that uses the UCT transport layer framework to construct common protocols and is useful for things such as multirail support, device selection, tag-matching, and so on. UC-T for Transport is a low-level API that exposes basic network operations used by the network hardware. Finally, UC-S for Services provides the infrastructure for component-based programming, memory management, and other services and is primarily used for platform abstractions, data structures, and so on. These APIs are used within UCX to achieve the goals of portability, high bandwidth, and low latency.