我正在使用 Stream.generate
从 Instagram 获取数据。由于 instagram 限制每小时调用一次,我希望 generate
的运行频率低于每 2 秒一次。
我之所以选择这样的标题,是因为我从 ScheduledExecutorService.scheduleAtFixedRate
转移过来,而这正是我一直在寻找的。我确实意识到流中间操作是惰性的,不能按计划调用。如果您对标题有更好的想法,请告诉我。
所以我再次希望世代之间至少有 2 秒的延迟。
我的尝试没有考虑 generate
之后的操作所消耗的时间,这可能需要比 2s 更长的时间:
Stream.generate(() -> {
List<MediaFeedData> feedDataList = null;
while (feedDataList == null) {
try {
Thread.sleep(2000);
feedDataList = newData();
} catch (InstagramException e) {
notifyError(e.getMessage());
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return feedDataList;
})
最佳答案
一种解决方案是将生成器与 Stream
分离,例如使用 BlockingQueue
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
scheduler.scheduleAtFixedRate(() -> {
// Generate new data every 2s, regardless of their processing rate
ThreadLocalRandom random = ThreadLocalRandom.current();
queue.offer(random.nextInt(10));
}, 0, 2, TimeUnit.SECONDS);
Stream.generate(() -> {
try {
// Accept new data if ready, or wait for some more to be generated
return queue.take();
} catch (InterruptedException e) {}
return -1;
}).forEach(System.out::println);
如果数据处理时间超过2s,新数据会入队等待消费。如果用时少于 2 秒,生成器中的 take
方法将等待调度程序生成新数据。
通过这种方式,您可以保证每小时向 Instagram 调用的电话少于 N 次!
关于java - 流以固定速率生成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30052604/