database - 通过分布式数据库聚合作业优化网络带宽

标签 database algorithm caching optimization distributed-computing

我有一个分布式/联合数据库结构如下:

  • 数据库分布在三个地理位置(“节点”)
  • 每个节点集群多个数据库
  • 关系数据库混合了 PostgreSQL、MySQL、Oracle 和 MS SQL Server;非关系数据库是 MongoDB 或 Cassandra
  • 每个节点内和跨节点联合的松散耦合是通过 RabbitMQ 实现的,每个节点都运行一个 RabbitMQ 代理

  • 我正在为跨节点联合的作业(即非节点本地的作业)实现一个只读的节点间聚合作业系统。这些作业只执行“获取”查询——它们不修改数据库。 (如果作业的结果旨在进入一个或多个数据库,那么这是由一个单独的作业完成的,该作业不是我试图优化的节点间作业系统的一部分。)我的目标是最小化这些作业所需的网络带宽(首先最小化节点间/WAN带宽,然后最小化节点内/LAN带宽);我假设每个 WAN 链接的成本是统一的,而每个 LAN 链接的成本是另一个统一的。这些工作对时间不是特别敏感。我在一个节点内而不是在节点之间执行一些 CPU 负载平衡。

    与集群或特定数据库本地的数据库写入量相比,为聚合作业跨 WAN/LAN 传输的数据量较小,因此跨联邦完全分布数据库是不切实际的。

    我用于最小化网络带宽的基本算法是:
  • 给定一个在整个联邦中分布的一组数据上运行的作业,管理器节点向每个其他节点发送一条消息,其中包含相关的数据库查询。
  • 每个节点运行它的一组查询,用 gzip 压缩它们,缓存它们,并将它们的压缩大小发送到管理器节点。
  • 管理器移动到包含多个数据的节点(具体来说,移动到集群中数据最多且内核空闲的机器);它从其他两个节点和集群中的其他机器请求其余数据,然后运行作业。

  • 在可能的情况下,作业使用分而治之的方法来最小化所需的数据协同定位量。例如,如果作业需要计算联邦中所有销售额的总和,则每个节点在本地计算其销售额总和,然后在管理器节点聚合(而不是将所有未处理的销售数据复制到管理器节点) .但是,有时(例如在位于不同节点的两个表之间执行连接时)需要数据共存。

    我为优化所做的第一件事是聚合作业,并以十分钟的时间运行聚合的作业(机器都在运行 NTP,所以我可以合理地确定“每十分钟”在每个节点上意味着相同的事情)。目标是让两个作业能够共享相同的数据,从而降低传输数据的总体成本。
  • 给定两个查询同一个表的作业,我生成每个作业的结果集,然后取两个结果集的交集。
  • 如果两个作业都计划在同一节点上运行,则网络传输成本计算为两个结果集的总和减去两个结果集的交集。
  • 这两个结果集存储在选择运行作业的节点上的 PostgreSQL 临时表(在关系数据的情况下)或临时 Cassandra columnfamilies/MongoDB 集合(在 nosql 数据的情况下);然后针对组合的结果集执行原始查询,并将数据交付给各个作业。 (此步骤仅在组合结果集上执行;单个结果集数据只是简单地传送到其作业,而无需首先存储在临时表/列族/集合中。)

  • 这会导致网络带宽的改善,但我想知道是否有框架/库/算法可以对此进行改进。我考虑的一种选择是在节点上缓存结果集,并在确定网络带宽时考虑这些缓存的结果集(即,除了当前的一组预先调度的协同定位作业之外,尝试跨作业重用结果集,例如在一个 10 分钟的时期内运行的作业可以使用前一个 10 分钟结果集的缓存结果集),但除非作业使用完全相同的结果集(即,除非它们使用相同的 where 子句),否则我不知道一般 -将填补结果集中空白的目的算法(例如,如果结果集使用了“where N > 3”子句,而不同的工作需要带有“where N > 0”子句的结果集,那么我可以使用什么算法来确定我需要将原始结果集与结果集与子句“where N > 0 AND N <= 3”)合并 - 我可以尝试编写自己的算法来执行此操作,但结果将是马车无用的困惑。我还需要确定缓存数据何时过时 - 最简单的方法是将缓存数据的时间戳与源表上最后修改的时间戳进行比较,如果时间戳已更改,则替换所有数据,但理想情况下我希望能够仅更新每行或每块时间戳已更改的值。

    最佳答案

    我已经开始实现我对这个问题的解决方案。

    为了简化节点内缓存并简化 CPU 负载平衡,我在每个数据库集群(“Cassandra 节点”)中使用 Cassandra 数据库来运行聚合作业(以前我手动聚合本地数据库结果集) ) - 我将单个 Cassandra 数据库用于关系、Cassandra 和 MongoDB 数据(缺点是某些关系查询在 Cassandra 上运行速度较慢,但​​由于单个统一聚合数据库更容易实现这一点而弥补了这一点)维护而不是单独的关系和非关系聚合数据库)。我也不再在十分钟内聚合作业,因为缓存使这个算法变得不必要。

    节点中的每台机器都引用一个名为 Cassandra_Cache_[MachineID] 的 Cassandra 列族,该列族用于存储它已发送到 Cassandra 节点的 key_ids 和 column_ids。 Cassandra_Cache 列族由 Table 列、Primary_Key 列、Column_ID 列、Last_Modified_Timestamp 列、Last_Used_Timestamp 列和由 Table|Primary_Key|Column_ID 组成的组合键组成。 Last_Modified_Timestamp 列表示来自源数据库的数据的 last_modified 时间戳,Last_Used_Timestamp 列表示数据上次被聚合作业使用/读取的时间戳。当 Cassandra 节点从机器请求数据时,机器计算结果集,然后取结果集和其 Cassandra_Cache 中的表|键|列的集合差,这些列与它的 Cassandra_Cache 中的行具有相同的 Last_Modified_Timestamp(如果时间戳不匹配,则缓存数据已过时,并与新的 Last_Modified_Timestamp 一起更新)。本地机器然后将设置差异发送到 Cassandra 节点并使用设置差异更新其 Cassandra_Cache 并更新用于组成结果集的每个缓存数据上的 Last_Used_Timestamp。 (为每个表|键|列维护一个单独的时间戳的一个更简单的替代方法是为每个表|键维护一个时间戳,但这不太精确,并且表|键|列时间戳不会过于复杂。)将 Last_Used_Timestamps 保留在Cassandra_Caches 之间的同步只需要本地机器和远程节点发送与每个作业关联的 Last_Used_Timestamp,因为作业中的所有数据都使用相同的 Last_Used_Timestamp。

    Cassandra 节点使用从节点内接收到的新数据以及从其他节点接收到的数据更新其结果集。 Cassandra 节点还维护一个列族,该列族存储每台机器的 Cassandra_Cache 中的相同数据(Last_Modified_Timestamp 除外,它仅在本地机器上需要以确定数据何时过时),以及指示数据是否已到达的源 ID来自节点内或来自另一个节点 - id 区分不同的节点,但不区分本地节点内的不同机器。 (另一种选择是使用统一的 Cassandra_Cache,而不是在每台机器上使用一个 Cassandra_Cache 并为节点使用另一个 Cassandra_Cache,但我认为增加的复杂性不值得节省空间。)

    每个 Cassandra 节点还维护一个 Federated_Cassandra_Cache,它由从本地节点发送到其他两个节点之一的 {Database, Table, Primary_Key, Column_ID, Last_Used_Timestamp} 元组组成。

    当一个作业通过管道时,每个 Cassandra 节点用本地结果集更新其节点内缓存,并完成可以在本地执行的子作业(例如在一个作业中对多个节点之间的数据求和,每个节点对其进行求和节点内数据以最小化需要在节点间联合中共同定位的数据量) - 如果子作业仅使用节点内数据,则可以在本地执行。然后,管理器节点决定在哪个节点上执行剩余的工作:每个 Cassandra 节点可以通过获取其结果集和已缓存的结果集子集的集差,在本地计算将其结果集发送到另一个节点的成本。到它的 Federated_Cassandra_Cache,并且管理器节点最小化成本方程 [“从 NodeX 传输结果集的成本”+“从 NodeY 传输结果集的成本”]。例如,将其结果集传输到 {Node2, Node3} 的成本是 Node1 {3, 5},将其结果集传输到 {Node1, Node3} 的成本是 Node2 {2, 2},而 Node3 {4, 3} 的成本是将其结果集传输到 {Node1, Node2},因此作业在 Node1 上运行,成本为“6”。

    我正在为每个 Cassandra 节点使用 LRU 驱逐策略;我最初使用的是最老的优先驱逐策略,因为它更容易实现并且需要更少的写入 Last_Used_Timestamp 列(每个数据更新一次而不是每个数据读取一次),但 LRU 策略的实现结果证明并不过分复杂且 Last_Used_Timestamp 写入不会造成瓶颈。当 Cassandra 节点达到 20% 的可用空间时,它会驱逐数据,直到达到 30% 的可用空间,因此每次驱逐的大小约为可用总空间的 10%。节点维护两个时间戳:最后驱逐的节点内数据的时间戳,以及最后驱逐的节点间/联合数据的时间戳;由于节点间通信相对于节点内通信的延迟增加,驱逐策略的目标是使 75% 的缓存数据为节点间数据,25% 的缓存数据为节点内数据,可以通过每个驱逐的 25% 是节点间数据和每个驱逐的 75% 是节点内数据来快速近似。驱逐的工作原理如下:

    while(evicted_local_data_size < 7.5% of total space available) {
        evict local data with Last_Modified_Timestamp < 
            (last_evicted_local_timestamp += 1 hour)
        update evicted_local_data_size with evicted data
    }
    
    while(evicted_federated_data_size < 2.5% of total space available) {
        evict federated data with Last_Modified_Timestamp < 
            (last_evicted_federated_timestamp += 1 hour)
        update evicted_federated_data_size with evicted data
    }
    

    在从节点内的机器和其他节点收到驱逐确认之前,驱逐的数据不会被永久删除。

    Cassandra 节点然后向其节点内的机器发送通知,指示新的 last_evicted_local_timestamp 是什么。本地机器更新他们的 Cassandra_Caches 以反射(reflect)新的时间戳,并在完成时向 Cassandra 节点发送通知;当 Cassandra 节点收到来自所有本地机器的通知时,它会永久删除被驱逐的本地数据。 Cassandra 节点还使用新的 last_evicted_federated_timestamp 向远程节点发送通知;其他节点更新他们的 Federated_Cassandra_Caches 以反射(reflect)新的时间戳,并且 Cassandra 节点在收到来自每个节点的通知时永久删除被驱逐的联合数据(Cassandra 节点会跟踪数据来自哪个节点,因此在收到驱逐后来自 NodeX 的确认,节点可以在收到来自 NodeY 的驱逐确认之前永久删除被驱逐的 NodeX 数据)。在所有机器/节点都发送通知之前,如果 Cassandra 节点从尚未驱逐其旧数据的机器/节点收到结果集,则它会在其查询中使用缓存的驱逐数据。例如,Cassandra 节点有一个它已经驱逐的本地 Table|Primary_Key|Column_ID 数据,同时本地机器(尚未处理驱逐请求)没有在其结果集中包含 Table|Primary_Key|Column_ID 数据,因为它认为Cassandra 节点的缓存中已经有数据; Cassandra 节点从本地机器接收结果集,并且由于本地机器尚未确认驱逐请求,Cassandra 节点将缓存的驱逐数据包含在其自己的结果集中。

    关于database - 通过分布式数据库聚合作业优化网络带宽,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16879815/

    相关文章:

    algorithm - 递归访问二叉树中的节点

    algorithm - 分而治之算法是否使用递归

    c# - (C#) 从 SQL 数据库读取用户名和密码进行登录

    c - 来自另一个表的 SQLITE UPDATE 字段 IF NULL

    java - 在 Java 中检测 Hashmap 中的循环依赖

    caching - 将 IntelliJ IDEA 缓存/索引目录移动到 RAM

    c# - VaryByCustom 不适用于 session 变量

    python - Django/一般缓存问题

    sql - 500 操作无法完成。 (PostgreSQL.DatabaseError 错误 1。)

    mysql - 与同一实体的多个一对多关系