java - 如何根据用户定义的参数执行 throttle ?

标签 java multithreading scala throttling

我正在以用户在多线程环境中定义的批量大小写入内存分布式数据库。但是我想限制写入ex的行数。 1000 行/秒。这个要求的原因是我的生产者写得太快而消费者遇到叶内存错误。在批处理记录时是否有任何标准做法来执行 throttle 。

dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
      val dbRecords = recordSet.map(m => (m, Events.transform(m)))
      dbRecords.map { record =>
        try {
          Events.setValues(eventInsert, record._2)
          eventInsert.addBatch
        } catch {
          case e: Exception =>
            logger.error(s"error adding batch: ${e.getMessage}")
            val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"event: $error_event")
        }
      }

      // Bulk Commit Records
      try {
        eventInsert.executeBatch
      } catch {
        case e: java.sql.BatchUpdateException =>
          val updates = e.getUpdateCounts
          logger.error(s"failed commit: ${updates.toString}")
          updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
            val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"insert error: $error")
            logger.error(e.getMessage)
          }
      }
      finally {
        connection.commit
        eventInsert.clearBatch
        logger.debug(s"committed: ${dbRecords.length.toString}")
      }
    }

我希望如果我可以将用户定义的参数作为 throttleMax 传递,并且如果每个线程写入的总记录达到 throttleMax,thread.sleep() 将被调用 1 秒。但这会使整个过程变得非常缓慢。是否有任何其他有效方法可用于将数据加载限制为 1000 行/秒?

最佳答案

正如其他人所建议的那样(请参阅问题的评论),与在此处限制相比,您有更好的选择。但是,您可以使用如下一些简单的代码限制 Java 中的操作:

/**
 * Given an Iterator `inner`, returns a new Iterator which will emit items upon
 * request, but throttled to at most one item every `minDelayMs` milliseconds.
 */
public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
    return new Iterator<T>() {
        private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;

        @Override
        public boolean hasNext() {
            return inner.hasNext();
        }

        @Override
        public T next() {
            long now = System.currentTimeMillis();
            long requiredDelayMs = now - lastEmittedMillis;
            if (requiredDelayMs > 0) {
                try {
                    Thread.sleep(requiredDelayMs);
                } catch (InterruptedException e) {
                    // resume
                }
            }
            lastEmittedMillis = now;

            return inner.next();
        }
    };
}

以上代码使用了Thread.sleep,所以不适合在Reactive系统中使用。在这种情况下,您可能希望使用该系统中提供的 Throttle 实现,例如throttle in Akka

关于java - 如何根据用户定义的参数执行 throttle ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53715890/

相关文章:

java - 当Java Web服务部署在运行同一应用服务器的不同操作系统上时,同一消息的不同数字签名

c++ - 从不同的线程、不同的函数写入(登录)同一个文件?

c# - 线程安全 List<T>.Remove 创建的 System.ArgumentOutOfRangeException

scala - 多集的高级类型

scala - 推断类型参数 [Boolean] 不符合方法过滤器的类型参数范围 [T < : slick. lifted.Rep[_]]

Scala:根据另一个有条件地执行一个 Future 的最惯用方式?

java - 如何使用Gson序列化LocalDate?

java - 如何将列表中的每个字段数据存储在字符串中?

Java FX : How to change the style of a cell when a property of the object relating to a row changes in TableView?

java - 线程完成时是否发出通知信号?为什么此代码示例有效?