连接池实现
socket_pool.py
# -*- coding:utf-8 -*- import socket import time import threading import os import logging import traceback from queue import queue, empty _logger = logging.getlogger('mylogger') class socketpool: def __init__(self, host, port, min_connections=10, max_connections=10): ''' 初始化socket连接池 :param host: 目标主机地址 :param port: 目标端口号 :param min_connections: 最小连接数 :param max_connections: 最大连接数 ''' self.host = host self.port = port self.min_connections = min_connections self.max_connections = max_connections self.busy_sockets_dict = {} # 存放从连接池取出的socket的id self._sock_lock = threading.lock() # 线程锁保证计数正确 self._pool = queue(max_connections) # 基于线程安全的队列存储连接 self._lock = threading.lock() # 线程锁保证资源安全: self._init_pool() # 预创建连接 self._start_health_check() # 启动连接健康检查线程 def _init_pool(self): '''预创建连接并填充到池中''' for _ in range(self.min_connections): sock = self._create_socket() self._pool.put(sock) def _create_socket(self): '''创建新的socket连接''' sock = socket.socket(socket.af_inet, socket.sock_stream) try: sock.connect((self.host, self.port)) return sock except socket.error as e: raise connectionerror(f'failed to connect: {e}') # 连接失败抛出异常 def _start_health_check(self): '''启动后台线程定期检查连接有效性''' def check(): while true: with self._lock: for _ in range(self._pool.qsize()): sock = self._pool.get() self.busy_sockets_dict[sock] = 1 try: sock.send(b'ping<end>') # 发送心跳包验证连接状态 # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题 sock.recv(11) self._pool.put(sock) self.busy_sockets_dict.pop(sock) except (socket.error, connectionreseterror): _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc()) sock.close() # 关闭失效连接并创建新连接替换 self.busy_sockets_dict.pop(sock) new_sock = self._create_socket() self._pool.put(new_sock) # 如果sock数量小于最小数量,则补充 for _ in range(0, self.min_connections - self._pool.qsize()): new_sock = self._create_socket() self._pool.put(new_sock) time.sleep(60) # 每60秒检查一次 threading.thread(target=check, daemon=true).start() def get_connection(self): ''' 从池中获取一个可用连接 :return: socket对象 ''' with self._sock_lock: if self._pool.empty(): if len(self.busy_sockets_dict.keys()) < self.max_connections: new_sock = self._create_socket() self.busy_sockets_dict[new_sock] = 1 return new_sock else: raise empty('no available connections in pool') else: try: sock = self._pool.get(block=false) self.busy_sockets_dict[sock] = 1 return sock except exception: _logger.error('获取socket连接出错:%s' % traceback.format_exc()) raise def release_connection(self, sock): ''' 将连接归还到池中 :param sock: 待归还的socket对象 ''' if not sock._closed: self._pool.put(sock) if sock in self.busy_sockets_dict: self.busy_sockets_dict.pop(sock) def close_all(self): '''关闭池中所有连接''' while not self._pool.empty(): sock = self._pool.get() sock.close() self.busy_sockets_dict.pop(sock.id) self.busy_sockets_dict = {} # 兜底 host = os.environ.get('modbus_tcp_server_host', '127.0.0.1') port = int(os.environ.get('modbus_tcp_server_port', '9000')) min_connections = int(os.environ.get('django_socket_pool_max_connections', '10')) max_connections = int(os.environ.get('django_socket_pool_max_connections', '100')) socketpool = socketpool(host, port, min_connections, max_connections)
使用连接池
from socket_pool import socketpool def send_socket_msg(data): global socketpool try: sock = none # 获取连接(支持超时控制) sock = socketpool.get_connection() # 发送数据 sock.sendall(data.encode('utf-8')) except exception: error_msg = '发送消息出错:%s' % traceback.format_exc() _logger.error(error_msg) if sock is not none: sock.close() socketpool.release_connection(sock) return send_socket_msg(data) response = '' try: while true: chunk = sock.recv(4096) chunk = chunk.decode('utf-8') response += chunk if response.endswith('<end>'): response = response.rstrip('<end>') return {'success':true, 'message':response} except exception: error_msg = '获取消息出错:%s' % traceback.format_exc() _logger.error(error_msg) return {'success':false, 'message': error_msg} finally: # 必须归还连接! socketpool.release_connection(sock)
到此这篇关于python 基于队列实现 tcp socket 连接池的文章就介绍到这了,更多相关python tcp socket 连接池内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论