[python]yield使用的最佳实践[2] - 数据管道的处理
【摘要】
回顾
上一篇我们就简单的介绍了yield的两种场景
yield生产数据 (生成器)
yield消费数据 (协程)
Coroutines,Pipelines 以及Dataflow
数据流处理的pipeline [串行方式]
我们使用coroutine 可以将数据以pipeline的方式进行处理 send() -> coroutine -> send() ...
回顾
- 上一篇我们就简单的介绍了yield的两种场景
- yield生产数据 (生成器)
- yield消费数据 (协程)
Coroutines,Pipelines 以及Dataflow
数据流处理的pipeline [串行方式]
- 我们使用coroutine 可以将数据以pipeline的方式进行处理
send() -> coroutine -> send() -> coroutine -> send() -> coroutine
-
我们将整个的coroutines串行起来,使用.send()方法,将数据一层一层的处理就可以完成整个数的操作流了
数据流的源头
- 整个的数据流的源头应该是一个生产者
- 由数据源头来驱动整个数据流
def source(target): while not done: item = produce_item() .... .... target.send(item) target.close()
- 但是从技术层面上来讲,这并不是一个coroutine
pipeline slink (数据终端管道)
- 所有的pipeline 必须有一个终点(slink)
send() -> coroutine() -> send() -> slink
- 收集数据所有向它传入的数据并且 处理它们
@coroutine def slink(): try: while True: item = yield except GeneratorExit: # Done print 'the slink has ended now'
一个实际的例子
import time def coroutine(func): def wrapper(*args, **kwargs): # start the func cr = func(*args, **kwargs) cr.next() return cr return wrapper def follow(thefile, target): while True: line = thefile.readline() if not line: time.sleep(0.1) continue target.send(line) @coroutine def grep(partten, target): while True: line = yield if partten in line: target.send(line) @coroutine def printer(): print 'coroutine printer has start ' try: while True: line = yield print line except GeneratorExit: print 'coroutine printer has stoped' thefile = open('test.txt', 'r') follow( thefile, grep('python', printer()) )
- 上面的代码示意
follow() -> grep() -> printer()
生产数据,传递数据,消费数据
以广播的形式来处理数据
import time def coroutine(func): def wrapper(*args, **kwargs): # start the func cr = func(*args, **kwargs) cr.next() return cr return wrapper @coroutine def grep(partten, target): while True: line = yield if partten in line: target.send(line) @coroutine def printer(): print 'coroutine printer has start ' try: while True: line = yield print line except GeneratorExit: print 'coroutine printer has stoped' @coroutine def broadcast(targets): while True: line = yield if not line: time.sleep(0.1) continue for target in targets: target.send(line) def follow(thefile, broadcast): while True: line = thefile.readline() if not line: time.sleep(0.1) continue broadcast.send(line) thefile = open('test.txt', 'r') follow( thefile, broadcast([ grep('python', printer()), grep('ssssss', printer()), grep('good', printer()) ]) )
图解
分析
- 协程提供了很多强大的数据流的功能,这可比单纯的迭代器强多了
- 如果你要编写一些数据处理组件,你可以依靠上面的这些例子,来做分支,数据管道,或者合流等操作
文章来源: brucedone.com,作者:大鱼的鱼塘,版权归原作者所有,如需转载,请联系作者。
原文链接:brucedone.com/archives/1041
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)