java - 实现时间滑动窗口类时使用原子类型的非锁定线程代码

标签 java multithreading concurrency atomic

我试图从 yammer 指标中理解这段代码。困惑始于trim 方法以及update 和getSnapShot 中对trim 的调用。有人可以解释一下这里的逻辑吗?比如 15 分钟的滑动窗口?为什么您想要在将 map 传递到 SnapShot 之前清除 map (这是计算窗口统计信息的地方)。

package com.codahale.metrics;

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


public class SlidingTimeWindowReservoir implements Reservoir {
    // allow for this many duplicate ticks before overwriting measurements
    private static final int COLLISION_BUFFER = 256;
    // only trim on updating once every N
    private static final int TRIM_THRESHOLD = 256;

    private final Clock clock;
    private final ConcurrentSkipListMap<Long, Long> measurements;
    private final long window;
    private final AtomicLong lastTick;
    private final AtomicLong count;


public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit) {
    this(window, windowUnit, Clock.defaultClock());
}

public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit, Clock clock) {
    this.clock = clock;
    this.measurements = new ConcurrentSkipListMap<Long, Long>();
    this.window = windowUnit.toNanos(window) * COLLISION_BUFFER;
    this.lastTick = new AtomicLong();
    this.count = new AtomicLong();
}

@Override
public int size() {
    trim();
    return measurements.size();
}

@Override
public void update(long value) {
    if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
        trim();
    }
    measurements.put(getTick(), value);
}

@Override
public Snapshot getSnapshot() {
    trim();
    return new Snapshot(measurements.values());
}

private long getTick() {
    for (; ; ) {
        final long oldTick = lastTick.get();
        final long tick = clock.getTick() * COLLISION_BUFFER;
        // ensure the tick is strictly incrementing even if there are duplicate ticks
        final long newTick = tick > oldTick ? tick : oldTick + 1;
        if (lastTick.compareAndSet(oldTick, newTick)) {
            return newTick;
        }
    }
}

private void trim() {
    measurements.headMap(getTick() - window).clear();
}
}

最佳答案

文档中的两点信息

ConcurrentSkipListMap is sorted according to the natural ordering of its keys

这是保存所有测量值的数据结构。这里的关键是一个 long ,基本上是当前时间。 -> 按时间索引的测量按时间排序。

.headMap(K toKey) returns a view of the portion of this map whose keys are strictly less than toKey.

getTick 中的神奇代码可确保一个时间值永远不会使用两次(如果发生这种情况,只需使用 oldTick + 1)。 COLLISION_BUFFER 理解起来有点棘手,但它基本上确保即使通过 Clock#getTick() 返回相同的值,您也会获得不会与下一个刻度相冲突的新值从时钟。

例如 Clock.getTick() 返回 0 -> 修改为 0 * 256 = 0

Clock.getTick() 返回 1 -> 修改为 1 * 256 = 256

-> 256 个值之间的空间。

现在 trim() 可以了

measurements.headMap(getTick() - window).clear();

这会计算“当前时间”,减去时间窗口并使用该时间来获取早于“窗口刻度前”的 map 部分。清除该部分也会清除原始 map 中的部分。它并没有清除整个 map ,只是清除那部分。

-> 修剪删除太旧的值。

每次更新时,您都需要删除旧值,否则 map 会变得太大。创建快照时会发生相同的事情,因此不包括那些旧值。

getTick 中的无限 for 循环是使用原子比较和设置方法的另一个技巧,以确保 - 一旦你准备好更新值 - 之间的值没有任何变化。如果发生这种情况,整个循环将重新开始并刷新其起始值。基本架构是

for (; ; ) {
    long expectedOldValue = atomic.get();
    // other threads can change the value of atomic here..

    long modified = modify(expectedOldValue);

    // we can only set the new value if the old one is still the same
    if (atomic.compareAndSet(expectedOldValue, modified)) {
        return modified;
    }
}

关于java - 实现时间滑动窗口类时使用原子类型的非锁定线程代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18209585/

相关文章:

java - 对 String[] 数组的数组列表进行排序

java - Redhat 上的 java.library.path 中没有 xuggle

java - 检测标准输入的空输入

Java线程wait()notify()在一个方法中

具有 Future 返回类型的 Scala 递归函数

mysql - MySQL 是否调用队列存储过程?

java - 在单个数据结构中维护来自两种不同类型值的 java 映射

c# - 我可以在我的 C# 应用程序中获取所有线程的堆栈跟踪吗?

java - JAVA中如何避免到处检查线程中断

c++ - 并发数据结构设计