java - Vertx 上的并发

标签 java vert.x

我已经加入了其中一位 Vertx 爱好者,但是单线程主机可能不适合我,因为在我的服务器中可能同时有 50 个文件下载请求,作为解决方案,我创建了这个类

public abstract T onRun() throws Exception;
public abstract void onSuccess(T result);
public abstract void onException();

private static final int poolSize = Runtime.getRuntime().availableProcessors();
private static final long maxExecuteTime = 120000;

private static WorkerExecutor mExecutor;

private static final String BG_THREAD_TAG = "BG_THREAD";

protected RoutingContext ctx;

private boolean isThreadInBackground(){
    return Thread.currentThread().getName() != null && Thread.currentThread().getName().equals(BG_THREAD_TAG);
}

//on success will not be called if exception be thrown
public BackgroundExecutor(RoutingContext ctx){

    this.ctx = ctx;

    if(mExecutor == null){
        mExecutor = MyVertxServer.vertx.createSharedWorkerExecutor("my-worker-pool",poolSize,maxExecuteTime);
    }

    if(!isThreadInBackground()){

    /** we are unlocking the lock before res.succeeded , because it might take long and keeps any thread waiting */

    mExecutor.executeBlocking(future -> {
        try{

              Thread.currentThread().setName(BG_THREAD_TAG);

              T result = onRun();
              future.complete(result);

        }catch (Exception e) {
            GUI.display(e);
            e.printStackTrace();
            onException();
            future.fail(e);
        }

              /** false here means they should not be parallel , and will run without order multiple times on same context*/
            },false, res -> {

                if(res.succeeded()){
                    onSuccess((T)res.result());
                }

            });

    }else{


        GUI.display("AVOIDED DUPLICATE BACKGROUND THREADING");
        System.out.println("AVOIDED DUPLICATE BACKGROUND THREADING");

        try{

              T result = onRun();

              onSuccess((T)result);

        }catch (Exception e) {
            GUI.display(e);
            e.printStackTrace();
            onException();          
        }
    }

}

允许处理程序扩展它并像这样使用它

public abstract class DefaultFileHandler implements MyHttpHandler{

public abstract File getFile(String suffix);

@Override
public void Handle(RoutingContext ctx, VertxUtils utils, String suffix) {
    new BackgroundExecutor<Void>(ctx) {

        @Override
        public Void onRun() throws Exception {

            File file = getFile(URLDecoder.decode(suffix, "UTF-8"));

            if(file == null || !file.exists()){
                utils.sendResponseAndEnd(ctx.response(),404);
                return null;
            }else{
                utils.sendFile(ctx, file);
            }
            return null;
        }

        @Override
        public void onSuccess(Void result) {}

        @Override
        public void onException() {
            utils.sendResponseAndEnd(ctx.response(),404);
        }
    };
}

这是我初始化 vertx 服务器的方法

vertx.deployVerticle(MainDeployment.class.getCanonicalName(),res -> {
          if (res.succeeded()) {

                GUI.display("Deployed");

              } else {
                res.cause().printStackTrace();
              }
            });

    server.requestHandler(router::accept).listen(port);

这是我的 MainDeployment 类

public class MainDeployment extends AbstractVerticle{

  @Override
  public void start() throws Exception {

    // Different ways of deploying verticles

    // Deploy a verticle and don't wait for it to start

   for(Entry<String, MyHttpHandler> entry : MyVertxServer.map.entrySet()){
       MyVertxServer.router.route(entry.getKey()).handler(new Handler<RoutingContext>() {

            @Override
            public void handle(RoutingContext ctx) {

                String[] handlerID = ctx.request().uri().split(ctx.currentRoute().getPath());

                String suffix = handlerID.length > 1 ? handlerID[1] : null;
                entry.getValue().Handle(ctx, new VertxUtils(), suffix);

            }
        });
   }

  }
}

这在我需要的时间和地点工作得很好,但我仍然想知道是否有更好的方法来处理 vertx 上的并发,如果有的话,我将不胜感激。非常感谢

最佳答案

我不完全理解您的问题以及解决方案的原因。为什么不实现一个 verticle 来处理 http 上传并多次部署它呢?我认为处理 50 个并发上传对于 vert.x 来说应该是小菜一碟。

When deploying a verticle using a verticle name, you can specify the number of verticle instances that you want to deploy:

DeploymentOptions options = new DeploymentOptions().setInstances(16); vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

This is useful for scaling easily across multiple cores. For example you might have a web-server verticle to deploy and multiple cores on your machine, so you want to deploy multiple instances to take utilise all the cores.

http://vertx.io/docs/vertx-core/java/#_specifying_number_of_verticle_instances

关于java - Vertx 上的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42791093/

相关文章:

java - ComboBox 和计算出现问题

java - 如何在 Git/EGit 上进行简单的还原工作?

java - 将java包导入Android

vert.x - 在 vert.x 中,在运行时创建多个 HttpServer 实例有意义吗?

cluster-computing - Vertx 聚类替代方案

java - 如何使用 Hibernate/JPA 注释覆盖 GenerationType 策略?

java - 在 Java 中获取一个数组的值并使它们成为另一个数组的索引

java - 在 Vert.x 3 这样的非阻塞 I/O 环境中使用吗啡的正确方法

java - 如何从 Cloud Run Java 应用程序连接到 Cloud SQL 数据库?

java - 我如何在单元测试中使用 Mockito 或任何 Mocking 框架来模拟 Guice 注入(inject)?