一. 前言
在python中,回调函数是指在一个函数执行完成后,调用另一个函数的过程。通常情况下,回调函数作为参数传递给原始函数,原始函数在执行完自己的逻辑后,会自动调用回调函数并将结果作为参数传递给它。
二. 回调函数基本使用
以下是在python中设置回调函数的基本步骤:
- 定义回调函数,确定回调函数的参数列表和返回值(如果有)。
- 在原始函数中,将回调函数作为参数传递给需要回调的函数。
- 在原始函数内部的适当位置调用回调函数,将需要传递的参数传递给它。
例如,假设我们需要设置一个回调函数来处理异步操作的结果,可以按如下方式进行设置:
# 定义回调函数
def callback(result):
print('callback function is called with result: ', result)
# 异步函数,需要传入回调函数
def async_function(param1, param2, callback):
# 进行异步操作
result = param1 + param2
# 异步操作完成后调用回调函数
callback(result)
# 调用异步函数,并传入回调函数
async_function(1, 2, callback)
运行结果

在上面的代码中,我们先定义了一个回调函数callback,然后在异步函数async_function中将该函数作为参数传递,并在异步操作完成后调用回调函数,将操作结果传递给它。
通常情况下,我们会将回调函数定义为一个可调用对象,也就是实现了__call__方法的类对象。使用这种方式,可以更加灵活地定义回调函数,并且可以把一些状态或上下文信息存储在对象中,在回调函数中使用。
三. 进阶 - 使用functools.partial传递函数
1. functools.partial基本介绍
functools.partial 是 python 标准库中的一个函数,用来部分应用一个函数(partial application),也就是固定函数的一部分参数,返回一个新的函数。partial 函数的用法如下:
functools.partial(func, *args, **kwargs)
其中,func 是要部分应用的函数,*args 和 **kwargs 是要固定的参数。
具体来说,partial 函数会返回一个新的函数对象,这个新的函数对象跟原来的函数对象是相似的,但是将部分参数固定下来了,相当于原来的函数变成了一个带有默认参数的函数。我们可以用这个新的函数对象来调用原来的函数,而不必传入那些已经固定的参数。
下面是一个简单的示例代码,演示如何使用 partial 函数:
import functools
# 定义一个简单的加法函数
def add(a, b):
return a + b
# 固定 add 函数的第一个参数
add2 = functools.partial(add, 2)
# 调用 add2 函数
print(add2(3)) # 输出:5
在上面的示例代码中,我们定义了一个简单的加法函数 add,然后使用 partial 函数将 add 函数的第一个参数固定为 2,得到一个新的函数对象 add2。接下来,我们使用 add2 函数来计算 2+3 的结果,并将结果输出到控制台。
使用 partial 函数的好处是可以更方便地定义新的函数,避免代码重复。比如,如果我们想定义一个加 3、加 4、加 5 的函数,可以使用 partial 来实现,而不必写多个类似的函数。
add3 = functools.partial(add, 3) add4 = functools.partial(add, 4) add5 = functools.partial(add, 5) print(add3(2)) # 输出:5 print(add4(2)) # 输出:6 print(add5(2)) # 输出:7
在上面的示例代码中,使用 partial 函数分别定义了 3 个新的函数 add3、add4、add5,分别将加数固定为 3、4、5,然后使用这些新的函数计算 2+3、2+4、2+5 的结果,并将结果输出到控制台。
2. functools.partial进阶使用示例代码
接下来我们看一个socket连接成功之后调用回调函数的例子:
1. 启动一个socket server服务
import json
import os
import socket
import threading
import time
import sys
import traceback
host = '127.0.0.1' # 服务器ip地址
port = 8000 # 服务器端口号
backlog = 5 # 服务器监听队列大小,即最多同时接收多少个客户端连接
reconnect_interval = 5 # 重连间隔,单位:秒
def start_server():
print(os.getpid())
while true:
try:
# 创建一个 tcp/ip socket 对象
server_socket = socket.socket(socket.af_inet, socket.sock_stream)
# 绑定服务器 ip 地址和端口号
server_socket.bind((host, port))
# 开始监听客户端连接请求
server_socket.listen(backlog)
print('服务器启动,监听端口:%s' % port)
while true:
# 等待客户端连接
print('等待客户端连接...')
try:
client_socket, client_address = server_socket.accept()
threading.thread(target=send_msg, args=(client_socket, client_address)).start()
# send_msg(client_socket, client_address)
print(f"process {threading.current_thread()}: {threading.active_count()} threads")
print(f"total threads: {threading.active_count()}")
print('新客户端连接,地址:%s' % str(client_address))
# 读取客户端发送的数据
data = client_socket.recv(1024)
print('received data:', data.decode())
# 向客户端发送数据
message = 'welcome to my server!'
client_socket.sendall(message.encode())
except exception as e:
print('客户端连接异常,错误信息:%s' % e)
finally:
# 关闭客户端连接
client_socket.close()
print('客户端连接已关闭')
except exception as e:
print('服务器异常,错误信息:%s' % e)
traceback.print_exc()
# 关闭服务端 socket
server_socket.close()
print('{}s后尝试重连服务器...'.format(reconnect_interval))
time.sleep(reconnect_interval)
def send_msg(client, addr):
try:
while 1:
time.sleep(1)
jsontemplate = {
"command": "forward_elev_info",
"deviceid": "c0002t",
"elevid": 1,
}
msg2elev = json.dumps(jsontemplate).encode() + "\n".encode()
client.sendto(msg2elev, addr)
print('send msg to client:{}:{}'.format(addr, msg2elev))
except exception as e:
print('send_msg:{}'.format(e))
if __name__ == '__main__':
# 启动服务器
start_server()
2. 开启一个客户端连接,连接成功后调用回调函数
import functools
import json
import socket
import threading
import time
import traceback
class testclient(threading.thread):
def __init__(self, connecthost, connectport, callbackfunc):
threading.thread.__init__(self, name="testclient")
self.host = connecthost
self.port = connectport
self.callbackfunc = callbackfunc
self.sck = socket.socket(socket.af_inet, socket.sock_stream)
def run(self):
self.connect()
while true:
try:
# 从socket中读取数据
# data = self.sck.recv(1024)
# print(data)
data = self.recv_msg(self.sck)
if data is none:
time.sleep(1)
continue
self.callbackfunc(data)
except oserror:
# an operation was attempted on something that is not a socket
traceback.print_exc()
time.sleep(5)
# fixme: if socket is broken, reconnect with the same sck does not work, so create an new one.
self.sck = socket.socket(socket.af_inet, socket.sock_stream)
self.connect()
except exception as e:
# todo: if disconnected, connect it
traceback.print_exc()
time.sleep(5)
self.connect()
def recv_msg(self, sock):
try:
data = sock.recv(1024)
print('recv data:{}'.format(data))
return data
except exception as e:
print('recv_msg:{}'.format(e))
sock.close()
time.sleep(0.5)
def connect(self):
while true:
try:
self.sck.connect((self.host, self.port))
print("connected to service {}:{}".format(self.host, self.port))
break
except connectionrefusederror:
print("service refused or not started? reconnecting to service in 5s")
time.sleep(5)
except exception as e:
print("connect error type->{}".format(type(e)))
traceback.print_exc()
# fixme: if other exception, not sure to restart action will work.
time.sleep(5)
def callbackfunc(a, res):
print(a)
print('callback msg -- >', res)
if __name__ == '__main__':
connecthost = '127.0.0.1'
connectport = 8000
callbackfunc = callbackfunc
elevclient = testclient(connecthost, connectport, functools.partial(callbackfunc, 'hello callback'))
elevclient.start()
上面的程序定义了一个回调函数callbackfunc,在socket连接完成后将其作为参数传给testclient类的构造函数,用于处理接收到的消息,通过回调方式处理从服务端发送回来的数据。
运行结果

到此这篇关于python中functools.partial设置回调函数处理异步任务使用的文章就介绍到这了,更多相关python functools.partial异步内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论