list - 将最近两分钟的数据存储在Redis列表中

标签 list go redis queue

我正在缓存系统的数据库,并且遇到了这个问题:

我们有一些远程设备每秒将日志发送到服务器!由于我们拥有许多此类设备,因此我们不能仅将所有数据存储在普通数据库表中。实际上,我们需要快速访问每个设备发送的日志的最后5分钟。日志位于json中,但我们将它们映射到Go结构。每个日志都有一些数据,例如:

    type Log struct {
    DeviceID        string
    CompanyID       uint
    DeviceTime      time.Time
    Latitude        float64
    Longitude       float64
    Altitude        float64
    SpeedOTG        float32
}

我们正在使用 redis 将数据存储在ram上。

我的问题:

任务是将每个设备的最后5分钟的所有日志存储在列表中。当提供缓存 key 时,我也应该检索这些数据。我必须做一些事情,例如排队。当其已满(包含最近5分钟的日志)时,每个新条目都会进入,旧条目应该会消失。但是,如果列表仍然不包含最后5分钟,那么什么也不会发生!所有计算都是根据上面定义的 DeviceTime 进行的。

起初,我想到了要存储大小确定为(5 * 60)的日志列表,但没有被授权者每秒都有新日志!

最佳答案

将您的日志存储在Redis Streams中。要插入,请使用 XADD command[Go Redis Example]。

您可以将所有设备存储在一个流中,或者每个设备使用一个流。

在这里,假设每个设备一个流并保留一组设备ID,则将插入:

SADD devices myDeviceID
XADD logs:myDeviceID * DeviceID myDeviceID CompanyID myCompanyID DeviceTime "myDeviceTime"
     ... SpeedOTG "mySpeedOTG"

设置的devices是要保留设备ID的列表,以防您要检索所有设备的数据并且不想依赖保留数据(客户端)或使用SCAN。要检索所有设备日志,您将使用类似于我在最后的Lua脚本示例Which datatype to use for this RedisCache implementation?中描述的模式。

如果进行字段压缩,流的好处之一是:当您保持字段一致(相同的字段以相同的顺序)时,Redis通过不对每个条目重复字段名称来进行优化。

通过使用*,您可以让Redis将时间戳记设置为条目ID。您可以使用自己的时间戳记作为ID,只要您确信它会严格增加。您不能将ID添加到ID小于最后添加的ID的流中。

您可以使用 XTRIM 将列表保持在300个以下,但是正如您所说的,您不能依靠一致的每秒XADD。

要将其精确地保留到最后5分钟,您可以使用:
XRANGE logs:myDeviceID - + COUNT 1

这将检索第一个(最旧的)条目。可以将它与最新添加的最新条目的时间戳进行比较,如果时间超过5分钟,则可以使用 XDEL 删除它。

重复直到最旧的条目不超过5分钟。

当然,执行此逻辑客户端意味着很多往返,从而影响您的性能。因此,我建议您使用Lua Script在Redis服务器端进行操作。您可以每次使用EVAL传递脚本。但是最好是先load the script然后再使用EVALSHA

剧本:
redis.call('SADD', KEYS[1], ARGV[2])

local latestID = redis.call('XADD', 'logs:'..ARGV[2], '*', 'DeviceID', ARGV[2],
                            'OtherFields', ARGV[3])
local latestTime = tonumber(string.sub(latestID, 1, string.find(latestID, '-') - 1))

local oldestID = redis.call('XRANGE', 'logs:'..ARGV[2], '-', '+', 'COUNT', '1')
oldestID = oldestID[1][1]
local oldestTime = tonumber(string.sub(oldestID, 1, string.find(oldestID, '-') - 1))

local maxTime = tonumber(ARGV[1])

while (latestTime - oldestTime) > maxTime do
    redis.call('XDEL', 'logs:'..ARGV[2], oldestID)
    oldestID = redis.call('XRANGE', 'logs:'..ARGV[2], '-', '+', 'COUNT', '1')
    oldestID = oldestID[1][1]
    oldestTime = tonumber(string.sub(oldestID, 1, string.find(oldestID, '-') - 1))
end
return { 'Added: '..latestID, 'Device: '..ARGV[2], 'Stream Key: logs:'..ARGV[2],
         'Length: '..redis.call('XLEN', 'logs:'..ARGV[2]) }

参数:
EVALSHA <shaId> 1 devices 300000 myDeviceID ...
                                            ^ARGV[3...]: the rest of fields
                                 ^ARGV[2]: DeviceID
                          ^ARGV[1]: logging time desired (ms), 5 minutes = 300000
                  ^KEYS[1]: the set key and prefix to device log keys
                ^numkeys

使用流,您可以获得很多功能,例如按时间戳范围或使用者组。

但最重要的是,要使您的日志不受时间限制,请使用Lua脚本。您也可以使用列表来实现,存储以一个字符串编码的日志条目:timestamp:valuesSerialized,并使用与Lua类似的方法在一个原子操作中添加-check-trim。

关于list - 将最近两分钟的数据存储在Redis列表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59909195/

相关文章:

python - 压缩嵌套列表

java - 列表实现中的 ClassCastException?

python - 文件类型列表声明问题

javascript - 如何使用 arr.forEach 调用异步 JavaScript redis 调用?

redis - 如何在flink map()中使用Jedis

Redis hgetall 还是 hget?

C# 使用 LINQ 对字符数组列表进行排序

amazon-web-services - 键包含点时可以设置映射吗?

go - 为什么我们不能将 slice 或映射嵌入到 Go 结构中

Go:使用 map 数组创建 map