apache-flink - 选择所有字段为json字符串作为Flink SQL中的新字段

标签 apache-flink flink-streaming flink-sql

我正在使用 Flink Table API。我有一个表定义,我想选择所有字段并将它们转换为新字段中的 JSON 字符串。

我的表有三个字段; a:字符串,b:整数,c:时间戳。

如果我这样做

INSERT INTO kinesis
SELECT a, b, c from my_table

kinesis流有json记录;

{
  "a" : value,
  "b": value,
  "c": value
}

但是,我想要类似于 Spark 的功能;

INSERT INTO kinesis
SELECT "constant_value" as my source, to_json(struct(*)) as playload from my_table

所以,预期结果是;

{
  "my_source": "constant_value",
  "payload": "json string from the first example that has a,b,c"
}

我在 Flink 中看不到任何 to_jsonstruct() 函数。可以实现吗?

最佳答案

您可能必须实现自己的用户定义聚合函数。

这就是我所做的,这里我假设 UDF 的输入看起来像

to_json('col1', col1, 'col2', col2)

public class RowToJson extends ScalarFunction {
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row) throws Exception {
        if(row.length % 2 != 0) {
            throw new Exception("Wrong key/value pairs!");
        }

        String json = IntStream.range(0, row.length).filter(index -> index % 2 == 0).mapToObj(index -> {
            String name = row[index].toString();
            Object value = row[index+1];
            ... ...
        }).collect(Collectors.joining(",", "{", "}"));
        return json;
    }
}

如果您希望 udf 可以用于 group by,则必须从 AggregateFunction 扩展您的 udf 类

public class RowsToJson extends AggregateFunction<String, List<String>>{
    @Override
    public String getValue(List<String> accumulator) {
        return accumulator.stream().collect(Collectors.joining(",", "[", "]"));
    }

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<String>();
    }

    public void accumulate(List<String> acc, @DataTypeHint(inputGroup = InputGroup.ANY) Object... row) throws Exception {
        if(row.length % 2 != 0) {
            throw new Exception("Wrong key/value pairs!");
        }
        String json = IntStream.range(0, row.length).filter(index -> index % 2 == 0).mapToObj(index -> {
            String name = row[index].toString();
            Object value = row[index+1];
            ... ...
        }).collect(Collectors.joining(",", "{", "}"));
        acc.add(json);
    }

}

关于apache-flink - 选择所有字段为json字符串作为Flink SQL中的新字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65896143/

相关文章:

kubernetes - 如何在 Kubernetes Flink 集群中实现 JobManager 高可用?

apache-flink - apache Flink 中的重叠分区

hadoop - Flink Temp Jar上传目录已删除

scala - 使用Flink获取DataStream的文件名

optimization - 如何知道哪些运算符可以在 Apache Flink 中链接

apache-flink - 弗林克 : Build is failing when I add gauge

apache-kafka - 无法反序列化 Avro 记录 - Apache flink SQL CLI

java - 如何确定flink中的任务槽数

java - 在 Apache Flink Broadcast 流中应用基于窗口的规则

apache-flink - Flink CEP : Which method to join data streams for different type of events?