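scala - Unpredictable behavior of Redis with the Redisson framework in Scala

Tags: scala concurrency redis redisson

I am using Redis with the Redisson framework from Scala, with a distributed Set and Publish/Subscribe. When the system accesses the Set from inside a message listener after receiving a message, the framework's behavior becomes unpredictable. Below are the source code that reproduces the problem and the resulting exception. Any ideas?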

import org.redisson._
import org.redisson.core.{ MessageListener, RTopic }
import org.scalatest._

object RedissonTest {

  // Single-server client using the native epoll transport (Linux only)
  val config = new Config().setUseLinuxNativeEpoll(true)
  config.useSingleServer().setAddress("127.0.0.1:6379")

  val redis = Redisson.create(config)
  val set_test = java.util.UUID.randomUUID
  val system_topic = "system_bus"

  class RedissonTestSet extends FlatSpec with Matchers {

    val topic: RTopic[String] = redis.getTopic(system_topic)
    val redisSet: java.util.Set[String] = redis.getSet(set_test.toString)

    it should "produce an exception" in {

      // Redisson invokes this listener when a message arrives on the topic
      val listener = new MessageListener[String]() {
        override def onMessage(channel: String, message: String): Unit = {
          checkSet()
        }
      }
      topic.addListener(listener)

      // Fill the distributed set, then publish to trigger the listener
      for (i <- 1 to 1000) {
        redisSet.add(i.toString)
      }
      topic.publish("hey this is the bug")
    }

    // Blocking lookups against the distributed set; the exception is thrown from contains
    def checkSet(): Unit = {
      for (i <- 1 to 1000) {
        if (redisSet.contains(i.toString)) {
          // result intentionally ignored
        }
      }
    }
  }
}

Exception:

io.netty.handler.codec.DecoderException: io.netty.util.concurrent.BlockingOperationException: DefaultPromise@447696da(incomplete)
        at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:431) ~[netty-codec-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) ~[netty-codec-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:307) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:293) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:840) [netty-transport-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:830) [netty-transport-native-epoll-4.0.34.Final-linux-x86_64.jar:na]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:348) [netty-transport-native-epoll-4.0.34.Final-linux-x86_64.jar:na]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:264) [netty-transport-native-epoll-4.0.34.Final-linux-x86_64.jar:na]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) [netty-common-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) [netty-common-4.0.34.Final.jar:4.0.34.Final]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Caused by: io.netty.util.concurrent.BlockingOperationException: DefaultPromise@447696da(incomplete)
        at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:391) ~[netty-common-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:284) ~[netty-common-4.0.34.Final.jar:4.0.34.Final]
        at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:33) ~[netty-common-4.0.34.Final.jar:4.0.34.Final]
        at org.redisson.command.CommandAsyncService.get(CommandAsyncService.java:84) ~[redisson-2.2.9.jar:na]
        at org.redisson.RedissonObject.get(RedissonObject.java:49) ~[redisson-2.2.9.jar:na]
        at org.redisson.RedissonSet.contains(RedissonSet.java:70) ~[redisson-2.2.9.jar:na]
        at com.web3.RedissonTest$RedissonTestSet$$anonfun$checkSet$1.apply$mcVI$sp(ReddisonSetPubSubTest.scala:45) ~[test-classes/:na]
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) ~[scala-library-2.10.4.jar:na]
        at com.web3.RedissonTest$RedissonTestSet.checkSet(ReddisonSetPubSubTest.scala:44) ~[test-classes/:na]
        at com.web3.RedissonTest$RedissonTestSet$$anonfun$1$$anon$1.onMessage(ReddisonSetPubSubTest.scala:30) ~[test-classes/:na]
        at com.web3.RedissonTest$RedissonTestSet$$anonfun$1$$anon$1.onMessage(ReddisonSetPubSubTest.scala:27) ~[test-classes/:na]
        at org.redisson.PubSubMessageListener.onMessage(PubSubMessageListener.java:73) ~[redisson-2.2.9.jar:na]
        at org.redisson.client.RedisPubSubConnection.onMessage(RedisPubSubConnection.java:68) ~[redisson-2.2.9.jar:na]
        at org.redisson.client.handler.CommandDecoder.handleMultiResult(CommandDecoder.java:277) ~[redisson-2.2.9.jar:na]
        at org.redisson.client.handler.CommandDecoder.decodeMulti(CommandDecoder.java:242) ~[redisson-2.2.9.jar:na]
        at org.redisson.client.handler.CommandDecoder.decode(CommandDecoder.java:217) ~[redisson-2.2.9.jar:na]
        at org.redisson.client.handler.CommandDecoder.decode(CommandDecoder.java:97) ~[redisson-2.2.9.jar:na]
        at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:376) ~[netty-codec-4.0.34.Final.jar:4.0.34.Final]
        ... 16 common frames omitted

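Best answer

This bug was fixed in Redisson 2.2.11. As the stack trace shows, the listener runs on the Netty event-loop thread, and the blocking call to RedissonSet.contains waits on a promise from that same thread, which Netty's deadlock check (DefaultPromise.checkDeadLock) rejects with a BlockingOperationException.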
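
If upgrading is not immediately possible, one general way to avoid blocking inside a listener callback is to hand the work to a separate thread pool, so that the thread delivering the message only schedules the task. The sketch below is not taken from the answer and does not use Redisson at all; it only illustrates the hand-off with the Scala standard library. The names ListenerOffload, offloaded, and listener-worker are hypothetical, and the pool size of 2 is arbitrary.

```scala
import java.util.concurrent.{ CountDownLatch, Executors, ThreadFactory, TimeUnit }
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ ExecutionContext, Future }

object ListenerOffload {

  // Dedicated daemon threads for blocking work (hypothetical name and size).
  private val pool = Executors.newFixedThreadPool(2, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "listener-worker")
      t.setDaemon(true)
      t
    }
  })

  implicit val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Wraps a handler so the calling thread only schedules it and returns at once.
  // Note: exceptions thrown by the handler end up in the discarded Future.
  def offloaded(handler: String => Unit): String => Unit =
    message => Future(handler(message))

  def main(args: Array[String]): Unit = {
    val callerThread = Thread.currentThread().getName
    val handlerThread = new AtomicReference[String]("")
    val done = new CountDownLatch(1)

    // Stand-in for the body of onMessage; blocking calls would go here.
    val onMessage = offloaded { _ =>
      handlerThread.set(Thread.currentThread().getName)
      done.countDown()
    }

    onMessage("hey this is the bug")
    done.await(5, TimeUnit.SECONDS)
    println(s"caller=$callerThread handler=${handlerThread.get}")
  }
}
```

In the question's test, the body of onMessage would schedule checkSet through such a wrapper instead of calling it directly. Whether that is enough on Redisson 2.2.9 is an assumption that has not been verified here; upgrading to 2.2.11 or later remains the fix the answer gives.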

Source: this Q&A corresponds to a Stack Overflow question: https://stackoverflow.com/questions/36248742/
