Как правильно использовать комбинацию ThreadPool и PySpark для записи в одну таблицу?

У меня есть набор табличек (более тысячи), в каждой из которых одна колонка ('date') содержит только одно значение, но для разных таблиц это значение разное (как для партицирования).

Все эти файлы я хочу склеить в одну таблицу, но в цикле делать append очень долго. Идея в использовании конструкции вида:

def write_chunk(table_name):
    (
        spark.table(table_name)
        .join(...)
        .filter(...)
        .write.partitionBy('date').mode('append').format('parquet').saveAsTable('new_tn')
    )

pool = ThreadPool(16)

with tqdm(total=len(dates)) as pbar:
    for _ in pool.imap_unordered(write_chunk, dates):
        pbar.update()

Конечно, в конфиге используется опция для безопасной записи

spark.conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

Однако, при этом спарк ругается на невозможность переименования готовой таблицы из tmp формата в итоговый.

Есть ли возможность параллельно писать в одну таблицу спарком, и как это правильно делать (если возможно)?

PS: union на таком количестве таблиц не отрабатывает


Ответы (0 шт):