java - 对所有线程具有完全原子性,而不影响性能或吞吐量

标签 java multithreading thread-safety atomic

我有一个主机名列表,我应该通过从中创建正确的 URL 来进行调用。假设我的链接列表中有四个主机名(hostA、hostB、hostC、hostD) -

  • 执行hostA url,如果hostA UP,则获取数据并返回响应。
  • 但是,如果 hostA 已关闭,则将 hostA 添加到主机名阻止列表中,并确保没有其他线程正在调用 hostA。然后尝试执行hostB url并返回响应。
  • 但如果 hostB 也已关闭,则将 hostB 也添加到主机名阻止列表中,并重复相同的操作。

此外,我的应用程序中运行着一个后台线程,该线程将包含 block 主机名列表(来 self 的另一个服务),我们不应该对其进行调用,但它每 10 分钟运行一次,因此 block 主机名列表将仅在 10 分钟后更新,因此如果存在任何主机名阻止列表,那么我不会从主线程调用该主机名,而是尝试调用另一个主机名。这意味着如果 hostA 被阻止,则阻止列表中将包含 hostA,但如果 hostA 已启动,则该列表将不会有 hostA在其中。

下面是我的后台线程代码,它从我的服务 URL 获取数据,并在应用程序启动后每 10 分钟继续运行一次。然后,它将解析来自 URL 的数据并将其存储在 ClientData 类变量中 -

临时调度程序
public class TempScheduler {

    // .. scheduledexecutors service code to start the background thread

    // call the service and get the data and then parse 
    // the response.
    private void callServiceURL() {
        String url = "url";
        RestTemplate restTemplate = new RestTemplate();
        String response = restTemplate.getForObject(url, String.class);
        parseResponse(response);
    }

    // parse the response and store it in a variable
    private void parseResponse(String response) {
        //...       
        
        // get the block list of hostnames
        Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
        List<String> blockList = new ArrayList<String>();
        for(Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
            for(String hosts : entry.getValue()) {
                blockList.add(hosts);
            }
        }
        
        // store the block list of hostnames which I am not supposed to make a call
        ClientData.replaceBlockedHosts(blockList);
    }
}

下面是我的 ClientData 类。 replaceBlockedHosts 方法将仅由后台线程调用,这意味着只有一个写入器。但是 isHostBlocked 方法将被主应用程序线程多次调用,以检查特定主机名是否被阻止。而且 blockHost 方法将从 catch block 多次调用,以将已关闭的主机添加到 blockedHosts 列表中,因此我需要确保所有读取线程可以看到一致的数据,并且不会调用该故障主机,而是调用主机名链接列表中的下一个主机。

客户数据
public class ClientData {

    // .. some other variables here which in turn used to decide the  list of hostnames
    
    private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = 
            new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());

    public static boolean isHostBlocked(String hostName) {
        return blockedHosts.get().containsKey(hostName);
    }

    public static void blockHost(String hostName) {
        blockedHosts.get().put(hostName, hostName);
    }

    public static void replaceBlockedHosts(List<String> hostNames) {
        ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
        for (String hostName : hostNames) {
            newBlockedHosts.put(hostName, hostName);
        }
        blockedHosts.set(newBlockedHosts);
    }
}

下面是我的主应用程序线程代码,其中有我应该进行调用的主机名列表。如果主机名为空或位于阻止列表类别中,那么我不会调用该特定主机名,而是尝试列表中的下一个主机名。

@Override
public DataResponse call() {

    List<String> hostnames = new LinkedList<String>();
    
    // .. some separate code here to populate the hostnames list
    // from ClientData class
    
    for (String hostname : hostnames) {     

        // If host name is null or host name is in block list category, skip sending request to this host
        if (hostname == null || ClientData.isHostBlocked(hostname)) {
            continue;
        }
    
        try {
            String url = generateURL(hostname);

            response = restTemplate.getForObject(url, String.class);

            break;
        } catch (RestClientException ex) {
            // add host to block list, 
            // Is this call fully atomic and thread safe for blockHost method 
            // in ClientData class?
            ClientData.blockHost(hostname);
        }
    }
}

每当主机名从主线程关闭时,我都不需要调用它。我的后台线程也从我的一项服务获取这些详细信息,每当任何服务器关闭时,它都会有作为 block 主机的主机名列表,每当它们启动时,该列表都会更新。

而且,每当抛出任何 RestClientException 时,我都会在 blockedHosts 并发映射中添加该主机名,因为我的后台线程每 10 分钟运行一次,这样映射就不会'在 10 分钟完成之前不会有此主机名。每当该服务器恢复时,我的后台就会自动更新此列表。

我上面的主机名 block 列表代码是否完全原子且线程安全?因为我想要的是 - 如果 hostA 关闭,则在更新阻止的主机列表之前,其他线程不应调用 hostA。

最佳答案

请记住,与其他主机的通信比您在线程中执行的任何操作花费的时间要多得多。在这种情况下我不会担心原子操作。

假设我们有线程 t1t2t1hostA 发送请求并等待响应。当达到超时时,将抛出RestClientException。现在,抛出异常和将该主机添加到阻止主机列表之间的时间跨度非常小。 可能发生 t2 尝试在主机被阻止之前向 hostA 发送请求的情况 - 但更有可能的是t2t1 等待响应的很长一段时间内已经发送了它,这是您无法阻止的。

您可以尝试设置合理的超时时间。当然,还有其他类型的错误不会等待超时,但即使这些错误也比处理异常花费更多的时间。

使用ConcurrentHashMap是线程安全的,并且应该足以跟踪被阻止的主机。

AtomicReference 本身并没有多大作用,除非您使用 compareAndSet 之类的方法,因此调用不是原子的(但如上所述,不需要在我看来)。如果您确实想在出现异常后立即阻止主机,则应该使用某种同步。您可以使用 synchronized set存储被阻止的主机。这仍然无法解决需要一些时间才能真正检测到任何连接错误的问题。


关于更新:正如评论中所述, future 超时应该大于请求超时。否则,Callable 可能会被取消,并且主机不会添加到列表中。使用 Future.get 时,您可能甚至不需要超时,因为请求最终会成功或失败。

当主机 A 宕机时,您看到许多异常的实际问题可能只是因为许多线程仍在等待主机 A 的响应。您只在启动请求之前而不是在任何请求期间检查阻塞的主机。仍在等待该主机响应的任何线程将继续这样做,直到达到超时。

如果您想防止这种情况发生,您可以尝试定期检查当前主机是否尚未被阻止。这是一个非常幼稚的解决方案,并且有点违背了 future 的目的,因为它基本上是轮询。不过,它应该有助于理解一般问题。

// bad pseudo code 

DataTask dataTask = new DataTask(dataKeys, restTemplate);
future = service.submit(dataTask);

while(!future.isDone()) {
    if( blockedHosts.contains(currentHost) ) {
        // host unreachable, don't wait for http timeout
        future.cancel(); 
    }
    thread.sleep(/* */);
}

更好的方法是在同一主机宕机时向所有等待同一主机的 DataTask 线程发送中断,以便它们可以中止请求并尝试下一个主机。

关于java - 对所有线程具有完全原子性,而不影响性能或吞吐量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25614948/

相关文章:

Java - 为什么即使调用了有效资源,也会执行自定义 404 错误页面?

java - 自定义对象列表作为泛型方法的参数

java - 如果不使用 "return;",如何突破 void 方法

mysql - 具有 gpars 的 Groovy 应用程序在多次迭代后速度变慢

c# - C# 中线程的处理(死锁)

java - 空安全 valueOf 方法

multithreading - 我的 “design pattern”用于异步方法好吗?

c# - 从 DragDrop 事件中将数据传递到线程的线程问题

c# - 这个例子线程安全吗?

python - 使用 python 中的线程从无限循环中读取结果