Source code for sicor.Tools.tqdm_joblist

from time import sleep, time
from sys import stdout

__author__ = "Niklas Bohn, Andre Hollstein"


# noinspection PyDefaultArgument
def tqdm_joblist(jobs, update_intervall=1.0, max_update_intervall=10, n_status_prints=10, modify_jobs=True,
                 sleep_time=1.0, apply_on_done=None, status_prints=True, overprint=True, extra_args={}):
    """
    Monitor how a list of jobs submitted to a Pool becomes smaller as jobs become ready.

    :param overprint: if True, each status message overwrites the previous one on the current line
        (using a carriage return); if False, each message is printed on a new line
    :param status_prints: True / False
    :param jobs: list of multiprocessing jobs
    :param apply_on_done: callable object, is called as ``apply_on_done(*job.get(), **extra_args)``
    :param extra_args: should be a dictionary of extra args which is passed to apply_on_done
    :param modify_jobs: if True, then jobs is changed in place and is empty on return; if False, the list
        remains unchanged -> this increases memory consumption (!)
    :param n_status_prints: number of wanted prints, won't be reached exactly if the runtime of jobs varies
    :param update_intervall: initial time in seconds between status prints, adapted at runtime
    :param max_update_intervall: upper bound in seconds for the adapted update interval
    :param sleep_time: if given, additional sleep time in seconds per polling iteration
    :return: if apply_on_done is given, then a list of results from apply_on_done is returned,
        order is not preserved
    :raises: Nothing
    """
    t0 = time()
    t_last_status_print = time()
    n_jobs = len(jobs)
    if apply_on_done is not None:
        job_results = []
    while jobs:
        # check which jobs are done, this is then used to split up the jobs
        readies = [job.ready() for job in jobs]  # must be consistent for done and not done jobs!
        jobs_done = [job for ready, job in zip(readies, jobs) if ready is True]
        if apply_on_done is not None:
            for job in jobs_done:
                inp = job.get()
                try:  # if inp is not a sequence, e.g. a scalar, then it can't be passed as *
                    job_results.append(apply_on_done(*inp, **extra_args))
                except TypeError:
                    job_results.append(apply_on_done(inp, **extra_args))
        else:
            for job in jobs_done:
                job.get()
        del jobs_done  # delete references to jobs to let gc free memory
        # modify jobs in place to let the gc free the memory from the jobs which are done
        if modify_jobs:
            jobs[:] = [job for ready, job in zip(readies, jobs) if ready is False]
        else:
            jobs = [job for ready, job in zip(readies, jobs) if ready is False]
        # print eta, ... if wanted
        if status_prints:
            if ((n_jobs - len(jobs)) != 0) and (time() - t_last_status_print) > update_intervall:
                eta = (time() - t0) / (n_jobs - len(jobs)) * len(jobs)
                tot = (time() - t0) / (n_jobs - len(jobs)) * n_jobs
                update_intervall = min(tot / n_status_prints, max_update_intervall)
                t_last_status_print = time()
                msg = "Open Jobs: %i , eta: %.2fs~%.2fm~%.2fh, tot: %.2fs~%.2fm~%.2fh" % (
                    len(jobs), eta, eta / 60., eta / 3600.0, tot, tot / 60., tot / 3600.)
                if overprint:
                    stdout.write("\r%s" % msg)
                    stdout.flush()
                else:
                    print(msg)
            if (n_jobs - len(jobs)) == 0:
                msg = "Open Jobs: %i, time:%i" % (len(jobs), time())
                if overprint:
                    stdout.write("\r%s" % msg)
                    stdout.flush()
                else:
                    print(msg)
        # if set, we sleep some time
        if sleep_time is not None:
            sleep(sleep_time)
    # return only if apply_on_done was given
    if apply_on_done is not None:
        return job_results
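

# Usage sketch (an illustrative addition, not part of the original module; assumes a
# fork-based multiprocessing start method). A scalar job result makes the ``*inp``
# unpacking above raise TypeError, so apply_on_done receives it as a single positional
# argument, with extra_args forwarded as keywords. ``square`` and ``shift`` are
# hypothetical helpers introduced only for this example:
#
#     from multiprocessing import Pool
#
#     def square(xx):
#         return xx * xx
#
#     def shift(value, offset=0.0):
#         return value + offset
#
#     pp = Pool()
#     jobs = [pp.apply_async(func=square, args=(xx,)) for xx in range(10)]
#     shifted = tqdm_joblist(jobs, apply_on_done=shift, extra_args={"offset": 1.0},
#                            status_prints=False)
#     pp.close()
#     pp.join()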


if __name__ == "__main__":
    print("Test")
    from multiprocessing import Pool

    def test(tt):
        sleep(tt)
        return tt

    def process(tt, jj):
        return tt * jj

    print("Simple")
    pp = Pool()  # max number of processes
    tqdm_joblist([pp.apply_async(func=test, args=(tt,)) for tt in 1000 * [0.001]], n_status_prints=10)
    pp.close()
    pp.join()  # wait for all runs to finish

    print("Call on done")
    pp = Pool()  # max number of processes
    # extra_args supplies the ``jj`` keyword which ``process`` requires
    print(tqdm_joblist([pp.apply_async(func=test, args=(tt,)) for tt in 20 * [0.5]], n_status_prints=10,
                       apply_on_done=process,
                       extra_args={"jj": 10.3}))
    pp.close()
    pp.join()  # wait for all runs to finish
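
    # Hedged extra demo (an illustrative addition, not in the original test block):
    # the result list follows completion order, not submission order. Letting each
    # worker return its own index makes the submission order recoverable by sorting.
    # ``indexed_test`` is a hypothetical helper introduced only for this sketch.
    print("Order recovery")

    def indexed_test(ii, tt):
        sleep(tt)
        return ii, tt

    pp = Pool()  # max number of processes
    tagged = tqdm_joblist([pp.apply_async(func=indexed_test, args=(ii, 0.05 * (20 - ii))) for ii in range(20)],
                          status_prints=False,
                          apply_on_done=lambda ii, value: (ii, value))
    print([value for _, value in sorted(tagged)])
    pp.close()
    pp.join()  # wait for all runs to finish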