python - Apache Beam 中的异步 API 调用

标签 python python-requests python-asyncio google-cloud-dataflow apache-beam

正如标题所说,我想使用 python 在 apache beam 中进行异步 API 调用。

目前,我正在 DoFn 内为 Pcollection 中的每个元素调用 API。

DoFn 代码

class textapi_call(beam.DoFn):
    def __init__(self, api_key):
        self.api_key = api_key


    def setup(self):
        self.session = requests.session()

    def process(self, element):
        address = element[3] + ", " + element[4] + ", " + element[5] + ", " + element[6] + ", " + element[7]
        url = findplace_url(address, api_key=self.api_key)
        params = {"inputtype": "textquery",
                  "fields": "name,place_id,geometry,type,icon,permanently_closed,business_status"}
        start = time.time()
        res = self.session.get(url, params=params)
        results = json.loads(res.content)
        time_taken = time.time() - start

        return [[element[0], address, str(results), time_taken]]

管道代码:

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(input_file,skip_header_lines=1)
    lines_list = lines | "to list" >> beam.Map(parse_csv)
    res = lines_list | "API calls" >> beam.ParDo(textapi_call(api_key))

如何修改代码以使 API 调用异步或并发? 我在 python 中找不到与此相关的任何示例。

我想主要提高性能。除了水平缩放之外,是否还有其他方法可以使 Beam 中的 API 调用更快,请告诉我。

最佳答案

对于这样的事情,您大部分时间都在等待外部服务,有几种方法可以加快速度。

最简单的方法是使用 BatchElements 之类的东西接下来是一个可以并行处理整个批处理的 DoFn。当相关服务具有批处理 API 时,这种方法效果最佳,但也可以手动执行,例如

class ProcessBatchDoFn(beam.DoFo):
  def setup(self):
    self.session = requests.session()
    self.executor = concurrent.futures.Executor(...)

  def process(batch_of_elements):
    urls = [element_to_url(e) for e in batch_of_elements]
    for element, result in zip(
        batch_of_elements,
        self.executor.map(self.session.get, urls))
        yield element[0], url, str(result)  # or whatever is needed downstream

用作

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(input_file,skip_header_lines=1)
    lines_list = lines | "to list" >> beam.Map(parse_csv)
    res = lines_list | beam.BatchElements() |  beam.ParDo(ProcessBatchDoFn())

也许使用类似 https://github.com/ross/requests-futures 的东西可以让这变得更简单。

只要 URL 的获取时间大致相同,这种方法就可以正常工作。另一个选择是滚动请求,例如

def ExpensivePerElementDoFn(beam.DoFn):
  def setup(self):
    self.executor = concurrent.futures.Executor(...)

  def start_bundle(self):
    self.pending = []

  def process(element):
    self.pending.append(self.executor.submit(fn, element))
    yield from self.flush_done()
    while len(self.pending) > MAX_CONCURRENT_REQUESTS:
      time.sleep(0.1)
      yield from self.flush_done()

  def flush_done(self):
    done = [ix for ix, future in enumerate(self.pending) if future.done()]
    for ix in reversed(done):
      future = self.pending.pop(ix)
      yield future.result()

  def finish_bundle(self):
    for future in self.pending():
      yield future.result()

关于python - Apache Beam 中的异步 API 调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72842846/

相关文章:

python - 如何在不引发 UnicodeEncodeError 的情况下覆盖 str 函数?

python - Pandas 在给定日期范围内过滤和标记数据

Python 干+请求 : Not switching circut/changing IP address when using a session

javascript - 在Python中使用Asyncio和for循环类似于Javascript中的map方法和promise.all

python - Selenium 中发现元素的顺序

Python - SSL - 错误的版本号

python - 在 `with` 语句中使用 requests.post() 时出现 AttributeError

python-requests - Uber API - 请求端点无法读取 json

python - 使用 Django Channels 和 pytest-asyncio 测试消费者方法是否可以引发异常

python - ValueError : set_wakeup_fd only works in main thread on Windows on Python 3. 8 使用 Django 3.0.2 或 Flask 2.0.0