1 python多进程

1.1 基本概念

Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象

这个进程对象的方法和线程对象的方法差不多也有start(),run(),join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的

与多线程的共享式内存不同,由于各个进程都是相互独立的,因此进程间通信再多进程中扮演这非常重要的角色,Python中我们可以使用multiprocessing模块中的pipequeueArrayValue等等工具来实现进程间通讯和数据共享,但是在编写起来仍然具有很大的不灵活性

img

1.2 任务类型

同步与异步

  • 同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息 那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去
  • 异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态 当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率

IO密集和计算密集

  • 对于IO密集型任务: python的多线程能够节省时间
  • 对于计算(CPU)密集型任务: Python的多线程并没有用处,建议使用多进程

其他组合搭配

python使用多核,即开多个进程

  • 方法一: 协程+多进程,使用方法简单,效率还可以,一般使用该方法

    协程yield是你自己写的,是自己定义什么时候切换进程

  • 方法二:IO多路复用,使用复杂,但效率很高,不常用

多进程相关模块

# 创建管理进程模块:
Process(用于创建进程):通过创建一个Process对象然后调用它的start()方法来生成进程。Process遵循threading.Thread的API。
Pool(用于创建进程管理池):可以创建一个进程池,该进程将执行与Pool该类一起提交给它的任务,当子进程较多需要管理时使用。
Queue(用于进程通信,资源共享):进程间通信,保证进程安全。
Value,Array(用于进程通信,资源共享):
Pipe(用于管道通信):管道操作。
Manager(用于资源共享):创建进程间共享的数据,包括在不同机器上运行的进程之间的网络共享。

# 同步子进程模块:
Condition
Event:用来实现进程间同步通信。
Lock:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
RLock
Semaphore:用来控制对共享资源的访问数量,例如池的最大连接数。

python多线程低效原因

GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定

某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个

拿不到通行证的线程,就不允许进入 CPU 执行

目前 Python 的解释器有多种,例如:

  • CPython:CPython 是用C语言实现的 Python 解释器,作为官方实现,它是最广泛使用的 Python 解释器
  • PyPy:PyPy 是用RPython实现的解释器。RPython 是 Python 的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy 旨在提高性能,同时保持最大兼容性(参考 CPython 的实现)
  • Jython:Jython 是一个将 Python 代码编译成 Java 字节码的实现,运行在JVM(Java Virtual Machine)上。另外,它可以像是用 Python 模块一样,导入 并使用任何Java类
  • IronPython:IronPython 是一个针对 .NET 框架的 Python 实现。它 可以用 Python 和 .NET framewor k的库,也能将 Python 代码暴露给 .NET 框架中的其他语言

GIL 只在 CPython 中才有,而在 PyPy 和 Jython 中是没有 GIL 的

注意: 每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源

这就导致打印线程执行时长,会发现耗时更长的原因

并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因

2 多进程实现方式

2.1 Process

普通Process

from multiprocessing import  Process

def func(name):
    print('测试%s多进程' %name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  #开启5个子进程执行fun1函数
        p = Process(target=func, args=('Python',)) #实例化进程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('结束测试')
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试

Process finished with exit code 0

上面的代码开启了5个子进程去执行函数,我们可以观察结果,是同时打印的,这里实现了真正的并行操作,就是多个CPU同时执行任务。

我们知道进程是python中最小的资源分配单元,也就是进程中间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大了,所以在实际中使用多进程,要根据服务器的配置来设定。

继承Process

from multiprocessing import  Process

class MyProcess(Process): #继承Process类
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name = name

    def run(self):
        print('测试%s多进程' % self.name)


if __name__ == '__main__':
    process_list = []
    for i in range(5):  #开启5个子进程执行fun1函数
        p = MyProcess('Python') #实例化进程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('结束测试')
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试

Process finished with exit code 0

通过类继承的方法来实现的,python多进程的第二种实现方式也是一样的,效果和第一种方式一样

Process类的其他方法

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])
  group: 线程组 
  target: 要执行的方法
  name: 进程名
  args/kwargs: 要传入方法的参数

实例方法:
  is_alive():返回进程是否在运行,bool类型。
  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
  start():进程准备就绪,等待CPU调度
  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
  terminate():不管任务是否完成,立即停止工作进程

属性:
  daemon:和线程的setDeamon功能一样
  name:进程名字
  pid:进程号

2.2 进程池

# apply_async:异步
from  multiprocessing import Pool,cpu_count
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    return f'{name}: {os.getpid()}'

if __name__=='__main__':
    results = []
    pool = Pool(cpu_count()-1)

    for i in range(4):
        results.append(pool.apply_async(func=fun1, args=(i,)))

    pool.close()
    pool.join()
    print()
    for result in results:
        print(result.get())
    print('All Done!!!')

    print('结束测试')
Run task 0 (30716)...
Run task 1 (15020)...
Run task 2 (23200)...
Run task 3 (5884)...
Task 0 runs 1.34 seconds.
Task 2 runs 1.53 seconds.
Task 1 runs 1.88 seconds.
Task 3 runs 2.48 seconds.

0: 30716
1: 15020
2: 23200
3: 5884
All Done!!!
结束测试

Process finished with exit code 0

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了

# map_async:异步
from multiprocessing import Pool, cpu_count, Manager
from functools import partial

def job(data, mgrDicTask, lock):
    res = f'a+b = {data[0] + data[1]}'
    lock.acquire()
    # Manager对象无法监测到它引用的可变对象值的修改,需要通过触发__setitem__方法来让它获得通知
    tempDic = list(mgrDicTask['result'])
    tempDic.append(res)
    mgrDicTask['result'] = tempDic
    lock.release()
    return res

if __name__ == "__main__":
    data = [[2, 3], [3, 4], [2, 5]]
    pool = Pool(processes=cpu_count() - 1)

    mgr = Manager()
    lock = mgr.Lock()
    mgrDicTask = mgr.dict()
    mgrDicTask['result'] = []

    fun = partial(job, mgrDicTask=mgrDicTask, lock=lock)
    pool.map_async(fun, data)
    pool.close()
    pool.join()

    print(mgrDicTask['result'])
    print('All Done!!!')
['a+b = 7', 'a+b = 7', 'a+b = 5']
All Done!!!

Process finished with exit code 0

2.3 其他进程池

# 进程池的另外一种创建方式,跟线程池的创建方式一样。其方法等也相同。
def process_pool_test(url_list):
    book_list = []
    # 创建进程池
    pool = ProcessPoolExecutor(max_workers=20)
    start = time.time()
    for url in url_list:
        time.sleep(0.5)
        result = pool.submit(get_book_info, url)
        book_list.append(result)

    pool.shutdown()
    print('time: ', time.time() - start)

    book_name_list = []
    author_list = []
    author_info_list = []
    print('book_list: ', len(book_list))
    for future in book_list:
        book_name_list.extend(future.result()['name'])
        author_list.extend(future.result()['author'])
        author_info_list.extend(future.result()['info'])
    ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)


if __name__ == '__main__':
    sys.setrecursionlimit(10000)
    url_list = ['https://www.edge.org/library']
    for i in range(1, 52):
        url_list.append('https://www.edge.org/library?page=%s' % i)

    thread_pool_test(url_list)

3 多进程通信

内容提取神器 beautiful Soup 的用法

进程是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程在使用中与多线程最明显的区别

但是难道Python多进程中间难道就是孤立的吗?

当然不是,python也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)

3.1 进程队列Queue

#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""

from multiprocessing import Process, Queue, Manager
from multiprocessing import cpu_count
import os
import time


class Task:
    def __init__(self, task_name: str, data: list, **kwargs):
        self.task_name = task_name
        self.data = data

    def __repr__(self):
        return f'task_name:{self.task_name} data:{self.data}'


class MultiProcessingQueue:
    def __init__(self):
        # 进程数
        self.num_of_worker = cpu_count()

        # 进程队列大小,根据不同的任务需求
        self.size_of_queue = 10

    def start_work(self):
        print("start_work 开始")

        # 进程队列
        process_list = []

        # 新建一个大小为10的队列
        work_queue = Queue(self.size_of_queue)

        # 进程间共享列表, 其他的还有共享字典等,都是进程安全的
        dealed_sample_lst = Manager().list()

        # 一个生产者
        sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
        sent.start()
        process_list.append(sent)

        # 多个消费者
        for _ in range(self.num_of_worker - 1):
            process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
            process.start()
            process_list.append(process)
        [process.join() for process in process_list]
        print("start_work 结束")
        return dealed_sample_lst

    def productor(self, work_queue: Queue, dealed_sample_lst):
        print("生产者开始工作")
        for ii in range(100):
            work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
            if ii % 30 == 0:
                time.sleep(1)
                print("生产者休息ing")

        '''
        JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
        task_done()是用在get()后,发送通知说我get完了
        join()是说Queue里所有的task都已处理。
        '''
        # 这里需要加入结束标识,还有就是JoinableQueue的方式
        for _ in range(self.num_of_worker - 1):
            work_queue.put(None)
        print("生产者工作结束")

    def consumer(self, work_queue: Queue, dealed_sample_lst):
        while True:
            task: Task = work_queue.get()
            if task is None:
                break

            # 处理数据
            task.data = [ii * 2 for ii in task.data]
            dealed_sample_lst.append(task)
            print(task)

        print(f'进程{os.getpid()} 处理结束')


def multiprocessing_queue_test():
    multiprocessing_queue = MultiProcessingQueue()
    dealed_sample_lst = multiprocessing_queue.start_work()
    # for sample in dealed_sample_lst:
    #     print(sample)
    print("测试结束")


if __name__ == '__main__':
    multiprocessing_queue_test()
start_work 开始
生产者开始工作
task_name:28868-0 data:[0, 0]
生产者休息ing
task_name:28868-1 data:[2, 2]
...
task_name:28868-6 data:[12, 12]
生产者休息ing
task_name:28868-31 data:[62, 62]
...
task_name:28868-58 data:[116, 116]
生产者休息ing
task_name:28868-61 data:[122, 122]
...
task_name:28868-64 data:[128, 128]
...
生产者休息ing
task_name:28868-91 data:[182, 182]
...
task_name:28868-96 data:[192, 192]
生产者工作结束
进程29208 处理结束
task_name:28868-97 data:[194, 194]
进程20632 处理结束
进程28496 处理结束
task_name:28868-98 data:[196, 196]
进程30200 处理结束
进程26512 处理结束
进程29776 处理结束
task_name:28868-99 data:[198, 198]
进程30072 处理结束
start_work 结束
测试结束

上面的代码结果可以看到我们主进程中可以通过Queue获取子进程中put的数据,实现进程间的通信

3.2 JoinableQueue队列

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理

通知进程是使用共享的信号和条件变量来实现的

  • 参数介绍:
    • maxsize: 是队列中允许最大项数,省略则无大小限制
  • 方法介绍:
    • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理 如果调用此方法的次数大于从队列中删除项目的数量将引发ValueError异常
    • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

示例代码

#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""

from multiprocessing import Process, Queue, JoinableQueue, Manager
from multiprocessing import cpu_count
import os, time


class Task:
    def __init__(self, task_name: str, data: list, **kwargs):
        self.task_name = task_name
        self.data = data

    def __repr__(self):
        return f'task_name:{self.task_name} data:{self.data}'


class MultiProcessingJoinableQueue:
    def __init__(self):
        # 进程数
        self.num_of_worker = cpu_count()

        # 进程队列大小,根据不同的任务需求
        self.size_of_queue = 10

    def start_work(self):
        print("start_work 开始")

        # 进程队列
        process_list = []

        # 新建一个大小为10的队列
        work_queue = JoinableQueue(self.size_of_queue)

        # 进程间共享列表, 其他的还有共享字典等,都是进程安全的
        dealed_sample_lst = Manager().list()

        # 一个生产者
        sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
        process_list.append(sent)

        # 多个消费者
        for _ in range(self.num_of_worker - 1):
            process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
            process.daemon = True
            process_list.append(process)

        [process.start() for process in process_list]
        # 这里需要注意的一点是,这里join只需要调用生产者(别调消费者的join,否则无法正常退出)
        # 消费者不需要,个人感觉应该是生产者那边已经调用了work_queue.join()的方法
        # 消费者结束后,整个程序退出
        [process.join() for process in process_list[:1]]

        print("start_work 结束")
        return dealed_sample_lst

    def productor(self, work_queue: Queue, dealed_sample_lst):
        print("生产者开始工作")
        for ii in range(100):
            work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
            if ii % 30 == 0:
                time.sleep(1)
                print("生产者休息ing")

        print("生产者工作结束")
        work_queue.join()

    def consumer(self, work_queue: Queue, dealed_sample_lst):
        while True:
            task: Task = work_queue.get()
            if task is None:
                break
            # 处理数据
            task.data = [ii * 2 for ii in task.data]
            dealed_sample_lst.append(task)
            print(task)
            work_queue.task_done()
        print(f'进程{os.getpid()} 处理结束')


def multiprocessing_joinablequeue_test():
    multiprocessing_joinablequeue = MultiProcessingJoinableQueue()
    dealed_sample_lst = multiprocessing_joinablequeue.start_work()
    # for sample in dealed_sample_lst:
    #     print(sample)
    print("测试结束")

if __name__ == '__main__':
    multiprocessing_joinablequeue_test()

结果输出

start_work 开始
生产者开始工作
task_name:14608-0 data:[0, 0]
生产者休息ing
task_name:14608-1 data:[2, 2]
...
task_name:14608-7 data:[14, 14]
生产者休息ing
task_name:14608-31 data:[62, 62]
...
task_name:14608-60 data:[120, 120]
生产者休息ing
task_name:14608-61 data:[122, 122
...
task_name:14608-90 data:[180, 180]
生产者休息ing
生产者工作结束
task_name:14608-91 data:[182, 182]
...
task_name:14608-93 data:[186, 186]
start_work 结束
测试结束

3.3 管道Pipe

Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像

pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数

如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据

管道是数据不安全的,多个进程同时收发数据可道引起数据异常,这时候就应该配合锁使用
from multiprocessing import Process, Pipe

def fun1(conn):
    print('子进程发送消息:')
    conn.send('你好主进程')
    print('子进程接受消息:')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe() #关键点,pipe实例化生成一个双向管
    p = Process(target=fun1, args=(conn2,)) #conn2传给子进程
    p.start()
    print('主进程接受消息:')
    print(conn1.recv())
    print('主进程发送消息:')
    conn1.send("你好子进程")
    p.join()
    print('结束测试')
主进程接受消息:
子进程发送消息:
子进程接受消息:
你好主进程
主进程发送消息:
你好子进程
结束测试

Process finished with exit code 0

上面可以看到主进程和子进程可以相互发送消息

3.4 Managers

Queue和Pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据,那么就要用到Managers

from multiprocessing import Process, Manager

def fun1(dic,lis,index):
    dic[index] = 'a'
    dic['2'] = 'b'    
    lis.append(index)    #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
    #print(l)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()#注意字典的声明方式,不能直接通过{}来定义
        l = manager.list(range(5))#[0,1,2,3,4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic,l,i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)
{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

可以看到主进程定义了一个字典和一个列表,在子进程中,可以添加和修改字典的内容 在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据

4 注意事项

4.1 无法调用多层生成器(待验证)

#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v0.1
@author: narutohyc
@file: text.py
@Description: 
@time: 2020/5/29 19:51
"""

from multiprocessing import Process, Queue, JoinableQueue, Manager
from multiprocessing import cpu_count
import os, time

from abc import (ABC,
                 abstractmethod,
                 ABCMeta)

from comm.logger.logger_config import logger


class SampleIterator(ABC, metaclass=ABCMeta):
    def __init__(self):
        pass

    @abstractmethod
    def __iter__(self):
        '''
        样本处理并返回
        '''
        pass


class DataSource1(SampleIterator):
    def __init__(self):
        super(DataSource1, self).__init__()

    def __iter__(self):
        for ii in range(10):
            yield ii


class DataSource2(SampleIterator):
    def __init__(self, data_source):
        super(DataSource2, self).__init__()
        self.data_source = data_source

    def __iter__(self):
        for ii in self.data_source:
            yield ii
class DataSource3(SampleIterator):
    def __init__(self, data_source):
        super(DataSource3, self).__init__()
        self.data_source = data_source

    def __iter__(self):
        for ii in self.data_source:
            yield ii
class DataSource4(SampleIterator):
    def __init__(self, data_source):
        super(DataSource4, self).__init__()
        self.data_source = data_source

    def __iter__(self):
        for ii in self.data_source:
            yield ii


class HUCY():
    def __init__(self, data_source=None):
        self.num_of_worker = cpu_count()
        self.size_of_queue = 2
        self.data_source = data_source

    def start_work(self):
        # 进程队列
        process_list = []
        # 新建一个大小为10的队列
        work_queue = Queue(self.size_of_queue)

        # 一个生产者
        produce_num = 1

        for _ in range(produce_num):
            sent = Process(target=self.productor, args=(work_queue,))
            sent.start()
            process_list.append(sent)

        # 多个消费者
        for _ in range(self.num_of_worker - produce_num):
            process = Process(target=self.consumer, args=(work_queue,))
            process.start()
            process_list.append(process)

        # 这里需要加入结束标识,还有就是JoinableQueue的方式
        [process.join() for process in process_list[:produce_num]]
        for _ in range(self.num_of_worker - produce_num):
            work_queue.put(None)
        [process.join() for process in process_list[produce_num:]]
        print("start_work 结束")

    def productor(self, work_queue):
        [work_queue.put(ii) for ii in self.data_source]
        logger.info("生产者结束")

    def consumer(self, work_queue):
        while True:
            data = work_queue.get()
            if data is None:
                break
            logger.info(f"数据: {data}")
        logger.info("消费者结束")


def hyc_test():
    da1=DataSource2(DataSource3(DataSource4(DataSource1())))
    da2 = DataSource2(DataSource3(DataSource4(da1)))
    da3 = DataSource2(DataSource3(DataSource4(da2)))
    da4 = DataSource2(DataSource3(DataSource4(da3)))
    da5 = DataSource2(DataSource3(DataSource4(da4)))
    da6 = DataSource2(DataSource3(DataSource4(DataSource2(da5))))
    hucy = HUCY(da6)
    hucy.start_work()


if __name__ == '__main__':
    hyc_test()

几个问题:

  • 以上代码 若有多个生产者 就会各自拥有自己的数据生成器,导致数据重复

  • 有些定义的方法 好像会使程序卡住

Copyright © narutohyc.com 2021 all right reserved,powered by Gitbook该文件修订时间: 2024-05-06 07:11:15

results matching ""

    No results matching ""