python-3.x - botocore.exceptions.ClientError : An error occurred (ExpiredTokenException) The security token included in the request is expired

标签 python-3.x azure-devops devops aws-step-functions

我遇到 token 过期问题。目前,它将于 60 分钟到期。此问题的问题在于,此步骤函数将运行超过 17 小时,因此我需要能够捕获此 session 的异常或重新承担角色,而不会中断或停止 python 中的步骤函数执行。现行政策无法更改,我需要解决此问题。任何使用 aws Secret Manager 对其进行缓存并在 Python 脚本中使用它的指针。

错误:

> Status...: RUNNING
> Status...: RUNNING
Traceback (most recent call last):
    sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
  File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/botocore/client.py", line 401, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/botocore/client.py", line 731, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) when calling the DescribeExecution operation: The security token included in the request is expired
##[error]The process '/opt/hostedtoolcache/Python/3.8.12/x64/bin/python' failed with exit code  1

Python 代码:

import os
import logging
import snowflake.connector
from argparse import ArgumentParser
from datetime import datetime
from typing import Tuple
import time
from time import sleep
import boto3
import json
from botocore.exceptions import ClientError


sts_client = boto3.client('sts')

   
                                                        
session = boto3.Session()
session.get_credentials().secret_key
print(session.get_credentials().secret_key)

sf_client = boto3.client('stepfunctions', region_name="us-west-2")
sf_output = sf_client.start_execution (
    stateMachineArn = 'arn:aws:states:us-west-2:xxxxx:stateMachine:PredictiveAnalyticsPipelineOrchestration-xxxxxxxx',  
   #input = json.dumps({})  # this is for all 
   #input='"{\"basin_list\" : \"PEDREGOSA_BASIN\"}"'
    input ='{ \"basin_list\": [\"RIpe\"],\"db\": \"RAMAN\",\"pipelinePhases\": \"lifestage,monthly_production,tiers,sequence,quintiles\"}'
)

while True:
    time.sleep(10)   # don't need to check every nanosecond

    sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
    step_status = sf_response['status'] # BE SURE TO GET THE CURRENT STATE

    print("%s: %s" % ("> Status...", step_status))

    if step_status == 'RUNNING':
        continue
    elif step_status == 'FAILED':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        #raise Exception("%s: %s" % ("! ERROR ! Execution FAILED: ", sf_response))
        break
    elif step_status == 'TIMED_OUT':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        
        break
    elif step_status == 'ABORTED':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        
        break
    else: # SUCCEEDED
        step_status == 'SUCCEEDED'
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        break

管道代码:

jobs:
  - job: determine_the_stepfunction_status
    timeoutInMinutes: 5000
    cancelTimeoutInMinutes: 3
    steps:

      - task: AWSAssumeRole@1
        displayName: 'Login to AWS'
        inputs:
          RoleArn: 'arn:aws:iam::$(AWS_ACCOUNT_ID):role/Energyxxxxxx-xxxxx-Role'
          SessionName: 'Energyxxxxxx-xxxxx-Role'
          ConfigureAWSCLIEnvironmentVariables: true
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '3.8'
          #addToPath: true
          #architecture: 'x64' # Options: x86, x64 (this argument applies only on Windows agents)
      - script: python -m pip install --upgrade pip boto3 setuptools sqlalchemy snowflake.sqlalchemy
        displayName: 'Install python tools'
     
      - task: PythonScript@0
        env:
           STEP_STATUS: $(step_status)
           AWS_ACCESS_KEY_ID: $(AWS.AccessKeyID)
           AWS_SECRET_ACCESS_KEY: $(AWS.SecretAccessKey)
           AWS_SESSION_TOKEN: $(AWS.SessionToken)
                    
        inputs:
          scriptSource: 'filePath' # Options: filePath, inline
          scriptPath: 'step_function.py'
          
          failOnStderr: false # Optional

最佳答案

现在已经解决了这个问题,使用 python 每 45 分钟刷新一次 session 。 python调整为:

import os
import logging
import snowflake.connector
from argparse import ArgumentParser
from datetime import datetime
from typing import Tuple
import time
from time import sleep
import boto3
import botocore
import json
import base64
import botocore.session
from botocore.credentials import AssumeRoleCredentialFetcher, DeferredRefreshableCredentials
from botocore.exceptions import ClientError
from botocore.session import get_session

accounts = [
    {"name": "Prod", "id": "youraccountid"},
    #{"name": "Account2", "id": "xxxxxxxxxxx"}  # you can add this if you have multiple account
]

regions = [ "us-west-2" ]

# Replace myRole with your local named profile
boto3.setup_default_session()

# 3600 seconds in an hour, this value should match your role's but am using 45 minutes refresh
# maximum session duration (AWS default is 1 hour). If you're
# role chaining (e.g. saml2aws) 1 hour is a hard limit.
def refresh_external_credentials():
    # Assume role, get details
    client = boto3.client('sts')
    credentials = client.assume_role(
        RoleArn='arn:aws:iam::youraccountid:role/Energyxxxxxx-xxxxx-Role',
        RoleSessionName="Energyxxxxxx-xxxxx-Role", # this name does not matter
        DurationSeconds=3000
    ).get("Credentials")
    return {
        "access_key": credentials.get('AccessKeyId'),
        "secret_key": credentials.get('SecretAccessKey'),
        "token": credentials.get('SessionToken'),
        "expiry_time": credentials.get('Expiration').isoformat()
    }

roleArn = ''

for account in accounts:
    id = account.get('id')
    accountName = account.get('name')
    
    # Replace roleToAssume with your target role
    roleArn = 'arn:aws:iam::' + str(id) + ':role/Energyxxxxxx-xxxxx-Role'
    
    credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
        metadata=refresh_external_credentials(),
        refresh_using=refresh_external_credentials,
        method="sts-assume-role",
    )
    
    for region in regions:
        session = get_session()
        session._credentials = credentials
        session.set_config_variable("region", region)
        autorefresh_session = boto3.session.Session(botocore_session=session)
        
        # Your boto3 calls, for example...
        #rds = autorefresh_session.client('rds')
        #databases = rds.describe_db_instances()
        sf_client = autorefresh_session.client('stepfunctions')
sf_output = sf_client.start_execution (
    stateMachineArn = 'arn:aws:states:us-west-2:youraccountid:stateMachine:name_of_your_state_machine',  
   #input = json.dumps({})  # this is for all 
   #input='"{\"basin_list\" : \"PEDREGOSA_BASIN\"}"'
    input ='{ \"basin_list\": [\"POwer\",\"BOW\"],\"db_postfix\": \"schemaname\",\"pipe\": \"etl,frac,ecline,capex,breakeven,lifestage\"}'
)



while True:
    time.sleep(10)   # don't need to check every nanosecond

    sf_response = sf_client.describe_execution(executionArn=sf_output['executionArn'])
    step_status = sf_response['status'] # BE SURE TO GET THE CURRENT STATE

    print("%s: %s" % ("> Status...", step_status))

    if step_status == 'RUNNING':
        continue
    elif step_status == 'FAILED':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        #raise Exception("%s: %s" % ("! ERROR ! Execution FAILED: ", sf_response))
        break
    elif step_status == 'TIMED_OUT':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        
        break
    elif step_status == 'ABORTED':
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        
        break
    else: # SUCCEEDED
        step_status == 'SUCCEEDED'
        print(step_status)
        print (f'##vso[task.setvariable variable=step_status]{step_status}')
        print(sf_response)
        break

关于管道,不需要承担角色以避免与 python 脚本内的其他角色发生冲突

jobs:
  - job: determine_the_stepfunction_status
    timeoutInMinutes: 5000
    cancelTimeoutInMinutes: 3
    steps:

      - task: UsePythonVersion@0
        inputs:
          versionSpec: '3.8'
          #addToPath: true
          #architecture: 'x64' # Options: x86, x64 (this argument applies only on Windows agents)
      - script: python -m pip install --upgrade pip boto3 setuptools sqlalchemy snowflake.sqlalchemy
        displayName: 'Install python tools'
     
      - task: PythonScript@0
        env:
           STEP_STATUS: $(step_status)
           AWS_ACCESS_KEY_ID: $(AWS.AccessKeyID)
           AWS_SECRET_ACCESS_KEY: $(AWS.SecretAccessKey)
           AWS_SESSION_TOKEN: $(AWS.SessionToken)
                    
        inputs:
          scriptSource: 'filePath' # Options: filePath, inline
          scriptPath: 'step_function.py'
          
          failOnStderr: false # Optional

关于python-3.x - botocore.exceptions.ClientError : An error occurred (ExpiredTokenException) The security token included in the request is expired,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71668986/

相关文章:

python - 使用 .loc 选择 DatetimeIndex 行范围(Pandas Python 3)

python - 改变音效 Pygame

git - 无法确定此 'dev.azure.com' 远程 URL 的组织名称

c# - 一劳永逸地修复 NuGet 包和引用

java - Maven 如何在复制资源时仅包含选定的文件夹

Azure函数应用部署错误 "Encountered an error (ServiceUnavailable) from host runtime. (CODE: 400)"

python - 如何自定义异常消息

azure - 来自 SIG 的 Microsoft Azure 托管应用程序 ARM 模板

vagrant - 使用 Vagrant 在没有 VirtualBox 的情况下使用 KVM/qemu 设置 VM

python - Long Int 文字 - 语法无效?