Problem processing streaming data in Spark

I trained a model with Spark ML on a weather dataset to predict weather conditions. When I run the model on static (non-streaming) data, everything works fine:

from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf, regexp_replace

# Create a Spark session
spark = SparkSession.builder.appName("WeatherPredictionApp").getOrCreate()

# Load the trained model
model = PipelineModel.load("model")

# Read the CSV file into a DataFrame
data = spark.read.csv("content/dataset1.csv", header=True)

# Cast the feature columns to double
data = data.withColumn("coord_lon", data["coord_lon"].cast("double"))
data = data.withColumn("coord_lat", data["coord_lat"].cast("double"))
data = data.withColumn("main_temp", data["main_temp"].cast("double"))
data = data.withColumn("main_feels_like", data["main_feels_like"].cast("double"))
data = data.withColumn("main_temp_min", data["main_temp_min"].cast("double"))
data = data.withColumn("main_temp_max", data["main_temp_max"].cast("double"))
data = data.withColumn("main_pressure", data["main_pressure"].cast("double"))
data = data.withColumn("main_humidity", data["main_humidity"].cast("double"))
data = data.withColumn("clouds_all", data["clouds_all"].cast("double"))


assembler = VectorAssembler(
    inputCols=["coord_lon", "coord_lat", "main_temp", "main_feels_like",
               "main_temp_min", "main_temp_max", "main_pressure",
               "main_humidity", "clouds_all"],
    outputCol="features")
data_with_features = assembler.transform(data)

# Apply the model to the data
predictions = model.transform(data_with_features)

weather_dict = {
    0.0: "overcast clouds",
    1.0: "few clouds",
    2.0: "scattered clouds",
    3.0: "few clouds",
    4.0: "light rain",
    5.0: "moderate rain",
    6.0: "clear sky",
    7.0: "heavy intensity rain",
    8.0: "very heavy rain",
    9.0: "light intensity rain",
    10.0: "Thunderstorm",
    11.0: "thunderstorm",
    12.0: "extreme rain"
}



# Select the columns needed for output
output = predictions.select("datetime", "prediction")

def weather_name(prediction):
    return weather_dict.get(prediction, "unknown")

weather_udf = udf(weather_name)

# Apply the UDF to the prediction column
output_with_names = output.withColumn("weather_name", weather_udf("prediction"))

output_clean_datetime = output_with_names.withColumn("datetime", regexp_replace("datetime", '"10000.0,"', ""))

# Show the results
output_clean_datetime.show()

But as soon as I plug in streaming data, I constantly get a data-type error.

from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DoubleType

# Create a Spark session
spark = SparkSession.builder.appName("WeatherPredictionApp").getOrCreate()

# Load the trained model
model = PipelineModel.load("model")

# Schema of the incoming streaming CSV files
schema = StructType([
    StructField("coord_lon", DoubleType(), True),
    StructField("coord_lat", DoubleType(), True),
    StructField("main_temp", DoubleType(), True),
    StructField("main_feels_like", DoubleType(), True),
    StructField("main_temp_min", DoubleType(), True),
    StructField("main_temp_max", DoubleType(), True),
    StructField("main_pressure", DoubleType(), True),
    StructField("main_humidity", DoubleType(), True),
    StructField("clouds_all", DoubleType(), True),
])

# Load streaming data from HDFS
streaming_data = spark.readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .load("hdfs://namenode:9000/test/datasets")

assembler = VectorAssembler(
    inputCols=["coord_lon", "coord_lat", "main_temp", "main_feels_like",
               "main_temp_min", "main_temp_max", "main_pressure",
               "main_humidity", "clouds_all"],
    outputCol="features")

streaming_assembled = assembler.transform(streaming_data)

# Apply the model to the data
predictions = model.transform(streaming_assembled)

weather_dict = {
    0.0: "overcast clouds",
    1.0: "few clouds",
    2.0: "scattered clouds",
    3.0: "few clouds",
    4.0: "light rain",
    5.0: "moderate rain",
    6.0: "clear sky",
    7.0: "heavy intensity rain",
    8.0: "very heavy rain",
    9.0: "light intensity rain",
    10.0: "Thunderstorm",
    11.0: "thunderstorm",
    12.0: "extreme rain"
}

# Select the columns needed for output
output = predictions.select("prediction")

def weather_name(prediction):
    return weather_dict.get(prediction, "unknown")

weather_udf = udf(weather_name)

# Apply the UDF to the prediction column
output_with_names = output.withColumn("weather_name", weather_udf("prediction"))

# Write the results to HDFS as CSV
query = output_with_names \
    .writeStream \
    .format("csv") \
    .option("header", "true") \
    .option("checkpointLocation", "hdfs://namenode:9000/test/checkpoint") \
    .start("hdfs://namenode:9000/test/pred")
    
query.awaitTermination()

The error itself:

StreamingQueryException: [STREAM_FAILED] Query [id = c69ac199-7313-4c98-a79d-32aea108a729, runId = f9dc5633-eb0b-4f20-8984-e281a885a5db] terminated with exception: Job aborted due to stage failure: Task 2 in stage 173.0 failed 1 times, most recent failure: Lost task 2.0 in stage 173.0 (TID 193) (87a8c3198a19 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$3232/0x00007f33c90cd670`: (struct<coord_lon:double,coord_lat:double,main_temp:double,main_feels_like:double,main_temp_min:double,main_temp_max:double,main_pressure:double,main_humidity:double,clouds_all:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
    at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
    at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
    at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
    at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
    ... 18 more
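
Judging by the Caused by line, the streaming CSV files apparently contain rows where at least one feature column ends up null, and the VectorAssembler fails because its default handleInvalid = "error" is in effect. Below is a minimal sketch of the two workarounds the message itself suggests, assuming rows with nulls can simply be discarded (streaming_clean is just an illustrative name):

# Sketch 1: tell the assembler to skip rows containing nulls
# (handleInvalid defaults to "error", which is what the trace shows).
assembler = VectorAssembler(
    inputCols=["coord_lon", "coord_lat", "main_temp", "main_feels_like",
               "main_temp_min", "main_temp_max", "main_pressure",
               "main_humidity", "clouds_all"],
    outputCol="features",
    handleInvalid="skip")

# Sketch 2: drop rows with nulls in the feature columns before assembling.
streaming_clean = streaming_data.na.drop(subset=assembler.getInputCols())
streaming_assembled = assembler.transform(streaming_clean)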
