Our Code
Writing code for a cluster can be a little bit confusing. Unfortunately, there are some extra steps to be taken before a cluster can be taken advantage of. As a reference, I am personally using python 2.7 and mpich2 as my message passing interface. Let's just jump right into the code.
First, let's look at what a serial version of a pi approximation looks like.
""" Approximation of pi for comparison with a parallel version """ from __future__ import division from math import factorial as f import sys import time from decimal import * # Python package for intuitive handling decimal numbers getcontext().prec = 50 def piApprox(n): """ Where n is the number of terms of the summation to calculate. """ pi = 0 for i in xrange(n+1): sign = 1 if i % 2 == 0 else -1 # sign is +1 if i is even and -1 if i is odd pi += Decimal(sign * 2 * 3 ** (.5-i)) / Decimal(2 * i + 1) # Madhava-Leibniz series approximation for pi return pi def main(): n = sys.argv[1] # requires the program be called with a command line n = int(n) # argument specifying the desired number of terms start = time.time() pi = piApprox(n) elapsed = time.time() - start print("Estimation of {} terms is: {}.".format(n, pi)) print("Elapsed time: {}.".format(elapsed)) if __name__ == '__main__': # this bit of code allow for using this program as a main() # standalone program or as a module
This is simple enough, maybe you've even written something similar. I'll go over a few things.
-If you are unfamiliar with python 2.7, the line at the top -- from __future__ import division -- probably looks strange to you. All this is doing is telling python to compute divisions as floats and to not do integer division (that would really mess up our approximation).
-sys.argv[1] is telling python to take a look at the 1 index of the commandline arguments (where index 0 is the name of the file you are running) For example, we would run this file like
We can see the elapsed time was 3.26 seconds. Then we just run the piApprox(n) function and add up our n terms. Alright, now let's take a look at the parallel version.
from __future__ import division import sys import time from mpi4py import MPI from math import factorial as f from decimal import * getcontext().prec = 50 def nthTerm(data): """ Misleading function name. Computes the sum of terms in data. """ total = 0 for i in data: sign = 1 if i % 2 == 0 else -1 total += Decimal(sign * 2 * 3 ** (.5-i)) / Decimal(2 * i + 1) return total def main(): comm = MPI.COMM_WORLD # comm is an mpi4py.MPI object representing the "world" of the cluster. rank = comm.Get_rank() # That is, all the nodes and all the CPUs of the nodes, etc. if rank == 0: # Each cpu on each node is given an integer "rank" (0, 1, 2, 3,...) start = time.time() # Usually, rank 0 is called the "head" node and assigns jobs to the i = int(sys.argv[1]) # higher-ranked "worker" nodes. The worker nodes do their jobs and size = comm.Get_size() # subsequently send the results back to the head node to assemble and workers = size - 1 # collate. termsplits = [] for x in range(workers): termsplits.append(xrange(x, i+1, workers)) for y, data in enumerate(termsplits): dest = y + 1 if dest >= size: pass else: comm.send(data, dest=dest) pi = 0 for y in range(workers): source = y + 1 print("Waiting for source:{}".format(source)) pi_appr = comm.recv(source=source) pi += pi_appr elapsed = time.time() - start print(pi) print("MPI pi approx time: {} seconds".format(elapsed)) else: data = comm.recv(source=0) pi_appr = nthTerm(data) comm.send(pi_appr, dest=0) if __name__ == '__main__': main()
In many ways, the two versions look quite similar, and they should. They are computing the same thing after all. However, there are a few differences.
At the top, we of course have to do a from mpi4py import MPI, so that we can use our cluster. I think that the important thing to think about and realize is that, this exact program is going to run on all of the processors that we tell it to - more or less independent of each other. So unless we explicitly tell two cores to communicate with eachother, they really don't care, or even know what another processor is doing. They are, in fact, completely separate python instances.
In this case, we will need the cores to communicate in order to get a sum of terms. The extra step that we have to do in this version, versus the serial version, is divy up the work for each of our processors.
Each process comes with a rank which we can access through MPI.COMM_WORLD.Get_rank(). We can use the rank of the processor to divide up the work. Basically we'll set up some conditionals so that a certain process will only run some block of code, if it is the correct rank. I denoted rank = 0, to be a sort of "master" process. It is what is dividing up all of the work into n pieces. Where n is the number of cores we are going to use. This is generally the sum of the numbers in your machinefile. This is the block where we divide up the work.
i = int(sys.argv[1]) size = comm.Get_size() workers = size - 1 termsplits = [] for x in range(workers): termsplits.append(xrange(x, i+1, workers))
Here, I denote the total cores - 1 to be the number of workers. (Because 1 of them is going to be the master process) Then, I split the terms into workers lists, where workers is an integer. I also used workers as the step size so that each list would have a similar number of small and large numbers. Next we need to send the lists out to their respective cores.
for y, data in enumerate(termsplits): dest = y + 1 if dest >= size: pass else: comm.send(data, dest=dest)
Here, we use comm.send() to send the term lists to each of the processors. The first argument is the data you want to send, and the 2nd argument is the core you want to send it to. Then we need to receive the data into the other cores.
data = comm.recv(source=0) pi_appr = nthTerm(data) comm.send(pi_appr, dest=0)
Here we are receiving the data from the master core, computing the sum of the terms in our data list, and sending it back to the master core. All that's left is to sum of the data from each of the computing cores.
for y in range(workers): source = y + 1 print("Waiting for source:{}".format(source)) pi_appr = comm.recv(source=source) pi += pi_appr print(pi)
And that's pretty much it! For comparison, let's check our timing on the parallel version.
Considerably faster! About 10x with 11 computing cores.