I am trying to execute a Python UDF via SQL DDL in Flink 1.14.0.
Here is the Python file:
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
    return a + 1
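As a quick sanity check, the function body can be exercised in plain Python before registering it with Flink (a minimal sketch; the @udf decorator is dropped here so no Flink runtime is needed):

```python
# Plain-Python sanity check of the UDF logic; the pyflink @udf decorator
# is intentionally omitted so the function can be called directly.
def add_one(a: int) -> int:
    return a + 1

# The UDF should map 3 to 4, matching the SQL query used below.
print(add_one(3))  # prints 4
```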
And I started the Flink cluster:
➜ flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.
Here is the Java code:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Point the job at the Python UDF file and the Python interpreter
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

        // Register the Python function via DDL and call it from SQL
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );
        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();
        env.execute();
    }
}
Then I ran the job with the Flink client:
flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar
The error is:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.
But when I execute my Python UDF through the sql-client, it runs successfully.
Start the sql-client:
PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py
Then

CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON;

and then

select add1(3);

and I got the correct result, 4. Is there something wrong with my code?
I see that Python UDFs in SQL function DDL have been supported since version 1.11 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL), but I am using 1.14.0.
Can anyone help me?
Best answer
Make sure all the dependencies are installed.
Java:
- Java 8 or 11
- Maven 3.5+
Flink jars:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-python_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Python:
- Python 3.6+
- Apache Beam (== 2.19.0)
- pip (>= 7.1.0)
- setuptools (>= 37.0.0)
- apache-flink (1.14.0)
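The interpreter-side requirements above can be checked programmatically. This is a rough sketch (only the Python version is asserted; the pip command in the comment is the usual way to install PyFlink, which should pull in compatible versions of the other packages):

```python
# Rough environment check for the Python requirements listed above.
import sys

# Flink 1.14 requires Python 3.6 or later on the client and the workers.
assert sys.version_info >= (3, 6), "Python 3.6+ is required for PyFlink 1.14"

# The remaining packages are typically installed with:
#   pip install apache-flink==1.14.0
print("Python version OK:", ".".join(map(str, sys.version_info[:2])))
```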
About apache-flink - Apache Flink 1.14.0 - Cannot use Python UDF via SQL DDL in Java: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/70212809/