Serial and Parallel Pi Calculation
Writing code for a cluster can be a little bit confusing. Unfortunately, there are some extra steps to be taken before a cluster can be taken advantage of. As a reference, I am personally using python 2.7 and mpich2 as my message passing interface. Let's just jump right into the code.
First, let's look at what a serial version of a pi approximation looks like.
""" Approximation of pi for comparison with a parallel version """ from __future__ import division import sys import time from decimal import * # Python package for intuitive handling decimal numbers getcontext().prec = 50 def piApprox(n): """ Where n is the number of terms of the summation to calculate. """ pi = 0 for i in xrange(n+1): sign = 1 if i % 2 == 0 else -1 # sign is +1 if i is even and -1 if i is odd pi += Decimal(sign * 2 * 3 ** (.5-i)) / Decimal(2 * i + 1) # Madhava-Leibniz series approximation for pi return pi def main(): n = sys.argv[1] # an argument taken from the command line when the program is run n = int(n) # argument specifying the desired number of terms start = time.time() # in the approximation pi = piApprox(n) elapsed = time.time() - start print("Estimation of {} terms is: {}.".format(n, pi)) # new style string formatting print("Elapsed time: {}.".format(elapsed)) if __name__ == '__main__': # this bit tells python to run main if we call piserial.py main() # It will not run if it is imported into another program
This is simple enough, maybe you've even written something similar. I'll go over a few things.
-If you are unfamiliar with python 2.7, the line at the top -- from __future__ import division -- probably looks strange to you. All this is doing is telling python to compute divisions as floats and to not do integer division (that would really mess up our approximation).
-sys.argv[1] is telling python to take a look at the 1 index of the commandline arguments (where index 0 is the name of the file you are running) For example, we would run this file like
We can see the elapsed time was 3.26 seconds. Then we just run the piApprox(n) function and add up our n terms. Alright, now let's take a look at the parallel version.
from __future__ import division import sys import time from mpi4py import MPI from decimal import * getcontext().prec = 50 def nthTerm(data): # data is a list of terms (see main() below) """ Misleading function name. Computes the sum of terms in data. """ total = 0 for i in data: sign = 1 if i % 2 == 0 else -1 total += Decimal(sign * 2 * 3 ** (.5-i)) / Decimal(2 * i + 1) return total # total is a partial sum of terms (contained in data) of the approx of pi def main(): comm = MPI.COMM_WORLD # comm is an mpi4py.MPI object representing the "world" of the cluster. rank = comm.Get_rank() # That is, all the nodes and all the CPUs of the nodes, etc. if rank == 0: # The head does this if block # Each cpu on each node is given an integer "rank" (0, 1, 2, 3,...) start = time.time() # Usually, rank 0 is called the "head" node and assigns jobs to the i = int(sys.argv[1]) # higher-ranked "worker" nodes. The worker nodes do their jobs and size = comm.Get_size() # subsequently send the results back to the head node to assemble and workers = size - 1 # collate. termsplits = [] # initialize a list for splitting the task up among the workers. for x in range(workers): # termsplits is a list of lists of terms the workers will compute termsplits.append(xrange(x, i+1, workers)) # (different for each worker) for y, data in enumerate(termsplits): # This block sends data (a list of terms) to each worker dest = y + 1 if dest >= size: pass else:formmating comm.send(data, dest=dest) pi = 0 for y in range(workers): # In this block, the head receives the results from the workers and source = y + 1 # adds them together to calculate pi print("Waiting for source:{}".format(source)) pi_appr = comm.recv(source=source) pi += pi_appr elapsed = time.time() - start print(pi) print("MPI pi approx time: {} seconds".format(elapsed)) else: # The workers each do this block. data = comm.recv(source=0) # They receive the list of terms to calculate, do the calculation and pi_appr = nthTerm(data) # send the result back to the head comm.send(pi_appr, dest=0) if __name__ == '__main__': main()
In many ways, the two versions look quite similar, and they should. They are computing the same thing after all. However, there are a few differences.
At the top, we of course have to do a from mpi4py import MPI, so that we can use our cluster. I think that the important thing to think about and realize is that, this exact program is going to run on all of the processors that we tell it to - more or less independent of each other. So unless we explicitly tell two cores to communicate with eachother, they really don't care, or even know what another processor is doing. They are, in fact, completely separate python instances.
In this case, we will need the cores to communicate in order to get a sum of terms. The extra step that we have to do in this version, versus the serial version, is divy up the work for each of our processors.
Each process comes with a rank which we can access through MPI.COMM_WORLD.Get_rank(). We can use the rank of the processor to divide up the work. Basically we'll set up some conditionals so that a certain process will only run some block of code, if it is the correct rank. I denoted rank = 0, to be a sort of "master" process. It is what is dividing up all of the work into n pieces. Where n is the number of cores we are going to use. This is generally the sum of the numbers in your machinefile. This is the block where we divide up the work.
i = int(sys.argv[1]) size = comm.Get_size() workers = size - 1 termsplits = [] for x in range(workers): termsplits.append(xrange(x, i+1, workers))
Here, I denote the total cores - 1 to be the number of workers. (Because 1 of them is going to be the master process) Then, I split the terms into workers lists, where workers is an integer. I also used workers as the step size so that each list would have a similar number of small and large numbers. Next we need to send the lists out to their respective cores.
for y, data in enumerate(termsplits): dest = y + 1 if dest >= size: pass else: comm.send(data, dest=dest)
Here, we use comm.send() to send the term lists to each of the processors. The first argument is the data you want to send, and the 2nd argument is the core you want to send it to. Then we need to receive the data into the other cores.
data = comm.recv(source=0) pi_appr = nthTerm(data) comm.send(pi_appr, dest=0)
Here we are receiving the data from the master core, computing the sum of the terms in our data list, and sending it back to the master core. All that's left is to sum of the data from each of the computing cores.
for y in range(workers): source = y + 1 print("Waiting for source:{}".format(source)) pi_appr = comm.recv(source=source) pi += pi_appr print(pi)
And that's pretty much it! For comparison, let's check our timing on the parallel version.
Considerably faster! About 10x with 11 computing cores.