Python进阶(二)

news2024/10/6 5:55:27

前言:本篇博客主要记录Python进程的创建、进程间的通信、进程池的使用、线程的创建、多线程的执行、同步和互斥、协程的创建和应用。

目录

思维导图

基本概念

进程

进程的创建

进程间的通信

进程池

线程

线程的创建

子线程的执行顺序

同步&互斥

互斥锁

死锁

生产者消费者模式

协程

协程的创建

进程线程协程的对比

 总结


思维导图

基本概念

首先来了解几个基本概念,什么是进程?什么是线程?

进程:资源分配的最小工具(管理:一个进程下包含线程)

线程:操作系统调度执行的最小单位(执行)

协程:比线程更轻量级的微线程

三者的关系,如图,同一进程可以有多个线程,同一线程内可以有多个协程

下面的这篇文件介绍得更为详细,推荐阅读。

干货 | 进程、线程、协程 10 张图讲明白了! - 知乎 (zhihu.com)

什么是并行?什么是并发?

并行:同一时间点,任务同时地运行,比如一台电脑,有8个CPU,每个CPU的每个核心都可以独立地执行一个任务,在同一时间点,可同时执行8个任务,这时任务是同时执行,并行地运行任务。

并发:同一时间点,只能去执行一个任务,只不过任务之间快速切换,使得看上去像是多个任务在同时运行,但实际并非如此。

进程

进程的创建

创建一个进程,其中multiprocessing 模块就是跨平台版本的多进程模块,它提供了一个 Process 类来代表一个进程对象,可以执行另外的任务,创建子进程时,只需要传入一个执行函数和函数的参数,创建一个 Process 实例,用 start()方法启动。

import time
from multiprocessing import Process
def test1():
    while True:
        print("--------------test1----------------")
        time.sleep(10)

def test2():
    while True:
        print("--------------test2----------------")
        time.sleep(10)
p1 = Process(target=test1())
p2 = Process(target=test2())
p1.start()
p2.start()

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

参数说明:

group:指定进程组

target:指定函数,执行指定的函数

name:给进程设定一个名字,可以不设定

args:给 target 指定的函数传递的参数,以元组的方式传递

kwargs:给 target 指定的函数传递命名参数

启动子进程实例 strat()

判断进程子进程是否还存在 is_alive()

关闭子进程 terminate()

等待子进程执行结束 join()

举例:执行传递参数的函数

from multiprocessing import Process
def runProc(*args, **kwargs):
    print(args, kwargs)
if __name__ == '__main__':
    P = Process(target=runProc, args=('test', 123), kwargs={"name": "test1","age":35})
    P.start()
    P.join()
    print("over!")

开启一个子进程,实例代码:

from multiprocessing import Process
import time
import os
class Process_Class(Process):
    def __init__(self, interval):
        Process.__init__(self)
        self.interval = interval

    def run(self):
        print("子进程(%s) 开始执行,父进程为(%s)" % (os.getpid(), os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_stop = time.time()
        print("(%s)执行结束,耗时%0.2f秒" % (os.getpid(), t_stop - t_start))
if __name__ == "__main__":
    t_start = time.time()
    print("当前程序进程(%s)" % os.getpid())
    p1 = Process_Class(2)
    p1.start()
    p1.join()
    t_stop = time.time()
    print("(%s)执行结束,耗时%0.2f" % (os.getpid(), t_stop - t_start))

进程间的通信

上面我们学习了如何去创建进程,那不同进程间是如何通信的?

可以使用 multiprocessing 模块的 Queue ,实现多进程之间的数据传递,Queue 本身是一个消息 列队程序,这是操作系统开辟的一个空间,各个子程序可以把信息放进去,也可以取走,一个进程可以放多条消息到Queue中。

import multiprocessing
q = multiprocessing.Queue(3)
q.put("test1")
q.put("test2")
print(q.full())

q.put("test3")
print(q.full())

try:
    q.put("test4",True,2)
except:
    print("消息队列已满,现有消息数量%s"%q.qsize())
try:
    q.put_nowait("test4")
except:
    print("消息队列已满,现有消息数量%s" % q.qsize())
if not q.full():
    q.put_nowait("test4")
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

看一下这段代码的知识点:

初始化 Queue()对象时(例如:q = Queue()),若括号中没有指定最大可接收的消息数量, 或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头) 

返回当前队列包含的消息数量 Queue.qsize()

判断队列是否为空 Queue.empty()

判断队列是否满了 Queue.full()

put、get的用法:

Queue.put(item,block, timeout) 

Queue.put_nowait(item),相当于 Queue.put(item, False)

将 item 消息写入队列,block 默认值为 True 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止, 如果设置了 timeout,则会等待 timeout 秒,若还没空间,则抛出"Queue.Full"异常,如果 block 值为 False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full" 异常。

get用法与put用法,参数一样,不再赘述。

进程池

结合上面的进程间的通信,写两个函数,一个是将消息依次放入队列里,一个是从队列里依次取出消息,具体代码如下:

import multiprocessing
import time
import random


def write(q):
    for value in ['a', 'b', 'c']:
        print("put %s to queue" % (value))
        q.put(value)
        time.sleep(random.random())


def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print("get %s from queue" % (value))
            time.sleep(random.random())
        else:
            break


if __name__ == '__main__':
    q = multiprocessing.Queue()
    pw = multiprocessing.Process(target=write, args=(q,))
    pr = multiprocessing.Process(target=read, args=(q,))

    pw.start()
    pw.join()

    pr.start()
    pr.join()

    print("")
    print("over")

下面用进程池去实现:

这段代码对比了顺序执行和并行执行耗费的时间,代码执行后的结果可以看到,并行的耗费的时间要小于顺序执行的时间。

import time
from multiprocessing import Pool


def run(f):
    time.sleep(1)
    return f * f


if __name__ == '__main__':
    test = [1, 2, 3, 4, 5, 6]
    print("顺序执行")
    s = time.time()
    for f in test:
        run(f)
    e1 = time.time()
    print("顺序执行耗费时间:", int(e1 - s))
    print("创建多个线程")
    pool = Pool(5)
    r1 = pool.map(run, test)
    pool.close()
    pool.join()
    e2 = time.time()
    print("并行执行耗费时间:", int(e2 - e1))
    print(r1)

 下面的代码展示了进程池中进程间是如何通信的,

其中:apply_async(func,args,kwds),使用非阻塞方式调用 func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds为传递给 func 的关键字参数列表。

import multiprocessing
import time, os, random
from multiprocessing import Pool


def reader(q):
    print("read启动(%s),父进程为%s" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("Queue 从队列中获取到消息%s" % q.get(True))


def writer(q):
    print("writer启动(%s),父进程为%s" % (os.getpid(), os.getppid()))
    for i in "test001":
        q.put(i)


if __name__ == '__main__':
    print("(%s) start" % os.getpid())
    q = multiprocessing.Manager().Queue()
    po = multiprocessing.Pool()
    po.apply_async(writer, (q,))
    time.sleep(1)
    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) end" % os.getpid())

线程

线程的创建

创建一个单线程

import time

def test():
    print("test")
    time.sleep(1)

if __name__ == '__main__':
    StartTime = time.time()
    for i in range(5):
        test()
    EndTime = time.time()
    print("耗时%s" % (EndTime - StartTime))

创建一个多线程,threading.Thread(target=test),用.start()方法启动线程。

import time,threading

def test():
    print("test")
    time.sleep(1)

if __name__ == '__main__':
    StartTime = time.time()
    for i in range(5):
        t = threading.Thread(target=test)
        t.start()
    EndTime = time.time()
    print("\n耗时%s" % (EndTime - StartTime))

 从执行结果上,可以明显看到多线程并发耗时要比单线程少很多。

子线程的执行顺序

可以看到下面的这段程序,子线程执行顺序是不确定的。

import time, threading
def test1():
    for i in range(5):
        print("---test1---")
        time.sleep(1)
def test2():
    for i in range(5):
        print("---test2---")
        time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=test1)
    t2 = threading.Thread(target=test2)
    print("线程启动前,线程数是:", len(threading.enumerate()))
    t1.start()
    t2.start()
    print("线程启动后,线程数是:", len(threading.enumerate()))

整个线程的生命周期:

1、new新建,初始化后,进入Runnable状态

2、Running运行:Runnable就绪,等待线程调度,调度后进入运行状态

4、Blocked阻塞,暂停运行,可能的情况有:sleep、locked、处理业务上的数据

5、Dead消亡。线程方法执行完毕返回或者异常终止

多线程之间全局变量是可以共享的,但不同线程对全局变量的修改可能带来数据混乱的问题,即线程非安全。代码示例如下:

import time, threading
num_test = 0
def test1(num):
    global num_test
    for i in range(num):
        num_test += 1
    print("test1中num_test的值是:%s"%(num_test))
def test2(num):
    global num_test
    for i in range(num):
        num_test += 1
    print("test2中num_test的值是:%s"%(num_test))

if __name__ == '__main__':
    t1 = threading.Thread(target=test1,args=(1000000,))
    t2 = threading.Thread(target=test2,args=(1000000,))
    t1.start()
    t2.start()
    time.sleep(10)
    print("num_test的值是:",num_test)

结果如下,可以看到输出的结果出错了。 那如何处理呢?引出互斥,锁的概念。

同步&互斥

什么是同步?什么是互斥?

同步:程序必须按规定的某种顺序执行,程序A的运行依赖程序B运行产生的数据

互斥:同一公共资源,在同一时间点,只能由一个进程或线程使用

互斥锁

上面的例子中,当多个线程需要同时去修改公共数据时,就需要加入同步控制,安全访问资源的有效方式是引入互斥锁,其状态有锁定、非锁定。当某个线程需要改共享数据时,先锁定,这样,其他线程就不能更改数据,直到该线程释放资源,资源状态变成非锁定状态,其他线程才能再次锁定,然后修改,互斥锁的机制保证了每次写入操作只有一个线程,在多线程的情况下,保护了数据的正确性

threading 模块中定义了 Lock 类, 创建锁:mutex = threading.Lock(),加锁:mutex.acquire(),释放锁:mutex.release()

import multiprocessing
import time, threading
num_test = 0

def test1(num):
    global num_test
    for i in range(num):
        mutex.acquire()
        num_test += 1
        mutex.release()
    print("test1中num_test的值是:%s"%(num_test))
def test2(num):
    global num_test
    for i in range(num):
        mutex.acquire()
        num_test += 1
        mutex.release()
    print("test2中num_test的值是:%s"%(num_test))

if __name__ == '__main__':
    mutex = threading.Lock()
    t1 = threading.Thread(target=test1,args=(1000000,))
    t2 = threading.Thread(target=test2,args=(1000000,))
    t1.start()
    t2.start()
    time.sleep(10)
    print("num_test的值是:",num_test)

可以看到,这次最终计算结果是正确的。 

 

死锁

下面我们来看一下,加锁时出现死锁的特殊情况,所谓死锁,就是多个进程一直在互相等待,程序中应该尽量避免死锁的出现

下面的这篇博客举出的死锁例子很好,可以了解一下

python—多线程之死锁_python 死锁_敲代码敲到头发茂密的博客-CSDN博客

生产者消费者模式

什么是生产者消费者模式?

生产者和消费者通过阻塞队列来进行通讯,解决生产者和消费者的强耦合问题,在生产者生产完数据之后不用等待消费者处理,消费者也不找生产者要数据,而是通过一个阻塞队列来放数据取数据。

实现代码示例:

import random
import threading
import time
from queue import Queue
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.data = queue

    def run(self):
        for i in range(5):
            print("%s正在生产%d" % (self.getName(), i))
            self.data.put(i)
            time.sleep(random.randint(1, 3))
        print("%s生产结束!" % (self.getName()))
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.data = queue

    def run(self):
        for i in range(5):
            value = self.data.get()
            print("%s正在消费%d" % (self.getName(), value))
            time.sleep(random.randint(1, 3))
        print("%s消费结束!" % (self.getName()))

if __name__ == '__main__':
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print("over")

协程

协程的创建

用yeild实现一个简单的协程,代码如下:

import time
def test1():
    while True:
        print("test1")
        yield
        time.sleep(1)
def test2():
    while True:
        print("test2")
        yield
        time.sleep(1)
def main():
    t1 = test1()
    t2 = test2()
    while True:
        next(t1)
        next(t2)
if __name__ == '__main__':
    main()

用greenlet创建协程,代码如下:

import time
import greenlet
def test1():
    while True:
        print("test1")
        g2.switch()
        time.sleep(1)
def test2():
    while True:
        print("test2")
        g1.switch()
        time.sleep(1)
g1=greenlet.greenlet(test1)
g2=greenlet.greenlet(test2)
g1.switch()
g2.switch()

用gevent实现协程,代码如下:

import gevent
def f(n):
    for i in range(n):
        print(gevent.getcurrent(),i)
        gevent.sleep(1)
g1 = gevent.spawn(f,5)
g2 = gevent.spawn(f,5)
g3 = gevent.spawn(f,5)
g1.join()
g2.join()
g3.join()

 或者给程序打补丁

import time
import gevent
from gevent import monkey
monkey.patch_all()
def f(n):
    for i in range(n):
        print(gevent.getcurrent(),i)
        time.sleep(1)
g1 = gevent.spawn(f,5)
g2 = gevent.spawn(f,5)
g3 = gevent.spawn(f,5)
g1.join()
g2.join()
g3.join()

进程线程协程的对比

 总结

本篇博客学习了Python中的进程、线程、协程的使用,还要多加练习,多在实际中应用感受多线程编程以及协程的使用,这部分是比较抽象,也是比较重要的一部分,python进阶的两篇学习博客暂告一段落,还是要多敲代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/860846.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

惊讶,日本用“Excel”作画,中国却用“Excel”造“另类”软件

精益求精 表格是一项伟大的创造,它的出现改变了人类记录、分析和展示数据的方式。 随着科技的日益月异的更新,从最早的纸质表格到现代化的电子表格,人类对表格的改造也在不断的升级和进步。 最初的纸质表格的简单记录,无法进行复…

企业权限管理(八)-登陆使用数据库认证

Spring Security 使用数据库认证 在 Spring Security 中如果想要使用数据进行认证操作,有很多种操作方式,这里我们介绍使用 UserDetails 、 UserDetailsService来完成操作。 UserDetails public interface UserDetails extends Serializable { Collecti…

汇聚行业精英,共探创新之道:首届NDI生态系统行业峰会即将登陆北京!

IP化和轻量化是媒体制播技术发展的主要趋势。为了推进媒体行业的IP制播技术应用发展,提供一个专业、全面的媒体技术交流平台,长沙千视电子科技有限公司将联合NDI官方、NDI生态产品厂家和NDI技术应用媒体单位于2023年8月22日在北京举办国内首届NDI生态系统…

嵌入式开发学习(STC51-11-中断系统)

内容 外部中断-使用独立按键K3控制LED亮灭; 定时器(中断)-通过定时器0中断控制D1指示灯间隔1秒闪烁; 串口通信(中断)-通过串口(UART)实现与PC机对话,51单片机的串口收…

Leetcode算法递归类—合并两个有序链表

目录 21. 合并两个有序链表 题解: 代码: 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4]示例 2&a…

logstash 采集 docker 日志

1、nginx容器部署 参考:nginx容器部署 将容器内的nginx日志文件映射到宿主机/home/logs/nginx目录下 注意:并且需要需要将日志的输出格式修改为json 2、编辑vim /opt/logstash-7.4.2/config/nginx-log-es.conf 文件,收集docker nginx容器日…

【时间格式引发的事故】

时间格式引发的事故 背景实战演示结论 背景 前不久写了一个删除数据接口,条件是根据时间删除时间后面的数据。入参是 时间字符串。后台的时间格式 是 yyyyMMdd。然后当时前端传参数的时候,随意的传了2023-07-31的时间,然后将该表的数据全部删…

CCLINK IE转MODBUS-TCP网关modbus tcp协议详解

你是否曾经遇到过需要同时处理CCLINK IE FIELD BASIC和MODBUS两种数据协议的情况?捷米的JM-CCLKIE-TCP网关可以帮助你解决这个问题。 捷米JM-CCLKIE-TCP网关可以分别从CCLINK IE FIELD BASIC一侧和MODBUS一侧读写数据,然后将数据存入各自的缓冲区。接着…

【学习】若依源码(前后端分离版)之 “ 用户管理根据不同角色、部门显示数据范围”

大型纪录片:学习若依源码(前后端分离版)之 “ 用户管理根据不同角色、部门显示数据范围” 前端部分后端部分“ /list " 方法" /treeselect " 方法 结语 起因是我想做一个根据不同角色以及其所在的部门展示其相应的信息&#x…

StarRocks 3.1重磅发布,云原生湖仓新范式再升级!

StarRocks 自4月底发布3.0版本,拥抱云原生,开启极速统一的湖仓新范式;8月7日,StarRocks 正式发布全新3.1版本,全面提升云原生存算分离构架、极速数据湖分析、物化视图等重量级特性,让用户更简单的实现极速统…

前端性能优化之性能优化的指标和工具(chrome devtools、lighthouse、webpagetest)

文章目录 引言一、为什么要进行web性能优化二、RAIL测量模型1. 什么是RAIL2. 性能测量工具 三、性能测量工具的使用和性能指标以及优化目标1. Chrome DevTools1. 打开调试工具方式和配置2. network下的几个性能指标1. requests 请求总数2. transferred实际从服务器下载的数据量…

uni-app:实现点击按钮,进行数据累加展示(解决数据过多,导致出错)

效果 代码 核心代码 一、标签显示 <!-- 加载更多 --> <view class"load_more" v-if"info.length > pageNum * pageSize" tap"loadMore">加载更多 </view> v-if"info.length > pageNum * pageSize"&#xf…

主数据管理案例-某研究所

1、 背景介绍及难点分析 某军工研究所是机电类科研生产一体化研究所&#xff0c;具有多品种、小批量、离散性、央企、军工保密等特点&#xff0c;在数据管理系统和研制管理体系的控制下&#xff0c;设计、工艺、 制造、试验、售后服务等环节都产生了大量的数据。在管理信息化、…

影像维修工程师专项技能培训

最近遇到很多咨询的人员都在对医疗行业产生疑惑&#xff0c;新闻报道说很多医院、公司的领导都被查&#xff0c;这样会不会影响设备维修方面&#xff0c;对后期找工作等有没有影响&#xff1f;总不能学好了技术却没有发挥的余地&#xff1f; 最近确实是国家整体在对医疗方面做…

Win11 VS2022 配置CGAL-5.6

由于项目要用到几何库CGAL&#xff0c;因此做了配置。采用的是官方文档中的“Installing from the Source Archive”方式。 1. 下载安装CGAL &#xff08;1&#xff09;CGAL-5.6.zip下载地址&#xff1a;Releases CGAL/cgal GitHub 下载下图所示的两个文件。 &#xff08…

LeetCode面向运气之Javascript—第27题-移除元素-98.93%

LeetCode第27题-移除元素 题目要求 一个数组nums和一个值val&#xff0c;你需要原地移除所有数值等于val的元素&#xff0c;并返回移除后数组的新长度 举例 输入&#xff1a;nums [3,2,2,3], val 3 输出&#xff1a;2, nums [2,2] 输入&#xff1a;nums [0,1,2,2,3,0,4,2…

【C++】——模板

目录 泛型编程函数模板函数模板的概念函数模板格式&#xff1a;函数模板的原理函数模板的实例化模板参数的匹配原则 类模板类模板定义格式类模板的实例化 泛型编程 泛型编程&#xff1a;编写与类型无关的通用代码&#xff0c;是代码复用的一种手段。模板是泛型编程的基础 引例…

并发——线程池实践

文章目录 1. 使用 ThreadPoolExecutor 的构造函数声明线程池2.监测线程池运行状态3.建议不同类别的业务用不同的线程池4.别忘记给线程池命名5.正确配置线程池参数常规操作美团的骚操作 简单总结一下我了解的使用线程池的时候应该注意的东西&#xff0c;网上似乎还没有专门写这…

带你了解科研院所

一、什么是科研院所 研究院是独立于教育部和高校系统之外的&#xff0c;以科研工作为业务核心的各级、各类研究机构。独立研究院有很多种&#xff0c;其中实力最强、名气最大、分布最广、数量最集中的是直属国务院的中科院、社科院两大科研系统中的各类研究所和研究中心。 两大…

大数据课程I1——Kafka的概述

文章作者邮箱&#xff1a;yugongshiyesina.cn 地址&#xff1a;广东惠州 ▲ 本章节目的 ⚪ 了解Kafka的概念&#xff1b; ⚪ 掌握Kafka的配置与启动&#xff1b; 一、简介 1. 基本概念 Apache kafka 是一个分布式数据流平台。可以从如下几个层面来理解&#x…