java - 在 Java Flink 作业中使用 Python 用户定义函数

标签 java python apache-flink flink-cep pyflink

无论如何要在Java Flink Job中使用python用户定义的函数,或者无论如何来交流例如flink与java完成的转换结果以及python用户定义的函数来应用一些机器学习的东西:
我知道从 pyFlink 你可以做这样的事情:

table_env.register_java_function("hash_code", "my.java.function.HashCode")
但是我需要做类似的事情但是从java添加python函数,或者如何将java转换的结果直接传递给Python UDF Flink作业?
我希望这些问题不要太疯狂,但我需要知道是否存在以某种方式将 Flink DataStream API 与以 Java 为主要语言的 Python Table API 进行通信?这意味着从 Java 我需要做:
Source -> Transformations -> Sink,但其中一些转换可以触发 Python 函数,或者 Python 函数将等待某些 Java 转换完成以对 Stream 结果执行某些操作。
我希望有人明白我在这里想做什么。
亲切的问候!

最佳答案

在 Flink 1.10 中添加了对 Python UDF(用户定义函数)的支持——参见 PyFlink: Introducing Python Support for UDFs in Flink's Table API .例如,您可以这样做:

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
table_env.register_function("add", add)
my_table.select("add(a, b)")
有关更多示例等,请参阅上面链接的博客文章,或 stable documentation .
在 Flink 1.11(预计下周发布)中,添加了对矢量化 Python UDF 的支持,带来了与 Pandas、Numpy 等的互操作性。此版本还包括对 SQL DDL 和 SQL 客户端中的 Python UDF 的支持。有关文档,请参阅 the master docs .
听起来您想从 Java 调用 Python。 Stateful Functions API 更全面地支持这一点——参见 remote functions .但是要从 Java DataStream API 调用 Python,我认为您唯一的选择是使用 Flink 1.11 中添加的 SQL DDL 支持。见 FLIP-106the docs .
FLIP-106 有这个例子:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
您应该能够将其转换为使用 DataStream API。

关于java - 在 Java Flink 作业中使用 Python 用户定义函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62725535/

相关文章:

python - 设置 Python Pub/Sub 异步拉取订阅者线程数

python 日志记录性能比较和选项

generics - 在 clojure 中为泛型类提供类型提示

java - Flink 将 csv 文件映射到元组中

java - 如何在 C# 和 Java 之间正确设置 ssl 套接字连接?

java - 从 php 调用外部 java 类

java - 在 fragment 布局上使用谷歌地图时遇到问题

java - 如何使用 GWT 或 Javascript 强调 DIV 内的文本 (innerText) :

python - Swig C++ 到 python : compiling a bunch of . cpp 和 .h 文件

java - Flink : java. io.NotSerializedException : redis.clients.jedis.JedisCluster