我在 app_consumer.py
和 tasks_consumer.py
文件中编写了以下 Python 代码,以将数据流式传输到 consumer.html
Jinja 模板.
app_consumer.py
from flask import render_template, Response, request
from flask_socketio import join_room
from init_consumer import app, socketio
import tasks_consumer
import uuid
def render_template_stream(template_name, **context):
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
rv.enable_buffering(5)
return rv
@app.before_request
def initialize_params():
if not hasattr(app.config,'uid'):
sid = str(uuid.uuid4())
app.config['uid'] = sid
print("initialize_params - Session ID stored =", sid)
@app.route("/", methods=['GET'])
def index():
return render_template('consumer.html', stockInfo = {})
@app.route('/consumetasks', methods=['GET','POST'])
def getStockStatus():
if request.method == 'POST':
print("Retrieving stock status")
return Response(render_template_stream('consumer.html', stockInfo = tasks_consumer.sendStockStatus()))
elif request.method == 'GET':
return '''
<!doctype html>
<html>
<head>
<title>Stock Sheet</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
</head>
<body class="container">
<h1>Stock Sheet</h1>
<div>
<button id="consumeTasks">Check stock status</button>
</div>
</body>
</html>
'''
# Run using port 5001
if __name__ == "__main__":
socketio.run(app,host='localhost', port=5001,debug=True)
tasks_consumer.py
import csv
from flask import request, stream_with_context
from init_consumer import app, socketio
import json
# Receive the webhook requests and emit a SocketIO event back to the client
def send_message(data):
status_code = 0
if request.method == 'POST':
roomid = app.config['uid']
msg = json.dumps(data)
event = "Send_stock_status"
socketio.emit(event, msg, namespace = '/collectHooks', room = roomid)
status_code = 200
else:
status_code = 405 # Method not allowed
return status_code
# Retrieve the stock status of the products sent through the webhook requests and return them back to the client.
@app.route('/consumetasks', methods=['POST'])
def sendStockStatus():
stockList = [] # List of products in stock
with open("NZ_NVJ_Apparel_SKUs_sheet.csv", newline='') as csvFile:
stockReader = csv.reader(csvFile, delimiter=',', quotechar='"')
for row in stockReader:
stockList.append(row[0])
stockSheet = {} # Dictionary of products sent in the request and their stock status
def generateStockStatus():
request_data = request.get_json()
if request_data:
if 'SKU' in request_data:
stockRequest = request_data['SKU'] # List of products sent in the request
for stock in stockRequest:
if stock in stockList:
stockStatus = "In Stock"
stockSheet.update({str(stock):stockStatus})
send_message(stockSheet)
yield stock, stockStatus
else:
stockStatus = "Out of Stock"
stockSheet.update({str(stock):stockStatus})
send_message(stockSheet)
yield stock, stockStatus
return stream_with_context(generateStockStatus())
当我运行 app_consumer.py
文件时,我得到以下输出:
127.0.0.1 - - [02/Jul/2023 00:28:02] "GET / HTTP/1.1" 200 -
initialize_params - Session ID stored = 69b0e5e8-d5ea-4279-88d1-9653007662d5
emitting event "Send_stock_status" to 69b0e5e8-d5ea-4279-88d1-9653007662d5 [/collectHooks]
127.0.0.1 - - [02/Jul/2023 00:28:05] "POST /consumetasks HTTP/1.1" 200 -
后跟错误:
Error on request:
Traceback (most recent call last):
File "C:\Users\yuanl\AppData\Local\Programs\Python\Python311\Lib\site-packages\werkzeug\serving.py", line 364, in run_wsgi
execute(self.server.app)
File "C:\Users\yuanl\AppData\Local\Programs\Python\Python311\Lib\site-packages\werkzeug\serving.py", line 328, in execute
write(data)
File "C:\Users\yuanl\AppData\Local\Programs\Python\Python311\Lib\site-packages\werkzeug\serving.py", line 296, in write
assert isinstance(data, bytes), "applications must write bytes"
AssertionError: applications must write bytes
请注意,stock
和 stockStatus
变量都是 string
类型。示例 stockSheet
如下所示:
{'PFMTSHIRT_CN_WM_CLS_L_OLIVEDRAB': '缺货'}
我最初认为该错误是由 tasks_consumer.py
中的 generateStockStatus()
函数中的 yield
语句生成的。因此我尝试将yield语句更改为yield str(stock), str(stockStatus)
, yield stock.encode('utf-8'), stockStatus.encode('utf-8 ')
和 yield bytes(stock, 'utf-8'), bytes(stockStatus, 'utf-8')
,但是错误仍然存在。
然后我认为错误是由 app_consumer.py< 中
文件,因此我在返回的字符串上添加了 elif request.method == 'GET':
语句之后的 return
语句生成的.encode()
方法,但是我收到了一个新错误:
类型错误:字节类型的对象不可 JSON 序列化
。
有人能指出我修复错误的正确方向吗?
最佳答案
关于python - 使用 Python Flask 流式传输数据时为 "AssertionError: applications must write bytes",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76595013/