当前位置: 代码网 > it编程>前端脚本>Python > Python使用WebSocket和SSE实现HTTP服务器消息推送方式

Python使用WebSocket和SSE实现HTTP服务器消息推送方式

2024年11月25日 Python 我要评论
很多时候我们需要实时获取最新数据,但是传统意义上的http请求,必须由客户端向服务端发起请求,服务端再返回相应的数据。那如果我们需要获取实时数据,就要通过http轮询,客户端不间断的向服务器发起请求。

很多时候我们需要实时获取最新数据,但是传统意义上的http请求,必须由客户端向服务端发起请求,服务端再返回相应的数据。

那如果我们需要获取实时数据,就要通过http轮询,客户端不间断的向服务器发起请求。

这样不断的的请求不但严重加大服务器的压力,还可能因为网络延迟而影响数据的时效性。

下面介绍两种方法能够很好的满足业务的需求。

一、websocket

websocket是html5开始提供的一种在单个 tcp 链接上进行全双工通信的协议。

  • 优点:双工通信
  • 缺点:需专门定义数据协议,解析数据流,且部分服务器支持不完善,后台例如java spring boot 2.1.2 仅支持websocket 1.0(最高已达1.3)

1.客户端代码

python 3+ 代码

#python 3+
import threading
import websocket


class client:
    def __init__(self,url):
    	self.url = url
        self.ws = none
        self.enable = true


    def on_message(self,response):
		self.enable = false
        print(response)


    def on_error(self,error):
        # print(ws)
        print(error)


    def on_close(self):
    	self.enable = true
        print(ws)
       

    def on_open(self):
		print('open')
    
    def start_func(self):

        while self.enable:
            websocket.enabletrace(true)
            self.ws = websocket.websocketapp(self.url,
                                        on_message=self.on_message,
                                        # on_data=on_data,
                                        on_error=self.on_error,
                                        on_open=self.on_open,
                                        on_close=self.on_close, )


            self.ws.run_forever(ping_interval=60, ping_timeout=5)

if __name__ == "__main__":
	cli = client(url = 'wss://api.zb.live/websocket' )
    t1 = threading.thread(target=cli.start_func_zb)
    t1.start()

javascript 代码

var ws = new websocket("wss://echo.websocket.org");

ws.onopen = function(evt) { 
  console.log("connection open ..."); 
  ws.send("hello websockets!");
};

ws.onmessage = function(evt) {
  console.log( "received message: " + evt.data);
  ws.close();
};

ws.onclose = function(evt) {
  console.log("connection closed.");
};      

2.服务端代码

from websocket_server import websocketserver

class wssocketobj:
    def __init__(self,host=none,port = 8131):
       self.host = host if host else '127.0.0.1'
       self.port = port

    # 当新的客户端连接时会提示
    def new_client(self,client, server,):
        print("当新的客户端连接时会提示:%s" % client['id'])
        dd = 122
        server.send_message_to_all("hey all, a new client has joined us")

    # 当旧的客户端离开
    def client_left(self,client, server):
        print("客户端%s断开" % client['id'])

    # 接收客户端的信息。
    def message_received(self,client, server, message):
        print("client(%d) said: %s" % (client['id'], message))
        # server.send_message_to_all(message) #发送消息给 全部客户端
        server.send_message(client, 'hello,client')  # 发送消息给指定客户端

    def run_server(self):
        server = websocketserver(self.port, self.host)
        server.set_fn_new_client(self.new_client)
        server.set_fn_client_left(self.client_left)
        server.set_fn_message_received(self.message_received)
        server.run_forever()

if __name__ == '__main__':

    wssocketobj().run_server()

二、sse(server-sent events,服务器发送事件)

sse ( server-sent events )通俗解释起来就是一种基于http的,以流的形式由服务端持续向客户端发送数据的技术,是 websocket 的一种轻量代替方案。

  • 优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(eventsource)
  • 缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接

1.客户端代码

python

# 第一种方式
def sse_client():
    """
    pip install sseclient-py
    只对于返回标准sse格式的请求可用 格式:event: {event}\nretry: 10000\ndata: {data}\n\n
    :return:
    """
    import requests
    # res = requests.request('get', url, json=data, stream=true, headers={'accept': 'text/event-stream'})
    client = requests.post(url, json=data, stream=true, headers={'accept': 'text/event-stream'})
    client = sseclient.sseclient(client)
    for i in client.events():
        print(i.data)

# 第二种方式
def sse_with_requests():
    headers = {"accept": "text/event-stream"}
    r = requests.post(url, headers=headers, json=data, stream=true)
    r.encoding = 'utf-8'
    for chunk in r.iter_content(chunk_size=none, decode_unicode=true):
        # 处理接收到的数据块
        print("received:", chunk)

javascript

第一种方式:

//判断是否支持sse
if('eventsource' in window){

//初始化sse
var url="http:localhost:8000/stream";
var source=new eventsource(url);

// 连接成功后会触发 open 事件
source.onopen=(event)=>{

console.log("开启sse");

}

// 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
source.onmessage=(event)=>{

var data=event.data;

$("body").append($("<p>").text(data));

}

//监听like事件
source.addeventlistener('like',function(event){

var data=event.data;

$("body").append($("<p>").text(data));
},false);

// 连接异常时会触发 error 事件并自动重连
source.onerror=(event)=>{

console.log(event);

}

第二种方式:使用 addeventlistener 方法来添加相应的事件处理方法

if (window.eventsource) {
  // 创建 eventsource 对象连接服务器
  const source = new eventsource('http://localhost:2000');

  // 连接成功后会触发 open 事件
  source.addeventlistener('open', () => {
    console.log('connected');
  }, false);

  // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
  source.addeventlistener('message', e => {
    console.log(`data: ${e.data}`);
  }, false);

  // 自定义 eventhandler,在收到 event 字段为 slide 的消息时触发
  source.addeventlistener('slide', e => {
    console.log(`data: ${e.data}`); // => data: 7
  }, false);

  // 连接异常时会触发 error 事件并自动重连
  source.addeventlistener('error', e => {
    if (e.target.readystate === eventsource.closed) {
      console.log('disconnected');
    } else if (e.target.readystate === eventsource.connecting) {
      console.log('connecting...');
    }
  }, false);
} else {
  console.error('your browser doesn\'t support sse');
}

eventsource从父接口 eventtarget 中继承了属性和方法,其内置了 3 个 eventhandler 属性、2 个只读属性和 1 个方法:

eventhandler 属性

  • eventsource.onopen 在连接打开时被调用。
  • eventsource.onmessage 在收到一个没有 event 属性的消息时被调用。
  • eventsource.onerror 在连接异常时被调用。 只读属性
  • eventsource.readystate 一个 unsigned short 值,代表连接状态。可能值是 connecting (0), open (1), 或者 closed (2)。
  • eventsource.url连接的 url。 方法
  • eventsource.close() 关闭连接

eventsource 对象的 onmessage 属性的作用类似于 addeventlistener( ‘ message ’ )

2.服务端代码(基于flask)

import json
import time

from flask import flask, request
from flask import response
from flask import render_template

app = flask(__name__)


def get_message():
    """this could be any function that blocks until data is ready"""
    time.sleep(1)
    s = time.ctime(time.time())
    return json.dumps(['当前时间:' + s , 'a'], ensure_ascii=false)


@app.route('/')
def hello_world():
    return render_template('index.html')


@app.route('/stream')
def stream():
    user_id = request.args.get('user_id')
    print(user_id)
    def eventstream():
        id = 0
        while true:
            id +=1
            # wait for source data to be available, then push it

            yield 'id: {}\nevent: add\ndata: {}\n\n'.format(id,get_message())


    return response(eventstream(), mimetype="text/event-stream")


if __name__ == '__main__':
    app.run()

因为sse是http请求,可是又限定是一个长链接,因此要设置mime类型为text/event-stream。返回的为字符串。 

消息的格式

服务器向浏览器发送的 sse 数据,必须是 utf-8 编码的文本;

每一次发送的信息,由若干个message组成,每一个message之间用\n\n分隔。每一个message内部由若干行组成

  • 格式
[field]:value\n

其中在规范中为消息定义了 4 个字段

  • id 表明id
  • event 表明消息的事件类型
  • data 消息的数据字段
  • retry 客户端重连的时间。只接受整数,单位是毫秒。如果这个值不是整数则会被自动忽略

需要注意的是,id字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 last-event-id这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。

一个很有意思的地方是,规范中规定以冒号开头的消息都会被当作注释,一条普通的注释(:\n\n)对于服务器来说只占 5个字符,但是发送到客户端上的时候不会触发任何事件,这对客户端来说是非常友好的。所以注释一般被用于维持服务器和客户端的长连接。

3.sse使用注意事项

1、sse 如何保证数据完整性

客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 last-event-id 储存起来。

sse 默认支持断线重连机制,在连接断开时会 触发 eventsource 的 error 事件,同时自动重连。再次连接成功时eventsource 会把 last-event-id 属性作为请求头发送给服务器,这样服务器就可以根据这个 last-event-id作出相应的处理。

这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 last-event-id这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。

2、减少开销

在 sse 的草案中提到,“text/event-stream” 的 mime 类型传输应当在静置 15秒后自动断开。在实际的项目中也会有这个机制,但是断开的时间没有被列入标准中。

为了减少服务器的开销,我们也可以有目的的断开和重连。

简单的办法是服务器发送一个 关闭消息并指定一个重连的时间戳,客户端在触发关闭事件时关闭当前连接并创建 一个计时器,在重连时把计时器销毁。

function connectsse() {
  if (window.eventsource) {
    const source = new eventsource('http://localhost:2000');
    let reconnecttimeout;

    source.addeventlistener('open', () => {
      console.log('connected');
      cleartimeout(reconnecttimeout);
    }, false);

    source.addeventlistener('pause', e => {
      source.close();
      const reconnecttime = +e.data;
      const currenttime = +new date();
      reconnecttimeout = settimeout(() => {
        connectsse();
      }, reconnecttime - currenttime);
    }, false);
  } else {
    console.error('your browser doesn\'t support sse');
  }
}

connectsse();

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com