Scala-redis 订阅 * 但收到零条消息

标签 scala redis akka

使用 Akka 将 redis 与我的 Scala 应用程序集成,但由于某种原因它没有收到任何消息。我可以通过在命令行上打开 redis-cli 来确认 redis 确实有大量流量。

在 pSubscribe 之后,它收到:订阅了 * 且 count = 1

我的猜测是,这可能与 Akka 设置接收回调的方式有关。由于一些冲突,我不得不删除 scala-redis 库中的 Scala Actor 并用 Akka Actor 替换它们。

代码如下:

订阅者 Actor

class Subscriber(client: RedisClient) extends Actor {
  var callback: PubSubMessage => Any = { m => }

  def receive: Receive = { 
    case Subscribe(channels) =>
      client.subscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribe(channels) =>
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribeAll(channels) =>
      Logger.info("Subscribing to all channels")
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case Register(cb) =>
      Logger.info("Callback is registered")
      callback = cb

    case Unsubscribe(channels) =>
      client.unsubscribe(channels.head, channels.tail: _*)

    case UnsubscribeAll =>
      client.unsubscribe
  }
}

初始化订阅者

class RelaySub extends Actor {

  // important config values
  val system = ActorSystem("pubsub")
  val conf = play.api.Play.current.configuration
  val relayPubHost = conf.getString("relays.redis.host").get
  val relayPubPort = conf.getInt("relays.redis.port").get

  val rs = new RedisClient(relayPubHost, relayPubPort)
  val s = system.actorOf(Props(new Subscriber(rs)))
  s ! Register(callback) 
  s ! pSubscribeAll(Array("*"))
  Logger.info("Engine Relay Subscriber has started up")

  def receive: Receive = {      
    case Register(callback) =>
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
      case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
      case M(channel, msg) => 
        msg match {
          // exit will unsubscribe from all channels and stop subscription service
          case "exit" => 
            Logger.info("unsubscribe all ... no handler yet ;)")

          // message "+x" will subscribe to channel x
          case x if x startsWith "+" => 
            Logger.info("subscribe to ... no handler yet ;)")

          // message "-x" will unsubscribe from channel x
          case x if x startsWith "-" => 
            Logger.info("unsubscribe from ... no handler yet ;)")

          // other message receive
          case x => 
            Logger.info("Engine: received redis message")
            val channelVars = channel.split(".").toArray[String]
            if(channelVars(0)!=Engine.instanceID)
                channelVars(1) match {
                  case "relay" => 
                    EngineSyncLocal.constructRelay(channel, msg)
                  case _ => 
                    Logger.error("Engine: received unknown redis message")
                }
        }
  }
}

感谢您的帮助!

最佳答案

我发现了问题。这似乎是 scala-redis 客户端中的一个错误。

我在消费者类中添加了一些日志记录,并开始收到Engine:奇怪的消息错误,这意味着它无法识别传入流量。我将联系作者并提出拉取请求。

代码:

class Consumer(fn: PubSubMessage => Any) extends Runnable {

    def start () {
      val myThread = new Thread(this) ;
      myThread.start() ;
    }

    def run {
      whileTrue {
        asList match {
          case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
            Logger.info("Engine: redis traffic")
            msgType match {
              case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
              case "unsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "punsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "unsubscribe" | "punsubscribe" => 
                fn(U(channel, data.toInt))
              case "message" | "pmessage" => 
                fn(M(channel, data))
              case x => throw new RuntimeException("unhandled message: " + x)
            }
          case _ => Logger.error("Engine: weird redis message")
        }
      }
    }
  }

关于Scala-redis 订阅 * 但收到零条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10789325/

相关文章:

redis - Redis集群总线端口连接上没有保持事件状态

java - Akka 消息为空

scala - 在 SBT-Scalatra 项目中添加本地依赖 jar

c# - Redis 因内存不足而中止

c# - Redis快速插入5000万条记录的方法

java - 试图将 java 文件转换为字节串以便在流中使用

用于分布式计算的 Java 8 MapReduce

scala - 在 Scala 中将 BitSet 设置为 Set[Int] 或反之亦然

scala - 结合镜片的集合

Scala:添加到 Map 的奇怪现象