java - 不要同时在两个线程之间共享同一个套接字

标签 java multithreading concurrency synchronization thread-safety

我有大约 60 个套接字和 20 个线程,我想确保每个线程每次都在不同的套接字上工作,所以我根本不想在两个线程之间共享同一个套接字。

在我的 SocketManager 类中,我有一个后台线程,它每 60 秒运行一次并调用 updateLiveSockets() 方法。在 updateLiveSockets() 方法中,我迭代了我拥有的所有套接字,然后通过调用 SendToQueue 类的 send 方法开始对它们进行一个接一个的 ping并根据响应将它们标记为活的或死的。在 updateLiveSockets() 方法中,我总是需要遍历所有套接字并 ping 它们以检查它们是活的还是死的。

现在所有的读取器线程将同时调用SocketManager 类的getNextSocket() 方法来获取下一个有效的可用套接字,以便在该套接字上发送业务消息。所以我在套接字上发送两种类型的消息:

  • 一个是套接字上的ping 消息。这仅从定时器线程调用 SocketManager 类中的 updateLiveSockets() 方法发送。
  • 其他是套接字上的business 消息。这是在 SendToQueue 类中完成的。

因此,如果 pinger 线程正在 ping 一个套接字以检查它们是否处于 Activity 状态,那么其他业务线程不应使用该套接字。同样,如果业务线程正在使用套接字在其上发送数据,则 pinger 线程不应 ping 那个套接字。这适用于所有 socket 。但我需要确保在 updateLiveSockets 方法中,每当我的后台线程启动时,我们都会对所有可用套接字执行 ping 操作,以便我们可以确定哪个套接字处于 Activity 状态或已停止状态。

下面是我的 SocketManager 类:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

这是我的 SendToQueue 类:

  // this method will be called by multiple reader threads (around 20) concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // log error
      return false;
    }       
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

问题陈述

现在,正如您在上面看到的,我在两个线程之间共享同一个套接字。似乎 SocketManager 类中的 getNextSocket() 可以返回一个 0MQ socketThread A。同时,timer thread 可能会访问同一个 0MQ socket 来 ping 它。在这种情况下,Thread Atimer thread 正在改变相同的 0MQ socket,这可能会导致问题。因此,我试图找到一种方法,以防止不同线程同时将数据发送到同一个套接字并弄乱我的数据。

我能想到的一个解决方案是在发送数据时在套接字上使用同步,但如果许多线程使用同一个套接字,资源就不会得到很好的利用。此外,如果 msg.send(socket); 被阻塞(从技术上讲不应该),则所有等待此套接字的线程都将被阻塞。所以我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字,而不是在特定套接字上进行同步。

最佳答案

So I am trying to find a way so that I can prevent different threads from sending data to the same socket at the same time and mucking up my data.

当然有许多不同的方法可以做到这一点。对我来说,这似乎是 BlockingQueue 的正确使用方式。业务线程将从队列中取出一个套接字,并保证没有其他人会使用该套接字。

private final BlockingQueue<SocketHolder> socketHolderQueue = new LinkedBlockingQueue<>();
...
public Optional<SocketHolder> getNextSocket() {
   SocketHolder holder = socketHolderQueue.poll();
   return holder;
}
...
public void finishedWithSocket(SocketHolder holder) {
   socketHolderQueue.put(holder);
}

出于您提到的原因,我认为在套接字上进行同步不是一个好主意——ping 线程将阻塞业务线程。

有多种方法可以处理 ping 线程逻辑。我会存储你的 Socket 最后一次使用时间,然后你的 ping 线程可以经常从同一个 BlockingQueue 中获取每个套接字,测试它,然后把每个套接字放回去测试后放到队列末尾。

public void testSockets() {
   // one run this for as many sockets as are in the queue
   int numTests = socketHolderQueue.size();
   for (int i = 0; i < numTests; i++) {
      SocketHolder holder = socketHolderQueue.poll();
      if (holder == null) {
         break;
      }
      if (socketIsOk(socketHolder)) {
          socketHolderQueue.put(socketHolder);
      } else {
          // close it here or something
      }
   }
}

您还可以使用 getNextSocket() 代码将线程从队列中取出,检查计时器并将它们放入测试队列以供 ping 线程使用,然后从队列。业务线程永远不会与 ping 线程同时使用同一个套接字。

根据您想要测试套接字的时间,您还可以在业务线程将其返回到队列后重置计时器,以便 ping 线程在 X 秒未使用后测试套接字。

关于java - 不要同时在两个线程之间共享同一个套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47783712/

相关文章:

java - 如何使用命令行在 Windows 上查找 JRE 路径

java - 从大字符串中获取特定的子字符串

java - 我怎样才能找到我的线程在哪里被中断?

c# - 只读字典 - 多线程调用 .ContainsKey 方法

java - 对 CountedCompleter 的文档和来源感到困惑

go - 检测 Go 例程中断

java - 我的 EJB3 设计中的明显缺陷

java - Minecraft NPC 没有在服务器上生成并攻击我 : null pointer error

multithreading - perl:多线程写和尾文件

c - 标准 C 函数的线程安全版本 'localtime'