想要
许多线程将进行数据库调用并阻塞,以提高和扩展性能。
问题:
- 标准 Java 可完成的 future API does not work well with blocking/IO tasks, even when using ManagedBlocker.
- 如果使用不存在此问题的库,同时出现太多异步请求至少会出现 1 个扩展问题:
- 同时创建过多的线程可能会由于每个线程需要多少内存而导致内存不足错误。并且没有好的默认 ThreadPoolExecutors 允许设置线程池参数,例如最大线程数,然后为传入任务提供队列系统,以便在线程可用之前等待。
示例
我想要扩展一个需要发出 3000 个异步数据库请求的程序。我不想一次性发出 3000 个请求,而是希望在任何给定时间将其限制为 50 个,并将剩余的 2950 个请求排队,然后在任务完成时一次处理剩余的 2950 个请求。理想情况下,我想使用现有的库来使用新的自定义代码重新发明它,因为我假设有一种方法可以做到这一点,但我不确定如何使用不断出现的各种异步 Java SDK 的 API出来。
最佳答案
我认为有几种方法可以解决无限线程池的问题。正如其他人指出的,一种是从有界线程池支持的执行器创建 RxJava 调度程序。这非常简单,而且很可能是最好的方法。
但是,我确实想指出,RxJava 的“并行化”运算符(flatMap、concatMapEager)还有一个可选的 maxConcurrency 运算符,它允许我们将给定 Rx 管道中的泳道数量与用于执行的调度程序解耦它。
这是一个假设的示例,假设我们有一个执行阻塞查询的数据访问对象。在本例中,它仅 hibernate 1 秒并返回查询本身并附加时间戳:
public class MyDao
{
public Object blockingGetData( String query ) throws InterruptedException
{
Thread.sleep( 1000 );
return query.toUpperCase() + " - " + new Date().toString();
}
}
接下来,让我们将 DAO 包装在一个异步服务中,该服务维护一个 Rx 管道,其中每个元素代表一个查询及其异步结果:
public class MyService
{
private class QueryHolder
{
final String query;
final Subject<Object> result;
public QueryHolder( String query, Subject<Object> result )
{
this.query = query;
this.result = result;
}
}
private static final int MAX_CONCURRENCY = 2;
private final Subject<QueryHolder> querySubject;
private final MyDao dao;
public MyService()
{
dao = new MyDao();
querySubject = PublishSubject.<QueryHolder>create().toSerialized();
querySubject
.flatMap(
// For each element in the pipeline, perform blocking
// get on IO Scheduler, populating the result Subject:
queryHolder -> Observable.just( queryHolder )
.subscribeOn( Schedulers.io() )
.doOnNext( __ -> {
Object data = dao.blockingGetData( queryHolder.query );
queryHolder.result.onNext( data );
queryHolder.result.onComplete();
} ),
// With max concurrency limited:
MAX_CONCURRENCY )
.subscribe();
}
public Single<Object> getData( String query )
{
Subject<Object> result = AsyncSubject.create();
// Emit pipeline element:
querySubject.onNext( new QueryHolder( query, result ));
return result.firstOrError();
}
}
我建议您在谷歌上搜索不同的主题类型和运算符等 - 有大量可用文档。
简单的手动测试:
@Test
public void testService() throws InterruptedException
{
MyService service = new MyService();
// Issue 20 queries immediately, printing the results when they complete:
for ( int i = 0; i < 20; i++ )
{
service.getData( "query #" + i )
.subscribe( System.out::println );
}
// Sleep:
Thread.sleep( 11000 );
}
输出:
QUERY #0 - Wed Mar 11 11:08:21 EDT 2020
QUERY #1 - Wed Mar 11 11:08:21 EDT 2020
QUERY #2 - Wed Mar 11 11:08:22 EDT 2020
QUERY #3 - Wed Mar 11 11:08:22 EDT 2020
QUERY #4 - Wed Mar 11 11:08:23 EDT 2020
QUERY #5 - Wed Mar 11 11:08:23 EDT 2020
QUERY #6 - Wed Mar 11 11:08:24 EDT 2020
QUERY #7 - Wed Mar 11 11:08:24 EDT 2020
QUERY #8 - Wed Mar 11 11:08:25 EDT 2020
QUERY #9 - Wed Mar 11 11:08:25 EDT 2020
QUERY #10 - Wed Mar 11 11:08:26 EDT 2020
QUERY #11 - Wed Mar 11 11:08:26 EDT 2020
QUERY #12 - Wed Mar 11 11:08:27 EDT 2020
QUERY #13 - Wed Mar 11 11:08:27 EDT 2020
QUERY #14 - Wed Mar 11 11:08:28 EDT 2020
QUERY #15 - Wed Mar 11 11:08:28 EDT 2020
QUERY #16 - Wed Mar 11 11:08:29 EDT 2020
QUERY #17 - Wed Mar 11 11:08:29 EDT 2020
QUERY #18 - Wed Mar 11 11:08:30 EDT 2020
QUERY #19 - Wed Mar 11 11:08:30 EDT 2020
关于java - 如何从Java SDK或类似的SDK(即: rxJava,项目 react 器)获得可扩展的I/O绑定(bind)异步多线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60493245/