spring - Saving an SSE-Emitter object to MongoDB/Redis, fetching it from the database, and sending events through it

Tags: spring mongodb rest redis

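I am trying to make my REST API stateless. To do that, I need to save each client's SSE-Emitter object to Mongo or Redis so that other instances can access it centrally.

Current behavior:

I am able to save the SSE emitter object to MongoDB, but I think the object is being modified in some way, because after fetching it from MongoDB I cannot send events to the client. If I keep the same emitter object in a local map/list, events are sent successfully.

Expected behavior:

I should be able to fetch the emitter object from MongoDB and send EventData to the client through it.

Source code:

The controller that clients subscribe to: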

@GetMapping("/memory/{userName}")
public SseEmitter handle(@PathVariable("userName") String userName) {
    SseEmitter emitter = new SseEmitter();
    try {
        MongoSession session = new MongoSession();
        session.setId(userName);
        session.setAttribute("emitter", emitter);
        mongoSessionRepo.save(session);
    } catch (Exception e) {
        e.printStackTrace();
    }
    this.emitters.add(emitter); // adding it to the list as well, just for testing
    emitter.onCompletion(() -> this.emitters.remove(emitter));
    emitter.onTimeout(() -> this.emitters.remove(emitter));

    return emitter;
}

The MongoSession class, which represents the document in MongoDB:

package ch.rasc.sse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.session.ExpiringSession;

@Document(collection = "springMongoSession")
public class MongoSession implements ExpiringSession{

  public static final int DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS = 1800;

  /**
   * MongoDB Object ID
   */
  @Indexed(unique = true)
  @Id
  private String id;

  public void setId(String id) {
    this.id = id;
  }

  /**
   * Session ID
   */
  public static final String KEY_SESSION_ID = "_id";

  /**
   * Serialized session attributes
   */
  private byte[] serializedAttributes;

  /**
   * Session attributes (not saved to MongoDB)
   */
  private Map<String,Object> attributes;

  /**
   * Creation time (epoch in ms)
   */
  private long creationTime;

  /**
   * Last accessed time (epoch in ms)
   */
  private long lastAccessedTime;

  /**
   * Max inactive interval (sec)
   */
  private int maxInactiveIntervalInSeconds;

  /**
   * Expire time (epoch in ms)
   */
  @Indexed
  private long expireTime;
  public static final String KEY_EXPIRE_TIME = "expireTime";

  /**
   * Constructor
   */
  public MongoSession() {
    attributes = new HashMap<>();
    creationTime = System.currentTimeMillis();
    lastAccessedTime = creationTime;
    maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
    updateExpireTime();
  }

  /**
   * Constructor
   */
  public MongoSession(String sessionId) {
    this.id = sessionId;
    //this.sessionId = sessionId;
    attributes = new HashMap<>();
    creationTime = System.currentTimeMillis();
    lastAccessedTime = creationTime;
    maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
    updateExpireTime();
  }


  public String getId() {
    return id;
  }

  public void setLastAccessedTime(long lastAccessedTime) {
    this.lastAccessedTime = lastAccessedTime;
    updateExpireTime();
  }

  public long getCreationTime() {
    return creationTime;
  }

  public long getLastAccessedTime() {
    return lastAccessedTime;
  }

  public void setMaxInactiveIntervalInSeconds(int interval) {
    maxInactiveIntervalInSeconds = interval;
    updateExpireTime();
  }

  public int getMaxInactiveIntervalInSeconds() {
    return maxInactiveIntervalInSeconds;
  }

  protected long getExpireTime() {
    return expireTime;
  }

  private void updateExpireTime() {
    expireTime = lastAccessedTime + maxInactiveIntervalInSeconds * 1000;
  }

  public boolean isExpired() {
    long now = System.currentTimeMillis();
    return expireTime <= now;
  }

  public <T> T getAttribute(String attributeName) {
    return (T)attributes.get(attributeName);
  }

  public Set<String> getAttributeNames() {
    return attributes.keySet();
  }

  public void setAttribute(String attributeName, Object attributeValue) {
    attributes.put(attributeName, attributeValue);
  }

  public void removeAttribute(String attributeName) {
    attributes.remove(attributeName);
  }

  /**
   * Serialize session attributes
   */
  public void serializeAttributes() {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(attributes);
      oos.flush();
      serializedAttributes = bos.toByteArray();
    } catch (IOException e) {
      //e.printStackTrace();
      serializedAttributes = new byte[0];
    }
  }

  public void serializeAttributesThis(Object attributeValue) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(attributeValue);
      oos.flush();
      serializedAttributes = bos.toByteArray();
    } catch (IOException e) {
      //e.printStackTrace();
      serializedAttributes = new byte[0];
    }
  }

  /**
   * Deserialize session attributes
   */
  public void deserializeAttributes() {
    try (ByteArrayInputStream bis = new ByteArrayInputStream(serializedAttributes);
         ObjectInputStream ois = new ObjectInputStream(bis)) {
      attributes = (Map<String,Object>)ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      //e.printStackTrace();
      attributes = new HashMap<>();
    }
  }
}
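
One thing worth checking in the class above: both serialize methods catch IOException and fall back to an empty byte array. NotSerializableException is a subclass of IOException, so if the repository calls serializeAttributes() before saving (an assumption, since the repository code is not shown) and the stored emitter does not implement Serializable, the failure would go unnoticed. The sketch below is plain Java with no Spring dependency; FakeEmitter is a hypothetical stand-in for a non-serializable emitter, and isSerializable is an illustrative helper, not part of any library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class SerializationCheck {

    // Hypothetical stand-in for an emitter: it does NOT implement Serializable.
    static class FakeEmitter {
    }

    // Same steps as serializeAttributes(), but the exception is propagated
    // instead of being replaced by an empty byte array.
    static byte[] serialize(Map<String, Object> attributes) throws IOException {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(attributes);
            oos.flush();
            return bos.toByteArray();
        }
    }

    // Returns false when a value in the map cannot be written with Java serialization.
    static boolean isSerializable(Map<String, Object> attributes) {
        try {
            serialize(attributes);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> plain = new HashMap<>();
        plain.put("name", "alice");

        Map<String, Object> withEmitter = new HashMap<>();
        withEmitter.put("emitter", new FakeEmitter());

        System.out.println("plain map serializable: " + isSerializable(plain));
        System.out.println("map with emitter serializable: " + isSerializable(withEmitter));
    }
}
```

Logging the exception in the catch blocks of MongoSession, instead of commenting out printStackTrace(), would show whether this is what happens when the session is saved.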

On the following request, I want to send the event data back to the client:

@RequestMapping("/qmevents/{sessionId}")
public void readQmEvents(@PathVariable("sessionId") String userName) {
    try {
        System.out.println("Emitter Object: " + mongoSessionRepo._getSession(userName));
        System.out.println("Emitter Object: " + mongoSessionRepo._getSession(userName).getAttributeNames());
        System.out.println("Emitter Object: "
                + mongoSessionRepo._getSession(userName).getAttribute("emitter").toString());
        sessionRepo.getSessionAttributes(userName, "emitter");
        SseEmitter emitter = mongoSessionRepo._getSession(userName).getAttribute("emitter");
        MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memBean.getHeapMemoryUsage();
        MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();
        MemoryInfo mi = new MemoryInfo(heap.getUsed(), nonHeap.getUsed());
        mi.setForUserName("Event raised by QM");
        System.out.println("Emitter from map: " + SSEControllerPerUser.emitters.get(0));
        SSEControllerPerUser.emitters.get(0).send(mi);
        //emitter.send(mi);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

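Best answer

Subclass Spring's SseEmitter (see below) and use that component. I have used this solution in a scenario similar to the one you describe (a server crash).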

public class SerializableSSE extends SseEmitter implements Serializable{

    public SerializableSSE() {
    }

    public SerializableSSE(Long timeout) {
        super(timeout);
    }
}
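
A caveat that applies to any class shaped like SerializableSSE follows from the Java serialization rules: when only the subclass implements Serializable, fields declared in the non-serializable superclass are not written, and on deserialization they are re-initialized by the superclass's no-arg constructor. The sketch below shows this with plain Java. Connection and its handler field are hypothetical stand-ins for SseEmitter and whatever state ties it to the open HTTP response; they are not Spring's actual internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SuperclassStateDemo {

    // Hypothetical non-serializable base class holding connection state.
    static class Connection {
        String handler; // stands in for the link to the client's open response

        Connection() {
        }
    }

    // Mirrors the shape of SerializableSSE: only the subclass is Serializable.
    static class SerializableConnection extends Connection implements Serializable {
        private static final long serialVersionUID = 1L;
        String label; // declared in the Serializable subclass, so it is written
    }

    // Serializes the object to bytes and reads it back.
    static SerializableConnection roundTrip(SerializableConnection original) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(original);
            }
            try (ObjectInputStream ois =
                         new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableConnection) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerializableConnection original = new SerializableConnection();
        original.handler = "open-response";
        original.label = "alice";

        SerializableConnection copy = roundTrip(original);
        System.out.println("label after round trip: " + copy.label);
        System.out.println("handler after round trip: " + copy.handler);
    }
}
```

If SseEmitter behaves like Connection here, an emitter read back from MongoDB or Redis would be a fresh object without the link to the client's connection, so it is worth verifying that events actually reach the client after a round trip.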

Hope this helps!

Regarding "spring - Saving an SSE-Emitter object to MongoDB/Redis, fetching it from the database, and sending events through it", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45036309/
