python多线程

进程和线程

  • 进程:是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。
  • 线程:是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

进程

含义:

  • 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
  • 一个正在运行的程序就是一个进程,进程是系统资源分配的最小单位,是能独立运行的最小单位。
    注意:进程可以创建多个线程,多进程也可以完成多任务

进程的状态

  1. 就绪状态:进程已经获得除CPU外的所有必要资源,等待分配CPU。
  2. 执行状态:进程已获得CPU,正在执行。
  3. 等待(阻塞)状态:等待某些条件满足,如一个程序sleep了,此时就是等待状态。
print('start') # 程序开始,处于执行状态

input('') # 程序暂停,处于等待状态

time.sleep(1) # 程序暂停,处于等待状态

print('end') # 程序继续执行,处于执行状态

进程语法结构

multiprocessing模块提供了Process类来代表一个进程对象。

  1. process类参数
    参数:
  • target:进程函数
  • args:传递给进程函数的参数,类型为元组
  • kwargs:传递给进程函数的参数,类型为字典
  1. process类方法
  • start():启动进程,调用进程函数
  • join(timeout=None):阻塞主进程,直到子进程执行完毕,timeout为等待时间,单位为秒
  • is_alive():判断进程是否在执行
  • terminate():强制终止进程
  • close():关闭进程
  1. 常用属性
    name:当前进程名,默认为Process-1,Process-2,Process-3…
    pid:当前进程id
import multiprocessing
import time
import os

def sing():
    # os.getpid()获取当前进程id
    print(f"sing子进程id:{os.getpid()}")
    # os.getppid()获取当前进程的父进程id
    print(f"sing子进程的父进程id:{os.getppid()}")
    print('sing')

def dance():
    # os.getpid()获取当前进程id
    print(f"dance子进程id:{os.getpid()}")
    # os.getppid()获取当前进程的父进程id
    print(f"sing子进程的父进程id:{os.getppid()}")
    print('dance')


if __name__ == '__main__':
    p1 = Process(target=sing, name="子进程1")
    p2 = Process(target=dance, name="子进程2")
    # 开启进程
    p1.start()
    p2.start()
    
    # 访问进程属性
    print(p1.name)
    print(p2.name)
    print(p1.pid)
    print(p2.pid)

    # 阻塞主进程,直到子进程执行完毕
    p1.join()
    p2.join()

    # 判断进程是否在执行
    print(p1.is_alive())
    print(p2.is_alive())

    # 强制终止进程
    p1.terminate()
    p2.terminate()

    # 关闭进程
    p1.close()
    p2.close()
    print('end')
    print(f"主进程id:{os.getpid()},主进程的父进程id:{os.getppid()}")

注意: cmd 查看进程:tasklist 可以查看电脑所有进程

传参

import multiprocessing
import time
import os

def sing(name):
    print(f"sing子进程id:{os.getpid()}")
    print(f"sing子进程的父进程id:{os.getppid()}")
    print(f"{name}在唱歌")

def dance(name):
    print(f"dance子进程id:{os.getpid()}")
    print(f"sing子进程的父进程id:{os.getppid()}")
    print(f"{name}在跳舞")

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=sing, args=("张三",)) # 传递参数
    p2 = multiprocessing.Process(target=dance, kwargs={"name": "李四"}) # 传递参数
    # 开启进程
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    print("p1存活状态:", p1.is_alive())
    print("p2存活状态:", p2.is_alive())

进程间不共享全局变量

import multiprocessing
import time
import os

li = []

def wdata():
    for i in range(10):
        li.append(i)
        time.sleep(0.1)

def rdata():
    print(f"读取数据{li}")

# 1. 防止别人导入文件时执行main里面的方法
# 2. 防止windows系统递归创建子进程
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=wdata)
    p2 = multiprocessing.Process(target=rdata)
    # 开启进程
    p1.start()
    p1.join()
    p2.start()

进程间通信

进程间通信(Inter-Process Communication,IPC)是指进程之间进行数据交换的机制。

Queue队列

Queue是Python中的一种数据结构,它是一种先进先出(FIFO)的数据结构,可以用来实现进程间的通信。
方法:

  • Queue(maxsize=0):创建一个队列对象,maxsize为队列大小,0表示队列大小无限制
  • put(item, block=True, timeout=None):将item放入队列,block为是否阻塞,timeout为等待时间,单位为秒
  • get(block=True, timeout=None):从队列中取出一个元素,block为是否阻塞,timeout为等待时间,单位为秒
  • empty():判断队列是否为空
  • qsize():返回队列中元素个数
  • full():判断队列是否已满

初始化一个队列

from multiprocessing import Queue

q = Queue(3)

向队列中添加元素

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(3)

从队列中取出元素

from multiprocessing import Queue

q = Queue(3)
q.put(1)

q.get() # 获取队列中的元素,然后删除该元素

判断队列是否为空

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(3)

q.empty() # 判断队列是否为空,为空返回True,否则返回False

判断队列是否已满

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(3)

q.full() # 判断队列是否已满,已满返回True,否则返回False

获取队列中元素个数

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(3)

q.qsize() # 获取队列中元素个数

阻塞和非阻塞

from multiprocessing import Queue

q = Queue(3)
q.put(1)
q.put(2)
q.put(3)

q.get(block=True, timeout=3) # 阻塞,等待3秒,如果3秒后队列中还有元素,则取出元素,否则抛出异常
q.get(block=False) # 非阻塞,如果队列中有元素,则取出元素,否则抛出异常

进程间通信

from multiprocessing import Process, Queue
import time

li = ["张三", "李四", "王五", "赵六"]
def wdata(q):
    for i in li:
        print(f"{i}已经被放入")
        q.put(i)
        time.sleep(0.1)
    print("写入数据是:",li)
# 读数据
def rdata(q):
    while True:
        if q.empty():
            break
        else:
            print("取数据:",q.get())
    print("读取数据是:",li)

if __name__ == '__main__':
    # 创建队列
    q = Queue()
    p1 = Process(target=wdata, args=(q,))
    p2 = Process(target=rdata, args=(q,))
    p1.start()
    p1.join()
    p2.start()

多线程

多线程实现

  1. 直接使用threading模块

参数:

  • target:线程函数
  • args:传递给线程函数的参数,类型为元组
  • kwargs:传递给线程函数的参数,类型为字典
import threading
import time

def run(n):
    print('task', n)
    time.sleep(1)
    print('task done', n)


if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=('t1',))
    t2 = threading.Thread(target=run, args=('t2',))
    # 守护线程,必须在线程启动前设置,否则无效
    t1.setDaemon(True)
    t2.setDaemon(True)
    t1.start()
    t2.start()
    # 阻塞子线程,必须在线程启动后设置,否则无效
    t1.join()
    t2.join()
    # 获取线程名字
    print(t1.getName())
    print(t2.getName())
    # 更改线程名字
    t1.setName('线程1')
    t2.setName('线程2')
    print(t1.getName())
    print(t2.getName())
    print('end')

线程之间共享资源(全局变量)

import threading
import time

num = 0

def run(n):
    global num
    num += 1
    print('task', n)
    time.sleep(1)
    print('task done', n)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=('t1',))
    t2 = threading.Thread(target=run, args=('t2',))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('num:', num)

资源竞争

import threading
import time

a = 0
b = 1000000
def add1():
    for i in range(b):
        global a
        a += 1
    print('add1:', a)

def add2():
    for i in range(b):
        global a
        a += 1
    print('add2:', a)

if __name__ == '__main__':
    t1 = threading.Thread(target=add1)
    t2 = threading.Thread(target=add2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('a:', a)

线程同步

两种方式:

  1. join 阻塞
import threading
import time

a = 0
b = 1000000
def add1():
    for i in range(b):
        global a
        a += 1
    print('add1:', a)

def add2():
    for i in range(b):
        global a
        a += 1
    print('add2:', a)


if __name__ == '__main__':
    t1 = threading.Thread(target=add1)
    t2 = threading.Thread(target=add2)
    t1.start()
    t1.join() # 阻塞t1线程,t1执行完后再执行t2
    t2.start()
    t2.join()
    print('a:', a)
  1. 互斥锁
    概念:互斥锁为资源引入了一个状态:锁定/非锁定。某个线程要更改共享数据时,先要将该资源锁定,此时其他线程不能更改;直到该线程释放资源,其他线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
  • acquire():获取锁,如果锁已经被其他线程获取,那么调用该方法的线程会被阻塞,直到锁被释放为止
  • release():释放锁
import threading
import time
from threading import Lock
a = 0
b = 1000000
lock = Lock()

def add1():
    lock.acquire() # 加锁
    for i in range(b):
        
        global a
        a += 1
        
    print('add1:', a)
    lock.release() # 解锁
def add2():
    lock.acquire()
    for i in range(b):
        
        global a
        a += 1
        
    print('add2:', a)
    lock.release()

if __name__ == '__main__':
    t1 = threading.Thread(target=add1)
    t2 = threading.Thread(target=add2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('a:', a)

注意:锁的粒度越细,性能越好,但锁的个数越多,性能越差。

协程

协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。

协程拥有自己的CPU寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处的逻辑流中。

协程的调用比线程的调用节省资源,因为协程不需要操作系统来进行调度,而是由程序员自己来控制协程的切换。

协程实现

使用yield关键字

yield关键字可以将一个函数变成一个生成器,生成器是一种特殊的迭代器,可以通过next()函数来获取下一个值。
应用场景:

  • 如果一个线程里面IO操作比较多时,可以用协程
  • 适合高并发处理

使用:


def task1():
    yield 'a'
    yield '哈哈'
def task2():
    yield 'b'
    yield '嘿嘿'

if __name__ == '__main__':
    t1 = task1()
    t2 = task2()
    print(next(t1))
    print(next(t2))
    print(next(t1))
    print(next(t2))

greenlet模块

greenlet模块是一个第三方模块,用于实现协程。
注意:greenlet只能手动切换,当遇到IO操作时,程序会阻塞,需要手动切换到其他协程。

格式:

  • a = greenlet(函数名) 创建一个协程对象
  • xxx.switch() 切换到下一个协程xxx

使用:

from greenlet import greenlet

def sing():
    print("singing")
    print("singing end")

def dance():
    print("dancing")
    print("dancing end")

if __name__ == '__main__':
    g1 = greenlet(sing)
    g2 = greenlet(dance)
    # 切换到g1
    g1.switch()
    # 切换到g2
    g2.switch()

gevent模块

gevent模块是一个第三方模块,用于实现协程。
注意:gevent可以自动切换,当遇到IO操作时,程序会自动切换到其他协程。
格式:

  • gevent.spawn(函数名) 创建一个协程对象
  • gevent.sleep() 耗时操作
  • gevent.join() 阻塞,等待某个协程执行完毕
  • gevent.joinall() 等待所有协程执行完毕,再退出,参数为协程列表

使用:

import gevent

def sing():
    print("在唱歌")
    gevent.sleep(2)
    print("唱歌结束")

def dance():
    print("在跳舞")
    gevent.sleep(3)
    print("跳舞结束")

if __name__ == '__main__':
    # 创建协程对象
    g1 = gevent.spawn(sing)
    g2 = gevent.spawn(dance)
    # 阻塞,等待某个协程执行完毕
    g1.join()
    g2.join()

gevent.joinall()

import gevent

def sing(name):
    for i in range(3):
        gevent.sleep(1)
        print(f'{name}在唱歌,第{i}次')
if __name__ == '__main__':
    # 创建协程对象
    g1 = gevent.spawn(sing, '张三')
    g2 = gevent.spawn(sing, '李四')
    # 阻塞,等待所有协程执行完毕
    gevent.joinall([g1, g2])

monket补丁

  • 作用:拥有在模块运行时替换的功能
  • 注意:monkey.patch_all() 必须放在被打补丁的模块之前,否则无效

使用:

import gevent
import time
from gevent import monkey
monkey.patch_all() # 将所有的IO操作都变成gevent的协程,即将time.sleep()替换程gevent里面实现耗时操作的gevent.sleep()代码

def sing(name):
    for i in range(3):
        time.sleep(1)
        print(f'{name}在唱歌,第{i}次')
if __name__ == '__main__':
    # 创建协程对象
    g1 = gevent.spawn(sing, '张三')
    g2 = gevent.spawn(sing, '李四')
    # 阻塞,等待所有协程执行完毕
    gevent.joinall([g1, g2])

总结:

  1. 线程是CPU调度的基本单位,进程是资源分配的基本单位
  2. 进程、线程和协程的区别
  • 进程:切换需要资源最大,效率最低
  • 线程:切换需要资源中等,效率中等
  • 协程:切换需要资源最小,效率最高
  • 进程和线程是操作系统调度的,协程是程序员调度的
  1. 多线程适合IO密集型操作,多进程适合CPU密集型操作,协程适合IO密集型操作
  2. 进程、线程和协程都可以完成多任务,根据自己实际开发需要选择使用。