1、生成器版本
下面的例子中,生成器函数可用于建立一个处理管道(本质上类似于Linux Shell中使用一个管道)。
下面给出包括关于查找、打开、读取和处理文件的一组生成器函数:
# 生成器版本:
import os
import fnmatch
import gzip
import bz2
import sys
def to_str(bytes_or_str):
if isinstance(bytes_or_str, bytes):
value = bytes_or_str.decode('gbk')
else:
value = bytes_or_str
return value
def to_bytes(bytes_or_str):
if isinstance(bytes_or_str, str):
value = bytes_or_str.encode('gbk')
else:
value = bytes_or_str
return value
def find_files(topdir, pattern):
for path, dirname, filelist in os.walk(topdir):
for name in filelist:
if fnmatch.fnmatch(name, pattern):
yield os.path.join(path, name)
def opener(filenames):
for name in filenames:
if name.endswith(".gz"):
f = gzip.open(name)
elif name.endswith(".bz2"):
f = bz2.open(name)
else:
f = open(name)
yield f
def cat(filelist):
for f in filelist:
print(f'\n[+] filename:{f}')
for line in f:
yield line
def grep(pattern, lines):
for line in lines:
if pattern in to_str(line):
yield line
wwwlogs = find_files('/var/log/nginx/', 'access.log*')
files = opener(wwwlogs)
lines = cat(files)
pylines = grep("admin", lines)
for line in pylines:
sys.stdout.write(line)
在这个例子中,程序要处理的是顶级目录 '/var/log/nginx/' 的所有子目录中的所有 'access.log*' 文件中的全部行。
程序将测试每个 access.log 文件的文件压缩情况,然后使用正确的文件打开器打开它们。
程序将各行连接在一起,并通过查找子字符串 “php” 的过滤器进行处理。
整个程序由最后的 for 语句驱动的。
该循环的每次迭代都会通过管道获得一个新值并使用之。
此外,这种实现占用内存极少,因为它无需创建任何临时列表或者其他大型的数据结构。
2、协程版本
协程可用于编写数据流处理程序。以这种方式组织的程序像是反转的管道。
你将值发送到一些相互连接的协程中,而不是通过一系列for循环的生成器函数获得值。
下面给出的例子,其中的协程函数模拟了前面给出的生成器函数:
# 协程版本:
import os
import fnmatch
import gzip
import bz2
import sys
def to_str(bytes_or_str):
if isinstance(bytes_or_str, bytes):
value = bytes_or_str.decode('gbk')
else:
value = bytes_or_str
return value
def to_bytes(bytes_or_str):
if isinstance(bytes_or_str, str):
value = bytes_or_str.encode('gbk')
else:
value = bytes_or_str
return value
def coroutine(func):
def start(*args, **kwargs):
g = func(*args, **kwargs)
g.__next__()
return g
return start
@coroutine
def find_files(target):
while True:
topdir, pattern = (yield)
for path, dirname, filelist in os.walk(topdir):
for name in filelist:
if fnmatch.fnmatch(name, pattern):
target.send(os.path.join(path, name))
@coroutine
def opener(target):
while True:
name = (yield)
if name.endswith(".gz"):
f = gzip.open(name)
elif name.endswith(".bz2"):
f = bz2.open(name)
else:
f = open(name)
target.send(f)
@coroutine
def cat(target):
while True:
f = (yield)
for line in f:
target.send(line)
@coroutine
def grep(pattern, target):
while True:
line = (yield )
if pattern in to_str(line):
target.send(line)
@coroutine
def printer():
while True:
line = (yield )
sys.stdout.write(line)
# 以下代码说明如果将这些协程连接起来,创建一个数据流处理管道:
finder = find_files(opener(cat(grep("php", printer()))))
# 现在发送一个值:
finder.send(('/var/log/nginx/', 'access.log*'))
finder.send(('/var/log/apache/', 'access.log*'))
在这个例子中,每个协程都发送数据给在他们的target参数中指定的另一个协程。
和生成器的例子不同,执行完全由将数据发送到第一个协程find_files()中来驱动。
接下来,这个协程将数据转入下一个阶段。
这个例子有一个关键地方,即协程管道永远保持活动状态,知道它显示调用close()方法为止。
因此,只有需要,程序可以不断地给协程中注入数据,例如本例中对于send()方法的两次重复调用。协程可用于实现某种形式的并发。
例如,一个集中式的任务管理器或者事件循环,可以安排并将数据发送到成百上千个用于执行各种处理任务的协程中。
输入数据“被发送”到协程中这个事实还说明,若程序使用消息队列或消息传递在组件之间进行通信,协程可以很容易地与之在一起混合使用。
文章评论