Python: 'before' And 'after' For Multiprocessing Workers
Solution 1:
So you basically create a histogram. This can easily be parallelized, because histograms can be merged without complication. One might say that this problem is trivially parallelizable, or "embarrassingly parallel": you do not need to worry about communication among workers.
Just split your data set into multiple chunks, let your workers work on these chunks independently, collect the histogram of each worker, and then merge the histograms.
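To see why merging is unproblematic, here is a minimal sketch without any multiprocessing. The word list and the helper names chunk_histogram and merge_histograms are made up for illustration. Counting the chunks separately and then adding up the counts gives the same result as counting everything in one go:

```python
from collections import Counter


def chunk_histogram(words):
    """Count the words of one chunk (what a single worker would do)."""
    return Counter(words)


def merge_histograms(histograms):
    """Merge per-chunk histograms by adding up the counts per word."""
    merged = Counter()
    for hist in histograms:
        merged.update(hist)
    return merged


# Made-up sample data, split into two chunks.
words = ["wolf", "wolf", "cat", "blume", "eisenbahn", "wolf"]
chunks = [words[:3], words[3:]]

partial = [chunk_histogram(chunk) for chunk in chunks]
merged = merge_histograms(partial)

# The merged result equals the histogram over the whole data set.
assert merged == Counter(words)
```

The order in which the partial histograms are merged does not matter, which is why the workers never have to talk to each other.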
In practice, this works best if each worker process reads its own file. That is, a "task" could be a file name. Do not pickle file contents and send them between processes through pipes; let each worker process retrieve the bulk data directly from the files. Otherwise your architecture spends too much time on inter-process communication instead of doing real work.
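To get a feeling for the difference, you can compare what would have to be pickled in each case. This is only a rough sketch: the file name and the contents are made up and stand in for a real data file.

```python
import pickle

# Hypothetical task inputs: a file name versus the contents of that file.
path = "data0.txt"
contents = "wolf\n" * 100000  # stand-in for a data file of about 500 KB

path_payload = pickle.dumps(path)
contents_payload = pickle.dumps(contents)

print("file name payload: %d bytes" % len(path_payload))
print("contents payload:  %d bytes" % len(contents_payload))
```

The exact numbers depend on the pickle protocol, but the payload for the contents grows with the file size, while the payload for the file name stays tiny.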
Do you need an example or can you figure this out yourself?
Edit: example implementation
I have a number of data files with file names in this format: data0.txt, data1.txt, ... .
Example contents:
wolf
wolf
cat
blume
eisenbahn
The goal is to create a histogram over the words contained in the data files. This is the code:
from multiprocessing import Pool
from collections import Counter
import glob


def build_histogram(filepath):
    """This function is run by a worker process.
    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""

    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update the first sub-histogram with the others.
    # Relevant docs: collections.Counter.update
    merged_hist = histograms[0]
    for h in histograms[1:]:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print("%s: %s" % (word, count))


if __name__ == "__main__":
    main()
Test output:
python countwords.py
eisenbahn: 12
auto: 6
cat: 1
katze: 10
stadt: 1
wolf: 3
zug: 4
blume: 5
herbert: 14
destruction: 4
Solution 2:
I had to modify the original pool.py to get what I want (the trouble was that worker is defined there as a plain function, so there is nothing to inherit from and override), but it's not so bad, and probably better than writing a new pool entirely.
# This replaces worker() inside pool.py, so util, ExceptionWithTraceback
# and MaybeEncodingError are the names already available in that module.
class worker(object):
    def __init__(self, inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
                 wrap_exception=False, finalizer=None, finargs=()):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        self.completed = 0
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()

        # "Before" hook: runs once, before the first task.
        if initializer is not None:
            initializer(self, *initargs)

        def run(self):
            while maxtasks is None or (maxtasks and self.completed < maxtasks):
                try:
                    task = get()
                except (EOFError, OSError):
                    util.debug('worker got EOFError or OSError -- exiting')
                    break

                if task is None:
                    util.debug('worker got sentinel -- exiting')
                    break

                job, i, func, args, kwds = task
                try:
                    result = (True, func(*args, **kwds))
                except Exception as e:
                    if wrap_exception:
                        e = ExceptionWithTraceback(e, e.__traceback__)
                    result = (False, e)
                try:
                    put((job, i, result))
                except Exception as e:
                    wrapped = MaybeEncodingError(e, result[1])
                    util.debug("Possible encoding error while sending result: %s" % (
                        wrapped))
                    put((job, i, (False, wrapped)))
                self.completed += 1

            # "After" hook: runs once, after the task loop has ended.
            if finalizer:
                finalizer(self, *finargs)
            util.debug('worker exiting after %d tasks' % self.completed)

        run(self)
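If the control flow is hard to follow inside pool.py, here is a stripped-down, single-process sketch of the same lifecycle. It is not the patched class: run_worker is a hypothetical stand-in that uses queue.Queue instead of the pool's pipes, leaves out maxtasks and the debug logging, and does not pass the worker object to the hooks. It only shows the order of events: initializer once, then tasks until the None sentinel, then finalizer once.

```python
import queue


def run_worker(inqueue, outqueue, initializer=None, initargs=(),
               finalizer=None, finargs=()):
    """Simplified, single-process stand-in for the worker loop (illustration only)."""
    if initializer is not None:
        initializer(*initargs)            # "before" hook

    completed = 0
    while True:
        task = inqueue.get()
        if task is None:                  # sentinel: no more work
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)           # report the failure instead of dying
        outqueue.put((job, i, result))
        completed += 1

    if finalizer is not None:
        finalizer(*finargs)               # "after" hook
    return completed


events = []
inq, outq = queue.Queue(), queue.Queue()
inq.put((0, 0, pow, (2, 10), {}))               # succeeds
inq.put((0, 1, int, ("not a number",), {}))     # raises ValueError
inq.put(None)                                   # sentinel

completed = run_worker(inq, outq,
                       initializer=events.append, initargs=("before",),
                       finalizer=events.append, finargs=("after",))
results = [outq.get() for _ in range(completed)]
```

Note that the finalizer still runs after a task has failed, because the exception is caught and sent back as a result rather than ending the loop.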