目标:不断将嗅探到的网络包送入 Kafka Producer,将其连接到 Spark Streaming 以便能够处理包数据,之后,使用 Tensorflow 或 Keras 中的预处理数据。
我正在处理来自 Kafka 的 Spark Streaming (PySpark) 中的连续数据,现在我想将处理后的数据发送到 Tensorflow。我如何使用 Python 在 Tensorflow 中使用这些转换后的 DStream?谢谢。
目前没有在 Spark Streaming 中应用任何处理,但稍后会添加。这是 py 代码:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from datetime import datetime
if __name__ == '__main__':
sc = SparkContext(appName='Kafkas')
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic],
{'metadata.broker.list': brokers})
lines = kvs.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
我也用它来启动 spark streaming:
spark-submit --packages org.apache.spark:spark-streaming-kafka-0–8_2.11:2.0.0
spark-kafka.py localhost:9092 topic
最佳答案
您有两种方法可以解决您的问题:
处理完数据后,您可以保存它们,然后独立运行您的模型(在 Keras 中?)。如果它已经存在,只需创建一个 Parquet 文件/附加到它:
if os.path.isdir(DATA_TREATED_PATH): data.write.mode('append').parquet(DATA_TREATED) else: data.write.parquet(DATA_TREATED_PATH)
然后您只需使用 keras/tensorflow 创建您的模型,然后像每小时一样运行它?或者您希望它更新多少次。所以每次都是从头开始运行。
- 你处理你的数据,像以前一样保存它们,但在那之后,你加载你的模型,训练你的新数据/新批处理,然后保存你的模型。这称为在线学习,因为您不会从头开始运行模型。
关于python - 如何将 Spark Streaming 与 TensorFlow 集成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53834196/