java - 如何确保我不会同时在两个线程之间共享同一个套接字?

标签 java multithreading synchronization thread-safety thread-local

<分区>

我有一个处理套接字的代码,我需要确保我不会在两个线程之间共享同一个套接字。在我下面的代码中,我有一个后台线程,每 60 秒运行一次并调用 updateLiveSockets() 方法。在 updateLiveSockets() 方法中,我迭代了我拥有的所有套接字,然后通过调用 SendToQueue 类的 send 方法开始对它们进行一个接一个的 ping并根据响应将它们标记为活的或死的。

现在所有的阅读器线程都将同时调用getNextSocket() 方法来获取下一个 Activity 的可用套接字,所以它必须是线程安全的,我需要确保所有的阅读器线程看到的都是一样的SocketHolderSocket 的常态。

下面是我的 SocketManager 类:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        boolean isLive = (status) ? true : false;

        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

这是我的 SendToQueue 类:

  // this method will be called by multiple threads concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
    PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true);
    cache.put(address, m);
    return doSendAsync(m, socket);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      // Alternatively (checks that address points to m):
      // cache.asMap().remove(address, m);
      cache.invalidate(address);
    }
  }

问题陈述

现在你可以看到我在两个线程之间共享同一个套接字。似乎 getNextSocket() 可以返回一个 0MQ socketthread A。同时,timer thread 可能会访问同一个 0MQ socket 来 ping 它。在这种情况下,thread Atimer thread 正在改变相同的 0MQ socket,这可能会导致问题。因此,我试图找到一种方法,以防止不同线程同时将数据发送到同一个套接字并弄乱我的数据。

所以我决定同步套接字,这样两个线程就不能同时访问同一个套接字。下面是我在 updateLiveSockets 方法中所做的更改。我使用以下方法在套接字上进行了同步:

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // using the socket as its own lock
        synchronized (socket) {
            // pinging to see whether a socket is live or not
            boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
            boolean isLive = (status) ? true : false;

            SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
            liveUpdatedSockets.add(zmq);
        }
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }

下面是我在 doSendAsync 方法中所做的更改。在这方面,我也在发送之前在套接字上进行了同步。

  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A by synchronizing on it
      synchronized (socket) {
        return msg.send(socket);
      }
    } finally {
      msg.destroy();
    }
  }

确保两个线程之间不共享相同套接字的最佳方法是什么?一般来说,我有大约 60 个套接字和 20 个线程访问这些套接字。

如果许多线程使用同一个套接字,资源就没有得到很好的利用。此外,如果 msg.send(socket); 被阻塞(从技术上讲不应该),则所有等待此套接字的线程都将被阻塞。所以我想可能有更好的方法来确保每个线程同时使用不同的单个 Activity 套接字,而不是在特定套接字上进行同步。还有我错过的任何极端情况或边缘情况会导致一些错误吗?

最佳答案

首先,您需要一种方式让客户使用 Socket 通知您他们已完成.您可以添加一种允许他们发出信号的方法。这是合法的,它会起作用,但你必须依靠你的客户表现良好。或者更确切地说,使用您的套接字的程序员不会忘记返回它。有一种模式可以帮助解决这个问题:execute around图案。而不是给出 Socket ,你创建了一个接受 Consumer<Socket> 的方法,然后执行消费者,返回Socket本身。

public void useSocket(Consumer<Socket> socketUser) {
    Socket socket = getSocket();
    try {
        socketUser.accept(socket);
    } finally {
        returnSocket(socket);
    }
}

现在让我们看看我们将如何实现 getSocket()returnSocket() .显然,它涉及从某种集合中获取它们,并将它们返回到该集合中。 Queue在这里是一个不错的选择(正如其他人也指出的那样)。它允许从一侧获取它,并在另一侧返回,此外还有大量高效的线程安全实现,而且 taker 和 adder 通常不会相互争用。既然你事先知道 socket 的数量,我会选择 ArrayBlockingQueue .

这里的另一个问题是您的实现返回一个 Optional .我不确定如果没有可用的客户会做什么 Socket , 但如果它正在等待并重试,我建议你简单地制作 getSocket()阻塞在队列中。实际上,我会尊重您方法的这一方面,并​​考虑到可能没有 Socket。可用的。对于 execute around 方法,这会将其转换为 useSocket()方法返回 false如果没有 Socket可用。

private final BlockingQueue<Socket> queue;

public SocketPool(Set<Socket> sockets) {
    queue = new ArrayBlockingQueue<>(sockets.size());
    queue.addAll(sockets);
}

public boolean useSocket(Consumer<Socket> socketUser) throws InterruptedException {
    Optional<Socket> maybeSocket = getSocket();
    try {
        maybeSocket.ifPresent(socketUser);
        return maybeSocket.isPresent();
    } finally {
        maybeSocket.ifPresent(this::returnSocket);
    }
}

private void returnSocket(Socket socket) {
    queue.add(socket);
}

private Optional<Socket> getSocket() throws InterruptedException {
    return Optional.ofNullable(queue.poll());
}

就是这样,就是你的 SocketPool .

啊,但接下来是小气的一点:检查 active 。这很吝啬,因为您的 Activity 检查实际上与您的普通客户竞争。

为了解决这个问题,我建议如下:让您的客户报告是否 Socket他们得到的是活的还是不活的。由于检查 Activity 性归结为使用套接字,这对您的客户来说应该很简单。

所以不是 Consumer<Socket> ,我们将采用 Function<Socket, Boolean> .如果函数返回 false ,我们将考虑 Socket不再活着。在这种情况下,我们不会将其添加回常规队列,而是将其添加到死套接字集合中,并且我们将有一个计划任务,间歇性地重新检查死套接字。由于这是在单独的集合上发生的,因此计划检查不再与常规客户竞争。

现在你可以制作一个 SocketManagerMap将数据中心映射到 SocketPool实例。此映射不需要更改,因此您可以将其设为最终并在 SocketManager 中对其进行初始化。的构造函数。

这是我对 SocketPool 的初步代码(未经测试):

class SocketPool implements AutoCloseable {

    private final BlockingQueue<Socket> queue;
    private final Queue<Socket> deadSockets = new ConcurrentLinkedQueue<>();
    private final ScheduledFuture<?> scheduledFuture;

    public SocketPool(Set<Socket> sockets, ScheduledExecutorService scheduledExecutorService) {
        queue = new ArrayBlockingQueue<>(sockets.size());
        queue.addAll(sockets);
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS);
    }

    public boolean useSocket(Function<Socket, Boolean> socketUser) throws InterruptedException {
        Optional<Socket> maybeSocket = getSocket();
        boolean wasLive = true;
        try {
            wasLive = maybeSocket.map(socketUser).orElse(false);
            return wasLive && maybeSocket.isPresent();
        } finally {
            boolean isLive = wasLive;
            maybeSocket.ifPresent(socket -> {
                if (isLive) {
                    returnSocket(socket);
                } else {
                    reportDead(socket);
                }
            });
        }
    }

    private void reportDead(Socket socket) {
        deadSockets.add(socket);
    }

    private void returnSocket(Socket socket) {
        queue.add(socket);
    }

    private Optional<Socket> getSocket() throws InterruptedException {
        return Optional.ofNullable(queue.poll());
    }

    private void recheckDeadSockets() {
        for (int i = 0; i < deadSockets.size(); i++) {
            Socket socket = deadSockets.poll();
            if (checkAlive(socket)) {
                queue.add(socket);
            } else {
                deadSockets.add(socket);
            }
        }
    }

    private boolean checkAlive(Socket socket) {
        // do actual live check with SendSocket class, or implement directly in this one
        return true;
    }

    @Override
    public void close() throws Exception {
        scheduledFuture.cancel(true);
    }
}

关于java - 如何确保我不会同时在两个线程之间共享同一个套接字?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47107331/

相关文章:

java - 根据分隔符拆分字符串但仅在括号外?

java - 注释参数的继承

java - 客户端服务器程序的多线程

java - 为什么 Java 中线程是顺序运行而不是同时运行

c++ - 如何从更基本的同步原语制作多读/单写锁?

java - 创建对象时出现 IllegalMonitorStateException

java - 使用通用图像加载器仅从磁盘缓存加载一些图像

c# - 为什么线程和任务之间的性能差异如此之大?

java - 这些方法是线程安全的吗?

java - ORA-01861 : literal does not match format string error isn't always shown in similar cases