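Background
I am working on a proof of concept that consists of a server-client system communicating over WebSockets. I use a small Jetty setup (jetty-all-9.1.3.v20140225.jar and servlet-api-3.1.jar). I already have most of the basic functionality the PoC needs.
I have 2 classes:
- TestServer (contains the main function that creates the server instance, see code)
- ClientSocket (the WebSocket object instantiated for each client)
Problem
The problem I would like to discuss concerns broadcasting client disconnects. The server keeps all ClientSocket instances in a list, and each ClientSocket is added to that list in its "onConnect" function.
The system will later limit broadcasts to groups of clients, but for the PoC every connected client should receive every broadcast. If a client disconnects, I want to send a notification to all other clients ("myClient has disconnected." or similar).
To do this, I implemented a broadcast function in the server that loops through the client list and sends the message to every connected client except the one that disconnected. The same function is also used to notify all clients about other events, such as new connections, so the problem will most likely show up there too if a client disconnects while someone else is connecting or broadcasting.
The problem is easy to reproduce by connecting several (10+) clients (I do this in JS) and then disconnecting them all at the same time. When I do that I always get concurrency errors; with very few clients (2-3) it sometimes works, depending on timing, I guess.
Question
Given that any client can disconnect at any time (asynchronously), how should I handle broadcasting to all the other clients? Can I do this without producing exceptions? Since it is asynchronous, I cannot see any way other than handling the exceptions as they occur. Any suggestions are greatly appreciated.
Code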
TestServer.java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

public class TestServer {
    private static final TestServer testServer = new TestServer();
    private ArrayList<ClientSocket> clients = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        int port = 8080;
        Server server = new Server(port);
        WebSocketHandler wsHandler = new WebSocketHandler() {
            @Override
            public void configure(WebSocketServletFactory factory) {
                factory.register(ClientSocket.class);
            }
        };
        server.setHandler(wsHandler);
        System.out.println("Starting server on port " + port + ".");
        server.start();
        server.join();
    }

    public static TestServer getServer() {
        return testServer;
    }

    public void addClient(ClientSocket client) {
        this.clients.add(client);
    }

    public void removeClient(ClientSocket client) {
        this.clients.remove(client);
        this.broadcast("disconnect " + client.id, client);
    }
    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        for (ClientSocket cs : this.clients) {
            // The null check has to come first, otherwise cs.equals() is called on null.
            if (cs != null && !cs.equals(excludedClient) && cs.session.isOpen()) {
                try {
                    cs.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + cs.id + " (" + cs.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }
Since iterating over a list this way generally does not work if the list is modified during the iteration, I also tried this broadcast function:
public void broadcast(String message, ClientSocket excludedClient) {
    log("Sending to all clients: " + message);
    Iterator<ClientSocket> cs = this.clients.iterator();
    while (cs.hasNext()) {
        ClientSocket client = cs.next();
        if (client != null) {
            if (!client.equals(excludedClient) && client.session.isOpen()) {
                try {
                    client.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + client.id + " (" + client.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }
}
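It did not work any better, though, because the list can still be modified asynchronously: another ClientSocket object may disconnect while the first one is still broadcasting its own disconnect.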
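The reason the explicit Iterator changes nothing can be shown without Jetty or threads. The following is a minimal sketch, not the server code: the class name FailFastDemo and the string client names are made up for illustration, and the in-loop remove() call merely stands in for another thread calling removeClient() during a broadcast. ArrayList iterators are fail-fast, and the for-each loop uses the same iterator under the hood, so both variants fail the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

class FailFastDemo {
    // Returns true if modifying the list during iteration tripped the fail-fast check.
    static boolean iterateWhileRemoving(List<String> clients) {
        try {
            Iterator<String> it = clients.iterator();
            while (it.hasNext()) {
                String client = it.next();
                if (client.equals("client1")) {
                    // Stands in for a second thread removing a client mid-broadcast.
                    clients.remove("client2");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> clients = new ArrayList<>(Arrays.asList("client1", "client2", "client3"));
        System.out.println("ConcurrentModificationException thrown: " + iterateWhileRemoving(clients));
    }
}
```

Only removals made through the iterator itself (Iterator.remove()) are tolerated, which does not help when the removal happens in a different onClose call.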
ClientSocket.java
import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

@WebSocket(maxIdleTime=0)
public class ClientSocket {
    TestServer server = TestServer.getServer();
    Session session;
    String id;
    String address;

    @OnWebSocketClose
    public void onClose(Session session, int statusCode, String reason) {
        server.log("Disconnected: " + this.id + "(" + this.address + ")" + " (statusCode=" + statusCode + ", reason=" + reason + ")");
        server.removeClient(this);
    }

    @OnWebSocketError
    public void onError(Session session, Throwable t) {
        server.log(this.id + "(" + this.address + ") error: " + t.getMessage());
    }

    @OnWebSocketConnect
    public void onConnect(Session session) {
        this.session = session;
        this.address = this.session.getRemoteAddress().getAddress().toString().substring(1);
        this.id = this.address; //Until user is registered their id is their IP
        server.log("New connection: " + this.address);
        server.addClient(this);
        try {
            session.getRemote().sendString("Hello client with address " + this.address + "!");
        } catch (IOException e) {
            server.log("Error in onConnect for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }

    @OnWebSocketMessage
    public void onMessage(Session session, String message) {
        server.log("Received from " + this.id + "(" + this.address + "): " + message);
        String replyMessage;
        String[] commandList = message.split("\\s+");
        switch (commandList[0].toLowerCase()) {
            case "register":
                if (commandList.length > 1) {
                    this.id = commandList[1];
                    replyMessage = "Registered on server as " + this.id;
                    server.broadcast(this.id + " has connected", this);
                } else {
                    replyMessage = "Incorrect register message";
                }
                break;
            default:
                replyMessage = "echo " + message;
                break;
        }
        server.log("Sending to " + this.id + "(" + this.address + "): " + replyMessage);
        try {
            session.getRemote().sendString(replyMessage);
        } catch (IOException e) {
            server.log("Error during reply in onMessage for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }
}
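For completeness I pasted the whole class, although I removed some cases from the switch in onMessage. The parts to note, however, are the onConnect and onClose functions, which add clients to and remove them from the server's client list.
Error log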
[2014-04-17 17:40:17.961] Sending to all clients: disconnect testclient4
2014-04-17 17:40:17.962:WARN:ClientSocket:qtp29398564-17: Unhandled Error (closing connection)
org.eclipse.jetty.websocket.api.WebSocketException: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:99)
at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
at java.lang.Thread.run(Thread.java:744)
Caused by:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
at java.lang.Thread.run(Thread.java:744)
Caused by:
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
at java.util.ArrayList$Itr.next(ArrayList.java:831)
at TestServer.broadcast(TestServer.java:61)
at TestServer.removeClient(TestServer.java:45)
at ClientSocket.onClose(ClientSocket.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
at java.lang.Thread.run(Thread.java:744)
[2014-04-17 17:40:17.97] testclient7(94.246.80.30) error: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
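This sometimes happens for several clients when all clients are disconnected at the same time. I am fairly sure it is related to ClientSocket objects disappearing while a broadcast is in progress.
Best answer
Replace: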
private ArrayList<ClientSocket> clients = new ArrayList<>();
with:
private List<ClientSocket> clients = new CopyOnWriteArrayList<>();
In your broadcast method, just use a for-each statement:
for( ClientSocket client : clients )
{
    if ( !client.equals(excludedClient) && client.session.isOpen() )
    {
        // Broadcast...
    }
}
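This gives you thread-safe iteration. The ConcurrentModificationException occurs because the list is modified while it is being iterated. CopyOnWriteArrayList copies its internal array on every write, so a modification does not interfere with iterations that are already running. This does add overhead to every change of the list, so you may want to consider another way of ensuring thread safety, but I suspect this will be sufficient for your needs. If reads are much more common than writes (which is usually the case), a copy-on-write data structure is fine.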
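To make the effect concrete, here is a minimal sketch of the same add/remove/broadcast logic that runs without Jetty. The Client interface, the Broadcaster class and the BroadcasterDemo harness are hypothetical stand-ins for ClientSocket and TestServer, not part of the original code, and the sketch assumes Java 8 or later because it uses a lambda. It disconnects many clients from separate threads at once, which is the scenario that produced the exception in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ClientSocket so the sketch runs without Jetty.
interface Client {
    boolean isOpen();
    void send(String message);
}

class Broadcaster {
    // Each iterator works on the array as it was when the loop started,
    // so concurrent add/remove calls cannot invalidate it.
    private final List<Client> clients = new CopyOnWriteArrayList<>();

    void addClient(Client client) {
        clients.add(client);
    }

    void removeClient(Client client) {
        clients.remove(client);
        broadcast("disconnect", client);
    }

    void broadcast(String message, Client excluded) {
        for (Client client : clients) {
            // The snapshot may still contain a client that was removed a moment ago,
            // so the isOpen() check and the try/catch are still needed.
            if (client != excluded && client.isOpen()) {
                try {
                    client.send(message);
                } catch (RuntimeException e) {
                    System.err.println("Error when broadcasting: " + e.getMessage());
                }
            }
        }
    }

    int size() {
        return clients.size();
    }
}

class BroadcasterDemo {
    // Registers `count` clients, disconnects them all from separate threads,
    // and returns how many of those threads ended with an exception.
    static int disconnectAllConcurrently(Broadcaster server, int count) {
        AtomicInteger failures = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Client client = new Client() {
                @Override public boolean isOpen() { return true; }
                @Override public void send(String message) { /* discard the message */ }
            };
            server.addClient(client);
            threads.add(new Thread(() -> {
                try {
                    server.removeClient(client);
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                }
            }));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return failures.get();
    }

    public static void main(String[] args) {
        Broadcaster server = new Broadcaster();
        int failures = disconnectAllConcurrently(server, 50);
        System.out.println("failed threads: " + failures + ", clients left: " + server.size());
    }
}
```

One consequence of the snapshot behaviour is worth keeping in mind: a broadcast that started before a removal can still visit the removed client, so the list change removes the ConcurrentModificationException but not the need to tolerate a failed send to a client that has just gone away.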
Regarding "Java Jetty WebSocket server: Handle broadcasts with asynchronously disconnecting clients", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/23137993/