jakarta-ee - 当数据库中的某些内容被修改时,通过 WebSockets 仅通知特定用户

标签 jakarta-ee websocket java-ee-7 real-time-updates

为了通过 WebSockets 通知所有用户,当在选定的 JPA 实体中修改某些内容时,我使用以下基本方法。

@ServerEndpoint("/Push")
public class Push {

    private static final Set<Session> sessions = new LinkedHashSet<Session>();

    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
    }

    private static JsonObject createJsonMessage(String message) {
        return JsonProvider.provider().createObjectBuilder().add("jsonMessage", message).build();
    }

    public static void sendAll(String text) {
        synchronized (sessions) {
            String message = createJsonMessage(text).toString();

            for (Session session : sessions) {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }
}

当修改选定的 JPA 实体时,将引发适当的 CDI 事件,该事件将由以下 CDI 观察者观察。
@Typed
public final class EventObserver {

    private EventObserver() {}

    public void onEntityChange(@Observes EntityChangeEvent event) {
        Push.sendAll("updateModel");
    }
}

观察者/消费者调用静态方法 Push#sendAll()在 WebSockets 端点中定义,该端点将 JSON 消息作为通知发送给所有关联的用户/连接。
sendAll()里面的逻辑当只通知选定的用户时,需要以某种方式修改方法。
  • 仅通知负责修改相关实体的用户(可能是管理员用户或注册用户,只有在他们成功登录后才能修改某些内容)。
  • 仅通知特定用户(不是全部)。 “特定”是指,例如,当一个帖子在本网站上被投票时,只有帖子所有者会收到通知(该帖子可能被任何其他具有足够权限的用户投票)。

  • 当建立初始握手时,HttpSession可以按照 this 中所述进行访问回答但仍不足以用两颗子弹完成上述任务。由于在第一次握手请求时它是可用的,之后为该 session 设置的任何属性在服务器端点中都将不可用,即,在建立握手后设置的任何 session 属性都将不可用。

    如上所述,仅通知选定用户的最可接受/最规范的方式是什么? sendAll() 中的一些条件语句方法或其他地方是必需的。看来它必须做的不仅仅是用户的HttpSession .

    我使用 GlassFish Server 4.1/Java EE 7。

    最佳答案

    session ?

    Since it is available when the first handshake request is made, any attribute set to that session afterwards will not be available in the server endpoint i.e. in other words, any session attribute set after a handshake is established will not be available



    看来你被“ session ”这个词的歧义给咬了。 session 的生命周期取决于上下文和客户端。 websocket (WS) session 与 HTTP session 的生命周期不同。就像 EJB session 与 HTTP session 没有相同的生命周期一样。就像传统的 Hibernate session 没有与 HTTP session 相同的生命周期一样。等等。您可能已经了解的 HTTP session 在 How do servlets work? Instantiation, sessions, shared variables and multithreading 中进行了说明。 EJB session 在这里解释 JSF request scoped bean keeps recreating new Stateful session beans on every request?

    WebSocket 生命周期

    WS session 与 HTML 文档表示的上下文相关联。客户端基本上是 JavaScript 代码。当 JavaScript 执行 new WebSocket(url) 时,WS session 开始。当 JavaScript 在 close() 实例上显式调用 WebSocket 函数时,或者当关联的 HTML 文档由于页面导航(单击链接/书签或修改浏览器地址栏中的 URL)或页面刷新而被卸载时,WS session 停止,或浏览器选项卡/窗口关闭。请注意,您可以在同一个 DOM 中创建多个 WebSocket 实例,通常每个实例具有不同的 URL 路径或查询字符串参数。

    每次 WS session 开始时(即每次当 JavaScript 执行 var ws = new WebSocket(url); 时),这将触发握手请求,因此您可以通过以下 Configurator 类访问关联的 HTTP session ,正如您已经发现的:
    public class ServletAwareConfigurator extends Configurator {
    
        @Override
        public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
            HttpSession httpSession = (HttpSession) request.getHttpSession();
            config.getUserProperties().put("httpSession", httpSession);
        }
    
    }
    

    因此,这不会像您所期望的那样在每个 HTTP session 或 HTML 文档中只调用一次。每次创建 new WebSocket(url) 时都会调用它。

    然后将创建 @ServerEndpoint 注释类的全新实例,并调用其 @OnOpen 注释方法。如果您熟悉 JSF/CDI 托管 bean,只需将该类视为 @ViewScoped 并将方法视为 @PostConstruct
    @ServerEndpoint(value="/push", configurator=ServletAwareConfigurator.class)
    public class PushEndpoint {
    
        private Session session;
        private EndpointConfig config;
    
        @OnOpen
        public void onOpen(Session session, EndpointConfig config) {
            this.session = session;
            this.config = config;
        }
    
        @OnMessage
        public void onMessage(String message) {
            // ...
        }
    
        @OnError
        public void onError(Throwable exception) {
            // ...
        }
    
        @OnClose
        public void onClose(CloseReason reason) {
            // ...
        }
    
    }
    

    请注意,此类不同于例如不是应用程序范围的 servlet。它基本上是 WS session 范围的。所以每个新的 WS session 都有自己的实例。这就是为什么您可以安全地将 SessionEndpointConfig 分配为实例变量的原因。根据类设计(例如抽象模板等),如有必要,您可以将 Session 添加回所有其他 onXxx 方法的第一个参数。这也受支持。

    当 JavaScript 执行 @OnMessage 时,将调用 webSocket.send("some message") 注释的方法。 WS session 关闭时将调用 @OnClose 注释的方法。如有必要,确切的关闭原因可以通过 CloseReason.CloseCodes 枚举可用的关闭原因代码来确定。抛出异常时会调用 @OnError 带注释的方法,通常作为 WS 连接上的 IO 错误(管道损坏、连接重置等)。

    通过登录用户收集 WS session

    回到你只通知特定用户的具体功能需求,在上面的解释之后你应该明白你可以安全地依赖 modifyHandshake() 从关联的 HTTP session 中提取登录用户,只要 new WebSocket(url) 在用户已登录。
    public class UserAwareConfigurator extends Configurator {
    
        @Override
        public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
            HttpSession httpSession = (HttpSession) request.getHttpSession();
            User user = (User) httpSession.getAttribute("user");
            config.getUserProperties().put("user", user);
        }
    
    }
    

    在带有 @ServerEndpoint(configurator=UserAwareConfigurator.class) 的 WS 端点类中,您可以在 @OnOpen 注释方法中获得它,如下所示:
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        User user = (User) config.getUserProperties().get("user");
        // ...
    }
    

    您应该在应用范围内收集它们。您可以在端点类的 static 字段中收集它们。或者,更好的是,如果 WS 端点中的 CDI 支持在您的环境中没有被破坏(在 WildFly 中工作,而不是在 Tomcat+Weld 中,不确定 GlassFish),那么只需将它们收集在应用程序范围的 CDI 托管 bean 中,您又将其 @Inject端点类。

    User 实例不是 null 时(即当用户登录时),请记住一个用户可以有多个 WS session 。因此,您基本上需要将它们收集在 Map<User, Set<Session>> 结构中,或者可能是更细粒度的映射,通过用户 ID 或组/角色来映射它们,毕竟这样可以更轻松地找到特定用户。这一切都取决于最终的要求。这里至少有一个使用应用程序范围的 CDI 托管 bean 的启动示例:
    @ApplicationScoped
    public class PushContext {
    
        private Map<User, Set<Session>> sessions;
    
        @PostConstruct
        public void init() {
            sessions = new ConcurrentHashMap<>();
        }
    
        void add(Session session, User user) {
            sessions.computeIfAbsent(user, v -> ConcurrentHashMap.newKeySet()).add(session);
        }
    
        void remove(Session session) {
            sessions.values().forEach(v -> v.removeIf(e -> e.equals(session)));
        }
    
    }
    
    @ServerEndpoint(value="/push", configurator=UserAwareConfigurator.class)
    public class PushEndpoint {
    
        @Inject
        private PushContext pushContext;
    
        @OnOpen
        public void onOpen(Session session, EndpointConfig config) {
            User user = (User) config.getUserProperties().get("user");
            pushContext.add(session, user);
        }
    
        @OnClose
        public void onClose(Session session) {
            pushContext.remove(session);
        }
    
    }
    

    最后,您可以在 PushContext 中向特定用户发送消息,如下所示:
    public void send(Set<User> users, String message) {
        Set<Session> userSessions;
    
        synchronized(sessions) {
            userSessions = sessions.entrySet().stream()
                .filter(e -> users.contains(e.getKey()))
                .flatMap(e -> e.getValue().stream())
                .collect(Collectors.toSet());
        }
    
        for (Session userSession : userSessions) {
            if (userSession.isOpen()) {
                userSession.getAsyncRemote().sendText(message);
            }
        }
    }
    
    PushContext 作为 CDI 托管 bean 具有额外的优势,它可以注入(inject)任何其他 CDI 托管 bean,从而更容易集成。

    与关联用户一起触发 CDI 事件

    在您的 EntityListener 中,您最有可能根据之前的相关问题 Real time updates from database using JSF/Java EE 触发 CDI 事件,您已经拥有更改的实体,因此您应该能够通过模型中的关系找到与其关联的用户。

    Notify only the user who is responsible for modifying the entity in question (it may be an admin user or a registered user who can modify something only after their successful login)


    @PostUpdate
    public void onChange(Entity entity) {
        Set<User> editors = entity.getEditors();
        beanManager.fireEvent(new EntityChangeEvent(editors));
    }
    

    Notify only specific user/s (not all). "Specific" means for example, when a post is voted up on this site, only the post owner is notified (the post may be voted up by any other user with sufficient privileges).


    @PostUpdate
    public void onChange(Entity entity) {
        User owner = entity.getOwner();
        beanManager.fireEvent(new EntityChangeEvent(Collections.singleton(owner)));
    }
    

    然后在 CDI 事件观察器中,传递它:
    public void onEntityChange(@Observes EntityChangeEvent event) {
        pushContext.send(event.getUsers(), "message");
    }
    

    也可以看看:
  • RFC6455 - The WebSocket Protocol(描述ws://协议(protocol))
  • W3 - The WebSocket API(描述JS WebSocket接口(interface))
  • MDN - Writing WebSocket client application(描述如何在客户端使用WS API)
  • Java EE 7 tutorial - WebSocket(描述 javax.websocket API 及其使用方法)
  • 关于jakarta-ee - 当数据库中的某些内容被修改时,通过 WebSockets 仅通知特定用户,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32426674/

    相关文章:

    jsf - javax.faces.INTERPRET_EMPTY_STRING_SUBMITTED_VALUES_AS_NULL 自 Java EE 7/EL 3.0 起不再工作

    jakarta-ee - 打包存档和分解存档之间的区别

    Java EE - 查找 session 大小

    android - NodeJS + SocketIO 推送到移动应用

    Java EE 7 批处理 API (JSR-352) : it's possible to stop a single step and not all the job?

    java - 无法在eclipse中添加Servlet

    java - http请求对电池生命周期的影响

    java - 在应用程序服务器中调用远程 Bean 与本地 Bean

    amazon-web-services - 使用 CloudFormation 时,AWS Lambda 函数缺少由 websocket 网关触发

    javascript - stompjs xhr_streaming 超时