java - Flink奇怪的 "Cannot Serialize operator object class ...CoBroadcastWithNonKeyedOperator"错误

标签 java apache-flink

我正在尝试使用 BroadcastState 设置一个项目,但由于某种原因,当我尝试运行它时收到此错误:

org.apache.flink.streaming.runtime.tasks.StreamTaskException:无法序列化运算符对象类org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator。

我不知道为什么它会抛出它。传入和输出的对象(SampleInput 和 Token)是非常简单的 avro 生成的 pojo,具有两个或三个字段,我尝试将 BroadcastProcessFunction 的方法留空,以删除我可以设置的任何内容,使其无法序列化,但是仍然收到错误。这是代码的相关部分:

//Sideoutput that error strings will be written to
    OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};

    //<Setup for broadcast state>
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot()
            .build();

    final MapStateDescriptor<String, Token> ruleStateDescriptor = new MapStateDescriptor<>(
            "oathTokens",
            BasicTypeInfo.STRING_TYPE_INFO,
            AvroTypeInfo.of(new TypeHint<Token>() {}));
    ruleStateDescriptor.enableTimeToLive(ttlConfig);

    DataStream<Token> tokenObjectStream = tokenSourceStream.process(new JsonToTokenProcessFunction(sideOutputTag))
            .startNewChain()
            .uid("tokenObjectStream")
            .name("tokenObjectStream");

    BroadcastStream<Token> ruleBroadcastStream = tokenObjectStream.broadcast(ruleStateDescriptor);
    //</Config for broadcast state>


    //<Main Data Input Stream>
    DataStream<SampleInput> jsonToSampleInput = kafkaStream.process(new JsonToPojoProcessFunction(sideOutputTag))
            .startNewChain()
            .uid("sampleInputStream")
            .name("sampleInputStream");

    BroadcastConnectedStream<SampleInput, Token> broadcastConnectedStream = jsonToSampleInput.connect(ruleBroadcastStream);

    DataStream<SampleInput> matchedBroadcastStream = broadcastConnectedStream.process(new BroadcastProcessFunction<SampleInput, Token, SampleInput>() {

        @Override
        public void processElement(SampleInput sampleInput, ReadOnlyContext readOnlyContext, Collector<SampleInput> collector) throws Exception {

        }

        @Override
        public void processBroadcastElement(Token token, Context context, Collector<SampleInput> collector) throws Exception {

        }
    });

任何帮助将不胜感激。我确信我只是忽略了一些事情。谢谢!

最佳答案

事实证明 ttlConfig 对象是不可序列化的。删除它解决了该问题。

关于java - Flink奇怪的 "Cannot Serialize operator object class ...CoBroadcastWithNonKeyedOperator"错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55127868/

相关文章:

java - AbstractCassandraTupleSink 的实现不可序列化

java - 在 Flink 中合并多个流加入

java - 每个操作返回一个部门

java - 处理程序 sendEmptyMessage() 无法解析

java - webflow 和 Spring MVC Controller 之间的和谐

java - 尽管循环在数组长度范围内,但仍出现越界异常

java - Apache flink(稳定版本1.6.2)无法工作

java - Flink State过期时触发

java.lang.NoSuchMethodError : org. apache.xmlbeans.XmlOptions.setSaveAggressiveNamespaces()Lorg/apache/xmlbeans/XmlOptions;

java - ClassNotFoundException : org. apache.hadoop.conf.Configuration 启动 Flink SQL 客户端