dart - Dart:并行处理传入的HTTP请求

标签 dart dart-io dart-isolates

我正在尝试在Dart中编写一个HTTP服务器,该服务器可以并行处理多个请求。到目前为止,我未能成功实现“并行”部分。

这是我最初尝试的内容:

import 'dart:io';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
      request.response.statusCode = HttpStatus.OK;
      request.response.write(stopwatch.elapsedMilliseconds.toString());
      request.response.close().catchError(print);
    });
  });
}

对于每个请求,它都会忙一秒钟,然后完成。我以这种方式处理请求,以便可以预测其时间安排,因此我可以轻松地在Windows任务管理器中看到请求的效果(CPU核心使用率达到100%)。

我可以说这不是并行处理请求,因为:
  • 如果我将多个浏览器选项卡加载到http://example:8080/,然后全部刷新,则这些选项卡将依次依次加载,每个之间的间隔约为1秒。
  • 如果我在这些设置下使用负载测试工具wrk ... wrk -d 10 -c 8 -t 8 http://example:8080/ ...,它将在我给出的10秒钟内完成5至8个请求。如果服务器使用了我所有的8个内核,那么我希望有接近80个请求的数字。
  • 在wrk测试期间打开Windows任务管理器时,我发现只有一个内核的使用率接近100%,其余的则几乎处于空闲状态。

  • 因此,然后我尝试使用隔离,希望为每个请求手动产生一个新的隔离/线程:

    import 'dart:io';
    import 'dart:isolate';
    
    main() {
      HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
        server.listen((HttpRequest request) {
          spawnFunction(handleRequest).send(request);
        });
      });
    }
    
    handleRequest() {
      port.receive((HttpRequest request, SendPort sender) {
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();
        while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
        request.response.statusCode = HttpStatus.OK;
        request.response.write(stopwatch.elapsedMilliseconds.toString());
        request.response.close().catchError(print);
      });
    }
    

    这根本不起作用。它不喜欢我尝试将HttpRequest作为消息发送到隔离区。这是错误:

    #0      _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
    #1      _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
    #2      _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
    #3      main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
    #4      _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
    #5      _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
    #6      _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
    #7      _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
    #8      _StreamImpl._add (dart:async/stream_impl.dart:51:16)
    #9      StreamController.add (dart:async/stream_controller.dart:10:35)
    #10     _HttpServer._handleRequest (http_impl.dart:1261:20)
    #11     _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
    #12     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
    #13     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
    #14     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
    #15     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
    #16     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
    #17     StreamController.add (dart:async/stream_controller.dart:10:35)
    #18     _HttpParser._doParse (http_parser.dart:415:26)
    #19     _HttpParser._parse (http_parser.dart:161:15)
    #20     _HttpParser._onData._onData (http_parser.dart:509:11)
    #21     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
    #22     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
    #23     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
    #24     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
    #25     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
    #26     StreamController.add (dart:async/stream_controller.dart:10:35)
    #27     _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
    #28     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
    #29     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
    #30     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
    #31     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
    #32     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
    #33     StreamController.add (dart:async/stream_controller.dart:10:35)
    #34     _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
    #35     _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
    #36     _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
    #37     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
    
    Unhandled exception:
    Illegal argument(s): Illegal argument in isolate message : (object is a closure)
    #0      _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
    #1      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
    #2      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
    #3      Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
    #4      Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
    #5      Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
    #6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
    #7      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
    #8      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
    #9      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
    

    使用的版本:
  • Dart编辑器版本0.5.9_r22879
  • Dart SDK版本0.5.9.0_r22879

  • 是否可以使用Dart使用我的机器所有可用内核并行处理这些请求?

    最佳答案

    我写了一个叫做dart-isoserver的库来做这个。现在已经严重烂掉了,但是您可以看到这种方法。

    https://code.google.com/p/dart-isoserver/

    我所做的是通过隔离端口代理HttpRequest和HttpResponse,因为您无法直接发送它们。尽管有一些警告,但它仍然有效:

    请求和响应上的

  • I / O经过主要隔离,因此该部分不是并行的。但是,在工作隔离区中完成的其他工作并未阻止主隔离区。真正应该发生的是,套接字连接应该在隔离之间可转移。
  • 隔离中的异常会导致整个服务器瘫痪。 spawnFunction()现在具有一个未捕获的异常处理程序参数,因此这是可以修复的,但是spawnUri()没有。 dart-isoserver使用spawnUri()来实现热加载,因此必须将其删除。
  • 隔离启动起来有点慢,对于nginx和node.js所针对的数千个并发连接用例,您可能不希望每个连接一个。具有工作队列的隔离池可能会更好地执行,尽管这样做会消除您可以在工作进程中使用阻止I / O的出色功能。

  • 关于第一个代码示例的注释。正如您所注意到的,那肯定不会并行运行,因为Dart是单线程的。同一隔离中的Dart代码永远不会同时运行。

    关于dart - Dart:并行处理传入的HTTP请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16703631/

    相关文章:

    dart - 如何在 dart 中运行自定义 https 域?

    dart - 能否指定一个可由具有同步或异步方法的类实现的 Dart 接口(interface)?

    flutter - 根据另一个小部件的位置/大小定位/调整小部件的大小

    dart - 如何实现像 Youtube 和 Instagram 应用程序中那样的导航堆栈?

    websocket - Dart 中的 Dart websocket :io and dart:html

    dart - 如何编写Dart 'worker'服务器

    dart - Flutter 隔离与 future

    Flutter:运行多个方法