ksqldb - 来自具有异构 JSON 结构的主题的 KSQL 流

标签 ksqldb

有没有办法从指定整个记录应被视为 VARCHAR 的主题创建流,以便我可以使用 extractjsonfield() 从它创建流?示例记录可能类似于:

{
  "Header": {
    "RecType": "RecA",
    ... more header records in a fairly consistent format ...
  },
  "RAFld1": {
    "someFld": "some data",
    "someOtherField": 1.001,
  },
  "RAFld2": {
    "aFld": "data",
    "anotherFld": 98.6,
    ...
  },
  ...
}

但下一条记录可能如下所示:

{
  "Header": {
    "RecType": "RecB",
    ... more header records in a fairly consistent format ...
  },
  "RBFld1": {
    "randomFld": "random data",
    "randomOtherField": 1.001,
    ...
  }
}

我可以弄清楚如何使用 VARCHAR 类型的已知字段定义初始流,然后使用 extractjsonfield() (使用适当的 where 子句),但没有找到一种方法来说明顶级结构没有一致命名字段。

这是我输入主题的格式化方式;我无法更改该格式。我希望 KSQL 能够成为一个优雅的解决方案,但看起来我从一开始就陷入了无法处理这种动态结构的困境。

最佳答案

如果您在架构中命名的字段并不出现在每条消息中,这并不重要;你会得到null值(value)观。

我认为你的问题很有趣,并且已经写了一份关于 KSQL 如何在这里工作的解释 - 如果你想用它做其他事情,请告诉我,我可以扩展答案。


  1. 检查原始数据:

    ksql> PRINT 'source_data' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
    {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
    
  2. 注册 source_data用作名为 my_stream 的 KSQL 流的主题:

    CREATE STREAM my_stream (Header VARCHAR, \
                             RAFld1 VARCHAR, \
                             RAFld2 VARCHAR, \
                             RBFld1 VARCHAR) \
    WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
    
  3. 检查消息。请注意,在第二条消息(记录类型“B”)中,“RAFld1”没有值,因此 null显示:

    ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2;
    {"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"}
    {"RecType":"RecB"} | null
    
  4. 仅使用记录类型“A”值填充新的 Kafka 主题,使用 EXTRACTFROMJSON过滤 Header 值上的记录类型,并从有效负载中提取命名字段:

    CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \
    SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \
            EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, \
            EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, \
            EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld \
            FROM my_stream \
    WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
    

    请注意,序列化正在切换到 Avro,以便任何使用者都可以自动使用该架构,而无需手动声明它。

  5. 观察新流具有模式,并且在消息到达原始 source_data 时不断填充消息。主题:

    ksql> DESCRIBE recA_data;
    
    Name                 : RECA_DATA
    Field          | Type
    --------------------------------------------
    ROWTIME        | BIGINT           (system)
    ROWKEY         | VARCHAR(STRING)  (system)
    SOMEOTHERFIELD | VARCHAR(STRING)
    SOMEFLD        | VARCHAR(STRING)
    AFLD           | VARCHAR(STRING)
    ANOTHERFLD     | VARCHAR(STRING)
    --------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    
    ksql> SELECT * FROM recA_data;
    1545240188787 | null | 1.001 | some data | data | 98.6
    

关于ksqldb - 来自具有异构 JSON 结构的主题的 KSQL 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53840974/

相关文章:

apache-kafka - 如何编写查询来提取 KSQL 中的所有字段值以及满足条件的值

apache-kafka - 如何查询 ksql 中的 map 字段?

apache-kafka - KSQL创建具有多列聚合的表

apache-kafka - Linux 的 Windows 子系统 : Command Not Found Error

oracle - 如何在Docker中连接到外部Oracle数据库

apache-kafka - 如何让ksql只打印最新的记录

confluent-platform - 使用 KSQL,为什么我的表保留较旧 ROWTIME 的数据并丢弃较新 ROWTIME 的更新?

java - KSQL 数据生成 - java.lang.ClassNotFoundException : MonitoringProducerInterceptor

java - 如何在 Spring Kafka 中接收来自 KSQL 的流式响应?