经过几天的研究,为什么我的 Flink application is not working properly我得出的结论是,问题出在我正在使用的 MinMaxPriorityQueue
中。
这个结构体似乎不可序列化。我尝试了几种方法来序列化它:
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], classOf[ProtobufSerializer]);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
他们都没有运气。
但是我发现了这个:Serializing Guava's ImmutableTable
是否有与 MinMaxPriorityQueue 等效的方法,或者序列化它的方法?
更新
我已将 Tomasz 翻译成 scala:
class MinMaxPriorityQueueSerializer extends Serializer[MinMaxPriorityQueue[Object]] {
private[this] val log = LoggerFactory.getLogger(this.getClass)
setImmutable(false)
setAcceptsNull(false)
val OPTIMIZE_POSITIVE = true
override def read(kryo: Kryo, input: Input, aClass: Class[MinMaxPriorityQueue[Object]]): MinMaxPriorityQueue[Object] = {
log.error("Kryo READ")
val comparator: Ordering[Object] = kryo.readClassAndObject(input).asInstanceOf[Ordering[Object]]
val size = input.readInt(OPTIMIZE_POSITIVE)
val queue: MinMaxPriorityQueue[Object] = MinMaxPriorityQueue.orderedBy(comparator)
.expectedSize(size)
.create()
(0 to size).foreach(_ => queue.offer(kryo.readClassAndObject(input)))
queue
}
override def write(kryo: Kryo, output: Output, queue: MinMaxPriorityQueue[Object]): Unit = {
log.error("Kryo WRITE")
kryo.writeClassAndObject(output, queue.comparator)
val declaredSize = queue.size
output.writeInt(declaredSize, OPTIMIZE_POSITIVE)
val actualSize = queue.toArray.foldLeft(0) {
case (z, q) =>
kryo.writeClassAndObject(output, q)
z + 1
}
Preconditions.checkState(
declaredSize == actualSize,
"Declared size (%s) different than actual size (%s)", declaredSize, actualSize)
}
}
并在 flink 中设置 kryo 以使用该序列化器:
env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
但是它似乎从未被调用过,因为我在日志中没有看到 log.error("Kryo READ")
和 log.error("Kryo WRITE"的输出”)
即使我正在更新它,转换仍然返回一个空的 MinMaxPriorityQueue。
更新2
我已经实现了 SerializerTester,但我得到了 bufferUnderflow:
object Main {
def main(args: Array[String]) {
val tester = new MinMaxPriorityQueueSerializerTester()
val inQueue: MinMaxPriorityQueue[java.lang.Double] = MinMaxPriorityQueue.create()
inQueue.add(1.0)
val outputStream = new ByteArrayOutputStream()
tester.serialize(outputStream, inQueue)
val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
val outQueue: MinMaxPriorityQueue[java.lang.Double] = tester.deserialize(inputStream);
System.out.println(inQueue);
System.out.println(outQueue);
}
class MinMaxPriorityQueueSerializerTester {
val kryo = new Kryo
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy)
registerMinMaxSerializer();
// allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
def registerMinMaxSerializer() {
kryo.addDefaultSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], new MinMaxPriorityQueueSerializer());
}
def serialize(out: OutputStream, queue: MinMaxPriorityQueue[java.lang.Double]) {
// try (Output output = new Output(out)) {
val output = new Output(out)
kryo.writeClassAndObject(output, queue)
// kryo.writeObject(output, queue)
//}
output.flush
}
def deserialize(in: InputStream): MinMaxPriorityQueue[java.lang.Double] = {
//try (Input input = new Input(in)) {
val input = new Input(in)
//kryo.readObject(input, classOf[MinMaxPriorityQueue[java.lang.Double]])
kryo.readClassAndObject(input).asInstanceOf[MinMaxPriorityQueue[java.lang.Double]]
//p}
}
}
最佳答案
您可以使用自定义 Kryo 序列化器
。
这是一个示例(Java 语言):
class MinMaxPriorityQueueSerializer extends Serializer<MinMaxPriorityQueue<Object>> {
private static final boolean OPTIMIZE_POSITIVE = true;
protected MinMaxPriorityQueueSerializer() {
setAcceptsNull(false);
setImmutable(false);
}
@Override
public void write(Kryo kryo, Output output, MinMaxPriorityQueue<Object> queue) {
kryo.writeClassAndObject(output, queue.comparator());
int declaredSize = queue.size();
output.writeInt(declaredSize, OPTIMIZE_POSITIVE);
int actualSize = 0;
for (Object element : queue) {
kryo.writeClassAndObject(output, element);
actualSize++;
}
Preconditions.checkState(
declaredSize == actualSize,
"Declared size (%s) different than actual size (%s)", declaredSize, actualSize
);
}
@Override
public MinMaxPriorityQueue<Object> read(Kryo kryo, Input input, Class<MinMaxPriorityQueue<Object>> type) {
@SuppressWarnings("unchecked")
Comparator<Object> comparator = (Comparator<Object>) kryo.readClassAndObject(input);
int size = input.readInt(OPTIMIZE_POSITIVE);
MinMaxPriorityQueue<Object> queue = MinMaxPriorityQueue.orderedBy(comparator)
.expectedSize(size)
.create();
for (int i = 0; i < size; ++i) {
queue.offer(kryo.readClassAndObject(input));
}
return queue;
}
}
以下是如何使用它:
class MinMaxPriorityQueueSerializerTester {
public static void main(String[] args) {
MinMaxPriorityQueueSerializerTester tester = new MinMaxPriorityQueueSerializerTester();
MinMaxPriorityQueue<Integer> inQueue = MinMaxPriorityQueue.<Integer>orderedBy(Comparator.reverseOrder())
.create(Arrays.asList(5, 2, 7, 2, 4));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
tester.serialize(outputStream, inQueue);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
@SuppressWarnings("unchecked")
MinMaxPriorityQueue<Integer> outQueue = (MinMaxPriorityQueue<Integer>) tester.deserialize(inputStream);
System.out.println(inQueue);
System.out.println(outQueue);
}
private final Kryo kryo;
public MinMaxPriorityQueueSerializerTester() {
this.kryo = new Kryo();
registerMinMaxSerializer();
allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
}
private void registerMinMaxSerializer() {
kryo.addDefaultSerializer(MinMaxPriorityQueue.class, new MinMaxPriorityQueueSerializer());
}
private void allowForClassesWithoutNoArgConstructor() {
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
}
public void serialize(OutputStream out, MinMaxPriorityQueue<?> queue) {
try (Output output = new Output(out)) {
kryo.writeObject(output, queue);
}
}
public MinMaxPriorityQueue<?> deserialize(InputStream in) {
try (Input input = new Input(in)) {
return kryo.readObject(input, MinMaxPriorityQueue.class);
}
}
}
关于scala - 序列化Guava的MinMaxPriorityQueue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51086803/