apache-spark - 使用 Python 中的 Spark Structured Streaming 从 Kafka 读取数据并打印到控制台

标签 apache-spark pyspark apache-kafka apache-spark-sql spark-structured-streaming

我有 kafka_2.13-2.7.0 在 Ubuntu 20.04 中。我运行 kafka 服务器和 zookeeper,然后创建一个主题并通过 nc -lk 9999 在其中发送一个文本文件.这个话题充满了数据。另外,我还有 spark-3.0.1-bin-hadoop2.7 在我的系统上。事实上,我想使用 kafka 主题作为 Spark Structured Streaming with python 的源。我的代码是这样的:

spark = SparkSession \
    .builder \
    .appName("APP") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sparktest") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
我通过 运行上面的代码spark-submit 用这个命令:
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py 
代码运行没有任何异常,我收到这个输出,因为它在 Spark 站点中:
   root
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = true)
    |-- partition: integer (nullable = true)
    |-- offset: long (nullable = true)
    |-- timestamp: timestamp (nullable = true)
    |-- timestampType: integer (nullable = true)
我的问题是 kafka 主题充满了数据;但是由于在输出中运行代码,没有任何数据。你能指导我这里有什么问题吗?

最佳答案

原样的代码不会打印出任何数据,而只会为您提供一次架构。
您可以按照一般 Structured Streaming Guide 中给出的说明进行操作。和 Structured Streaming + Kafka integration Guide查看如何将数据打印到控制台。请记住,在 Spark 中读取数据是一种惰性操作,没有操作(通常是 writeStream 操作)什么也做不了。
如果您对以下代码进行补充,您应该会看到所选数据(键和值)打印到控制台:

spark = SparkSession \
          .builder \
          .appName("APP") \
          .getOrCreate()

df = spark\
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "sparktest") \
      .option("startingOffsets", "earliest") \
      .load()
      

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .start()

query.awaitTermination()

关于apache-spark - 使用 Python 中的 Spark Structured Streaming 从 Kafka 读取数据并打印到控制台,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65574234/

相关文章:

hadoop - 如何从 pyspark 连接到 Teradata?

java - Apache Spark 中 RowMatrix 和 Matrix 的区别?

Python Spark - 如何创建一个新列,对数据帧上的现有列进行切片?

apache-kafka - 具有更改日志主题与日志压缩源主题的 Kafka Streams KTable 存储

mysql - JDBC 接收器连接器 : How to map fields from the Kafka's message to the database table's column

apache-spark - 即使将 "auto.offset.reset"设置为 "latest"后也会出现错误 OffsetOutOfRangeException

apache-spark - 垃圾收集时间是apache spark中任务执行时间的一部分吗?

apache-spark - 使用 YARN 客户端模式时如何防止 Spark Executors 丢失?

python - 如何根据 PySpark 中的条件修改行子集

python-3.x - 获取由 PySpark Dataframe 上的另一列分组的列的不同元素