android - 如何执行 "RXJava 2 queue jobs"直到值改变?

标签 android sockets rx-java2

我在我的 Android 应用程序中使用 RXJava2,我有一个有点奇怪的场景。 我想执行未知数量的作业(由用户确定),但我希望它们仅在特定值更改后才开始。

我的具体要求是使用套接字进行服务器通信, 流程如下:

  1. 用户请求一些数据 - 通过向套接字发送数据并等待响应来检索数据。
  2. 与服务器通信的模块应该打开一个Socket连接,只有建立连接后,才可以获取请求的数据。
  3. 当 Socket 尝试连接时,用户可能会请求更多数据。
  4. 成功建立连接后,模块应执行用户在连接过程中发送的所有请求。
  5. 模块还应该发布发送到套接字的每个数据的结果。

这如何使用 RXJava2 完成?

最佳答案

您可以为队列部分使用 UnicastSubject,并在连接建立后执行一些 flatMap-ping:

UnicastSubject<String> userRequests = new UnicastSubject.create();

Single.fromCallable(() -> new Socket("server", port))
  .subscribeOn(Schedulers.io())
  .flatMapObservable(socket -> {

      InputStream in = socket.getInputStream();
      OutputStream out = socket.getOutputStream();
      byte[] responseBuffer = new byte[4096];

      return userRequests
         .observeOn(Schedulers.io())
         .map(request -> {

             out.write(request.getBytes());

             int n = in.read(responseBuffer);
             if (n >= 0) {
                 return new String(responseBuffer, 0, n);
             }

             throw new IOException("Socket closed while waiting for response");
         })
         .doFinally(() -> socket.close());
  });

由于您在 Socket 级别上工作,因此您有责任计算出要写入的请求的正确编码和要读取的响应的正确解码(即,响应的长度(以字节为单位)针对特定请求)。

关于android - 如何执行 "RXJava 2 queue jobs"直到值改变?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48168776/

相关文章:

android - 找不到 Gradle DSL 方法 : 'compileSdkVersion()' after importing a module into the project

Android AlarmManager - 安排一个重复的 Intent 每天触发两次

java - 如何在java中从输入流读取png图像

c# - 从 Unity3D 调用 Java 函数

java - AsyncTask 中的某些内容阻塞了 UI - 导致界面短暂停止

c - Posix AIO 损坏/损坏?

node.js - 如何在 node.js http 模块超时时发送响应

android - 在 Activity 之间保持蓝牙连接?

java - RxJava2 中的条件完成

java - RxJava2 批处理项