Differences

This shows you the differences between two versions of the page.

random:python_multiprocessing_lazy_iterating_map [2013/12/03 19:14] (current)
grant created
Line 1: Line 1:
 +======Python Multiprocessing Lazy Iterating Map======
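 +A Python multiprocessing function that processes work in a pool of worker processes. Unlike the standard library's multiprocessing.Pool.imap_unordered, this code does not consume the entire iterable up front: items are pulled from the iterable on demand, as workers become ready to process them. Note that each item must be a sequence of positional arguments, because the workers call function(*item).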
 +
 +<code python>
 +from multiprocessing import Process, Queue, cpu_count
 +from Queue import Full as QueueFull
 +from Queue import Empty as QueueEmpty
 +
 +def worker(recvq, sendq):
 +    for func, args in iter(recvq.get, None):
 +        result = func(*args)
 +        sendq.put(result)
 +
 +def pool_imap_unordered(function, iterable, procs=cpu_count()):
 +    # Create queues for sending/receiving items from iterable.
 +
 +    sendq = Queue(procs)
 +    recvq = Queue()
 +
 +    # Start worker processes.
 +
 +    for rpt in xrange(procs):
 +        Process(target=worker, args=(sendq, recvq)).start()
 +
 +    # Iterate iterable and communicate with worker processes.
 +
 +    send_len = 0
 +    recv_len = 0
 +    itr = iter(iterable)
 +
 +    try:
 +        value = itr.next()
 +        while True:
 +            try:
 +                sendq.put((function, value), True, 0.1)
 +                send_len += 1
 +                value = itr.next()
 +            except QueueFull:
 +                while True:
 +                    try:
 +                        result = recvq.get(False)
 +                        recv_len += 1
 +                        yield result
 +                    except QueueEmpty:
 +                        break
 +    except StopIteration:
 +        pass
 +
 +    # Collect all remaining results.
 +
 +    while recv_len < send_len:
 +        result = recvq.get()
 +        recv_len += 1
 +        yield result
 +
 +    # Terminate worker processes.
 +
 +    for rpt in xrange(procs):
 +        sendq.put(None)
 +</code>
random/python_multiprocessing_lazy_iterating_map.txt · Last modified: 2013/12/03 19:14 by grant