python - 如何将 Spark Streaming 与 TensorFlow 集成?

标签 python apache-spark tensorflow pyspark real-time-data

目标:不断将嗅探到的网络包送入 Kafka Producer,将其连接到 Spark Streaming 以便能够处理包数据,之后,使用 Tensorflow 或 Keras 中的预处理数据。

我正在处理来自 Kafka 的 Spark Streaming (PySpark) 中的连续数据,现在我想将处理后的数据发送到 Tensorflow。我如何使用 Python 在 Tensorflow 中使用这些转换后的 DStream?谢谢。

目前没有在 Spark Streaming 中应用任何处理,但稍后会添加。这是 py 代码:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from datetime import datetime

if __name__ == '__main__':
    sc = SparkContext(appName='Kafkas')
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], 
                                       {'metadata.broker.list': brokers})
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

我也用它来启动 spark streaming:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0–8_2.11:2.0.0 
spark-kafka.py localhost:9092 topic

最佳答案

您有两种方法可以解决您的问题:

  1. 处理完数据后,您可以保存它们,然后独立运行您的模型(在 Keras 中?)。如果它已经存在,只需创建一个 Parquet 文件/附加到它:

    if os.path.isdir(DATA_TREATED_PATH):
        data.write.mode('append').parquet(DATA_TREATED)
    else:
        data.write.parquet(DATA_TREATED_PATH)
    

然后您只需使用 keras/tensorflow 创建您的模型,然后像每小时一样运行它?或者您希望它更新多少次。所以每次都是从头开始运行。

  1. 你处理你的数据,像以前一样保存它们,但在那之后,你加载你的模型,训练你的新数据/新批处理,然后保存你的模型。这称为在线学习,因为您不会从头开始运行模型。

关于python - 如何将 Spark Streaming 与 TensorFlow 集成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53834196/

相关文章:

python - 从现有数据框创建新数据框 - SettingWithCopyWarning

python - 将 pandas 数据框中的 notnull 值替换为列表中的值/如何获取 notnull 值的索引/ bool 索引的实现

apache-spark - 如何为 spark-shell 设置 YARN 队列?

java - 如何将具有值的列添加到 Spark Java 中的新数据集?

python - TensorFlow 使用 tf.while_loop() 陷入无限循环

python - 如何增量训练 nltk 分类器

python - 在不允许 str.split() 的情况下反转字符串的词序

apache-spark - 如何在 Spark 代码中设置 Kryo 的不可修改集合序列化器

python - 模块未找到错误 : No module named 'tensorflow.examples'

python - 如何将 numpy 数组列表放入 LSTM 神经网络?