Combining Multithreading And Multiprocessing With Concurrent.futures
Solution 1:
Sorry, but I can't make time to explain all this, so I'll just give code "that works". I urge you to start with something simpler, because the learning curve is non-trivial. Leave numpy out of it at first; stick to only threads at first; then move to only processes; and unless you're an expert don't try to parallelize anything other than named module-level functions (no, not function-local anonymous lambdas).
As often happens, the error messages you "should be" getting are being suppressed because they occur asynchronously, so there's no good way to report them. Liberally add print() statements to see how far you're getting.
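Besides print(), one way to surface those suppressed errors (a sketch, not part of the original answer): executor.map() returns a lazy iterator, and a worker's exception is only re-raised when you consume its results, so forcing the iteration makes failures visible. The worker g() here is a hypothetical example that fails on purpose.

```python
import concurrent.futures as cf

def g(x):
    # deliberately fails for one input, to show where the error surfaces
    if x == 3:
        raise ValueError("boom")
    return x * 2

if __name__ == "__main__":
    with cf.ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(g, range(5))  # nothing raised yet
        try:
            print(list(results))  # consuming the iterator re-raises ValueError
        except ValueError as e:
            print("worker failed:", e)
```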
Note: I stripped numpy out of this, and added the stuff needed so it runs on Windows too. I expect using numpy's array_split()
instead would work fine, but I didn't have numpy handy on the machine I was on at the time.
import concurrent.futures as cf
import os
import time

def array_split(xs, n):
    from itertools import islice
    it = iter(xs)
    result = []
    q, r = divmod(len(xs), n)
    for i in range(r):
        result.append(list(islice(it, q + 1)))
    for i in range(n - r):
        result.append(list(islice(it, q)))
    return result

ids = range(1, 11)

def f(x):
    print(f"called with {x}")
    time.sleep(5)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        for slice in array_split(AccountNumbers, n_threads):
            executor.map(f, slice)

def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
    slices = array_split(AccountNumbers, n_processors)
    print("top slices", slices)
    with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map(multithread_accounts, slices,
                     [f] * len(slices),
                     [n_threads] * len(slices))

if __name__ == "__main__":
    parallelize_distribute(ids, f, n_processors=2, n_threads=2)
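If numpy is available, the hand-rolled array_split() helper above could presumably be swapped for numpy.array_split(), which chunks as evenly as possible in the same way; my assumption here is only that the chunk sizes match, and note that numpy returns arrays rather than lists:

```python
import numpy as np

# split 10 ids into 3 near-equal chunks, mirroring the pure-Python helper
chunks = np.array_split(range(1, 11), 3)
print([c.tolist() for c in chunks])  # → [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
```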
BTW, I suggest this makes more sense for the threaded part:
def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        executor.map(f, AccountNumbers)
That is, there's really no need to split the list yourself here; the threading machinery will distribute the work across threads on its own. It's possible you missed that in your original attempts, because the ThreadPoolExecutor() call in the code you posted forgot to specify the max_workers argument.
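Relatedly, note that f() in the code above throws its result away (x**2 is computed but never returned). If you actually want the results back, a minimal sketch of the simplified threaded version, with a return added (square() is a made-up worker for illustration):

```python
import concurrent.futures as cf

def square(x):
    # hypothetical worker that actually returns its result
    return x ** 2

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        # map preserves input order; list() blocks until all results arrive
        return list(executor.map(f, AccountNumbers))

if __name__ == "__main__":
    print(multithread_accounts(range(1, 11), square, n_threads=2))
    # → [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```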