python - PySpark:如何在不达到速率限制的情况下调用 API/Web 服务?

标签 python google-maps apache-spark pyspark google-api

我有一个包含 4 列的 Spark DataFrame:location_string , locality , region , 和 country .我正在使用 Google Map 的 Geocode API 来解析每个 location_string然后用结果填NULL locality , regioncountry领域。
我已将调用地理编码库的函数设为 udf,但我面临的问题是,当我超过 Google API 策略的速率限制时,最终会得到“OVERLIMIT”响应状态。
以下是 Spark 数据帧的示例:

+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|location_string                                                                                         |locality    |region|country|
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|-Tainan City-Tainan, Taiwan                                                                             |Tainan City |null  |TWN    |
|093 Cicero, IL                                                                                          |null        |null  |null   |
|1005 US 98 Bypass Suite 7 Columbia, MS 39429                                                            |null        |null  |null   |
|10210  Baltimore Avenue, College Park, MD, US 20740                                                     |College Park|MD    |null   |
|12 Braintree - Braintree, MA, 02184                                                                     |null        |null  |null   |
|1215 E.Main St. #1074 Carbondale, IL 62901,                                                             |null        |null  |null   |
|18 Fairview Heights - Fairview Heights, IL, 62208                                                       |null        |null  |null   |
|21000 Hayden Dr, Woodhaven, MI, US 48183                                                                |null        |null  |null   |
|2257 N. Germantown Pkwy in Cordova, TN                                                                  |null        |null  |null   |
|2335 S. Towne Ave., Pomona, CA, US 91766                                                                |Pomona      |CA    |null   |
|2976-Taylor Ave & Harford Rd (Parkville Shopping Center, Parkville, MARYLAND, UNITED STATES             |null        |null  |null   |
|3342 Southwest Military Drive, Texas3342 Southwest Military Drive, San Antonio, TX, 78211, United States|null        |null  |null   |
|444 Cedar St., Suite 201, St. Paul, MN, US 55101                                                        |St. Paul    |MN    |null   |
|4604 Lowe Road, Louisville, KY, US 40220                                                                |Louisville  |KY    |null   |
|4691 Springboro Pike, Moraine, OH, US 45439                                                             |null        |null  |null   |
|50 Hwy 79 Bypass N Ste K Magnolia, AR 71753                                                             |null        |null  |null   |
|5188 Commerce Dr., Baldwin Park, CA, US 91706                                                           |Baldwin Park|CA    |null   |
|55445                                                                                                   |null        |null  |null   |
|5695 Harvey St, Muskegon, MI 49444                                                                      |null        |null  |null   |
|6464 Downing Street, Denver, CO, US 80229                                                               |null        |null  |null   |
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
为了解决这个问题,我有一个这样的功能:
def geocoder_decompose_location(location_string):
    if not location_string:
        return Row('nation', 'state', 'city')(None, None, None)
    
    GOOGLE_GEOCODE_API_KEYS = [key1, key2, key3]
    
    GOOGLE_GEOCODE_API_KEY = random.choice(GOOGLE_GEOCODE_API_KEYS)
    
    attempts = 0
    success = False
    while status != True and attempts < 5:
        result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
        attempts += 1
        status = result.status
        if status == 'OVER_QUERY_LIMIT':
            time.sleep(2)
            
            # retry
            continue
        
        success = True
    
    if attempts == 5:
        print('Daily Limit Reached')
        
    return Row('nation', 'state', 'city')(result.country, result.state, result.city)
但它似乎并没有像预期的那样处理 spark 数据框。任何指导将不胜感激!

最佳答案

解决这个问题的最简单方法是用指数回退替换 sleep 函数。用...

time.sleep(Math.exp(attempts)


这会将您的读取率降低到节流限制以下。您也可以通过添加 .coalesce 或 .repartition(max_parallelism) 来控制 Spark 最大并行度

关于python - PySpark:如何在不达到速率限制的情况下调用 API/Web 服务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63728124/

相关文章:

python - 单个表上的 SQLAlchemy 多对多关系

javascript - 如何在 D3 map 上添加线条箭头(标记)?

apache-spark - 如何检查结构化流中的StreamingQuery性能指标?

java - 用于搜索带有空格和反斜杠性能问题的字符串的正则表达式程序

javascript - 信息窗口按钮 jQuery 事件

java - 如何避免连续出现 "Resetting offset"和 "Seeking to LATEST offset"?

python - Tf-Idf 值是如何用分析器 ='char' 计算出来的?

python - 内存使用过多 xarray `to_dataframe()`

python - 如何查找列表中某个元素的所有出现位置

android - 无法导入 google-play-services_lib,因为项目名称正在使用中