Differences
This shows you the differences between two versions of the page.
— |
random:python_multiprocessing_lazy_iterating_map [2013/12/03 19:14] (current) grant created |
||
---|---|---|---|
Line 1: | Line 1: | ||
+ | ======Python Multiprocessing Lazy Iterating Map====== | ||
+ | A Python multiprocessing function that processes work items in a pool of worker processes. Unlike the map functions of the standard library's ''multiprocessing.Pool'', it does not consume the entire iterable up front: items are taken from the iterable on demand, as workers become ready to process them. | ||
+ | |||
+ | <code python> | ||
+ | from multiprocessing import Process, Queue, cpu_count | ||
+ | from Queue import Full as QueueFull | ||
+ | from Queue import Empty as QueueEmpty | ||
+ | |||
def worker(recvq, sendq):
    """Run tasks taken from ``recvq`` and put each result on ``sendq``.

    Each task is a ``(func, args)`` pair and is executed as ``func(*args)``,
    so ``args`` must be an iterable of positional arguments. The loop ends
    when ``None`` is received from ``recvq``.

    An exception raised by ``func`` is not caught here: it propagates out of
    this function and no result is put on ``sendq`` for that task.
    """
    # iter(callable, sentinel) keeps calling recvq.get() until it returns None.
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)
+ | |||
def pool_imap_unordered(function, iterable, procs=cpu_count()):
    """Lazily map ``function`` over ``iterable`` using worker processes.

    This is a generator. It starts ``procs`` processes running ``worker``,
    feeds them items from ``iterable`` through a queue bounded to ``procs``
    entries, and yields results as they become available, in no particular
    order. Because the queue is bounded, ``iterable`` is only advanced as
    workers take items off it.

    Args:
        function: Callable executed in the workers. It is sent through a
            multiprocessing queue together with every item.
        iterable: Source of work items. Each item is passed on unchanged as
            the task's arguments; ``worker`` calls ``function(*item)``, so
            every item must be an iterable of positional arguments.
        procs: Number of worker processes; must be at least 1. The default
            is ``cpu_count()``, evaluated once when the function is defined.

    Yields:
        One result per item taken from ``iterable``.

    Raises:
        ValueError: If ``procs`` is less than 1 (raised on first iteration,
            since this is a generator).
        RuntimeError: If a worker process exits with a non-zero exit code
            while work is still outstanding.
    """
    if procs < 1:
        # Without workers nothing is ever put on the result queue, so the
        # collection loop below would block forever.
        raise ValueError("procs must be at least 1")

    # Create queues for sending/receiving items from iterable. Bounding
    # sendq is what keeps consumption of the iterable lazy.
    sendq = Queue(procs)
    recvq = Queue()

    # Start worker processes.
    workers = [
        Process(target=worker, args=(sendq, recvq)) for _ in range(procs)
    ]
    for proc in workers:
        proc.start()

    def check_workers():
        # A worker that exited abnormally never delivers the result of the
        # task it was running, so waiting for that result would never end.
        for proc in workers:
            if proc.exitcode not in (None, 0):
                raise RuntimeError(
                    "worker process exited with code %s" % proc.exitcode)

    send_len = 0
    recv_len = 0
    completed = False

    try:
        # Iterate iterable and communicate with worker processes.
        for value in iterable:
            while True:
                try:
                    sendq.put((function, value), True, 0.1)
                except QueueFull:
                    # All workers are busy: hand back whatever results are
                    # ready right now, then retry the same item.
                    while True:
                        try:
                            result = recvq.get(False)
                        except QueueEmpty:
                            break
                        recv_len += 1
                        yield result
                    check_workers()
                else:
                    send_len += 1
                    break

        # Collect all remaining results. Poll with a timeout so that a dead
        # worker is noticed instead of blocking forever.
        while recv_len < send_len:
            try:
                result = recvq.get(True, 0.1)
            except QueueEmpty:
                check_workers()
                continue
            recv_len += 1
            yield result

        completed = True
    finally:
        if completed:
            # Terminate worker processes: one None sentinel per worker.
            for _ in workers:
                sendq.put(None)
        else:
            # Stopped early (an error, or the consumer closed the
            # generator). The bounded queue may be full, so do not block on
            # sentinels; stop the workers directly.
            for proc in workers:
                if proc.is_alive():
                    proc.terminate()
        for proc in workers:
            proc.join()
+ | </code> |