博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python:线程同步,Barrier屏障
阅读量:4111 次
发布时间:2019-05-25

本文共 6331 字,大约阅读时间需要 21 分钟。

Barrier屏障

有人翻译成栅栏,建议还是使用屏障,可以想象成路障,道闸,Python3.2引入的新功能

名称 含义
Barrier(parties,action=None,timerout=None) 构建Barrier对象,指定参与方数目,timerout是wait方法未指定超时的默认值
n_waiting 当前在屏障中等待的线程数
parties 各方数,就是需要多少等待
wait(timerout=None) 等待通过屏障。返回0到线程数-1的整数,每个线程返回不同,如果wait方法设置了超时,并超时发送,屏障将处于broken状态

Barrier实例

import threadingimport logging#输出自定义FORMAT = '%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(format=FORMAT,level=logging.INFO)def worker(barrier:threading.Barrier):    logging.info("waiting for {} threads".format(barrier.n_waiting))    try:        barrier_id = barrier.wait()        logging.info("after barrier {}".format(barrier_id))    except threading.BrokenBarrierError:        logging.info("Broken Barrier")barrier = threading.Barrier(3)for x in range(3):    threading.Thread(target=worker,name="worker-{}".format(x),args=(barrier,)).start()logging.info("started")结果:2021-06-20 18:25:40,300-15s	 [worker-0,   13364] waiting for 0 threads2021-06-20 18:25:40,301-15s	 [worker-1,   18264] waiting for 1 threads2021-06-20 18:25:40,301-15s	 [worker-2,   20048] waiting for 2 threads2021-06-20 18:25:40,301-15s	 [MainThread,   21624] started2021-06-20 18:25:40,301-15s	 [worker-2,   20048] after barrier 22021-06-20 18:25:40,301-15s	 [worker-0,   13364] after barrier 02021-06-20 18:25:40,301-15s	 [worker-1,   18264] after barrier 1
  • 从运行结果来看,所有线程冲到了Barrier前等待,直到到达parites的数目,屏障打开,所有线程停止等待,继续执行
  • 再有线程wait,屏障就绪等待参数发数目
名称 注解
broken() 如果屏障处于打破的状态,返回True
abort() 将屏障于broken状态,等待中的线程或者调用等待方法的线程中都会抛出BrokenBarrierError,直到reset方法来恢复屏障
reset() 恢复屏障,重新开始拦截
import threadingimport logging#输出格式定义FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'logging.basicConfig(format=FORMAT,level=logging.INFO)def woker(barrier:threading.Barrier):    logging.info("waiting for {} threads".format(barrier.n_waiting))    try:        barrier_id = barrier.wait()        logging.info("after barrier {}".format(barrier_id))    except threading.BrokenBarrierError:        logging.info("Broken Barrier .run")barrier = threading.Barrier(3)for i in range(0,9):    if i ==2 :        barrier.abort()    elif i == 6:        barrier.reset()    threading.Event().wait(1)    threading.Thread(target=woker,name="worker - {}".format(i),args=(barrier,)).start()结果:2021-06-20 19:38:07,580	 [worker - 0,    17756] waiting for 0 threads2021-06-20 19:38:08,580	 [worker - 1,    17920] waiting for 1 threads2021-06-20 19:38:08,580	 [worker - 0,    17756] Broken Barrier .run2021-06-20 19:38:08,580	 [worker - 1,    17920] Broken Barrier .run2021-06-20 19:38:09,581	 [worker - 2,    21476] waiting for 0 threads2021-06-20 19:38:09,582	 [worker - 2,    21476] Broken Barrier .run2021-06-20 19:38:10,584	 [worker - 3,    18800] waiting for 0 threads2021-06-20 19:38:10,584	 [worker - 3,    18800] Broken Barrier .run2021-06-20 19:38:11,584	 [worker - 4,    21236] waiting for 0 threads2021-06-20 19:38:11,585	 [worker - 4,    21236] Broken Barrier .run2021-06-20 19:38:12,585	 [worker - 5,    16756] waiting for 0 threads2021-06-20 19:38:12,585	 [worker - 5,    16756] Broken Barrier .run2021-06-20 19:38:13,586	 [worker - 6,    21268] waiting for 0 threads2021-06-20 19:38:14,588	 [worker - 7,    10068] waiting for 1 threads2021-06-20 19:38:15,590	 [worker - 8,    15120] waiting for 2 threads2021-06-20 19:38:15,590	 [worker - 8,    15120] after barrier 22021-06-20 19:38:15,590	 [worker - 7,    10068] after barrier 12021-06-20 19:38:15,590	 [worker - 6,    21268] after barrier 0
  • 屏幕中等待了2个,屏障就被break了,waiting的线程抛了,BrokenBarrierError异常,新wait的线程也抛出异常,直到屏幕恢复,才继续按照parties数目要求继续拦截线程     

 wait方法超时实例

wait方法超时发生,屏障将处于broken状态,直到reset

 

import threadingimport logging#输出格式定义FORMAT = '%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def  worker(barrier:threading.Barrier,i:int):    logging.info("waiting for {}".format(barrier.n_waiting))    try:        logging.info(barrier.broken) #是否broker        if i < 3:            barrier_id = barrier.wait(1)  #超时后,屏障brokern        else:            if i == 6:                barrier.reset()  #恢复屏障            barrier_id = barrier.wait()        logging.info('after barrier {}'.format(barrier_id))    except threading.BrokenBarrierError:        logging.info("Broken barrier.run")barrier = threading.Barrier(3)for x  in range(0,9):    threading.Event().wait(2)    threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,x)).start()结果:2021-06-20 22:30:19,253-15s	 [worker-0,   24124] waiting for 02021-06-20 22:30:19,254-15s	 [worker-0,   24124] False2021-06-20 22:30:20,255-15s	 [worker-0,   24124] Broken barrier.run2021-06-20 22:30:21,255-15s	 [worker-1,   14364] waiting for 02021-06-20 22:30:21,255-15s	 [worker-1,   14364] True2021-06-20 22:30:21,255-15s	 [worker-1,   14364] Broken barrier.run2021-06-20 22:30:23,257-15s	 [worker-2,   24020] waiting for 02021-06-20 22:30:23,258-15s	 [worker-2,   24020] True2021-06-20 22:30:23,258-15s	 [worker-2,   24020] Broken barrier.run2021-06-20 22:30:25,259-15s	 [worker-3,    3532] waiting for 02021-06-20 22:30:25,259-15s	 [worker-3,    3532] True2021-06-20 22:30:25,259-15s	 [worker-3,    3532] Broken barrier.run2021-06-20 22:30:27,260-15s	 [worker-4,   24260] waiting for 02021-06-20 22:30:27,260-15s	 [worker-4,   24260] True2021-06-20 22:30:27,260-15s	 [worker-4,   24260] Broken barrier.run2021-06-20 22:30:29,260-15s	 [worker-5,   14320] waiting for 02021-06-20 22:30:29,261-15s	 [worker-5,   14320] True2021-06-20 22:30:29,261-15s	 [worker-5,   14320] Broken barrier.run2021-06-20 22:30:31,261-15s	 [worker-6,   24016] waiting for 02021-06-20 22:30:31,261-15s	 [worker-6,   24016] True2021-06-20 22:30:33,262-15s	 [worker-7,    9308] waiting for 12021-06-20 22:30:33,263-15s	 [worker-7,    9308] False2021-06-20 22:30:35,266-15s	 [worker-8,   22676] waiting for 22021-06-20 22:30:35,266-15s	 [worker-8,   22676] False2021-06-20 22:30:35,266-15s	 [worker-8,   22676] after barrier 22021-06-20 22:30:35,266-15s	 [worker-7,    9308] after barrier 12021-06-20 22:30:35,266-15s	 [worker-6,   24016] after barrier 0

Barrier应用

并发初始化

所有线程都必须初始化完成后,才能继续工作,列如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能工作

10个线程10种准备工作,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程

例如:启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有满足了,程序才能继续向后执行,假设数据库连接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出

工作量:有10个计算任务,完成6个,就算工作完成

转载地址:http://woesi.baihongyu.com/

你可能感兴趣的文章
LeetCode第46题思悟——全排列(permutations)
查看>>
LeetCode第47题思悟—— 全排列 II(permutations-ii)
查看>>
LeetCode第48题思悟——旋转图像(rotate-image)
查看>>
驱动力3.0,动力全开~
查看>>
记CSDN访问量10万+
查看>>
Linux下Oracle数据库账户被锁:the account is locked问题的解决
查看>>
记CSDN访问20万+
查看>>
Windows 环境下Webstorm 2020.3 版本在右下角找不到Git分支切换部件的一种解决方法
查看>>
Electron-Vue项目中遇到fs.rm is not a function问题的解决过程
查看>>
飞机换乘次数最少问题的两种解决方案
查看>>
有向无回路图的理解
查看>>
设计模式中英文汇总分类
查看>>
MFC实现五子棋游戏
查看>>
WPF实现蜘蛛纸牌游戏
查看>>
单例模式
查看>>
工厂方法模式
查看>>
模板方法模式
查看>>
数据结构之队列、栈
查看>>
数据结构之树
查看>>
数据结构之二叉树
查看>>