java - 从异步处理程序返回值到包含方法

标签 java vert.x

给定以下两个类:

public class Test {

    public void call() {
        MyClass myClass = new MyClass();

        myClass.methodOne().subscribe(v -> {...});
    }
}

public class MyClass {

    public Observable<String> methodOne() {
        Observable<String> response =  Observable.fromFuture(this.methodTwo());
        return response;
    }

    public CompletableFuture<String> methodTwo() {
        CompletableFuture<String> response = new CompletableFuture<>();
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);

        response.complete("initial value");

        kafkaProducer.write(record, done -> {
            if(done.succeeded()) {
                response.complete("done");
            }
            else {
                response.complete("not done");
            };
        );

        return response;
    }
}

其中kafkaProducerio.vertx.kafka.client. Producer.impl.KafkaProducerImpl的实例。

预期的行为是,当在 MyClass.methodTwo() 中调用 response.complete() 时,response 值将为从 methodTwo() 返回到 methodOne()。然后,该值将被包装在 future 中,并将在 Test.call()subscribe 处理程序中可用。

但是,由于异步处理,methodTwo() 将始终返回在 vertx 的 write 方法之前设置的“初始值”。 kafkaProducer.

即使在稍后的某个时刻响应将在处理程序中设置为“完成”或“未完成”,该值也永远不会返回。

我尝试将 methodTwo 中的代码更改为以下内容:

AsyncResult<RecordMetadata> res = 
Single.create((SingleEmitter<AsyncResult<RecordMetadata>> emitter) ->
   producer.write(record,   result -> emitter.onSuccess(result)))
   .blockingGet();

想法是返回AsyncResult中的值,但这会无限期地阻塞。

解决这个问题的正确方法是什么?

谢谢

最佳答案

您可以使用 Vert.x Handler处理异步调用的结果。 异步调用完成后,您可以调用随结果一起传递的处理程序。

下面是与解决您的问题相关的小片段。

public class Test {

    public void call() {
        MyClass myClass = new MyClass();

        myClass.methodTwo(f-> {
            if (f.succeeded()) {
                //do something with f.result()
            }
            else {
                //handle;
            }
        });
    }
}

public class MyClass {

    public void methodTwo(Handler<AsyncResult<String>> handler) {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);

        //response.complete("initial value");

        kafkaProducer.write(record, done -> {
            if(done.succeeded()) {
                handler.handle("done");
            }
            else {
                handler.handle("not done");
            };
        );

        //return response;
    }
}

关于java - 从异步处理程序返回值到包含方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53079230/

相关文章:

java - Vertx-java-HttpClient : How to derive maxPoolSize and maxWaitQueueSize values and their impact

Java 包/库函数

java - 如何限制 JTable 中的输入?

java - 如何在Java中使用ThreadLocalRandom.current().ints(int,int)来查找与特定表达式匹配的特定数字?

java - Vert.x Web 和 session 固定

java - Vert.x IO 阻塞操作性能

java - 如何在 Eclipse FormEditor 中实现撤销/重做功能?

java - 在Java中,什么命令行可以让计算机点击屏幕?

java - Hawkular 和 Vertx : Why the new installed Hawkular Metric Server does not receive any metrics?

java - Vert.x 的 Verticle(s) JSON/YAML 配置(每个环境最好)