When I submit a new Flink job, it throws:
Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 24 more
Here is my code:
JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);
clickEventDataStream
.filter(Objects::nonNull)
.keyBy(new KeySelector<MobileClickEvent, String>() {
@Override
public String getKey(MobileClickEvent value) throws Exception {
return value.getItemId() + "_" + value.getItemType();
}
})
.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
@Override
public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
String key = ctx.getCurrentKey();
jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
jedisCluster.expire("{item_feature}" + key, 60 * 10);
}
});
Best answer
In OP's answer, jedisCluster would be initialized for every element. Consider overriding open and initializing it there instead.
From the Javadoc of open: "Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work."
.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
private JedisCluster jedisCluster;
@Override
public void open(Configuration parameters) {
jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
}
@Override
public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
String key = ctx.getCurrentKey();
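// REDIS_PREFIX is not defined in this snippet; presumably a constant such as "{item_feature}" from the question
jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);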
jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
}
});
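The stack trace shows Flink's ClosureCleaner Java-serializing the function object, which fails when an anonymous class captures a non-serializable local such as jedisCluster. The following is a minimal sketch of that effect using plain Java serialization only, without Flink or Jedis. FakeClient, Fn, capturing and openInitialized are hypothetical names made up for illustration: FakeClient stands in for JedisCluster, and Fn stands in for a serializable Flink function with an open hook.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureSerializationDemo {

    // Hypothetical stand-in for JedisCluster: a client that is NOT Serializable.
    static class FakeClient {
        long hincrBy(String key, String field, long delta) { return delta; }
    }

    // Hypothetical stand-in for a Flink function: Serializable, with an open() hook.
    interface Fn extends Serializable {
        default void open() {}
        long process(String key);
    }

    // Java-serializes the object, as InstantiationUtil.serializeObject does in the stack trace.
    static boolean isSerializable(Object fn) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(fn);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Like the question: the anonymous class captures a local client,
    // which becomes a hidden field that has to be serialized with the function.
    static Fn capturing() {
        FakeClient client = new FakeClient();
        return new Fn() {
            @Override public long process(String key) { return client.hincrBy(key, "click", 1); }
        };
    }

    // Like the answer: the client is a field that stays null until open() runs,
    // so nothing non-serializable is shipped with the function.
    static Fn openInitialized() {
        return new Fn() {
            private transient FakeClient client;
            @Override public void open() { client = new FakeClient(); }
            @Override public long process(String key) { return client.hincrBy(key, "click", 1); }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + isSerializable(capturing()));
        System.out.println("open-initialized: " + isSerializable(openInitialized()));
    }
}
```

In this sketch the capturing variant fails serialization and the open-initialized one passes, which mirrors why moving the client creation into open removes the exception. In a real job, open runs once per parallel function instance on the task manager, so the client is created after the function has been shipped.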
Regarding "java - Flink: java.io.NotSerializableException: redis.clients.jedis.JedisCluster", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56057346/