python - Format conversion failed when pushing a JSON file to Firehose

Tags: python json pandas amazon-kinesis-firehose

I have a pandas DataFrame, stocks_df, like this:

| Date       | Price  | Symbol |
|------------|--------|--------|
| 2022-07-10 | 150.54 | APPL   |

I need to convert it to JSON in the following form and push it to Firehose:

{"Date":"2022-07-11","Price":65.09,"Symbol":"ACM"}

I get errors, and a format-conversion-failed/ folder was created in my S3 bucket, containing a file with the following content:

{"attemptsMade":1,"arrivalTimestamp":1657602058357,"lastErrorCode":"DataFormatConversion.MalformedData","lastErrorMessage":"The input JSON contained a primitive at the top level. The top level must be an object or array.","attemptEndingTimestamp":1657602335593,"rawData":"IntcIkRhdGVcIjpcIjIwMjItMDctMTFcIixcIlByaWNlXCI6OC4yNixcIlN5bWJvbFwiOlwiQUNSXCJ9Ig==","sequenceNumber":"49631306863317095792192735938176634752612190707410534402","subSequenceNumber":null,"dataCatalogTable":{"catalogId":null,"databaseName":"XXXXXX","tableName":"XXXXXXXXX","region":"us-east-1","versionId":"LATEST","roleArn":"arn:aws:iam::XXXXXXXXXXXXXXXX:role/service-role/XXXXXXXXXXXXXXXXXXXXXXX-PUT-S3-bndnet-us-east-1-XXXXXXXXXXXXXXX"}}

This is the decoded value of rawData: "{\"Date\":\"2022-07-11\",\"Price\":0.16,\"Symbol\":\"ACQRW\"}"

Here is my function:
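The rawData field is the base64 encoding of the bytes Firehose tried to parse. A minimal standard-library sketch for decoding it (the rawData value is copied verbatim from the error record above) shows that the top-level JSON value is a string which itself contains JSON:

```python
import base64
import json

# rawData copied verbatim from the format-conversion-failed/ record above
raw_data = "IntcIkRhdGVcIjpcIjIwMjItMDctMTFcIixcIlByaWNlXCI6OC4yNixcIlN5bWJvbFwiOlwiQUNSXCJ9Ig=="

# Undo the base64 layer to get the exact text Firehose tried to parse
decoded = base64.b64decode(raw_data).decode("utf-8")
print(decoded)

# First parse: the top-level JSON value is a string, i.e. a primitive
top_level = json.loads(decoded)
print(type(top_level).__name__)  # str

# Second parse: the string's contents are the intended record object
record = json.loads(top_level)
print(record)  # {'Date': '2022-07-11', 'Price': 8.26, 'Symbol': 'ACR'}
```

A record that parses to a str on the first json.loads is exactly what the "primitive at the top level" message describes.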

import json
import logging
import boto3

logger = logging.getLogger(__name__)

def push_file_to_firehose(stocks_df):
    fh = boto3.client('firehose')
    stocks_json = stocks_df.to_json(orient='records').strip('[]')
    logger.info(stocks_json)
    try:
        fh.put_record(DeliveryStreamName='PUT-S3-bndnetworks',
                      Record={'Data': json.dumps(stocks_json)})
        logger.info("Successfully pushed the file into Firehose")
    except Exception as e:
        logger.error("Error while pushing the file into Firehose")
        logger.error(e)
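A standard-library-only sketch of what that function actually sends. boto3 and pandas are left out, and stocks_json is hard-coded (an assumption) to the one-row string shown earlier, standing in for the to_json(...).strip('[]') output:

```python
import json

# Hard-coded stand-in for stocks_df.to_json(orient='records').strip('[]') on a one-row frame
stocks_json = '{"Date":"2022-07-11","Price":65.09,"Symbol":"ACM"}'

# What the function sends: json.dumps on a str produces a quoted JSON string literal
sent_by_question = json.dumps(stocks_json)
print(sent_by_question)

# Parsing it back yields a str (a JSON primitive), which format conversion rejects
print(type(json.loads(sent_by_question)).__name__)  # str

# Encoding the existing JSON text directly keeps the top-level value an object
sent_fixed = stocks_json.encode("utf-8")
print(type(json.loads(sent_fixed)).__name__)  # dict
```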

Best answer

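The value of Data must be bytes, not a JSON-encoded string (str). The put_record() documentation describes the blob as base64-encoded, but boto3 performs that encoding itself when it serializes the request, so the bytes should not be base64-encoded by hand (doing so makes Firehose receive base64 text instead of JSON). The error itself comes from json.dumps: stocks_json is already a JSON string produced by to_json(), and json.dumps(stocks_json) wraps it again in a quoted JSON string literal. That string is the "primitive at the top level" the error message refers to, which is also why the decoded rawData begins and ends with a double quote.

Example:

import boto3

fh = boto3.client('firehose')

# stocks_json is already JSON text: encode it to bytes instead of calling json.dumps on it.
# boto3 base64-encodes the blob itself when it sends the request.
fh.put_record(
    DeliveryStreamName='PUT-S3-bndnetworks',
    Record={'Data': stocks_json.encode('utf-8')}
)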
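The function in the question sends a single record, and .strip('[]') only yields one JSON object when the DataFrame has exactly one row; with two rows it produces {...},{...}, which is not a single object. A hedged sketch for sending one record per row, assuming the text comes from stocks_df.to_json(orient='records', lines=True) (one JSON object per line) and that client is a boto3 Firehose client. The helper names build_records and push_records, the trailing newline per record, and the default batch size are assumptions for illustration; put_record_batch accepts at most 500 records per call.

```python
from typing import Dict, List


def build_records(json_lines: str) -> List[Dict[str, bytes]]:
    """Hypothetical helper: turn JSON Lines text into Firehose records.

    Each record carries raw UTF-8 bytes; boto3 does the base64 encoding itself.
    The trailing newline is an assumed convention that keeps records separated
    if they are ever written out as plain JSON.
    """
    return [
        {"Data": (line + "\n").encode("utf-8")}
        for line in json_lines.splitlines()
        if line.strip()  # skip blank lines, e.g. a trailing newline
    ]


def push_records(client, stream_name: str, records: List[Dict[str, bytes]],
                 batch_size: int = 500) -> int:
    """Hypothetical helper: send records in batches with put_record_batch.

    Returns the total FailedPutCount reported by Firehose; failed records are
    not retried in this sketch.
    """
    failed = 0
    for start in range(0, len(records), batch_size):
        response = client.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=records[start:start + batch_size],
        )
        failed += response["FailedPutCount"]
    return failed
```

With a real client this would be called roughly as push_records(boto3.client('firehose'), 'PUT-S3-bndnetworks', build_records(stocks_df.to_json(orient='records', lines=True))). A non-zero return value means some records were rejected and would need to be resent.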

Reference: put_record() (boto3) (admittedly, the documentation is a bit confusing)
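The base64 wording in the put_record() documentation describes how the blob is serialized inside the request, not a step the caller has to perform. The sketch below is a pure-Python simulation of that round trip (firehose_receives is a hypothetical function, not boto3 code) showing what Firehose ends up parsing for raw bytes versus bytes that were already base64-encoded by hand:

```python
import base64
import json

payload = b'{"Date":"2022-07-11","Price":65.09,"Symbol":"ACM"}'


def firehose_receives(data: bytes) -> bytes:
    """Simulate the blob round trip: the client base64-encodes the blob for the
    request body and the service decodes it again on arrival."""
    on_the_wire = base64.b64encode(data)
    return base64.b64decode(on_the_wire)


# Raw bytes: Firehose gets the original JSON object back unchanged
print(json.loads(firehose_receives(payload)))

# Pre-encoded bytes: Firehose gets base64 text, which is not valid JSON
pre_encoded = base64.b64encode(payload)
print(firehose_receives(pre_encoded)[:12])
```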

Source: the Stack Overflow question "python - Format conversion failed when pushing a JSON file to Firehose": https://stackoverflow.com/questions/72947404/
