这是关于非常常见的传感器数据处理问题。
为了同步和合并来自不同来源的传感器数据,我想用 Java 实现它,而不需要太复杂的第三个库或框架。
假设我定义了一个对象 (O),它由 4 个属性 (A1,..A4) 组成。这 4 个属性来自不同的数据通道,例如套接字 channel 。
4个属性的到达频率一般在1.0~2.0Hz之间,并且它们的到达相互独立。 一旦有 4 个属性(A1,..A4)同时出现(在一个很小的时间窗口内,例如 100 毫秒),那么我就会从这 4 个属性构造一个新对象 (O)。
描述性场景如下。 A1~A4到达时间点带*标记。
对象O1~U3分别在时间点t1、t2、t3构建。 有些属性在 t2 和 t3 之间到达,但对于构造对象来说并不完整,因此它们 将被丢弃并被忽略。
A1 * * * *
A2 * * * *
A3 * * *
A4 * * * *
--------|------------|-----------------|----------> time
t1 t2 t3
O1 O2 O3
一些要求:
- 确定时间点a.s.a.p.,以根据最后传入的 4 个属性构造对象。
- FIFO、O1 必须先于 O2 构建,依此类推。
- Java 中较少的锁定
- 如果数据未完成以构造对象,则最终会丢弃数据。
一些关于实现的快速想法是:
- 将任何传入属性存储在时间离散存储桶的 FIFO 队列中(每个存储桶包含 4 个不同的属性)。
- 并发运行一个无限线程来检查 FIFO 队列(从队列头部开始)是否有任何存储桶已填充了 4 个不同的属性。如果是,则构造一个对象并将该桶从队列中移除。如果桶在特定时间窗口内没有完全填满,它将被丢弃。
欢迎任何建议和指正!
最佳答案
这不太可能解决您的问题,但它可能会为您指明正确的方向。
我会使用 Google Guava 的 MapMaker第一次尝试:
ConcurrentMap<Key, Bucket> graphs = new MapMaker()
.expireAfterAccess(100, TimeUnit.MILLISECOND)
.makeComputingMap(new Function<Key, Bucket>() {
public Bucket apply(Key key) {
return new Bucket(key);
}
});
这将创建一个映射,如果 100 毫秒内未访问该条目,该映射的条目就会消失,并在需要时创建一个新存储桶。
我无法弄清楚 key 到底是什么:S您真正想要的是队列形式的同类功能。
关于java - 同步和合并消息/数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6609790/