apache-kafka - 具有可变嵌套 JSON 对象作为 KSQL DB 流的 Kafka 主题

标签 apache-kafka ksqldb

我正在尝试在 KSQL 中加入两个现有的 Kafka 主题。 Kafka的部分数据样本(实际值因企业环境有所删减):

设备主题:

{
  "persistTime" : "2020-10-06T13:30:25.373Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}

设备事件主题(第一个示例):

{
  "device" : "REDACTED",
  "event" : "403151",
  "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-09-30T11:03:50.000Z",
  "persistTime" : "2020-09-30T14:32:59.580Z",
  "state" : "open",
  "context" : {
    "2" : "25",
    "3" : "0",
    "4" : "60",
    "1" : "REDACTED"
  }
}

设备事件主题(第二个示例):

{
  "device" : "REDACTED",
  "event" : "402004",
  "firstOccurrenceTime" : "2020-10-07T07:02:48Z",
  "lastOccurrenceTime" : "2020-10-07T07:02:48Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-10-07T07:02:48Z",
  "persistTime" : "2020-10-07T07:15:28.533Z",
  "state" : "open",
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}

我面临的问题是设备事件主题下 context 中变量的数量变化。

我已尝试使用以下语句在 ksqlDB 上创建事件流:

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');

第二条语句仅引入具有所有四个上下文变量(“1”、“2”、“3”和“4”)的数据。

如何为设备事件 Kafka 主题创建 KSQL 等效流?

最佳答案

您需要使用 MAP 而不是 STRUCT

顺便说一句,您也不再需要 \ 行分隔符:)

这是一个使用 ksqlDB 0.12 的工作示例。

  • 将示例数据加载到主题中

    kafkacat -b localhost:9092 -P -t events <<EOF
    { "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
    { "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
    EOF
    
  • 在 ksqlDB 中,声明流:

    CREATE STREAM "events" (
        "device" VARCHAR,
        "event" VARCHAR,
        "firstOccurenceTime" VARCHAR,
        "lastOccurenceTime" VARCHAR,
        "occurenceCount" INTEGER,
        "receiveTime" VARCHAR,
        "persistTime" VARCHAR,
        "state" VARCHAR,
        "context" MAP < VARCHAR, VARCHAR >
    ) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
    
  • 查询流以检查一切正常:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
    +----------+--------+--------------------------+--------+------------------------------------+
    |device    |event   |receiveTime               |state   |context                             |
    +----------+--------+--------------------------+--------+------------------------------------+
    |REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
    |REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
    |          |        |                          |        |000000Z}                            |
    
  • 使用 [''] 语法访问映射中的特定键:

    ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
    +-----------+--------+------------------------------------+-----------+-----------+
    |device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
    +-----------+--------+------------------------------------+-----------+-----------+
    |REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
    |REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
    |           |        |000000Z}                            |           |           |
    

关于apache-kafka - 具有可变嵌套 JSON 对象作为 KSQL DB 流的 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64241285/

相关文章:

apache-kafka - 从主题创建表并得到一些关于 LongDeserializer 收到的数据大小不是 8 的序列化异常

apache-kafka - 如何在通过指定主题名称创建流形式模式注册表时在 ksql 中保留字段区分大小写

json - 创建 KSQL 流 : How to extract value from complex json

java - 延迟 Kafka Streams 消费

java - 我们如何使用 API 从 IDE 在 Kafka 中创建主题

python - Apache Beam Python SDK ReadFromKafka 不接收数据

apache-kafka - 如何根据连接器名称获取Kafka源连接器架构

java - Spring Kafka 和 Kafka Streams

scala - 如何解决Kafka Consumer轮询超时错误

mysql - 如何将MySql表数据集成到Ksql流或表中?