Python - functools.partial设置回调函数处理异步任务基本使用

news2025/1/5 8:42:08

一. 前言

在Python中,回调函数是指在一个函数执行完成后,调用另一个函数的过程。通常情况下,回调函数作为参数传递给原始函数,原始函数在执行完自己的逻辑后,会自动调用回调函数并将结果作为参数传递给它。

二. 回调函数基本使用

以下是在Python中设置回调函数的基本步骤:

  1. 定义回调函数,确定回调函数的参数列表和返回值(如果有)。
  2. 在原始函数中,将回调函数作为参数传递给需要回调的函数。
  3. 在原始函数内部的适当位置调用回调函数,将需要传递的参数传递给它。

例如,假设我们需要设置一个回调函数来处理异步操作的结果,可以按如下方式进行设置:

# 定义回调函数
def callback(result):
    print('Callback function is called with result: ', result)

# 异步函数,需要传入回调函数
def async_function(param1, param2, callback):
    # 进行异步操作
    result = param1 + param2
    # 异步操作完成后调用回调函数
    callback(result)

# 调用异步函数,并传入回调函数
async_function(1, 2, callback)

运行结果
在这里插入图片描述
在上面的代码中,我们先定义了一个回调函数callback,然后在异步函数async_function中将该函数作为参数传递,并在异步操作完成后调用回调函数,将操作结果传递给它。

通常情况下,我们会将回调函数定义为一个可调用对象,也就是实现了__call__方法的类对象。使用这种方式,可以更加灵活地定义回调函数,并且可以把一些状态或上下文信息存储在对象中,在回调函数中使用。

三. 进阶 - 使用functools.partial传递函数

1. functools.partial基本介绍

functools.partial 是 Python 标准库中的一个函数,用来部分应用一个函数(partial application),也就是固定函数的一部分参数,返回一个新的函数。
partial 函数的用法如下:

functools.partial(func, *args, **kwargs)

其中,func 是要部分应用的函数,*args 和 **kwargs 是要固定的参数。

具体来说,partial 函数会返回一个新的函数对象,这个新的函数对象跟原来的函数对象是相似的,但是将部分参数固定下来了,相当于原来的函数变成了一个带有默认参数的函数。我们可以用这个新的函数对象来调用原来的函数,而不必传入那些已经固定的参数。

下面是一个简单的示例代码,演示如何使用 partial 函数:

import functools

# 定义一个简单的加法函数
def add(a, b):
    return a + b

# 固定 add 函数的第一个参数
add2 = functools.partial(add, 2)

# 调用 add2 函数
print(add2(3))  # 输出:5

在上面的示例代码中,我们定义了一个简单的加法函数 add,然后使用 partial 函数将 add 函数的第一个参数固定为 2,得到一个新的函数对象 add2。接下来,我们使用 add2 函数来计算 2+3 的结果,并将结果输出到控制台。

使用 partial 函数的好处是可以更方便地定义新的函数,避免代码重复。比如,如果我们想定义一个加 3、加 4、加 5 的函数,可以使用 partial 来实现,而不必写多个类似的函数。

add3 = functools.partial(add, 3)
add4 = functools.partial(add, 4)
add5 = functools.partial(add, 5)

print(add3(2))  # 输出:5
print(add4(2))  # 输出:6
print(add5(2))  # 输出:7

在上面的示例代码中,使用 partial 函数分别定义了 3 个新的函数 add3、add4、add5,分别将加数固定为 3、4、5,然后使用这些新的函数计算 2+3、2+4、2+5 的结果,并将结果输出到控制台。

2. functools.partial进阶使用示例代码

接下来我们看一个socket连接成功之后调用回调函数的例子:

1. 启动一个socket server服务

import json
import os
import socket
import threading
import time
import sys
import traceback

HOST = '127.0.0.1'  # 服务器IP地址
PORT = 8000  # 服务器端口号
BACKLOG = 5  # 服务器监听队列大小,即最多同时接收多少个客户端连接
RECONNECT_INTERVAL = 5  # 重连间隔,单位:秒


def start_server():
    print(os.getpid())
    while True:
        try:
            # 创建一个 TCP/IP socket 对象
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # 绑定服务器 IP 地址和端口号
            server_socket.bind((HOST, PORT))

            # 开始监听客户端连接请求
            server_socket.listen(BACKLOG)
            print('服务器启动,监听端口:%s' % PORT)

            while True:
                # 等待客户端连接
                print('等待客户端连接...')
                try:
                    client_socket, client_address = server_socket.accept()
                    threading.Thread(target=send_msg, args=(client_socket, client_address)).start()
                    # send_msg(client_socket, client_address)
                    print(f"Process {threading.current_thread()}: {threading.active_count()} threads")
                    print(f"Total threads: {threading.active_count()}")

                    print('新客户端连接,地址:%s' % str(client_address))

                    # 读取客户端发送的数据
                    data = client_socket.recv(1024)
                    print('Received data:', data.decode())

                    # 向客户端发送数据
                    message = 'Welcome to my server!'
                    client_socket.sendall(message.encode())

                except Exception as e:
                    print('客户端连接异常,错误信息:%s' % e)

                finally:
                    # 关闭客户端连接
                    client_socket.close()
                    print('客户端连接已关闭')

        except Exception as e:
            print('服务器异常,错误信息:%s' % e)
            traceback.print_exc()

            # 关闭服务端 socket
            server_socket.close()
            print('{}s后尝试重连服务器...'.format(RECONNECT_INTERVAL))
            time.sleep(RECONNECT_INTERVAL)


def send_msg(client, addr):
    try:
        while 1:
            time.sleep(1)
            jsonTemplate = {
                "Command": "FORWARD_ELEV_INFO",
                "DeviceId": "C0002T",
                "ElevId": 1,
            }
            msg2Elev = json.dumps(jsonTemplate).encode() + "\n".encode()
            client.sendto(msg2Elev, addr)
            print('send msg to client:{}:{}'.format(addr, msg2Elev))
    except Exception as e:
        print('send_msg:{}'.format(e))


if __name__ == '__main__':
    # 启动服务器
    start_server()

2. 开启一个客户端连接,连接成功后调用回调函数

import functools
import json
import socket
import threading
import time
import traceback


class TestClient(threading.Thread):

    def __init__(self, connectHost, connectPort, callbackFunc):
        threading.Thread.__init__(self, name="TestClient")
        self.host = connectHost
        self.port = connectPort
        self.callbackFunc = callbackFunc
        self.sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def run(self):

        self.connect()
        while True:
            try:
                # 从socket中读取数据
                # data = self.sck.recv(1024)
                # print(data)
                data = self.recv_msg(self.sck)
                if data is None:
                    time.sleep(1)
                    continue
                self.callbackFunc(data)
            except OSError:
                # An operation was attempted on something that is not a socket
                traceback.print_exc()
                time.sleep(5)
                # FIXME: if socket is broken, reconnect with the same sck does not work, so create an new one.
                self.sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.connect()

            except Exception as e:
                # TODO: if disconnected, connect it
                traceback.print_exc()
                time.sleep(5)
                self.connect()

    def recv_msg(self, sock):
        try:
            data = sock.recv(1024)
            print('recv data:{}'.format(data))
            return data
        except Exception as e:
            print('recv_msg:{}'.format(e))
            sock.close()
            time.sleep(0.5)

    def connect(self):

        while True:
            try:
                self.sck.connect((self.host, self.port))
                print("connected to Service {}:{}".format(self.host, self.port))
                break
            except ConnectionRefusedError:
                print("service refused or not started? Reconnecting to Service in 5s")
                time.sleep(5)

            except Exception as e:
                print("connect error type->{}".format(type(e)))
                traceback.print_exc()
                # FIXME: if other exception, not sure to restart action will work.
                time.sleep(5)


def callbackFunc(a, res):
    print(a)
    print('callback msg -- >', res)


if __name__ == '__main__':
    connectHost = '127.0.0.1'
    connectPort = 8000
    callbackFunc = callbackFunc

    elevClient = TestClient(connectHost, connectPort, functools.partial(callbackFunc, 'hello callback'))
    elevClient.start()

上面的程序定义了一个回调函数callbackFunc,在socket连接完成后将其作为参数传给TestClient类的构造函数,用于处理接收到的消息,通过回调方式处理从服务端发送回来的数据。

运行结果
在这里插入图片描述

以上就是关于python functools.partial设置回调函数处理异步任务基本使用介绍,希望对你有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/950704.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

知了汇智承办网信人才培训活动第二期,助力数字化网安人才储备

在数字经济时代,随着信息化的快速发展和互联网的深度应用,网络信息安全问题日益突出,成为制约数字经济健康发展的重要因素。为了有效提升网络安全人才的专业素质和技术能力,保障国家信息安全。知了汇智作为数字产教融合基地&#…

学习JAVA打卡第四十七天

日期的格式化 程序可能希望按照某种习惯来输出时间。例如时间的顺序:年/月/日或年/月/日/时/分/秒。可以直接使用String类调用format方法对日期进行格式化。 Format方法 Format方法: format(格式化模式,日期列表) 按照“格式…

FFT代码上的实现细节

ω \omega ω 的计算 ω n 1 \omega_n^1 ωn1​ 的计算 考虑单位圆, ω n 1 \omega_n^1 ωn1​ 为: 也就是: 注:op为判断当前为dft还是idft ω n i \omega_n^i ωni​ 的计算 当要计算 ω n i \omega_n^i ωni​ 时&#xf…

NSS [NUSTCTF 2022 新生赛]Ezjava1

NSS [NUSTCTF 2022 新生赛]Ezjava1 题目描述:你能获取flag{1}吗 开题,一眼java web中的index.jsp。 默认index.jsp中的body内容是$END$ 附件jar包导入IDEA,会自动反编译。看看源码。 附件结构大致如此。主要看classes.com.joe1sn中的代码就…

一篇掌握高级交换技术原理与配置(三):QINQ

一、概述 随着以太网技术在网络中的大量部署,利用VLAN对用户进行隔离和标识受到很大限制。因为IEEE802.1Q中定义的VLAN Tag域只有12个比特,仅能表示4096个VLAN,无法满足城域以太网中标识大量用户的需求,于是QinQ技术应运而生。 …

远程管理通道安全SSH协议主机验证过程

可以使用SSH协议进行远程管理通道安全保护,其中涉及的主要安全功能包括主机验证、数据加密性和数据完整性保护。 这里要注意的是【主机验证】和【身份验证】的区别,主机验证是客户端确认所访问的服务端是目标访问对象,比如从从客户端A(192.16…

企业微信cgi-bin/gateway/agentinfo接口存在未授权访问漏洞 附POC

文章目录 企业微信cgi-bin/gateway/agentinfo接口存在未授权访问漏洞 附POC1. 企业微信cgi-bin/gateway/agentinfo接口简介2.漏洞描述3.影响版本4.fofa查询语句5.漏洞复现6.POC&EXP7.整改意见8.往期回顾 企业微信cgi-bin/gateway/agentinfo接口存在未授权访问漏洞 附POC 免…

8天长假快来了,Python分析【去哪儿旅游攻略】数据,制作可视化图表

目录 前言环境使用模块使用数据来源分析 代码实现导入模块请求数据解析保存 数据可视化导入模块、数据年份分布情况月份分布情况出行时间情况费用分布情况人员分布情况 前言 2023年的中秋节和国庆节即将来临,好消息是,它们将连休8天!这个长假…

iptables教程

iptables netfilter/iptables(简称iptables)是与2.4.x和2.6.x系列版本Linux内核集成的IP信息包过滤系统。 Iptables Tutorial 1、表和链 1.1、表 iptables会根据不同的数据包处理功能使用不同的规则表。它包括如下五个表:filter、nat和m…

写用例写的焦头烂额?看看摸鱼5年的老点工是怎么写的...

给你个需求,你要怎么转变成最终的用例? 直接把需求文档翻译一下就完事了。 老点工拿到需求后的标准操作: 第一步:解析需求 先解析需求-找出所有需求中的动词,再列出所有测试点。测试点过程不断发散,对于…

Revit SDK:SetParameterValueWithImageData 用图片像素值设置族实例参数值

前言 这个例子通过从图片中获取颜色,将颜色转换成数值,赋值给分割表面上对应族实例的对应参数。 内容 获取颜色的代码: Bitmap image new Bitmap(doc.PathName "_grayscale.bmp"); System.Drawing.Color pixelColor new Syst…

设计模式--迭代器模式(Iterator Pattern)

一、什么是迭代器模式 迭代器模式(Iterator Pattern)是一种行为型设计模式,用于提供一种统一的方式来访问一个聚合对象中的各个元素,而不需要暴露该聚合对象的内部结构。迭代器模式将遍历集合的责任从集合对象中分离出来&#xf…

MyBatisx代码生成

MyBatisx代码生成 1.创建数据库表 CREATE TABLE sys_good (good_id int(11) NOT NULL,good_name varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,good_desc varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,PRIMARY KEY (good_id) ) ENGINEInnoDB DEFAULT CHA…

[C/C++]函数的栈空间(避免栈空间溢出)

个人主页:北海 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏✨收录专栏:C/C🤝希望作者的文章能对你有所帮助,有不足的地方请在评论区留言指正,大家一起学习交流!&#x1f9…

C++八股记录

C内存管理 C中,内存分成5个区。 栈:函数内局部变量;自动管理,效率高,但空间较小; 堆:new分配的内存块;手动管理,效率低,但空间大; 自由存储区&…

ScottPlot图标控件的使用

效果图 控件获取方法 Nugget ScottPlot.WinForms 参考代码 Form1.Designer.cs namespace ScottplotDemo {partial class Form1{/// <summary>/// 必需的设计器变量。/// </summary>private System.ComponentModel.IContainer components null;/// <summary…

《动手学深度学习》-57长短期记忆网络LSTM

沐神版《动手学深度学习》学习笔记&#xff0c;记录学习过程&#xff0c;详细的内容请大家购买书籍查阅。 b站视频链接 开源教程链接 长短期记忆网络&#xff08;LSTM&#xff09; 长期以来&#xff0c;隐变量模型存在长期信息保存和短期输入缺失的问题。解决这一问题的最早…

bazel入门学习笔记

简介 Bazel Google开源的&#xff0c;是一款与 Make、Maven 和 Gradle 类似的开源构建和测试工具。 它使用人类可读的高级构建语言。Bazel 支持多种语言的项目&#xff0c;可为多个平台构建输出。Bazel支持任意大小的构建目标&#xff0c;并支持跨多个代码库和大量用户的大型代…

webpack loader和plugins的区别

在Webpack中&#xff0c;Loader和Plugin是两个不同的概念&#xff0c;用于不同的目的。 Loader是用于处理非JavaScript模块的文件的转换工具。它们将文件作为输入&#xff0c;并将其转换为Webpack可以处理的模块。例如&#xff0c;当您在Webpack配置中使用Babel Loader时&…

深入浅出AXI协议(3)——握手过程

一、前言 在之前的文章中我们快速地浏览了一下AXI4协议中的接口信号&#xff0c;对此我们建议先有一个简单的认知&#xff0c;接下来在使用到的时候我们还会对各种信号进行一个详细的讲解&#xff0c;在这篇文章中我们将讲述AXI协议的握手协议。 二、握手协议概述 在前面的文章…