apache-flink - Flink 到 Nifi 的 Magic Header 不存在

标签 apache-flink apache-nifi

我正在尝试使用this example将 Nifi 连接到 Flink:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8090/nifi")
            .portName("Data for Flink")
            .requestBatchCount(5)
            .buildConfig();

    SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
    DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);

    DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
        @Override
        public String map(NiFiDataPacket value) throws Exception {
            return new String(value.getContent(), Charset.defaultCharset());
        }
    });

    dataStream.print();
    env.execute();

我将 Nifi 作为具有默认属性的独立服务器运行,但以下属性除外:

nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=8090
nifi.remote.input.http.enabled=true

每次调用都会失败,Nifi中的日志如下:

[Site-to-Site Worker Thread-24] o.a.nifi.remote.SocketRemoteSiteListener 
Unable to communicate with remote instance null due to
org.apache.nifi.remote.exception.HandshakeException: Handshake 
with nifi://localhost:61680 failed because the Magic Header 
was not present; closing connection

Nifi版本:1.7.1,Flink版本:1.7.1

最佳答案

使用nifi-toolkit后,我删除了nifi.remote.input.socket.port的自定义值,然后添加了transportProtocol(SiteToSiteTransportProtocol.HTTP) 到我的 SiteToSiteClientConfighttp://localhost:8080/nifi 作为 URL。

我首先更改端口的原因是,如果不指定协议(protocol)HTTP,它将默认使用RAW。 当从 Flink 端使用 RAW 协议(protocol)时,客户端无法创建 Transaction 并打印以下警告:

Unable to refresh Remote Group's peers due to Remote instance of NiFi 
is not configured to allow RAW Socket site-to-site communications

这就是为什么我认为这是端口问题

现在使用 Nifi 的默认配置,这可以按预期工作:

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8080/nifi")
            .portName("portNameAsInNifi")
            .transportProtocol(SiteToSiteTransportProtocol.HTTP)
            .requestBatchCount(1)
            .buildConfig();

关于apache-flink - Flink 到 Nifi 的 Magic Header 不存在,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53991316/

相关文章:

apache-flink - 在 Apache Flink 中,Job Manager 和 Job Master 有什么区别?

apache-kafka - Flink Kafka Stream 相对于 Spark Kafka Stream 的优势?和 Flink 上的 Kafka Stream?

java - Flink runner 上的 Beam : ClassNotFoundException: org. apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector

json - 有 JOLT 文档吗? &、@ 等运算符的含义是什么? (NiFi, JoltTransformJSON)

maven - 使用 Flink 处理 Twitter 数据时的依赖问题

hadoop - Flink 能否将结果写入多个文件(如 Hadoop 的 MultipleOutputFormat)?

java - 如何在Nifi中用表达式语言定义ControllerService

json - NiFi QueryRecord 处理器 - 选择可选 JSON 属性

apache-nifi - 在 Apache NiFi 中,我可以评估没有属性的表达式语言吗?

java - 在安全集群中使用 Nifi