flutter - Dart 中的批处理 future

标签 flutter dart future batching

我想将许多 future 批处理到一个请求中,该请求在达到最大批处理大小或达到自收到最早的 future 以来的最长时间时触发。

动机

在 flutter 中,我有许多 UI 元素需要显示 future 的结果,这取决于 UI 元素中的数据。

例如,我有一个地方的小部件和一个显示步行到某个地方需要多长时间的子小部件。为了计算步行需要多长时间,我向 Google Maps API 发出请求以获取到该地点的旅行时间。

将所有这些 API 请求批处理成一个批处理 API 请求会更加高效和经济。因此,如果小部件即时发出 100 个请求,则可以通过单个提供程序代理 future ,该提供商将 future 分批成对 Google 的单个请求,并将来自 Google 的结果解包到所有单独的请求中。

提供者需要知道何时停止等待更多 future 以及何时实际发出请求,这应该由最大“批量”大小(即旅行时间请求的数量)或您愿意的最大时间量来控制等待批处理发生。

所需的 API 类似于:


// Client gives this to tell provider how to compute batch result.
abstract class BatchComputer<K,V> {
  Future<List<V>> compute(List<K> batchedInputs);
}

// Batching library returns an object with this interface
// so that client can submit inputs to completed by the Batch provider.
abstract class BatchingFutureProvider<K,V> {
  Future<V> submit(K inputValue);
}

// How do you implement this in dart???
BatchingFutureProvider<K,V> create<K,V>(
   BatchComputer<K,V> computer, 
   int maxBatchSize, 
   Duration maxWaitDuration,
);


Dart(或 pub 包)是否已经提供了这个批处理功能,如果没有,你将如何实现 create上面的功能?

最佳答案

这听起来非常合理,但也非常专业。
您需要一种表示查询的方法,将这些查询组合成一个 super 查询,然后将 super 结果拆分为单独的结果,这就是您的 BatchComputer做。然后你需要一个队列,你可以在某些条件下刷新它。

有一点很清楚,您需要使用 Completer s 用于结果,因为当您想要返回 future 时,您总是需要它,然后才能获得值(value)或 future 来完成它。

我会选择的方法是:

import "dart:async";

/// A batch of requests to be handled together.
///
/// Collects [Request]s until the pending requests are flushed.
/// Requests can be flushed by calling [flush] or by configuring
/// the batch to automatically flush when reaching certain 
/// tresholds.
class BatchRequest<Request, Response> {
  final int _maxRequests;
  final Duration _maxDelay;
  final Future<List<Response>> Function(List<Request>) _compute;
  Timer _timeout;
  List<Request> _pendingRequests;
  List<Completer<Response>> _responseCompleters;

  /// Creates a batcher of [Request]s.
  ///
  /// Batches requests until calling [flush]. At that pont, the
  /// [batchCompute] function gets the list of pending requests,
  /// and it should respond with a list of [Response]s.
  /// The response to the a request in the argument list
  /// should be at the same index in the response list, 
  /// and as such, the response list must have the same number
  /// of responses as there were requests.
  ///
  /// If [maxRequestsPerBatch] is supplied, requests are automatically
  /// flushed whenever there are that many requests pending.
  ///
  /// If [maxDelay] is supplied, requests are automatically flushed 
  /// when the oldest request has been pending for that long. 
  /// As such, The [maxDelay] is not the maximal time before a request
  /// is answered, just how long sending the request may be delayed.
  BatchRequest(Future<List<Response>> Function(List<Request>) batchCompute,
               {int maxRequestsPerBatch, Duration maxDelay})
    : _compute = batchCompute,
      _maxRequests = maxRequestsPerBatch,
      _maxDelay = maxDelay;

  /// Add a request to the batch.
  ///
  /// The request is stored until the requests are flushed,
  /// then the returned future is completed with the result (or error)
  /// received from handling the requests.
  Future<Response> addRequest(Request request) {
    var completer = Completer<Response>();
    (_pendingRequests ??= []).add(request);
    (_responseCompleters ??= []).add(completer);
    if (_pendingRequests.length == _maxRequests) {
      _flush();
    } else if (_timeout == null && _maxDelay != null) {
      _timeout = Timer(_maxDelay, _flush);
    }
    return completer.future;
  }

  /// Flush any pending requests immediately.
  void flush() {
    _flush();
  }

  void _flush() {
    if (_pendingRequests == null) {
      assert(_timeout == null);
      assert(_responseCompleters == null);
      return;
    }
    if (_timeout != null) {
      _timeout.cancel();
      _timeout = null;
    }
    var requests = _pendingRequests;
    var completers = _responseCompleters;
    _pendingRequests = null;
    _responseCompleters = null;

    _compute(requests).then((List<Response> results) {
      if (results.length != completers.length) {
        throw StateError("Wrong number of results. "
           "Expected ${completers.length}, got ${results.length}");
      }
      for (int i = 0; i < results.length; i++) {
        completers[i].complete(results[i]);
      }
    }).catchError((error, stack) {
      for (var completer in completers) {
        completer.completeError(error, stack);
      }
    });
  }
}

您可以将其用作例如:
void main() async {
  var b = BatchRequest<int, int>(_compute, 
      maxRequestsPerBatch: 5, maxDelay: Duration(seconds: 1));
  var sw = Stopwatch()..start();
  for (int i = 0; i < 8; i++) {
    b.addRequest(i).then((r) {
      print("${sw.elapsedMilliseconds.toString().padLeft(4)}: $i -> $r");
    });
  }
}
Future<List<int>> _compute(List<int> args) => 
    Future.value([for (var x in args) x + 1]);

关于flutter - Dart 中的批处理 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58178129/

相关文章:

dart - 是否曾经调用过WebComponent.removed()?

sql - NHibernate future 对象图 许多查询

android - flutter build size 超出预期

flutter - flutter -在日历插件中创建动态标记事件

ios - 在 Flutter iOS 中将 jpg 图像转换为 png 图像

scala - 在能够处理其他一些消息之前初始化一个actor

java - 如何并行调用返回 future 的方法?

flutter - 使用 ConstrainedBox 和 Container 小部件为其子框提供约束有什么区别?

flutter - 我们如何使用 ArCore 在 flutter 中实现类似 snapchat 的过滤器

flutter - SELinux Denial,同时进行视频通话和转录