java - 以线程安全的方式并行执行多台机器

标签 java multithreading concurrency thread-safety executorservice

我有一个主机名列表,我正在使用 ExecutorService 并行执行该列表,以收集每个主机名的所有指标。然后我通过迭代每个主机名来创建一个列表,其中包含所有主机名的所有指标相关信息。由于我并行执行多个主机名,因此我不确定此代码是否线程安全。

这是我的主要代码,我在其中并行执行多个 HOSTNAMES:

final Flows typeOfFlow = Flows.TREE;

List<Future<MachineMetrics>> machineFutureList = new ArrayList<>();
for (final String machine : HOSTNAMES) {
    machineFutureList.add(executorService.submit(new Callable<MachineMetrics>() {
        @Override
        public MachineMetrics call() throws Exception {
            MachineMetrics machineMetrics = new MachineMetrics();
            String url = "http://" + machine + ":8080/text";
            Map<String, String> metrics = getMetrics(machine, url, typeOfFlow);
            machineMetrics.setMachineName(machine.split("\\.")[0]);
            machineMetrics.setDatacenter(TestUtils.findDatacenter(machine).get().name().toLowerCase());
            machineMetrics.setMetrics(metrics);
            return machineMetrics;
        }
    }));
}
List<MachineMetrics> metricsList = new ArrayList<>();
for (Future<MachineMetrics> future : machineFutureList) {
    try {
        metricsList.add(future.get());
    } catch (InterruptedException | ExecutionException ex) {
        // log exception here
    }
}
// now print all the hostnames metrics information
System.out.println(metricsList);

下面是我的 getMetrics 代码,与上面的代码位于同一类中:

private Map<String, String> getMetrics(final String machine, final String url, final Flows flowType) {
    Map<String, String> holder = new HashMap<String, String>();
    try {
        RestTemplate restTemplate = RestTemplateClient.getInstance().getClient();
        String response = restTemplate.getForObject(url, String.class);
        Matcher m = PATTERN.matcher(response);
        while (m.find()) {
            String key = m.group(1).trim();
            String value = m.group(2).trim();
            holder.put(key, value);
        }
    } catch (Exception ex) {
        // log here
    }

    return TestUtils.process(holder);
}

下面是我在 TestUtils 类中的 findDatacenter 代码:

public static Optional<Datacenter> findDatacenter(final String hostname) {
    if (!TestUtils.isEmpty(hostname)) {
        for (Datacenter dc : DC_LIST) {
            String namepart = "." + dc.name().toLowerCase() + ".";
            if (hostname.indexOf(namepart) >= 0) {
                return Optional.of(dc);
            }
        }
    }
    return Optional.absent();
}

下面是我在 TestUtils 类中的 process 方法:

public static Map<String, String> process(final Map<String, String> holder) {
    Map<String, String> tempMap = new HashMap<>();

    for (Map.Entry<String, String> entry : holder.entrySet()) {
        if (!entry.getKey().startsWith("calls_") && !entry.getValue().contains("|")) {
            continue;
        }
        String currentKey = entry.getKey();
        String currentValue = entry.getValue();
        StringTokenizer tokenizer = new StringTokenizer(currentValue, "|");

        String count = tokenizer.nextToken().trim();
        String avgData = tokenizer.nextToken().trim();
        String medianData = tokenizer.nextToken().trim();
        String n95data = tokenizer.nextToken().trim();
        String n99data = tokenizer.nextToken().trim();

        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), COUNT), count);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), AVG_IN_MS), avgData);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), MEDIAN_IN_MS), medianData);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N95_IN_MS), n95data);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N99_IN_MS), n99data);

        holder.remove(currentKey);
    }

    tempMap.putAll(holder);

    return tempMap;
}

下面是我在 TestUtils 类中的 generateKey 方法:

private static String generateKey(final String currentKey, final boolean hasMiss, final String constant) {
    StringBuilder newKey = new StringBuilder();

    if (hasMiss) {
        newKey.append(currentKey).append(constant);
    } else {
        String firstPart = currentKey.substring(0, currentKey.indexOf("_"));
        String secondPart = currentKey.substring(currentKey.lastIndexOf("_") + 1, currentKey.length());
        newKey.append(firstPart).append(CACHE).append(secondPart).append(constant);
    }

    return newKey.toString();
}

下面是我的 MachineMetrics 类:

public class MachineMetrics {

    private String machineName;
    private String datacenter;
    private Map<String, String> metrics;

    // normal setters and getters here
}

我上面的代码线程安全吗?我是否做错了什么,由于某些竞争条件或线程安全问题,我可能会看到错误的结果?

最佳答案

看起来不错。你的方法是 stateless 。您还使用 immutable objects作为方法参数。所以你不会有线程安全的问题。

一点评论:

for (Future<MachineMetrics> future : machineFutureList) {
    try {
        metricsList.add(future.get());
    } catch (InterruptedException | ExecutionException ex) {
        // log exception here
    }
}

get 如有必要,等待计算完成,然后检索其结果。因此,如果第一次调用很慢,您将无法检索其他结果。使用isDone检查您是否可以调用 get 而无需等待。

关于java - 以线程安全的方式并行执行多台机器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35259551/

相关文章:

java - 使用后退按钮时显示标签分页不起作用

java - 有没有办法让泛型持续存在? [Java控制台]

c# - 如何使用 HostBuilder Generic Host ref StopAsync OperationCanceledException 设置 ShutdownTimeout

java - 什么时候应该使用 CompletionService 而不是 ExecutorService?

go - 带 channel 的 WaitGroup

multithreading - 顺序执行和同步执行有什么区别?

java - Prepared Statement、批量更新、不同的更新

java - 是否有必要从 getter 方法返回一个与 Hibernate 传递给 setter 的对象不同的对象?

multithreading - 当监听器线程终止时,如何在不重新启动服务器的情况下获得与服务器丢失的连接

c# - HttpClient.PostAsync ThreadPool 中没有足够的空闲线程来完成操作