java - 顶点抛出 IllegalStateException : Response has already been written

标签 java asynchronous vert.x

我正在构建一个 rest api,它从其他 api 收集数据,用它做一些逻辑并发送回客户端:

我的主课:

public class Main {

public static void main(String[] args) {

    Vertx.vertx().deployVerticle(RestVerticle.class.getName());
}

这是我的 RestVerticle:

public class RestVerticle extends AbstractVerticle {

public static final String API_V1 = "/api/v1";

private Map<String, JsonObject> products = new HashMap<>();

@Override
public void start() {

    Router router = Router.router(vertx);

    router.route().handler(BodyHandler.create());
    router.get(API_V1 + "/business/all").handler(this::getAllBusinesses);

    vertx.createHttpServer().requestHandler(router::accept).listen(8080);
}

private void getAllBusinesses(RoutingContext routingContext) {

    vertx.deployVerticle(YelpClientVerticle.class.getName());
    MessageConsumer<String> consumer = vertx.eventBus().consumer("api");

    consumer.handler(message -> {
        JsonObject m = new JsonObject((String) message.body());
        System.out.println("Received message: " + message.body());
        routingContext.response().putHeader("content-type", "application/json").end(m.encodePrettily());

    });
}

这是我的 httpclient ,它调用 Yelp API:

public class YelpClientVerticle extends AbstractVerticle {

private static final String API_HOST = "api.yelp.com";
private static final String DEFAULT_TERM = "dinner";
private static final String DEFAULT_LOCATION = "San Francisco, CA";
private static final int SEARCH_LIMIT = 20;
private static final String SEARCH_PATH = "/v2/search";
private static final String BUSINESS_PATH = "/v2/business";

/*
 * Update OAuth credentials below from the Yelp Developers API site:
 * http://www.yelp.com/developers/getting_started/api_access
 */
private static final String CONSUMER_KEY = "XXXXX";
private static final String CONSUMER_SECRET = "XXXXX";
private static final String TOKEN = "XXXXX";
private static final String TOKEN_SECRET = "XXXXX";

OAuthService service;
Token accessToken;

/**
 * Setup the Yelp API OAuth credentials.
 *
 * @param consumerKey    Consumer key
 * @param consumerSecret Consumer secret
 * @param token          Token
 * @param tokenSecret    Token secret
 */

/**
 * Creates and sends a request to the Search API by term and location.
 * <p>
 * See <a href="http://www.yelp.com/developers/documentation/v2/search_api">Yelp Search API V2</a>
 * for more info.
 *
 * @param term     <tt>String</tt> of the search term to be queried
 * @param location <tt>String</tt> of the location
 * @return <tt>String</tt> JSON Response
 */
public String searchForBusinessesByLocation(String term, String location) {
    OAuthRequest request = createOAuthRequest(SEARCH_PATH);
    request.addQuerystringParameter("term", term);
    request.addQuerystringParameter("location", location);
    request.addQuerystringParameter("limit", String.valueOf(SEARCH_LIMIT));
    return sendRequestAndGetResponse(request);
}

/**
 * Creates and sends a request to the Business API by business ID.
 * <p>
 * See <a href="http://www.yelp.com/developers/documentation/v2/business">Yelp Business API V2</a>
 * for more info.
 *
 * @param businessID <tt>String</tt> business ID of the requested business
 * @return <tt>String</tt> JSON Response
 */
public String searchByBusinessId(String businessID) {
    OAuthRequest request = createOAuthRequest(BUSINESS_PATH + "/" + businessID);
    return sendRequestAndGetResponse(request);
}

/**
 * Creates and returns an {@link OAuthRequest} based on the API endpoint specified.
 *
 * @param path API endpoint to be queried
 * @return <tt>OAuthRequest</tt>
 */
private OAuthRequest createOAuthRequest(String path) {
    OAuthRequest request = new OAuthRequest(Verb.GET, "https://" + API_HOST + path);
    return request;
}

/**
 * Sends an {@link OAuthRequest} and returns the {@link Response} body.
 *
 * @param request {@link OAuthRequest} corresponding to the API request
 * @return <tt>String</tt> body of API response
 */
private String sendRequestAndGetResponse(OAuthRequest request) {
    System.out.println("Querying " + request.getCompleteUrl() + " ...");
    this.service.signRequest(this.accessToken, request);
    Response response = request.send();
    return response.getBody();
}

/**
 * Queries the Search API based on the command line arguments and takes the first result to query
 * the Business API.
 *
 * @param yelpApiCli <tt>YelpAPICLI</tt> command line arguments
 */
private String queryAPI(YelpAPICLI yelpApiCli) {
    String searchResponseJSON =
            searchForBusinessesByLocation(yelpApiCli.term, yelpApiCli.location);

    JSONParser parser = new JSONParser();
    JSONObject response = null;
    try {
        response = (JSONObject) parser.parse(searchResponseJSON);
    } catch (ParseException pe) {
        System.out.println("Error: could not parse JSON response:");
        System.out.println(searchResponseJSON);
        System.exit(1);
    }

    JSONArray businesses = (JSONArray) response.get("businesses");
    JSONObject firstBusiness = (JSONObject) businesses.get(0);
    String firstBusinessID = firstBusiness.get("id").toString();
    System.out.println(String.format(
            "%s businesses found, querying business info for the top result \"%s\" ...",
            businesses.size(), firstBusinessID));

    // Select the first business and display business details
    String businessResponseJSON = searchByBusinessId(firstBusinessID.toString());
    System.out.println(String.format("Result for business \"%s\" found:", firstBusinessID));
    System.out.println(businessResponseJSON);

    return businessResponseJSON;
}

/**
 * Command-line interface for the sample Yelp API runner.
 */
private static class YelpAPICLI {
    @Parameter(names = {"-q", "--term"}, description = "Search Query Term")
    public String term = DEFAULT_TERM;

    @Parameter(names = {"-l", "--location"}, description = "Location to be Queried")
    public String location = DEFAULT_LOCATION;
}

@Override
public void start() throws Exception {
    // Note! in real-life you wouldn't often set trust all to true as it could leave you open to man in the middle attacks.
    this.service =
            new ServiceBuilder().provider(TwoStepOAuth.class).apiKey(CONSUMER_KEY)
                    .apiSecret(CONSUMER_SECRET).build();
    this.accessToken = new Token(TOKEN, TOKEN_SECRET);
    YelpAPICLI yelpApiCli = new YelpAPICLI();
    new JCommander(yelpApiCli);


    String response = queryAPI(yelpApiCli);
    vertx.eventBus().send("api", response);
}

我遇到了 2 个问题。

第一个问题是 Yelp 客户端处理请求的时间太长,它阻塞了主线程并发出警告:

Jan 04, 2016 1:34:30 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-4,5,main] has been blocked for 3151 ms, time limit is 2000

第二个问题是,在处理完第一个请求后,例如,我第一次访问我的 localhost:8080/api/v1/business/all ,请求成功返回,但下一次再次点击 URL时间,它抛出这样的异常:

Jan 04, 2016 1:34:30 AM io.vertx.core.eventbus.impl.HandlerRegistration
SEVERE: Failed to handleMessage
java.lang.IllegalStateException: Response has already been written

我该如何解决这两个问题?

最佳答案

问题是您在每个 请求时都在做所有工作——启动 Yelp Verticle,并在事件总线上注册消费者。这不可能是你想要的。

所以,我认为正在发生的事情:

  1. 您向您的休息 API 发出请求。
  2. getAllBusinesses() 方法被执行。
  3. 启动 YelpClientVerticle
  4. 处理程序在 api 端点上注册,它将写入响应。
  5. 您的 YelpClientVerticle 做了很多阻塞工作 - 这就是您收到 BlockedThreadChecker 警告的原因。
  6. 最后,Yelp 请求返回并通过事件总线发送一条消息,然后该消息被写入响应。

  7. 您提出另一个请求

  8. 转到 2

您的问题是,对于每个请求,您都会启动另一个 YelpClientVerticle,并注册另一个处理程序来监听相同 EventBus端点地址。

让多个处理程序监听同一个 EventBus 地址 是完全可以接受的。发生这种情况时,Vert.x 会以 RoundRobin 方式选择其中一个处理程序。

我猜测在第二个请求 中,Vertx 正在选择第一个处理程序,并尝试写入您已经写入的第一个请求的响应。因此错误。

我会尝试将 YelpClientVerticle 的部署移动到 RestVerticle 的启动中 - 那么您将只有一个实例。

您可能想要切换发送者/消费者,因此您发送 一条消息到 YelpClientVerticle,然后它会回复。

您可能还想阅读有关 Running Blocking Code 的文档,因为您的 Yelp 客户端看起来正在阻塞。

希望对你有帮助

关于java - 顶点抛出 IllegalStateException : Response has already been written,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34580170/

相关文章:

java - 从邮箱发送邮件不会保存在已发送项目中

optimization - 为什么 Java API 使用 int 而不是 short 或 byte?

java - 将对象转换为json控制字段名称大写首字母

java - java中不同http客户端超时时间单位的意义

java - 用于创建大数据 View 的轻量级 Java 解决方案

javascript - 如何从 for 语句中的异步回调获取结果

c++ - 与 std::future 关联的存储是如何分配的?

java - 使用 Quarkus Reactive MySQL Clients/io.vertx.mysqlclient 进行服务器故障转移

docker-compose - java.net.UnknownHostException : failed to resolve 'inventory-microservice' . 超过每个解析的最大查询数 3

c++ - cURL:处理多个异步请求