apache-flink - Apache Flink 1.14.0 - 无法通过 Java 中的 SQL DDL 使用 python UDF

标签 apache-flink user-defined-functions flink-table-api

我正在尝试在 SQL DDL(1.14.0) 中执行 python UDF 函数

Python文件在这里:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
    return a + 1

并启动flink集群:

➜  flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.

这里是 Java 代码:

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //set cfg
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );

        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();

        env.execute();
    }
}

然后通过Flink客户端运行作业:

flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar

错误是:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.

但是当我使用 sql-client 执行我的 py UDF 时,它运行成功。

启动sql客户端:

PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3  ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py

然后

创建临时系统函数 add1 作为“udfTest.add_one”语言 python;

然后

选择 add1(3);

我得到了正确的结果 4,我的代码有问题吗?

我看到版本 1.11 支持 py UDF 函数 https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python +UDF+in+SQL+Function+DDL,但现在我使用的是 1.14.0。

谁能帮帮我!

最佳答案

确保所有依赖项都已安装。

Java:

  • 8 或 11

  • 行家 3.5+

  • flink jar :

      <dependencies>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-java</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-streaming-java_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-clients_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-common</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-planner_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-python_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
         </dependencies>
    

python :

  • Python 3.6+
  • Apache Beam(== 2.19.0)
  • 点(>=7.1.0)
  • 设置工具(>= 37.0.0)
  • apache-fink (1.14.0)

关于apache-flink - Apache Flink 1.14.0 - 无法通过 Java 中的 SQL DDL 使用 python UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70212809/

相关文章:

sql-server - SQL Server View 或表值函数?

Python 3 : Functions dependent on other functions, 缺少参数?

apache-flink - Flink sink filesystem as parquet - 保存嵌套数据时出错

apache-flink - 如何在 Apache Flink 中合并两个数据流

java - Flink 按字段 id 对记录进行分组的最佳方式

hadoop - 使用 S3AFileSystem 的 Flink 不会从 S3 读取子文件夹

java - 哪种方法是读取缓慢变化查找和丰富流输入集合的最佳方法?

打开两个工作簿的 Excel UDF