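I'm trying to join two existing Kafka topics in KSQL. Here is some sample data from Kafka (actual values are redacted because this is a corporate environment):
Device topic: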
{
"persistTime" : "2020-10-06T13:30:25.373Z",
"previous" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
},
"current" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
}
}
Device event topic (first example):
{
"device" : "REDACTED",
"event" : "403151",
"firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
"lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
"occurrenceCount" : 1,
"receiveTime" : "2020-09-30T11:03:50.000Z",
"persistTime" : "2020-09-30T14:32:59.580Z",
"state" : "open",
"context" : {
"2" : "25",
"3" : "0",
"4" : "60",
"1" : "REDACTED"
}
}
Device event topic (second example):
{
"device" : "REDACTED",
"event" : "402004",
"firstOccurrenceTime" : "2020-10-07T07:02:48Z",
"lastOccurrenceTime" : "2020-10-07T07:02:48Z",
"occurrenceCount" : 1,
"receiveTime" : "2020-10-07T07:02:48Z",
"persistTime" : "2020-10-07T07:15:28.533Z",
"state" : "open",
"context" : {
"2" : "2020-10-07T07:02:48.0000000Z",
"1" : "REDACTED"
}
}
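The problem I'm facing is that the number of variables inside "context" in the device event topic varies from record to record.
I've tried creating the event stream in ksqlDB with the following statements: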
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurrenceTime" VARCHAR, \
"lastOccurrenceTime" VARCHAR, \
"occurrenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurrenceTime" VARCHAR, \
"lastOccurrenceTime" VARCHAR, \
"occurrenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');
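The second statement only brings in the records that have all four context variables ("1", "2", "3" and "4").
How can I create an equivalent KSQL stream for the device event Kafka topic?
Best answer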
You need to use a MAP instead of a STRUCT.
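As a rough analogy in Python (not how ksqlDB is implemented internally), the sketch below contrasts a fixed key set, in the spirit of the STRUCT<"1", "2", "3", "4"> attempt, with a plain string-to-string mapping like MAP<VARCHAR, VARCHAR>. The helper names fits_fixed_schema and as_map are hypothetical and exist only for illustration.

```python
import json

# The two "context" objects from the device-event samples in the question.
RECORDS = [
    '{"2": "25", "3": "0", "4": "60", "1": "REDACTED"}',
    '{"2": "2020-10-07T07:02:48.0000000Z", "1": "REDACTED"}',
]

# A fixed schema decided up front, loosely analogous to STRUCT<"1", "2", "3", "4">.
FIXED_KEYS = ("1", "2", "3", "4")


def fits_fixed_schema(context: dict) -> bool:
    """Hypothetical check: True only if the object has exactly the declared keys."""
    return set(context) == set(FIXED_KEYS)


def as_map(context: dict) -> dict:
    """Loosely analogous to MAP<VARCHAR, VARCHAR>: keep whatever keys are present."""
    return {str(k): str(v) for k, v in context.items()}


if __name__ == "__main__":
    for raw in RECORDS:
        ctx = json.loads(raw)
        print(fits_fixed_schema(ctx), as_map(ctx))
```

The map version never needs to know the key set in advance, which is exactly what a context object with a varying number of entries requires.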
By the way, you no longer need the \ line separators either :)
Here is a working example using ksqlDB 0.12.
Load the sample data into a topic:
kafkacat -b localhost:9092 -P -t events <<EOF
{ "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
{ "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
EOF
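In producer mode (-P), kafkacat treats each input line as a separate message by default, so every JSON record has to sit on a single line. If your samples are pretty-printed, as in the question, a small Python helper like the one below could compact them into one document per line before you pipe them into kafkacat. The function name compact_documents and the file names in the usage comment are hypothetical.

```python
import json
import sys


def compact_documents(text: str) -> list:
    """Split concatenated (possibly pretty-printed) JSON documents and
    return each one serialized on a single line."""
    decoder = json.JSONDecoder()
    docs = []
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        # Compact separators keep each record on one line.
        docs.append(json.dumps(obj, separators=(",", ":")))
    return docs


if __name__ == "__main__":
    # Hypothetical usage:
    #   python compact_json.py < samples.json | kafkacat -b localhost:9092 -P -t events
    for doc in compact_documents(sys.stdin.read()):
        print(doc)
```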
In ksqlDB, declare the stream:
CREATE STREAM "events" (
  "device" VARCHAR,
  "event" VARCHAR,
  "firstOccurrenceTime" VARCHAR,
  "lastOccurrenceTime" VARCHAR,
  "occurrenceCount" INTEGER,
  "receiveTime" VARCHAR,
  "persistTime" VARCHAR,
  "state" VARCHAR,
  "context" MAP<VARCHAR, VARCHAR>
) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
Query the stream to check that everything works:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
+----------+--------+--------------------------+--------+------------------------------------+
|device    |event   |receiveTime               |state   |context                             |
+----------+--------+--------------------------+--------+------------------------------------+
|REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
|REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
|          |        |                          |        |000000Z}                            |
Use the [''] syntax to access specific keys in the map. A key that is missing from a record's map comes back as null:
ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
+-----------+--------+------------------------------------+-----------+-----------+
|device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
+-----------+--------+------------------------------------+-----------+-----------+
|REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
|REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
|           |        |000000Z}                            |           |           |
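CONTEXT_3 is null for event 402004 because that record's context map has no "3" key. The same projection is sketched below in Python over the two sample records, with dict.get standing in for the map subscript. This is only an analogy to make the null behaviour concrete, and the function name project is hypothetical.

```python
# (device, event, context) for the two sample records from the question.
ROWS = [
    ("REDACTED", "403151", {"1": "REDACTED", "2": "25", "3": "0", "4": "60"}),
    ("REDACTED", "402004", {"1": "REDACTED", "2": "2020-10-07T07:02:48.0000000Z"}),
]


def project(rows, keys):
    """For each row, pick the requested context keys. A missing key yields None,
    mirroring the null that "context"['3'] returns for event 402004."""
    result = []
    for device, event, context in rows:
        result.append((device, event, *(context.get(k) for k in keys)))
    return result


if __name__ == "__main__":
    for row in project(ROWS, ["1", "3"]):
        print(row)
```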
Source: the Stack Overflow question "apache-kafka - Kafka topic with a variable nested JSON object as a KSQL DB stream": https://stackoverflow.com/questions/64241285/