java - 如何提取 Kafka Streams 消息中嵌入的时间戳

标签 java apache-kafka apache-kafka-streams

我想提取每条消息中嵌入的时间戳并将它们作为 json 有效负载发送到我的数据库中。

我想获取以下三个时间戳。

事件时间: 事件或数据记录发生的时间点,即最初“由源”创建的时间点。

Processing-time: 事件或数据记录恰好被流处理应用程序处理的时间点,即记录被消费的时间点。

Ingestion-time: Kafka 代理将事件或数据记录存储在主题分区中的时间点。

这是我的流应用程序代码:

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // pass from env localhost:9092 ,BROKER_URL + ":9092"
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        System.out.println("========> o365_user_activity_by_date Log:     " + value);
        ArrayList<String> keywords = new ArrayList<String>();
        try {
            JSONObject send = new JSONObject();
            JSONObject received = new JSONObject(value);

            send.put("current_date", getCurrentDate().toString()); // UTC TIME
            send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
            send.put("user_id", received.get("UserId"));
            send.put("operation", received.get("Operation"));
            send.put("workload", received.get("Workload"));
            keywords.add(send.toString());

        } catch (Exception e) {
            // TODO: handle exception
            System.err.println("Unable to convert to json");
            e.printStackTrace();
        }

        return keywords;
    }
}).to("o365_user_activity_by_date");

在代码中,我只是获取每条记录,对其进行一些流处理并将其发送到不同的主题。

现在,对于每条记录,我要发送嵌入有效负载中的事件时间处理时间摄取时间

我查看了 FailOnInvalidTimestampWallclockTimestampExtractor 但我对如何使用它们感到困惑。

请指导我如何实现这一目标。

最佳答案

Timestamp 提取器只能为您提供一个时间戳,并且此时间戳用于基于时间的操作,如窗口聚合或连接。看起来你没有做任何基于时间的计算思想,因此,从计算的角度来看,这无关紧要。

请注意,一条记录只有一个元数据时间戳字段。此时间戳字段可用于存储可由生产者设置的事件时间戳。作为替代方案,您可以让代理使用代理摄取时间覆盖生产者提供的时间戳。这是主题配置。

要访问记录元数据时间戳(独立于事件时间或摄取时间),默认时间戳提取器会为您提供此时间戳。如果您想在您的应用程序中访问它,您需要使用 Processor API,即在您的情况下是 .transform() 而不是 .flatMap 运算符。您的 Transformer 将使用一个 context 对象进行初始化,该对象允许您访问提取的时间戳。

因为一条记录只能存储一个元数据时间戳,并且因为您想将其用于代理摄取时间,所以上游生产者必须将事件时间戳直接放入有效负载中。

对于处理时间,只需按照您的代码片段中的指示进行系统调用即可。

关于java - 如何提取 Kafka Streams 消息中嵌入的时间戳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49113131/

相关文章:

java - JScience 测量单位库与 android 不兼容

java - 保存随机数组以在整个应用程序中使用

java - 在 Java 中,调度程序可以在第一条指令之前挂起线程吗?

apache-kafka - Kafka Connect 能否保证 RetriableException 发生时的写入顺序?

apache-kafka - 查找分配给 Kafka 流实例的分区

java - KStream从一个集群到多个集群

java - 扩展/实现使用反射加载的类/接口(interface)

尝试使用 Kafka 数据存储运行 Geomesa 快速入门时出现 Java 错误

azure - 从 PySpark 写入大型 DataFrame 到 Kafka 遇到超时

java - kafka 流标点调用中 context().headers() 修改期间出现异常