Почему не отображается DataFrame после применения на нем в Spark UDF?

В user defined function (UDF) функция объединения двух списков. Результирующий DF не отображается, хотя, без применения функции, исходный DF выводится на экран.

#объединим теги и жанры в единое пространство текстовых фич
from pyspark.sql.types import ArrayType, StringType

# В Spark нет некоторых полезных функций, но легко можно создать свои
# в частности, мы хотим привести все жанры также к верхнему регистру
list_concat = sql_func.udf(
    lambda one_list, another_list:
        [str.upper(elem) for elem in one_list] + (another_list if another_list else[]),
    returnType=ArrayType(StringType())
)

content_features = (
    tags
    .select(
        'movieId',
        'title',
        list_concat('genres_list','tags_list').alias('content_features')
    )
    .cache()
)

content_features.toPandas()

Отчет об ошибке:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-26-5fcd26081fff> in <module>
----> 1 content_features.toPandas()

D:\Anaconda\lib\site-packages\pyspark\sql\pandas\conversion.py in toPandas(self)
    155 
    156         # Below is toPandas without Arrow optimization.
--> 157         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    158         column_counter = Counter(self.columns)
    159 
...
Py4JJavaError: An error occurred while calling o258.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 55.0 failed 1 times, most recent failure: Lost task 1.0 in stage 55.0 (TID 70) (192.168.1.37 executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, Не удается найти указанный файл
    at java.lang.ProcessBuilder.start(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:166)
...

Ответы (1 шт):

Автор решения: MaxU

В этом ответе (в английской версии SO) я привел пример использования Pandas функций в UDF для PySpark:

from pyspark.sql.functions import lit,unix_timestamp, udf, col, lit
from pyspark.sql.types import DoubleType, DecimalType
from pyspark.sql.functions import pandas_udf

@pandas_udf(DoubleType())
def ts_diff(start, end):
    return (end - start).dt.total_seconds()

использование:

>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name|              time1|              time2|DIFF|
+---+-----+-------------------+-------------------+----+
|  1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
|  2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+
→ Ссылка