本文共 6331 字,大约阅读时间需要 21 分钟。
有人翻译成栅栏,建议还是使用屏障,可以想象成路障,道闸,Python3.2引入的新功能
名称 含义 Barrier(parties,action=None,timerout=None) 构建Barrier对象,指定参与方数目,timerout是wait方法未指定超时的默认值 n_waiting 当前在屏障中等待的线程数 parties 各方数,就是需要多少等待 wait(timerout=None) 等待通过屏障。返回0到线程数-1的整数,每个线程返回不同,如果wait方法设置了超时,并超时发送,屏障将处于broken状态 Barrier实例
import threadingimport logging#输出自定义FORMAT = '%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(format=FORMAT,level=logging.INFO)def worker(barrier:threading.Barrier): logging.info("waiting for {} threads".format(barrier.n_waiting)) try: barrier_id = barrier.wait() logging.info("after barrier {}".format(barrier_id)) except threading.BrokenBarrierError: logging.info("Broken Barrier")barrier = threading.Barrier(3)for x in range(3): threading.Thread(target=worker,name="worker-{}".format(x),args=(barrier,)).start()logging.info("started")结果:2021-06-20 18:25:40,300-15s [worker-0, 13364] waiting for 0 threads2021-06-20 18:25:40,301-15s [worker-1, 18264] waiting for 1 threads2021-06-20 18:25:40,301-15s [worker-2, 20048] waiting for 2 threads2021-06-20 18:25:40,301-15s [MainThread, 21624] started2021-06-20 18:25:40,301-15s [worker-2, 20048] after barrier 22021-06-20 18:25:40,301-15s [worker-0, 13364] after barrier 02021-06-20 18:25:40,301-15s [worker-1, 18264] after barrier 1
- 从运行结果来看,所有线程冲到了Barrier前等待,直到到达parites的数目,屏障打开,所有线程停止等待,继续执行
- 再有线程wait,屏障就绪等待参数发数目
名称 注解 broken() 如果屏障处于打破的状态,返回True abort() 将屏障于broken状态,等待中的线程或者调用等待方法的线程中都会抛出BrokenBarrierError,直到reset方法来恢复屏障 reset() 恢复屏障,重新开始拦截 import threadingimport logging#输出格式定义FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'logging.basicConfig(format=FORMAT,level=logging.INFO)def woker(barrier:threading.Barrier): logging.info("waiting for {} threads".format(barrier.n_waiting)) try: barrier_id = barrier.wait() logging.info("after barrier {}".format(barrier_id)) except threading.BrokenBarrierError: logging.info("Broken Barrier .run")barrier = threading.Barrier(3)for i in range(0,9): if i ==2 : barrier.abort() elif i == 6: barrier.reset() threading.Event().wait(1) threading.Thread(target=woker,name="worker - {}".format(i),args=(barrier,)).start()结果:2021-06-20 19:38:07,580 [worker - 0, 17756] waiting for 0 threads2021-06-20 19:38:08,580 [worker - 1, 17920] waiting for 1 threads2021-06-20 19:38:08,580 [worker - 0, 17756] Broken Barrier .run2021-06-20 19:38:08,580 [worker - 1, 17920] Broken Barrier .run2021-06-20 19:38:09,581 [worker - 2, 21476] waiting for 0 threads2021-06-20 19:38:09,582 [worker - 2, 21476] Broken Barrier .run2021-06-20 19:38:10,584 [worker - 3, 18800] waiting for 0 threads2021-06-20 19:38:10,584 [worker - 3, 18800] Broken Barrier .run2021-06-20 19:38:11,584 [worker - 4, 21236] waiting for 0 threads2021-06-20 19:38:11,585 [worker - 4, 21236] Broken Barrier .run2021-06-20 19:38:12,585 [worker - 5, 16756] waiting for 0 threads2021-06-20 19:38:12,585 [worker - 5, 16756] Broken Barrier .run2021-06-20 19:38:13,586 [worker - 6, 21268] waiting for 0 threads2021-06-20 19:38:14,588 [worker - 7, 10068] waiting for 1 threads2021-06-20 19:38:15,590 [worker - 8, 15120] waiting for 2 threads2021-06-20 19:38:15,590 [worker - 8, 15120] after barrier 22021-06-20 19:38:15,590 [worker - 7, 10068] after barrier 12021-06-20 19:38:15,590 [worker - 6, 21268] after barrier 0
- 屏幕中等待了2个,屏障就被break了,waiting的线程抛了,BrokenBarrierError异常,新wait的线程也抛出异常,直到屏幕恢复,才继续按照parties数目要求继续拦截线程
wait方法超时发生,屏障将处于broken状态,直到reset
import threadingimport logging#输出格式定义FORMAT = '%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def worker(barrier:threading.Barrier,i:int): logging.info("waiting for {}".format(barrier.n_waiting)) try: logging.info(barrier.broken) #是否broker if i < 3: barrier_id = barrier.wait(1) #超时后,屏障brokern else: if i == 6: barrier.reset() #恢复屏障 barrier_id = barrier.wait() logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info("Broken barrier.run")barrier = threading.Barrier(3)for x in range(0,9): threading.Event().wait(2) threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,x)).start()结果:2021-06-20 22:30:19,253-15s [worker-0, 24124] waiting for 02021-06-20 22:30:19,254-15s [worker-0, 24124] False2021-06-20 22:30:20,255-15s [worker-0, 24124] Broken barrier.run2021-06-20 22:30:21,255-15s [worker-1, 14364] waiting for 02021-06-20 22:30:21,255-15s [worker-1, 14364] True2021-06-20 22:30:21,255-15s [worker-1, 14364] Broken barrier.run2021-06-20 22:30:23,257-15s [worker-2, 24020] waiting for 02021-06-20 22:30:23,258-15s [worker-2, 24020] True2021-06-20 22:30:23,258-15s [worker-2, 24020] Broken barrier.run2021-06-20 22:30:25,259-15s [worker-3, 3532] waiting for 02021-06-20 22:30:25,259-15s [worker-3, 3532] True2021-06-20 22:30:25,259-15s [worker-3, 3532] Broken barrier.run2021-06-20 22:30:27,260-15s [worker-4, 24260] waiting for 02021-06-20 22:30:27,260-15s [worker-4, 24260] True2021-06-20 22:30:27,260-15s [worker-4, 24260] Broken barrier.run2021-06-20 22:30:29,260-15s [worker-5, 14320] waiting for 02021-06-20 22:30:29,261-15s [worker-5, 14320] True2021-06-20 22:30:29,261-15s [worker-5, 14320] Broken barrier.run2021-06-20 22:30:31,261-15s [worker-6, 24016] waiting for 02021-06-20 22:30:31,261-15s [worker-6, 24016] True2021-06-20 22:30:33,262-15s [worker-7, 9308] waiting for 12021-06-20 22:30:33,263-15s [worker-7, 9308] False2021-06-20 22:30:35,266-15s [worker-8, 22676] waiting for 22021-06-20 22:30:35,266-15s [worker-8, 22676] False2021-06-20 22:30:35,266-15s [worker-8, 22676] after barrier 22021-06-20 22:30:35,266-15s [worker-7, 9308] after barrier 12021-06-20 22:30:35,266-15s [worker-6, 24016] after barrier 0
并发初始化
所有线程都必须初始化完成后,才能继续工作,列如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能工作
10个线程10种准备工作,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程
例如:启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有满足了,程序才能继续向后执行,假设数据库连接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出
工作量:有10个计算任务,完成6个,就算工作完成
转载地址:http://woesi.baihongyu.com/