java - 数据流SDK 2.x : how to consume from PubSubIO using java serialization

标签 java google-cloud-dataflow apache-beam google-cloud-pubsub

我是 Dataflow 新手,我要将以下代码片段从 Java SDK 1.9.0 迁移到 2.3.0:

//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
  PubsubIO.Read.named("Read from Pubsub")
  .topic(myTopic)
  .withCoder(SerializableCoder.of(MyType.class))
  .timestampLabel("myDate"));

我会将其转换为

//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));

但从 java SDK 2.3.0 开始,PubsubIO.read() 方法是私有(private)的。

因此,我需要使用带有 MyType 序列化实例的消息,但 PubsubIO 公开的方法似乎仅适用于短信、avro、protobuf 等。

如何从消息包含序列化 Java 对象的 PubsubIO 主题中读取内容?

更新:

我可以这样调整它(尚未尝试)...

PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.readMessagesWithAttributes ()
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.apply (MapElements.via(new SimpleFunction<PubsubMessage, MyType> () {
        @Override
        public MyType apply (final PubsubMessage message) {
            final byte[] payload = message.getPayload ();
            try {
                try (final ObjectInputStream stream = new ObjectInputStream (new ByteArrayInputStream (payload))) {
                    return (MyType) stream.readObject ();
                }
            } catch (IOException e) {
                throw new RuntimeException (e);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException (e);
            }
        }
    }))

最佳答案

您更新的代码看起来应该可以工作。请注意,如果您不使用属性映射,还有 PubsubIO.readPubsubMessagesWithoutAttributes()

之前的功能已在 PR#2634 中删除,它将其替换为最常见编码类型(proto、avro、字符串)的专用方法。

我怀疑由于依赖 Java 序列化的固有危险,未保留通过 SerializedCoder 进行的任意对象解码。请参阅SerializableCoder javadoc 或相关问题Java serialization - advantages and disadvantages, use or avoid? 。但是,如果您觉得 API 有所欠缺,Beam SDK 是开源的,社区欢迎 contributions .

关于java - 数据流SDK 2.x : how to consume from PubSubIO using java serialization,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49510560/

相关文章:

java - Karaf OSGI 环境中的 Jersey Web 应用程序无法运行

java - 如何在没有 X11 的情况下从 Linux 控制台触发手动 Java GC

go - 在Apache Beam Dataflow管道中控制并行度

java - 如何从 bst 返回排序数组(仅使用局部变量)?

java - 我可以在 Jxls 2+ 中的模板和输出格式中使用 .XLSX 吗?

java - BigQuery 写入完成后数据流发送 PubSub 消息

google-cloud-platform - 为什么使用 Dataflow 写入 Bigquery 非常慢?

python - 从 Bigquery 中读取几行作为辅助输入,得到 None

apache-spark - 根据数据作业的大小启动 kubernetes pod 内存

python - 写入 BigQuery 动态表名 Python SDK