java - Flink: java.io.NotSerializableException: redis.clients.jedis.JedisCluster

Tags: java apache-flink jedis

When I submit a new Flink job, it throws:

Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 24 more

Here is my code:

    JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);


    DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);


    clickEventDataStream
            .filter(Objects::nonNull)
            .keyBy(new KeySelector<MobileClickEvent, String>() {
                @Override
                public String getKey(MobileClickEvent value) throws Exception {
                    return value.getItemId() + "_" + value.getItemType();
                }
            })
            .process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
                @Override
                public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
                    String key = ctx.getCurrentKey();
                    jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
                    jedisCluster.expire("{item_feature}" + key, 60 * 10);
                }
            });

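Best answer

The exception occurs because the anonymous KeyedProcessFunction captures the jedisCluster variable from the enclosing method. Flink serializes the function before shipping it to the task managers (see ClosureCleaner.clean and InstantiationUtil.serializeObject in the stack trace), and JedisCluster does not implement Serializable.

In the OP's answer, jedisCluster is initialized for every element.

Consider overriding open and initializing it there instead, so that the client is created once per function instance.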

The Flink Javadoc for open says: "Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work."

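    .process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
        // Same key prefix as in the question.
        private static final String REDIS_PREFIX = "{item_feature}";

        // transient: never serialized with the function; assigned in open() instead.
        private transient JedisCluster jedisCluster;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Called once per function instance, before any element is processed.
            jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
        }

        @Override
        public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
            String key = ctx.getCurrentKey();
            jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);
            // Expire the hash after 10 minutes.
            jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
        }
    });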
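
To see why moving the initialization into open avoids the exception, here is a minimal sketch that uses plain Java serialization (the same ObjectOutputStream mechanism that appears in the stack trace) without Flink or Jedis on the classpath. All class names here (SerializationDemo, FakeClient, CapturingFunction, OpenInitFunction) are hypothetical stand-ins invented for illustration; FakeClient plays the role of JedisCluster, and the open() method only imitates the Flink lifecycle hook.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Hypothetical stand-in for JedisCluster: it does not implement Serializable.
    static class FakeClient {
        long hincrBy(String key, String field, long delta) {
            return delta;
        }
    }

    // Mirrors the question: the client is created up front and held in a
    // regular field, so it is part of the function's serialized state.
    static class CapturingFunction implements Serializable {
        private final FakeClient client;

        CapturingFunction(FakeClient client) {
            this.client = client;
        }
    }

    // Mirrors the answer: the field is transient and only assigned in open(),
    // which would run on the worker after the function has been deserialized.
    static class OpenInitFunction implements Serializable {
        private transient FakeClient client;

        void open() {
            client = new FakeClient();
        }

        long processElement(String key) {
            return client.hincrBy("{item_feature}" + key, "click", 1);
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean canSerialize(Object function) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(function);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The captured client makes serialization fail.
        System.out.println(canSerialize(new CapturingFunction(new FakeClient()))); // prints false

        // With a transient field there is nothing non-serializable to write.
        OpenInitFunction function = new OpenInitFunction();
        System.out.println(canSerialize(function)); // prints true

        // The client becomes available once open() has run.
        function.open();
        System.out.println(function.processElement("42_video")); // prints 1
    }
}
```

The answer's snippet works even without the transient keyword, because the field is still null when the function is serialized. Marking it transient is an addition here that makes the intent explicit and keeps the function serializable even after open() has been called.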

Regarding java - Flink: java.io.NotSerializableException: redis.clients.jedis.JedisCluster, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56057346/
