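I'm using Akka to integrate redis with my Scala application, but for some reason it isn't receiving any messages. I can confirm that redis does have plenty of traffic by opening redis-cli on the command line.
After the pSubscribe, it receives: subscribed to * and count = 1
My guess is that this might be related to the way Akka is set up to receive callbacks. Because of some conflicts, I had to remove the Scala actors in the scala-redis library and replace them with Akka actors.
Here's the code:
Subscriber actor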
    class Subscriber(client: RedisClient) extends Actor {
      var callback: PubSubMessage => Any = { m => }

      def receive: Receive = {
        case Subscribe(channels) =>
          client.subscribe(channels.head, channels.tail: _*)(callback)
        case pSubscribe(channels) =>
          client.pSubscribe(channels.head, channels.tail: _*)(callback)
        case pSubscribeAll(channels) =>
          Logger.info("Subscribing to all channels")
          client.pSubscribe(channels.head, channels.tail: _*)(callback)
        case Register(cb) =>
          Logger.info("Callback is registered")
          callback = cb
        case Unsubscribe(channels) =>
          client.unsubscribe(channels.head, channels.tail: _*)
        case UnsubscribeAll =>
          client.unsubscribe
      }
    }
Initializing the subscriber
    class RelaySub extends Actor {
      // important config values
      val system = ActorSystem("pubsub")
      val conf = play.api.Play.current.configuration
      val relayPubHost = conf.getString("relays.redis.host").get
      val relayPubPort = conf.getInt("relays.redis.port").get
      val rs = new RedisClient(relayPubHost, relayPubPort)
      val s = system.actorOf(Props(new Subscriber(rs)))

      s ! Register(callback)
      s ! pSubscribeAll(Array("*"))
      Logger.info("Engine Relay Subscriber has started up")

      def receive: Receive = {
        case Register(callback) =>
      }

      def callback(pubsub: PubSubMessage) = pubsub match {
        case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
        case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
        case M(channel, msg) =>
          msg match {
            // exit will unsubscribe from all channels and stop subscription service
            case "exit" =>
              Logger.info("unsubscribe all ... no handler yet ;)")
            // message "+x" will subscribe to channel x
            case x if x startsWith "+" =>
              Logger.info("subscribe to ... no handler yet ;)")
            // message "-x" will unsubscribe from channel x
            case x if x startsWith "-" =>
              Logger.info("unsubscribe from ... no handler yet ;)")
            // other message receive
            case x =>
              Logger.info("Engine: received redis message")
              // String.split takes a regex, so the dot must be escaped;
              // split(".") matches every character and returns an empty array
              val channelVars = channel.split("\\.")
              if (channelVars(0) != Engine.instanceID)
                channelVars(1) match {
                  case "relay" =>
                    EngineSyncLocal.constructRelay(channel, msg)
                  case _ =>
                    Logger.error("Engine: received unknown redis message")
                }
          }
      }
    }
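Thanks for your help!
Best answer
I found the problem. It appears to be a bug in the scala-redis client.
I added some logging to the Consumer class and started getting "Engine: weird redis message" errors, which means it doesn't recognize the incoming traffic. I'll contact the author and submit a pull request.
Code: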
    class Consumer(fn: PubSubMessage => Any) extends Runnable {
      def start(): Unit = {
        val myThread = new Thread(this)
        myThread.start()
      }

      def run(): Unit = {
        whileTrue {
          asList match {
            case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
              Logger.info("Engine: redis traffic")
              msgType match {
                case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
                case "unsubscribe" if (data.toInt == 0) =>
                  fn(U(channel, data.toInt))
                  break
                case "punsubscribe" if (data.toInt == 0) =>
                  fn(U(channel, data.toInt))
                  break
                case "unsubscribe" | "punsubscribe" =>
                  fn(U(channel, data.toInt))
                case "message" | "pmessage" =>
                  fn(M(channel, data))
                case x => throw new RuntimeException("unhandled message: " + x)
              }
            case _ => Logger.error("Engine: weird redis message")
          }
        }
      }
    }
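The answer does not say why the traffic goes unrecognized, but the Redis Pub/Sub protocol offers a likely explanation. Replies to subscribe, psubscribe and message are three-element arrays, while a pmessage reply has four elements: the literal "pmessage", the matched pattern, the originating channel and the payload. The Consumer above only matches three-element lists, so every pattern-matched message would fall through to the `case _` branch. The sketch below shows a decoder that handles both shapes. `ReplyDecoder` and the event case classes are hypothetical names made up for illustration and are not part of scala-redis; the input is assumed to be the already-parsed reply as a list of optional strings.

```scala
// Hypothetical stand-ins for the library's message types, with an
// extra case that keeps the pattern of a pattern-matched message.
sealed trait PubSubEvent
case class Subscribed(channel: String, count: Int) extends PubSubEvent
case class Message(channel: String, payload: String) extends PubSubEvent
case class PatternMessage(pattern: String, channel: String, payload: String) extends PubSubEvent
case class Unrecognized(reply: List[Option[String]]) extends PubSubEvent

object ReplyDecoder {
  // Assumes the multi-bulk reply has already been read into a list.
  def decode(reply: List[Option[String]]): PubSubEvent = reply match {
    // pmessage carries four elements: type, pattern, channel, payload
    case Some("pmessage") :: Some(pattern) :: Some(channel) :: Some(data) :: Nil =>
      PatternMessage(pattern, channel, data)
    // message carries three elements: type, channel, payload
    case Some("message") :: Some(channel) :: Some(data) :: Nil =>
      Message(channel, data)
    // subscription confirmations carry three: type, channel or pattern, count
    case Some("subscribe" | "psubscribe") :: Some(channel) :: Some(count) :: Nil =>
      Subscribed(channel, count.toInt)
    // anything else is surfaced instead of being silently dropped
    case other =>
      Unrecognized(other)
  }
}
```

For example, `ReplyDecoder.decode(List(Some("pmessage"), Some("*"), Some("engine1.relay"), Some("hello")))` yields a `PatternMessage`, whereas the same four-element list fed to a three-element pattern would never match. This would also explain why the psubscribe confirmation in the question arrived while no messages did.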
Source: the Stack Overflow question "Scala-redis subscribes to * but receives zero messages": https://stackoverflow.com/questions/10789325/