java - 通过 Twitter4J 进行多线程 Twitter 访问

标签 java multithreading twitter twitter4j

我编写了以下 Java 代码:

twitterStream.addListener(new StreamListener());

FilterQuery filterQuery = new FilterQuery();
filterQuery.follow(filteringUsers);
filterQuery.track(filteringWords);

twitterStream.filter(filterQuery);

跟踪 Twitter 中的一些用户和关键字(通过 Streaming API)。这里,StreamListener是我个人的监听器实现。

我正在跟踪大量关键字、主题标签和用户,因此我在内存中积累了大量等待处理的推文。事实上,我只是通过监听器(在 onStatus() 方法中)获取它们并将它们刷新到数据库中。

尽管如此,他们必须在内存中等待的事实显然会在几个小时内使内存饱和。在 20 分钟的运行中,我在内存中累积了 177000 个 LinkedBlockingQueue$Node 对象和 1.272MB 的 char[](通过分析看到)。

我希望保持管道持续运行,显然这在当前状态下是不可能的。

因此,我想知道是否有一种方法可以在多线程中添加多个监听器,以便它们可以同时清空推文队列并加快处理速度。

  1. 如果可能:这些监听器是否同时清空队列?我的意思是:他们是否会多次阅读同一条推文?
  2. 如果不可能:我该如何解决我的问题?

提前致谢。

最佳答案

尽管通过 Twitter4J 无法实现直接的多线程解决方案,但可以决定通过监听器类模拟多线程队列处理。

假设 StreamListener 是您对 StatusListener Twitter4J 监听器的专门化。

我们将队列复制到 StreamListener 中,作为私有(private)属性:

private LinkedBlockingQueue<String> tweets;

队列在构造函数中初始化:

tweets = new LinkedBlockingQueue<String>();

此外,在构造函数中,我们构建了一个线程池,用于从队列中(批量)读取推文并将其存储在数据库中:

    final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    Runnable tweetAnalyzer = defineMonitoringRunnable(tweetRepository);
    for (int i = 0; i < NUM_THREADS; i++) {
        executor.execute(tweetAnalyzer);
        try {
            Thread.sleep(THREADS_DELAY);
        } 
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

其中 Runnable 对象可以按如下方式构建:

private Runnable defineMonitoringRunnable(final TweetRepository tweetRepository) {
    return new Runnable() {

        @Override
        public void run() {
            List<String> tempTweets = new ArrayList<String>();

            while (true) {
                if (tweets.size() > 0) {
                    tempTweets.clear();
                    tweets.drainTo(tempTweets);

                    tweetRepository.insert(tempTweets);   
                }

                try {
                    Thread.sleep(TWEETS_SAVING_TIME);
                } 
                catch (InterruptedException ex) {
                    ex.printStackTrace();
                }

            }
        }
    };
}

(TWEETS_SAVING_TIME 是每个 Thread 对象在一条推文保存与另一条推文保存之间的等待时间)

最后,onStatus() 方法在推文到达监听器后将其存储在队列中:

@Override
public void onStatus(Status status) {   
    tweets.add(TwitterObjectFactory.getRawJSON(status));
}

关于java - 通过 Twitter4J 进行多线程 Twitter 访问,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24700781/

相关文章:

c++ - 我如何在另一个线程 Qt 中显示 MessageBox

Javascript 将 DateTime 转换为 "______ seconds/minutes/hours/months ago"

java - Gradle在Jenkins中构建JaCoCo jacocoTestReport时排除预构建的jar和类

java - 有什么方法可以将变量传递给 Java 中的验证约束消息?

java - 如何在 if、嵌套 if、else 语句中使用具有两种含义的 boolean 值

java - java中倒计时锁存器的用法

multithreading - Lwt.async() 未按预期工作

search - Twitter:标签搜索查询

mysql - 邻接列表模型或嵌套集模型,我应该使用哪种数据模型来存储分层数据?

java - 需要在正则表达式中转义的所有特殊字符的列表