我有一个 ViewModel
正在观察 RxJava Observable
在我的MainRepo
类(class)。我正在尝试获取我的 WebsocketListener
在 MainRepo
类来发出事件,但我不确定该怎么做。
MainRepo 类:
private WebSocket ws;
public void createWsConnection() {
Request request = new Request.Builder()
.url(Constants.WEBSOCKET_ENDPOINT)
.addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
.build();
OkHttpClient client = new OkHttpClient
.Builder()
.pingInterval(30, TimeUnit.SECONDS)
.build();
this.ws = client.newWebSocket(request, webSocketListener);
}
这就是我感到困惑的地方。我不知道如何将 websocket 与 RxJava observable 一起使用。public Observable<String> createListener(){
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) {
//I don't know what to put here in order to emit messages
//back to my ViewModel class using the websocket listener
}
});
}
websocket监听器: private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, Response response) {
Timber.d("Ws connection opened...", response.toString());
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closing...");
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closed...");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
Timber.d("Ws incoming message.");
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
Timber.e(t, "Ws connection failure.", response.toString());
}
};
ViewModel 类中的一个函数正在观察我的 MainRepo 类中的 Observable:public void connectToWs(){
mainRepo.createListener()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Timber.d("Subscribed");
}
@Override
public void onNext(@NonNull String s) {
Timber.d("Message: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Timber.e(e, "Something went wrong.");
}
@Override
public void onComplete() {
Timber.d("On complete.");
}
});
}
最佳答案
创建 PublishSubject并更改您的createListener
返回方法:
private PublishSubject<String> publishSubject = PublishSubject.create<String>();
public Observable<String> createListener(){
return publishSubject;
}
PublishSubject 是一个 Observable,因此请注意您不需要更改方法签名,但我建议您将方法名称重命名为 observeMessages
.然后在您的 websocket 监听器中,您可以使用 onNext 将消息发送到 PublishSubject方法。您也应该调用 onComplete在 onClosed 方法和 onError在 onFailure 方法中:
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, Response response) {
Timber.d("Ws connection opened...", response.toString());
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closing...");
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closed...");
publishSubject.onComplete();
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
Timber.d("Ws incoming message.");
publishSubject.onNext(text);
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
Timber.e(t, "Ws connection failure.", response.toString());
publishSubject.onError(t);
}
};
关于android - RxJava + Websocket - 如何将 Observable 添加到 Websocket 监听器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67782867/