java - 创建操作作业的队列

标签 java android queue rx-java2

我想构建一个线程来监听队列,并在每当我将项目添加到队列时执行作业。

但我不太清楚如何开发它。我已经尝试过 RxJava2 中的一些 Flowable 示例,但不知道如何做到这一点。

我对 Android 和 Java 中的所有示例持开放态度,也许消息处理程序或执行器将是一个简单的解决方案。可惜没有专业知识。特别是 RxJava2 会很棒。

更新

换句话说,我想在其上构建一个队列机制,因为长日志显示为单独的,并且每当其中两个在附近调用时,计时就会使它们混合。

public final class Logcat {

   private static final String TAG = "HOWDY";

   public static void v(String message) {
       Log.v(TAG, message);
   }

   public static void d(String message) {
       Log.d(TAG, message); 
       //TODO I will add a for-loop later for long messages to make sure to show all of them for each method.
   }

   public static void e(Throwable throwable) {
       Log.e(TAG, throwable.getMessage());
   }

   public static void e(String message) {
       Log.e(TAG, message);
   }

   public static void e(ApiError error) {
       Log.e(TAG, error.message);
   }
}

最佳答案

好的,这就是我要怎么做的..

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

public class DemoRxJava2 {

    public static void testWithQueue() {

        CompletableFuture<String> allDone = new CompletableFuture<>();
        AtomicBoolean submitDone = new AtomicBoolean(Boolean.FALSE);
        final Queue<Long> queue = new ConcurrentLinkedQueue<>();

        Observable.interval(2, TimeUnit.SECONDS)
        .takeWhile(tick -> !queue.isEmpty() || !submitDone.get())
        .flatMap(tick -> {
            return Observable.create(sub -> {
                while (!queue.isEmpty()) {
                    sub.onNext(queue.poll());
                }
                sub.onComplete();
            });
        })
        .subscribeOn(Schedulers.single())
        .doOnSubscribe(dis -> System.out.println("Queue processing active"))
        .doOnComplete(() -> {
            System.out.println("Queue processing done");
            allDone.complete("DONE");
        })
        .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));

        Observable.interval(1,TimeUnit.SECONDS)
        .take(10)
        .doOnSubscribe(dis -> System.out.println("Job submitter start"))
        .doOnNext(tick -> {
            long ms = System.currentTimeMillis() / 1000;
            System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ms);
            queue.add(ms);
        })
        .doOnComplete(() -> submitDone.set(Boolean.TRUE))
        .blockingSubscribe();

        try {
            allDone.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void testWithSubject() {

        CompletableFuture<String> allDone = new CompletableFuture<>();

        PublishSubject<Long> queue = PublishSubject.create();

        queue.observeOn(Schedulers.single())
        .flatMap(tx -> Observable.just(tx).delay(2, TimeUnit.SECONDS))
        .doOnSubscribe(dis -> System.out.println("Queue processing active"))
        .doOnComplete(() -> allDone.complete("DONE"))
        .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));

        Observable.interval(1, TimeUnit.SECONDS)
        .take(10)
        .doOnSubscribe(dis -> System.out.println("Job submitter start"))
        .doOnNext(tick -> {
            long ms = System.currentTimeMillis() / 1000;
            System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ms);
            queue.onNext(ms);
        })
        .doOnComplete(() -> queue.onComplete())
        .blockingSubscribe();

        //wait until all done
        try {
            allDone.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        testWithQueue();
        testWithSubject();
    }
}

这只是演示如何使用 RxJava 在单独的线程中处理对象队列,请根据您的需要进行调整

关于java - 创建操作作业的队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48459754/

相关文章:

用于阅读和编辑 PDF 的 Android API

.net - 使用 Amazon SQS 和 SNS 设计推送队列,如何?

events - 如何在 laravel 中使用 Event::queue?

Java强制转换方法

java - 在另一个应用程序中从 edittext 读取/写入文本

android - 如何在 API 22 中管理 Cookie

Python threading.Thread 只能通过私有(private)方法 self.__Thread_stop() 停止

java - JPA 标准 API 函数转换或转换

java - @ApiParam 不适用于 springfox swagger 2

android - 如何在加载数据之前在flutter中显示进度条