Improve Parallelism In Spark Sql
I have the code below. I am using pyspark 1.2.1 with python 2.7 (cpython).

for colname in shuffle_columns:
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
Solution 1:
Unfortunately, the approach below does not work well. It works in the sense that all of the individual iterations execute, but subsequent calls on the hive_context object fail with a NullPointerException.
This is possible with concurrent.futures:
from concurrent import futures

def make_col_temptable(colname):
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))

with futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures.wait([executor.submit(make_col_temptable, colname)
                  for colname in shuffle_columns])
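The submission pattern itself can be exercised without a Spark cluster. Below is a minimal, self-contained sketch of the same structure: submit one task per column name to a ThreadPoolExecutor and block until all of them finish. The names process_column, shuffle_columns, and the results dict are hypothetical stand-ins for the Spark calls above, not part of the original code.

```python
from concurrent import futures

shuffle_columns = ['col_a', 'col_b', 'col_c']
results = {}

def process_column(colname):
    # Stand-in for the blocking hive_context.sql(...) call; here we
    # just record the query string that would have been issued.
    results[colname] = 'select %s from temp_table' % colname

with futures.ThreadPoolExecutor(max_workers=20) as executor:
    # futures.wait blocks until every submitted future is done,
    # mirroring the usage in the solution above.
    done, not_done = futures.wait(
        [executor.submit(process_column, c) for c in shuffle_columns])

print(sorted(results))  # every column was processed
```

Because each task is independent, the pool size (max_workers=20) only bounds how many run concurrently; with CPython's GIL this helps when the tasks spend their time blocked on I/O or on remote calls, as Spark SQL submissions do.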