1 python多进程
1.1 基本概念
Python中的多进程是通过
multiprocessing
包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象这个进程对象的方法和线程对象的方法差不多也有start(),run(),join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
与多线程的共享式内存不同,由于各个进程都是相互独立的,因此进程间通信再多进程中扮演这非常重要的角色,Python中我们可以使用multiprocessing模块中的pipe
、queue
、Array
、Value
等等工具来实现进程间通讯和数据共享,但是在编写起来仍然具有很大的不灵活性
1.2 任务类型
同步与异步
- 同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息 那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去
- 异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态 当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率
IO密集和计算密集
- 对于IO密集型任务: python的多线程能够节省时间
- 对于计算(CPU)密集型任务: Python的多线程并没有用处,建议使用多进程
其他组合搭配
python使用多核,即开多个进程
方法一: 协程+多进程,使用方法简单,效率还可以,一般使用该方法
协程yield是你自己写的,是自己定义什么时候切换进程
方法二:IO多路复用,使用复杂,但效率很高,不常用
多进程相关模块
# 创建管理进程模块:
Process(用于创建进程):通过创建一个Process对象然后调用它的start()方法来生成进程。Process遵循threading.Thread的API。
Pool(用于创建进程管理池):可以创建一个进程池,该进程将执行与Pool该类一起提交给它的任务,当子进程较多需要管理时使用。
Queue(用于进程通信,资源共享):进程间通信,保证进程安全。
Value,Array(用于进程通信,资源共享):
Pipe(用于管道通信):管道操作。
Manager(用于资源共享):创建进程间共享的数据,包括在不同机器上运行的进程之间的网络共享。
# 同步子进程模块:
Condition
Event:用来实现进程间同步通信。
Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
RLock
Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。
python多线程低效原因
GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个
拿不到通行证的线程,就不允许进入 CPU 执行
目前 Python 的解释器有多种,例如:
- CPython:CPython 是用C语言实现的 Python 解释器,作为官方实现,它是最广泛使用的 Python 解释器
- PyPy:PyPy 是用RPython实现的解释器。RPython 是 Python 的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy 旨在提高性能,同时保持最大兼容性(参考 CPython 的实现)
- Jython:Jython 是一个将 Python 代码编译成 Java 字节码的实现,运行在JVM(Java Virtual Machine)上。另外,它可以像是用 Python 模块一样,导入 并使用任何Java类
- IronPython:IronPython 是一个针对 .NET 框架的 Python 实现。它 可以用 Python 和 .NET framewor k的库,也能将 Python 代码暴露给 .NET 框架中的其他语言
GIL 只在 CPython 中才有,而在 PyPy 和 Jython 中是没有 GIL 的
注意: 每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源
这就导致打印线程执行时长,会发现耗时更长的原因
并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因
2 多进程实现方式
2.1 Process
普通Process
from multiprocessing import Process
def func(name):
print('测试%s多进程' %name)
if __name__ == '__main__':
process_list = []
for i in range(5): #开启5个子进程执行fun1函数
p = Process(target=func, args=('Python',)) #实例化进程对象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('结束测试')
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试
Process finished with exit code 0
上面的代码开启了5个子进程去执行函数,我们可以观察结果,是同时打印的,这里实现了真正的并行操作,就是多个CPU同时执行任务。
我们知道进程是python中最小的资源分配单元,也就是进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大了,所以在实际中使用多进程,要根据服务器的配置来设定。
继承Process
from multiprocessing import Process
class MyProcess(Process): #继承Process类
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name
def run(self):
print('测试%s多进程' % self.name)
if __name__ == '__main__':
process_list = []
for i in range(5): #开启5个子进程执行fun1函数
p = MyProcess('Python') #实例化进程对象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('结束测试')
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试
Process finished with exit code 0
通过类继承的方法来实现的,python多进程的第二种实现方式也是一样的,效果和第一种方式一样
Process类的其他方法
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组
target: 要执行的方法
name: 进程名
args/kwargs: 要传入方法的参数
实例方法:
is_alive():返回进程是否在运行,bool类型。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字
pid:进程号
2.2 进程池
# apply_async:异步
from multiprocessing import Pool,cpu_count
import os, time, random
def fun1(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
return f'{name}: {os.getpid()}'
if __name__=='__main__':
results = []
pool = Pool(cpu_count()-1)
for i in range(4):
results.append(pool.apply_async(func=fun1, args=(i,)))
pool.close()
pool.join()
print()
for result in results:
print(result.get())
print('All Done!!!')
print('结束测试')
Run task 0 (30716)...
Run task 1 (15020)...
Run task 2 (23200)...
Run task 3 (5884)...
Task 0 runs 1.34 seconds.
Task 2 runs 1.53 seconds.
Task 1 runs 1.88 seconds.
Task 3 runs 2.48 seconds.
0: 30716
1: 15020
2: 23200
3: 5884
All Done!!!
结束测试
Process finished with exit code 0
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了
# map_async:异步
from multiprocessing import Pool, cpu_count, Manager
from functools import partial
def job(data, mgrDicTask, lock):
res = f'a+b = {data[0] + data[1]}'
lock.acquire()
# Manager对象无法监测到它引用的可变对象值的修改,需要通过触发__setitem__方法来让它获得通知
tempDic = list(mgrDicTask['result'])
tempDic.append(res)
mgrDicTask['result'] = tempDic
lock.release()
return res
if __name__ == "__main__":
data = [[2, 3], [3, 4], [2, 5]]
pool = Pool(processes=cpu_count() - 1)
mgr = Manager()
lock = mgr.Lock()
mgrDicTask = mgr.dict()
mgrDicTask['result'] = []
fun = partial(job, mgrDicTask=mgrDicTask, lock=lock)
pool.map_async(fun, data)
pool.close()
pool.join()
print(mgrDicTask['result'])
print('All Done!!!')
['a+b = 7', 'a+b = 7', 'a+b = 5']
All Done!!!
Process finished with exit code 0
2.3 其他进程池
# 进程池的另外一种创建方式,跟线程池的创建方式一样。其方法等也相同。
def process_pool_test(url_list):
book_list = []
# 创建进程池
pool = ProcessPoolExecutor(max_workers=20)
start = time.time()
for url in url_list:
time.sleep(0.5)
result = pool.submit(get_book_info, url)
book_list.append(result)
pool.shutdown()
print('time: ', time.time() - start)
book_name_list = []
author_list = []
author_info_list = []
print('book_list: ', len(book_list))
for future in book_list:
book_name_list.extend(future.result()['name'])
author_list.extend(future.result()['author'])
author_info_list.extend(future.result()['info'])
ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)
if __name__ == '__main__':
sys.setrecursionlimit(10000)
url_list = ['https://www.edge.org/library']
for i in range(1, 52):
url_list.append('https://www.edge.org/library?page=%s' % i)
thread_pool_test(url_list)
3 多进程通信
进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别
但是难道Python多进程中间难道就是孤立的吗?
当然不是,python也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)
3.1 进程队列Queue
#!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""
from multiprocessing import Process, Queue, Manager
from multiprocessing import cpu_count
import os
import time
class Task:
def __init__(self, task_name: str, data: list, **kwargs):
self.task_name = task_name
self.data = data
def __repr__(self):
return f'task_name:{self.task_name} data:{self.data}'
class MultiProcessingQueue:
def __init__(self):
# 进程数
self.num_of_worker = cpu_count()
# 进程队列大小,根据不同的任务需求
self.size_of_queue = 10
def start_work(self):
print("start_work 开始")
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = Queue(self.size_of_queue)
# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager().list()
# 一个生产者
sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
sent.start()
process_list.append(sent)
# 多个消费者
for _ in range(self.num_of_worker - 1):
process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
process.start()
process_list.append(process)
[process.join() for process in process_list]
print("start_work 结束")
return dealed_sample_lst
def productor(self, work_queue: Queue, dealed_sample_lst):
print("生产者开始工作")
for ii in range(100):
work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
if ii % 30 == 0:
time.sleep(1)
print("生产者休息ing")
'''
JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
task_done()是用在get()后,发送通知说我get完了
join()是说Queue里所有的task都已处理。
'''
# 这里需要加入结束标识,还有就是JoinableQueue的方式
for _ in range(self.num_of_worker - 1):
work_queue.put(None)
print("生产者工作结束")
def consumer(self, work_queue: Queue, dealed_sample_lst):
while True:
task: Task = work_queue.get()
if task is None:
break
# 处理数据
task.data = [ii * 2 for ii in task.data]
dealed_sample_lst.append(task)
print(task)
print(f'进程{os.getpid()} 处理结束')
def multiprocessing_queue_test():
multiprocessing_queue = MultiProcessingQueue()
dealed_sample_lst = multiprocessing_queue.start_work()
# for sample in dealed_sample_lst:
# print(sample)
print("测试结束")
if __name__ == '__main__':
multiprocessing_queue_test()
start_work 开始
生产者开始工作
task_name:28868-0 data:[0, 0]
生产者休息ing
task_name:28868-1 data:[2, 2]
...
task_name:28868-6 data:[12, 12]
生产者休息ing
task_name:28868-31 data:[62, 62]
...
task_name:28868-58 data:[116, 116]
生产者休息ing
task_name:28868-61 data:[122, 122]
...
task_name:28868-64 data:[128, 128]
...
生产者休息ing
task_name:28868-91 data:[182, 182]
...
task_name:28868-96 data:[192, 192]
生产者工作结束
进程29208 处理结束
task_name:28868-97 data:[194, 194]
进程20632 处理结束
进程28496 处理结束
task_name:28868-98 data:[196, 196]
进程30200 处理结束
进程26512 处理结束
进程29776 处理结束
task_name:28868-99 data:[198, 198]
进程30072 处理结束
start_work 结束
测试结束
上面的代码结果可以看到我们主进程中可以通过Queue获取子进程中put的数据,实现进程间的通信
3.2 JoinableQueue队列
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理
通知进程是使用共享的信号和条件变量来实现的
- 参数介绍:
- maxsize: 是队列中允许最大项数,省略则无大小限制
- 方法介绍:
- q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理 如果调用此方法的次数大于从队列中删除项目的数量将引发ValueError异常
- q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
示例代码
#!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""
from multiprocessing import Process, Queue, JoinableQueue, Manager
from multiprocessing import cpu_count
import os, time
class Task:
def __init__(self, task_name: str, data: list, **kwargs):
self.task_name = task_name
self.data = data
def __repr__(self):
return f'task_name:{self.task_name} data:{self.data}'
class MultiProcessingJoinableQueue:
def __init__(self):
# 进程数
self.num_of_worker = cpu_count()
# 进程队列大小,根据不同的任务需求
self.size_of_queue = 10
def start_work(self):
print("start_work 开始")
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = JoinableQueue(self.size_of_queue)
# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager().list()
# 一个生产者
sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
process_list.append(sent)
# 多个消费者
for _ in range(self.num_of_worker - 1):
process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
process.daemon = True
process_list.append(process)
[process.start() for process in process_list]
# 这里需要注意的一点是,这里join只需要调用生产者(别调消费者的join,否则无法正常退出)
# 消费者不需要,个人感觉应该是生产者那边已经调用了work_queue.join()的方法
# 消费者结束后,整个程序退出
[process.join() for process in process_list[:1]]
print("start_work 结束")
return dealed_sample_lst
def productor(self, work_queue: Queue, dealed_sample_lst):
print("生产者开始工作")
for ii in range(100):
work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
if ii % 30 == 0:
time.sleep(1)
print("生产者休息ing")
print("生产者工作结束")
work_queue.join()
def consumer(self, work_queue: Queue, dealed_sample_lst):
while True:
task: Task = work_queue.get()
if task is None:
break
# 处理数据
task.data = [ii * 2 for ii in task.data]
dealed_sample_lst.append(task)
print(task)
work_queue.task_done()
print(f'进程{os.getpid()} 处理结束')
def multiprocessing_joinablequeue_test():
multiprocessing_joinablequeue = MultiProcessingJoinableQueue()
dealed_sample_lst = multiprocessing_joinablequeue.start_work()
# for sample in dealed_sample_lst:
# print(sample)
print("测试结束")
if __name__ == '__main__':
multiprocessing_joinablequeue_test()
结果输出
start_work 开始
生产者开始工作
task_name:14608-0 data:[0, 0]
生产者休息ing
task_name:14608-1 data:[2, 2]
...
task_name:14608-7 data:[14, 14]
生产者休息ing
task_name:14608-31 data:[62, 62]
...
task_name:14608-60 data:[120, 120]
生产者休息ing
task_name:14608-61 data:[122, 122
...
task_name:14608-90 data:[180, 180]
生产者休息ing
生产者工作结束
task_name:14608-91 data:[182, 182]
...
task_name:14608-93 data:[186, 186]
start_work 结束
测试结束
3.3 管道Pipe
Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像
pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数
如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据
管道是数据不安全的,多个进程同时收发数据可道引起数据异常,这时候就应该配合锁使用from multiprocessing import Process, Pipe
def fun1(conn):
print('子进程发送消息:')
conn.send('你好主进程')
print('子进程接受消息:')
print(conn.recv())
conn.close()
if __name__ == '__main__':
conn1, conn2 = Pipe() #关键点,pipe实例化生成一个双向管
p = Process(target=fun1, args=(conn2,)) #conn2传给子进程
p.start()
print('主进程接受消息:')
print(conn1.recv())
print('主进程发送消息:')
conn1.send("你好子进程")
p.join()
print('结束测试')
主进程接受消息:
子进程发送消息:
子进程接受消息:
你好主进程
主进程发送消息:
你好子进程
结束测试
Process finished with exit code 0
上面可以看到主进程和子进程可以相互发送消息
3.4 Managers
Queue和Pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据,那么就要用到Managers
from multiprocessing import Process, Manager
def fun1(dic,lis,index):
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)
if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的声明方式,不能直接通过{}来定义
l = manager.list(range(5))#[0,1,2,3,4]
process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i))
p.start()
process_list.append(p)
for res in process_list:
res.join()
print(dic)
print(l)
{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]
可以看到主进程定义了一个字典和一个列表,在子进程中,可以添加和修改字典的内容 在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据
4 注意事项
4.1 无法调用多层生成器(待验证)
#!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v0.1
@author: narutohyc
@file: text.py
@Description:
@time: 2020/5/29 19:51
"""
from multiprocessing import Process, Queue, JoinableQueue, Manager
from multiprocessing import cpu_count
import os, time
from abc import (ABC,
abstractmethod,
ABCMeta)
from comm.logger.logger_config import logger
class SampleIterator(ABC, metaclass=ABCMeta):
def __init__(self):
pass
@abstractmethod
def __iter__(self):
'''
样本处理并返回
'''
pass
class DataSource1(SampleIterator):
def __init__(self):
super(DataSource1, self).__init__()
def __iter__(self):
for ii in range(10):
yield ii
class DataSource2(SampleIterator):
def __init__(self, data_source):
super(DataSource2, self).__init__()
self.data_source = data_source
def __iter__(self):
for ii in self.data_source:
yield ii
class DataSource3(SampleIterator):
def __init__(self, data_source):
super(DataSource3, self).__init__()
self.data_source = data_source
def __iter__(self):
for ii in self.data_source:
yield ii
class DataSource4(SampleIterator):
def __init__(self, data_source):
super(DataSource4, self).__init__()
self.data_source = data_source
def __iter__(self):
for ii in self.data_source:
yield ii
class HUCY():
def __init__(self, data_source=None):
self.num_of_worker = cpu_count()
self.size_of_queue = 2
self.data_source = data_source
def start_work(self):
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = Queue(self.size_of_queue)
# 一个生产者
produce_num = 1
for _ in range(produce_num):
sent = Process(target=self.productor, args=(work_queue,))
sent.start()
process_list.append(sent)
# 多个消费者
for _ in range(self.num_of_worker - produce_num):
process = Process(target=self.consumer, args=(work_queue,))
process.start()
process_list.append(process)
# 这里需要加入结束标识,还有就是JoinableQueue的方式
[process.join() for process in process_list[:produce_num]]
for _ in range(self.num_of_worker - produce_num):
work_queue.put(None)
[process.join() for process in process_list[produce_num:]]
print("start_work 结束")
def productor(self, work_queue):
[work_queue.put(ii) for ii in self.data_source]
logger.info("生产者结束")
def consumer(self, work_queue):
while True:
data = work_queue.get()
if data is None:
break
logger.info(f"数据: {data}")
logger.info("消费者结束")
def hyc_test():
da1=DataSource2(DataSource3(DataSource4(DataSource1())))
da2 = DataSource2(DataSource3(DataSource4(da1)))
da3 = DataSource2(DataSource3(DataSource4(da2)))
da4 = DataSource2(DataSource3(DataSource4(da3)))
da5 = DataSource2(DataSource3(DataSource4(da4)))
da6 = DataSource2(DataSource3(DataSource4(DataSource2(da5))))
hucy = HUCY(da6)
hucy.start_work()
if __name__ == '__main__':
hyc_test()
几个问题:
以上代码 若有多个生产者 就会各自拥有自己的数据生成器,导致数据重复
有些定义的方法 好像会使程序卡住