Problem processing streaming data in Spark
I trained a model with Spark ML on a weather dataset to predict the weather condition. When I run the model on static (non-streaming) data, everything works fine:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf, regexp_replace

# Create a Spark session
spark = SparkSession.builder.appName("WeatherPredictionApp").getOrCreate()
# Load the trained pipeline model
model = PipelineModel.load("model")
# Read the CSV file into a DataFrame
data = spark.read.csv("content/dataset1.csv", header=True)
# Cast the feature columns to double
feature_cols = ["coord_lon", "coord_lat", "main_temp", "main_feels_like",
                "main_temp_min", "main_temp_max", "main_pressure",
                "main_humidity", "clouds_all"]
for col_name in feature_cols:
    data = data.withColumn(col_name, data[col_name].cast("double"))

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_with_features = assembler.transform(data)
# Apply the model to the data
predictions = model.transform(data_with_features)
# Map predicted class indices back to weather condition labels
weather_dict = {
    0.0: "overcast clouds",
    1.0: "few clouds",
    2.0: "scattered clouds",
    3.0: "few clouds",
    4.0: "light rain",
    5.0: "moderate rain",
    6.0: "clear sky",
    7.0: "heavy intensity rain",
    8.0: "very heavy rain",
    9.0: "light intensity rain",
    10.0: "Thunderstorm",
    11.0: "thunderstorm",
    12.0: "extreme rain",
}
# Select the columns we need for the output
output = predictions.select("datetime", "prediction")

def weather_name(prediction):
    return weather_dict.get(prediction, "unknown")

weather_udf = udf(weather_name)

# Apply the UDF to the "prediction" column
output_with_names = output.withColumn("weather_name", weather_udf("prediction"))
output_clean_datetime = output_with_names.withColumn("datetime", regexp_replace("datetime", '"10000.0,"', ""))
# Show the results
output_clean_datetime.show()
But as soon as I switch to streaming input, I constantly get an error about the data type.
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DoubleType

# Create a Spark session
spark = SparkSession.builder.appName("WeatherPredictionApp").getOrCreate()
# Load the trained pipeline model
model = PipelineModel.load("model")
# Explicit schema for the streaming CSV input
schema = StructType([
    StructField("coord_lon", DoubleType(), True),
    StructField("coord_lat", DoubleType(), True),
    StructField("main_temp", DoubleType(), True),
    StructField("main_feels_like", DoubleType(), True),
    StructField("main_temp_min", DoubleType(), True),
    StructField("main_temp_max", DoubleType(), True),
    StructField("main_pressure", DoubleType(), True),
    StructField("main_humidity", DoubleType(), True),
    StructField("clouds_all", DoubleType(), True),
])
# Read the streaming data from HDFS
streaming_data = spark.readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .load("hdfs://namenode:9000/test/datasets")
feature_cols = ["coord_lon", "coord_lat", "main_temp", "main_feels_like",
                "main_temp_min", "main_temp_max", "main_pressure",
                "main_humidity", "clouds_all"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
streaming_assembled = assembler.transform(streaming_data)
# Apply the model to the data
predictions = model.transform(streaming_assembled)
# Map predicted class indices back to weather condition labels
weather_dict = {
    0.0: "overcast clouds",
    1.0: "few clouds",
    2.0: "scattered clouds",
    3.0: "few clouds",
    4.0: "light rain",
    5.0: "moderate rain",
    6.0: "clear sky",
    7.0: "heavy intensity rain",
    8.0: "very heavy rain",
    9.0: "light intensity rain",
    10.0: "Thunderstorm",
    11.0: "thunderstorm",
    12.0: "extreme rain",
}
# Select the columns we need for the output
output = predictions.select("prediction")

def weather_name(prediction):
    return weather_dict.get(prediction, "unknown")

weather_udf = udf(weather_name)

# Apply the UDF to the "prediction" column
output_with_names = output.withColumn("weather_name", weather_udf("prediction"))
# Write the results out as CSV (the original post referenced an undefined
# output_clean_datetime here; output_with_names is what this script builds)
query = output_with_names \
    .writeStream \
    .format("csv") \
    .option("header", "true") \
    .option("checkpointLocation", "hdfs://namenode:9000/test/checkpoint") \
    .start("hdfs://namenode:9000/test/pred")

query.awaitTermination()
The error itself:
StreamingQueryException: [STREAM_FAILED] Query [id = c69ac199-7313-4c98-a79d-32aea108a729, runId = f9dc5633-eb0b-4f20-8984-e281a885a5db] terminated with exception: Job aborted due to stage failure: Task 2 in stage 173.0 failed 1 times, most recent failure: Lost task 2.0 in stage 173.0 (TID 193) (87a8c3198a19 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$3232/0x00007f33c90cd670`: (struct<coord_lon:double,coord_lat:double,main_temp:double,main_feels_like:double,main_temp_min:double,main_temp_max:double,main_pressure:double,main_humidity:double,clouds_all:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
... 18 more
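If I read the last frames correctly, VectorAssembler hits a null in one of the feature columns and fails because of its default handleInvalid = "error", so the real issue is nulls in the streamed rows rather than a column type. Below is a minimal, untested sketch of the two workarounds the exception message itself suggests; it reuses the streaming_data, feature_cols, and assembler names from the script above. Is this the right way to handle it, or is the problem elsewhere?

# Option 1: drop rows that have a null in any feature column before
# assembling (na.drop works on streaming DataFrames as well)
streaming_clean = streaming_data.na.drop(subset=feature_cols)
streaming_assembled = assembler.transform(streaming_clean)

# Option 2: tell VectorAssembler to skip invalid rows instead of failing;
# handleInvalid="keep" would instead emit NaN entries in the feature vector
assembler = VectorAssembler(inputCols=feature_cols,
                            outputCol="features",
                            handleInvalid="skip")

Option 1 keeps the default fail-fast behavior for the model itself, while Option 2 silently filters (or passes through) bad rows, so the choice depends on whether incomplete weather records should reach the model at all.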