java - 根据我们需要调用多少页来并行化 while 循环?

标签 java multithreading performance thread-safety guava

我必须通过传递 header 和正文来发出 HTTP POST 请求。在正文中,我需要在发布数据之前提供 pageNumber,因此我最初从“1”开始。之后,我将发布数据,并收到 JSON 响应,如下所示。

{
    "response": {
        "pageNumber": 1,
        "entries": 200,
        "numberOfPages": 3
    },
    "list": [
        {
            // some stuff here
        }
    ],
    "total": 1000
}

现在,根据 pageNumber 1 的响应,我将决定还需要调用多少次电话。现在,在上面的响应中,numberOfPages 为 3,因此我需要对同一 URL 总共进行 3 次调用。由于我们已经进行了 1 次调用,我将再进行 2 次调用,正文中使用 pageNumber“2”和“3”。

下面是我的工作代码。我只需要通过更改正文来调用相同的 URL 直到 numberOfPages 次。对于每个调用,应该使用相应的 pageNumber 进行,因此如果 numberOfPages 是 3,那么我将总共进行 3 次调用。从每个页面收集数据后,我正在填充两个 map 。

public class AppParser {
  private static final ObjectMapper objectMapper = new ObjectMapper();
  private static final String lastParentIdJsonPath = "......";    
  private final Map<String, String> processToTaskIdHolder = new HashMap<>();
  private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
  private final int entries;
  private final String siteId;

  public AppParser(int entries, String id) {
    this.entries = entries;
    this.id = id;
    collect();
  }

  // this is only called from above constructor
  private void collect() {
    String endpoint = "url_endpoint";
    int number = 1;
    int expectedNumber;
    do {
      HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(number), getHeader());
      ResponseEntity<String> responseEntity =
          HttpClient.getInstance().getClient()
              .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
      String jsonInput = responseEntity.getBody();
      Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
      expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
      if (expectedNumber <= 0) {
        break;
      }
      List<Postings> postings = response.getPostings();
      for (Postings posting : postings) {
        if (posting.getClientIds().isEmpty()) {
          continue;
        }
        List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
        String clientId = posting.getClientIds().get(0).getId();
        Category category = getCategory(posting);
        // populate two maps now
        itemsByCategory.put(clientId, category);
        processToTaskIdHolder.put(clientId, lastParent.get(0));
      }
      number++;
    } while (number <= expectedNumber);
  }

  private String getBody(final int number) {
    Input input = new Input(entries, number, 0);
    Body body = new Body("Stuff", input);
    return gson.toJson(body);
  }

  // getters for those two above maps
}

现在我的上面的代码正在逐个按顺序收集每个页面的数据,因此如果我的 numberOfPages 很高,那么将需要一些时间来收集所有这些页码的所有数据。假设如果 numberOfPages 为 500,那么我的代码将针对每个 pageNumber 依次运行。有什么方法可以并行化上面的代码,以便我们可以同时收集 5 个页面的数据?这可以吗?我想我需要确保我的代码是线程安全的。

注意:HttpClient是线程安全的单例类。

最佳答案

我尝试使用多线程修改您的代码,但这并不容易,因为您没有提供所有导入的完整类源。此外,您的代码也不够干净。 您的任务是异步请求的常见情况。我将您的收集代码包装到 java.util.concurrent.Callable 中。它为我提供了通过 ExecutorService 异步使用任务的功能,并在需要时将结果作为 ParseResult 对象获取。在下面的代码中,我发出了 1 个请求来填充 expectedNumber 变量,循环应用程序创建任务并将它们提交到 executorService 中,并在其中运行它们的专用线程池。 代码:

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String URL_ENDPOINT = "url_endpoint";
private final Map<String, String> processToTaskIdHolder = new HashMap<>();
private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
private static final String lastParentIdJsonPath = "......";

class ParseResult {
    private String clientId;
    private Category category;
    private String lastParent;
    private int expectedNumber;
}

class ParseTask implements Callable<ParseResult> {

    private int pageNumber;

    public ParseTask(int pageNumber) {
        this.pageNumber = pageNumber;
    }

    @Override
    public ParseResult call() throws Exception {
        HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(pageNumber), getHeader());
        ResponseEntity<String> responseEntity =
                HttpClient.getInstance().getClient()
                        .exchange(URI.create(URL_ENDPOINT), HttpMethod.POST, requestEntity, String.class);
        String jsonInput = responseEntity.getBody();
        Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
        int expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
        if (expectedNumber <= 0) {
            return null; // or throw exception
        }
        List<Postings> postings = response.getPostings();
        for (Postings posting : postings) {
            if (posting.getClientIds().isEmpty()) {
                continue;
            }
            List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
            String clientId = posting.getClientIds().get(0).getId();
            Category category = getCategory(posting);

            //collecting the result
            ParseResult parseResult = new ParseResult();
            parseResult.clientId = clientId;
            parseResult.category = category;
            parseResult.expectedNumber = expectedNumber;
            parseResult.lastParent = lastParent.get(0);
            writeResult(parseResult); // writing the result
            return parseResult;
        }
    }
}

public AppParser(int entries, String id) {
    // .....
    collect();
}

// this is only called from above constructor
private void collect() {
    int number = 1;
    int expectedNumber = 0;
    ParseTask parseTask = new ParseTask(number);
    try {
        ParseResult firstResult = parseTask.call();
        expectedNumber = firstResult.expectedNumber; // fill the pages amount
    } catch (Exception e) {
        e.printStackTrace();
    }

    ExecutorService executorService = Executors.newCachedThreadPool();
    while (number <= expectedNumber) {
        executorService.submit(new ParseTask(number));
    }
}

private String getBody(final int number) {
    Input input = new Input(entries, number, 0);
    Body body = new Body("Stuff", input);
    return gson.toJson(body);
}

private void writeResult(ParseResult result) {
    // populate two maps now
    itemsByCategory.put(result.clientId, result.category);
    processToTaskIdHolder.put(result.clientId, result.lastParent);
}

我们可能会花费大量时间来升级您的代码,但这是具有多线程的原始版本。我不确定它是否有效,因为正如我之前所说,您没有提供完整版本。也许它需要一些语法修复。

关于java - 根据我们需要调用多少页来并行化 while 循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48458184/

相关文章:

java - 从 RecyclerView 中的 ViewHolder 中删除小部件

Java Process.waitFor() 和 Readline 挂起

multithreading - native 互斥体实现

multithreading - 使用redis多线程,使用jedis连接池

python - 如何有效地枚举所有完美的正方形?

java - 使用 Spring Data LdapRepository 创建新用户失败,LDAP 错误代码为 32

java - 如何将下拉列表的选定值从xhtml页面传递到jsf bean?

objective-c - Apple 的 Objective-C 运行时如何在不降低性能的情况下进行多线程引用计数?

multithreading - Matlab多核

从 SqlDataReader 返回 Nullable 类型的 C# 性能提升