当前位置: 代码网 > it编程>前端脚本>Python > Python基于WebSocket实现直播弹幕数据采集

Python基于WebSocket实现直播弹幕数据采集

2026年04月30日 Python 我要评论
前言在进行直播数据分析、舆情研究或用户互动行为研究时,弹幕数据是一类非常重要的实时文本数据来源。相比评论数据,弹幕具有两个明显特点:一是实时性强,几乎与直播内容同步出现;二是互动密度高,能够反映观众情

前言

在进行直播数据分析、舆情研究或用户互动行为研究时,弹幕数据是一类非常重要的实时文本数据来源。相比评论数据,弹幕具有两个明显特点:

一是实时性强,几乎与直播内容同步出现;

二是互动密度高,能够反映观众情绪变化与热点话题。因此,如果能够稳定地采集直播弹幕数据,就可以进一步开展诸如情感分析、关键词统计、热点时刻识别等研究。

在实际开发过程中,我尝试设计了一套 基于 websocket 的实时弹幕采集程序。整体思路并不复杂:通过 websocket 服务接收弹幕消息,对消息进行解析与格式化处理,然后通过队列进行缓存,并最终保存到日志文件和 json 文件中。这样不仅可以实现实时监控,还能够方便后续进行数据分析与建模。

本文结合具体代码,分享这一套弹幕采集程序的实现思路与关键技术。

一、环境准备与依赖安装

在开始之前,需要安装 websocket 相关依赖库。

pip install websockets

代码中还使用了以下 python 内置库:

  • asyncio(异步编程)
  • json(数据解析)
  • logging(日志系统)
  • os(文件操作)
  • datetime(时间处理)

导入模块如下:

import asyncio
import json
import websockets
from collections import deque
import logging
import os
from datetime import datetime

这里特别需要注意的是 asyncio 与 websockets 的组合,它能够实现 异步消息处理,非常适合实时弹幕这种高频数据流场景。

二、日志系统与数据缓存设计

在实际采集弹幕时,如果只是在终端输出数据,很容易丢失重要信息。因此我在代码中设计了 日志系统 + 数据缓存队列

首先配置日志系统:

logging.basicconfig(
    level=logging.info,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%h:%m:%s'
)

这样可以让每一条弹幕都带有时间信息,方便后续查看与调试。

接下来创建一个 弹幕队列

danmu_queue = deque(maxlen=100)

这里使用 deque 的原因是:插入速度快、支持固定长度缓存、适合实时数据流。

三、弹幕数据存储结构设计

为了便于后续分析,我将弹幕数据保存为两种格式:

1️⃣ 文本日志
2️⃣ json数据

配置代码如下:

data_dir = 'danmu_data'
log_file = os.path.join(data_dir, 'danmu.log')
json_file = os.path.join(data_dir, 'danmu.json')

os.makedirs(data_dir, exist_ok=true)

这样程序运行时会自动创建一个 danmu_data 文件夹,用来存储采集的数据.

四、弹幕消息解析函数

核心逻辑是 process_message() 函数,它负责解析 websocket 接收到的消息。

async def process_message(message_data):

首先解析 json 数据:

data = json.loads(message_data)

接着判断消息类型:

if data.get('type') != 'danmu':
    return none

这里的设计非常重要,因为 websocket 可能会发送多种类型的消息,而我们只需要 弹幕类型的数据

然后提取关键字段:

content = message.get('content', '')
sender = message.get('nickname', 'unknown')
time = message.get('time', '')
user_token = message.get('usertoken', '')
live_id = message.get('liveid', '')

最终构造一条格式化弹幕:

formatted_message = f"[{time}] {sender} [{user_token}]\n{content}"

例如:

[20:35:12] 用户12345 [token]
这主播太搞笑了

随后将弹幕保存到字典中:

danmu = {
    'user': sender,
    'content': content,
    'time': time,
    'live_id': live_id,
    'user_token': user_token,
    'formatted': formatted_message
}

并放入队列:

danmu_queue.append(danmu)

最后调用函数保存到文件。

五、websocket服务器实现

为了接收弹幕数据,需要搭建一个 websocket 服务。

核心函数:

async def websocket_handler(websocket, path):

当有客户端连接时:

client_id = id(websocket)
logging.info(f"新的客户端连接 (id: {client_id})")

然后持续接收消息:

async for message in websocket:
    success = await process_message(message)

如果消息处理失败:

logging.warning(f"消息处理失败: {message}")

当客户端断开连接时:

except websockets.exceptions.connectionclosed:
    logging.info(f"客户端断开连接")

这一部分逻辑实现了 实时弹幕监听机制

六、服务器启动逻辑

服务器入口函数:

async def main():

启动 websocket 服务:

server = await websockets.serve(
    websocket_handler,
    "127.0.0.1",
    8765,
    ping_interval=none
)

参数解释:

关闭 ping 的原因是某些客户端在频繁 ping 时可能会出现连接异常。

七、弹幕数据保存机制

为了方便数据分析,代码设计了一个保存函数:

def save_danmu_to_file(danmu):

首先写入文本日志:

with open(log_file, 'a', encoding='utf-8') as f:
    f.write(danmu['formatted'] + '\n\n')

再写入 json 数据:

json.dump(danmu_with_timestamp, f, ensure_ascii=false)

并增加保存时间:

danmu_with_timestamp['save_time'] = datetime.now().isoformat()

最终每条弹幕都会以 json 形式存储。

例如:

{
 "user":"张三",
 "content":"主播太厉害了",
 "time":"20:35:12",
 "live_id":"123456",
 "user_token":"abcde",
 "save_time":"2026-03-09t20:35:12"
}

这样既可以用于日志查看,也可以直接用于数据分析。

八、程序启动与关闭

主程序入口:

if __name__ == "__main__":

程序启动时会记录日志:

==================================================
程序启动于: 2026-03-09 20:30:01
==================================================

程序结束时也会记录退出时间。

这一步设计的好处是:可以知道采集时长,也可以追踪异常退出。

九、弹幕采集程序的设计心得

在设计这个弹幕采集程序时,我有几个比较深的体会:

第一,websocket非常适合实时数据采集。相比传统http轮询方式,websocket能够持续保持连接,从而实时接收弹幕消息,大幅降低延迟。

第二,异步编程是处理高频消息流的关键。使用 asyncio 可以避免阻塞,提高程序整体效率。

第三,数据存储需要兼顾实时与分析需求。因此同时保存为日志文件和 json 文件,这样既能实时查看,也方便后续做数据分析。

第四,缓存队列设计可以提高系统稳定性。通过 deque 保存最近弹幕,可以避免高频消息导致系统压力过大。

十、总结

通过这套程序,我们实现了一套完整的 直播弹幕实时采集系统,具备以下特点:

(1)websocket实时采集

(2)asyncio异步处理

(3)弹幕队列缓存

(4)自动日志记录

(5)json结构化数据保存

(6)程序运行状态记录

这套程序不仅可以用于直播弹幕采集,还可以扩展到实时舆情监测、弹幕情感分析、用户互动研究、直播热点识别。

后续如果需要,还可以进一步加入数据库存储(mysql / mongodb)、弹幕情感分析、实时数据可视化、弹幕关键词统计。

从整体实现来看,这套弹幕采集程序虽然代码量不算很大,但在设计上我更关注的是稳定性和可扩展性。通过 websocket 实现实时数据接收,再结合 asyncio 异步处理,可以比较轻松地应对高频弹幕消息;同时利用 deque 做缓存队列、日志系统做运行记录、json 做结构化数据存储,使得整个数据流从“接收 → 解析 → 缓存 → 落盘”形成一个完整闭环。这样的设计不仅方便后续做弹幕文本分析、情感分析或关键词统计,也为进一步接入数据库、实时可视化或数据流处理打下了基础。

以上就是python基于websocket实现直播弹幕数据采集的详细内容,更多关于python websocket弹幕数据采集的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com