java - 同步和合并消息/数据流

标签 java concurrency merge messaging sensors

这是关于非常常见的传感器数据处理问题。

为了同步和合并来自不同来源的传感器数据,我想用 Java 实现它,而不需要太复杂的第三个库或框架。

假设我定义了一个对象 (O),它由 4 个属性 (A1,..A4) 组成。这 4 个属性来自不同的数据通道,例如套接字 channel 。

4个属性的到达频率一般在1.0~2.0Hz之间,并且它们的到达相互独立。 一旦有 4 个属性(A1,..A4)同时出现(在一个很小的时间窗口内,例如 100 毫秒),那么我就会从这 4 个属性构造一个新对象 (O)。

描述性场景如下。 A1~A4到达时间点带*标记。

对象O1~U3分别在时间点t1、t2、t3构建。 有些属性在 t2 和 t3 之间到达,但对于构​​造对象来说并不完整,因此它们 将被丢弃并被忽略。

  A1     *          *         *         *
  A2      *           *         *        *
  A3     *            *                  * 
  A4      *            *       *         * 
  --------|------------|-----------------|----------> time
          t1           t2                t3
          O1           O2                O3  

一些要求:

  1. 确定时间点a.s.a.p.,以根据最后传入的 4 个属性构造对象。
  2. FIFO、O1 必须先于 O2 构建,依此类推。
  3. Java 中较少的锁定
  4. 如果数据未完成以构造对象,则最终会丢弃数据。

一些关于实现的快速想法是:

  • 将任何传入属性存储在时间离散存储桶的 FIFO 队列中(每个存储桶包含 4 个不同的属性)。
  • 并发运行一个无限线程来检查 FIFO 队列(从队列头部开始)是否有任何存储桶已填充了 4 个不同的属性。如果是,则构造一个对象并将该桶从队列中移除。如果桶在特定时间窗口内没有完全填满,它将被丢弃。

欢迎任何建议和指正!

最佳答案

这不太可能解决您的问题,但它可能会为您指明正确的方向。

我会使用 Google Guava 的 MapMaker第一次尝试:

ConcurrentMap<Key, Bucket> graphs = new MapMaker()
                                   .expireAfterAccess(100, TimeUnit.MILLISECOND)
                                   .makeComputingMap(new Function<Key, Bucket>() {
                                                     public Bucket apply(Key key) {
                                                         return new Bucket(key);
                                                     }
                                    });

这将创建一个映射,如果 100 毫秒内未访问该条目,该映射的条目就会消失,并在需要时创建一个新存储桶。

我无法弄清楚 key 到底是什么:S您真正想要的是队列形式的同类功能。

关于java - 同步和合并消息/数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6609790/

相关文章:

git - Git 会记录过去的 merge 冲突吗?

java - 验证空文本框?

java - Hibernate:如何建模继承类型结构并在没有显式转换的情况下执行操作

java - 将java值插入sqlite数据库

java - JCIP 书中定义的 core java 内部 protected 对象的示例

git:我可以只 merge 存储库的子路径吗?

java - 制作基本的java程序来计算点击次数时遇到问题

java - 寻找java并发模型建议

java - Netty:使用多个事件循环的并发问题

r - 如何合并 R 中的两列?