__author__ = 'mirko' import sys import glob import time import datetime from mpi4py import MPI from subprocess import Popen VERBOSE = 0 EXIT_TAG = 2000 def input_files(my_path): """ Get the input names :param my_path: path of the input files :return: a list of inputs """ return glob.glob(my_path+'/*') def master(my_inputs): """ Receive all the inputs and dispatch them to all the other tasks :param my_inputs: :return: Feed all the tasks with inputs. When a task is finished get him another input. Do it all over until there's no more work to do """ elapsed_times = [] # stores the elapsed times for each task for i in range(size): elapsed_times.append(0.0) # feed all tasks for i in range(1, size): # get the next input input_ = my_inputs.pop() comm.send(input_, dest=i, tag=10) # feed back the tasks that have finished their work while my_inputs: my_status = MPI.Status() elaps = comm.recv(source=MPI.ANY_SOURCE, tag=1001, status=my_status) new_destination = my_status.source elapsed_times[new_destination] += elaps input_ = my_inputs.pop() comm.send(input_, dest=new_destination, tag=10) # receive last sends for i in range(1, size): my_status = MPI.Status() elaps = comm.recv(source=MPI.ANY_SOURCE, tag=1001, status=my_status) new_destination = my_status.source elapsed_times[new_destination] += elaps # tell all the tasks to abort for i in range(1, size): comm.send('ok', dest=i, tag=EXIT_TAG) return elapsed_times def slave(): """ :return: """ while True: status = MPI.Status() inp = comm.recv(source=0, tag=MPI.ANY_TAG, status=status) if status.tag == EXIT_TAG: break elapsed = run([inp]) comm.send(elapsed, dest=0, tag=1001) def run(my_inputs): """ Run cash_flow for all inputs by using Popen :param my_inputs: list of input files :return: """ t1_ = time.time() for input_ in my_inputs: with open(input_+'.out', 'w') as out_file: if VERBOSE: print "./cash_flow_dbg.x %s" % input_ pid = Popen(['./cash_flow_dbg.x', input_], stdout=out_file) pid.wait() t2_ = time.time() return t2_ - t1_ if __name__ == '__main__': comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() comm.Barrier() t1 = time.time() elapsed_times = [] if rank == 0: # welcome message print '*'*80 print datetime.datetime.now() print '*'*80 # get all the input names inputs = input_files(sys.argv[1]) print "Number of elaborations: %d" % len(inputs) elapsed_times = master(inputs) else: slave() comm.Barrier() t2 = time.time() if rank == 0: for i, time_ in enumerate(elapsed_times): print "Rank: %d Elapsed time %5.2f" % (i, time_) print '*'*80 print '%s Elapsed time: %5.2f ' % (datetime.datetime.now(), t2-t1) print '*'*80