redis - 在多线程环境中从 Redis 列表中弹出值会将重复值返回给少数线程

标签 redis spring-batch

数据库中有一些记录在 cId 和 mId 之间有关联,我通过一些复杂的数据库查询获得这些记录。 Use Case 是获取 cId & mId 组合对应的 dataId 的完整列表,并推送到 redis。 我们从输入的 csv 文件中获取 cId 和 mId 组合到我的批处理作业。文件中会有多条记录对应同一个组合 批处理作业配置了 10 个并行线程和一个读取记录的线程。我们想要的只是当线程从文件中读取特定组合时,我们从数据库中获取所有记录并将它们上传到 Redis 以解决 2 个问题: 1.数据库命中 2. 并发问题,即两个线程不应该根据 cId 和 mId 组合从数据库中获取相同的记录

private static Long DEFAULT_REDIS_OBJECTID = 0L;


        public DataObject getDataObject(C cId, M mId) {
                String redisListKey = new StringBuilder().append(LIST-).append(cId)
                        .append("-").append(mId).toString();
    if (BooleanUtils.isFalse(redisTemplate.hasKey(redisListKey))) {
                pushRedisData(cId, mId, redisListKey);
            }
        }
    Long dataId = redisTemplate.opsForList().leftPop(redisListKey);
            if (Objects.isNull(dataId ) || 0L.equals(dataId )) {
//    Again creating the key in order to make sure another thread request for this should not shouldn't go in the db as we know we dont have data in db for this combination         
redisTemplate.opsForList().leftPush(redisListKey, 0L);
                //create a new dataObject and return it
            } else {
                //Get the dataObject based on dataId and return it
            }


    public synchronized void pushRedisData(Long cId, Long mId, String redisListKey) {
            if (BooleanUtils.isFalse(redisTemplate.hasKey(redisListKey))) {
                List<Long> dataToPush = dataService.getDataIdListFromCIdAndMIdCombination(cId,
                        mId);
                if (CollectionUtils.isNotEmpty(dataToPush )) {
                    redisTemplate.opsForList().leftPushAll(redisListKey, dataToPush );
                    redisTemplate.expire(redisListKey, 5, TimeUnit.HOURS);
                } else {
                    if (redisTemplate.opsForList().size(redisListKey) == 0) {
                        redisTemplate.opsForList().leftPush(redisListKey, DEFAULT_REDIS_ITEMID);
                    }

                }

我做了一个synchronized方法把记录push到redis,这样只有一个线程能够把数据发布到key对应的redis,其他线程只是从redis中pop数据。 如果在数据库中没有找到任何 cId 和 mId 组合的记录,那么我将在 redis 上创建默认值为 0 的键,这样具有这种组合的线程不应该进行数据库调用。

问题: 当我在文件中执行包含 1000 条记录的批处理作业并且配置了 10 个线程来处理这些记录时,我看到 3-5 个线程获得了已分配给其他线程的重复 dataId threads and object is under process 导致具有重复 dataId 的线程出现 Stalestate 异常。 我还发现在前几条记录的作业开始阶段遇到了这个问题。

最佳答案

长话短说

由于同步泄漏,您遇到了竞争条件。

说明

您尝试同步的方式是有漏洞的,它允许多个线程执行相同的工作。这是你已经注意到的事情。没有原子 block 可以防止两个并发线程执行相同的语句——这里不说锁。

如果您使用相同的对象实例,引入synchronized 方法可能会在本地解决问题,但这不是解决方案。它会阻塞其他并发线程,阻止它们继续进行。

你为什么不通过例如同步利用 Redis 的属性呢?一组?

可以使用 Redis 集来确保只有一个线程/进程能够通过将特定元素添加到 Redis 集来处理该元素。 Redis 将响应添加该元素是否成功。此信息将帮助您通过检查响应来解决竞争问题。如果该元素可以添加到集合中,那么当前线程是第一个命中您的 dataId 的线程,并且该线程可能会继续进行昂贵的工作(数据库获取,...)。

当向同步集中添加元素失败时,您就知道其他进程已经在做昂贵的工作,您可以继续从缓存中查找数据。您需要注意,尽管另一个线程可能赢得了非阻塞同步,但另一个第一个线程不一定完成填充缓存。然后您可以:

  1. 轮询缓存直到值存在。
  2. 暂停(您无法确定其他进程是否处于事件状态),然后进行重复工作以推进您的实际任务。

再想想更糟糕的是:WAITING缓存值并阻止导入进程或多次查询昂贵的数据源。您可以针对这两种情况进行优化,但您需要自行决定。

关于redis - 在多线程环境中从 Redis 列表中弹出值会将重复值返回给少数线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49154918/

相关文章:

java - 如何将消费者组与 Spring Data Redis for Redis Streams 一起使用(继续获取 NOGROUP)?

node.js - 将 redis 用于 pub-sub 时不需要的多条消息

docker - 无法从应用程序容器连接到 Redis 容器

memory - Membase 或 Redis 用于内存缓存系统

.net-core - 除了从本地主机无法连接到Redis

multithreading - 多线程与分区之间的 Spring 批处理差异

java - 如何在 Spring 管理中使用 JobLauncher Synchronizer

java - 添加 spring-cloud-starter-dataflow-server-local 会导致错误 : Command line is too long. Shorten command line for ... 或 Spring Boot

java - Spring 批处理 : taskExecutor exception when adding skip limit