我正在设计一个系统,该系统应该分析大量用户交易并生成聚合度量(例如趋势等)。 该系统应该运行快速、健壮且可扩展。 系统基于 Java(在 Linux 上)。
数据来自生成用户交易日志文件(基于 CSV)的系统。 系统每分钟生成一个文件,每个文件包含不同用户的交易记录(按时间排序),每个文件可能包含数千个用户。
CSV 文件的示例数据结构:
10:30:01,用户 1,...
10:30:01,用户 1,...
10:30:02,用户 78,...
10:30:02,用户 2,...
10:30:03,用户 1,...
10:30:04,用户 2,...
.
.
.
我计划的系统应该实时处理文件并执行一些分析。 它必须收集输入,将其发送到多个算法和其他系统,并将计算结果存储在数据库中。数据库不保存实际的输入记录,只保存关于交易的高级聚合分析。比如趋势等。
我计划使用的第一个算法需要至少 10 条用户记录才能获得最佳操作,如果 5 分钟后找不到 10 条记录,它应该使用任何可用的数据。
我想使用 Storm 来实现,但我更愿意将此讨论尽可能留在设计层面。
系统组件列表:
每分钟监控传入文件的任务。
读取文件、解析文件并使其可用于其他系统组件和算法的任务。
为用户缓冲 10 条记录(不超过 5 分钟)的组件,当收集到 10 条记录或 5 分钟后,就可以将数据发送给算法进行进一步处理。 由于要求为算法提供至少 10 条记录,我想到了使用 Storm Field Grouping(这意味着为同一用户调用相同的任务)并跟踪任务中 10 条用户记录的集合,当然我计划有几个这样的任务,每个任务处理一部分用户。
还有其他组件处理单个事务,我计划为它们创建其他任务,在每个事务被解析时接收它(与其他任务并行)。
我需要你帮助 #3。
设计此类组件的最佳实践是什么? 很明显,它需要维护每个用户 10 条记录的数据。 键值映射可能会有所帮助,在任务本身或使用分布式缓存中管理映射更好吗? 例如 Redis 键值存储(我以前从未使用过)。
谢谢你的帮助
最佳答案
我曾多次使用 Redis。所以,我会评论你对使用 redis 的想法
#3 有 3 个要求
每个用户的缓冲区
10 个任务的缓冲区
应该每 5 分钟过期一次
<强>1。每个用户的缓冲区: Redis只是一个键值存储。虽然它支持各种各样的 datatypes ,它们始终是映射到 STRING 键的值。因此,如果您需要每个用户缓冲区,您应该决定如何唯一地识别用户。因为在 Redis 中,当你覆盖一个键的新值时,你永远不会得到错误。一种解决方案可能是在写入之前检查是否存在。
<强>2。 10 个任务的缓冲区: 您显然可以实现 queue在redis中。但是限制它的大小是留给你的。例如:使用 LPUSH
和 LTRIM
或使用 LLEN
检查长度并决定是否触发您的进程。与此队列关联的键应该是您在第 1 部分中确定的键。
<强>3。缓冲区在 5 分钟后过期:这是一项最艰巨的任务。在 Redis 中,无论其值具有何种底层数据类型,每个键都可以有一个 expiry
。但是到期过程是无声的。您不会在任何 key 过期时收到通知。因此,如果您使用此属性,您将悄无声息地失去缓冲区。解决这个问题的一个方法是,有一个索引。意味着,索引会将时间戳映射到所有需要在该时间戳值处过期的键。然后在后台,您可以每分钟读取一次索引,并从 redis 中手动删除 [读取后] 键,并使用缓冲区数据调用所需的进程。要获得这样的索引,您可以查看 Sorted Sets . timestamp 将是您的 score
并且 set member
将是您希望在该时间戳删除的键 [第 1 部分中决定的每个用户的唯一键映射到队列]。您可以执行 zrangebyscore
以读取具有指定时间戳的所有集合成员
总体:
使用Redis List实现队列。
使用 LLEN 确保您没有超过 10 个限制。
每当您创建一个新列表时,都会在索引 [Sorted Set] 中创建一个条目,并将 Score 作为 Current Timestamp + 5 min
并将 Value 作为列表的键。
当 LLEN 达到 10 时,记得读取然后从索引 [sorted set] 和数据库 [delete the key->list] 中删除键。然后用数据触发你的过程。
每隔一分钟,生成当前时间戳,读取索引和每个键,读取数据,然后从数据库中删除键并触发您的过程。
这可能是我实现它的方式。可能还有其他更好的方法在 Redis 中对数据建模
关于redis - 实时分析处理系统设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11644571/