java - Joining Avro-format data with a lambda in KStreams

Tags: java apache-kafka apache-kafka-streams confluent-schema-registry

I have two streams:

Stream 1:
[KSTREAM-MAP-0000000004]: 1, {"id": 1, "name": "john", "age": 26}
[KSTREAM-MAP-0000000004]: 2, {"id": 2, "name": "jane", "age": 24}
[KSTREAM-MAP-0000000004]: 3, {"id": 3, "name": "julia", "age": 25}
[KSTREAM-MAP-0000000004]: 4, {"id": 4, "name": "jamie", "age": 22}
[KSTREAM-MAP-0000000004]: 5, {"id": 5, "name": "jenny", "age": 27}

Stream 2:
[KSTREAM-MAP-0000000004]: 1, {"id": 1, "name": "xxx", "age": 26}
[KSTREAM-MAP-0000000004]: 2, {"id": 2, "name": "yyy", "age": 24}
[KSTREAM-MAP-0000000004]: 31, {"id": 3, "name": "zzz", "age": 25}
[KSTREAM-MAP-0000000004]: 41, {"id": 4, "name": "uuu", "age": 22}
[KSTREAM-MAP-0000000004]: 51, {"id": 5, "name": "iii", "age": 27}

Now I want to join the two streams and, based on the key, retrieve the Stream 1 records whose keys do not exist in Stream 2.

My expected output should look like this:

3, {"id": 3, "name": "julia", "age": 25}
4, {"id": 4, "name": "jamie", "age": 22}
5, {"id": 5, "name": "jenny", "age": 27}
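The requested result is a key-based "anti-join": keep each Stream 1 record whose key never appears in Stream 2. As a plain-Java sketch of just that semantics (an in-memory batch analogy using maps, not Kafka Streams; real streams are unbounded, so the streaming version has to bound the comparison with a join window), using the keys and names from the sample data above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyAntiJoinSketch {

    // Returns the entries of `left` whose key does not appear in `right`,
    // preserving the iteration order of `left`.
    public static <K, V> Map<K, V> leftOnly(Map<K, V> left, Map<K, ?> right) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : left.entrySet()) {
            if (!right.containsKey(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Keys and "name" fields copied from Stream 1 above.
        Map<Integer, String> stream1 = new LinkedHashMap<>();
        stream1.put(1, "john");
        stream1.put(2, "jane");
        stream1.put(3, "julia");
        stream1.put(4, "jamie");
        stream1.put(5, "jenny");

        // Keys and "name" fields copied from Stream 2 above.
        Map<Integer, String> stream2 = new LinkedHashMap<>();
        stream2.put(1, "xxx");
        stream2.put(2, "yyy");
        stream2.put(31, "zzz");
        stream2.put(41, "uuu");
        stream2.put(51, "iii");

        System.out.println(leftOnly(stream1, stream2)); // prints {3=julia, 4=jamie, 5=jenny}
    }
}
```

Keys 3, 4 and 5 survive because Stream 2 only carries 31, 41 and 51 for those records, which matches the expected output.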

My Schema Registry schema file:

{"namespace": "schema.avro",
 "type": "record",
 "name": "mysql",
 "fields": [
     {"name": "id", "type": "int", "doc" : "id"},
     {"name": "name", "type": "string", "doc" : "name"},
     {"name": "age", "type": "int", "doc" : "age"}
 ]
}

I tried to do the join this way:

final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();

KStream<Integer,String> joined1 = psql_data.leftJoin(mysql_data,
    (leftValue, rightValue) ->  "psql_data=" + leftValue + ", mysql_data=" + rightValue,
    JoinWindows.of(TimeUnit.MINUTES.toMillis(1)),
    Joined.with(
      Serdes.Integer(),
      genericAvroSerde,
      genericAvroSerde)
);

But I got the following compilation error:

[ERROR] /home/kafka-connect/confluent-4.1.0/kafka_streaming/src/main/java/com/aail/kafka_stream.java:[140,43] error: no suitable method found for leftJoin(KStream<Integer,mysql>,(leftValue[...]Value,JoinWindows,Joined<Integer,GenericRecord,GenericRecord>)
[ERROR] method KStream.<VO#1,VR#1>leftJoin(KStream<Integer,VO#1>,ValueJoiner<? super mysql,? super VO#1,? extends VR#1>,JoinWindows) is not applicable
[ERROR] (cannot infer type-variable(s) VO#1,VR#1
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method KStream.<VO#2,VR#2>leftJoin(KStream<Integer,VO#2>,ValueJoiner<? super mysql,? super VO#2,? extends VR#2>,JoinWindows,Joined<Integer,mysql,VO#2>) is not applicable
[ERROR] (inferred type does not conform to equality constraint(s)
[ERROR] inferred: GenericRecord
[ERROR] equality constraints(s): GenericRecord,mysql)

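I think I need to use my mysql Avro class for the left and right values of the join instead of genericAvroSerde. I tried that, but I couldn't figure it out. Can someone help me perform the join?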

Best answer

You need to configure the GenericAvroSerde before using it:

final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
genericAvroSerde.configure(...);

and pass in a configuration so that it can find the Confluent Schema Registry, as described in the documentation: https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro
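Putting the answer together with the question, here is a minimal sketch, assuming the Kafka Streams 1.1-era API the question uses (Confluent 4.1: `JoinWindows.of(long)` and the `Joined` overload of `leftJoin`, both of which were later deprecated and removed in favor of `Duration`-based windows and `StreamJoined`) and Confluent's `kafka-streams-avro-serde` artifact. The topic names `psql-topic`, `mysql-topic` and `psql-only-topic` and the class `AvroLeftJoinSketch` are hypothetical. The key point for the compile error is that both streams are typed `KStream<Integer, GenericRecord>`, so they agree with the `GenericAvroSerde` passed to `Joined.with(...)`; the "not in Stream 2" part is done by returning the left value only when the right value is `null` and then filtering out the rest.

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class AvroLeftJoinSketch {

    public static Topology buildTopology(String schemaRegistryUrl) {
        // Tell the serde where the Schema Registry is; `false` = serde for record values, not keys.
        Map<String, String> serdeConfig =
                Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
        Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
        genericAvroSerde.configure(serdeConfig, false);

        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topic names. Both streams are typed GenericRecord so that
        // they match the GenericAvroSerde given to Joined.with(...) below.
        KStream<Integer, GenericRecord> psqlData =
                builder.stream("psql-topic", Consumed.with(Serdes.Integer(), genericAvroSerde));
        KStream<Integer, GenericRecord> mysqlData =
                builder.stream("mysql-topic", Consumed.with(Serdes.Integer(), genericAvroSerde));

        // leftJoin emits every left record; rightValue is null when no record with
        // the same key was found on mysqlData within the 1-minute window.
        KStream<Integer, GenericRecord> onlyInPsql = psqlData
                .leftJoin(mysqlData,
                        (leftValue, rightValue) -> rightValue == null ? leftValue : null,
                        JoinWindows.of(TimeUnit.MINUTES.toMillis(1)),
                        Joined.with(Serdes.Integer(), genericAvroSerde, genericAvroSerde))
                // Drop the results that did find a match on the right side.
                .filter((key, value) -> value != null);

        onlyInPsql.to("psql-only-topic", Produced.with(Serdes.Integer(), genericAvroSerde));
        return builder.build();
    }
}
```

If `psql_data` and `mysql_data` are kept as `KStream<Integer, mysql>` (the class generated from the schema), the same pattern should work with a `SpecificAvroSerde<mysql>` configured the same way in place of the `GenericAvroSerde`; the error in the question comes from mixing `mysql` streams with `GenericRecord` serdes. Also note that in older Kafka Streams versions a left join emits the `(left, null)` result as soon as the left record arrives, so a key whose Stream 2 record arrives slightly later in the window can still pass the filter; newer releases (3.1+, with a grace period configured) delay those results until the window closes.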

Regarding java - Joining Avro-format data with a lambda in KStreams, the original question is on Stack Overflow: https://stackoverflow.com/questions/50561918/
