google-cloud-platform - 如何在 Apache Beam 中提取 Google PubSub 发布时间

标签 google-cloud-platform google-cloud-dataflow apache-beam

我的目标是能够访问 Google PubSub 在 Apache Beam(数据流)中记录和设置的 PubSub 消息发布时间。

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

似乎不包含一个属性。 我已经尝试过

 .withTimestampAttribute("publish_time")

也没有运气。我缺少什么?是否可以在数据流中提取 Google PubSub 发布时间?

最佳答案

Java 版本:

PubsubIO 将从 Pub/Sub 读取消息,并将消息发布时间分配给元素作为记录时间戳。因此,您可以使用 ProcessContext.timestamp() 访问它。举个例子:

p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            LOG.info("Message: " + c.element());
            LOG.info("Publish time: " + c.timestamp().toString());
            Date date= new Date();
            Long time = date.getTime();
            LOG.info("Processing time: " + new Instant(time).toString());
        }
    }));

我提前发布了一条消息(事件和处理时间之间存在显着差异),DirectRunner 的输出是:

Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z

最小代码 here

<小时/>

Python 版本:

现在可以通过 process 方法 ( docs ) 的 DoFn.TimestampParam 访问时间戳:

class GetTimestampFn(beam.DoFn):
  """Prints element timestamp"""
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
    yield element

注意:日期解析感谢 this answer .

输出:

INFO:root:>>> Element timestamp: 2019-08-12 20:16:53

完整code

关于google-cloud-platform - 如何在 Apache Beam 中提取 Google PubSub 发布时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55370068/

相关文章:

java - 使用 Google Cloud Dataflow 的 Java API for Datastore 将属性设置为 null?

google-cloud-dataflow - SDK 版本 0.4.150414 损坏的作业

google-cloud-dataflow - Cloud Dataflow 新鲜度和延迟的确切定义是什么?

kubernetes - 不使用 gcloud sdk 连接到 Kubernetes 集群

google-cloud-platform - 谷歌云 CDN 卡在 0% 缓存命中率

google-app-engine - 如何设置 GCP App Engine 服务的服务名称

tomcat - Bitnami Tomcat 堆栈 GCE

python - 在运行管道之前提交 Dataflow 作业时可以运行设置脚本吗?

google-cloud-dataflow - 跨多个 DataFlow 作业/管道共享实例 - 可能吗?

python - Google Cloud Platform 数据流集成