我有以下代码:
这部分在内存中生成 CSV:
def to_csv(events: list) -> io.BytesIO():
if not events:
return None
bio = io.BytesIO()
iow = io.TextIOWrapper(bio)
writer = csv.DictWriter(iow, fieldnames=events[0].keys())
writer.writeheader()
writer.writerows(events)
iow.flush()
bio.seek(0)
return bio
此部分将此文件发送到 FTP 服务器:
def send_data(self, bytes: io.BytesIO) -> str:
filename = f"{time.time()}.csv"
if not bytes:
self.__logger.warning("No data to send")
return None
try:
self.__ftp.storbinary(f"STOR {filename}", bytes)
except ftp.all_errors as e:
self.__logger.error(
"FTP fail data send",
extra={
"host": self.__cfg.ftp.host,
"type": type(e).__name__,
"line": e.__traceback__.tb_lineno,
"file": __file__,
"detail": e,
},
)
return None
传递给 send_data 的文件已关闭。我该如何重新打开它? 我试图做这样的事情:
f = open(bytes, "rb")
print(f.getvalue())
但是返回错误:
expected str, bytes or os.PathLike object, not _io.BytesIO
回溯:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/accounting/main.py", line 29, in process_events
result = Uploader(logger, cfg).send_data(csv_file)
File "/opt/airflow/dags/accounting/packages/uploader.py", line 57, in send_data
self.__ftp.storbinary(f"STOR {filename}", bytes)
File "/usr/local/lib/python3.7/ftplib.py", line 513, in storbinary
buf = fp.read(blocksize)
ValueError: I/O operation on closed file.
最佳答案
一旦 to_csv
使用 io.TextIOWrapper
完成,它需要从底层 io.BytesIO
对象分离包装器,以停止试图关闭 BytesIO 对象的包装器:
iow.detach()
关于python - 使用函数打开文件(类型为 BytesIO),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73249469/