java - 在 Spark 结构化流处理中处理二进制数据

标签 java apache-spark spark-streaming apache-spark-dataset

我正在使用 Kafka 和 Spark 结构化流。我收到以下格式的 kafka 消息。

{"deviceId":"001","sNo":1,"data":"aaaaa"}
{"deviceId":"002","sNo":1,"data":"bbbbb"}
{"deviceId":"001","sNo":2,"data":"ccccc"}
{"deviceId":"002","sNo":2,"data":"ddddd"}

我正在像下面这样阅读它。

Dataset<String> data = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as(Encoders.STRING());
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo"); 
ds.foreach(event -> 
      processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes())
);}

private void processData(String deviceId,int SNo, byte[] data) 
{
  //How to check previous processed Dataset???
} 

在我的json消息中,“data”是byte[]的字符串形式。我有一个要求,我需要按“sNo”的顺序处理给定“deviceId”的二进制“数据”。因此,对于“deviceId”=“001”,我必须处理“sNo”=1 的二进制数据,然后处理“sNo”=2 等等。如何检查结构化流中先前处理的数据集的状态?

最佳答案

如果您正在寻找像 DStream.mapWithState 这样的状态管理,那么结构化流尚不支持它。工作正在进行中。请检查 https://issues.apache.org/jira/browse/SPARK-19067 .

关于java - 在 Spark 结构化流处理中处理二进制数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42433039/

相关文章:

java - Spark Streaming App通过代码提交

java - Spark : java. lang.NoClassDefFoundError: com/mongodb/hadoop/MongoInputFormat

java - 复制数组中的字符串值

java - 为什么不能伪造ServletRequest.getRemoteAddr?

Scala 2.11 Spark 2.0 hortonworks-spark/shc sbt 程序集

python - 从 Spark 连接到 HANA

sql - Spark sql 中的广播连接(Spark 1.6.2)

java - 加速 Java 应用程序

Java GUI、JFrames 和对话框

java - Spark和java编译错误