我有一个 pandas 数据框 stocks_df
如下:
| Date | Price|Symbol|
|-----------|------|------|
| 2022-07-10|150.54| APPL |
我需要将其转换为 JSON 并将其推送到 firehose
{"Date":"2022-07-11","Price":65.09,"Symbol":"ACM"}
我收到一些错误,并在我的 S3 存储桶 format-conversion-failed/
中创建了一个文件夹,文件中包含以下内容:
{"attemptsMade":1,"arrivalTimestamp":1657602058357,"lastErrorCode":"DataFormatConversion.MalformedData","lastErrorMessage":"The input JSON contained a primitive at the top level. The top level must be an object or array.","attemptEndingTimestamp":1657602335593,"rawData":"IntcIkRhdGVcIjpcIjIwMjItMDctMTFcIixcIlByaWNlXCI6OC4yNixcIlN5bWJvbFwiOlwiQUNSXCJ9Ig==","sequenceNumber":"49631306863317095792192735938176634752612190707410534402","subSequenceNumber":null,"dataCatalogTable":{"catalogId":null,"databaseName":"XXXXXX","tableName":"XXXXXXXXX","region":"us-east-1","versionId":"LATEST","roleArn":"arn:aws:iam::XXXXXXXXXXXXXXXX:role/service-role/XXXXXXXXXXXXXXXXXXXXXXX-PUT-S3-bndnet-us-east-1-XXXXXXXXXXXXXXX"}}
这是 RawData 的解码值:"{\"Date\":\"2022-07-11\",\"Price\":0.16,\"Symbol\":\"ACQRW\“}”
这是我的功能:
def push_file_to_firehose(stocks_df):
fh = boto3.client('firehose')
stocks_json = stocks_df.to_json(orient='records').strip('[]')
logger.info(stocks_json)
try:
fh.put_record(DeliveryStreamName='PUT-S3-bndnetworks', \
Record={'Data': json.dumps(stocks_json)} )
logger.info("Successfully pushed the file into Firehose")
except Exception as e:
logging.error("Error while pushing the file into Firehose")
logging.error(e)
最佳答案
Data
的值必须采用 base64
编码(bytes
),而不是 JSON 字符串(str
) .
示例:
import base64
import json
fh.put_record(
DeliveryStreamName='PUT-S3-bndnetworks',
Record={
'Data': base64.b64encode(
json.dumps(stocks_json).encode('utf-8')
).decode('utf-8')
}
)
引用:put_record()
(boto) (不可否认,文档有点令人困惑)
关于python - 将 JSON 文件推送到 Firehose 时格式转换失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72947404/