pojo - Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO

标签 pojo apache-flink amazon-kinesis flink-streaming

我想使用 Flink 使用来自 Kinesis 的 POJO。
是否有关于如何正确发送和反序列化消息的任何标准?

谢谢

最佳答案

我解决了它:

DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
        "my-stream",
        new POJODeserializationSchema(),
        kinesisConsumerConfig));

public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
    private ObjectMapper mapper;

    @Override
    public SamplePojo deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }

        SamplePojo retVal = mapper.readValue(message, SamplePojo.class);

        return retVal;
    }

    @Override
    public boolean isEndOfStream(SamplePojo nextElement) {
        return false;
    }
}

关于pojo - Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42885446/

相关文章:

java - 知道类中定义的任何变量的值何时更改

java - 如何使用java创建这个JSONObject?

amazon-kinesis - AWS Kinesis Stream Consumer使用推还是拉协议(protocol)?

java - 在java中设置相同的属性POJO

java - 如何使用 Jersey 将 POJO 序列化为查询参数

out-of-memory - Flink 死了;直接缓冲内存内存不足

kubernetes - Flink HA JobManager 集群无法选举领导者

scala - Apache Flink - org.apache.flink.client.program.ProgramInvocationException

java - AWS Lambda 性能问题

amazon-web-services - AWS Lambda 似乎在完成之前就退出了