java - 使用 groovy actor 最大化数据库的吞吐量?

标签 java database concurrency groovy parallel-processing

我正在研究 GPars 库,同时致力于提高匹配系统的可扩展性。我希望能够查询数据库并在并发处理结果的同时立即查询数据库。瓶颈是从数据库中读取,所以我想让数据库全天忙碌,同时在结果可用时异步处理结果。我意识到我可能对 actor 框架的工作原理有一些根本性的误解,我很乐意得到纠正!

在伪代码中,我尝试执行以下操作:

定义两个参与者,一个用于运行针对数据库的选择,另一个用于处理记录。

  1. queryActor 查询数据库并将结果发送给processorActor
  2. queryActor 立即再次查询数据库,无需等待 processorActor 完成

我或许可以在不使用 actor 的情况下实现简单的用例,但我的最终目标是拥有一个 actor 池,该池始终使用可能不同的数据源处理新查询,以提高系统的总体吞吐量。

处理 Actor 总是比数据库查询快得多,所以我想在未来同时查询多个副本。

      def processor = actor {
    loop {
      react {querySet ->
        println "processing recordset"
        if (querySet instanceof Object[]) {
          MatcherDataRowProcessor matcher = new MatcherDataRowProcessor(matchedRecords, matchedRecordSet);

          matchedRecords = matcher.processRecordset(querySet);
          reply matchedRecords
        }
        else {
          println 'processor fed nothing, halting processor actor'
          stop()
        }
      }
    }
  }

  def dbqueryer = actor {
    println "dbqueryer has started"

    while (batchNum.longValue() <= loopLimiter) {
      println "hitting db"
      Object[] querySet
      def thisRuleBatch = new MatchRuleBatch(targetuidFrom, targetuidTo)
      thisRuleBatch.targetuidFrom = batchNum * perBatch - perBatch
      thisRuleBatch.targetuidTo = thisRuleBatch.targetuidFrom + perBatch
      thisRuleBatch.targetName = targetName
      thisRuleBatch.whereClause = whereClause
      querySet = dao.getRecordSet(thisRuleBatch)
      processor.send querySet
      batchNum++
    }

    react { processedRecords ->
      processor.send false
    }
  }

最佳答案

我建议查看 Dataflow Concurrency 中的数据流队列GPars 用户指南的部分。您可能会发现数据流为您手头的问题提供了更好/更清晰的抽象。数据流也可以与参与者结合使用。

我认为 actor 或数据流都可以在这种情况下工作,并且认为决定归结为哪一个提供的抽象更接近您要完成的目标。对我来说,任务、队列、数据流的概念似乎更适合术语。

关于java - 使用 groovy actor 最大化数据库的吞吐量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4459497/

相关文章:

java - Mockito 测试 java 8 lambda Consumer API

java - 在 mongoDB 集合上设置 ttl - 在应用程序或 shell 中?

Java 在编辑 observableList 时抛出 IndexOutOfBounds

sql-server - 数据库的单元测试框架

java - JSF/莫贾拉 2.0.2 : ui:repeat is totally broken when updating via AJAX

php - 如何在用户登录时显示用户特定功能?

c# - Entity Framework 连接到 Oracle : ODP for . NET "does not support time"

java - 是否可以使用可见性保证而不是成熟的锁定来以线程安全的方式访问共享状态?

hadoop - 使用 Zookeeper 的分布式应用程序

python - Python 中的函数属性