hadoop - Apache Spark : Apply existing mllib model on Incoming DStreams/DataFrames

标签 hadoop apache-spark machine-learning apache-spark-mllib

使用 Apache Spark 的 mllib,我有一个存储在 HDFS 中的逻辑回归模型。此逻辑回归模型是根据来自某些传感器的历史数据进行训练的。

我有另一个 spark 程序,它使用来自这些传感器的流数据。我希望能够使用预先存在的训练模型对传入的数据流进行预测。注意:我不希望我的模型被这些数据更新。

要加载训练模型,我必须在我的代码中使用以下行:

val logisticModel = LogisticRegressionModel.load(sc, <location of model>)

sc: Spark 上下文。

但是,这个应用程序是一个流应用程序,因此已经有一个“StreamingContext”设置。现在,根据我的阅读,在同一个程序中有两个上下文是不好的做法(即使这是可能的)。

这是否意味着我的方法是错误的,我不能做我想做的事?

此外,如果我继续将流数据存储在一个文件中并继续对其运行逻辑回归而不是尝试直接在流应用程序中执行它会更有意义吗?

最佳答案

StreamingContext 可以通过几种方式创建,包括两个采用现有 SparkContext 的构造函数:

  • StreamingContext(path: String, sparkContext: SparkContext) - 其中 path 是检查点文件的路径
  • StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

因此您可以简单地创建 SparkContext,加载所需的模型,并创建 StreamingContext:

val sc: SparkContext = ???
...
val ssc = new StreamingContext(sc, Seconds(1))

您还可以使用 StreamingContext.sparkContext 方法获取 SparkContext:

val ssc: StreamingContext  = ???
ssc.sparkContext: SparkContext

关于hadoop - Apache Spark : Apply existing mllib model on Incoming DStreams/DataFrames,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36021796/

相关文章:

hadoop - 为什么 Mutation 不为现有列插入

mysql - Spark SQL/Hive 查询永远伴随着 Join

python - PyTorch 学习调度程序顺序极大地改变了损失

c++ - OpenCV 3.1.0 C++ 预测函数 RAW_OUTPUT 返回值有时是反的

hadoop - 映射器将值传递给不同的映射器-缩减器

hadoop - 使用 Hadoop 读取 s3 时出现 java.lang.NullPointerException(Scalding)

hadoop - 为缓存 RDD 分配了多少内存?

apache-spark - 处理 spark mllib 分类器中的 null/NaN 值

apache-spark - 如何使用Spark查找10亿条记录的最近邻居?

machine-learning - 无人机虚拟测试环境