python 多线程
# 进程和线程
- 进程:是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。
- 线程:是进程的一个实体,是 CPU 调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。
# 进程
含义:
- 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
- 一个正在运行的程序就是一个进程,进程是系统资源分配的最小单位,是能独立运行的最小单位。
注意:进程可以创建多个线程,多进程也可以完成多任务
# 进程的状态
- 就绪状态:进程已经获得除 CPU 外的所有必要资源,等待分配 CPU。
- 执行状态:进程已获得 CPU,正在执行。
- 等待(阻塞)状态:等待某些条件满足,如一个程序 sleep 了,此时就是等待状态。
print('start') # 程序开始,处于执行状态 | |
input('') # 程序暂停,处于等待状态 | |
time.sleep(1) # 程序暂停,处于等待状态 | |
print('end') # 程序继续执行,处于执行状态 |
# 进程语法结构
multiprocessing 模块提供了 Process 类来代表一个进程对象。
- process 类参数
参数:
target
:进程函数args
:传递给进程函数的参数,类型为元组kwargs
:传递给进程函数的参数,类型为字典
- process 类方法
start()
:启动进程,调用进程函数join(timeout=None)
:阻塞主进程,直到子进程执行完毕,timeout 为等待时间,单位为秒is_alive()
:判断进程是否在执行terminate()
:强制终止进程close()
:关闭进程
- 常用属性
name
:当前进程名,默认为 Process-1,Process-2,Process-3…
pid
:当前进程 id
import multiprocessing | |
import time | |
import os | |
def sing(): | |
# os.getpid () 获取当前进程 id | |
print(f"sing子进程id:{os.getpid()}") | |
# os.getppid () 获取当前进程的父进程 id | |
print(f"sing子进程的父进程id:{os.getppid()}") | |
print('sing') | |
def dance(): | |
# os.getpid () 获取当前进程 id | |
print(f"dance子进程id:{os.getpid()}") | |
# os.getppid () 获取当前进程的父进程 id | |
print(f"sing子进程的父进程id:{os.getppid()}") | |
print('dance') | |
if __name__ == '__main__': | |
p1 = Process(target=sing, name="子进程1") | |
p2 = Process(target=dance, name="子进程2") | |
# 开启进程 | |
p1.start() | |
p2.start() | |
# 访问进程属性 | |
print(p1.name) | |
print(p2.name) | |
print(p1.pid) | |
print(p2.pid) | |
# 阻塞主进程,直到子进程执行完毕 | |
p1.join() | |
p2.join() | |
# 判断进程是否在执行 | |
print(p1.is_alive()) | |
print(p2.is_alive()) | |
# 强制终止进程 | |
p1.terminate() | |
p2.terminate() | |
# 关闭进程 | |
p1.close() | |
p2.close() | |
print('end') | |
print(f"主进程id:{os.getpid()},主进程的父进程id:{os.getppid()}") |
注意: cmd
查看进程: tasklist
可以查看电脑所有进程
传参
import multiprocessing | |
import time | |
import os | |
def sing(name): | |
print(f"sing子进程id:{os.getpid()}") | |
print(f"sing子进程的父进程id:{os.getppid()}") | |
print(f"{name}在唱歌") | |
def dance(name): | |
print(f"dance子进程id:{os.getpid()}") | |
print(f"sing子进程的父进程id:{os.getppid()}") | |
print(f"{name}在跳舞") | |
if __name__ == '__main__': | |
p1 = multiprocessing.Process(target=sing, args=("张三",)) # 传递参数 | |
p2 = multiprocessing.Process(target=dance, kwargs={"name": "李四"}) # 传递参数 | |
# 开启进程 | |
p1.start() | |
p1.join() | |
p2.start() | |
p2.join() | |
print("p1存活状态:", p1.is_alive()) | |
print("p2存活状态:", p2.is_alive()) |
# 进程间不共享全局变量
import multiprocessing | |
import time | |
import os | |
li = [] | |
def wdata(): | |
for i in range(10): | |
li.append(i) | |
time.sleep(0.1) | |
def rdata(): | |
print(f"读取数据{li}") | |
# 1. 防止别人导入文件时执行 main 里面的方法 | |
# 2. 防止 windows 系统递归创建子进程 | |
if __name__ == '__main__': | |
p1 = multiprocessing.Process(target=wdata) | |
p2 = multiprocessing.Process(target=rdata) | |
# 开启进程 | |
p1.start() | |
p1.join() | |
p2.start() |
# 进程间通信
进程间通信(Inter-Process Communication,IPC)是指进程之间进行数据交换的机制。
# Queue 队列
Queue 是 Python 中的一种数据结构,它是一种先进先出(FIFO)的数据结构,可以用来实现进程间的通信。
方法:
Queue(maxsize=0)
:创建一个队列对象,maxsize 为队列大小,0 表示队列大小无限制put(item, block=True, timeout=None)
:将 item 放入队列,block 为是否阻塞,timeout 为等待时间,单位为秒get(block=True, timeout=None)
:从队列中取出一个元素,block 为是否阻塞,timeout 为等待时间,单位为秒empty()
:判断队列是否为空qsize()
:返回队列中元素个数full()
:判断队列是否已满
初始化一个队列
from multiprocessing import Queue | |
q = Queue(3) |
向队列中添加元素
from multiprocessing import Queue | |
q = Queue(3) | |
q.put(1) | |
q.put(2) | |
q.put(3) |
从队列中取出元素
from multiprocessing import Queue | |
q = Queue(3) | |
q.put(1) | |
q.get() # 获取队列中的元素,然后删除该元素 |
判断队列是否为空
from multiprocessing import Queue | |
q = Queue(3) | |
q.put(1) | |
q.put(2) | |
q.put(3) | |
q.empty() # 判断队列是否为空,为空返回 True,否则返回 False |
判断队列是否已满
from multiprocessing import Queue | |
q = Queue(3) | |
q.put(1) | |
q.put(2) | |
q.put(3) | |
q.full() # 判断队列是否已满,已满返回 True,否则返回 False |
获取队列中元素个数
from multiprocessing import Queue | |
q = Queue(3) | |
q.put(1) | |
q.put(2) | |
q.put(3) | |
q.qsize() # 获取队列中元素个数 |
阻塞和非阻塞
from multiprocessing import Queue | |
q = Queue(3) | |
q.put(1) | |
q.put(2) | |
q.put(3) | |
q.get(block=True, timeout=3) # 阻塞,等待 3 秒,如果 3 秒后队列中还有元素,则取出元素,否则抛出异常 | |
q.get(block=False) # 非阻塞,如果队列中有元素,则取出元素,否则抛出异常 |
进程间通信
from multiprocessing import Process, Queue | |
import time | |
li = ["张三", "李四", "王五", "赵六"] | |
def wdata(q): | |
for i in li: | |
print(f"{i}已经被放入") | |
q.put(i) | |
time.sleep(0.1) | |
print("写入数据是:",li) | |
# 读数据 | |
def rdata(q): | |
while True: | |
if q.empty(): | |
break | |
else: | |
print("取数据:",q.get()) | |
print("读取数据是:",li) | |
if __name__ == '__main__': | |
# 创建队列 | |
q = Queue() | |
p1 = Process(target=wdata, args=(q,)) | |
p2 = Process(target=rdata, args=(q,)) | |
p1.start() | |
p1.join() | |
p2.start() |
# 多线程
# 多线程实现
- 直接使用 threading 模块
参数:
target
:线程函数args
:传递给线程函数的参数,类型为元组kwargs
:传递给线程函数的参数,类型为字典
import threading | |
import time | |
def run(n): | |
print('task', n) | |
time.sleep(1) | |
print('task done', n) | |
if __name__ == '__main__': | |
t1 = threading.Thread(target=run, args=('t1',)) | |
t2 = threading.Thread(target=run, args=('t2',)) | |
# 守护线程,必须在线程启动前设置,否则无效 | |
t1.setDaemon(True) | |
t2.setDaemon(True) | |
t1.start() | |
t2.start() | |
# 阻塞子线程,必须在线程启动后设置,否则无效 | |
t1.join() | |
t2.join() | |
# 获取线程名字 | |
print(t1.getName()) | |
print(t2.getName()) | |
# 更改线程名字 | |
t1.setName('线程1') | |
t2.setName('线程2') | |
print(t1.getName()) | |
print(t2.getName()) | |
print('end') |
# 线程之间共享资源(全局变量)
import threading | |
import time | |
num = 0 | |
def run(n): | |
global num | |
num += 1 | |
print('task', n) | |
time.sleep(1) | |
print('task done', n) | |
if __name__ == '__main__': | |
t1 = threading.Thread(target=run, args=('t1',)) | |
t2 = threading.Thread(target=run, args=('t2',)) | |
t1.start() | |
t2.start() | |
t1.join() | |
t2.join() | |
print('num:', num) |
# 资源竞争
import threading | |
import time | |
a = 0 | |
b = 1000000 | |
def add1(): | |
for i in range(b): | |
global a | |
a += 1 | |
print('add1:', a) | |
def add2(): | |
for i in range(b): | |
global a | |
a += 1 | |
print('add2:', a) | |
if __name__ == '__main__': | |
t1 = threading.Thread(target=add1) | |
t2 = threading.Thread(target=add2) | |
t1.start() | |
t2.start() | |
t1.join() | |
t2.join() | |
print('a:', a) |
# 线程同步
两种方式:
- join 阻塞
import threading | |
import time | |
a = 0 | |
b = 1000000 | |
def add1(): | |
for i in range(b): | |
global a | |
a += 1 | |
print('add1:', a) | |
def add2(): | |
for i in range(b): | |
global a | |
a += 1 | |
print('add2:', a) | |
if __name__ == '__main__': | |
t1 = threading.Thread(target=add1) | |
t2 = threading.Thread(target=add2) | |
t1.start() | |
t1.join() # 阻塞 t1 线程,t1 执行完后再执行 t2 | |
t2.start() | |
t2.join() | |
print('a:', a) |
- 互斥锁
概念:互斥锁为资源引入了一个状态:锁定 / 非锁定。某个线程要更改共享数据时,先要将该资源锁定,此时其他线程不能更改;直到该线程释放资源,其他线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
acquire()
:获取锁,如果锁已经被其他线程获取,那么调用该方法的线程会被阻塞,直到锁被释放为止release()
:释放锁
import threading | |
import time | |
from threading import Lock | |
a = 0 | |
b = 1000000 | |
lock = Lock() | |
def add1(): | |
lock.acquire() # 加锁 | |
for i in range(b): | |
global a | |
a += 1 | |
print('add1:', a) | |
lock.release() # 解锁 | |
def add2(): | |
lock.acquire() | |
for i in range(b): | |
global a | |
a += 1 | |
print('add2:', a) | |
lock.release() | |
if __name__ == '__main__': | |
t1 = threading.Thread(target=add1) | |
t2 = threading.Thread(target=add2) | |
t1.start() | |
t2.start() | |
t1.join() | |
t2.join() | |
print('a:', a) |
注意:锁的粒度越细,性能越好,但锁的个数越多,性能越差。
# 协程
协程,又称微线程,纤程。英文名 Coroutine。协程是一种用户态的轻量级线程。
协程拥有自己的 CPU 寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处的逻辑流中。
协程的调用比线程的调用节省资源,因为协程不需要操作系统来进行调度,而是由程序员自己来控制协程的切换。
# 协程实现
# 使用 yield 关键字
yield
关键字可以将一个函数变成一个生成器,生成器是一种特殊的迭代器,可以通过 next()
函数来获取下一个值。
应用场景:
- 如果一个线程里面 IO 操作比较多时,可以用协程
- 适合高并发处理
使用:
def task1(): | |
yield 'a' | |
yield '哈哈' | |
def task2(): | |
yield 'b' | |
yield '嘿嘿' | |
if __name__ == '__main__': | |
t1 = task1() | |
t2 = task2() | |
print(next(t1)) | |
print(next(t2)) | |
print(next(t1)) | |
print(next(t2)) |
# greenlet 模块
greenlet
模块是一个第三方模块,用于实现协程。
注意: greenlet
只能手动切换,当遇到 IO 操作时,程序会阻塞,需要手动切换到其他协程。
格式:
a = greenlet(函数名)
创建一个协程对象xxx.switch()
切换到下一个协程xxx
使用:
from greenlet import greenlet | |
def sing(): | |
print("singing") | |
print("singing end") | |
def dance(): | |
print("dancing") | |
print("dancing end") | |
if __name__ == '__main__': | |
g1 = greenlet(sing) | |
g2 = greenlet(dance) | |
# 切换到 g1 | |
g1.switch() | |
# 切换到 g2 | |
g2.switch() |
# gevent 模块
gevent
模块是一个第三方模块,用于实现协程。
注意: gevent
可以自动切换,当遇到 IO 操作时,程序会自动切换到其他协程。
格式:
gevent.spawn(函数名)
创建一个协程对象gevent.sleep()
耗时操作gevent.join()
阻塞,等待某个协程执行完毕gevent.joinall()
等待所有协程执行完毕,再退出,参数为协程列表
使用:
import gevent | |
def sing(): | |
print("在唱歌") | |
gevent.sleep(2) | |
print("唱歌结束") | |
def dance(): | |
print("在跳舞") | |
gevent.sleep(3) | |
print("跳舞结束") | |
if __name__ == '__main__': | |
# 创建协程对象 | |
g1 = gevent.spawn(sing) | |
g2 = gevent.spawn(dance) | |
# 阻塞,等待某个协程执行完毕 | |
g1.join() | |
g2.join() |
gevent.joinall()
import gevent | |
def sing(name): | |
for i in range(3): | |
gevent.sleep(1) | |
print(f'{name}在唱歌,第{i}次') | |
if __name__ == '__main__': | |
# 创建协程对象 | |
g1 = gevent.spawn(sing, '张三') | |
g2 = gevent.spawn(sing, '李四') | |
# 阻塞,等待所有协程执行完毕 | |
gevent.joinall([g1, g2]) |
monket 补丁
- 作用:拥有在模块运行时替换的功能
- 注意:monkey.patch_all () 必须放在被打补丁的模块之前,否则无效
使用:
import gevent | |
import time | |
from gevent import monkey | |
monkey.patch_all() # 将所有的 IO 操作都变成 gevent 的协程,即将 time.sleep () 替换程 gevent 里面实现耗时操作的 gevent.sleep () 代码 | |
def sing(name): | |
for i in range(3): | |
time.sleep(1) | |
print(f'{name}在唱歌,第{i}次') | |
if __name__ == '__main__': | |
# 创建协程对象 | |
g1 = gevent.spawn(sing, '张三') | |
g2 = gevent.spawn(sing, '李四') | |
# 阻塞,等待所有协程执行完毕 | |
gevent.joinall([g1, g2]) |
总结:
- 线程是 CPU 调度的基本单位,进程是资源分配的基本单位
- 进程、线程和协程的区别
- 进程:切换需要资源最大,效率最低
- 线程:切换需要资源中等,效率中等
- 协程:切换需要资源最小,效率最高
- 进程和线程是操作系统调度的,协程是程序员调度的
- 多线程适合 IO 密集型操作,多进程适合 CPU 密集型操作,协程适合 IO 密集型操作
- 进程、线程和协程都可以完成多任务,根据自己实际开发需要选择使用。