[Python, Scala, Apache] Тестирование в Apache Spark Structured Streaming
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Введение
На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.
Все примеры используют: Apache Spark 3.0.1.
Подготовка
Необходимо установить:
- Apache Spark 3.0.x
- Python 3.7 и виртуальное окружение для него
- Conda 4.y
- scikit-learn 0.22.z
- Maven 3.v
- В примерах для Scala используется версия 2.12.10.
- Загрузить Apache Spark
- Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
- Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7
Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.
SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;
Тесты
Пример с scikit-learn
При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.
Для написания тестов будет использоваться следующий пример: LinearRegression.
Итак, пусть код для тестирования использует следующий "шаблон" для Python:
class XService:
def __init__(self):
# Инициализация
def train(self, ds):
# Обучение
def predict(self, ds):
# Предсказание и вывод результатов
Для Scala шаблон выглядит соответственно.
Полный пример:
from sklearn import linear_model
class LocalService:
def __init__(self):
self.model = linear_model.LinearRegression()
def train(self, ds):
X, y = ds
self.model.fit(X, y)
def predict(self, ds):
r = self.model.predict(ds)
print(r)
Тест.
Импорт:
import unittest
import numpy as np
Основной класс:
class RunTest(unittest.TestCase):
Запуск тестов:
if __name__ == "__main__":
unittest.main()
Подготовка данных:
X = np.array([
[1, 1], # 6
[1, 2], # 8
[2, 2], # 9
[2, 3] # 11
])
y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
Создание модели и обучение:
service = local_service.LocalService()
service.train((X, y))
Получение результатов:
service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
Ответ:
[16.]
[19.]
Все вместе:
import unittest
import numpy as np
from spark_streaming_pp import local_service
class RunTest(unittest.TestCase):
def test_run(self):
# Prepare data.
X = np.array([
[1, 1], # 6
[1, 2], # 8
[2, 2], # 9
[2, 3] # 11
])
y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
# Create model and train.
service = local_service.LocalService()
service.train((X, y))
# Predict and results.
service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
# [16.]
# [19.]
if __name__ == "__main__":
unittest.main()
Пример с Spark и Python
Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.
Инициализация:
self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
Обучение:
self.model = self.service.fit(ds)
Получение результатов:
transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
Все вместе:
from pyspark.ml.regression import LinearRegression
class StructuredStreamingService:
def __init__(self):
self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
def train(self, ds):
self.model = self.service.fit(ds)
def predict(self, ds):
transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
Сам тест.
Обычно в тестах можно использовать данные, которые создаются прямо в тестах.
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
Это очень удобно и код получается компактным.
Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.
def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)])
schema = StructType([StructField("data", StringType(), False)])
df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
.schema(bad_schema)\
.load(path='python/test_support/sql/streaming', schema=schema, format='text')
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
И так.
Создается контекст для работы:
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
Подготовка данных для обучения (можно сделать обычным способом):
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
Обучение:
service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.
def extract_features(x):
values = x.split(",")
features_ = []
for i in values[1:]:
features_.append(float(i))
features = Vectors.dense(features_)
return features
extract_features_udf = udf(extract_features, VectorUDT())
def extract_label(x):
values = x.split(",")
label = float(values[0])
return label
extract_label_udf = udf(extract_label, FloatType())
predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
.withColumn("features", extract_features_udf(col("value"))) \
.withColumn("label", extract_label_udf(col("value")))
service.predict(predict_ds).awaitTermination(3)
Ответ:
15.96699
18.96138
Все вместе:
import unittest
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT
from spark_streaming_pp import structure_streaming_service
class RunTest(unittest.TestCase):
def test_run(self):
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Prepare data.
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
# Create model and train.
service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
# Predict and results.
def extract_features(x):
values = x.split(",")
features_ = []
for i in values[1:]:
features_.append(float(i))
features = Vectors.dense(features_)
return features
extract_features_udf = udf(extract_features, VectorUDT())
def extract_label(x):
values = x.split(",")
label = float(values[0])
return label
extract_label_udf = udf(extract_label, FloatType())
predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
.withColumn("features", extract_features_udf(col("value"))) \
.withColumn("label", extract_label_udf(col("value")))
service.predict(predict_ds).awaitTermination(3)
# +-----+------------------+
# |label| prediction|
# +-----+------------------+
# | 1.0|15.966990887541273|
# | 2.0|18.961384020443553|
# +-----+------------------+
def setUp(self):
warnings.filterwarnings("ignore", category=ResourceWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
if __name__ == "__main__":
unittest.main()
Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
Полный пример на Scala (здесь, для разнообразия, не используется sql):
package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery
class StructuredStreamingService {
var service: LinearRegression = _
var model: LinearRegressionModel = _
def train(ds: DataFrame): Unit = {
service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
model = service.fit(ds)
}
def predict(ds: DataFrame): StreamingQuery = {
val m = ds.sparkSession.sparkContext.broadcast(model)
def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
m.value.predict(features)
}
val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun
val toUpperUdf = udf(transform)
val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))
predictionDs
.writeStream
.foreachBatch((r: DataFrame, i: Long) => {
r.show()
// scalastyle:off println
println(s"$i")
// scalastyle:on println
})
.start()
}
}
Тест:
package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}
class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
test("run") { spark =>
// Prepare data.
val trainDs = spark.createDataFrame(Seq(
(6.0, Vectors.dense(1.0, 1.0)),
(8.0, Vectors.dense(1.0, 2.0)),
(9.0, Vectors.dense(2.0, 2.0)),
(11.0, Vectors.dense(2.0, 3.0))
)).toDF("label", "features")
// Create model and train.
val service = new StructuredStreamingService()
service.train(trainDs)
// Predict and results.
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
// +-----+---------+------------------+
// |label| features| prediction|
// +-----+---------+------------------+
// | 1.0|[3.0,5.0]|15.966990887541273|
// | 2.0|[4.0,6.0]|18.961384020443553|
// +-----+---------+------------------+
}
override protected def withFixture(test: OneArgTest): Outcome = {
val spark = SparkSession.builder().master("local[2]").getOrCreate()
try withFixture(test.toNoArgTest(spark))
finally spark.stop()
}
override type FixtureParam = SparkSession
case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)
}
Выводы
При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.
Такие абстракции как “DataFrame” позволяют это сделать легко и просто.
При использовании Python данные придется хранить в файлах.
Ссылки и ресурсы
===========
Источник:
habr.com
===========
Похожие новости:
- [Python, JavaScript, Компиляторы] Ещё один способ использования python в браузере (и не только)
- [JavaScript, Программирование, C#, Rust] Вышла версия 1.0 библиотеки для управления секс-игрушками Buttplug
- [Python, Машинное обучение, TensorFlow] Хакатоны Осень 2020. Мой опыт
- [Разработка веб-сайтов, Программирование, Java] Пять причин, по которым следует использовать Apache Wicket (перевод)
- [Data Mining, Алгоритмы, Математика, Научно-популярное] Звездный год (365 дней 369 минут), Тропический год(+ 348.5 минут) и звездные сутки(1436 минут) в радиоактивном распаде
- [Python, Программирование, Data Mining, Алгоритмы, Машинное обучение] ИИ итоги уходящего 2020-го года в мире машинного обучения
- [Python, Программирование, Машинное обучение] Обеспечить январь настроением
- [PHP, Python, JavaScript, Управление разработкой] Эволюция команды разработки
- Нарушена работа поиска пакетов по Python-репозиторию PIP
- [Python, Big Data, Открытые данные, Веб-аналитика] Вытаскиваем данные из Instagram
Теги для поиска: #_python, #_scala, #_apache, #_apache, #_apache_spark, #_spark, #_python, #_scala, #_testing, #_kafka, #_streaming, #_python, #_scala, #_apache
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 16:25
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Введение На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием. Все примеры используют: Apache Spark 3.0.1. Подготовка Необходимо установить:
Необходимо настроить переменные среды. Здесь приведен пример для локального запуска. SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip; Тесты Пример с scikit-learn При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark. Для написания тестов будет использоваться следующий пример: LinearRegression. Итак, пусть код для тестирования использует следующий "шаблон" для Python: class XService:
def __init__(self): # Инициализация def train(self, ds): # Обучение def predict(self, ds): # Предсказание и вывод результатов Для Scala шаблон выглядит соответственно. Полный пример: from sklearn import linear_model
class LocalService: def __init__(self): self.model = linear_model.LinearRegression() def train(self, ds): X, y = ds self.model.fit(X, y) def predict(self, ds): r = self.model.predict(ds) print(r) Тест. Импорт: import unittest
import numpy as np Основной класс: class RunTest(unittest.TestCase):
Запуск тестов: if __name__ == "__main__":
unittest.main() Подготовка данных: X = np.array([
[1, 1], # 6 [1, 2], # 8 [2, 2], # 9 [2, 3] # 11 ]) y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3 Создание модели и обучение: service = local_service.LocalService()
service.train((X, y)) Получение результатов: service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]])) Ответ: [16.]
[19.] Все вместе: import unittest
import numpy as np from spark_streaming_pp import local_service class RunTest(unittest.TestCase): def test_run(self): # Prepare data. X = np.array([ [1, 1], # 6 [1, 2], # 8 [2, 2], # 9 [2, 3] # 11 ]) y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3 # Create model and train. service = local_service.LocalService() service.train((X, y)) # Predict and results. service.predict(np.array([[3, 5]])) service.predict(np.array([[4, 6]])) # [16.] # [19.] if __name__ == "__main__": unittest.main() Пример с Spark и Python Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы. Инициализация: self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None Обучение: self.model = self.service.fit(ds)
Получение результатов: transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start() return q Все вместе: from pyspark.ml.regression import LinearRegression
class StructuredStreamingService: def __init__(self): self.service = LinearRegression(maxIter=10, regParam=0.01) self.model = None def train(self, ds): self.model = self.service.fit(ds) def predict(self, ds): transformed_ds = self.model.transform(ds) q = transformed_ds.select("label", "prediction").writeStream.format("console").start() return q Сам тест. Обычно в тестах можно использовать данные, которые создаются прямо в тестах. train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])), (8.0, Vectors.dense([1.0, 2.0])), (9.0, Vectors.dense([2.0, 2.0])), (11.0, Vectors.dense([2.0, 3.0])) ], ["label", "features"] ) Это очень удобно и код получается компактным. Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame. На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark. def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)]) schema = StructType([StructField("data", StringType(), False)]) df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \ .schema(bad_schema)\ .load(path='python/test_support/sql/streaming', schema=schema, format='text') self.assertTrue(df.isStreaming) self.assertEqual(df.schema.simpleString(), "struct<data:string>") И так. Создается контекст для работы: spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR") Подготовка данных для обучения (можно сделать обычным способом): train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])), (8.0, Vectors.dense([1.0, 2.0])), (9.0, Vectors.dense([2.0, 2.0])), (11.0, Vectors.dense([2.0, 3.0])) ], ["label", "features"] ) Обучение: service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds) Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды. def extract_features(x):
values = x.split(",") features_ = [] for i in values[1:]: features_.append(float(i)) features = Vectors.dense(features_) return features extract_features_udf = udf(extract_features, VectorUDT()) def extract_label(x): values = x.split(",") label = float(values[0]) return label extract_label_udf = udf(extract_label, FloatType()) predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \ .withColumn("features", extract_features_udf(col("value"))) \ .withColumn("label", extract_label_udf(col("value"))) service.predict(predict_ds).awaitTermination(3) Ответ: 15.96699
18.96138 Все вместе: import unittest
import warnings from pyspark.sql import SparkSession from pyspark.sql.functions import col, udf from pyspark.sql.types import FloatType from pyspark.ml.linalg import Vectors, VectorUDT from spark_streaming_pp import structure_streaming_service class RunTest(unittest.TestCase): def test_run(self): spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark.sparkContext.setLogLevel("ERROR") # Prepare data. train_ds = spark.createDataFrame([ (6.0, Vectors.dense([1.0, 1.0])), (8.0, Vectors.dense([1.0, 2.0])), (9.0, Vectors.dense([2.0, 2.0])), (11.0, Vectors.dense([2.0, 3.0])) ], ["label", "features"] ) # Create model and train. service = structure_streaming_service.StructuredStreamingService() service.train(train_ds) # Predict and results. def extract_features(x): values = x.split(",") features_ = [] for i in values[1:]: features_.append(float(i)) features = Vectors.dense(features_) return features extract_features_udf = udf(extract_features, VectorUDT()) def extract_label(x): values = x.split(",") label = float(values[0]) return label extract_label_udf = udf(extract_label, FloatType()) predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \ .withColumn("features", extract_features_udf(col("value"))) \ .withColumn("label", extract_label_udf(col("value"))) service.predict(predict_ds).awaitTermination(3) # +-----+------------------+ # |label| prediction| # +-----+------------------+ # | 1.0|15.966990887541273| # | 2.0|18.961384020443553| # +-----+------------------+ def setUp(self): warnings.filterwarnings("ignore", category=ResourceWarning) warnings.filterwarnings("ignore", category=DeprecationWarning) if __name__ == "__main__": unittest.main() Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти. Это может выглядеть вот так: implicit val sqlCtx = spark.sqlContext
import spark.implicits._ val source = MemoryStream[Record] source.addData(Record(1.0, Vectors.dense(3.0, 5.0))) source.addData(Record(2.0, Vectors.dense(4.0, 6.0))) val predictDs = source.toDF() service.predict(predictDs).awaitTermination(2000) Полный пример на Scala (здесь, для разнообразия, не используется sql): package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.udf import org.apache.spark.sql.streaming.StreamingQuery class StructuredStreamingService { var service: LinearRegression = _ var model: LinearRegressionModel = _ def train(ds: DataFrame): Unit = { service = new LinearRegression().setMaxIter(10).setRegParam(0.01) model = service.fit(ds) } def predict(ds: DataFrame): StreamingQuery = { val m = ds.sparkSession.sparkContext.broadcast(model) def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = { m.value.predict(features) } val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun val toUpperUdf = udf(transform) val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features"))) predictionDs .writeStream .foreachBatch((r: DataFrame, i: Long) => { r.show() // scalastyle:off println println(s"$i") // scalastyle:on println }) .start() } } Тест: package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MemoryStream import org.scalatest.{Matchers, Outcome, fixture} class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers { test("run") { spark => // Prepare data. val trainDs = spark.createDataFrame(Seq( (6.0, Vectors.dense(1.0, 1.0)), (8.0, Vectors.dense(1.0, 2.0)), (9.0, Vectors.dense(2.0, 2.0)), (11.0, Vectors.dense(2.0, 3.0)) )).toDF("label", "features") // Create model and train. val service = new StructuredStreamingService() service.train(trainDs) // Predict and results. implicit val sqlCtx = spark.sqlContext import spark.implicits._ val source = MemoryStream[Record] source.addData(Record(1.0, Vectors.dense(3.0, 5.0))) source.addData(Record(2.0, Vectors.dense(4.0, 6.0))) val predictDs = source.toDF() service.predict(predictDs).awaitTermination(2000) // +-----+---------+------------------+ // |label| features| prediction| // +-----+---------+------------------+ // | 1.0|[3.0,5.0]|15.966990887541273| // | 2.0|[4.0,6.0]|18.961384020443553| // +-----+---------+------------------+ } override protected def withFixture(test: OneArgTest): Outcome = { val spark = SparkSession.builder().master("local[2]").getOrCreate() try withFixture(test.toNoArgTest(spark)) finally spark.stop() } override type FixtureParam = SparkSession case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector) } Выводы При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka. Такие абстракции как “DataFrame” позволяют это сделать легко и просто. При использовании Python данные придется хранить в файлах. Ссылки и ресурсы =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 16:25
Часовой пояс: UTC + 5