c# - 如何使用多线程 C# 应用程序在 Redis 中插入数百万个键/值

标签 c# multithreading redis

我需要创建一个 C# 应用程序(Windows 服务),它每 5 秒(间隔)运行一次,生成大约 2000 万个值。

我需要在 5 秒内将这 2000 万个值插入 Redis(1 个键/值),确保在下一个间隔开始之前完成插入。

注意:我只需要在 Redis 中保留 7 个周期 => 2000 万 * 7 => Redis 中的 1.4 亿个键

我正在使用 C# 的 Threading.Tasks 调用一个函数(2000 万次),以便它们被并行(异步)处理。

我什至为 Redis 客户端创建了一个池,以便我的进程也能够并行执行 Redis 查询。

这是调用该函数 2000 万次的 C# 部分:

List<Task> tasksList = new List<Task>();

foreach (object k in ListOf20MillionData)
{
    tasksList.Add(

        Task.Factory.StartNew(() =>
        {
            GenerateValue(k);
            //Inside 'GenerateValue' data is generated and pushed to redis
        })
    );
}

这是“GenerateValue”中的一段代码,它从客户端池中获取一个 Redis 客户端对象,执行插入并将 Redis 客户端释放回池中。

RedisClient redisClientObj = RedisPool.GetNextAvailableClient();

redisClientObj.Add("SomeKey", "SomeValue");

RedisPool.ReleaseRedisClient(redisClientObj );

我的担忧和挑战:

  1. 我对 Redis 池的概念是否正确?
  2. Redis 可以处理多少客户端连接?
  3. 我的要求甚至可以使用 C# 和 Redis 实现吗?
  4. 非常感谢任何意见或建议。

最佳答案

  1. 我对 Redis 池的概念是否正确?

不是真的。池不会给你更多的吞吐量。它们将不同的逻辑连接范围分解为顺序命令,并且它们允许简单的并发...但是 Redis 核心是单线程的,您应该寻求饱和网络,而不是线程。

  1. Redis 可以处理多少客户端连接?

很多,但是如果您不能使它们饱和,那么添加更多也无济于事 - 事实上,拥有大量连接会增加开销

  1. 我的要求甚至可以使用 C# 和 Redis 实现吗?

仅在非常具有庞大网络的强大盒子上;您可能通过“集群”增加吞吐量,但这也会增加数据包碎片

  1. 非常感谢任何意见或建议。

批处理。疯狂地批处理,以尽量减少往返。具有微小响应的胖批处理可以非常有效地利用网络,并且不需要您拥有复杂的代码。而 redis mset 命令正是针对这一点进行了优化:具有微小响应的大批量。

在本地,同一台机器在单个线程上发明数据并且是redis服务器,但对我来说它仍然需要34秒:

    static void Main()
    {
        using (var conn = ConnectionMultiplexer.Connect("127.0.0.1:6379"))
        {
            var db = conn.GetDatabase();
            var watch = Stopwatch.StartNew();
            foreach(var batch in InventData(20000000).Batchify(5000))
            {
                db.StringSet(batch);
            }
            watch.Stop();
            Console.WriteLine(watch.ElapsedMilliseconds);
        }
    }

或者如果我使用Parallel,即

            var watch = Stopwatch.StartNew();
            Parallel.ForEach(InventData(20000000).Batchify(5000),
                batch => db.StringSet(batch));
            watch.Stop();

需要 16 秒。

和(见评论)如果我将 Parallel 与 async 结合使用:

            var watch = Stopwatch.StartNew();
            Parallel.ForEach(InventData(20000000).Batchify(5000),
                batch => db.StringSetAsync(batch));
            watch.Stop();

然后它只需要不到 14 秒。

    static IEnumerable<KeyValuePair<RedisKey, RedisValue>> InventData(int count)
    {
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        string dictionary = "abcdefghijklmnopqrstuvwxyz _@:0123456789";
        int dLen = dictionary.Length;
        var rand = new Random(12345);
        const int KEY_LEN = 10, MAX_VAL_LEN = 50;
        char[] keyData = new char[KEY_LEN];
        char[] valueData = new char[MAX_VAL_LEN];
        while (count-- != 0)
        {
            for (int i = 0; i < keyData.Length; i++)
                keyData[i] = dictionary[rand.Next(dLen)];
            var len = rand.Next(10, MAX_VAL_LEN);
            for(int i = 0; i < len; i++)
                valueData[i] = dictionary[rand.Next(dLen)];

            yield return new KeyValuePair<RedisKey, SomeType>(
                new string(keyData), new string(valueData, 0, len));
        }
    }

    static IEnumerable<T[]> Batchify<T>(this IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach(var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                var arr = batch.ToArray();
                batch.Clear();
                yield return arr;
            }
        }
        if (batch.Count != 0) yield return batch.ToArray(); // trailers
    }

关于c# - 如何使用多线程 C# 应用程序在 Redis 中插入数百万个键/值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55515370/

相关文章:

c# - c#中字符串之间的比较

c# - 具有 Entity Framework 的 WCF 存储库服务模式

c# - c# 中的定时器初始化和竞争条件?

java - 消息传递(例如 JMS)何时可以替代多线程?

java - 为什么线程中的公共(public)方法无法在 onCreate 中解析?

redis - 如何处理试图清除 redis 数据库的多个服务器

python - 特定于队列的 Celery 事件

c# - 如何使用与 C# 兼容的 python 创建 RSA 加密 key 对,反之亦然?

c# - JIT 编译器是否优化(内联)不必要的变量声明?

redis 在 flushall 命令上挂起