java - 当第一个包含某个事件的时间范围和第二个时间戳时,如何连接两个 PCollection?

标签 java google-cloud-dataflow apache-beam

我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。

我有两个 PCollection。首先是代表 session :

class Session{
    String id
    long start; 
    long stop;
}

第二个代表一些事件:

class Event{
    long timestamp;
    String id;
}

我想加入这两个 PCollection,最后有类似 KV<Session,Iterable<Event>> 的内容- 因此该结构包含具有关联事件列表的 session 。如果事件的时间戳在一个 session (或多个 session )的时间范围内,则应将其聚合附加到它(或它们)。

实现这一目标的最佳方法是什么?

最佳答案

鉴于这是一个批处理管道,我要做的就是遍历所有可能的 Session首先,建立一个列表并将其另存为 PCollectionView 。然后,在解析每个Event时我们可以检查哪个Session会不会掉下来。

在我的测试中,我定义了类和构造函数,如下所示:

@DefaultCoder(AvroCoder.class)
public static class Session {
    String id;
    long start; 
    long stop;

    public Session(String id, long start, long stop) {
        this.id = id;
        this.start = start;
        this.stop = stop;
    }

    public Session() {
        // for serialization only
    }
}

@DefaultCoder(AvroCoder.class)
public static class Event {
    String id;
    long timestamp;

    public Event(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    public Event() {
        // for serialization only
    }
}

我们将使用一些测试数据,例如:

// Example sessions data
final List<Session> sessionList = Arrays.asList(
    new Session("s1", 0L, 100L),
    new Session("s2", 100L, 200L),
    new Session("s3", 200L, 300L)
);

// Example event data
final List<Event> eventList = Arrays.asList(
    new Event("e1", 20L),
    new Event("e2", 60L),
    new Event("e3", 120L),
    new Event("e4", 160L),
    new Event("e5", 210L),
    new Event("e6", 290L)            
);

首先我们将构建 PCollectionView以及所有可能的 session :

// create PCollectionView from sessions
final PCollectionView<List<Session>> sessionPC = p
    .apply("Create Sessions", Create.of(sessionList))
    .apply("Save as List", View.asList());

并且,对于每个 Event ,我们将检查 AssignFn ParDo 其中 Session应该Event落入:

public static class AssignFn extends DoFn<Event, KV<Session, Event>> {  

    final PCollectionView<List<Session>> sessionPC;

    public AssignFn(PCollectionView<List<Session>> TagsideInput) {
        this.sessionPC = TagsideInput;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Event event = c.element();

        // get side input with all possible Sessions
        List<Session> sessions = c.sideInput(sessionPC);

        // where does the Event fall in?
        for (Session session:sessions) { 
            if (event.timestamp >= session.start && event.timestamp <= session.stop) {
                c.output(KV.of(session, event));
                break;
            }
        }
    }
}

主要管道结构为:

p
    .apply("Create Events", Create.of(eventList))
    .apply("Assign Sessions", ParDo.of(new AssignFn(sessionPC))
        .withSideInputs(sessionPC))
    .apply("Group By Key", GroupByKey.<Session,Event>create())
    .apply("Log Grouped Results", ParDo.of(new LogFn()));

请注意,在 Session 之后作业,我们应用 GroupByKey操作以获得 KV<Session, Iterable<Event>> 形式的所需输出。

LogFn仅用于验证内容:

public static class LogFn extends DoFn<KV<Session, Iterable<Event>>, KV<Session, Iterable<Event>>> {  

    @ProcessElement
    public void processElement(ProcessContext c) {
        Session session = c.element().getKey();
        Iterable<Event> events = c.element().getValue();
        StringBuilder str = new StringBuilder(); 

        // print session info
        str.append(String.format("\nSession id=%s, start=%d, stop=%d", session.id, session.start, session.stop));

        // print each event info
        for (Event event:events) { 
            str.append(String.format("\n---Event id=%s, timestamp=%d", event.id, event.timestamp));
        }

        LOG.info(str.toString());

        c.output(c.element());
    }
}

我得到了预期的输出:

Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO: 
Session id=s2, start=100, stop=200
---Event id=e3, timestamp=120
---Event id=e4, timestamp=160
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO: 
Session id=s1, start=0, stop=100
---Event id=e1, timestamp=20
---Event id=e2, timestamp=60
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO: 
Session id=s3, start=200, stop=300
---Event id=e6, timestamp=290
---Event id=e5, timestamp=210

完整代码here .

使用 Beam SDK 2.16.0 和 DirectRunner 进行测试.

关于java - 当第一个包含某个事件的时间范围和第二个时间戳时,如何连接两个 PCollection?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58731608/

相关文章:

java - 采用 C# 或 Java 的 Adob​​e In-Design

java - 不断检查日期的变化

java - 使用 AspectJ 注释提供重写方法执行后的逻辑

java - hibernate一次插入三个表

java - 尝试使用 DataflowRunner 时出现 ClassNotFound 异常

google-cloud-dataflow - Google Cloud Dataflow 中的自动缩放功能未按预期运行

go - 从 Go 应用的 Dataflow 模板创建作业

python - Google Cloud DataFlow 无法将文件写入临时位置

google-cloud-dataflow - 使用 Dataflow failed insert WriteResult 处理 table not found 异常

apache-beam - 使用apache beam按顺序读取文件和文件夹