当前位置: 代码网 > it编程>前端脚本>Python > Flask框架里面sse的使用示例

Flask框架里面sse的使用示例

2025年05月20日 Python 我要评论
可以在一个 flask 应用中注册多个 blueprint,每个 blueprint 可以对应一个 sse 接口。例如:from flask import flask, response, bluep

可以在一个 flask 应用中注册多个 blueprint,每个 blueprint 可以对应一个 sse 接口。例如:

from flask import flask, response, blueprint
import time

app = flask(__name__)

bp1 = blueprint('sse1', __name__)
bp2 = blueprint('sse2', __name__)

@bp1.route('/sse1')
def sse1():
    def generate():
        while true:
            yield 'data: {}\n\n'.format(time.time())
            time.sleep(1)
    return response(generate(), mimetype='text/event-stream')

@bp2.route('/sse2')
def sse2():
    def generate():
        while true:
            yield 'data: {}\n\n'.format(time.strftime('%y-%m-%d %h:%m:%s'))
            time.sleep(1)
    return response(generate(), mimetype='text/event-stream')

app.register_blueprint(bp1)
app.register_blueprint(bp2)

if __name__ == '__main__':
    app.run(debug=true)

在这个例子中,我们创建了两个 blueprint,分别对应 `/sse1` 和 `/sse2` 接口。每个 blueprint 中的函数都返回一个 sse 数据流,使用 `response` 类型的响应对象封装。最后在 flask 应用中注册这两个 blueprint,即可同时开启两个 sse 接口。

from flask import flask, response, request

app = flask(__name__)

# 存储所有的订阅者
subscribers = {}

# 订阅接口,每个浏览器对应一个topic
@app.route('/subscribe/<topic>')
def subscribe(topic):
    def stream():
        # 将当前请求的客户端添加到订阅者列表中
        subscribers.setdefault(topic, []).append(stream)
        # 无限循环,等待新的消息
        while true:
            # 等待新的消息
            message = request.args.get('message')
            if message:
                # 向所有订阅者推送新的消息
                for subscriber in subscribers.get(topic, []):
                    subscriber.put(message)
            # 休眠一段时间,减少服务器压力
            time.sleep(1)

    # 设置响应头,告诉客户端这是一个sse流
    response = response(stream(), mimetype='text/event-stream')
    response.headers['cache-control'] = 'no-cache'
    response.headers['connection'] = 'keep-alive'
    return response

# 推送接口,向指定topic的所有订阅者推送消息
@app.route('/publish/<topic>')
def publish(topic):
    # 获取要推送的消息
    message = request.args.get('message')
    if message:
        # 向所有订阅者推送新的消息
        for subscriber in subscribers.get(topic, []):
            subscriber.put(message)
    return 'ok'

在上面的代码中,我们定义了两个接口,`/subscribe/<topic>`用于订阅指定的topic,`/publish/<topic>`用于向指定topic的所有订阅者推送消息。
在订阅接口中,我们将当前请求的客户端添加到订阅者列表中,并通过一个无限循环等待新的消息。当有新的消息到达时,我们会向所有订阅者推送这条消息。
在推送接口中,我们获取要推送的消息,并向指定topic的所有订阅者推送这条消息。
使用这个代码,你可以轻松地实现一个简单的sse推送服务,每个浏览器对应一个topic。

import uuid
from flask import flask, jsonify, request, response, g
from flask_cors import cors
from flask_sse import sse
import time
import json

app = flask(__name__)
cros = cors(app)
app.config['redis_url'] = 'redis://localhost'
app.register_blueprint(sse, url_prefix='/stream')
# sse 推送函数


# sse 推送路由


@app.route('/register', methods=["get"])
def register():
    # 获取客户端标识符
    client_id = str(uuid.uuid4())

    # 返回 sse 响应
    return jsonify({"client_id": client_id})

# sse 推送路由


@app.route('/sse', methods=['post'])
def stream():
    # 获取客户端标识符
    data = request.get_json()
    client_id = data['clientid']
    print("client_id", client_id)

    def aa():
        # 循环发送 sse 数据
        for i in range(10):
            data = 'hello, %s!' % client_id+str(i)
            sse.publish(data, channel=client_id, type='message')
            time.sleep(1)
        sse.publish("end", channel=client_id, type='message')
     # 返回 sse 响应
    response = response(aa(), mimetype='text/event-stream')
    response.headers.add('cache-control', 'no-cache')
    response.headers.add('connection', 'keep-alive')
    response.headers.add('x-accel-buffering', 'no')
    return response


if __name__ == '__main__':
    app.run(debug=true, port=5000)

在上面的代码中,我们使用了 flask-sse 扩展来管理 sse 通道。我们在 `/register` 路由中为每个客户端创建了唯一的标识符,并将其存储在请求环境中。在 `/sse` 路由中,我们使用 `sse.publish` 方法发送 sse 数据。

到此这篇关于flask框架里面sse的使用示例的文章就介绍到这了,更多相关flask使用sse内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com