对 N 个数据流进行时间排序的算法

标签 algorithm sorting asynchronous time stream

所以我有 N 个异步的、带时间戳的数据流。每个流都有固定的速率。我想处理所有数据,但要注意的是我必须尽可能接近数据到达的时间来处理数据(这是一个实时流应用程序)。

到目前为止,我的实现是创建一个包含 K 条消息的固定窗口,我使用优先级队列按时间戳对其进行排序。然后我在移动到下一个窗口之前按顺序处理整个队列。这没关系,但它不太理想,因为它会产生与缓冲区大小成比例的滞后,并且如果消息刚好在缓冲区末尾处理完后到达,有时还会导致消息丢失。它看起来像这样:

// Priority queue keeping track of the data in timestamp order.
ThreadSafeProrityQueue<Data> q;
// Fixed buffer size
int K = 10;
// The last successfully processed data timestamp
time_t lastTimestamp = -1;

// Called for each of the N data streams asyncronously
void receiveAsyncData(const Data& dat) {
   q.push(dat.timestamp, dat);
   if (q.size() > K) {
       processQueue();
   }
}

// Process all the data in the queue.
void processQueue() {
    while (!q.empty()) {
        const auto& data = q.top();
        // If the data is too old, drop it.
        if (data.timestamp < lastTimestamp) {
            LOG("Dropping message. Too old.");
            q.pop();
            continue;
        }
        // Otherwise, process it.
        processData(data);
        lastTimestamp = data.timestamp;
        q.pop();
    }
}

关于数据的信息:它们保证在它们自己的流中被排序。它们的频率在 5 到 30 赫兹之间。它们由图像和其他数据组成。

为什么这比看起来更难的一些例子。假设我有两个流,A 和 B 都以 1 Hz 的频率运行,并且我按以下顺序获取数据:

(stream, time)
(A, 2)
(B, 1.5)
(A, 3)
(B, 2.5)
(A, 4)
(B, 3.5)
(A, 5)

看看如果我按接收数据的顺序处理数据,B 会如何总是被丢弃?这正是我想要避免的。现在在我的算法中,B 会每 10 帧被丢弃,我会处理过去 10 帧延迟的数据。

最佳答案

我建议使用生产者/消费者结构。让每个流将数据放入队列,并让一个单独的线程读取队列。即:

// your asynchronous update:
void receiveAsyncData(const Data& dat) {
   q.push(dat.timestamp, dat);
}

// separate thread that processes the queue
void processQueue()
{
    while (!stopRequested)
    {
        data = q.pop();
        if (data.timestamp >= lastTimestamp)
        {
            processData(data);
            lastTimestamp = data.timestamp;
        }
    }
}

这可以防止您在处理批处理时在当前实现中看到的“滞后”。

processQueue 函数在一个单独的持久线程中运行。 stopRequested 是程序在要关闭时设置的标志——强制线程退出。有些人会为此使用 volatile 标志。我更喜欢使用手动重置事件之类的东西。

要完成这项工作,您需要一个允许并发更新的优先级队列实现,或者您需要用同步锁包装您的队列。特别是,您要确保 q.pop() 在队列为空时等待下一个项目。或者当队列为空时,您永远不会调用 q.pop()。我不知道您的 ThreadSafePriorityQueue 的具体细节,所以我无法确切地说出您将如何编写它。

时间戳检查仍然是必要的,因为有可能在较早的项目之前处理较晚的项目。例如:

  1. 从数据流 1 接收到事件,但线程在添加到队列之前被换出。
  2. 从数据流 2 接收到事件,并添加到队列中。
  3. 来自数据流 2 的事件被 processQueue 函数从队列中移除。
  4. 上述步骤 1 中的线程获得另一个时间片,并将项目添加到队列中。

这并不罕见,只是很少见。时间差通常为微秒级。

如果您经常无序地获取更新,那么您可以引入人为延迟。例如,在您更新的问题中,您显示的消息以 500 毫秒的顺序乱序出现。假设 500 毫秒是您要支持的最大容差。也就是说,如果消息延迟超过 500 毫秒,它将被丢弃。

您所做的是在将事物添加到优先级队列时向时间戳添加 500 毫秒。即:

q.push(AddMs(dat.timestamp, 500), dat);

在处理事物的循环中,您不会在时间戳之前使事物出列。像这样的东西:

while (true)
{
    if (q.peek().timestamp <= currentTime)
    {
        data = q.pop();
        if (data.timestamp >= lastTimestamp)
        {
            processData(data);
            lastTimestamp = data.timestamp;
        }
    }
}

这会在所有项目的处理中引入 500 毫秒的延迟,但它可以防止丢弃落在 500 毫秒阈值内的“延迟”更新。您必须在对“实时”更新的期望与防止丢失更新的期望之间取得平衡。

关于对 N 个数据流进行时间排序的算法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44949899/

相关文章:

php - 在 PHP 中进行异步处理的最佳方式

javascript - JS 异步函数返回未定义,尽管在异步函数内调用它

c++ - 给定一个数组和整数 k 在每个大小为 k 的子数组中找到最大值

php - 将关联数组转换为索引数组的方法

java - 如何在最快的时间内对接近排序的数组进行排序? ( java )

java - 连续排序数组

asynchronous - 如何等待google geocoder.geocode?

Python自动化

algorithm - 二进制字符串排列

android - 从适配器中生成的值而不是对象变量对 RecyclerView 进行排序?