我试图从 yammer 指标中理解这段代码。困惑始于trim 方法以及update 和getSnapShot 中对trim 的调用。有人可以解释一下这里的逻辑吗?比如 15 分钟的滑动窗口?为什么您想要在将 map 传递到 SnapShot 之前清除 map (这是计算窗口统计信息的地方)。
package com.codahale.metrics;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SlidingTimeWindowReservoir implements Reservoir {
// allow for this many duplicate ticks before overwriting measurements
private static final int COLLISION_BUFFER = 256;
// only trim on updating once every N
private static final int TRIM_THRESHOLD = 256;
private final Clock clock;
private final ConcurrentSkipListMap<Long, Long> measurements;
private final long window;
private final AtomicLong lastTick;
private final AtomicLong count;
public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit) {
this(window, windowUnit, Clock.defaultClock());
}
public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit, Clock clock) {
this.clock = clock;
this.measurements = new ConcurrentSkipListMap<Long, Long>();
this.window = windowUnit.toNanos(window) * COLLISION_BUFFER;
this.lastTick = new AtomicLong();
this.count = new AtomicLong();
}
@Override
public int size() {
trim();
return measurements.size();
}
@Override
public void update(long value) {
if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
trim();
}
measurements.put(getTick(), value);
}
@Override
public Snapshot getSnapshot() {
trim();
return new Snapshot(measurements.values());
}
private long getTick() {
for (; ; ) {
final long oldTick = lastTick.get();
final long tick = clock.getTick() * COLLISION_BUFFER;
// ensure the tick is strictly incrementing even if there are duplicate ticks
final long newTick = tick > oldTick ? tick : oldTick + 1;
if (lastTick.compareAndSet(oldTick, newTick)) {
return newTick;
}
}
}
private void trim() {
measurements.headMap(getTick() - window).clear();
}
}
最佳答案
文档中的两点信息
ConcurrentSkipListMap
is sorted according to the natural ordering of its keys
这是保存所有测量值的数据结构。这里的关键是一个 long ,基本上是当前时间。 -> 按时间索引的测量按时间排序。
.headMap(K toKey)
returns a view of the portion of this map whose keys are strictly less thantoKey
.
getTick
中的神奇代码可确保一个时间值永远不会使用两次(如果发生这种情况,只需使用 oldTick + 1
)。 COLLISION_BUFFER
理解起来有点棘手,但它基本上确保即使通过 Clock#getTick()
返回相同的值,您也会获得不会与下一个刻度相冲突的新值从时钟。
例如
Clock.getTick()
返回 0 -> 修改为 0 * 256 = 0
Clock.getTick()
返回 1 -> 修改为 1 * 256 = 256
-> 256 个值之间的空间。
现在 trim()
可以了
measurements.headMap(getTick() - window).clear();
这会计算“当前时间”,减去时间窗口并使用该时间来获取早于“窗口刻度前”的 map 部分。清除该部分也会清除原始 map 中的部分。它并没有清除整个 map ,只是清除那部分。
-> 修剪删除太旧的值。
每次更新
时,您都需要删除旧值,否则 map 会变得太大。创建快照
时会发生相同的事情,因此不包括那些旧值。
getTick 中的无限 for 循环是使用原子比较和设置方法的另一个技巧,以确保 - 一旦你准备好更新值 - 之间的值没有任何变化。如果发生这种情况,整个循环将重新开始并刷新其起始值。基本架构是
for (; ; ) {
long expectedOldValue = atomic.get();
// other threads can change the value of atomic here..
long modified = modify(expectedOldValue);
// we can only set the new value if the old one is still the same
if (atomic.compareAndSet(expectedOldValue, modified)) {
return modified;
}
}
关于java - 实现时间滑动窗口类时使用原子类型的非锁定线程代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18209585/