As the title says, I want to make asynchronous API calls in Apache Beam using Python.
Currently, I call the API inside a DoFn for each element in the PCollection.
DoFn code:
class textapi_call(beam.DoFn):
    def __init__(self, api_key):
        self.api_key = api_key

    def setup(self):
        self.session = requests.session()

    def process(self, element):
        address = element[3] + ", " + element[4] + ", " + element[5] + ", " + element[6] + ", " + element[7]
        url = findplace_url(address, api_key=self.api_key)
        params = {"inputtype": "textquery",
                  "fields": "name,place_id,geometry,type,icon,permanently_closed,business_status"}
        start = time.time()
        res = self.session.get(url, params=params)
        results = json.loads(res.content)
        time_taken = time.time() - start
        return [[element[0], address, str(results), time_taken]]
Pipeline code:
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(input_file, skip_header_lines=1)
    lines_list = lines | "to list" >> beam.Map(parse_csv)
    res = lines_list | "API calls" >> beam.ParDo(textapi_call(api_key))
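How can I modify the code so that the API calls are asynchronous or concurrent? I couldn't find any Python examples of this.
My main goal is to improve performance. Please let me know if there is any way, other than scaling horizontally, to make API calls in Beam faster.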
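Best answer
For something like this, where most of the time is spent waiting on an external service, there are a few ways to speed things up.
The simplest is to use something like BatchElements followed by a DoFn that processes a whole batch in parallel. This works best when the service in question has a batch API, but it can also be done manually, e.g.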
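import concurrent.futures
import apache_beam as beam
import requests

class ProcessBatchDoFn(beam.DoFn):
    def setup(self):
        self.session = requests.session()
        # The base Executor class is abstract, so use a concrete pool here.
        self.executor = concurrent.futures.ThreadPoolExecutor(...)

    def process(self, batch_of_elements):
        urls = [element_to_url(e) for e in batch_of_elements]  # element_to_url: placeholder helper
        # executor.map preserves input order, so results line up with elements and urls.
        for element, url, result in zip(
                batch_of_elements,
                urls,
                self.executor.map(self.session.get, urls)):
            yield element[0], url, str(result)  # or whatever is needed downstream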
used as
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(input_file, skip_header_lines=1)
    lines_list = lines | "to list" >> beam.Map(parse_csv)
    res = lines_list | beam.BatchElements() | beam.ParDo(ProcessBatchDoFn())
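To see the core of the batch approach in isolation, here is a minimal sketch that uses only the standard library and no Beam. The names fetch_batch and fake_fetch are hypothetical, and fake_fetch merely stands in for a real call such as session.get(url). It creates a pool per call for brevity, whereas a DoFn would more likely create the pool once in setup.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_batch(batch, fetch, max_workers=8):
    """Apply `fetch` to every item of `batch` concurrently, keeping input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map yields results in the order of the inputs,
        # so each result can be paired with its element via zip.
        return list(zip(batch, executor.map(fetch, batch)))


def fake_fetch(address):
    """Hypothetical stand-in for an HTTP request: sleeps, then echoes."""
    time.sleep(0.05)
    return f"result for {address}"


# Example: eight simulated requests run in parallel threads rather than
# one after another.
pairs = fetch_batch([f"address {i}" for i in range(8)], fake_fetch)
print(pairs[0])
```

Because the results come back in input order, the whole batch is only finished once its slowest request has returned.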
Perhaps using something like https://github.com/ross/requests-futures could make this simpler.
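This works fine as long as the URLs all take about the same amount of time to fetch, since a batch is only done when its slowest request returns. Another option is to do rolling requests, e.g.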
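class ExpensivePerElementDoFn(beam.DoFn):
    def setup(self):
        # The base Executor class is abstract, so use a concrete pool here.
        self.executor = concurrent.futures.ThreadPoolExecutor(...)

    def start_bundle(self):
        self.pending = []

    def process(self, element):
        # fn is a placeholder for the per-element work, e.g. the API call.
        self.pending.append(self.executor.submit(fn, element))
        yield from self.flush_done()
        # Throttle: block while too many requests are in flight.
        while len(self.pending) > MAX_CONCURRENT_REQUESTS:
            time.sleep(0.1)
            yield from self.flush_done()

    def flush_done(self):
        done = [ix for ix, future in enumerate(self.pending) if future.done()]
        # Pop from the back so the remaining indices stay valid.
        for ix in reversed(done):
            future = self.pending.pop(ix)
            yield future.result()

    def finish_bundle(self):
        # Drain whatever is still pending. Note that the Beam Python SDK expects
        # values emitted from finish_bundle to be wrapped in WindowedValue.
        for future in self.pending:
            yield future.result()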
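The rolling idea can also be sketched without Beam, using only the standard library. This is an illustration under assumptions, not the code above: rolling_map and slow_square are hypothetical names, and instead of polling with time.sleep it blocks on concurrent.futures.wait until at least one request finishes.

```python
import time
from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    as_completed,
    wait,
)


def rolling_map(fn, items, max_in_flight=4):
    """Yield fn(item) for each item, keeping at most max_in_flight calls pending.

    Results are yielded as they complete, so output order may differ
    from input order.
    """
    with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        pending = set()
        for item in items:
            pending.add(executor.submit(fn, item))
            # Block only while the window is full, then emit what finished.
            while len(pending) >= max_in_flight:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()
        # Input exhausted: drain the remaining requests as they complete.
        for future in as_completed(pending):
            yield future.result()


def slow_square(x):
    """Hypothetical stand-in for an API call with uneven latency."""
    time.sleep(0.01 * (x % 3))
    return x * x


results = list(rolling_map(slow_square, range(10), max_in_flight=3))
print(sorted(results))
```

Unlike the batch version, a slow request here only occupies one slot of the window while the other slots keep turning over.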
Source: "python - Asynchronous API calls in Apache Beam" on Stack Overflow: https://stackoverflow.com/questions/72842846/