0%

Python_协程

协程本身的定义,与进程、线程其实差不多,都是用于控制过程的工具:

[流畅的Python 16章](流畅的Python 16章)

协程的概念与简单示例

协程本身的定义,与进程、线程其实差不多,都是用于控制过程的工具。协程的协字,我们可以理解为协作,协程通过与调用方协作,产出相应的结果。

协程能自然的表述多种算法,例如仿真、游戏、异步I/O等事件驱动型编程形式或协作多任务。

协程:关键字 yield

上面的关键字,与生成器里面的yield,是一模一样的。其实协程就是一种生成器,只是在协程的概念里面,还存在自己独有的特性。两者还是存在一点点差异。下面给出两个例子:

“生成器”的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def gen_123():
yield 1
yield 2
yield 3
print(type(gen_123))
# <class 'function'>
print(type(gen_123()))
# <class 'generator'>
g = gen_123()
print(next(g))
# 1
print(next(g))
# 2
print(next(g))
# 3

协程的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 用作协程的生成器样例
def simple_coroutine():
print("coroutine start")
# 注意注意:这里将yield 进行了赋值操作,就是将一个普通的生成器转换为了协程生成器
x = yield
print("coroutine received: " , x)
print("coroutine ended")

my_coroutine = simple_coroutine()
print(my_coroutine)
# <generator object simple_coroutine at 0x042E3F30>
next(my_coroutine)
# coroutine start
my_coroutine.send(123)
# coroutine ended
# python-BaseException
"""Traceback (most recent call last):
File "D:\pycharm_install\PyCharm 2020.2.3\plugins\python\helpers\pydev\pydevd.py", line 1448, in _exec
pydev_imports.execfile(file, globals, locals) # execute the script
"""
  • 差异点:
    • 纯生成器:直接写明yield即可,不需要赋值
    • 协程生成器:将yield通过赋值符号,赋值给内部的局部变量。这里实现了外部调用者动态键入值的功能。

协程的状态

就像进程那样存在多种状态,协程也是包含有多种状态的,分别是:

GEN_CREATED:等待开始执行

GEN_RUNNING:解释器正在执行

GEN_SUSPENDED:在yield表达式处暂停

GEN_CLOSED: 执行结束

上面给出的例子中通过 send 方法将数据传递给协程内部变量,而send方法参数只能成为暂停的yield表达式的值,故当且仅当协程处于GEN_SUSPENDED 状态时,才能够调用send方法。这就需要对协程进行激活,也就是next方法。这也就是为什么上面的例子。

协程语句的执行顺序

  • 创建协程
  • 激活协程
  • 协程交互

核心在于,yield 在协程中更能够发挥其功能,其功能可以分为两种:产出与接收。分别对应函数next , send.

比如一句话

1
x = yield 5

上面这句话,包含有两层含义:

  1. 产出 5 .
  2. 接收一个值并赋值给x .

其执行顺序为:先产出再接收。下面一个例子能够更好的解释:

示例:产出两个值的协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def simple_coro2(a):
print("Started: a = " , a)
b = yield a
print("Received: b = " , b)
c = yield a + b
print("Received: c = " , c)
"""
>>> from test import simple_coro2
>>> my_coro2 = simple_coro2(14)

>>> next(my_coro2)
Started: a = 14
14
可以看到上面的一个next语句,将协程一直运行到第一个yield 产出值后的位置
>>> my_coro2.send(28)
Received: b = 28
42
可以看到一个send语句完成了对b的赋值与下一个yield语句产出值两个功能。
>>> my_coro2.send(99)
Received: c = 99
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

"""

图示如下:

image-20201110110022552

协程 实现平均数功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# BEGIN CORO_AVERAGER
def averager():
total = 0.0
count = 0
average = None
while True: # <1>
# 下面的这一句话有两个含义:1.产出yield 2.接收一个值并赋给term
term = yield average # <2>
total += term
count += 1
average = total/count
# END CORO_AVERAGER

"""
A coroutine to compute a running average
# BEGIN CORO_AVERAGER_TEST
>>> coro_avg = averager() # <1>
>>> next(coro_avg) # <2>
>>> coro_avg.send(10) # <3>
10.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.send(5)
15.0

# END CORO_AVERAGER_TEST

"""

我们可以看到代码与测试的数据,每次我们向协程发送一个值,其会传递出一个平均值,那么他的具体执行顺序是怎样的呢?

image-20201110140817893

  1. 激活协程,next(),并将运行到产出第一个值的位置(此时处于GEN_SUSPENDED 状态 )
  2. 协程交互,send(),首先将调用方数据进行赋值操作,其次一直运行到下一次yield产出值后(又处于GEN_SUSPENDED 状态);

预激协程的装饰器

上面的例子均需要通过next()方法激活协程,但这一步是很容易遗忘的。而装饰器是一种能够对函数进行包装的设计模式,却不会影响你的使用过程。

有关装饰器的使用与设计,在其他笔记中已有详细论述,这里直接贴代码。

装饰器定义:

1
2
3
4
5
6
7
8
9
10
from functools import wraps

def coroutine(func):
"""Decorator: primes `func` by advancing to first `yield`"""
@wraps(func)
def primer(*args,**kwargs): # <1>
gen = func(*args,**kwargs) # <2>
next(gen) # <3>
return gen # <4>
return primer

协程定义:

1
2
3
4
5
6
7
8
9
10
11
12
from coroutil import coroutine  # <4>

@coroutine # <5>
def averager(): # <6>
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count

代码运行情况展示

1
2
3
4
5
6
7
8
9
>>> from coroaverager1 import averager
>>> ave = averager()
>>> ave.send(10)
10.0
>>> ave.send(20)
15.0
>>> ave.send(30)
20.0
>>>

通过装饰器的作用,已经不需要显示的激活操作。

终止协程与异常处理

协程的终止

协程应该如何终止,这是一只没有考虑过的问题。下面的例子提供了协程终止实例:

协程终止的样例

1. 协程迭代结束:抛出 StopIteration 异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def simple_coro2(a):
print("Started: a = " , a)
b = yield a
print("Received: b = " , b)
c = yield a + b
print("Received: c = " , c)
"""
>>> from test import simple_coro2
>>> my_coro2 = simple_coro2(14)

>>> next(my_coro2)
Started: a = 14
14
>>> my_coro2.send(28)
Received: b = 28
42
>>> my_coro2.send(99)
Received: c = 99
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

"""

2. 出现协程无法处理的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def simple_coro2(a):
print("Started: a = " , a)
b = yield a
print("Received: b = " , b)
c = yield a + b
print("Received: c = " , c)
"""
>>> from test import simple_coro2
>>> coro = simple_coro2(14)
>>> next(coro)
Started: a = 14
14
>>> coro.send("abc")
Received: b = abc
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "F:\git_localRepository\fluentPython\example-code\16-coroutine\test.py", line 5, in simple_coro2
c = yield a + b
TypeError: unsupported operand type(s) for +: 'int' and 'str'
>>>

"""

上面的两个例子说明了协程终止的方式:异常。受此启发,Python提供了两种显示终止的协程的方法:throw()与close().

协程终止的专用方法:throw 与 close

generator.throw( ): 使生成器在暂停的yield表达式处抛出指定异常。

  • 如果此异常被生成器处理了,那么生成器会运行到下一个yield 表达式处,产生的值称为调用generator.throw方法得到的返回值。
  • 未被生成器处理,异常向外抛出,传到调用方。

generator.close( ):致使生成器在暂停的yield处抛出GeneratorExit异常

  • 如果生成器没有处理此异常,或者抛出了StopIteration 异常,调用方不会报错,正常退出协程。
  • 如果处理了此异常,(在接收到GeneratorExit异常后),仍旧产出值,那么会报RuntimeError异常。

下面会展示相关例子用于理解:

定义一个处理指定异常的协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
class DemoException(Exception):
"""An exception type for the demonstration."""

def demo_exc_handling():
print('-> coroutine started')
while True:
try:
x = yield
except DemoException: # <1>
print('*** DemoException handled. Continuing...')
else: # <2>
print('-> coroutine received: {!r}'.format(x))
raise RuntimeError('This line should never run.') # <3>

close 方法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
"""
Coroutine closing demonstration::
调用close方法后,协程就是关闭状态
# BEGIN DEMO_CORO_EXC_1
>>> exc_coro = demo_exc_handling()
>>> next(exc_coro)
-> coroutine started
>>> exc_coro.send(11)
-> coroutine received: 11
>>> exc_coro.send(22)
-> coroutine received: 22
>>> exc_coro.close()
>>> from inspect import getgeneratorstate
>>> getgeneratorstate(exc_coro)
'GEN_CLOSED'

# END DEMO_CORO_EXC_1
"""

throw 方法示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""
Coroutine handling exception::

# BEGIN DEMO_CORO_EXC_2
# 抛出的指定异常,能够被处理,那么协程状态还是暂停状态
>>> exc_coro = demo_exc_handling()
>>> next(exc_coro)
-> coroutine started
>>> exc_coro.send(11)
-> coroutine received: 11
>>> exc_coro.throw(DemoException)
*** DemoException handled. Continuing...
>>> getgeneratorstate(exc_coro)
'GEN_SUSPENDED'

# END DEMO_CORO_EXC_2

Coroutine not handling exception::
# 抛出的指定异常,不能够被处理,那么协程状态就是关闭状态
# BEGIN DEMO_CORO_EXC_3
>>> exc_coro = demo_exc_handling()
>>> next(exc_coro)
-> coroutine started
>>> exc_coro.send(11)
-> coroutine received: 11
>>> exc_coro.throw(ZeroDivisionError)
Traceback (most recent call last):
...
ZeroDivisionError
>>> getgeneratorstate(exc_coro)
'GEN_CLOSED'

# END DEMO_CORO_EXC_3
"""

协程返回值的处理方式

我们上面的协程都是采用的产出的方式,这都是协程在运行过程中的产出,我们能否采用return的方式获得一个最终的结果。例如上面提到的平均值。下面会做一些尝试,并提出一个目前可行的解决方案。

有返回值的协程代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from collections import namedtuple

Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break # <1>
total += term
count += 1
average = total/count
return Result(count, average) # <2>

运行代码:

1
2
3
4
5
6
7
8
9
10
11
>>> from coroaverager2 import *
>>> ave = averager()
>>> next(ave)
>>> ave.send(10)
>>> ave.send(20)
>>> ave.send(30)
>>> ave.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: Result(count=3, average=20.0)

我们发现确实返回了,但是其返回值在StopIteration 异常的一个属性里面。

捕获异常中的值的做法:

根据上面的例子进行一定的改变:

1
2
3
4
5
6
7
8
9
10
11
12
>>> ave = averager()
>>> next(ave)
>>> ave.send(10)
>>> ave.send(20)
>>> ave.send(30)
>>> try:
... ave.send(None)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(count=3, average=20.0)

上面的例子捕获的此异常,并获得了异常中的值。

Yield From

yield from 是一种全新的句法结构,作用比yield多。

作用:在生成器gen中使用 yield from subgen()时 , subgen() 获得控制权,把产出的值传递给调用gen的调用方,即调用方可以直接控制subgen 。

上面描述的很抽象,在我理解看来 有三方,两层关系:调用方与gen , gen与 subgen 。gen就是采用了yield from语法的函数。

书本中有这么一句话:yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来。这两者可以直接发送和产出值,还可以直接传入异常,不需要在位于中间的协程中添加大量处理异常的样板代码。

Yield From 用于简化for循环中的yield 表达式

产出目标值的几种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
def gen():
for c in "ABC":
yield c
for i in range(1,4):
yield i
def gen2():
yield from "ABC"
yield from range(1,4)

print(list(gen()))
print(list(gen2()))
# ['A', 'B', 'C', 1, 2, 3]
# ['A', 'B', 'C', 1, 2, 3]
  • yield from x 表达式,会首先调用 iter(x) ,并对内部的

Yield From 用做通道

这里是Yield From 的主要用处,也是上面提到的话,这种方式的好处在于:能够解决在第4部分的异常值获取问题。

名词解释:

  • 委派生成器: 包含有 yield from 表达式的生成器函数
  • 子生成器:从yield from 表达式中部分中获取的生成器
  • 调用方: 调用 委派生成器 部分的代码

yield from 结构的用法图示:

image-20201110190218832

应用示例:使用 yield from 计算平均值并输出统计报告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# BEGIN YIELD_FROM_AVERAGER
from collections import namedtuple

Result = namedtuple('Result', 'count average')


# the subgenerator
def averager(): # <1>
total = 0.0
count = 0
average = None
while True:
term = yield # <2>
if term is None: # <3>
break
total += term
count += 1
average = total/count
return Result(count, average) # <4>


# the delegating generator
def grouper(results, key): # <5>
while True: # <6>
results[key] = yield from averager() # <7>


# the client code, a.k.a. the caller
def main(data): # <8>
results = {}
for key, values in data.items():
group = grouper(results, key) # <9>
next(group) # <10>
for value in values:
group.send(value) # <11>
group.send(None) # important! <12>

# print(results) # uncomment to debug
report(results)


# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))


data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}


if __name__ == '__main__':
main(data)

# END YIELD_FROM_AVERAGER

上面代码中包含有 子生成器,委派生成器,调用方 三部分定义代码 ,运行结果如下:

1
2
3
4
 9 boys  averaging 40.42kg
9 boys averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m

理解上面的代码是很重要的,一开始我自己在看代码时不知道调用方在调用 send() 函数时,究竟是直接作用于内部的子生成器还是作用于外部的委派生成器。作用的位置不同,其逻辑理解也会产生较大的误差。原书中对其内部逻辑的解释很清晰,这里我就直接摘抄下来,看看结果。

  • 外层for循环每一迭代会新建一个grouper实例,赋值给group变量,此时group就是委派生成器。
  • 调用 next(group) ,对委派生成器进行激活,从而进入 while True循环,调用了子生成器 average后,在 yield from 表达式处暂停。
  • 内层for 循环调用group.send(value),直接把值传给子生成器 averager. 同时,当前的grouper 实例(group)在yield from表达式处暂停。
  • 内层循环结束后,group 实例依旧在yield from 表达式处暂定,因此,grouper 函数定义体中的results[key]赋值语句还没有执行。
  • 如果外层 for 循环尾部没有,group.send(None) , 那么averager 子生成器永远不会终止,委派生成器 group 永远不会被激活,因此永远不会为 results[key] 进行赋值。
  • 外层 for 循环重新迭代式会创建一个新的 grouper 实例,然后绑定到 group 变量上。前一个grouper实例,被垃圾回收。

委派生成器作用是一个管道,那么多几个管道也是可以的,此管道链条需要以一个只使用 yield 表达式的简单生成器结束或者以任何可迭代的对象结束。

应用:仿真系统

没有去了解相关的内容,如果需要再去看。