我有一个主机名列表,我正在使用 ExecutorService 并行执行该列表,以收集每个主机名的所有指标。然后我通过迭代每个主机名来创建一个列表,其中包含所有主机名的所有指标相关信息。由于我并行执行多个主机名,因此我不确定此代码是否线程安全。
这是我的主要代码,我在其中并行执行多个 HOSTNAMES
:
final Flows typeOfFlow = Flows.TREE;
List<Future<MachineMetrics>> machineFutureList = new ArrayList<>();
for (final String machine : HOSTNAMES) {
machineFutureList.add(executorService.submit(new Callable<MachineMetrics>() {
@Override
public MachineMetrics call() throws Exception {
MachineMetrics machineMetrics = new MachineMetrics();
String url = "http://" + machine + ":8080/text";
Map<String, String> metrics = getMetrics(machine, url, typeOfFlow);
machineMetrics.setMachineName(machine.split("\\.")[0]);
machineMetrics.setDatacenter(TestUtils.findDatacenter(machine).get().name().toLowerCase());
machineMetrics.setMetrics(metrics);
return machineMetrics;
}
}));
}
List<MachineMetrics> metricsList = new ArrayList<>();
for (Future<MachineMetrics> future : machineFutureList) {
try {
metricsList.add(future.get());
} catch (InterruptedException | ExecutionException ex) {
// log exception here
}
}
// now print all the hostnames metrics information
System.out.println(metricsList);
下面是我的 getMetrics
代码,与上面的代码位于同一类中:
private Map<String, String> getMetrics(final String machine, final String url, final Flows flowType) {
Map<String, String> holder = new HashMap<String, String>();
try {
RestTemplate restTemplate = RestTemplateClient.getInstance().getClient();
String response = restTemplate.getForObject(url, String.class);
Matcher m = PATTERN.matcher(response);
while (m.find()) {
String key = m.group(1).trim();
String value = m.group(2).trim();
holder.put(key, value);
}
} catch (Exception ex) {
// log here
}
return TestUtils.process(holder);
}
下面是我在 TestUtils
类中的 findDatacenter
代码:
public static Optional<Datacenter> findDatacenter(final String hostname) {
if (!TestUtils.isEmpty(hostname)) {
for (Datacenter dc : DC_LIST) {
String namepart = "." + dc.name().toLowerCase() + ".";
if (hostname.indexOf(namepart) >= 0) {
return Optional.of(dc);
}
}
}
return Optional.absent();
}
下面是我在 TestUtils
类中的 process
方法:
public static Map<String, String> process(final Map<String, String> holder) {
Map<String, String> tempMap = new HashMap<>();
for (Map.Entry<String, String> entry : holder.entrySet()) {
if (!entry.getKey().startsWith("calls_") && !entry.getValue().contains("|")) {
continue;
}
String currentKey = entry.getKey();
String currentValue = entry.getValue();
StringTokenizer tokenizer = new StringTokenizer(currentValue, "|");
String count = tokenizer.nextToken().trim();
String avgData = tokenizer.nextToken().trim();
String medianData = tokenizer.nextToken().trim();
String n95data = tokenizer.nextToken().trim();
String n99data = tokenizer.nextToken().trim();
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), COUNT), count);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), AVG_IN_MS), avgData);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), MEDIAN_IN_MS), medianData);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N95_IN_MS), n95data);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N99_IN_MS), n99data);
holder.remove(currentKey);
}
tempMap.putAll(holder);
return tempMap;
}
下面是我在 TestUtils
类中的 generateKey
方法:
private static String generateKey(final String currentKey, final boolean hasMiss, final String constant) {
StringBuilder newKey = new StringBuilder();
if (hasMiss) {
newKey.append(currentKey).append(constant);
} else {
String firstPart = currentKey.substring(0, currentKey.indexOf("_"));
String secondPart = currentKey.substring(currentKey.lastIndexOf("_") + 1, currentKey.length());
newKey.append(firstPart).append(CACHE).append(secondPart).append(constant);
}
return newKey.toString();
}
下面是我的 MachineMetrics
类:
public class MachineMetrics {
private String machineName;
private String datacenter;
private Map<String, String> metrics;
// normal setters and getters here
}
我上面的代码线程安全吗?我是否做错了什么,由于某些竞争条件或线程安全问题,我可能会看到错误的结果?
最佳答案
看起来不错。你的方法是 stateless 。您还使用 immutable objects作为方法参数。所以你不会有线程安全的问题。
一点评论:
for (Future<MachineMetrics> future : machineFutureList) {
try {
metricsList.add(future.get());
} catch (InterruptedException | ExecutionException ex) {
// log exception here
}
}
get 如有必要,等待计算完成,然后检索其结果。
因此,如果第一次调用很慢,您将无法检索其他结果。使用isDone检查您是否可以调用 get
而无需等待。
关于java - 以线程安全的方式并行执行多台机器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35259551/