
Improve Parallelism in Spark SQL

I have the code below. I am using PySpark 1.2.1 with Python 2.7 (CPython):

for colname in shuffle_columns:
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))

Solution 1:

Unfortunately, the approach below does not work well. It works in the sense that all of the individual iterations execute, but subsequent calls to the hive_context object fail with a NullPointerException.


This is possible with concurrent.futures (part of the standard library since Python 3.2; on Python 2.7 it is available as the futures backport package on PyPI):

from concurrent import futures

def make_col_temptable(colname):
    # Pull a single column out of the source table
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    # Attach the schema and register the result as a temp table
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))

with futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures.wait([executor.submit(make_col_temptable, colname) for colname in shuffle_columns])
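The thread-pool pattern itself can be demonstrated without a cluster. In this minimal sketch, process_column and the columns list are hypothetical stand-ins for the Spark calls above, so the example runs with only the standard library:

```python
from concurrent import futures

# Hypothetical stand-in for the per-column Spark work: here we just
# transform the column name so the example runs without a cluster.
def process_column(colname):
    return colname.upper()

columns = ['alpha', 'beta', 'gamma']

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns a Future immediately; the pool runs the calls
    # concurrently and as_completed() yields each Future as it finishes.
    pending = [executor.submit(process_column, c) for c in columns]
    results = sorted(f.result() for f in futures.as_completed(pending))

print(results)  # ['ALPHA', 'BETA', 'GAMMA']
```

Because the workers here are threads (not processes), this buys concurrency only when the per-task work releases the GIL, e.g. while blocking on a remote Spark call, which is exactly the situation in the answer above.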
