python - My queue is empty after multiprocessing.Process instances finish -
I have a Python script, and at the top of the file I have:

    result_queue = Queue.Queue()
    key_list = *a big list of small items*  # (actually from bucket.list() via boto)
I have learned that Queues are process-safe data structures. I have a method:

    def enqueue_tasks(keys):
        for key in keys:
            try:
                result = perform_scan.delay(key)
                result_queue.put(result)
            except:
                print "failed"
The perform_scan.delay() function here actually calls a Celery worker, but I don't think that is relevant (it is an asynchronous process call).
I also have:

    def grouper(iterable, n, fillvalue=None):
        args = [iter(iterable)] * n
        return izip_longest(fillvalue=fillvalue, *args)
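One property of this recipe matters for whatever consumes the groups: when the number of keys is not a multiple of n, the last group is padded with fillvalue. A minimal Python 3 sketch of the same recipe, where zip_longest is the Python 3 name for izip_longest and the sample keys are made up for illustration:

```python
from itertools import zip_longest  # Python 3 name for izip_longest


def grouper(iterable, n, fillvalue=None):
    """Collect items into fixed-length tuples, padding the last one."""
    # The same iterator repeated n times, so each tuple pulls n successive items.
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


# Hypothetical sample keys; five items in groups of two leaves one slot to pad.
groups = list(grouper(["a", "b", "c", "d", "e"], 2))
print(groups)
```

With the default fillvalue, a function that loops over each group receives None entries in the final group unless it skips them.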
Lastly, I have a main() function:

    def main():
        executor = concurrent.futures.ProcessPoolExecutor(10)
        futures = [executor.submit(enqueue_tasks, group)
                   for group in grouper(key_list, 40)]
        concurrent.futures.wait(futures)
        print result_queue.qsize()
The result of the print statement is 0. Yet if I include a print statement of the size of result_queue in enqueue_tasks, while the program is running, I can see that the size is increasing and things are being added to the queue.
Any ideas of what is happening?
It looks like there's a simpler solution to this problem.
You're building a list of futures. The whole point of futures is that they're future results. In particular, whatever each function returns, that's the (eventual) value of the future. So, don't do the whole "push results onto a queue" thing at all; just return them from the task function, and pick them up from the futures.
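As a minimal Python 3 sketch of that idea: the value a task function returns is what future.result() hands back in the submitting code. Here scan is a hypothetical stand-in for perform_scan.delay, and ThreadPoolExecutor is used only so the sketch runs anywhere without a Celery setup; ProcessPoolExecutor exposes the same submit/result interface.

```python
from concurrent.futures import ThreadPoolExecutor


def scan(key):
    # Hypothetical stand-in for perform_scan.delay(key).
    return "scanned:" + key


def run_scans(keys):
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(scan, key) for key in keys]
        # result() blocks until that task has finished, then returns
        # whatever the task function returned.
        return [future.result() for future in futures]


results = run_scans(["a", "b", "c"])
print(len(results))
```

Because the futures are read back in submission order, the results line up with the input keys.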
The simplest way to do this is to break that loop up so each key is a separate task, with a separate future. I don't know whether that's appropriate for your real code, but if it is:

    def do_task(key):
        try:
            return perform_scan.delay(key)
        except:
            print "failed"

    def main():
        executor = concurrent.futures.ProcessPoolExecutor(10)
        futures = [executor.submit(do_task, key) for key in key_list]
        # If you want to do anything with these results, you probably want
        # a loop around concurrent.futures.as_completed or similar here,
        # rather than waiting for them all to finish, ignoring the results,
        # and printing the number of them.
        concurrent.futures.wait(futures)
        print len(futures)
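The as_completed loop mentioned in that comment could look roughly like the Python 3 sketch below. It assumes a hypothetical scan function in place of perform_scan.delay (made to fail for one key so the error path is visible) and uses ThreadPoolExecutor purely to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def scan(key):
    # Hypothetical stand-in for perform_scan.delay(key); fails for one key.
    if key == "bad":
        raise ValueError("cannot scan " + key)
    return "scanned:" + key


def scan_all(keys):
    results = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Remember which key each future belongs to.
        future_to_key = {executor.submit(scan, key): key for key in keys}
        # as_completed yields each future as soon as it finishes.
        for future in as_completed(future_to_key):
            key = future_to_key[future]
            try:
                results[key] = future.result()
            except Exception as exc:
                # result() re-raises any exception raised inside the task.
                print("failed:", key, exc)
    return results


results = scan_all(["a", "bad", "c"])
```

Handling the exception where the result is read keeps a failed key from disappearing silently, and the task function no longer needs its own bare except.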
Of course that doesn't do the grouping. But do you need it?
The most likely reason for the grouping to be necessary is that the tasks are so tiny that the overhead in scheduling them (and pickling the inputs and outputs) swamps the actual work. If that's true, then you can wait until a whole batch is done to return any results, especially given that you're not looking at the results until they're all done anyway. (This model of "split into groups, process each group, merge back together" is pretty common in cases like numerical work, where each element may be tiny, or elements may not be independent of each other, but there are groups that are big enough or independent from the rest of the work.)
At any rate, that's almost as simple:

    def do_tasks(keys):
        results = []
        for key in keys:
            try:
                result = perform_scan.delay(key)
                results.append(result)
            except:
                print "failed"
        return results

    def main():
        executor = concurrent.futures.ProcessPoolExecutor(10)
        futures = [executor.submit(do_tasks, group)
                   for group in grouper(key_list, 40)]
        # as_completed yields futures, so call result() to get each list.
        print sum(len(future.result())
                  for future in concurrent.futures.as_completed(futures))
Or, if you prefer to first wait and then calculate:

    def main():
        executor = concurrent.futures.ProcessPoolExecutor(10)
        futures = [executor.submit(do_tasks, group)
                   for group in grouper(key_list, 40)]
        concurrent.futures.wait(futures)
        print sum(len(future.result()) for future in futures)
But again, I doubt you need this.
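For completeness, here is a hedged Python 3 sketch of the batched variant from start to finish: split the keys into groups, process each group in one task, and merge the returned lists. The scan function is a hypothetical placeholder for perform_scan.delay, ThreadPoolExecutor stands in for the process pool so the sketch runs on its own, and the None check assumes that None is never a real key (it is the padding grouper adds to the last group).

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def scan(key):
    # Hypothetical placeholder for perform_scan.delay(key).
    return key.upper()


def scan_batch(keys):
    # Skip the None padding that grouper adds to the final batch.
    return [scan(key) for key in keys if key is not None]


def scan_in_batches(keys, batch_size):
    merged = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(scan_batch, group)
                   for group in grouper(keys, batch_size)]
        # Each future's value is the list returned by scan_batch.
        for future in futures:
            merged.extend(future.result())
    return merged


merged = scan_in_batches(["a", "b", "c", "d", "e"], 2)
print(len(merged))
```

Reading the futures in submission order keeps the merged list in the same order as the input keys.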
python queue multiprocessing celery concurrent.futures