我编写了以下 Java 代码:
twitterStream.addListener(new StreamListener());
FilterQuery filterQuery = new FilterQuery();
filterQuery.follow(filteringUsers);
filterQuery.track(filteringWords);
twitterStream.filter(filterQuery);
跟踪 Twitter 中的一些用户和关键字(通过 Streaming API)。这里,StreamListener
是我个人的监听器实现。
我正在跟踪大量关键字、主题标签和用户,因此我在内存中积累了大量等待处理的推文。事实上,我只是通过监听器(在 onStatus() 方法中)获取它们并将它们刷新到数据库中。
尽管如此,他们必须在内存中等待的事实显然会在几个小时内使内存饱和。在 20 分钟的运行中,我在内存中累积了 177000 个 LinkedBlockingQueue$Node
对象和 1.272MB 的 char[]
(通过分析看到)。
我希望保持管道持续运行,显然这在当前状态下是不可能的。
因此,我想知道是否有一种方法可以在多线程中添加多个监听器,以便它们可以同时清空推文队列并加快处理速度。
- 如果可能:这些监听器是否同时清空队列?我的意思是:他们是否会多次阅读同一条推文?
- 如果不可能:我该如何解决我的问题?
提前致谢。
最佳答案
尽管通过 Twitter4J 无法实现直接的多线程解决方案,但可以决定通过监听器类模拟多线程队列处理。
假设 StreamListener
是您对 StatusListener
Twitter4J 监听器的专门化。
我们将队列复制到 StreamListener
中,作为私有(private)属性:
private LinkedBlockingQueue<String> tweets;
队列在构造函数中初始化:
tweets = new LinkedBlockingQueue<String>();
此外,在构造函数中,我们构建了一个线程池,用于从队列中(批量)读取推文并将其存储在数据库中:
final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
Runnable tweetAnalyzer = defineMonitoringRunnable(tweetRepository);
for (int i = 0; i < NUM_THREADS; i++) {
executor.execute(tweetAnalyzer);
try {
Thread.sleep(THREADS_DELAY);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
其中 Runnable
对象可以按如下方式构建:
private Runnable defineMonitoringRunnable(final TweetRepository tweetRepository) {
return new Runnable() {
@Override
public void run() {
List<String> tempTweets = new ArrayList<String>();
while (true) {
if (tweets.size() > 0) {
tempTweets.clear();
tweets.drainTo(tempTweets);
tweetRepository.insert(tempTweets);
}
try {
Thread.sleep(TWEETS_SAVING_TIME);
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
};
}
(TWEETS_SAVING_TIME
是每个 Thread
对象在一条推文保存与另一条推文保存之间的等待时间)
最后,onStatus()
方法在推文到达监听器后将其存储在队列中:
@Override
public void onStatus(Status status) {
tweets.add(TwitterObjectFactory.getRawJSON(status));
}
关于java - 通过 Twitter4J 进行多线程 Twitter 访问,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24700781/