我有一个线程池,专门用于使用共享 DelayQueue 执行异步任务。
基本上,一切都运行良好,除了一件事:我希望能够推迟某些已经安排的任务的执行。 例如,假设我现在在时间 t=0 提交一个在 30 秒内执行的任务。 10 秒后(t=10),我决定,哦不,该任务不会在 t=30 执行,而是在 t=50 执行;因此我推迟到了 20 秒后。
为此,我有推迟方法,它修改任务设置的时间,从而更改 getDelay 的返回值。代码位于本文末尾。
不幸的是,它不起作用。实际上很容易破坏系统并导致过期元素保留在队列中的时间比正常情况下要长得多。 更具体地说,我观察到以下不良行为:
- 在时间 t=0 时,提交第一个任务以在 t=30 时执行
- 在 t=10 时,提交第二个任务以在 t=20 时执行
- 在 t=15 时,将第二个任务从 t=20 推迟到 t=100
- t=30 到达,但第一个任务没有执行,它停留在队列中。它的 getDelay 方法现在开始返回负值。
- t=40:第一个任务已经晚了 10 秒,但仍然没有任何反应。随着时间的推移,第一个任务的 getDelay 返回越来越小的值; DelayQueue 似乎肯定是混淆了。
- t=90:也许是一个希望,因为 q.poll 调用中设置了 30 秒的最大轮询时间。事实上,不,我得到一个 null 并继续等待下一个任务;我的第一个任务仍然留在队列中,延迟为负。
- t=100:小时!两个任务都一个接一个地执行……第二个任务准时完成,但第一个任务终于迟到了 70 秒。这是 Not Acceptable !
我还注意到,如果任务在进入队列时从未成为头部,我可以安全地推迟它,即不会干扰其他任务。
所以,我的问题是:
- 为什么会这样?我的代码是不是做错了什么?
- 我是否必须删除任务,然后再次提交,以模拟推迟?或者还有其他安全的方法吗?我是否真的可以删除一个对象,然后重新添加完全相同的对象,或者最好提交另一个对象以确保避免所有可能的混淆?
- 额外问题:删除操作的复杂性是多少?如果我假设队列是作为一种优先级堆实现的(不可能在优先级堆中进行二分搜索),那么大概是 O(n)。
感谢您的回答。
这是代码。我已经尽可能多地删除了不相关的部分。我特意删除了所有的异常处理。
public abstract class AbstractTaskExecutor<R extends Runnable> implements Runnable {
private final BlockingQueue<R> q;
...
public boolean submit (R dr) { return q.add(dr); }
public void run () {
while (!Thread.currentThread().isInterrupted()) {
Runnable r = q.poll(30, TimeUnit.SECONDS);
if (r!=null) r.run();
}}
}
public abstract class DelayedRunnable implements Runnable, Delayed {
private long time;
public DelayedRunnable (long l) {
time = System.currentTimeMillis() +l;
}
public final int compareTo (Delayed d) {
return (int)( Math.min(Math.max(Integer.MIN_VALUE, time - ((DelayedRunnable)d).time), Integer.MAX_VALUE) );
}
public final long getDelay (TimeUnit t) {
return t.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public final long getTime () { return time; }
public void postpone (long l) { time+=l; }
}
public class DelayedTaskExecutor extends AbstractTaskExecutor<DelayedRunnable> {
...
}
最佳答案
A question remains though: is it possible to reschedule the object that was just cancelled
如果您所做的只是更改“激活”时间,那么您必须删除它,更改它并将其添加回来,因为它可能会以不同的方式放置在数据结构中。这是必需的,因为结构决定了事件的顺序,并且如果您只是更改该值,这可能会也可能不会导致顺序更改。如果在添加 Map 后更改其键,也会遇到类似的问题。即结构行为不正确。
我会使用 ScheduledExecutorService 来包装延迟队列和线程池。
当您放置延迟任务时,您会得到一个 Future 对象,您可以根据需要取消并重新安排该对象。
At time t=0, submit a first task to execute at t=30
安排 30 年后的任务。
At t=10, submit a second task to execute at t=20
安排 10 年后的任务并保存 future 。
At t=15, postpone the second task from t=20 to t=100
Future.取消并重新安排
t=30 arrive, but the first task isn't executed, it stays in the queue. Its getDelay method now starts returning negative values.
在此示例中,除非您取消它,否则它将执行。
关于java - 使用DelayQueue异步执行任务: postponing an already submitted task,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14308401/