万字长文,带你入门异步编程

答疑·限时优惠

如果,你想让我看见你的疑问并且百分之百的回答。可以加入我的知识星球。
AI悦创·进化岛
AI悦创·进化岛

1. 初窥门径

你好,我是悦创。

异步模型是事件驱动模型的基础,而事件驱动的编程很多,比如:VBPyQt

事件驱动是指在持续事务管理过程中,进行决策的一种策略,即跟随当前时间点上出现的事件,调动可用资源,执行相关任务,使不断出现的问题得以解决,防止事务堆积。在计算机编程、公共关系、经济活动等领域均有应用。

所谓事件驱动,简单地说就是你点什么按钮(即产生什么事件),电脑执行什么操作(即调用什么函数)。当然事件不仅限于用户的操作,事件驱动的核心自然是事件。从事件角度说,事件驱动程序的基本结构是由一个事件收集器、一个事件发送器和一个事件处理器组成。事件收集器专门负责收集所有事件,包括来自用户的(如鼠标、键盘事件等)、来自硬件的(如时钟事件等)和来自软件的(如操作系统、应用程序本身等)。事件发送器负责将收集器收集到的事件分发到目标对象中。事件处理器做具体的事件响应工作,它往往要到实现阶段才完全确定,因而需要运用虚函数机制(函数名往往取为类似于 HandleMsg 的一个名字)。对于框架的使用者来说,他们唯一能够看到的是事件处理器。这也是他们所关心的内容。

视图(即我们通常所说的“窗口”)是“事件驱动”应用程序的另一个要元。它是我们所说的事件发送器的目标对象。视图接受事件并能够对其进行处理。当我们将事件发送到具体的视图时,实际上我们完成了一个根本性的变化:从传统的流线型程序结构到事件触发方式的转变。这样应用程序具备相当的柔性,可以应付种种离散的、随机的事件。

换个说法:你点击出来一个页面,你点击一下它给你一个反馈,你点击一下它给你个反馈,这个就是事件驱动。

image-20200309195631032.png

图 一

异步活动的执行模型可以只有一个单一的主控制流,能在单核心系统和多核心系统中运行。

在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有任务都由一个控制流执行(单一线程)。任务的执行可能被暂停(挂起)或恢复,中间的这段时间线程将会去执行其他任务。

比如:

上图(图一)就是一个单线程,但是它去可以穿插许多任务(Task 1、Task 2、Task 3)

比方说:Task 1 它需要执行三次,每执行一次 Task 1 就要等待一段时间才可以继续执行。

异步的时候,我们就可以在它(Task 1 )等待的时候,分出去 Task 1 去等待,然后让该线程去执行下一个任务(Task 2)。当 Task 2 也需要等待的时候,也就把 Task 2 放出去等待, 再执行 Task 3 之后循环往复。等哪个准备好了,哪个需要执行就再放回我们的线程上执行。(如果需要等待的化,就再把它剔除出去)

当然,我们这些异步执行任务时随机的,并不是说 Task 1、Task 2、Task 3 按顺序来执行。

异步有一个特点,它的执行顺序时随机的不可控的,一切都是由操作系统随机进行的。(我们只需要定义任务即可)具体哪个时刻执行哪个任务我们时无法预测的。

Tips:

  1. 它的所有任务时单线程,同学们不要认为是多线程。
  2. 它的异步就是一条时间线上的,不可控的任务随机执行。
  3. 这个任务可能被暂停或恢复。

Ps:任务需要等待一段时间的时候,就被暂停放出去。等任务等待时间过去要再次执行任务的时候,任务就被恢复。

所以,上面说了这么多,异步其实就是单线程执行多种任务,这些线程上的任务可以被暂停或者恢复,不断地一些小人物穿插在一起。

异步中间其实主要是协程,同学们可能听过这两个概念,不过异步和协程是不一样的,它们是搭配起来用的。

接下来,我们来正式的讲解一下异步。

我们知道爬虫是 IO 密集型任务,比如如果我们使用 requests库来爬取某个站点的话,发出一个请求之后,程序必须要等待网站返回响应之后才能接着运行,而在等待响应的过程中,整个爬虫程序是一直在等待的,实际上没有做任何的事情。对于这种情况我们有没有优化方案呢?

2. 实例引入

比如在这里我们看这么一个示例网站:https://static4.scrape.cuiqingcai.com/,如图所示。

img

这个网站在内部实现返回响应的逻辑的时候特意加了 5 秒的延迟,也就是说如果我们用 requests来爬取其中某个页面的话,至少需要 5 秒才能得到响应。

另外这个网站的逻辑结构在之前的案例中我们也分析过,其内容就是电影数据,一共 100 部,每个电影的详情页是一个自增 ID,从 1~100,比如 https://static4.scrape.cuiqingcai.com/detail/43 就代表第 43 部电影,如图所示。

img

下面我们来用 requests 写一个遍历程序,直接遍历 1~100 部电影数据,代码实现如下:

import requests
import logging
import time
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(levelname)s: %(message)s')
TOTAL_NUMBER = 100
BASE_URL = 'https://static4.scrape.cuiqingcai.com/detail/{id}'
start_time = time.time()
for id in range(1, TOTAL_NUMBER + 1):
   url = BASE_URL.format(id=id)
   logging.info('scraping %s', url)
   response = requests.get(url)
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)

这里我们直接用循环的方式构造了 100 个详情页的爬取,使用的是 requests 单线程,在爬取之前和爬取之后记录下时间,最后输出爬取了 100 个页面消耗的时间。

运行结果如下:

2020-03-31 14:40:35,411 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/1
2020-03-31 14:40:40,578 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/2
2020-03-31 14:40:45,658 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/3
2020-03-31 14:40:50,761 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/4
2020-03-31 14:40:55,852 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/5
2020-03-31 14:41:00,956 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/6
...
2020-03-31 14:48:58,785 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/99
2020-03-31 14:49:03,867 - INFO: scraping https://static4.scrape.cuiqingcai.com/detail/100
2020-03-31 14:49:09,042 - INFO: total time 513.6309871673584 seconds
2020-03-31 14:49:09,042 - INFO: total time 513.6309871673584 seconds

由于每个页面都至少要等待 5 秒才能加载出来,因此 100 个页面至少要花费 500 秒的时间,总的爬取时间最终为 513.6 秒,将近 9 分钟。

这个在实际情况下是很常见的,有些网站本身加载速度就比较慢,稍慢的可能 1~3 秒,更慢的说不定 10 秒以上才可能加载出来。如果我们用 requests 单线程这么爬取的话,总的耗时是非常多的。此时如果我们开了多线程或多进程来爬取的话,其爬取速度确实会成倍提升,但有没有更好的解决方案呢?

本课时我们就来了解一下使用异步执行方式来加速的方法,此种方法对于 IO 密集型任务非常有效。如将其应用到网络爬虫中,爬取效率甚至可以成百倍地提升。

3. 基本了解

在了解异步协程之前,我们首先得了解一些基础概念,如阻塞和非阻塞、同步和异步、多进程和协程。

3.1 阻塞

阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续处理其他的事情,则称该程序在该操作上是阻塞的。

常见的阻塞形式有:

  • 网络 I/O 阻塞
  • 磁盘 I/O 阻塞
  • 用户输入阻塞等。

阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正处理事情,它们也会被阻塞。如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。

3.2 非阻塞

程序在等待某操作过程中,自身不被阻塞,可以继续处理其他的事情,则称该程序在该操作上是非阻塞的。

非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。

非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

3.3 同步

不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,我们称这些程序单元是同步执行的。

例如购物系统中更新商品库存,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。

简言之,同步意味着有序。

3.4 异步

为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。

例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。

简言之,异步意味着无序。

3.5 多进程

多进程就是利用 CPU 的多核优势,在同一时间并行地执行多个任务,可以大大提高执行效率。

3.6 协程

协程,英文叫作 Coroutine,又称微线程、纤程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。

协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。

我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是协程的优势。

3.7 事件循环

在学习 asyncio之前还需要知道这样的几个概念。

事件循环是一种处理多并发量的有效方式,在维基百科中它被描述为「一种等待程序分配事件或消息的编程架构」,我们可以定义事件循环来简化使用轮询方法来监控事件,通俗的说法就是「当 A 发生时,执行 B」

所谓的事件,其实就是函数。

事件循环,就是有一个队列,里面存放着一堆函数,从第一个函数开始执行,在函数执行的过程中,可能会有新的函数继续加入到这个队列中。一直到队列中所有的函数被执行完毕,并且再也不会有新的函数被添加到这个队列中,程序就结束了。

3.8 Future

Future 是一个数据结构,表示还未完成的工作结果。

事件循环可以监视 Future 对象是否完成。从而允许应用的一部分等待另一部分完成一些工作。

简单说,Future 就是一个类,用生成器实现了回调。

Future 的状态大概有如下几种:

  • Pending
  • Running
  • Done
  • Cancelled

创建 future 的时候,task 为 pending,事件循环调用执行的时候当然就是 running,调用完毕自然就是 done,如果需要停止事件循环,就需要先把 task 取消,状态为 cancel。这里先做了解知道 Task 是有状态的就够了。

3.9 Task

Task 是 Future 的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源可用时,事件循环会调度任务允许,并生成一个结果,从而可以由其他协程消费。一般操作最多的还是 Task。用 Task 来封装协程,给原本没有状态的协程增加一些状态。

3.10 awaitable objects(可等待对象)

如果一个对象可以用在 wait 表达式中,那么它就是一个可等待的对象。在 asyncio 模块中会一直提到这个概念,其中协程函数,Task,Future 都是 awaitable 对象。

用于 await 表达式中的对象。可以是 coroutine 也可以是实现了 __await__() 方法的对象,参见 PEP 492。类比于 Iterable 对象是 Generator 或实现了 __iter__() 方法的对象。

3.11 object._await_(self)

必须返回生成器,asyncio.Future 类也实现了该方法,用于兼容 await 表达式。

Task继承自 Future,因此 awaitable 对象有三种:coroutinesTasksFutures

await的目的:

  • 获取协程的结果
  • 挂起当前协程,将控制交由事件循环,切换到其他协程,然后等待结果,最后恢复协程继续执行

3.12 并发运行任务

一系列的协程可以通过 await 链式的调用,但是有的时候我们需要在一个协程里等待多个协程,比如我们在一个协程里等待 1000 个异步网络请求,对于访问次序有没有要求的时候,就可以使用另外的关键字 asyncio.waitasyncio.gather 来解决了。

3.12.1 asyncio.gather

使用方法:

asyncio.gather(*aws, loop=None, return_exceptions=False)

也就是说使用 gather 语句并发协程,就得用 await去执行它。

这个方法可以接收三个参数,第一个 aws

aws一般是一个列表,如果里面的元素是 awaitable类型,在运行的时候它将自动被包装成 Taskgather会根据 aws中元素添加的顺序。顺序执行并返回结果列表。

第二个 loop 可以传入一个事件循环对象,一般不用管,最后一个 return_exceptions 默认是 False,如果 return_exceptions 为 True,异常将被视为成功结果,然后添加到结果列表中。

下面是一个 10 个数字并输出的例子:

import asyncio

async def foo(num):
    return num
async def main():
    coro = [asyncio.create_task(foo(i)) for i in range(10) ]
    done= await asyncio.gather(*coro)
    print(done)
    for i in done:
        print(i)


if __name__ == '__main__':
    asyncio.run(main())

运行之后结果如下:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
0
1
2
3
4
5
6
7
8
9

gather 返回的结果是一个列表,迭代这个列表可以看到任务依次输出。

gather 通常被用来阶段性的一个操作,做完第一步才能做第二步,比如下面这样:

import asyncio
import time

async def step1(n, start):
    await asyncio.sleep(n)
    print("第一阶段完成")
    print("此时用时", time.time() - start)
    return n


async def step2(n, start):
    await asyncio.sleep(n)
    print("第二阶段完成")
    print("此时用时", time.time() - start)
    return n

async def main():
    now = time.time()
    result = await asyncio.gather(step1(5, now), step2(2, now))
    for i in result:
        print(i)
    print("总用时", time.time() - now)


if __name__ == '__main__':
    asyncio.run(main())

输出内容:

第二阶段完成
此时用时 2.0001566410064697
第一阶段完成
此时用时 5.000334739685059
5
2
总用时 5.000334739685059

修改时间试一试看看结果:

import asyncio
import time

async def step1(n, start):
    await asyncio.sleep(n)
    print("第一阶段完成")
    print("此时用时", time.time() - start)
    return n


async def step2(n, start):
    await asyncio.sleep(n)
    print("第二阶段完成")
    print("此时用时", time.time() - start)
    return n

async def main():
    now = time.time()
    result = await asyncio.gather(step1(5, now), step2(10, now))
    for i in result:
        print(i)
    print("总用时", time.time() - now)


if __name__ == '__main__':
    asyncio.run(main())

可以通过上面结果得到如下结论:

  1. step1 和 step2 是并行运行的。
  2. gather 会等待最耗时的那个完成之后才返回结果,耗时总时间取决于其中任务最长时间的那个。

3.12.2 asyncio.wait

我们先看一下 wait的语法结构:

asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

wait 一共有 4 个参数,第一个参数 aws,一般是一个任务列表。

第二个*之后的都是强制关键字参数,即 looptimeoutreturn_when

loopgather的参数是一个事件循环,该参数计划在 Python 3.10 中删除。

timeout可以指定这组任务的超时时间,请注意,此函数不会引发 asyncio.TimeoutError,超时的时候会返回已完成的任务。

return_when可以指定什么条件下返回结果,默认是所以任务完成就返回结果列表。

return_when的具体参数看下面的表格:

参数名 含义
FIRST_COMPLETED 任何一个 future 完成或取消时返回
FIRST_EXCEPTION 任何一个 future 出现错误将返回,如果没有出现异常等价于 ALL_COMPLETED
ALL_COMPLETED 当所有任务完成或者被取消时返回结果,默认值。
import asyncio,time,aiohttp
from requests.exceptions import RequestException

start_time = time.time()
async def request(url):
    try:
        async with aiohttp.ClientSession() as session:
            html = await session.get(url)
            res = await html.text()
            # print(res)
    except RequestException:
        return None

# async def main(loop):
#   url = 'https://static4.scrape.cuiqingcai.com/'
#   tasks = [request(url) for _ in range(10)]
#   tasks = loop.create_task(tasks)


if __name__ == '__main__':
    url = 'https://static4.scrape.cuiqingcai.com/'
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(request(url)) for _ in range(10)]
    wait = loop.run_until_complete(asyncio.wait(tasks))
    print(type(wait))
    print(time.time() - start_time)

# 输出
<class 'tuple'>
6.529097557067871

wait 返回的结果是一个元组,第一部分是完成的任务,第二部分是准备中的任务。

done, pending = await asyncio.wait(aws)

其中 done 表示完成的任务,可以通过迭代获取每个任务。

pending 表示的是还没执行的任务。

下面看一个例子来进一步了解:

import asyncio

async def foo(num):
    await asyncio.sleep(0.99991)
    return num
async def main():
    #coro = foo()
    coro = [asyncio.create_task(foo(i)) for i in range(10) ]
    done, pending = await asyncio.wait(coro,timeout=1,return_when="ALL_COMPLETED")

    for coro in done:
        print(coro.result())
    print("pending",pending)
    for item in pending:
         print(item)    

if __name__ == '__main__':
    asyncio.run(main())

4. 协程用法

接下来,我们来了解下协程的实现,从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。

Python 中使用协程最常用的库莫过于 asyncio,所以本文会以 asyncio 为基础来介绍协程的使用。

首先我们需要了解下面几个概念。

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

另外我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

4.1 定义协程

协程就是一个函数,只是它满足以下几个特征:

  • 依赖 I/O 操作(有 I/O 依赖的操作)
  • 可以在进行 I/O 操作时暂停
  • 无法直接运行

它的作用就是对有大量 I/O 操作的程序进行加速。

Python 协程属于可等待对象,因此可以在其他协程中被等待。

什么叫可等待对象?——await,如果前面被标记 await 就表明他是个协程,我们需要等待它返回一个数据。

# 代码示例 一
import asyncio
async def net():
    return 11
async def main():
    # net() # error
    await net() # right
asyncio.run(main())

import asyncio
async def net():
    return 11
async def main():
    # net() # error
    return await net() # right
print(asyncio.run(main()))

举个例子,我从网络上下载某个数据文件下载到我的本地电脑上,这很显然是一个 I/O 操作。比方这个文件较大(2GB),可能需要耗时 30min 才能下载成功。而在这 30min 里面,它会卡在 await 后面。这个 await 标记了协程,那就意味着它可以被暂停,那既然该任务可以被暂停,我们就把它分离出去。我这个线程继续执行其它任务,它这个 30min 分出去慢慢的传输,我这个程序再运行其他操作。

上面的代码,Python 3.6 会给你报错。报错信息如下:

Traceback (most recent call last):
  File "C:/Code/pycharm_daima/爬虫大师班/14-异步编程/test.py", line 26, in <module>
    asyncio.run(main())
AttributeError: module 'asyncio' has no attribute 'run'

为什么会出现这样的报错呢?

因为从 Python 3.7+ 之后 Python 已经完全支持异步了,Python 3.6 之前只是支持部分异步,许多的方法是非常冗长的。

一个异步函数调用另一个异步函数:

import asyncio
async def net():
    return 11
async def main():
    # net() # error
    await net() # right
asyncio.run(main())

tips:

异步主要做得是 I/O 类型,CPU 密集型就不需要使用异步。

一个异步调用另一个异步函数,不能直接被调用,必须添加 await

我们使用代码验证一下,不加 await 调用试一试:

import asyncio

async def net():
    return 11
async def main():
    net() # error
asyncio.run(main())

输出结果:

C:/Code/pycharm_daima/爬虫大师班/14-异步编程/test.py:31: RuntimeWarning: coroutine 'net' was never awaited
  net() # error
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

我们添加上 await 即可正常运行:

import asyncio

async def net():
    return 11
async def main():
    # net() # error
    await net() # right
asyncio.run(main())

运行结果:

C:\Users\clela\AppData\Local\Programs\Python\Python37\python.exe C:/Code/pycharm_daima/异步编程/test.py

Process finished with exit code 0

运行成功并没有报错,接下来我们要输出得到的结果该怎么编写代码呢?直接赋值即可:

import asyncio

async def net():
    return 11
async def main():
    # net() # error
    a = await net() # right
    print(a)
asyncio.run(main())

# 输出结果:
11

Ps:async 标记异步,await 标记等待。

如果我们不想使用 await 来运行异步函数,那这个时候我们就可以按如下方法来运行代码:

import asyncio

async def net():
    return 11

async def main():
    task = asyncio.create_task(net())
    await task # right

asyncio.run(main())

首先我们来定义一个协程,体验一下它和普通进程在实现上的不同之处,代码如下:

# 代码示例二
import asyncio
async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

代码示例二中,我们首先引入了 asyncio这个包,这样我们才可以使用 asyncawait,然后我们使用 async定义了一个 execute方法,方法接收一个数字参数,方法执行之后会打印这个数字。

随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine协程对象。

随后我们使用 get_event_loop方法创建了一个事件循环 loop,并调用了 loop对象的 run_until_complete方法将协程注册到事件循环 loop中,然后启动。最后我们才看到了 execute方法打印了输出结果。

可见,async定义的方法就会变成一个无法直接执行的 coroutine对象,必须将其注册到事件循环中才可以执行。

4.2 Task 显式地进行声明

上面我们还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。

在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明,如下所示:

"""
project = 'Code', file_name = 'yibudaima', author = 'AI悦创'
time = '2020/4/22 19:24', product_name = PyCharm, 公众号:AI悦创
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
"""
import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
# print('Task:', task.result())
print('After calling loop')

运行结果:

Coroutine: <coroutine object execute at 0x10e0f7830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop

这里我们定义了 loop 对象之后,接着调用了它的 create_task 方法将 coroutine 对象转化为了 task 对象,随后我们打印输出一下,发现它是 pending 状态。接着我们将 task 对象添加到事件循环中得到执行,随后我们再打印输出一下 task 对象,发现它的状态就变成了 finished,同时还可以看到其 result 变成了 1,也就是我们定义的 execute 方法的返回结果。

4.3 使用 ensure_future 定义,返回的也是 task

另外定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:

"""
project = 'Code', file_name = 'lession.py', author = 'AI悦创'
time = '2020/4/23 10:18', product_name = PyCharm, 公众号:AI悦创
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
"""
import asyncio
async def execute(x):
    print('Number:', x)
    return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

运行结果:

Coroutine: <coroutine object execute at 0x10aa33830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop

发现其运行效果都是一样的。

4.3.1 创建 task 总结

  1. 以下代码都是异步函数

  2. loop = asyncio.get_event_loop()

    task = loop.create_task(coroutine) # 需要提前声明 loop

  3. task = asyncio.create_task(net())

  4. task = asyncio.ensure_future(coroutine) # 不需要提前声明

4.4 绑定回调

另外我们也可以为某个 task 绑定一个回调方法,比如我们来看下面的例子:

"""
project = 'Code', file_name = 'lession.py', author = 'AI悦创'
time = '2020/4/23 10:18', product_name = PyCharm, 公众号:AI悦创
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
"""
import asyncio
import requests


async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


def callback(task):
    print('Status:', task.result())


coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

在这里我们定义了一个 request 方法,请求了百度,获取其状态码,但是这个方法里面我们没有任何 print 语句。随后我们定义了一个 callback 方法,这个方法接收一个参数,是 task 对象,然后调用 print 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback 方法。

那么它们二者怎样关联起来呢?

很简单,只需要调用 add_done_callback方法即可,我们将 callback 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback 方法了,同时 task 对象还会作为参数传递给 callback 方法,调用 task 对象的 result 方法就可以获取返回结果了。

运行结果:

Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>

实际上不用回调方法,直接在 task 运行完毕之后也可以直接调用 result 方法获取结果,如下所示:

import asyncio
import requests

async def request():
   url = 'https://www.baidu.com'
   status = requests.get(url)
   return status

coroutine = request()
task = asyncio.ensure_future(coroutine) # 分配任务
print('Task:', task) # 当前任务状态

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

运行结果是一样的:

Task: <Task pending coro=<request() running at demo.py:4>>
Task: <Task finished coro=<request() done, defined at demo.py:4> result=<Response [200]>>
Task Result: <Response [200]>

4.5 多任务协程(创建 tasks 列表)

上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait 方法即可执行,看下面的例子:

import asyncio
import requests

async def request():
   url = 'https://www.baidu.com'
   status = requests.get(url)
   return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
   print('Task Result:', task.result())

这里我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来,运行结果如下:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, 
<Task pending coro=<request() running at demo.py:5>>, 
<Task pending coro=<request() running at demo.py:5>>, 
<Task pending coro=<request() running at demo.py:5>>, 
<Task pending coro=<request() running at demo.py:5>>]

Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

可以看到五个任务被顺次执行了,并得到了运行结果。

5. 协程实现

5.1 协程与异步

接下来,我们用睡眠来模仿一下耗时的 IO 操作。

import asyncio

# 定义异步函数

async def hello(i):
    print('hello', i)
    await asyncio.sleep(3) # 假设我们下载文件需要3s
    print('world', i)

if __name__ == '__main__':
    tasks = []
    for i in range(4):
        tasks.append(hello(i)) # 把要下载请求的页面放入我们的 tasks,然后交给 asyncio 处理
    loop = asyncio.get_event_loop() # 获取时间循环
    loop.run_until_complete(asyncio.wait(tasks)) # run_until_complete:把所有程序都运行完毕,然后再停止运行。
    loop.close()

输出结果:

hello 3
hello 1
hello 0
hello 2
world 3
world 0
world 1
world 2

tips:

注意区别 time.sleep() 这个是不能使用到异步里面的 sleep,如果你直接用 time 模块里面的 sleep ,那代码是真正睡眠了,不会执行其他任务了。所以需要使用 asyncio.sleep() 的睡眠才可以。requests 包也是同理,所以接下来我会给大家讲解一个新的包(aiohttp),我们将用 aiohttp 来代替 requests。

接下来我们来分析一下输出结果:

hello 3 # 当程序执行在这个任务时需要 3s 的时间,所以进入等待,然后继续执行下一个任务
hello 1 # 当上一个任务在等待的时候,这个任务在也遇到了要等待 3s ,接着执行下一个任务,以此类推。
hello 0
hello 2
world 3 # 当任务等待完成(恢复)那 world 就输出出来了)
world 0
world 1
world 2

这时候细心的小伙伴有可能会说,我们添加任务进去的时候是 0、1、2、3,可是在执行的时候却是 3、1、0、2这就是我上面说的异步是不可控,随机的。

小结:

我在使用异步的时候,上面一共说到了三种:

执行单个任务:

  1. await执行异步
  2. asyncio.create_task(function)

执行多个任务:

  1. 获取事件循环:loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(list))

5.2 异步爬虫

pip install aiohttp

抓取目标网站:百思不得姐

import asyncio
import aiohttp
from bs4 import BeautifulSoup
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
}

async def crawl(i):
    url = f'http://www.budejie.com/{i}'
    async with aiohttp.ClientSession(headers = headers)as session:
        async with session.get(url)as response:
            print(response.status)
            text = await response.text()
            print('start', i)
    soup = BeautifulSoup(text, 'lxml')
    lis = soup.select(".j-r-list ul li div .u-txt a")
    for li in lis:
        print(li.get_text())
if __name__ == '__main__':
    tasks = [crawl(i) for i in range(1, 10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

运行结果:

C:\Users\clela\AppData\Local\Programs\Python\Python37\python.exe C:/Code/pycharm_daima/爬虫大师班/14-异步编程/异步爬虫实战.py
200
start 6
怀疑人生 
金月 
游先生 
加强 
南南 
心之痕 
怀疑人生 
能认真点吗 
雨婷思梦 
原装正版无添加 
随便了 
滒特 誃瑙菏 
糖水菠萝 
诠忄 
知鱼之乐 
墨染锦年 
.
.
.
.
.
.

懒洋洋 
死神小一生 
圆圆呐 
仙境里的童话 
汪坚他爹是我 
嘘呀 
路上城静 
顾蒙蒙 
Pescado 

Process finished with exit code 0

补充:

if __name__ == '__main__':
    tasks = [crawl(i) for i in range(1, 10)]
    loop = asyncio.get_event_loop()
    # 方法一:
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    # 方法二:
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    # 方法三:
    for task in tasks:
        loop.run_until_complete(asyncio.gather(task))
    loop.close()

前面讲了这么多,又是 async,又是 coroutine,又是 task,又是 callback,但似乎并没有看出协程的优势啊?反而写法上更加奇怪和麻烦了,别急,上面的案例只是为后面的使用作铺垫,接下来我们正式来看下协程在解决 IO 密集型任务上有怎样的优势吧!

上面的代码中,我们用一个网络请求作为示例,这就是一个耗时等待的操作,因为我们请求网页之后需要等待页面响应并返回结果。耗时等待的操作一般都是 IO 操作,比如文件读取、网络请求等等。

协程对于处理这种操作是有很大优势的,当遇到需要等待的情况的时候,程序可以暂时挂起,转而去执行其他的操作,从而避免一直等待一个程序而耗费过多的时间,充分利用资源。

为了表现出协程的优势,我们还是拿本课时开始介绍的网站 https://static4.scrape.cuiqingcai.com/ 为例来进行演示,因为该网站响应比较慢,所以我们可以通过爬取时间来直观地感受到爬取速度的提升。

为了让你更好地理解协程的正确使用方法,这里我们先来看看使用协程时常犯的错误,后面再给出正确的例子来对比一下。

首先,我们还是拿之前的 requests 来进行网页请求,接下来我们再重新使用上面的方法请求一遍:

import asyncio
import requests
import time

start = time.time()

async def request():
   url = 'https://static4.scrape.cuiqingcai.com/'
   print('Waiting for', url)
   response = requests.get(url)
   print('Get response from', url, 'response', response)


tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

在这里我们还是创建了 10 个 task,然后将 task 列表传给 wait 方法并注册到时间循环中执行。

运行结果如下:

Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Cost time: 54.17627549171448

可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 51 秒,平均一个请求耗时 5 秒,说好的异步处理呢?

其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。

要实现异步,接下来我们需要了解一下 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。

所以,我们可能会将代码中的 request 方法改成如下的样子:

async def request():
   url = 'https://static4.scrape.cuiqingcai.com/'
   print('Waiting for', url)
   response = await requests.get(url)
   print('Get response from', url, 'response', response)

仅仅是在 requests 前面加了一个 await,然而执行以下代码,会得到如下报错:

C:\Users\clela\AppData\Local\Programs\Python\Python37\python.exe C:/Code/pycharm_daima/实战项目代码/上课代码/lession.py
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Cost time: 118.57068157196045
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\clela\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "C:\Users\clela\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connectionpool.py", line 376, in _make_request
    self._validate_conn(conn)
  File "C:\Users\clela\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connectionpool.py", line 994, in _validate_conn
    conn.connect()
  File "C:\Users\clela\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connection.py", line 300, in connect
    conn = self._new_conn()
  File "C:\Users\clela\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connection.py", line 169, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.VerifiedHTTPSConnection object at 0x00000202B1C3A5C8>: Failed to establish a new connection: [WinError 10060] 由于连接方在一段时间后没有正确答复或连接的主机没有反应,连接尝试失败。

Traceback (most recent call last):
  File "C:/Code/pycharm_daima/实战项目代码/上课代码/lession.py", line 17, in request
    response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at C:/Code/pycharm_daima/实战项目代码/上课代码/lession.py:14> exception=TypeError("object Response can't be used in 'await' expression")>
Traceback (most recent call last):
  File "C:/Code/pycharm_daima/实战项目代码/上课代码/lession.py", line 17, in request
    response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

这次它遇到 await 方法确实挂起了,也等待了,但是最后却报了这么个错,这个错误的意思是 requests 返回的 Response 对象不能和 await 一起使用,为什么呢?因为根据官方文档说明,await 后面的对象必须是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一个原生 coroutine对象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine对象。
  • An object with an __await__ method returning an iterator,一个包含 __await__ 方法的对象返回的一个迭代器。

可以参见:https://www.python.org/dev/peps/pep-0492/#await-expression。

requests 返回的 Response 不符合上面任一条件,因此就会报上面的错误了。

那么你可能会发现,既然 await 后面可以跟一个 coroutine 对象,那么我用 async把请求的方法改成 coroutine 对象不就可以了吗?所以就改写成如下的样子:

import asyncio
import requests
import time

start = time.time()

async def get(url):
   return requests.get(url)

async def request():
   url = 'https://static4.scrape.cuiqingcai.com/'
   print('Waiting for', url)
   response = await get(url)
   print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

这里我们将请求页面的方法独立出来,并用 async 修饰,这样就得到了一个 coroutine 对象,我们运行一下看看:

Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
...
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
Cost time: 51.394437756259273

还是不行,它还不是异步执行,也就是说我们仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的!

我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。

6. 使用 aiohttp

aiohttp是一个支持异步请求的库,利用它和 asyncio配合我们可以非常方便地实现异步请求操作。

安装方式如下:

pip3 install aiohttp

官方文档链接为:https://aiohttp.readthedocs.io/,它分为两部分,一部分是 Client,一部分是 Server,详细的内容可以参考官方文档。

下面我们将 aiohttp用上来,将代码改成如下样子:

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
   session = aiohttp.ClientSession()
   response = await session.get(url)
   await response.text()
   await session.close()
   return response

async def request():
   url = 'https://static4.scrape.cuiqingcai.com/'
   print('Waiting for', url)
   response = await get(url)
   print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

在这里我们将请求库由 requests 改成了 aiohttp,通过 aiohttp 的 ClientSession 类的 get 方法进行请求,结果如下:

Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy('Server': 'nginx/1.17.8', 'Date': 'Tue, 31 Mar 2020 09:35:43 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Vary': 'Accept-Encoding', 'X-Frame-Options': 'SAMEORIGIN', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'Content-Encoding': 'gzip')>
...
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy('Server': 'nginx/1.17.8', 'Date': 'Tue, 31 Mar 2020 09:35:44 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Vary': 'Accept-Encoding', 'X-Frame-Options': 'SAMEORIGIN', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'Content-Encoding': 'gzip')>
Cost time: 6.1102519035339355

成功了!我们发现这次请求的耗时由 51 秒变直接成了 6 秒,耗费时间减少了非常非常多。

还可以使用如下代码请求:

import time
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

import requests
from requests.exceptions import RequestException



def requests_fun(url):
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36',
    }
    session = requests.Session()
    session.headers = headers
    try:
        html = session.get(url)
        if html.status_code == 200:
            return html.text
        return None
    except RequestException:
        return None


if __name__ == '__main__':
    url = 'https://static4.scrape.cuiqingcai.com/'
    urls = [url for _ in range(10)]
    start_time = time.time()
    # with ProcessPoolExecutor(max_workers=10) as executor:
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(requests_fun, url) for url in urls]

    print(time.time() - start_time)

输出结果:

6.639995336532593

当然,我推荐你使用 with 方法:

Client example:

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Server example:

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

app = web.Application()
app.add_routes([web.get('/', handle),
                web.get('/{name}', handle)])

if __name__ == '__main__':
    web.run_app(app)

所以,上面的代码我们可以改写成如下内容:

"""
project = 'Code', file_name = 'yibudaima', author = 'AI悦创'
time = '2020/4/22 19:24', product_name = PyCharm, 公众号:AI悦创
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
"""
import asyncio
import aiohttp
import time

start = time.time()


async def get(url):
    # session = aiohttp.ClientSession() # Client example,创建客户端
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        await response.text()
        await session.close()
        return response


async def request():
    url = 'https://static4.scrape.cuiqingcai.com/'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'response', response)


tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

接下来,我们继续:

代码里面我们使用了 await,后面跟了 get 方法,在执行这 10 个协程的时候,如果遇到了 await,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行。

开始运行时,事件循环会运行第一个 task,针对第一个 task 来说,当执行到第一个 await 跟着的 get 方法时,它被挂起,但这个 get 方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession对象,接着遇到了第二个 await,调用了 session.get 请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒。

当第一个 task 被挂起了,那接下来该怎么办呢?

事件循环会寻找当前未被挂起的协程继续执行,于是就转而执行第二个 task 了,也是一样的流程操作,直到执行了第十个 task 的 session.get 方法之后,全部的 task 都被挂起了。

所有 task 都已经处于挂起状态,怎么办?

只好等待了。

5 秒之后,几个请求几乎同时都有了响应,然后几个 task 也被唤醒接着执行,输出请求结果,最后总耗时,6 秒!

怎么样?这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等待,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。

你可能会说,既然这样的话,在上面的例子中,在发出网络请求后,既然接下来的 5 秒都是在等待的,在 5 秒之内,CPU 可以处理的 task 数量远不止这些,那么岂不是我们放 10 个、20 个、50 个、100 个、1000 个 task 一起执行,最后得到所有结果的耗时不都是差不多的吗?因为这几个任务被挂起后都是一起等待的。

理论来说确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。但由于不同服务器处理的实现机制不同,可能某些服务器并不能承受这么高的并发,因此响应速度也会减慢。

在这里我们以百度为例,来测试下并发数量为 1、3、5、10、...、500 的情况下的耗时情况,代码如下:

# -*- coding: utf-8 -*-
# @Author: clela
# @Date:   2020-04-24 22:34:17
# @Last Modified by:   clela
# @Last Modified time: 2020-04-24 23:54:49
import asyncio
import aiohttp
import time


def test(number):
    start = time.time()

    async def get(url):
        # session = aiohttp.ClientSession()
        async with aiohttp.ClientSession() as session:
            response = await session.get(url)
            await response.text()
            await session.close()
            return response

    async def request():
        url = 'https://www.baidu.com/'
        await get(url)

    tasks = [asyncio.ensure_future(request()) for _ in range(number)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    end = time.time()
    print('Number:', number, 'Cost time:', end - start)

for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
    test(number)

运行结果如下:

Number: 1 Cost time: 0.05885505676269531
Number: 3 Cost time: 0.05773782730102539
Number: 5 Cost time: 0.05768704414367676
Number: 10 Cost time: 0.15174412727355957
Number: 15 Cost time: 0.09603095054626465
Number: 30 Cost time: 0.17843103408813477
Number: 50 Cost time: 0.3741800785064697
Number: 75 Cost time: 0.2894289493560791
Number: 100 Cost time: 0.6185381412506104
Number: 200 Cost time: 1.0894129276275635
Number: 500 Cost time: 1.8213098049163818

可以看到,即使我们增加了并发数量,但在服务器能承受高并发的前提下,其爬取速度几乎不太受影响。

综上所述,使用了异步请求之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升是非常可观的。

那到这里,同学们已经掌握了:多线程、多进程、线程池、进程池、异步。那有同学可能会问:可不可以把这几个方法结合起来呢?

那我告诉你们的是,异步只能用异步的方法执行,不过大家是否记得 concurrent.future 模块呢?这个模块是底层是 异步,所以这也是我接下来所要说的。

7. 异步使用线程池与进程池

Concurrent.futures 这个模块可以和异步连接,具有线程池和进程池。管理并发编程,处理非确定性的执行流程,同步功能。

使用 requests 的异步

我们先导入所需要的库:

import asyncio, requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# ThreadPoolExecutor :线程池
# ProcessPoolExecutor:进程池
from bs4 import BeautifulSoup
"""
project = 'Code', file_name = '使用requests的异步', author = 'AI悦创'
time = '2020/3/10 20:34', product_name = PyCharm
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
              ┏┓      ┏┓
            ┏┛┻━━━┛┻┓
            ┃        ┃
            ┃  ┳┛  ┗┳  ┃
            ┃      ┻      ┃
            ┗━┓      ┏━┛
                ┃      ┗━━━┓
                ┃  神兽保佑    ┣┓
                ┃ 永无BUG!   ┏┛
                ┗┓┓┏━┳┓┏┛
                  ┃┫┫  ┃┫┫
                  ┗┻┛  ┗┻┛
"""
import asyncio, requests,aiohttp
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from bs4 import BeautifulSoup
from requests.exceptions import RequestException

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
}

def crawl(i):
    url = f'http://www.budejie.com/{i}'
    try:
        html = requests.get(url, headers=headers)
        if html.status_code == 200:
            soup = BeautifulSoup(html.text, 'lxml')
            lis = soup.select(".j-r-list ul li div .u-txt a")
            for li in lis:
                print(li.get_text())
        return "ok"
    except RequestException:
        return None

async def main():
    loop = asyncio.get_event_loop() # 获取循环事件
    tasks = []
    with ThreadPoolExecutor(max_workers=10)as t:
        # 10 个线程,10 个任务
        for i in range(1, 10):
            tasks.append(loop.run_in_executor(t, crawl, i))
    #       task.append(loop.run_in_executor(放入你的线程,爬虫函数,爬虫函数参数)

    # 以下代码可以不写
    # await asyncio.wait(tasks)
    # for result in await asyncio.wait(tasks):
    #   print(result)# 当你执行的爬虫函数有返回信息时使用
    #   pass

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    print(time.time() - start_time)

8. 一般函数使用 concurrent

import asyncio, requests,aiohttp
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from bs4 import BeautifulSoup
from requests.exceptions import RequestException

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
}

def crawl(i):
    url = f'http://www.budejie.com/{i}'
    try:
        html = requests.get(url, headers=headers)
        if html.status_code == 200:
            soup = BeautifulSoup(html.text, 'lxml')
            lis = soup.select(".j-r-list ul li div .u-txt a")
            for li in lis:
                pass
            #   print(li.get_text())
        return "ok"
    except RequestException:
        return None

if __name__ == '__main__':
    start_time_1 = time.time()
    for i in range(1, 10):
        crawl(i)
    print("单线程时间:>>>", time.time() - start_time_1)

    start_time_2 = time.time()
    with ThreadPoolExecutor(max_workers=10)as t:
        for i in range(1, 10):
            t.submit(crawl, i)
    print("线程池时间:>>>", time.time() - start_time_2)

    start_time_3 = time.time()
    with ProcessPoolExecutor(max_workers=10)as t:
        for i in range(1, 10):
            t.submit(crawl, i)
    print("进程池时间:>>>", time.time() - start_time_3)

输出结果:

单线程时间:>>> 2.1695995330810547
线程池时间:>>> 0.5049772262573242
进程池时间:>>> 0.920097827911377

我们来分析一下输出结果,我们会分析进程池花费的时间会比线程池更多,这是为什么呢?

  1. 多线程非常适合 I/O 密集型,不适合 CPU 密集型;
  2. 进程池创建销毁的资源开销大,创建一个进程所耗费的资源要比创建一个线程耗费的时间大很多,销毁它也需要很长的时间。(准备工作非常多)

9. 分布式 Python 编程

大型项目的分布式架构所用的 Celery,当我们使用分布式编程的时候就是要在很多机器上运行,就不止在你一台电脑上运行了,也就是一个企业中的大型机房,然后进行任务分发。

分布式计算基本思想:

分布式计算是一种计算方法,和集中式计算是相对的。

随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。

分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率。

而我们所说的分布式思想是将一个大任务 分散成几个小任务,交给 分布式网络中的计算机去完成。在分布式计算的环境中,必须保证网络中的计算机的可用性(避免网络延迟、非预知的崩溃等)。所以就需要可以可持续的监控架构。

有一个分布式系统基础特征产生问题:网络由不同操作系统的计算机组成,很多互不兼容。

所以有了兼容不同环境的框架,比如 Celery。

image-20200310224044349.png

比如你写了一个爬虫程序,你可以提交许多任务,这里假如你创建了10个 URL。然后,把 URL 提交到 Celery 中的 Broker,而 Broker 之后会维护一个队列,把你提交的任务存储下来。之后它会把不同的任务分发给不同的机器上的不同电脑,运行程序去执行这些任务。

这个在什么时候用的多呢?

用的最多的时候是人工智能网站,比如某人工智能网站现在又10万人进行访问了(提交任务上去)也可以说是比较耗时的操作。详细来说,假设这是个购物网站,你下了一个订单需要2~3s 进行处理,也就是很多人提交订单。到时候它把很多订单交给 Broker,之后 Broker 会分发其他服务器处理,这样就能提高网站的并发能力。一两个人操作你的网站,你一台服务器还可以,但是几千上网就需要许多台服务器进行操作了。

10. Celery 环境

image-20200310225702120.png


我们接下来看一下 Celery 如何使用。

因为 Celery 依赖于任务分发程序, 那就是一个数据库,我们一般选择 redis(直接在我们的 CPU 内存上执行速度、分发速度也非常快)。所以,我们一般把 redis 当作 Broker。

我们先来安装一下:

pip install celery redis eventlet

11. Redis 安装

1. Window 下安装

下载地址:https://github.com/MicrosoftArchive/redis/releases

考虑到中国访问 GitHub 网速慢,大家可以去我公众号后台回复:redis。即可获取下载连接。

Redis 支持 32 位和 64 位。这个需要根据你系统平台的实际情况选择,这里我们下载 Redis-x64-xxx.zip压缩包到 C 盘,解压后,将文件夹重新命名为 redis

img

打开文件夹,内容如下:

img

打开一个 cmd 窗口 使用 cd 命令切换目录到 C:\redis 运行:

redis-server.exe redis.windows.conf

如果想方便的话,可以把 redis 的路径加到系统的环境变量里,这样就省得再输路径了,后面的那个 redis.windows.conf 可以省略,如果省略,会启用默认的。输入之后,会显示如下界面:

Redis 安装

这时候另启一个 cmd 窗口,原来的不要关闭,不然就无法访问服务端了。

切换到 redis 目录下运行:

redis-cli.exe -h 127.0.0.1 -p 6379

设置键值对:

set myKey abc

取出键值对:

get myKey

Redis 安装


2. Linux 下安装

下载地址:http://redis.io/download,下载最新稳定版本。

本教程使用的最新文档版本为 2.8.17,下载并安装:

$ wget http://download.redis.io/releases/redis-2.8.17.tar.gz
$ tar xzf redis-2.8.17.tar.gz
$ cd redis-2.8.17
$ make

make完后 redis-2.8.17目录下会出现编译后的redis服务程序redis-server,还有用于测试的客户端程序redis-cli,两个程序位于安装目录 src 目录下:

下面启动redis服务.

$ cd src
$ ./redis-server

注意这种方式启动 redis 使用的是默认配置。也可以通过启动参数告诉 redis使用指定配置文件使用下面命令启动。

$ cd src
$ ./redis-server ../redis.conf

redis.conf 是一个默认的配置文件。我们可以根据需要使用自己的配置文件。

启动 redis 服务进程后,就可以使用测试客户端程序 redis-cli 和 redis 服务交互了。 比如:

$ cd src
$ ./redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

3. Ubuntu 下安装

在 Ubuntu 系统安装 Redis 可以使用以下命令:

$sudo apt-get update
$sudo apt-get install redis-server

4. 启动 Redis

$ redis-server

5. 查看 redis 是否启动?

$ redis-cli

以上命令将打开以下终端:

redis 127.0.0.1:6379>

127.0.0.1 是本机 IP ,6379 是 redis 服务端口。现在我们输入 PING 命令。

redis 127.0.0.1:6379> ping
PONG

安装完成 redis 之后,我们来运行一下 redis sever,启动之后会监听 6379 端口,这是默认端口。

开启之后就是一个键值对数据库。这时候有同学会问,我们如何进行 Celery 任务分发呢?

我们创建一个 Server.py 文件:

from celery import Celery
from bs4 import BeautifulSoup
import requests
app = Celery('tasks',
             backend='redis://127.0.0.1://6397/0',
             broker='redis://127.0.0.1:6397/1')

启动程序也很简单,命令行输入:

$ Celery -A server worker -l info -P eventlet
# Celery:启动芹菜
# -A server:启动服务
# worker:我们上面写的 Python 文件名称
# -l info:日志级别
# -P eventlet:并发执行

tip:

-P eventlet:在Windows 下不加这条的化,会出现运行错误,不能执行大型并发,在 Linux 下不加没有问题。

Windows下不支持各种异步线程,所以需要我们自己下载一个 eventlet,我们使用 eventlet 代替里面的线程进行并发。(pip install eventlet)

接下来,继续编写上面创建的 Server.py 代码:

"""
project = 'Code', file_name = 'Server', author = 'AI悦创'
time = '2020/3/11 8:44', product_name = PyCharm
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
"""
from celery import Celery
from bs4 import BeautifulSoup
import requests
from requests import RequestException

app = Celery('tasks',
             backend='redis://127.0.0.1://6397/0',
             broker='redis://127.0.0.1:6397/1')
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
}
@app.tasks
def crawl(i):
    url = f'http://www.budejie.com/{i}'
    try:
        html = requests.get(url, headers=headers)
        if html.status_code == 200:
            soup = BeautifulSoup(html.text, 'lxml')
            lis = soup.select(".j-r-list ul li div .u-txt a")
            result = ''
            for li in lis:
                result += li.get_text()
            #   print(li.get_text())
        return result
    except RequestException:
        return None

Celery 任务都把 app.tasks 当作装饰器,标志着这是一个任务。

编写生产者 Client.py

我们新建代码文件:Client.py

from Server import crawl # 导入刚才编写的 Server 模块中 crawl 任务
from celery.result import AsyncResult # 异步返回结果

tasks = []
for i in range(1, 10):
    task = crawl.delay(i)
    # delay:代表异步提交任务,会返回一个 id,之后我们就可以通过这个 id 来找到运行结果
    tasks.append(task) # 添加到我们所创建的列表,我到时候需要去找到返回结果

for t in tasks:
    print(AsyncResult(t.get()))
#   AsyncResult() :等待任务返回
#   get() :取得结果

8. Docker 分布式多机

如何进行我们的大型分布式架构呢?这个时候就需要容器(Docker)的使用了,举个例子:在 Docker 还没出现之前,大家都是使用很多虚拟机来模仿许多电脑的。但是虚拟机有一个缺陷:一台电脑能开启的虚拟机是有限的,而且开一台虚拟机我们有时并不需要该虚拟机的全部功能,因为有时候我们只是单纯的要运行一个程序而已。但是它给我下载了许多的文件系统、NFS等之类我们不需要的。

Docker 就是一个简单容器,只会有我们需要的程序组件去组建环境,只会提供很少很少的环境配置,可以理解成轻量级的虚拟机,不会像虚拟机提供那么大,只是足够我们项目使用而已。(很简陋的电脑)然后我们就可以把 Docker 当成电脑来执行。

Celery 的 worker 应该跑在不同的机器上,一台机器上的 worker 可以跑多个 task 进程或者线程。这个时候你唯一要创建的就是一个 Broker,其实也就是任务分发节点。

我们提交任务到我们的任务分发节点,然后再由任务分发节点分配给我们的不同电脑,这样就能实现了。更多关于的这样的知识点我这里就是简单的给你介绍一下,有兴趣的可以自行去学习。欢迎关注我公众号:AI悦创,博客:www.aiyc.yop

总结

以上便是 Python 中协程的基本原理和用法,在后面一课时会详细介绍 aiohttp 的使用和爬取实战,实现快速高并发的爬取。

本节代码:https://github.com/Python3WebSpider/AsyncTest。

AI悦创·创造不同!
AI悦创 » 万字长文,带你入门异步编程

21 评论

  1. 说实话,挺喜欢这种把知识分成几个知识点,循序渐进,就像做一件事之前已经知道需要些什么。还没看到实战的时候,只听理论还是有点懵逼,但是跟着实战敲一遍代码,思路就清晰了。我觉得理论和实践结合很重要,就像学校有理论课和实验课一样。整体听下来还是不错的,实战讲解那里思路清晰,明天再巩固一下代码,理论再梳理一遍。我记得你有写过一篇详细的多线程多进程的,还没来的及看。

    1. 还有区分版本,好评ヾ(≧∇≦*)ゝ

    2. 我看到了一个有心评论的你,我很开心,真的开心,希望有更多的反馈,我们共同进步!(不管是好评还是差评内容都会显示,如果觉得有不足的话也请指正哈)

      1. 一起加油!OωO

  2. 滴!访客卡!请上车的乘客系好安全带,现在是:Sat May 30 2020 11:28:19 GMT+0800 (中国标准时间)

  3. 我觉得前面进程线程讲的还是不错的!就到了协程那里,可能是我第一次接触,代码实现这方面,还不能很好的理解,就直接一下上代码,太秃然了,我要好好消化下。|´・ω・)ノ

    1. 哈,别急哦,后面有讲实现哦,这个是安排的,上代码的目的不是让你懂,而是要让你运行结果的和一些规律,后面协程的实现才是重头戏,有没有发现理论很多?我可是还删除了一些呢!不过希望后面带你们敲代码的之后的完整感受哦!越真实越好,这样我也就进步一些啦,又不了解的可以继续留言提问,skr~ ::aru:cheer::

      1. 好的好的,还没看到实战。

      2. 是的,理论还分的挺细的。

        1. 理论考验一个老师自己会不会,青灯教育就是个渣渣,加油。等你最终的反馈skr~

  4. [secret]可能我理解力不是很强,生产者消费者那里能讲细点吗。还有视频里举买手机那个例子感觉不太合适
    [/secret]

    1. 这个生产者和消费者的话主要拿来对比,如果要了解我之后写篇文章,为什么要拿生产者和消费者对比呢?

    2. 答:为了引出异步的优势,对于手机的例子不太合适你的感受是?

      1. [secret]就是拿爬虫来说任务就是发送请求,等待,处理这三个。你要说网购买手机我就感觉都应该说网购买了,在等待这个时候还可以继续买,互不影响。只是个人观点勿喷!!!
        [/secret]

        1. 对啊,不会喷的,你再想想:你去手机店买手机,那店里面是模型机肯定是要去仓库拿真机的嘛,这个时候你只能等待了;那换一个方法买:你去网购,那这个时候你下单之后是不是可以做其他的事情(比如:lol、王者荣耀、追剧啥的),这样不就错开了嘛,这样不就很形象的讲解异步。不知道这样有没有解决的你疑问呢?没有的话,继续提问交流,不要害羞! ::aru:proud::

          1. [secret]emmmm我又想了一下,明白你说的意思。你要这么说也行,可能我当时理解的角度不一样。我刚才说的意思是,应该把爬虫和网购类比,这样我感觉相对会好一点,爬虫三个步骤请求等待处理,网购是下单等待收获。我是从这方面考虑的
            [/secret]

  5. 异步。。。。

Leave a Reply