我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。
我有两个 PCollection。首先是代表 session :
class Session{
String id
long start;
long stop;
}
第二个代表一些事件:
class Event{
long timestamp;
String id;
}
我想加入这两个 PCollection,最后有类似 KV<Session,Iterable<Event>>
的内容- 因此该结构包含具有关联事件列表的 session 。如果事件的时间戳在一个 session (或多个 session )的时间范围内,则应将其聚合附加到它(或它们)。
实现这一目标的最佳方法是什么?
最佳答案
鉴于这是一个批处理管道,我要做的就是遍历所有可能的 Session
首先,建立一个列表并将其另存为 PCollectionView
。然后,在解析每个Event
时我们可以检查哪个Session
会不会掉下来。
在我的测试中,我定义了类和构造函数,如下所示:
@DefaultCoder(AvroCoder.class)
public static class Session {
String id;
long start;
long stop;
public Session(String id, long start, long stop) {
this.id = id;
this.start = start;
this.stop = stop;
}
public Session() {
// for serialization only
}
}
@DefaultCoder(AvroCoder.class)
public static class Event {
String id;
long timestamp;
public Event(String id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}
public Event() {
// for serialization only
}
}
我们将使用一些测试数据,例如:
// Example sessions data
final List<Session> sessionList = Arrays.asList(
new Session("s1", 0L, 100L),
new Session("s2", 100L, 200L),
new Session("s3", 200L, 300L)
);
// Example event data
final List<Event> eventList = Arrays.asList(
new Event("e1", 20L),
new Event("e2", 60L),
new Event("e3", 120L),
new Event("e4", 160L),
new Event("e5", 210L),
new Event("e6", 290L)
);
首先我们将构建 PCollectionView
以及所有可能的 session :
// create PCollectionView from sessions
final PCollectionView<List<Session>> sessionPC = p
.apply("Create Sessions", Create.of(sessionList))
.apply("Save as List", View.asList());
并且,对于每个 Event
,我们将检查 AssignFn
ParDo 其中 Session
应该Event
落入:
public static class AssignFn extends DoFn<Event, KV<Session, Event>> {
final PCollectionView<List<Session>> sessionPC;
public AssignFn(PCollectionView<List<Session>> TagsideInput) {
this.sessionPC = TagsideInput;
}
@ProcessElement
public void processElement(ProcessContext c) {
Event event = c.element();
// get side input with all possible Sessions
List<Session> sessions = c.sideInput(sessionPC);
// where does the Event fall in?
for (Session session:sessions) {
if (event.timestamp >= session.start && event.timestamp <= session.stop) {
c.output(KV.of(session, event));
break;
}
}
}
}
主要管道结构为:
p
.apply("Create Events", Create.of(eventList))
.apply("Assign Sessions", ParDo.of(new AssignFn(sessionPC))
.withSideInputs(sessionPC))
.apply("Group By Key", GroupByKey.<Session,Event>create())
.apply("Log Grouped Results", ParDo.of(new LogFn()));
请注意,在 Session
之后作业,我们应用 GroupByKey
操作以获得 KV<Session, Iterable<Event>>
形式的所需输出。
LogFn
仅用于验证内容:
public static class LogFn extends DoFn<KV<Session, Iterable<Event>>, KV<Session, Iterable<Event>>> {
@ProcessElement
public void processElement(ProcessContext c) {
Session session = c.element().getKey();
Iterable<Event> events = c.element().getValue();
StringBuilder str = new StringBuilder();
// print session info
str.append(String.format("\nSession id=%s, start=%d, stop=%d", session.id, session.start, session.stop));
// print each event info
for (Event event:events) {
str.append(String.format("\n---Event id=%s, timestamp=%d", event.id, event.timestamp));
}
LOG.info(str.toString());
c.output(c.element());
}
}
我得到了预期的输出:
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s2, start=100, stop=200
---Event id=e3, timestamp=120
---Event id=e4, timestamp=160
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s1, start=0, stop=100
---Event id=e1, timestamp=20
---Event id=e2, timestamp=60
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s3, start=200, stop=300
---Event id=e6, timestamp=290
---Event id=e5, timestamp=210
完整代码here .
使用 Beam SDK 2.16.0 和 DirectRunner
进行测试.
关于java - 当第一个包含某个事件的时间范围和第二个时间戳时,如何连接两个 PCollection?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58731608/