Как правильно использовать комбинацию ThreadPool и PySpark для записи в одну таблицу?
У меня есть набор табличек (более тысячи), в каждой из которых одна колонка ('date') содержит только одно значение, но для разных таблиц это значение разное (как для партицирования).
Все эти файлы я хочу склеить в одну таблицу, но в цикле делать append очень долго. Идея в использовании конструкции вида:
def write_chunk(table_name):
(
spark.table(table_name)
.join(...)
.filter(...)
.write.partitionBy('date').mode('append').format('parquet').saveAsTable('new_tn')
)
pool = ThreadPool(16)
with tqdm(total=len(dates)) as pbar:
for _ in pool.imap_unordered(write_chunk, dates):
pbar.update()
Конечно, в конфиге используется опция для безопасной записи
spark.conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
Однако, при этом спарк ругается на невозможность переименования готовой таблицы из tmp формата в итоговый.
Есть ли возможность параллельно писать в одну таблицу спарком, и как это правильно делать (если возможно)?
PS: union на таком количестве таблиц не отрабатывает