java - 无法使用Flink-nifi连接器从Nifi输出端口读取数据

标签 java docker apache-flink apache-nifi

我在虚拟机上的 docker 中有一个 Nifi 实例,暴露端口:8080 和 10000。 在瘦实例上,我创建了一个简单的管道,其输出端口名为“flink”,我想使用 flink-nifi 连接器读取此数据:

 SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://vm-address:8080/nifi")
                .portName("flink")
                .requestBatchCount(100)
                .buildConfig();
        DataStream<NiFiDataPacket> nifi = environment.addSource(new NiFiSource(clientConfig));

        nifi.map(new MapFunction<NiFiDataPacket, JsonNode>() {
            @Override
            public JsonNode map(NiFiDataPacket value) throws Exception {
                return DataConverter.byte2Json(value.getContent());
            }
        }).print();

在这种情况下我收到错误: 线程“main”中的异常org.apache.flink.runtime.client.JobExecutionException:java.net.UnknownHostException 如果我在配置中添加 localAddress:

 SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://vm-address:8080/nifi")
                .localAddress(InetAddress.getByName("vm-address"))
                .portName("flink")
                .requestBatchCount(100)
                .buildConfig();

我收到此错误: 线程“main”中的异常org.apache.flink.runtime.client.JobExecutionException:java.net.BindException:无法分配请求的地址:JVM_Bind

我在 Windows 上从本地 PC 运行此代码,并且 Flink 以独立模式启动。 另外,我尝试直接在虚拟机上运行它,但出现了同样的错误。

<小时/>

日志中有很多重试:

execchain.RetryExec: I/O exception (java.net.BindException) caught when processing request to /vm-address->{}->http://vm-address:8080: Cannot assign requested address: JVM_Bind

最佳答案

终于解决了! 问题出在我的 docker 配置中。首先,我像这样运行nifi: docker run --name nifi -p 8008:8080 -p 10000:10000 -d apache/nifi:1.7.1 默认情况下,网络为 bridge 。在这种情况下,我的容器有一些随机主机名,我不直接与容器通信,而是通过 Docker 进行通信。 当我选择网络=主机时: docker run --name nifi --network host -d apache/nifi:1.7.1一切顺利。 也许,我可以通过另一种方式解决它(也许,显式解析容器主机名),但这是最简单的方法

关于java - 无法使用Flink-nifi连接器从Nifi输出端口读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52994229/

相关文章:

typescript - SonarQube:找不到 typescript 模块

scala - 无法从 JAR 文件构建程序

java - 使用汇合反序列化Apache Flink中的Avro

java - Flink时间窗口和滑动时间窗口有什么区别?

java - Java 字节数组转十进制

java - 如何解析 YAML 文件

java - Java 中的对象内的对象

java - 拦截 Shell 命令

docker - 无法访问由 Dockerfile 中的 ADD/chown/chmod 创建的 docker 中自己的目录

docker - 如何通过 jenkins 管道在远程服务器上拉取和运行 docker 镜像