有没有办法从指定整个记录应被视为 VARCHAR 的主题创建流,以便我可以使用 extractjsonfield() 从它创建流?示例记录可能类似于:
{
"Header": {
"RecType": "RecA",
... more header records in a fairly consistent format ...
},
"RAFld1": {
"someFld": "some data",
"someOtherField": 1.001,
},
"RAFld2": {
"aFld": "data",
"anotherFld": 98.6,
...
},
...
}
但下一条记录可能如下所示:
{
"Header": {
"RecType": "RecB",
... more header records in a fairly consistent format ...
},
"RBFld1": {
"randomFld": "random data",
"randomOtherField": 1.001,
...
}
}
我可以弄清楚如何使用 VARCHAR 类型的已知字段定义初始流,然后使用 extractjsonfield() (使用适当的 where 子句),但没有找到一种方法来说明顶级结构没有一致命名字段。
这是我输入主题的格式化方式;我无法更改该格式。我希望 KSQL 能够成为一个优雅的解决方案,但看起来我从一开始就陷入了无法处理这种动态结构的困境。
最佳答案
如果您在架构中命名的字段并不出现在每条消息中,这并不重要;你会得到null
值(value)观。
我认为你的问题很有趣,并且已经写了一份关于 KSQL 如何在这里工作的解释 - 如果你想用它做其他事情,请告诉我,我可以扩展答案。
检查原始数据:
ksql> PRINT 'source_data' FROM BEGINNING; Format:JSON {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}} {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
注册
source_data
用作名为my_stream
的 KSQL 流的主题:CREATE STREAM my_stream (Header VARCHAR, \ RAFld1 VARCHAR, \ RAFld2 VARCHAR, \ RBFld1 VARCHAR) \ WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
检查消息。请注意,在第二条消息(记录类型“B”)中,“RAFld1”没有值,因此
null
显示:ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2; {"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"} {"RecType":"RecB"} | null
仅使用记录类型“A”值填充新的 Kafka 主题,使用
EXTRACTFROMJSON
过滤 Header 值上的记录类型,并从有效负载中提取命名字段:CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \ SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \ EXTRACTJSONFIELD(RAFld1,'$.someFld') AS someFld, \ EXTRACTJSONFIELD(RAFld2,'$.aFld') AS aFld, \ EXTRACTJSONFIELD(RAFld2,'$.anotherFld') AS anotherFld \ FROM my_stream \ WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
请注意,序列化正在切换到 Avro,以便任何使用者都可以自动使用该架构,而无需手动声明它。
观察新流具有模式,并且在消息到达原始
source_data
时不断填充消息。主题:ksql> DESCRIBE recA_data; Name : RECA_DATA Field | Type -------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) SOMEOTHERFIELD | VARCHAR(STRING) SOMEFLD | VARCHAR(STRING) AFLD | VARCHAR(STRING) ANOTHERFLD | VARCHAR(STRING) -------------------------------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>; ksql> SELECT * FROM recA_data; 1545240188787 | null | 1.001 | some data | data | 98.6
关于ksqldb - 来自具有异构 JSON 结构的主题的 KSQL 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53840974/