答疑·限时优惠

如果,你想让我看见你的疑问并且百分之百的回答。可以加入我的知识星球。
AI悦创·进化岛
AI悦创·进化岛

目录

  1. 简单的百度新闻爬虫
  2. 实现一个更好的网络请求函数
  3. 实现功能强大、简洁易用的网址池(URL Pool)
  4. 让 MySQL 数据库操作更方便
  5. 实现一个同步定向新闻爬虫
  6. 网页正文的提取
  7. 用 asyncio 实现异步爬虫

1. 简单的百度新闻爬虫

前面悦创我唠叨了很多内容,都是为今天的实战做铺垫。小悦们可能已经等得有些不耐烦了,那么我们就废话不多说,马上干起来!

这个实战例子是构建一个大规模的异步新闻爬虫,但要分几步走,从简单到复杂,循序渐进的来构建这个Python爬虫

本教程所有代码以 Python 3.6实现,不兼顾 Python 2,这也是强烈建议小悦们使用 Python 3的良苦用心啊,O(∩_∩)O哈哈哈~


要抓取新闻,首先得有新闻源,也就是抓取的目标网站。国内的新闻网站,从中央到地方,从综合到垂直行业,大大小小有几千家新闻网站。百度新闻(news.baidu.com)收录的大约两千多家。那么我们先从百度新闻入手。

打开百度新闻的网站首页:news.baidu.com
我们可以看到这就是一个新闻聚合网页,里面列举了很多新闻的标题及其原始链接。如图所示:

image-20200715155050290


我们的目标就是从这里提取那些新闻的链接并下载。流程比较简单:

新闻爬虫简单流程图

根据这个简单流程,我们先实现下面的简单代码:

#!/usr/bin/env python3
# Author: AI悦创


import re
import time
import requests
import tldextract


def save_to_db(url, html):
    # 保存网页到数据库,我们暂时用打印相关信息代替
    print('%s : %s' % (url, len(html)))


def crawl():
    # 1. download baidu news
    hub_url = 'http://news.baidu.com/'
    res = requests.get(hub_url)
    html = res.text

    # 2. extract news links
    ## 2.1 extract all links with 'href'
    links = re.findall(r'href=[\'"]?(.*?)[\'"\s]', html)
    print('find links:', len(links))
    news_links = []
    ## 2.2 filter non-news link
    for link in links:
        if not link.startswith('http'):
            continue
        tld = tldextract.extract(link)
        if tld.domain == 'baidu':
            continue
        news_links.append(link)

    print('find news links:', len(news_links))
    # 3. download news and save to database
    for link in news_links:
        html = requests.get(link).text
        save_to_db(link, html)
    print('works done!')


def main():
    while 1:
        crawl()
        time.sleep(300)


if __name__ == '__main__':
    main()

简单解释一下上面的代码:

  1. 使用 requests 下载百度新闻首页;

  2. 先用正则表达式提取 a 标签的 href 属性,也就是网页中的链接;然后找出新闻的链接,方法是:假定非百度的外链都是新闻链接;

  3. 逐个下载找到的所有新闻链接并保存到数据库;保存到数据库的函数暂时用打印相关信息代替。

  4. 每隔 300 秒重复 1-3 步,以抓取更新的新闻。

以上代码能工作,但也仅仅是能工作,槽点多得也不是一点半点,那就让我们一起边吐槽边完善这个爬虫吧。

1. 增加异常处理

在写爬虫,尤其是网络请求相关的代码,一定要有异常处理。

  1. 目标服务器是否正常,当时的网络连接是否顺畅(超时)等状况都是爬虫无法控制的,所以在处理网络请求时必须要处理异常。
  2. 网络请求最好设置 timeout ,别在某个请求耗费太多时间。timeout 导致的失败,有可能是服务器响应不过来,也可能是暂时的网络出问题。所以,对于 timeout 的异常,我们需要过段时间再尝试。

2. 要对服务器返回的状态,如404,500等做出处理

服务器返回的状态很重要,这决定着我们爬虫下一步该怎么做。需要处理的常见状态有:

  • 301, 该 URL 被永久转移到其它 URL,以后请求的话就请求被转移的 URL
  • 404,基本上是这个网站已经失效了,后面也就别试了
  • 500,服务器内部出错了,可能是暂时的,后面要再次请求试试

3. 管理好URL的状态

记录下此次失败的 URL,以便后面再试一次。对于 timeout 的 URL ,需要后面再次抓取,所以需要记录所有 URL 的各种状态,包括:

  • 已经下载成功
  • 下载多次失败无需再下载
  • 正在下载
  • 下载失败要再次尝试

增加了对网络请求的各种处理,这个爬虫就健壮多了,不会动不动就异常退出,给后面运维带来很多的工作量。
下一节我们讲对上面三个槽点结合代码一一完善。欲知详情,请听下回分解。

4. Python 爬虫知识点

本节中我们用到了Python的几个模块,他们在爬虫中的作用如下:

1. requests模块

它用来做 http 网络请求,下载 URL 内容,相比 Python 自带的 urllib.request,requests 更加易用。GET,POST信手拈来:

import requests

res = requests.get(url, timeout=5, headers=my_headers)

res2 = requests.post(url, data=post_data, timeout=5, headers=my_headers)

get() 和 post() 函数有很多参数可选,上面用到了设置 timeout,自定义 headers,更多参数可参考 requests 文档。

requests 无论 get() 还是 post() 都会返回一个 Response 对象,下载到的内容就通过这个对象获取:

  • res.content 是得到的二进制内容,其类型是 bytes;

  • res.text 是二进制内容 content decode 后的 str 内容;

    它先从 response headers 里面找到 encoding ,没找到就通过 chardet 自动判断得到 encoding ,并赋值给 res.encoding,最后把二进制的 content 解密为 str 类型。

小悦经验: res.text 判断中文编码时有时候会出错,还是自己通过 cchardet(用C语言实现的 chardet )获取更准确。这里,我们列举一个例子:

In [1]: import requests

In [2]: r = requests.get('http://epaper.sxrb.com/')

In [3]: r.encoding
Out[3]: 'ISO-8859-1'

In [4]: import chardet

In [5]: chardet.detect(r.content)
Out[5]: {'confidence': 0.99, 'encoding': 'utf-8', 'language': ''}

上面是用 ipython 交互式解释器(强烈推荐 ipython,比 Python 自己的解释器好太多)演示了一下。打开的网址是山西日报数字报,手动查看网页源码其编码是 utf8,用 chardet 判断得到的也是 utf8 。而 requests 自己判断的 encoding 是 ISO-8859-1 ,那么它返回的 text 的中文也就会是乱码。

requests 还有个好用的就是 Session,它部分类似浏览器,保存了 cookies,在后面需要登录和与 cookies 相关的爬虫都可以用它的 session 来实现。

2. re模块

正则表达式主要是用来提取 html 中的相关内容,比如本例中的链接提取。更复杂的 html 内容提取,推荐使用 lxml 来实现。

3. tldextract模块

这是个第三方模块,需要 pip install tldextract 进行安装。它的意思就是 Top Level Domain extract,即顶级域名提取。

前面我们讲过 URL 的结构,news.baidu.com 里面的 news.baidu.com 叫做 host,它是注册域名 baidu.com 的子域名,而 com 就是顶级域名 TLD。它的结果是这样的:

In [6]: import tldextract

In [7]: tldextract.extract('http://news.baidu.com/')
Out[7]: ExtractResult(subdomain='news', domain='baidu', suffix='com')

返回结构包含三部分:subdomain, domain, suffix

4. time模块

时间,是我们在程序中经常用到的概念,比如,在循环中停顿一段时间,获取当前的时间戳等。而 time 模块就是提供时间相关功能的模块。同时还有另外一个模块 datetime 也是时间相关的,可以根据情况适当选择来用。

记住这几个模块,在今后的写爬虫生涯中将会受益匪浅。

2. 实现一个更好的网络请求函数

上一节我们实现了一个简单的再也不能简单的新闻爬虫,这个爬虫有很多槽点,估计小悦们也会鄙视这个爬虫。上一节最后我们讨论了这些槽点,现在我们就来去除这些槽点来完善我们的新闻爬虫。

问题我们前面已经描述清楚,解决的方法也有了,那就废话不多讲,代码立刻上(Talk is cheap, show me the code!)。

写网络爬虫要注意处理网络异常

1. downloader 的实现

import requests
import cchardet
import traceback  # 在 Python 代码出错的时候,会打印一些:出错信息,会告诉我们第几行出错了,


def downloader(url, timeout=10, headers=None, debug=False, binary=False):
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }
    redirected_url = url
    if headers:
        _headers = headers
    try:
        r = requests.get(url, headers=_headers, timeout=timeout)
        if binary:
            html = r.content
        else:
            encoding = cchardet.detect(r.content)['encoding']
            html = r.content.decode(encoding)
        status = r.status_code
        redirected_url = r.url # response 目前请求的 url ,也就是我们最初的 url 和最终的 url 有可能不是同一个,有可能被重定向了。
    except:
        if debug:
            traceback.print_exc()
        msg = 'failed download: {}'.format(url)
        print(msg)
        if binary:
            html = b''
        else:
            html = ''
        status = 0
    return status, html, redirected_url


if __name__ == '__main__':
    url = 'http://news.baidu.com/'
    status, html, redirected_url = downloader(url)
    print(status, len(html), redirected_url)

这个 downloader() 函数,内置了默认的 User-Agent 模拟成一个 IE9 浏览器,同时接受调用者自定义的 headers 和 timeout 。使用 cchardet 来处理编码问题,返回数据包括:

  • 状态码:如果出现异常,设置为0
  • 内容: 默认返回 str 内容。但是 URL 链接的是图片等二进制内容时,注意调用时要设 binary=True
  • 重定向 URL: 有些 URL 会被重定向,最终页面的 url 包含在响应对象里面

进一步优化代码:(下面的这个版本等待校验,小伙伴可以提出宝贵意见)

import requests
import cchardet
import traceback  # 在 Python 代码出错的时候,会打印一些:出错信息,会告诉我们第几行出错了,


def downloader(url, timeout=10, headers=None, debug=False, binary=False):
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }
    redirected_url = url
    if headers:
        _headers = headers
    try:
        r = requests.get(url, headers=_headers, timeout=timeout)
        if r.status_code == 200:
            if binary:
                html = r.content
            else:
                encoding = cchardet.detect(r.content)['encoding']
                html = r.content.decode(encoding)
            status = r.status_code
            redirected_url = r.url # response 目前请求的 url ,也就是我们最初的 url 和最终的 url 有可能不是同一个,有可能被重定向了。
        else:
            status = r.status_code
            redirected_url = r.url
            return status, '', redirected_url
    except:
        if debug:
            traceback.print_exc()
        msg = 'failed download: {}'.format(url)
        print(msg)
        if binary:
            html = b''
        else:
            html = ''
        status = 0
    return status, html, redirected_url


if __name__ == '__main__':
    url = 'http://news.baidu.com/'
    status, html, redirected_url = downloader(url)
    print(status, len(html), redirected_url)

2. 新闻URL的清洗

我们先看看这两个新闻网址:

http://xinwen.eastday.com/a/n181106070849091.html?qid=news.baidu.com

http://news.ifeng.com/a/20181106/60146589_0.shtml?_zbs_baidu_news

上面两个带 ? 的网站来自百度新闻的首页,这个问号 ? 的作用就是告诉目标服务器,这个网址是从百度新闻链接过来的,是百度带过来的流量。但是它们的表示方式不完全一样,一个是 qid=news.baidu.com , 一个是 _zbs_baidu_news 。这有可能是目标服务器要求的格式不同导致的,这个在目标服务器的后台的浏览统计程序中可能用得到。

然后去掉问号?及其后面的字符,发现它们和不去掉指向的是相同的新闻网页。

从字符串对比上看,有问号和没问号是两个不同的网址,但是它们又指向完全相同的新闻网页,说明问号后面的参数对响应内容没有任何影响。

正在抓取新闻的大量实践后,我们发现了这样的规律:

新闻类网址都做了大量SEO,它们把新闻网址都静态化了,基本上都是以 .html, .htm, .shtml 等结尾,后面再加任何请求参数都无济于事。

但是,还是会有些新闻网站以参数 id 的形式动态获取新闻网页。

那么我们抓取新闻时,就要利用这个规律,防止重复抓取。由此,我们实现一个清洗网址的函数。

g_bin_postfix = set([
    'exe', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx',
    'pdf',
    'jpg', 'png', 'bmp', 'jpeg', 'gif',
    'zip', 'rar', 'tar', 'bz2', '7z', 'gz',
    'flv', 'mp4', 'avi', 'wmv', 'mkv',
    'apk',
])

g_news_postfix = [
    '.html?', '.htm?', '.shtml?',
    '.shtm?',
]


def clean_url(url):
    # 1. 是否为合法的http url
    if not url.startswith('http'):
        return ''
    # 2. 去掉静态化url后面的参数
    for np in g_news_postfix:
        p = url.find(np)
        if p > -1:
            p = url.find('?')
            url = url[:p]
            return url
    # 3. 不下载二进制类内容的链接
    up = urlparse.urlparse(url)
    path = up.path
    if not path:
        path = '/'
    postfix = path.split('.')[-1].lower()
    if postfix in g_bin_postfix:
        return ''

    # 4. 去掉标识流量来源的参数
    # badquery = ['spm', 'utm_source', 'utm_source', 'utm_medium', 'utm_campaign']
    good_queries = []
    for query in up.query.split('&'):
        qv = query.split('=')
        if qv[0].startswith('spm') or qv[0].startswith('utm_'):
            continue
        if len(qv) == 1:
            continue
        good_queries.append(query)
    query = '&'.join(good_queries)
    url = urlparse.urlunparse((
        up.scheme,
        up.netloc,
        path,
        up.params,
        query,
        ''  #  crawler do not care fragment
    ))
    return url

清洗 url 的方法都在代码的注释里面了,这里面包含了两类操作:

  • 判断是否合法 url,非法的直接返回空字符串
  • 去掉不必要的参数,去掉静态化 url 的参数

3. 网络爬虫知识点

1. URL清洗

网络请求开始之前,先把 url 清洗一遍,可以避免重复下载、无效下载(二进制内容),节省服务器和网络开销。

2. cchardet 模块

该模块是 chardet 的升级版,功能和chardet完全一样,用来检测一个字符串的编码。由于是用C和C++实现的,所以它的速度非常快,非常适合在爬虫中用来判断网页的编码。

切记,不要相信requests返回的encoding,自己判断一下更放心

上一节,我们已经列举了一个例子来证明 requests 对编码识别的错误,如果忘了的话,可以再去回顾一下。

3. traceback 模块

我们写的爬虫在运行过程中,会出现各种异常,而且有些异常是不可预期的,也不知道它会出现在什么地方,我们就需要用 try 来捕获异常让程序不中断,但是我们又需要看看捕获的异常是什么内容,由此来改善我们的爬虫。

这个时候,就需要 traceback 模块。

比如在 downloader() 函数里面我们用 try 捕获了 get() 的异常,但是,异常也有可能是 cchardet.detect() 引起的,用 traceback.print_exc() 来输出异常,有助于我们发现更多问题。

3. 实现功能强大、简洁易用的网址池(URL Pool)


对于比较大型的爬虫来说,URL 管理的管理是个核心问题,管理不好,就可能重复下载,也可能遗漏下载。

这里,我们设计一个 URL Pool 来管理 URL。

这个 URL Pool 就是一个生产者-消费者模式:

生产者-消费者流程图

依葫芦画瓢,URLPool 就是这样的

设计的网络爬虫URLPool

我们从网址池的使用目的出发来设计网址池的接口,它应该具有以下功能:

  • 往池子里面添加 URL;
  • 从池子里面取 URL 以下载;
  • 池子内部要管理 URL 状态;

前面我提到 URL 的状态有以下4中:

  • 已经下载成功
  • 下载多次失败无需再下载
  • 正在下载
  • 下载失败要再次尝试

前两个是永久状态,也就是已经下载成功的不再下载,多次尝试后仍失败的也就不再下载,它们需要永久存储起来,以便爬虫重启后,这种永久状态记录不会消失,已经成功下载的 URL 不再被重复下载。永久存储的方法有很多种:

  • 比如,直接写入文本文件,但它不利于查找某个 URL 是否已经存在文本中;
  • 比如,直接写入 MySQL 等关系型数据库,它利用查找,但是速度又比较慢;
  • 比如,使用 key-value 数据库,查找和速度都符合要求,是不错的选择!

我们这个 URL Pool 选用 LevelDB 来作为 URL 状态的永久存储。LevelDB 是 Google 开源的一个 key-value 数据库,速度非常快,同时自动压缩数据。我们用它先来实现一个 UrlDB 作为永久存储数据库。

1. UrlDB 的实现

import leveldb

class UrlDB:
    '''Use LevelDB to store URLs what have been done(succeed or faile)
    '''
    status_failure = b'0'
    status_success = b'1'

    def __init__(self, db_name):
        self.name = db_name + '.urldb'
        self.db = leveldb.LevelDB(self.name)

    def set_success(self, url):
        if isinstance(url, str):
            url = url.encode('utf8')
        try:
            self.db.Put(url, self.status_success)
            s = True
        except:
            s = False
        return s

    def set_failure(self, url):
        if isinstance(url, str):
            url = url.encode('utf8')
        try:
            self.db.Put(url, self.status_failure)
            s = True
        except:
            s = False
        return s

    def has(self, url):
        if isinstance(url, str):
            url = url.encode('utf8')
        try:
            attr = self.db.Get(url)
            return attr
        except:
            pass
        return False

UrlDB 将被 UrlPool 使用,主要有三个方法被使用:

  • has(url) 查看是否已经存在某 url
  • set_success(url) 存储 url 状态为成功
  • set_failure(url) 存储 url 状态为失败

2. UrlPool 的实现

而正在下载和下载失败次数这两个 URL 的状态只需暂时保存在内容即可,我们把它们放到 UrlPool 这个类中进行管理。接着我们来实现网址池:

#Author: veelion

import pickle
import leveldb
import time
import urllib.parse as urlparse


class UrlPool:
    '''URL Pool for crawler to manage URLs
    '''

    def __init__(self, pool_name):
        self.name = pool_name
        self.db = UrlDB(pool_name)

        self.waiting = {}  # {host: set([urls]), } 按host分组,记录等待下载的URL
        self.pending = {}  # {url: pended_time, } 记录已被取出(self.pop())但还未被更新状态(正在下载)的URL
        self.failure = {}  # {url: times,} 记录失败的URL的次数
        self.failure_threshold = 3
        self.pending_threshold = 10  # pending的最大时间,过期要重新下载
        self.waiting_count = 0  # self.waiting 字典里面的url的个数
        self.max_hosts = ['', 0]  # [host: url_count] 目前pool中url最多的host及其url数量
        self.hub_pool = {}  # {url: last_query_time, }  存放hub url
        self.hub_refresh_span = 0
        self.load_cache()

    def __del__(self):
        self.dump_cache()

    def load_cache(self,):
        path = self.name + '.pkl'
        try:
            with open(path, 'rb') as f:
                self.waiting = pickle.load(f)
            cc = [len(v) for k, v in self.waiting.items()]
            print('saved pool loaded! urls:', sum(cc))
        except:
            pass

    def dump_cache(self):
        path = self.name + '.pkl'
        try:
            with open(path, 'wb') as f:
                pickle.dump(self.waiting, f)
            print('self.waiting saved!')
        except:
            pass

    def set_hubs(self, urls, hub_refresh_span):
        self.hub_refresh_span = hub_refresh_span
        self.hub_pool = {}
        for url in urls:
            self.hub_pool[url] = 0

    def set_status(self, url, status_code):
        if url in self.pending:
            self.pending.pop(url)

        if status_code == 200:
            self.db.set_success(url)
            return
        if status_code == 404:
            self.db.set_failure(url)
            return
        if url in self.failure:
            self.failure[url] += 1
            if self.failure[url] > self.failure_threshold:
                self.db.set_failure(url)
                self.failure.pop(url)
            else:
                self.add(url)
        else:
            self.failure[url] = 1
            self.add(url)

    def push_to_pool(self, url):
        host = urlparse.urlparse(url).netloc
        if not host or '.' not in host:
            print('try to push_to_pool with bad url:', url, ', len of ur:', len(url))
            return False
        if host in self.waiting:
            if url in self.waiting[host]:
                return True
            self.waiting[host].add(url)
            if len(self.waiting[host]) > self.max_hosts[1]:
                self.max_hosts[1] = len(self.waiting[host])
                self.max_hosts[0] = host
        else:
            self.waiting[host] = set([url])
        self.waiting_count += 1
        return True

    def add(self, url, always=False):
        if always:
            return self.push_to_pool(url)
        pended_time = self.pending.get(url, 0)
        if time.time() - pended_time < self.pending_threshold:
            print('being downloading:', url)
            return
        if self.db.has(url):
            return
        if pended_time:
            self.pending.pop(url)
        return self.push_to_pool(url)

    def addmany(self, urls, always=False):
        if isinstance(urls, str):
            print('urls is a str !!!!', urls)
            self.add(urls, always)
        else:
            for url in urls:
                self.add(url, always)

    def pop(self, count, hub_percent=50):
        print('\n\tmax of host:', self.max_hosts)

        # 取出的url有两种类型:hub=1, 普通=0
        url_attr_url = 0
        url_attr_hub = 1
        # 1. 首先取出hub,保证获取hub里面的最新url.
        hubs = {}
        hub_count = count * hub_percent // 100
        for hub in self.hub_pool:
            span = time.time() - self.hub_pool[hub]
            if span < self.hub_refresh_span: 
                continue
            hubs[hub] = url_attr_hub # 1 means hub-url 
            self.hub_pool[hub] = time.time() 
            if len(hubs) >= hub_count:
                break

        # 2. 再取出普通url
        left_count = count - len(hubs)
        urls = {}
        for host in self.waiting:
            if not self.waiting[host]:
                continue
            url = self.waiting[host].pop()
            urls[url] = url_attr_url
            self.pending[url] = time.time()
            if self.max_hosts[0] == host:
                self.max_hosts[1] -= 1
            if len(urls) >= left_count:
                break
        self.waiting_count -= len(urls)
        print('To pop:%s, hubs: %s, urls: %s, hosts:%s' % (count, len(hubs), len(urls), len(self.waiting)))
        urls.update(hubs)
        return urls

    def size(self,):
        return self.waiting_count

    def empty(self,):
        return self.waiting_count == 0

UrlPool 的实现有些复杂,且听我一一分解。

3. UrlPool 的使用

先看看它的主要成员及其用途:

  • self.db 是一个 UrlDB 的示例,用来永久存储 url 的永久状态
  • self.pool 是用来存放 url 的,它是一个字典(dict)结构,key 是 url 的 host ,value 是一个用来存储这个 host 的所有 url 的集合(set)。
  • self.pending 用来管理正在下载的 url 状态。它是一个字典结构,key 是 url,value 是它被 pop 的时间戳。当一个 url 被 pop() 时,就是它被下载的开始。当该 url 被 set_status() 时,就是下载结束的时刻。如果一个 url 被 add() 入 pool 时,发现它已经被 pended 的时间超过 pending_threshold 时,就可以再次入库等待被下载。否则,暂不入池。
  • self.failue 是一个字典,key 是 url,value 是识别的次数,超过 failure_threshold 就会被永久记录为失败,不再尝试下载。
  • hub_pool 是一个用来存储 hub 页面的字典,key 是 hub url ,value 是上次刷新该 hub 页面的时间.

以上成员就构成了我们这个网址池的数据结构,再通过以下成员方法对这个网址池进行操作:

  1. load_cache() 和 dump_cache() 对网址池进行缓存

load_cache() 在 __init__() 中调用,创建 pool 的时候,尝试去加载上次退出时缓存的 URL pool;
dump_cache() 在 __del__() 中调用,也就是在网址池销毁前(比如爬虫意外退出),把内存中的 URL pool 缓存到硬盘。
这里使用了pickle 模块,这是一个把内存数据序列化到硬盘的工具。

  1. set_hubs() 方法设置 hub URL

hub 网页就是像百度新闻那样的页面,整个页面都是新闻的标题和链接,是我们真正需要的新闻的聚合页面,并且这样的页面会不断更新,把最新的新闻聚合到这样的页面,我们称它们为 hub 页面,其 URL.就是 hub url 。在新闻爬虫中添加大量的这样的 url,有助于爬虫及时发现并抓取最新的新闻。

该方法就是将这样的 hub url 列表传给网址池,在爬虫从池中取 URL 时,根据时间间隔(self.hub_refresh_span)来取 hub url。

  1. add(), addmany(), push_to_pool() 对网址池进行入池操作

把 url 放入网址池时,先检查内存中的 self.pending 是否存在该 url,即是否正在下载该 url。如果正在下载就不入池;如果正下载或已经超时,就进行到下一步;
接着检查该 url 是否已经在 leveldb 中存在,存在就表明之前已经成功下载或彻底失败,不再下载了也不入池。如果没有则进行到下一步;
最后通过 push_to_pool() 把 url 放入 self.pool 中。存放的规则是,按照 url 的 host 进行分类,相同 host 的 url 放到一起,在取出时每个 host 取一个 url,尽量保证每次取出的一批 url 都是指向不同的服务器的,这样做的目的也是为了尽量减少对抓取目标服务器的请求压力。力争做一个服务器友好的爬虫 O(∩_∩)O

  1. pop() 对网址池进行出池操作

爬虫通过该方法,从网址池中获取一批 url 去下载。取出 url 分两步:

第一步 ,先从 self.hub_pool 中获得,方法是遍历 hub_pool ,检查每个 hub-url 距上次被 pop 的时间间隔是否超过 hub 页面刷新间隔(self.hub_refresh_span),来决定 hub-url 是否应该被 pop。

第二步 ,从 self.pool 中获取。前面 push_to_pool 中,介绍了 pop 的原则,就是每次取出的一批 url 都是指向不同服务器的,有了self.pool 的特殊数据结构,安装这个原则获取 url 就简单了,按 host(self.pool 的 key)遍历 self.pool 即可。

  1. set_status() 方法设置网址池中 url 的状态

其参数 status_code 是 http 响应的状态码。爬虫在下载完 URL 后进行 url 状态设置。

首先,把该 url 成 self.pending 中删除,已经下载完毕,不再是 pending 状态;

接着,根据 status_code 来设置 url状态,200和404的直接设置为永久状态;其它status就记录失败次数,并再次入池进行后续下载尝试。

通过以上成员变量和方法,我们把这个网址池(UrlPool)解析的清清楚楚。同学们可以毫不客气的收藏起来,今后在写爬虫时可以用它方便的管理 URL,并且这个实现只有一个 py 文件,方便加入到任何项目中。

4. 爬虫知识点

  1. 网址的管理
    网址的管理,其目的就是为了:不重抓,不漏抓。

  2. pickle 模块
    把内存数据保存到硬盘,再把硬盘数据重新加载到内存,这是很多程序停止和启动的必要步骤。pickle就是实现数据在内存和硬盘之间转移的模块。

  3. leveldb 模块
    这是一个经典且强大的硬盘型 key-value 数据库,非常适合 url-status 这种结构的存储。
  4. urllib.parse
    解析网址的模块,在处理url时首先想到的模块就应该是它。

4. 让 MySQL 数据库操作更方便

同学们还记得最开始我们实现的那个槽点多多的百度新闻爬虫吗?那里的逻辑最后是把下载的网页和网址存储到数据库,但是我们只是简单的实现为打印信息。

现如今,我们能用的数据库很多,老牌关系型数据库如 MySQL(MariaDB), PostgreSQL 等,新型的 NoSQL 数据库,还有 NewSqL 数据库。选择实在太多,但 MySQL(Mariadb) 从易获取性、易使用性、稳定性、社区活跃性方面都有较大优势,所以,我们在够用的情况下都选择 MySQL。

今天,我们就把 MySQL 的操作单独拿出来探讨一下,并实现一个更方便的封装。

Python 对 MySQL 操作的模块最好的两个模块是:

4.1 MySQLdb

这是一个老牌的 MySQL 模块,它封装了 MySQL client 的 C 语言 API ,但是它主要支持 Python 2.x 的版本,后来有人 fork 了一个版本加入了 Python 3 的支持,并起名为 mysqlclient-python 它的 pypi 包名为 mysqlclient ,所以通过 pip 安装就是 pip install mysqlclient

4.2 PyMySQL

这是一个纯 Python 实现的 MySQL 客户端。因为是纯 Python 实现,它和 Python 3 的异步模块 aysncio 可以很好的结合起来,形成了aiomysql 模块,后面我们写异步爬虫时就可以对数据库进行异步操作了。

通过以上简单的对比,我们选择了 PyMySQL 来作为我们的数据库客户端模块。

我在 Python 中操作 MySQL 的时间比较的久远,总结下来,还是 tornado 里面的那个 torndb 的封装使用比较方便。torndb 在 Python 2.x 时代早就出现了,那时候它是对 MySQLdb 的封装。后来接触 Python 3 和 PyMySQL,就自己参考 torndb 和自己的经验,对PyMySQL 进行了一个封装,并给它起了个很土的名字: ezpymysql

不过,这个很土的名字背后,还是有着让人省心的方便,希望同学们能看在它好用的份儿上,别计较它很土的名字。

废话不多讲,代码接着上!

4.2.1 使用示例

首先我们先通过一个使用例子看看它的方便性:

from ezpymysql import Connection

db = Connection(
    'localhost',
    'db_name',
    'user',
    'password'
)
# 获取一条记录
sql = 'select * from test_table where id=%s'
data = db.get(sql, 2)

# 获取多天记录
sql = 'select * from test_table where id>%s'
data = db.query(sql, 2)

# 插入一条数据
sql = 'insert into test_table(title, url) values(%s, %s)'
last_id = db.execute(sql, 'test', 'http://a.com/')
# 或者
last_id = db.insert(sql, 'test', 'http://a.com/')


# 使用更高级的方法插入一条数据
item = {
    'title': 'test',
    'url': 'http://a.com/',
}
last_id = db.table_insert('test_table', item)

它的使用分两步:

  1. 首先,建立一个 MySQL 连接;
  2. 然后,通过 sql 语句查询或插入数据。

可能有小伙伴提出疑问,为什么不用像 SQLAlchemy 之类的 ORM 呢?

简单说,就是因为这个简单,我们的操作基本上都是查询和插入,用基本的 select, insert 这些 sql 语句是最方便和简单的。而 ORM 要先对表建立映射模型,查询方法也是因 ORM 而不同,过度的封装很不适合爬虫应用场景。其实,我在写 web 应用时,仍然是自己写 sql,感觉就是那么的清爽!

好吧,不再卖关子了,该上 ezpymysql 的实现了。

4.2.2 具体实现

#File: ezpymysql.py
#Author: AI悦创

"""A lightweight wrapper around PyMySQL.
only for python3

"""

import time
import logging
import traceback
import pymysql.cursors

version = "0.7"
version_info = (0, 7, 0, 0)


class Connection(object):
    """A lightweight wrapper around PyMySQL.
    """
    def __init__(self, host, database, user=None, password=None,
                 port=0,
                 max_idle_time=7 * 3600, connect_timeout=10,
                 time_zone="+0:00", charset = "utf8mb4", sql_mode="TRADITIONAL"):
        self.host = host
        self.database = database
        self.max_idle_time = float(max_idle_time)

        args = dict(use_unicode=True, charset=charset,
                    database=database,
                    init_command=('SET time_zone = "%s"' % time_zone),
                    cursorclass=pymysql.cursors.DictCursor,
                    connect_timeout=connect_timeout, sql_mode=sql_mode)
        if user is not None:
            args["user"] = user
        if password is not None:
            args["passwd"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            self.socket = None
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306
        if port:
            args['port'] = port

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def _ensure_connected(self):
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails.  Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        if (self._db is None or
            (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        self._ensure_connected()
        return self._db.cursor()

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it."""
        self.close()
        self._db = pymysql.connect(**self._db_args)
        self._db.autocommit(True)

    def query(self, query, *parameters, **kwparameters):
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            result = cursor.fetchall()
            return result
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        """Returns the (singular) row returned by the given query.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.fetchone()
        finally:
            cursor.close()

    def execute(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query."""
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.lastrowid
        except Exception as e:
            if e.args[0] == 1062:
                pass
            else:
                traceback.print_exc()
                raise e
        finally:
            cursor.close()

    insert = execute

    ## =============== high level method for table ===================

    def table_has(self, table_name, field, value):
        if isinstance(value, str):
            value = value.encode('utf8')
        sql = 'SELECT %s FROM %s WHERE %s="%s"' % (
            field,
            table_name,
            field,
            value)
        d = self.get(sql)
        return d

    def table_insert(self, table_name, item):
        '''item is a dict : key is mysql table field'''
        fields = list(item.keys())
        values = list(item.values())
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        for i in range(len(values)):
            if isinstance(values[i], str):
                values[i] = values[i].encode('utf8')
        sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
        try:
            last_id = self.execute(sql, *values)
            return last_id
        except Exception as e:
            if e.args[0] == 1062:
                # just skip duplicated item
                pass
            else:
                traceback.print_exc()
                print('sql:', sql)
                print('item:')
                for i in range(len(fields)):
                    vs = str(values[i])
                    if len(vs) > 300:
                        print(fields[i], ' : ', len(vs), type(values[i]))
                    else:
                        print(fields[i], ' : ', vs, type(values[i]))
                raise e

    def table_update(self, table_name, updates,
                     field_where, value_where):
        '''updates is a dict of {field_update:value_update}'''
        upsets = []
        values = []
        for k, v in updates.items():
            s = '%s=%%s' % k
            upsets.append(s)
            values.append(v)
        upsets = ','.join(upsets)
        sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
            table_name,
            upsets,
            field_where, value_where,
        )
        self.execute(sql, *(values))

4.2.3 使用方法

这个实现是对 pymysql 的简单封装,但提供了一些方便的操作:

1. 建立 MySQL 连接
db = Connection(
    'localhost',
    'db_name',
    'user',
    'password'
)

一般只需要四个参数就可以建立连接了:

  • host:数据库地址,本节就是 localhost
  • database: 数据库名
  • user: 数据库用户名
  • password:数据库用户的密码

后面还有几个参数可酌情使用:

  • max_idle_time: MySQL server 默认 8 小时闲置就会断开客户端的连接;这个参数告诉客户端闲置多长时间要重新连接;
  • time_zone: 这里默认时区为 0 区,你可以设置为自己的时区,比如东 8 区 +8:00;
  • charset:默认为 utf8mb4,即支持 moji 字符的 utf8;
2. 操作数据库

数据库操作分为两类:读和写。

读操作: 使用 get() 获取一个数据,返回的是一个 dict,key 就是数据库表的字段;使用 query() 来获取一组数据,返回的是一个 list,其中每个 item 就是一个 dict,跟 get() 返回的字典一样。

写操作: 使用 insert() 或 execute(),看源码就知道,inseret 就是 execute 的别名。

3. 高级操作

以 table_ 开头的方法:

  • table_has() 查询某个值是否存在于表中。查询的字段最好建立的在 MySQL 中建立了索引,不然数据量稍大就会很慢。
  • table_insert() 把一个字典类型的数据插入表中。字典的 key 必须是表的字段。
  • table_update() 更新表中的一条记录。其中, field_where 最好是建立了索引,不然数据量稍大就会很慢。

好了,这就是我们封装的 MySQL 数据库模块,通过简洁的方法来使用,加快我们今后写爬虫的速度,是写爬虫存储数据的居家必备之良器哦,还不赶紧收藏起来。

5. 爬虫知识点

  1. logging 模块

    Python提供的输出日志的模块,可以输出到屏幕(stdout、stderr),也可以输出到文件。爬虫在运行过程中,可能会碰到千奇百怪的异常,把这些异常都记录下来,可以很好的帮助改善爬虫。

  2. pymysql

    一个纯 Python 实现的 MySQL 客户端。在使用中,我们把它封装为 ezpymysql。

5. 实现一个同步定向新闻爬虫

前面,我们先写了一个简单的百度新闻爬虫,可是它槽点满满。接着,我们实现了一些模块,来为我们的爬虫提供基础功能,包括:网络请求、网址池、MySQL封装。

有了这些基础模块,我们的就可以实现一个更通用化的新闻爬虫了。为什么要加“定向”这个修饰词呢?因为我们的爬虫不是漫无目的的广撒网(广撒网给我们带来的服务器、带宽压力是指数级增长的),而是在我们规定的一个范围内进行爬取。

用python实现一个同步定向新闻爬虫

这个范围如何规定呢?

我们称之为:hub 列表。在实现网址池的到时候,我们简单介绍了 hub 页面是什么,这里我们再简单定义一下它:hub 页面就是含有大量新闻链接、不断更新的网页。

我们收集大量不同新闻网站的 hub 页面组成一个列表,并配置给新闻爬虫,也就是我们给爬虫规定了抓取范围:host 跟 hub 列表里面提到的 host 一样的新闻我们才抓。这样可以有些控制爬虫只抓我们感兴趣的新闻而不跑偏乱抓一气。

这里要实现的新闻爬虫还有一个定语 “同步”,没错,这次实现的是同步机制下的爬虫。后面会有异步爬虫的实现。

同步和异步的思维方式不太一样,同步的逻辑更清晰,所以我们先把同步爬虫搞清楚,后面再实现异步爬虫就相对简单些,同时也可以对比同步和异步两种不同机制下爬虫的抓取效率。

这个爬虫需要用到 MySQL 数据库,在开始写爬虫之前,我们要简单设计一下数据库的表结构:

1. 数据库设计

创建一个名为 crawler 的数据库,并创建爬虫需要的两个表:

crawler_hub :此表用于存储 hub 页面的 url

+------------+------------------+------+-----+-------------------+----------------+
| Field      | Type             | Null | Key | Default           | Extra          |
+------------+------------------+------+-----+-------------------+----------------+
| id         | int(10) unsigned | NO   | PRI | NULL              | auto_increment |
| url        | varchar(64)      | NO   | UNI | NULL              |                |
| created_at | timestamp        | NO   |     | CURRENT_TIMESTAMP |                |
+------------+------------------+------+-----+-------------------+----------------+

创建该表的语句就是:

CREATE TABLE `crawler_hub` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `url` varchar(64) NOT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `url` (`url`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8

对 url 字段建立唯一索引,可以防止重复插入相同的 url。

crawler_html :此表存储 html 内容

html 是大量的文本内容,压缩存储会大大减少磁盘使用量。这里,我们选用 lzma 压缩算法。表的结构如下:

+------------+---------------------+------+-----+-------------------+----------------+
| Field      | Type                | Null | Key | Default           | Extra          |
+------------+---------------------+------+-----+-------------------+----------------+
| id         | bigint(20) unsigned | NO   | PRI | NULL              | auto_increment |
| urlhash    | bigint(20) unsigned | NO   | UNI | NULL              |                |
| url        | varchar(512)        | NO   |     | NULL              |                |
| html_lzma  | longblob            | NO   |     | NULL              |                |
| created_at | timestamp           | YES  |     | CURRENT_TIMESTAMP |                |
+------------+---------------------+------+-----+-------------------+----------------+

创建该表的语句为:

CREATE TABLE `crawler_html` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `urlhash` bigint(20) unsigned NOT NULL COMMENT 'farmhash',
  `url` varchar(512) NOT NULL,
  `html_lzma` longblob NOT NULL,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `urlhash` (`urlhash`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

该表中,我们存储了 url 的 64 位的 farmhash ,并对这个 urlhash 建立了唯一索引。id 类型为无符号的 bigint ,也就是 2 的 64 次方,足够放下你能抓取的网页。

farmhash 是 Google 开源的一个 hash 算法。64 位的 hash 空间有 2 的 64 次方那么大,大到随意把 url 映射为一个 64 位无符号整数,也不会出现 hash 碰撞。悦创使用它多年也未发现 hash 碰撞的问题。

由于上传到 pypi 时,farmhash 这个包名不能用,就以 pyfarmhash 上传到 pypi 上了,所以要安装 farmhash 的 python 包,应该是:

pip install pyfarmhash

数据库建立好后,我们就可以开始写爬虫的代码了。

2. 新闻爬虫的代码实现

#!/usr/bin/env python3
# Author: aiyuechuang

import urllib.parse as urlparse
import lzma
import farmhash
import traceback


from ezpymysql import Connection
from urlpool import UrlPool
import functions as fn
import config

class NewsCrawlerSync:
    def __init__(self, name):
        self.db = Connection(
            config.db_host,
            config.db_db,
            config.db_user,
            config.db_password
        )
        self.logger = fn.init_file_logger(name + '.log')
        self.urlpool = UrlPool(name)
        self.hub_hosts = None
        self.load_hubs()

    def load_hubs(self,):
        sql = 'select url from crawler_hub'
        data = self.db.query(sql)
        self.hub_hosts = set()
        hubs = []
        for d in data:
            host = urlparse.urlparse(d['url']).netloc
            self.hub_hosts.add(host)
            hubs.append(d['url'])
        self.urlpool.set_hubs(hubs, 300)

    def save_to_db(self, url, html):
        urlhash = farmhash.hash64(url)
        sql = 'select url from crawler_html where urlhash=%s'
        d = self.db.get(sql, urlhash)
        if d:
            if d['url'] != url:
                msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
                self.logger.error(msg)
            return True
        if isinstance(html, str):
            html = html.encode('utf8')
        html_lzma = lzma.compress(html)
        sql = ('insert into crawler_html(urlhash, url, html_lzma) '
               'values(%s, %s, %s)')
        good = False
        try:
            self.db.execute(sql, urlhash, url, html_lzma)
            good = True
        except Exception as e:
            if e.args[0] == 1062:
                # Duplicate entry
                good = True
                pass
            else:
                traceback.print_exc()
                raise e
        return good

    def filter_good(self, urls):
        goodlinks = []
        for url in urls:
            host = urlparse.urlparse(url).netloc
            if host in self.hub_hosts:
                goodlinks.append(url)
        return goodlinks

    def process(self, url, ishub):
        status, html, redirected_url = fn.downloader(url)
        self.urlpool.set_status(url, status)
        if redirected_url != url:
            self.urlpool.set_status(redirected_url, status)
        # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
        if status != 200:
            return
        if ishub:
            newlinks = fn.extract_links_re(redirected_url, html)
            goodlinks = self.filter_good(newlinks)
            print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
            self.urlpool.addmany(goodlinks)
        else:
            self.save_to_db(redirected_url, html)

    def run(self,):
        while 1:
            urls = self.urlpool.pop(5)
            for url, ishub in urls.items():
                self.process(url, ishub)


if __name__ == '__main__':
    crawler = NewsCrawlerSync('yuanrenxyue')
    crawler.run()

3. 新闻爬虫的实现原理

上面代码就是在基础模块的基础上,实现的完整的新闻爬虫的代码。

它的流程大致如下图所示:

新闻爬虫流程图

我们把爬虫设计为一个类,类在初始化时,连接数据库,初始化 logger,创建网址池,加载 hubs 并设置到网址池。

爬虫开始运行的入口就是 run() ,它是一个 while 循环,设计为永不停息的爬。先从网址池获取一定数量的 url,然后对每个 url 进行处理,

处理 url 也就是实施抓取任务的是 process() ,它先通过 downloader 下载网页,然后在网址池中设置该 url 的状态。接着,从下载得到的 html 提取网址,并对得到的网址进行过滤(filter_good()),过滤的原则是,它们的 host 必须是 hubs的host 。最后把下载得到的 html 存储到数据。

运行这个新闻爬虫很简单,生成一个 NewsCrawlerSync 的对象,然后调用 run() 即可。当然,在运行之前,要先在 config.py 里面配置 MySQL 的用户名和密码,也要在 crawler_hub 表里面添加几个 hub 网址才行。

思考题: 如何收集大量 hub 列表

比如,我想要抓新浪新闻 news.sina.com.cn , 其首页是一个 hub 页面,但是,如何通过它获得新浪新闻更多的 hub 页面呢?同学们不妨思考一下这个问题,并用代码来实现一下。

这个时候已经抓取到很多网页了,但是怎么抽取网页里的文字呢?

6. 网页正文的提取

前面我们实现的新闻爬虫,运行起来后很快就可以抓取大量新闻网页,存到数据库里面的都是网页的 html 代码,并不是我们想要的最终结果。最终结果应该是结构化的数据,包含的信息至少有 url,标题、发布时间、正文内容、来源网站等。

网页正文抽取的方法

所以,爬虫不仅要干下载的活儿,清理、提取数据的活儿也得干。所以说嘛,写爬虫是综合能力的体现。

一个典型的新闻网页包括几个不同区域:

新闻网页区域

我们要提取的新闻要素包含在:

  • 标题区域
  • meta 数据区域(发布时间等)
  • 配图区域(如果想把配图也提取)
  • 正文区域

而导航栏区域、相关链接区域的文字就不属于该新闻的要素。

新闻的标题、发布时间、正文内容一般都是从我们抓取的 html 里面提取的。如果仅仅是一个网站的新闻网页,提取这三个内容很简单,写三个正则表达式就可以完美提取了。然而,我们的爬虫抓来的是成百上千的网站的网页。对这么多不同格式的网页写正则表达式会累死人的,而且网页一旦稍微改版,表达式可能就失效,维护这群表达式也是会累死人的。

累死人的做法当然想不通,我们就要探索一下好的算法来实现。

1. 标题的提取

标题基本上都会出现在 html 的 <title> 标签里面,但是又被附加了诸如频道名称、网站名称等信息;

标题还会出现在网页的 “标题区域” 。

那么这两个地方,从哪里提取标题比较容易呢?

网页的 “标题区域” 没有明显的标识,不同网站的 “标题区域” 的 html 代码部分千差万别。所以这个区域并不容易提取出来。

那么就只剩下 <title> 标签了,这个标签很容易提取,无论是正则表达式,还是 lxml 解析都很容易,不容易的是如何去除频道名称、网站名称等信息。

先来看看, <title> 标签里面都是设么样子的附加信息:

  • 上海用“智慧”激活城市交通脉搏,让道路更安全更有序更通畅_浦江头条_澎湃新闻-The Paper
  • “沪港大学联盟”今天在复旦大学成立_教育_新民网
  • 三亚老人脚踹司机致公交车失控撞墙 被判刑3年_社会
  • 外交部:中美外交安全对话9日在美举行
  • 进博会:中国行动全球瞩目,中国担当世界点赞_南方观澜_南方网
  • 资本市场迎来重大改革 设立科创板有何深意?-新华网

观察这些 title 不难发现,新闻标题和频道名、网站名之间都是有一些连接符号的。那么我就可以通过这些连接符吧 title 分割,找出最长的部分就是新闻标题了。

这个思路也很容易实现,这里就不再上代码了,留给同学们作为思考练习题自己实现一下。

2. 发布时间提取

发布时间,指的是这个网页在该网站上线的时间,一般它会出现在正文标题的下方—— meta 数据区域。从 html 代码看,这个区域没有什么特殊特征让我们定位,尤其是在非常多的网站版面面前,定位这个区域几乎是不可能的。这需要我们另辟蹊径。
跟标题一样,我们也先看看一些网站的发布时间都是怎么写的:

  • 央视网 2018年11月06日 22:22
  • 时间:2018-11-07 14:27:00
  • 2018-11-07 11:20:37 来源: 新华网
  • 来源:中国日报网 2018-11-07 08:06:39
  • 2018年11月07日 07:39:19
  • 2018-11-06 09:58 来源:澎湃新闻

这些写在网页上的发布时间,都有一个共同的特点,那就是一个表示时间的字符串,年月日时分秒,无外乎这几个要素。通过正则表达式,我们列举一些不同时间表达方式(也就那么几种)的正则表达式,就可以从网页文本中进行匹配提取发布时间了。

这也是一个很容易实现的思路,但是细节比较多,表达方式要涵盖的尽可能多,写好这么一个提取发布时间的函数也不是那么容易的哦。同学们尽情发挥动手能力,看看自己能写出怎样的函数实现。这也是留给同学们的一道练习题。

3. 正文的提取

正文(包括新闻配图)是一个新闻网页的主体部分,它在视觉上占据中间位置,是新闻的内容主要的文字区域。正文的提取有很多种方法,实现上有复杂也有简单。本文介绍的方法,是结合悦创多年的实践经验和思考得出来的一个简单快速的方法,姑且称之为“节点文本密度法”。

我们知道,网页的 html 代码是由不同的标签(tag)组成了一个树状结构树,每个标签是树的一个节点。通过遍历这个树状结构的每个节点,找到文本最多的节点,它就是正文所在的节点。根据这个思路,我们来实现一下代码。

3.1 实现源码

#!/usr/bin/env python3
#File: maincontent.py
#Author: aiyuechuang

import re
import time
import traceback

import cchardet
import lxml
import lxml.html
from lxml.html import HtmlComment

REGEXES = {
    'okMaybeItsACandidateRe': re.compile(
        'and|article|artical|body|column|main|shadow', re.I),
    'positiveRe': re.compile(
        ('article|arti|body|content|entry|hentry|main|page|'
         'artical|zoom|arti|context|message|editor|'
         'pagination|post|txt|text|blog|story'), re.I),
    'negativeRe': re.compile(
        ('copyright|combx|comment|com-|contact|foot|footer|footnote|decl|copy|'
         'notice|'
         'masthead|media|meta|outbrain|promo|related|scroll|link|pagebottom|bottom|'
         'other|shoutbox|sidebar|sponsor|shopping|tags|tool|widget'), re.I),
}



class MainContent:
    def __init__(self,):
        self.non_content_tag = set([
            'head',
            'meta',
            'script',
            'style',
            'object', 'embed',
            'iframe',
            'marquee',
            'select',
        ])
        self.title = ''
        self.p_space = re.compile(r'\s')
        self.p_html = re.compile(r'<html|</html>', re.IGNORECASE|re.DOTALL)
        self.p_content_stop = re.compile(r'正文.*结束|正文下|相关阅读|声明')
        self.p_clean_tree = re.compile(r'author|post-add|copyright')

    def get_title(self, doc):
        title = ''
        title_el = doc.xpath('//title')
        if title_el:
            title = title_el[0].text_content().strip()
        if len(title) < 7:
            tt = doc.xpath('//meta[@name="title"]')
            if tt:
                title = tt[0].get('content', '')
        if len(title) < 7:
            tt = doc.xpath('//*[contains(@id, "title") or contains(@class, "title")]')
            if not tt:
                tt =  doc.xpath('//*[contains(@id, "font01") or contains(@class, "font01")]')
            for t in tt:
                ti = t.text_content().strip()
                if ti in title and len(ti)*2 > len(title):
                    title = ti
                    break
                if len(ti) > 20: continue
                if len(ti) > len(title) or len(ti) > 7:
                    title = ti
        return title

    def shorten_title(self, title):
        spliters = [' - ', '–', '—', '-', '|', '::']
        for s in spliters:
            if s not in title:
                continue
            tts = title.split(s)
            if len(tts) < 2:
                continue
            title = tts[0]
            break
        return title

    def calc_node_weight(self, node):
        weight = 1
        attr = '%s %s %s' % (
            node.get('class', ''),
            node.get('id', ''),
            node.get('style', '')
        )
        if attr:
            mm = REGEXES['negativeRe'].findall(attr)
            weight -= 2 * len(mm)
            mm = REGEXES['positiveRe'].findall(attr)
            weight += 4 * len(mm)
        if node.tag in ['div', 'p', 'table']:
            weight += 2
        return weight

    def get_main_block(self, url, html, short_title=True):
        ''' return (title, etree_of_main_content_block)
        '''
        if isinstance(html, bytes):
            encoding = cchardet.detect(html)['encoding']
            if encoding is None:
                return None, None
            html = html.decode(encoding, 'ignore')
        try:
            doc = lxml.html.fromstring(html)
            doc.make_links_absolute(base_url=url)
        except :
            traceback.print_exc()
            return None, None
        self.title = self.get_title(doc)
        if short_title:
            self.title = self.shorten_title(self.title)
        body = doc.xpath('//body')
        if not body:
            return self.title, None
        candidates = []
        nodes = body[0].getchildren()
        while nodes:
            node = nodes.pop(0)
            children = node.getchildren()
            tlen = 0
            for child in children:
                if isinstance(child, HtmlComment):
                    continue
                if child.tag in self.non_content_tag:
                    continue
                if child.tag == 'a':
                    continue
                if child.tag == 'textarea':
                    # FIXME: this tag is only part of content?
                    continue
                attr = '%s%s%s' % (child.get('class', ''),
                                   child.get('id', ''),
                                   child.get('style'))
                if 'display' in attr and 'none' in attr:
                    continue
                nodes.append(child)
                if child.tag == 'p':
                    weight = 3
                else:
                    weight = 1
                text = '' if not child.text else child.text.strip()
                tail = '' if not child.tail else child.tail.strip()
                tlen += (len(text) + len(tail)) * weight
            if tlen < 10:
                continue
            weight = self.calc_node_weight(node)
            candidates.append((node, tlen*weight))
        if not candidates:
            return self.title, None
        candidates.sort(key=lambda a: a[1], reverse=True)
        good = candidates[0][0]
        if good.tag in ['p', 'pre', 'code', 'blockquote']:
            for i in range(5):
                good = good.getparent()
                if good.tag == 'div':
                    break
        good = self.clean_etree(good, url)
        return self.title, good

    def clean_etree(self, tree, url=''):
        to_drop = []
        drop_left = False
        for node in tree.iterdescendants():
            if drop_left:
                to_drop.append(node)
                continue
            if isinstance(node, HtmlComment):
                to_drop.append(node)
                if self.p_content_stop.search(node.text):
                    drop_left = True
                continue
            if node.tag in self.non_content_tag:
                to_drop.append(node)
                continue
            attr = '%s %s' % (
                node.get('class', ''),
                node.get('id', '')
            )
            if self.p_clean_tree.search(attr):
                to_drop.append(node)
                continue
            aa = node.xpath('.//a')
            if aa:
                text_node = len(self.p_space.sub('', node.text_content()))
                text_aa = 0
                for a in aa:
                    alen = len(self.p_space.sub('', a.text_content()))
                    if alen > 5:
                        text_aa += alen
                if text_aa > text_node * 0.4:
                    to_drop.append(node)
        for node in to_drop:
            try:
                node.drop_tree()
            except:
                pass
        return tree

    def get_text(self, doc):
        lxml.etree.strip_elements(doc, 'script')
        lxml.etree.strip_elements(doc, 'style')
        for ch in doc.iterdescendants():
            if not isinstance(ch.tag, str):
                continue
            if ch.tag in ['div', 'h1', 'h2', 'h3', 'p', 'br', 'table', 'tr', 'dl']:
                if not ch.tail:
                    ch.tail = '\n'
                else:
                    ch.tail = '\n' + ch.tail.strip() + '\n'
            if ch.tag in ['th', 'td']:
                if not ch.text:
                    ch.text = '  '
                else:
                    ch.text += '  '
            # if ch.tail:
            #     ch.tail = ch.tail.strip()
        lines = doc.text_content().split('\n')
        content = []
        for l in lines:
            l = l.strip()
            if not l:
                continue
            content.append(l)
        return '\n'.join(content)

    def extract(self, url, html):
        '''return (title, content)
        '''
        title, node = self.get_main_block(url, html)
        if node is None:
            print('\tno main block got !!!!!', url)
            return title, '', ''
        content = self.get_text(node)
        return title, content

3.2 代码解析

跟新闻爬虫一样,我们把整个算法实现为一个类:MainContent。

首先,定义了一个全局变量: REGEXES。它收集了一些经常出现在标签的 class 和 id 中的关键词,这些词标识着该标签可能是正文或者不是。我们用这些词来给标签节点计算权重,也就是方法 calc_node_weight() 的作用。

MainContent 类的初始化,先定义了一些不会包含正文的标签 self.non_content_tag,遇到这些标签节点,直接忽略掉即可。

本算法提取标题实现在 get_title() 这个函数里面。首先,它先获得 <title> 标签的内容,然后试着从 <meta> 里面找 title,再尝试从 <body> 里面找 id 和 class 包含 title 的节点,最后把从不同地方获得的可能是标题的文本进行对比,最终获得标题。对比的原则是:

  • <meta> , <body> 里面找到的疑似标题如果包含在 <title> 标签里面,则它是一个干净(没有频道名、网站名)的标题;
  • 如果疑似标题太长就忽略
  • 主要把 <title> 标签作为标题

<title> 标签里面获得标题,就要解决标题清洗的问题。这里实现了一个简单的方法: clean_title()。

在这个实现中,我们使用了 lxml.html 把网页的 html 转化成一棵树,从 body 节点开始遍历每一个节点,看它直接包含(不含子节点)的文本的长度,从中找出含有最长文本的节点。这个过程实现在方法:get_main_block() 中。其中一些细节,同学们可以仔细体会一下。

其中一个细节就是,clean_node() 这个函数。通过 get_main_block() 得到的节点,有可能包含相关新闻的链接,这些链接包含大量新闻标题,如果不去除,就会给新闻内容带来杂质(相关新闻的标题、概述等)。

还有一个细节,get_text() 函数。我们从 main block 中提取文本内容,不是直接使用 text_content() ,而是做了一些格式方面的处理,比如在一些标签后面加入换行符合 \n,在 table 的单元格之间加入空格。这样处理后,得到的文本格式比较符合原始网页的效果。

4. 爬虫知识点

  1. cchardet 模块

    用于快速判断文本编码的模块

  2. lxml.html 模块

    结构化 html 代码的模块,通过 xpath 解析网页的工具,高效易用,是写爬虫的居家必备的模块。

  3. 内容提取的复杂性

    我们这里实现的正文提取的算法,基本上可以正确处理 90%以上的新闻网页。

    但是,世界上没有千篇一律的网页一样,也没有一劳永逸的提取算法。大规模使用本文算法的过程中,你会碰到奇葩的网页,这个时候,你就要针对这些网页,来完善这个算法类。

5. 思考题

  1. 通过代码实现:从 <title> 标签的字符串里面去除频道名称、网站名称等杂质而得到干净的新闻标题。
  2. 通过代码实现:从网页文字中提取发布时间的函数。(提示:用正则表达式进行提取)

7. 用 asyncio 实现异步爬虫

“等了好久终于等到今天,梦里好久终于把梦实现”,脑海里不禁响起来刘德华这首歌。是啊,终于可以写我最喜欢的异步爬虫了。前面那么多章节,一步一步、循序渐进的讲解,实在是“唠叨”了不少,可是为了小猿们能由浅入深的学习爬虫,老猿我又不得不说那么多“唠叨”,可把我给憋死了,今天就大书特书异步爬虫,说个痛快!

关于异步 IO 这个概念,可能有些同学们不是非常明白,那就先来看看异步 IO 是怎么回事儿。

为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:

  • 下载请求开始,就是放羊出去吃草;
  • 下载任务完成,就是羊吃饱回羊圈。

同步放羊的过程就是这样的:

羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……

再看看异步放羊的过程:

羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。

很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。

到了这里,可能有同学们要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步 IO 提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。

1. 异步的 downloader

还记得我们之前使用 requests 实现的那个 downloader 吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有 aiohttp 模块来支持异步 http 请求了,那么我们就用 aiohttp 来实现异步 downloader 。

async def fetch(session, url, headers=None, timeout=9):
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }
    if headers:
        _headers = headers
    try:
        async with session.get(url, headers=_headers, timeout=timeout) as response:
            status = response.status
            html = await response.read()
            encoding = response.get_encoding()
            if encoding == 'gb2312':
                encoding = 'gbk'
            html = html.decode(encoding, errors='ignore')
            redirected_url = str(response.url)
    except Exception as e:
        msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
        print(msg)
        html = ''
        status = 0
        redirected_url = url
    return status, html, redirected_url

这个异步的 downloader,我们称之为 fetch() ,它有两个必须参数:

  • seesion: 这是一个 aiohttp.ClientSession 的对象,这个对象的初始化在 crawler 里面完成,每次调用 fetch() 时,作为参数传递。
  • url:这是需要下载的网址。

实现中使用了异步上下文管理器(async with),编码的判断我们还是用 cchardet 来实现。
有了异步下载器,我们的异步爬虫就可以写起来啦~

2. 异步新闻爬虫

跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:

  • self.urlpool 网址池
  • self.loop 异步的事件循环
  • self.seesion aiohttp.ClientSession 的对象,用于异步下载
  • self.db 基于aiomysql的异步数据库连接
  • self._workers 当前并发下载(放出去的羊)的数量

通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:

#!/usr/bin/env python3
# File: news-crawler-async.py
# Author: aiyuechuang

import traceback
import time
import asyncio
import aiohttp
import urllib.parse as urlparse
import farmhash
import lzma

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

import sanicdb

from urlpool import UrlPool
import functions as fn
import config


class NewsCrawlerAsync:
    def __init__(self, name):
        self._workers = 0
        self._workers_max = 30
        self.logger = fn.init_file_logger(name+ '.log')

        self.urlpool = UrlPool(name)

        self.loop = asyncio.get_event_loop()
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.db = sanicdb.SanicDB(
            config.db_host,
            config.db_db,
            config.db_user,
            config.db_password,
            loop=self.loop
        )

    async def load_hubs(self,):
        sql = 'select url from crawler_hub'
        data = await self.db.query(sql)
        self.hub_hosts = set()
        hubs = []
        for d in data:
            host = urlparse.urlparse(d['url']).netloc
            self.hub_hosts.add(host)
            hubs.append(d['url'])
        self.urlpool.set_hubs(hubs, 300)

    async def save_to_db(self, url, html):
        urlhash = farmhash.hash64(url)
        sql = 'select url from crawler_html where urlhash=%s'
        d = await self.db.get(sql, urlhash)
        if d:
            if d['url'] != url:
                msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
                self.logger.error(msg)
            return True
        if isinstance(html, str):
            html = html.encode('utf8')
        html_lzma = lzma.compress(html)
        sql = ('insert into crawler_html(urlhash, url, html_lzma) '
               'values(%s, %s, %s)')
        good = False
        try:
            await self.db.execute(sql, urlhash, url, html_lzma)
            good = True
        except Exception as e:
            if e.args[0] == 1062:
                # Duplicate entry
                good = True
                pass
            else:
                traceback.print_exc()
                raise e
        return good

    def filter_good(self, urls):
        goodlinks = []
        for url in urls:
            host = urlparse.urlparse(url).netloc
            if host in self.hub_hosts:
                goodlinks.append(url)
        return goodlinks

    async def process(self, url, ishub):
        status, html, redirected_url = await fn.fetch(self.session, url)
        self.urlpool.set_status(url, status)
        if redirected_url != url:
            self.urlpool.set_status(redirected_url, status)
        # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
        if status != 200:
            return
        if ishub:
            newlinks = fn.extract_links_re(redirected_url, html)
            goodlinks = self.filter_good(newlinks)
            print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
            self.urlpool.addmany(goodlinks)
        else:
            await self.save_to_db(redirected_url, html)
        self._workers -= 1

    async def loop_crawl(self,):
        await self.load_hubs()
        last_rating_time = time.time()
        counter = 0
        while 1:
            tasks = self.urlpool.pop(self._workers_max)
            if not tasks:
                print('no url to crawl, sleep')
                await asyncio.sleep(3)
                continue
            for url, ishub in tasks.items():
                self._workers += 1
                counter += 1
                print('crawl:', url)
                asyncio.ensure_future(self.process(url, ishub))

            gap = time.time() - last_rating_time
            if gap > 5:
                rate = counter / gap
                print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
                last_rating_time = time.time()
                counter = 0
            if self._workers > self._workers_max:
                print('====== got workers_max, sleep 3 sec to next worker =====')
                await asyncio.sleep(3)

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            print('stopped by yourself!')
            del self.urlpool
            pass


if __name__ == '__main__':
    nc = NewsCrawlerAsync('yrx-async')
    nc.run()

爬虫的主流程是在方法 loop_crawl() 里面实现的。它的主体是一个 while 循环,每次从 self.urlpool 里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过 ensure_future() 开始异步下载(把这些羊都放出去)。而 process() 这个方法的流程是下载网页并存储、提取新的 url,这就类似羊吃草、下崽等。

通过 self._workersself._workers_max 来控制并发量。不能一直并发,给本地 CPU、网络带宽带来压力,同样也会给目标服务器带来压力。

至此,我们实现了同步和异步两个新闻爬虫,分别实现了 NewsCrawlerSync 和 NewsCrawlerAsync 两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。同学们可以通过对比两个类的实现,来更好的理解异步的流程。

3. 爬虫知识点

  1. uvloop 模块

    uvloop 这个模块是用 Cython 编写建立在 libuv 库之上,它是 asyncio 内置事件循环的替代,使用它仅仅是多两行代码而已:

    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    

    uvloop 使得 asyncio 很快,比 odejs、gevent 和其它 Python 异步框架的快至少2倍,接近于 Go 语言的性能。

    uvloop作者的性能测试

这是 uvloop 作者的性能对比测试。

目前,uvloop 不支持 Windows 系统和 Python 3.5 及其以上版本,这在它源码的 setup.py 文件中可以看到:

if sys.platform in ('win32', 'cygwin', 'cli'):
    raise RuntimeError('uvloop does not support Windows at the moment')

vi = sys.version_info
if vi < (3, 5):
    raise RuntimeError('uvloop requires Python 3.5 or greater')

所以,使用 Windows 的同学们要运行异步爬虫,就要把 uvloop 那两行注释掉哦。

4. 思考题

  1. 给同步的 downloader() 或异步的 fetch() 添加功能

    或许有些同学们还没见过这样的 html 代码,它出现在 <head> 里面:

    <meta http-equiv="refresh" content="5; url=https://example.com/">
    

    它的意思是,告诉浏览器在5秒之后跳转到另外一个 url:https://example.com/

    那么问题来了,请给 downloader(fetch())添加代码,让它支持这个跳转。

  2. 如何控制 hub 的刷新频率,及时发现最新新闻

    这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,同学们来思考一下,并对手实现实现。

到这悦创要讲的实现一个异步定向新闻爬虫已经讲完了,感谢你的阅读,有任何建议和问题请再下方留言,我会一一回复你,你也可以关注 AI悦创 公众号,那里可以及时看到我新发的文章。

AI悦创·创造不同!
AI悦创 » 大规模异步新闻爬虫

Leave a Reply