我有一个包含 4 列的 Spark DataFrame:location_string
, locality
, region
, 和 country
.我正在使用 Google Map 的 Geocode API 来解析每个 location_string
然后用结果填NULL locality
, region
和 country
领域。
我已将调用地理编码库的函数设为 udf,但我面临的问题是,当我超过 Google API 策略的速率限制时,最终会得到“OVERLIMIT”响应状态。
以下是 Spark 数据帧的示例:
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|location_string |locality |region|country|
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|-Tainan City-Tainan, Taiwan |Tainan City |null |TWN |
|093 Cicero, IL |null |null |null |
|1005 US 98 Bypass Suite 7 Columbia, MS 39429 |null |null |null |
|10210 Baltimore Avenue, College Park, MD, US 20740 |College Park|MD |null |
|12 Braintree - Braintree, MA, 02184 |null |null |null |
|1215 E.Main St. #1074 Carbondale, IL 62901, |null |null |null |
|18 Fairview Heights - Fairview Heights, IL, 62208 |null |null |null |
|21000 Hayden Dr, Woodhaven, MI, US 48183 |null |null |null |
|2257 N. Germantown Pkwy in Cordova, TN |null |null |null |
|2335 S. Towne Ave., Pomona, CA, US 91766 |Pomona |CA |null |
|2976-Taylor Ave & Harford Rd (Parkville Shopping Center, Parkville, MARYLAND, UNITED STATES |null |null |null |
|3342 Southwest Military Drive, Texas3342 Southwest Military Drive, San Antonio, TX, 78211, United States|null |null |null |
|444 Cedar St., Suite 201, St. Paul, MN, US 55101 |St. Paul |MN |null |
|4604 Lowe Road, Louisville, KY, US 40220 |Louisville |KY |null |
|4691 Springboro Pike, Moraine, OH, US 45439 |null |null |null |
|50 Hwy 79 Bypass N Ste K Magnolia, AR 71753 |null |null |null |
|5188 Commerce Dr., Baldwin Park, CA, US 91706 |Baldwin Park|CA |null |
|55445 |null |null |null |
|5695 Harvey St, Muskegon, MI 49444 |null |null |null |
|6464 Downing Street, Denver, CO, US 80229 |null |null |null |
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
为了解决这个问题,我有一个这样的功能:def geocoder_decompose_location(location_string):
if not location_string:
return Row('nation', 'state', 'city')(None, None, None)
GOOGLE_GEOCODE_API_KEYS = [key1, key2, key3]
GOOGLE_GEOCODE_API_KEY = random.choice(GOOGLE_GEOCODE_API_KEYS)
attempts = 0
success = False
while status != True and attempts < 5:
result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
attempts += 1
status = result.status
if status == 'OVER_QUERY_LIMIT':
time.sleep(2)
# retry
continue
success = True
if attempts == 5:
print('Daily Limit Reached')
return Row('nation', 'state', 'city')(result.country, result.state, result.city)
但它似乎并没有像预期的那样处理 spark 数据框。任何指导将不胜感激!
最佳答案
解决这个问题的最简单方法是用指数回退替换 sleep 函数。用...
time.sleep(Math.exp(attempts)
这会将您的读取率降低到节流限制以下。您也可以通过添加 .coalesce 或 .repartition(max_parallelism) 来控制 Spark 最大并行度
关于python - PySpark:如何在不达到速率限制的情况下调用 API/Web 服务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63728124/