可以在一个 flask 应用中注册多个 blueprint,每个 blueprint 可以对应一个 sse 接口。例如:
from flask import flask, response, blueprint
import time
app = flask(__name__)
bp1 = blueprint('sse1', __name__)
bp2 = blueprint('sse2', __name__)
@bp1.route('/sse1')
def sse1():
def generate():
while true:
yield 'data: {}\n\n'.format(time.time())
time.sleep(1)
return response(generate(), mimetype='text/event-stream')
@bp2.route('/sse2')
def sse2():
def generate():
while true:
yield 'data: {}\n\n'.format(time.strftime('%y-%m-%d %h:%m:%s'))
time.sleep(1)
return response(generate(), mimetype='text/event-stream')
app.register_blueprint(bp1)
app.register_blueprint(bp2)
if __name__ == '__main__':
app.run(debug=true)在这个例子中,我们创建了两个 blueprint,分别对应 `/sse1` 和 `/sse2` 接口。每个 blueprint 中的函数都返回一个 sse 数据流,使用 `response` 类型的响应对象封装。最后在 flask 应用中注册这两个 blueprint,即可同时开启两个 sse 接口。
from flask import flask, response, request
app = flask(__name__)
# 存储所有的订阅者
subscribers = {}
# 订阅接口,每个浏览器对应一个topic
@app.route('/subscribe/<topic>')
def subscribe(topic):
def stream():
# 将当前请求的客户端添加到订阅者列表中
subscribers.setdefault(topic, []).append(stream)
# 无限循环,等待新的消息
while true:
# 等待新的消息
message = request.args.get('message')
if message:
# 向所有订阅者推送新的消息
for subscriber in subscribers.get(topic, []):
subscriber.put(message)
# 休眠一段时间,减少服务器压力
time.sleep(1)
# 设置响应头,告诉客户端这是一个sse流
response = response(stream(), mimetype='text/event-stream')
response.headers['cache-control'] = 'no-cache'
response.headers['connection'] = 'keep-alive'
return response
# 推送接口,向指定topic的所有订阅者推送消息
@app.route('/publish/<topic>')
def publish(topic):
# 获取要推送的消息
message = request.args.get('message')
if message:
# 向所有订阅者推送新的消息
for subscriber in subscribers.get(topic, []):
subscriber.put(message)
return 'ok'
在上面的代码中,我们定义了两个接口,`/subscribe/<topic>`用于订阅指定的topic,`/publish/<topic>`用于向指定topic的所有订阅者推送消息。
在订阅接口中,我们将当前请求的客户端添加到订阅者列表中,并通过一个无限循环等待新的消息。当有新的消息到达时,我们会向所有订阅者推送这条消息。
在推送接口中,我们获取要推送的消息,并向指定topic的所有订阅者推送这条消息。
使用这个代码,你可以轻松地实现一个简单的sse推送服务,每个浏览器对应一个topic。
import uuid
from flask import flask, jsonify, request, response, g
from flask_cors import cors
from flask_sse import sse
import time
import json
app = flask(__name__)
cros = cors(app)
app.config['redis_url'] = 'redis://localhost'
app.register_blueprint(sse, url_prefix='/stream')
# sse 推送函数
# sse 推送路由
@app.route('/register', methods=["get"])
def register():
# 获取客户端标识符
client_id = str(uuid.uuid4())
# 返回 sse 响应
return jsonify({"client_id": client_id})
# sse 推送路由
@app.route('/sse', methods=['post'])
def stream():
# 获取客户端标识符
data = request.get_json()
client_id = data['clientid']
print("client_id", client_id)
def aa():
# 循环发送 sse 数据
for i in range(10):
data = 'hello, %s!' % client_id+str(i)
sse.publish(data, channel=client_id, type='message')
time.sleep(1)
sse.publish("end", channel=client_id, type='message')
# 返回 sse 响应
response = response(aa(), mimetype='text/event-stream')
response.headers.add('cache-control', 'no-cache')
response.headers.add('connection', 'keep-alive')
response.headers.add('x-accel-buffering', 'no')
return response
if __name__ == '__main__':
app.run(debug=true, port=5000)
在上面的代码中,我们使用了 flask-sse 扩展来管理 sse 通道。我们在 `/register` 路由中为每个客户端创建了唯一的标识符,并将其存储在请求环境中。在 `/sse` 路由中,我们使用 `sse.publish` 方法发送 sse 数据。
到此这篇关于flask框架里面sse的使用示例的文章就介绍到这了,更多相关flask使用sse内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论