Java Jetty WebSocket 服务器 : Handle broadcasts with asynchronously disconnecting clients

标签 java asynchronous websocket jetty

背景

我正在研究一个概念证明,它由一个使用 websocket 通信的服务器-客户端系统组成。我使用一个小型 jetty 解决方案(jetty-all-9.1.3.v20140225.jar 和 servlet-api-3.1.jar)。我已具备 PoC 所需的大部分基本功能。

我有 2 个类:

  • TestServer(具有创建服务器实例的主要功能,请参见代码)
  • ClientSocket(为每个客户端实例化的 WebSocket 对象)

问题

我想讨论的问题与广播客户端断开连接有关。服务器将 ClientSockets 的所有实例保存在一个数组中,ClientSockets 在其“onConnect”函数中添加到该数组。

系统稍后会将广播限制为客户端组,但对于 PoC,所有连接的客户端都应获得广播。如果一个客户端断开连接,我想向所有其他客户端发送通知(“myClient has disconnected.”或类似的)。

为了做到这一点,我在服务器中实现了一个广播功能,循环遍历客户端列表,将此信息发送给所有已连接的客户端,但断开连接的客户端除外。此功能还用于通知所有客户端有关其他事情的信息,例如新连接,如果客户端在有人连接或广播某些内容的同时断开连接,则很可能会在此处出现此问题。

这个问题很容易通过连接几个(10+)客户端(我在 js 中做)然后同时断开它们来产生。如果我这样做,我总是会遇到并发错误,对于极少数客户端 (2-3),它有时会起作用,具体取决于我猜的时间。

问题

考虑到任何客户端都可以随时(异步)断开连接,我应该如何处理向所有其他客户端广播的任务?我可以在不产生异常的情况下执行此操作吗?由于它是异步的,所以除了处理发生的异常之外,我看不到任何其他方法。非常感谢任何建议。

代码

TestServer.java

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

public class TestServer {
    private static final TestServer testServer = new TestServer();
    private ArrayList<ClientSocket> clients = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        int port = 8080;
        Server server = new Server(port);
        WebSocketHandler wsHandler = new WebSocketHandler() {
            @Override
            public void configure(WebSocketServletFactory factory) {
                factory.register(ClientSocket.class);
            }
        };
        server.setHandler(wsHandler);
        System.out.println("Starting server on port " + port + ".");
        server.start();
        server.join();
    }

    public static TestServer getServer() {
        return testServer;
    }

    public void addClient(ClientSocket client) {
        this.clients.add(client);
    }

    public void removeClient(ClientSocket client) {
        this.clients.remove(client);
        this.broadcast("disconnect " + client.id, client);  
    }

    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        for (ClientSocket cs : this.clients) {
            if (!cs.equals(excludedClient) && cs.session.isOpen() && cs != null) {
                try {
                    cs.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + cs.id + " (" + cs.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }

由于如果您在此过程中干预数组,以这种方式遍历数组通常是行不通的,所以我也尝试了这个广播函数:

    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        Iterator<ClientSocket> cs = this.clients.iterator();
        while (cs.hasNext()) {
            ClientSocket client = cs.next();
            if (client != null) {
                if (!client.equals(excludedClient) && client.session.isOpen()) {
                try {
                    client.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + client.id + " (" + client.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }

虽然它并没有更好地工作,因为问题是如果另一个 ClientSocket 对象断开连接,因为第一个正在广播它的断开连接,那么数组可能会被异步干预。

ClientSocket.java

import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

@WebSocket(maxIdleTime=0)
public class ClientSocket {
    TestServer server = TestServer.getServer();
    Session session;
    String id;
    String address;

    @OnWebSocketClose
    public void onClose(Session session, int statusCode, String reason) {
        server.log("Disconnected: " + this.id + "(" + this.address + ")" + " (statusCode=" + statusCode + ", reason=" + reason + ")");
        server.removeClient(this);
    }

    @OnWebSocketError
    public void onError(Session session, Throwable t) {
        server.log(this.id + "(" + this.address + ") error: " + t.getMessage());
    }

    @OnWebSocketConnect
    public void onConnect(Session session) {
        this.session = session;
        this.address = this.session.getRemoteAddress().getAddress().toString().substring(1);
        this.id = this.address; //Until user is registered their id is their IP
        server.log("New connection: " + this.address);
        server.addClient(this);
        try {
            session.getRemote().sendString("Hello client with address " + this.address + "!");
        } catch (IOException e) {
            server.log("Error in onConnect for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }

    @OnWebSocketMessage
    public void onMessage(Session session, String message) {
        server.log("Received from " + this.id + "(" + this.address + "): " + message);
        String replyMessage;
        String[] commandList = message.split("\\s+");
        switch (commandList[0].toLowerCase()) {
            case "register":
                if (commandList.length > 1) {
                    this.id = commandList[1];
                    replyMessage = "Registered on server as " + this.id;
                    server.broadcast(this.id + " has connected", this);
                } else {
                    replyMessage = "Incorrect register message";
                }
                break;
            default:
                replyMessage = "echo " + message;
                break;
        }
        server.log("Sending to " + this.id + "(" + this.address + "): " + replyMessage);
        try {
            session.getRemote().sendString(replyMessage);
        } catch (IOException e) {
            server.log("Error during reply in onMessage for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }
}

为了完整起见,我粘贴了整个类,尽管我删除了 onMessage 开关中的一些情况。然而,需要注意的部分是 onConnect 和 onClose 函数,它们将在服务器的客户端数组中填充和删除客户端。

错误日志

[2014-04-17 17:40:17.961] Sending to all clients: disconnect testclient4
2014-04-17 17:40:17.962:WARN:ClientSocket:qtp29398564-17: Unhandled Error (closing connection)
org.eclipse.jetty.websocket.api.WebSocketException: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:99)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
Caused by:
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
Caused by:
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
        at java.util.ArrayList$Itr.next(ArrayList.java:831)
        at TestServer.broadcast(TestServer.java:61)
        at TestServer.removeClient(TestServer.java:45)
        at ClientSocket.onClose(ClientSocket.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
[2014-04-17 17:40:17.97] testclient7(94.246.80.30) error: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]

当同时断开所有客户端时,有时会对多个客户端发生这种情况。我很确定这与 ClientSocket 对象在进行广播的同时消失有关。

最佳答案

替换:

private ArrayList<ClientSocket> clients = new ArrayList<>();

与:

private List<ClientSocket> clients = new CopyOnWriteArrayList<>();

在您的广播方法中,只需使用 for each 语句:

for( ClientSocket client : clients )
{
  if ( !client.equals(excludedClient) && client.session.isOpen() )
  {
    // Broadcast...
  }
}

这将为您提供线程安全的迭代。 ConcurrentModificationException 发生是因为您在迭代列表的同时修改列表。 CopyOnWriteArrayList 在写入时复制内部数组,因此修改不会干扰迭代。这当然会增加一些额外的开销来改变列表,因此您可能想考虑另一种确保线程安全的方法,但我怀疑这足以满足您的需求。如果读取比写入更常见(通常是这种情况),那么“写入时复制”数据结构就可以了。

关于Java Jetty WebSocket 服务器 : Handle broadcasts with asynchronously disconnecting clients,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23137993/

相关文章:

asp.net-mvc - F# 中的异步 Controller 操作

java - Micronaut 是否需要异步编程?

java - 使用 Spring WebSocket 的 SimpMessagingTemplate 进行多端点配置

javascript - 分块 WebSocket 传输

java - 如何从字符串中分离出多个不同的单词(Java)

c# - 非异步执行路径能否在 "async"方法中返回同步结果

java - 从 opendaylight-startup-archetype 构建的 OpenDaylight Oxygen Deploy 应用程序

html - 使用 Node 将视频流式传输到 HTML5

使用 import java.io.Console 时出现 java.lang.NullPointerException;

java - 在mockito测试用例中使用@Value属性