redis - 实时分析处理系统设计

标签 redis batch-processing distributed-computing apache-storm distributed-caching

我正在设计一个系统,该系统应该分析大量用户交易并生成聚合度量(例如趋势等)。 该系统应该运行快速、健壮且可扩展。 系统基于 Java(在 Linux 上)。

数据来自生成用户交易日志文件(基于 CSV)的系统。 系统每分钟生成一个文件,每个文件包含不同用户的交易记录(按时间排序),每个文件可能包含数千个用户。

CSV 文件的示例数据结构:

10:30:01,用户 1,...
10:30:01,用户 1,...
10:30:02,用户 78,...
10:30:02,用户 2,...
10:30:03,用户 1,...
10:30:04,用户 2,...
. . .

我计划的系统应该实时处理文件并执行一些分析。 它必须收集输入,将其发送到多个算法和其他系统,并将计算结果存储在数据库中。数据库不保存实际的输入记录,只保存关于交易的高级聚合分析。比如趋势等。

我计划使用的第一个算法需要至少 10 条用户记录才能获得最佳操作,如果 5 分钟后找不到 10 条记录,它应该使用任何可用的数据。

我想使用 Storm 来实现,但我更愿意将此讨论尽可能留在设计层面。

系统组件列表:

  1. 每分钟监控传入文件的任务。

  2. 读取文件、解析文件并使其可用于其他系统组件和算法的任务。

  3. 为用户缓冲 10 条记录(不超过 5 分钟)的组件,当收集到 10 条记录或 5 分钟后,就可以将数据发送给算法进行进一步处理。 由于要求为算法提供至少 10 条记录,我想到了使用 Storm Field Grouping(这意味着为同一用户调用相同的任务)并跟踪任务中 10 条用户记录的集合,当然我计划有几个这样的任务,每个任务处理一部分用户。

  4. 还有其他组件处理单个事务,我计划为它们创建其他任务,在每个事务被解析时接收它(与其他任务并行)。

我需要你帮助 #3。

设计此类组件的最佳实践是什么? 很明显,它需要维护每个用户 10 条记录的数据。 键值映射可能会有所帮助,在任务本身或使用分布式缓存中管理映射更好吗? 例如 Redis 键值存储(我以前从未使用过)。

谢谢你的帮助

最佳答案

我曾多次使用 Redis。所以,我会评论你对使用 redis 的想法

#3 有 3 个要求

  1. 每个用户的缓冲区

  2. 10 个任务的缓冲区

  3. 应该每 5 分钟过期一次

<强>1。每个用户的缓冲区: Redis只是一个键值存储。虽然它支持各种各样的 datatypes ,它们始终是映射到 STRING 键的值。因此,如果您需要每个用户缓冲区,您应该决定如何唯一地识别用户。因为在 Redis 中,当你覆盖一个键的新值时,你永远不会得到错误。一种解决方案可能是在写入之前检查是否存在。

<强>2。 10 个任务的缓冲区: 您显然可以实现 queue在redis中。但是限制它的大小是留给你的。例如:使用 LPUSHLTRIM 或使用 LLEN 检查长度并决定是否触发您的进程。与此队列关联的键应该是您在第 1 部分中确定的键。

<强>3。缓冲区在 5 分钟后过期:这是一项最艰巨的任务。在 Redis 中,无论其值具有何种底层数据类型,每个键都可以有一个 expiry。但是到期过程是无声的。您不会在任何 key 过期时收到通知。因此,如果您使用此属性,您将悄无声息地失去缓冲区。解决这个问题的一个方法是,有一个索引。意味着,索引会将时间戳映射到所有需要在该时间戳值处过期的键。然后在后台,您可以每分钟读取一次索引,并从 redis 中手动删除 [读取后] 键,并使用缓冲区数据调用所需的进程。要获得这样的索引,您可以查看 Sorted Sets . timestamp 将是您的 score 并且 set member 将是您希望在该时间戳删除的键 [第 1 部分中决定的每个用户的唯一键映射到队列]。您可以执行 zrangebyscore 以读取具有指定时间戳的所有集合成员

总体:

使用Redis List实现队列。

使用 LLEN 确保您没有超过 10 个限制。

每当您创建一个新列表时,都会在索引 [Sorted Set] 中创建一个条目,并将 Score 作为 Current Timestamp + 5 min 并将 Value 作为列表的键。

当 LLEN 达到 10 时,记得读取然后从索引 [sorted set] 和数据库 [delete the key->list] 中删除键。然后用数据触发你的过程。

每隔一分钟,生成当前时间戳,读取索引和每个键,读取数据,然后从数据库中删除键并触发您的过程。

这可能是我实现它的方式。可能还有其他更好的方法在 Redis 中对数据建模

关于redis - 实时分析处理系统设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11644571/

相关文章:

java - 使用 `start` 在 Windows 批处理文件中使用双引号启动进程的副作用

cron - 为什么在 kubernetes cron 作业中可能会创建两个作业,或者可能不会创建作业?

neural-network - tensorflow 。 Cifar10 多 gpu 示例使用更多 gpu 时性能更差

concurrency - 两个进程可以使用 Watch 更改相同的 Redis 资源。我应该担心活锁吗?

node.js - client.get() 的值是 "true"而不是实际值

redis - 无法在 Redis 数据库中配置端口

具有批处理功能的 Java 8 Stream

python - 使用流水线 hgetall 进行 Redis 慢速查询

c# - 如何使用 Nhibernate 删除多个数据库实体?

hadoop - 什么是数据序列化系统?