java - 无法从 KSQL UDF 返回结构

标签 java apache-kafka user-defined-functions ksqldb

我正在尝试创建一个 UDF 函数,它接受 STRUCT 作为参数,并在对输入进行一些修改(更新现有字段+添加新字段)后返回一个结构,如下面的代码所示:

@UdfDescription(name = "myCustomUdf")
public class MyCustomUdf {

    @Udf(description = "do stuff")
    public Struct MyCustomUdf(@UdfParameter(schema = "struct <NAME VARCHAR, EMAIL VARCHAR>", value = "user") final Struct struct) {
        String processedEmail = struct.getString("EMAIL").toUpperCase();
        struct.put("EMAIL", processedEmail);
        return struct;
    }
}

但是,在部署自定义 jar 时,我在 KSQL 日志中看到以下异常,在文档中找不到任何涉及禁止此功能的内容,或者我在这里遗漏了一些内容:

让UDF返回STRUCT是否可行?

io.confluent.ksql.util.KsqlException: Could not load UDF method with signature: public org.apache.kafka.connect.data.Struct com.myudf.MyCustomUdf.MyCustomUdf(org.apache.kafka.connect.data.Struct)
    at io.confluent.ksql.function.UdfLoader.getReturnType(UdfLoader.java:373)
    at io.confluent.ksql.function.UdfLoader.addFunction(UdfLoader.java:280)
    at io.confluent.ksql.function.UdfLoader.lambda$handleUdfAnnotation$8(UdfLoader.java:217)
    at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$9.lookForMatches(ScanSpec.java:1390)
    at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:696)
    at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1606)
    at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1678)
    at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1704)
    at io.confluent.ksql.function.UdfLoader.loadUdfs(UdfLoader.java:145)
    at io.confluent.ksql.function.UdfLoader.lambda$load$2(UdfLoader.java:115)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at io.confluent.ksql.function.UdfLoader.load(UdfLoader.java:115)
    at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:473)
    at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:441)
    at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:94)
    at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:59)
Caused by: io.confluent.ksql.util.KsqlException: Type inference is not supported for: class org.apache.kafka.connect.data.Struct
    at io.confluent.ksql.util.SchemaUtil.handleParametrizedType(SchemaUtil.java:378)
    at io.confluent.ksql.util.SchemaUtil.lambda$getSchemaFromType$5(SchemaUtil.java:158)
    at io.confluent.ksql.util.SchemaUtil.getSchemaFromType(SchemaUtil.java:158)
    at io.confluent.ksql.util.SchemaUtil.getSchemaFromType(SchemaUtil.java:153)
    at io.confluent.ksql.function.UdfLoader.getReturnType(UdfLoader.java:366)
    ... 26 more

最佳答案

KSQL 不仅需要输入参数的模式,还需要输出的模式。对于您的情况,请确保在 @Udf 注释中指定以下内容:

    @Udf(description = "do stuff", schema="STRUCT<name VARCHAR, email VARCHAR>")
    public Struct MyCustomUdf(@UdfParameter(schema = "struct <NAME VARCHAR, EMAIL VARCHAR>", value = "user") final Struct struct) {
       ...
    }

看来我们的文档中确实遗漏了这一点!我添加了一个问题以确保它已得到解决 ( https://github.com/confluentinc/ksql/issues/3699 )。

关于java - 无法从 KSQL UDF 返回结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58583372/

相关文章:

java - 错误: unknown type name 'ClassObject'

java - 如何在 Spring Boot 中获取静态资源(js、css)作为 ClassPathResource?

java - 当 where 子句条件可以为 null 时更新 SQL

apache-kafka - kafka 连接器 HTTP/API 源

java - 在没有 hashmap 结构的微调器中使用键和值

d3.js - Bluemix Kafka 流

node.js - Kafka Node - BrokerNotAvailableError

scala - 合并 Spark 数据框中的列

sql - 尝试创建函数时 DECLARE 部分中的“"VARCHAR"处或附近的语法错误”

sql - 如何使用 Fluent NHibernate 映射 Tsql 用户定义函数?