Skip to content Skip to sidebar Skip to footer

How To Parallelize Iteration Over A Range, Using Stdlib And Python 3?

I've been searching for an answer on this now for days to no avail. I'm probably just not understanding the pieces that are floating around out there and the Python documentation o

Solution 1:

Yes, that is doable. Your calculation is not dependend on intermediate results, so you can easily divide the task into chunks and distribute it over multiple processes. It's what is called an

embarrassingly parallel problem.

The only tricky part here might be, to divide the range into fairly equal parts in the first place. Straight out my personal lib two functions to deal with this:

# mp_utils.py

from itertools import accumulate

defcalc_batch_sizes(n_tasks: int, n_workers: int) -> list:"""Divide `n_tasks` optimally between n_workers to get batch_sizes.

    Guarantees batch sizes won't differ for more than 1.

    Example:
    # >>>calc_batch_sizes(23, 4)
    # Out: [6, 6, 6, 5]

    In case you're going to use numpy anyway, use np.array_split:
    [len(a) for a in np.array_split(np.arange(23), 4)]
    # Out: [6, 6, 6, 5]
    """
    x = int(n_tasks / n_workers)
    y = n_tasks % n_workers
    batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y)

    return batch_sizes


defbuild_batch_ranges(batch_sizes: list) -> list:"""Build batch_ranges from list of batch_sizes.

    Example:
    # batch_sizes [6, 6, 6, 5]
    # >>>build_batch_ranges(batch_sizes)
    # Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
    """
    upper_bounds = [*accumulate(batch_sizes)]
    lower_bounds = [0] + upper_bounds[:-1]
    batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]

    return batch_ranges

Then your main script would look like this:

import time
from multiprocessing import Pool
from mp_utils import calc_batch_sizes, build_batch_ranges


deftarget_foo(batch_range):
    returnsum(batch_range)  # ~ 6x faster than target_foo1deftarget_foo1(batch_range):
    numbers = []
    for num in batch_range:
        numbers.append(num)
    returnsum(numbers)


if __name__ == '__main__':

    N = 100000000
    N_CORES = 4

    batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
    batch_ranges = build_batch_ranges(batch_sizes)

    start = time.perf_counter()
    with Pool(N_CORES) as pool:
        result = pool.map(target_foo, batch_ranges)
        r_sum = sum(result)
    print(r_sum)
    print(f'elapsed: {time.perf_counter() - start:.2f} s')

Note that I also switched your for-loop for a simple sum over the range object, since it offers much better performance. If you cant do this in your real app, a list comprehension would still be ~60% faster than filling your list manually like in your example.

Example Output:

4999999950000000elapsed:0.51 s

Process finished withexit code 0

Solution 2:

import timeit

from multiprocessing import Pool

defappendNumber(x):
    return x

start = timeit.default_timer()

with Pool(4) as p:
    numbers = p.map(appendNumber, range(100000000))

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

So Pool.map is like the builtin map function. It takes a function and an iterable and produces a list of the result of calling that function on every element of the iterable. Here since we don't actually want to change the elements in the range iterable we just return the argument.

The crucial thing is that Pool.map divides up the provided iterable (range(1000000000) here) into chunks and sends them to the number of processes it has (defined here as 4 in Pool(4)) then rejoins the results back into one list.

The output I get when running this is

TIME: 8.748245699999984 secondsSUM: 4999999950000000

Solution 3:

I did a comparison, the time taken to split the tasks sometimes may take longer:

File multiprocessing_summation.py:

defsummation(lst):
  sum = 0for x inrange(lst[0], lst[1]):
    sum += x
  returnsum

File multiprocessing_summation_master.py:

%%file ./examples/multiprocessing_summation_master.py
import multiprocessing as mp
import timeit
import os
import sys
import multiprocessing_summation as mps

if __name__ == "__main__":

  iflen(sys.argv) == 1:
    print(f'{sys.argv[0]} <number1 ...>')
    sys.exit(1)
  else:
    args = [int(x) for x in sys.argv[1:]]

  nBegin = 1
  nCore = os.cpu_count()

  for nEnd in args:

    ### Approach 1  ########################
    start = timeit.default_timer()
    answer1 = mps.summation((nBegin, nEnd+1))
    end = timeit.default_timer()
    print(f'Answer1 = {answer1}')
    print(f'Time taken = {end - start}')

    ### Approach 2 ########################
    start = timeit.default_timer()
    lst = []
    for x inrange(nBegin, nEnd, int((nEnd-nBegin+1)/nCore)):
      lst.append(x)
    lst.append(nEnd+1)

    lst2 = []
    for x inrange(1, len(lst)):
      lst2.append((lst[x-1], lst[x]))

    with mp.Pool(processes=nCore) as pool:
      answer2 = pool.map(mps.summation, lst2)
    end = timeit.default_timer()
    print(f'Answer2 = {sum(answer2)}')
    print(f'Time taken = {end - start}')

Run the second script:

python multiprocessing_summation_master.py 1000 100000 10000000 1000000000

The outputs are:

Answer1 = 500500Timetaken=4.558405389566795e-05
Answer2 = 500500Timetaken=0.15728066685459452
Answer1 = 5000050000Timetaken=0.005781152051264199
Answer2 = 5000050000Timetaken=0.14532123447452705
Answer1 = 50000005000000Timetaken=0.4903863230334036
Answer2 = 50000005000000Timetaken=0.49744346392131533
Answer1 = 500000000500000000Timetaken=50.825169837068
Answer2 = 500000000500000000Timetaken=26.603663061636567

Post a Comment for "How To Parallelize Iteration Over A Range, Using Stdlib And Python 3?"