android - RxJava + Websocket - How to add an Observable to a Websocket listener?

Tags: android rx-java rx-java2 okhttp android-mvvm

I have a ViewModel that observes an RxJava Observable in my MainRepo class. I am trying to get the WebSocketListener in my MainRepo class to emit events, but I'm not sure how to do that.
MainRepo class:

private WebSocket ws;

public void createWsConnection() {
        Request request = new Request.Builder()
                .url(Constants.WEBSOCKET_ENDPOINT)
                .addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
                .build();

        OkHttpClient client = new OkHttpClient
                .Builder()
                .pingInterval(30, TimeUnit.SECONDS)
                .build();

        this.ws = client.newWebSocket(request, webSocketListener);
    }
This is where I'm confused. I don't know how to use the websocket together with an RxJava Observable.
public Observable<String> createListener(){
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) {
                 //I don't know what to put here in order to emit messages
                 //back to my ViewModel class using the websocket listener
            }
        });
    }
The websocket listener:
 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            // %s is required; without a placeholder the extra argument is never printed
            Timber.d("Ws connection opened: %s", response);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            // response can be null here, so pass it as a format argument instead of calling toString()
            Timber.e(t, "Ws connection failure: %s", response);

        }
    };
A function in my ViewModel class that observes the Observable from my MainRepo class:
public void connectToWs(){
        mainRepo.createListener()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Timber.d("Subscribed");
            }

            @Override
            public void onNext(@NonNull String s) {
                Timber.d("Message: " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Timber.e(e, "Something went wrong.");
            }

            @Override
            public void onComplete() {
                Timber.d("On complete.");
            }
        });
    }

Best answer

Create a PublishSubject and change your createListener method to return it:

private final PublishSubject<String> publishSubject = PublishSubject.create();

public Observable<String> createListener(){
    return publishSubject;
}
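A PublishSubject is an Observable, so you don't need to change the method signature, but I'd suggest renaming the method to observeMessages.
Then, in your websocket listener, you can push messages into the PublishSubject with its onNext method. You should also call onComplete in the onClosed method and onError in the onFailure method: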
 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            // %s is required; without a placeholder the extra argument is never printed
            Timber.d("Ws connection opened: %s", response);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");

            publishSubject.onComplete();
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

            publishSubject.onNext(text);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            // response can be null here, so pass it as a format argument instead of calling toString()
            Timber.e(t, "Ws connection failure: %s", response);

            publishSubject.onError(t);
        }
    };
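
Two consequences of wiring the listener to a PublishSubject are easy to miss: messages pushed before anyone subscribes are not replayed, and once onComplete (or onError) has been called the subject emits nothing further. The dependency-free sketch below only mimics those two behaviors so they can be seen in isolation. `MiniSubject` and `MiniSubjectDemo` are hypothetical names invented for this illustration; they are not RxJava classes, and the real PublishSubject does considerably more (thread-safe subscription handling, delivering the terminal event to subscribers, disposal).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in mimicking two PublishSubject behaviors; NOT the RxJava class. */
final class MiniSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private boolean terminated = false;

    /** Registers a subscriber; it only sees items pushed after this call. */
    void subscribe(Consumer<T> onNext) {
        if (!terminated) {
            subscribers.add(onNext);
        }
    }

    /** Pushes an item to every current subscriber (the role onMessage plays above). */
    void onNext(T item) {
        if (terminated) {
            return; // items after the terminal event are dropped
        }
        // Iterate over a copy so a subscriber may subscribe others while being notified.
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(item);
        }
    }

    /** Terminal event (the role onClosed plays above); the stream cannot be reused. */
    void onComplete() {
        terminated = true;
        subscribers.clear();
    }
}

final class MiniSubjectDemo {
    /** Returns the items the single subscriber actually received. */
    static List<String> run() {
        MiniSubject<String> subject = new MiniSubject<>();
        List<String> received = new ArrayList<>();

        subject.onNext("early");          // nobody is subscribed yet: lost
        subject.subscribe(received::add);
        subject.onNext("hello");
        subject.onNext("world");
        subject.onComplete();
        subject.onNext("late");           // after the terminal event: ignored
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello, world]
    }
}
```

In practice this suggests subscribing in the ViewModel before calling createWsConnection(), and creating a fresh subject if the socket is reopened after onClosed or onFailure, since a terminated subject will not deliver messages from the new connection.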

Regarding "android - RxJava + Websocket - How to add an Observable to a Websocket listener?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/67782867/
