python - 在 grpc python 中处理异步流请求

标签 python python-asyncio grpc grpc-python

我正在尝试了解如何使用双向流处理 grpc api(使用 Python API)。

假设我有以下简单的服务器定义:

syntax = "proto3";
package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg){}
}

message Msg
{
 string msg = 1;
}

假设将从客户端发送的消息是异步的(作为用户选择某些 ui 元素的结果)。

为客户端生成的 python stub 将包含一个方法 Translate,它将接受一个生成器函数并返回一个迭代器。

我不清楚的是,我将如何编写生成器函数来返回用户创建的消息。在等待消息的同时在线程上休眠听起来不是最好的解决方案。

最佳答案

现在这有点笨拙,但您可以按如下方式完成您的用例:

#!/usr/bin/env python

from __future__ import print_function

import time
import random
import collections
import threading

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server


def translate_next(msg):
    return ''.join(reversed(msg))


class Translator(TestServiceServicer):
  def Translate(self, request_iterator, context):
    for req in request_iterator:
      print("Translating message: {}".format(req.msg))
      yield Msg(msg=translate_next(req.msg))

class TranslatorClient(object):
  def __init__(self):
    self._stop_event = threading.Event()
    self._request_condition = threading.Condition()
    self._response_condition = threading.Condition()
    self._requests = collections.deque()
    self._last_request = None
    self._expected_responses = collections.deque()
    self._responses = {}

  def _next(self):
    with self._request_condition:
      while not self._requests and not self._stop_event.is_set():
        self._request_condition.wait()
      if len(self._requests) > 0:
        return self._requests.popleft()
      else:
        raise StopIteration()

  def next(self):
    return self._next()

  def __next__(self):
    return self._next()

  def add_response(self, response):
    with self._response_condition:
      request = self._expected_responses.popleft()
      self._responses[request] = response
      self._response_condition.notify_all()

  def add_request(self, request):
    with self._request_condition:
      self._requests.append(request)
      with self._response_condition:
        self._expected_responses.append(request.msg)
      self._request_condition.notify()

  def close(self):
    self._stop_event.set()
    with self._request_condition:
      self._request_condition.notify()

  def translate(self, to_translate):
    self.add_request(to_translate)
    with self._response_condition:
      while True:
        self._response_condition.wait()
        if to_translate.msg in self._responses:
          return self._responses[to_translate.msg]


def _run_client(address, translator_client):
  with grpc.insecure_channel('localhost:50054') as channel:
    stub = TestServiceStub(channel)
    responses = stub.Translate(translator_client)
    for resp in responses:
      translator_client.add_response(resp)

def main():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_TestServiceServicer_to_server(Translator(), server)
  server.add_insecure_port('[::]:50054')
  server.start()
  translator_client = TranslatorClient()
  client_thread = threading.Thread(
      target=_run_client, args=('localhost:50054', translator_client))
  client_thread.start()

  def _translate(to_translate):
    return translator_client.translate(Msg(msg=to_translate)).msg

  translator_pool = futures.ThreadPoolExecutor(max_workers=4)
  to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
  translations = translator_pool.map(_translate, to_translate)
  print("Translations: {}".format(zip(to_translate, translations)))

  translator_client.close()
  client_thread.join()
  server.stop(None)


if __name__ == "__main__":
  main()

基本思想是让一个名为 TranslatorClient 的对象在单独的线程上运行,关联请求和响应。它期望响应将按照请求发出的顺序返回。它还实现了迭代器接口(interface),以便您可以将其直接传递给对 stub 上的 Translate 方法的调用。

我们启动一个运行 _run_client 的线程,它从 TranslatorClient 中提取响应,并在另一端使用 add_response 将它们反馈回来。

我在此处包含的 main 函数实际上只是一个稻草人,因为我没有您的 UI 代码的详细信息。我在 ThreadPoolExecutor 中运行 _translate 来证明,即使 translator_client.translate 是同步的,它也会产生,让你有多个- 一次航类请求。

我们认识到,要为这样一个简单的用例编写大量代码。最终,答案将是 asyncio 支持。我们在不久的将来有这方面的计划。但目前,无论您运行的是 python 2 还是 python 3,这种解决方案都应该能让您继续前进。

关于python - 在 grpc python 中处理异步流请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55029342/

相关文章:

python - 如何在 Python 中列出导入的类?

Python错误: Cannot find the file specified

python - Sqlalchemy:多对多关系错误

python - 确保 aiohttp/asyncio 中递归函数的 future

java - ServerBuilder.forPort(端口) - java.lang.NoSuchMethodError

python - 如何在 python 中有效地加载大文本文件

python - 每个线程一个 aiohttp ClientSession?

python - 异步IO。如何异步调用1000个方法并在准备好后立即获取异步结果

google-cloud-platform - 如何安排任务调用 gRPC 方法?

kubernetes - 入口不适用于GRPC网关。 HTTP请求未按入口预期的那样转发到GRPC网关