java - Multithreading + RxJava conditional observing

Tags: java spring multithreading rx-java

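I have a long-running task that produces regular files plus a main file that lists the other files.

A scheduler regenerates this file once a day via cron.

The task flow is implemented with rx-java.

The problem: once a request or the scheduler has started the task, any other request that arrives while the task is still running does not wait for it to finish but triggers another execution.

So the question is: how do I synchronize task execution so that it runs only once?

Here is the sample code: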

@Service
public class FileService {
    @Autowired FileRepository fileRepository;
    @Autowired List<Pipeline> pipelines;

    public Observable<File> getMainFile() {
        if (fileRepository.isMainFileExists())
            return Observable.just(fileRepository.getMainFile());
        else
            return generate(() -> fileRepository.getMainFile());
    }

    public Observable<File> getFile(String fileName) {
        if (fileRepository.isMainFileExists())
            return Observable.just(fileRepository.getFile(fileName));
        else
            return generate(() -> fileRepository.getFile(fileName));
    }

    Observable<File> generate(Func0<File> whenGenerated) {
        return Observable.from(pipelines)
                // other business logic goes here
                // after task execution finished just get needed file
                .map(isAllPipelinesSuccessful -> {
                    return whenGenerated.call();
                });
    }

    @Scheduled(cron = "0 0 4 * * ?")
    void scheduleGeneration() {
        generate(() -> fileRepository.getMainFile()).subscribe();
    }
}

It is called from a controller; sample code:

@RestController
public class FileController {
    private static final Long TIMEOUT = 1_000 * 60 * 10L; //ten mins
    @Autowired FileService fileService;

    @RequestMapping(value = "/mainfile", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() {
        DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
        Observable<File> observableMainFile = fileService.getMainFile();
        observableMainFile
                .map(this::fileToInputStreamResource)
                .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
                .subscribe(deferredResult::setResult, ex -> {
                    deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null));
                });
        return deferredResult;
    }
    @RequestMapping(value = "/files/{filename:.+}", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) {
        DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
        Observable<File> observableFile = fileService.getFile(filename);
        observableFile
                .map(this::fileToInputStreamResource)
                .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
                .subscribe(deferredResult::setResult, ex -> {
                    boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause());
                    HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR;
                    deferredResult.setErrorResult(ResponseEntity.status(status).body(null));
                });
        return deferredResult;
    }
}

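Best answer

I have something like the following, but I think there is a better solution. I am using RxJava2-RC5.

  1. The answer is missing a check for whether the task has already been executed. https://gist.github.com/anonymous/7b4717cea7ddce270a2e39850a3bd2a4

Update: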

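import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

interface FileRepository {
        String getFile();

        Boolean isMainFileExists();
}

// Single-threaded scheduler: subscriptions to getFile() run one at a time, so a later
// caller re-checks isMainFileExists() only after the earlier long process has finished.
private static Scheduler executorService = Schedulers.from(Executors.newFixedThreadPool(1));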

@org.junit.Test
public void schedulerTest123() throws Exception {
        FileRepository fRepo = mock(FileRepository.class);

        when(fRepo.getFile()).thenReturn("");
        when(fRepo.isMainFileExists()).thenReturn(false);

        Thread t1 = new Thread(() -> {
            getFile(fRepo, executorService).subscribe();
        });

        Thread t2 = new Thread(() -> {
            getFile(fRepo, executorService).subscribe();
        });

        t1.start();
        t2.start();

        Thread.sleep(3_000);

        when(fRepo.getFile()).thenReturn("DasFile");
        when(fRepo.isMainFileExists()).thenReturn(true);

        Thread t3 = new Thread(() -> {
            getFile(fRepo, executorService).subscribe();
        });

        t3.start();

        Thread.sleep(5_000);
}

private Observable<String> getFile(FileRepository fileRepo, Scheduler scheduler) {
        return Observable.defer(() -> {
            try {
                if (fileRepo.isMainFileExists()) {
                    return Observable.fromCallable(fileRepo::getFile)
                            .subscribeOn(Schedulers.io())
                            .doOnNext(s -> printCurrentThread("Get File from Repo"));
                } else {
                    return startLongProcess().doOnNext(s -> printCurrentThread("Push long processValue"));
                }

            } catch (Exception ex) {
                return Observable.error(ex);
            }
        }).subscribeOn(scheduler).doOnSubscribe(disposable -> printCurrentThread("SUB"));
}

private Observable<String> startLongProcess() {
        return Observable.fromCallable(() -> {
            printCurrentThread("Doing LongProcess");

            Thread.sleep(5_000);

            return "leFile";
        });
}

private void printCurrentThread(String additional) {
        System.out.println(additional + "_" + Thread.currentThread());
}
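
One possible direction for the "better solution" mentioned above is to let overlapping callers share a single in-flight run instead of queueing their own. The sketch below uses plain java.util.concurrent rather than RxJava, so it is a swapped-in technique and not the code from the answer. SingleFlight, run, and countRunsForTwoOverlappingCallers are hypothetical names introduced for illustration; in the service, the task would be the generation pipeline, and the isMainFileExists() check would still happen before calling run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class SingleFlightDemo {

    /** Hypothetical helper: callers that arrive during a run share its result. */
    static final class SingleFlight<T> {
        private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();
        private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r, "generator");
            thread.setDaemon(true); // do not keep the JVM alive for this sketch
            return thread;
        });

        CompletableFuture<T> run(Supplier<T> task) {
            while (true) {
                CompletableFuture<T> current = inFlight.get();
                if (current != null) {
                    return current; // a run is already in progress: share it
                }
                CompletableFuture<T> created = new CompletableFuture<>();
                if (inFlight.compareAndSet(null, created)) {
                    worker.execute(() -> {
                        try {
                            T value = task.get();
                            inFlight.set(null); // later callers start a fresh run
                            created.complete(value);
                        } catch (Throwable error) {
                            inFlight.set(null);
                            created.completeExceptionally(error);
                        }
                    });
                    return created;
                }
                // Lost the race to another caller: loop and pick up its future.
            }
        }
    }

    /** Starts two overlapping calls and reports how often the task body ran. */
    static int countRunsForTwoOverlappingCallers() {
        SingleFlight<String> flight = new SingleFlight<>();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        Supplier<String> generate = () -> {
            runs.incrementAndGet();
            try {
                release.await(); // stands in for the long-running generation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "main.xml";
        };
        CompletableFuture<String> first = flight.run(generate);
        CompletableFuture<String> second = flight.run(generate); // arrives mid-run
        release.countDown();
        first.join();
        second.join();
        return runs.get();
    }

    public static void main(String[] args) {
        // prints "generation runs: 1"
        System.out.println("generation runs: " + countRunsForTwoOverlappingCallers());
    }
}
```

Compared with the single-threaded scheduler above, the second caller here does not wait in a queue and then re-check the repository; it receives the same future as the first caller. The in-flight slot is cleared before the future completes, so a call made after completion starts a new run.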

This page, java - Multithreading + RxJava conditional observing, is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/40236772/
