<分区>
我有一个处理套接字的代码,我需要确保我不会在两个线程之间共享同一个套接字。在我下面的代码中,我有一个后台线程,每 60 秒运行一次并调用 updateLiveSockets()
方法。在 updateLiveSockets()
方法中,我迭代了我拥有的所有套接字,然后通过调用 SendToQueue
类的 send
方法开始对它们进行一个接一个的 ping并根据响应将它们标记为活的或死的。
现在所有的阅读器线程都将同时调用getNextSocket()
方法来获取下一个 Activity 的可用套接字,所以它必须是线程安全的,我需要确保所有的阅读器线程看到的都是一样的SocketHolder
和 Socket
的常态。
下面是我的 SocketManager
类:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
这是我的 SendToQueue
类:
// this method will be called by multiple threads concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
问题陈述
现在你可以看到我在两个线程之间共享同一个套接字。似乎 getNextSocket()
可以返回一个 0MQ socket
给 thread A
。同时,timer thread
可能会访问同一个 0MQ socket
来 ping 它。在这种情况下,thread A
和 timer thread
正在改变相同的 0MQ socket
,这可能会导致问题。因此,我试图找到一种方法,以防止不同线程同时将数据发送到同一个套接字并弄乱我的数据。
所以我决定同步套接字,这样两个线程就不能同时访问同一个套接字。下面是我在 updateLiveSockets
方法中所做的更改。我使用以下方法在套接字上进行了同步:
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// using the socket as its own lock
synchronized (socket) {
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
下面是我在 doSendAsync
方法中所做的更改。在这方面,我也在发送之前在套接字上进行了同步。
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A by synchronizing on it
synchronized (socket) {
return msg.send(socket);
}
} finally {
msg.destroy();
}
}
确保两个线程之间不共享相同套接字的最佳方法是什么?一般来说,我有大约 60 个套接字和 20 个线程访问这些套接字。
如果许多线程使用同一个套接字,资源就没有得到很好的利用。此外,如果 msg.send(socket);
被阻塞(从技术上讲不应该),则所有等待此套接字的线程都将被阻塞。所以我想可能有更好的方法来确保每个线程同时使用不同的单个 Activity 套接字,而不是在特定套接字上进行同步。还有我错过的任何极端情况或边缘情况会导致一些错误吗?