我有一个用例,我初始化一个包含一组查找数据(有关 IoT 设备的物理位置等的信息)的 HashMap。此查找数据用作作为 PCollection 的第二个数据集的引用数据。此 PCollection 是一个数据流,提供 IoT 设备记录的数据。来自 IoT 设备的数据流使用 Apache Beam 管道,该管道作为使用 Google Cloud 发布/订阅的 Google 数据流运行。
当我处理 PCollection(设备数据)时,我将 Google Cloud 发布/订阅数据链接到 HashMap 中的相关查找条目。
我需要更新 HashMap,基于将更改推送到其数据的第二个发布/订阅。到目前为止,这是我如何获取 PCollection 并使用 HashMap 进行查找:
HashMap -> 包含预加载的查找数据(有关 IoT 设备的信息)
PCollection -> 包含来自管道数据流的数据(物联网设备记录的数据)
我正在为 IoT 设备查找数据生成一个 HashMap 作为单例:
public class MyData {
private static final MyData instance = new MyData ();
private MyData () {
HashMap myDataMap = new HashMap<String, String>();
... logic to populate the map
this.referenceData = myDataMap;
}
public HashMap<Integer, DeviceReference> referenceData;
public static DeviceData getInstance(){
return instance;
}
}
然后,我在另一个类中使用 HashMap,在该类中我订阅数据更新(这些消息例如为我提供与已存储在 HashMap 中的实体相关的新数据)。我正在使用带有 Apache Beam 的 Google 发布/订阅订阅更改:
HashMap<String, String> referenceData = MyData.getInstance().referenceData;
Pipeline pipeLine = Pipeline.create(options);
// subscribe to changes in data
org.apache.beam.sdk.values.PCollection myDataUpdates;
myDataUpdates = pipeLine.begin()
.apply(String.format("Subscribe to data updates"),
PubsubIO.readStrings().fromTopic(
String.format("myPubSubPath")));
我想做的是有效地将数据更新应用于单例 HashMap(即根据我的数据订阅操作 HashMap)。我该怎么做?
我对 Apache Beam 的了解有限,我只知道如何对管道数据进行转换以创建另一个单独的 PCollection
。我认为这就是 Beam 的意义所在,它用于将大型数据集转换为不同的形式。有没有一种方法可以使用 Apache Beam 实现我需要的功能(更新 基于发布/订阅订阅的数据集),或者有没有其他方法可以使用发布/订阅更新 HashMap? (我无法轮询数据,因为它会产生太多延迟和成本,我需要使用订阅来更新 HashMap)。
Google 云文档展示了一种 directly subscribing to a Google Cloud pub/sub that isn't linked to an Apache Beam pipeline 的方法.这是一个很有前途的潜在解决方案,并且依赖于以下 Maven 依赖项:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.53.0</version>
</dependency>
不过我遇到了冲突,它与 Apache Beam 的以下 Maven 依赖项发生冲突:
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
此问题记录在此处的单独问题中 - Maven conflict in Java app with google-cloud-core-grpc dependency .从我所看到的情况来看,我使用哪个版本的 google-cloud-pubsub
Maven 工件似乎并不重要,因为从我的理解来看,它看起来 像 v.2.5.0 beam 依赖项及以下版本将始终与任何当前版本的 google 依赖项冲突。
(我已在 Beam Jira 中将此作为问题提出 - https://issues.apache.org/jira/browse/BEAM-6118)
我目前正在研究侧输入和组合
作为实现 HashMap 更新的方法:
https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine
示例 10 展示了一种可以将 .getSideInputsMap()
应用于 payload
的方法。我想知道我是否可以以某种方式将其应用于我对查找数据更改的订阅。如果我得到这样的 PCollection
,我不能直接将 .getSideInputsMap()
链接到 PCollection
deviceReferenceDataUpdates = pipeLine.begin()
.apply("Get changes to the IoT device lookup data"),
PubsubIO.readMessages().fromTopic("IoT device lookup data")).
我问了一个单独的问题,具体是关于如何使用 .getSideInputsMap()
- Apache Beam - how can I apply .getSideInputsMap to a subscription to a Google pub/sub?
最佳答案
我找到了一种在 Apache Beam 框架内执行此操作的方法,如下所示(未完全测试)。
Note - take into account the comment on the OP from @Serg M Ten that a better approach may be to consolidate the data later, instead of trying to join the lookup data as part of the transformation processing.
单例 HashMap
在这里查看我的答案 - Accessing a HashMap from a different class
管道(在单线程上,在main
中实现)
// initialise singleton HashMap containing lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();
org.apache.beam.sdk.values.PCollection lookupDataUpdateMessage;
lookupDataUpdateMessage = pipeLine.begin()
.apply("Extract lookup update data", PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
.apply("Transform lookup update data",
ParDo.of(new TransformLookupData.TransformFn()));
org.apache.beam.sdk.values.PCollection lookupDataMessage;
转换
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;
import myLookupSingletonClass;
import myLookupUpObjectClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Strings;
public class TransformDeviceMeta
public static class TransformFn extends DoFn<String, MyLookupData> {
@ProcessElement
public void processElement(ProcessContext c)
{
LookupData lookupData = LookupData.getInstance();
MyLookupData myLookupDataUpdate = new MyLookupData();
try
{
byte[] payload = c.element().getBytes();
String myLookUpDataJson = new JSONObject(new String(payload)).toString();
ObjectMapper mapper = new ObjectMapper();
myLookUpDataUpdate = mapper.readValue(myLookUpDataJson , MyLookupData.class);
String updatedLookupDataId = updatedLookupDataId.id;
// logic for HashMap updating e.g:
lookupData.myHashMap.remove(updatedDeviceId);
}
else {
lookupData.myHashMap.put(updatedDeviceId, deviceMetaUpdate);
}
}
catch (Exception ex) {
Log.error(ex.getMessage());
System.out.println("Error " + ex.getMessage());
}
}
}
}
MyLookupData
= 构成查找数据模型的类
关于java - 使用 Google pub/sub 更新单例 HashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53421932/