Почему не отображается DataFrame после применения на нем в Spark UDF?
В user defined function (UDF) функция объединения двух списков. Результирующий DF не отображается, хотя, без применения функции, исходный DF выводится на экран.
#объединим теги и жанры в единое пространство текстовых фич
from pyspark.sql.types import ArrayType, StringType
# В Spark нет некоторых полезных функций, но легко можно создать свои
# в частности, мы хотим привести все жанры также к верхнему регистру
list_concat = sql_func.udf(
lambda one_list, another_list:
[str.upper(elem) for elem in one_list] + (another_list if another_list else[]),
returnType=ArrayType(StringType())
)
content_features = (
tags
.select(
'movieId',
'title',
list_concat('genres_list','tags_list').alias('content_features')
)
.cache()
)
content_features.toPandas()
Отчет об ошибке:
Py4JJavaError Traceback (most recent call last)
<ipython-input-26-5fcd26081fff> in <module>
----> 1 content_features.toPandas()
D:\Anaconda\lib\site-packages\pyspark\sql\pandas\conversion.py in toPandas(self)
155
156 # Below is toPandas without Arrow optimization.
--> 157 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
158 column_counter = Counter(self.columns)
159
...
Py4JJavaError: An error occurred while calling o258.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 55.0 failed 1 times, most recent failure: Lost task 1.0 in stage 55.0 (TID 70) (192.168.1.37 executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, Не удается найти указанный файл
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:166)
...
Ответы (1 шт):
Автор решения: MaxU
→ Ссылка
В этом ответе (в английской версии SO) я привел пример использования Pandas функций в UDF для PySpark:
from pyspark.sql.functions import lit,unix_timestamp, udf, col, lit
from pyspark.sql.types import DoubleType, DecimalType
from pyspark.sql.functions import pandas_udf
@pandas_udf(DoubleType())
def ts_diff(start, end):
return (end - start).dt.total_seconds()
использование:
>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name| time1| time2|DIFF|
+---+-----+-------------------+-------------------+----+
| 1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
| 2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+