java - vertx.io java + twitter4j - 事件循环集成

标签 java twitter4j vert.x event-loop

我正在尝试将 twitter4j 与 vertx 事件循环集成,但我不知道我的做法是否正确。 尽管我是一名 Node.js 开发人员,但我还是 vertx 的新手,因此我熟悉事件循环/单线程概念。

在我的测试中,我想订阅 Twitter 流并在 vertx 事件总线上发布该流。

我创建了一个不可知的 TwitterAPI 类,它将把 twitter 流转换为 Observable(稍后将挂接到 vertx):

public class TwitterAPI {
    public Observable<Status> getTwitterObservable() {
        return Observable.create(emitter -> {
            final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener(){
                public void onStatus(Status status) {
                    emitter.onNext(status);
                }
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
                public void onException(Exception ex) {
                    emitter.onError(ex);
                }
                @Override
                public void onScrubGeo(long userId, long upToStatusId) {}

                @Override
                public void onStallWarning(StallWarning warning) {}
            });

            //twitterStream.filter("some keyword");
            twitterStream.sample();
        });
    }
}

然后我创建了一个 TwitterVerticle,它将监听上面的 Observable 并将流发布到事件总线上,以便其他一些 verticle 可以订阅它并处理它:

public class TwitterVerticle extends AbstractVerticle {
    public void start() {
        EventBus eb = this.vertx.eventBus();
        TwitterAPI twitterAPI = new TwitterAPI();
        twitterAPI.getTwitterObservable()
            .map(Status::getText)
            .filter(text -> text.startsWith("my keyword"))
            .subscribe(text -> {
                eb.publish("tweet-feed", text);
            });
    }
}

例如,我创建了另一个 verticle,它将监听事件总线上的“twitter-feed”并将其发布到 WebSocket 上,以便您可以在浏览器中看到提要。

乍一看一切都运行良好...但是...我的主要问题是:我不确定 twitter4j 能否很好地处理事件循环,也许我的集成技术是错误的。也许我应该让 TwitterVerticle 成为 Worker Verticle?

有人可以看一下并告诉我这是否是完成此类任务的最佳方法吗?

非常感谢!

编辑

在这种情况下,直接在事件总线上发布是更好的模式吗? 下面是修改后的代码:

public class TwitterAPI {

    public void publishStreamOnEventBus(Vertx vertx) {
        EventBus eb = vertx.eventBus();
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

        twitterStream.addListener(new StatusListener(){
            public void onStatus(Status status) {
                eb.publish("tweet-feed", status.getText());
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onException(Exception ex) {
                //emitter.onError(ex);
            }
            @Override
            public void onScrubGeo(long userId, long upToStatusId) {}

            @Override
            public void onStallWarning(StallWarning warning) {}
        });

        twitterStream.sample();
    }
}



public class Main {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        new TwitterAPI().publishStreamOnEventBus(vertx);

        //vertx.deployVerticle(new TwitterVerticle()/*, new DeploymentOptions().setWorker(true)*/);
        vertx.deployVerticle(new WebServerVerticle());
    }

}

最佳答案

首先是一些一般性建议。

1/如果订阅 Observable 涉及调用阻塞 API,请使用 blockingScheduler:

Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
Observable<String> obs = twitterObservable.subscribeOn(blockingScheduler);

2/然后我假设 twitter4j 使用自己的线程来调用 StatusListener,因此 EventBus#publish 调用将在其中一个上进行那些。要返回到 verticle 上下文,请使用 #observeOn 运算符:

Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
Observable<String> obs = twitterObservable.observeOn(contextScheduler);

结合这两个更改:

public class TwitterVerticle extends AbstractVerticle {
    public void start() {
        EventBus eb = this.vertx.eventBus();
        Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
        Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
        TwitterAPI twitterAPI = new TwitterAPI();
        twitterAPI.getTwitterObservable()
            .map(Status::getText)
            .filter(text -> text.startsWith("my keyword"))
            .observeOn(contextScheduler)
            .subscribeOn(blockingScheduler)
            .subscribe(text -> {
                eb.publish("tweet-feed", text);
            });
    }
}
综上所述,如果这是 verticle 的唯一工作,我建议摆脱它并简单地发布到事件总线。事件总线实例是线程安全的,并且完全可以从外部(非 Vert.x)世界调用publish。实际上,这是将 Vert.x 代码与遗留代码结合起来的一个很好的模式。

关于java - vertx.io java + twitter4j - 事件循环集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43139884/

相关文章:

java - 在 Java 中获取某个时间段内某个国家/地区的所有推文

java - Twitter 搜索所有内容而不是相关性

java - 如何使用带有 Quarkus 的 Vertx 路由器将所有未找到的路由重定向到 index.html?

java - 当我向字符串追加字符时,如何删除 "null"单词?

java - 如何通过封闭网络搜索特定计算机?

java - 如何转义字符串以存储在 JSON 中

Twitter4j 具有地理定位功能的流媒体

java - Vertx 没有绑定(bind)路由器

java - 如何使用Vert.x将Blob插入到oracle数据库11g中?

java - 如何判断域名是上层(primary)还是下层(secondary)?