分析scrapy-redis源码及redis学习

redis简介

redis(Remote Dictionary Server)是一个开源的基于内存(也可以持久化)的日志型key-value数据库,可以用作数据库(DB)、缓存(Cache)、消息队列(MQ)等。

优势

传统的数据库,例如Mysql,这类数据库的性能瓶颈越来越明显,这主要是由于磁盘IO导致的,磁盘的读写速度远不如内存读写快。

相比之下,基于内存的redis性能高于这些传统数据库。——redis是单线程(避免上下文切换)、内存操作、I/O多路复用的集合体,故性能特别高。

redis的持久化

这篇文章讲得挺清楚:内存快照(RDB)+日志(AOF)。

初次连接先用快照,再通过执行增量命令,使得主从服务器达到一致状态。

https://cloud.tencent.com/developer/article/2529898

然而,一般写爬虫存储数据的时候,redis一般用于缓存和小项目的储存,这时,还需要一个NoSQL类型的数据库进行长久储存:MongoDB,之后我会学一下的。

安装

在Linux直接apt-get安装,windows的安装比较麻烦。

GUI工具

使用免费的开源项目。

https://github.com/qishibo/AnotherRedisDesktopManager/releases/tag/v1.7.1

简单使用

【5分钟Python学会操作Redis数据库】 https://www.bilibili.com/video/BV1T54y1U7kb/?share_source=copy_web&vd_source=51797c11bb8b5031197f44a3ad9e668d

下面是python的api调用。

连接redis

1
2
3
4
5
6
7
import redis

# 建立连接,decode_responses=True 会自动将 bytes 转为 str,选中第0个db数据库
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# 为了演示,先清空当前数据库
r.flushdb()

字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1. set(key, value, ex=None): 设置一个键值对,ex 是可选的过期时间(秒)
r.set("user:1:name", "Alice")
print(f"设置 user:1:name -> {r.get('user:1:name')}")

# 2. get(key): 获取一个键的值
user_name = r.get("user:1:name")
print(f"获取 user:1:name -> {user_name}")

# 3. setex(key, time, value): 设置一个带过期时间的键值对
r.setex("session:token", 60, "xyz-abc-123") # 60秒后过期
print(f"获取 session:token -> {r.get('session:token')}")
print(f"session:token 的剩余时间 -> {r.ttl('session:token')}")

# 4. incr(key, amount=1): 将键的值(必须是数字)增加指定的量(默认为1)
r.set("page:views", 100)
r.incr("page:views") # 增加1
print(f"incr page:views -> {r.get('page:views')}")
r.incr("page:views", 10) # 增加10
print(f"incr page:views by 10 -> {r.get('page:views')}")

# 5. decr(key, amount=1): 将键的值减少指定的量
r.decr("page:views", 5)
print(f"decr page:views by 5 -> {r.get('page:views')}")

哈希

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1. hset(key, mapping={...}) 或 hset(key, field, value): 设置哈希中的字段
# 使用 mapping 一次性设置多个字段
r.hset("user:1", mapping={"name": "Bob", "age": 30, "city": "London"})

# 单独设置或更新一个字段
r.hset("user:1", "email", "bob@example.com")
print(f"设置 user:1 哈希 -> {r.hgetall('user:1')}")

# 2. hget(key, field): 获取哈希中指定字段的值
user_age = r.hget("user:1", "age")
print(f"获取 user:1 的 age -> {user_age}")

# 3. hgetall(key): 获取哈希中所有的字段和值,返回一个字典
all_user_info = r.hgetall("user:1")
print(f"获取 user:1 的所有信息 -> {all_user_info}")

# 4. hdel(key, *fields): 删除哈希中的一个或多个字段
r.hdel("user:1", "city")
print(f"删除 city 字段后 -> {r.hgetall('user:1')}")

# 5. hkeys(key): 获取哈希中所有的字段名
fields = r.hkeys("user:1")
print(f"user:1 的所有字段 -> {fields}")

列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 1. lpush(key, *values): 从列表左侧(头部)推入一个或多个值
r.lpush("tasks", "task3", "task2")
r.lpush("tasks", "task4") # tasks 现在的顺序: [task4, task2, task3]
print(f"lpush 后的列表 -> {r.lrange('tasks', 0, -1)}")

# 2. rpush(key, *values): 从列表右侧(尾部)推入一个或多个值
r.rpush("tasks", "task1") # tasks 现在的顺序: [task4, task2, task3, task1]
print(f"rpush 后的列表 -> {r.lrange('tasks', 0, -1)}")

# 3. lpop(key): 从列表左侧(头部)弹出一个值
task = r.lpop("tasks")
print(f"lpop 弹出的任务 -> {task}")
print(f"lpop 后的列表 -> {r.lrange('tasks', 0, -1)}")

# 4. lrange(key, start, end): 获取指定范围内的元素(-1表示最后一个)
all_tasks = r.lrange("tasks", 0, -1)
print(f"lrange 获取所有任务 -> {all_tasks}")

# 5. llen(key): 获取列表的长度
num_tasks = r.llen("tasks")
print(f"当前任务数量 -> {num_tasks}")

集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 1. sadd(key, *values):向集合添加一个或多个成员,重复的成员会被忽略
r.sadd("tags:post:1", "python", "redis", "database")
r.sadd("tags:post:1", "python") # 重复添加,不会有变化
print(f"tags:post:1 的所有标签 -> {r.smembers('tags:post:1')}")

# 2. srem(key, *values): 从集合中移除一个或多个成员
r.srem("tags:post:1", "database")
print(f"移除 database 后的标签 -> {r.smembers('tags:post:1')}")

# 3. sismember(key, value): 判断一个成员是否存在于集合中
is_member = r.sismember("tags:post:1", "redis")
print(f"redis 是标签吗? -> {is_member}")

# 4. smembers(key): 获取集合中的所有成员
all_tags = r.smembers("tags:post:1")
print(f"smembers 获取所有标签 -> {all_tags}") # 注意:集合是无序的

# 5. sunion(*keys): 返回所有给定集合的并集
r.sadd("tags:post:2", "python", "web", "api")
union_tags = r.sunion("tags:post:1", "tags:post:2")
print(f"post:1 和 post:2 标签的并集 -> {union_tags}")

有序集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 1. zadd(key, mapping={...}): 添加一个或多个成员及其分数
r.zadd("leaderboard", {"Alice": 9500, "Bob": 8700, "Cathy": 9200})
print(f"获取排行榜所有成员 -> {r.zrange('leaderboard', 0, -1, withscores=True)}")

# 2. zrange(key, start, end, withscores=False): 按分数从小到大返回指定排名的成员
top_3_asc = r.zrange("leaderboard", 0, 2)
print(f"分数正序前3名 -> {top_3_asc}")

# 3. zrevrange(key, start, end, withscores=False): 按分数从大到小返回(常用于排行榜)
top_3_desc = r.zrevrange("leaderboard", 0, 2, withscores=True)
print(f"分数倒序前3名(排行榜)-> {top_3_desc}")

# 4. zrem(key, *values): 移除一个或多个成员
r.zrem("leaderboard", "Bob")
print(f"移除 Bob 后的排行榜 -> {r.zrevrange('leaderboard', 0, -1, withscores=True)}")

# 5. zscore(key, member): 获取指定成员的分数
cathy_score = r.zscore("leaderboard", "Cathy")
print(f"Cathy 的分数 -> {cathy_score}")

redis集群

数据分片、主从复制。

redis集群意味着数据需要分片,服务器会根据数据的key,生成哈希值,哈希值不同,该数据存放的哈希槽就不同,不同的主节点负责不同的哈希槽。

每个主节点都有一个或多个从节点,为保证数据的可用性,还需保证主从复制。

image-20250713091408193

redis缓存

由于redis是内存操作,不用从硬盘中取数据,常被用作缓存,一般的读取数据流程如下(旁路缓存,Cache-Aside):

  1. 应用先请求 Redis 缓存,看有没有需要的数据。

  2. 缓存命中 (Cache Hit): 如果 Redis 中有数据,则直接读取并返回给用户。流程结束。

  3. 缓存未命中 (Cache Miss): 如果 Redis 中没有数据,则去访问后端数据库

  4. 从数据库中查到数据后,先将其写入 Redis 缓存(并设置一个合适的过期时间),然后再返回给用户。这样,下次再有同样的请求,就能直接命中缓存了。

更新数据 (Write):

当数据发生变化时(例如用户修改了个人信息),为了保证数据一致性,必须同时处理数据库和缓存。常见的做法是:

  1. 先更新数据库中的数据。
  2. 然后直接从缓存中删除这个 key。

为什么是删除缓存而不是更新缓存? 这是一种叫做“懒加载”(Lazy Loading)的思想。直接删除可以保证下次读取时一定会从数据库加载最新的数据并重新写入缓存。这比直接更新缓存要简单,并且可以避免一些并发场景下的数据不一致问题。

scrapy-redis源码理解

第三方库scrapy-redis已经封装好了scrapy和redis的联合使用,实现分布式爬虫。——实际上,分布式爬虫的含义是:几个爬虫一起爬取数据,共享任务列表、调度器、数据存储表等,还具备断点续爬的特点(因为有redis数据库)。

来看看scrapy-redis是如何接入redis的,先看看源文件,相比之前的项目多了“connection.py”、“defaults.py”、“picklecompat.py”、“duperfilter.py”、“queue.py”、“stats.py”、“utils.py”,同时还修改了pipelines.py等文件。

接下来会按照调度器、去重过滤器、爬虫、管道的顺序详细介绍。

image-20250713150100411

调度器(源文件Scheduler)

调度器的默认类是 scrapy.core.scheduler.Scheduler,可以在settings.py中,通过SCHEDULER修改,官方文档如下:

image-20250713154251181

如果要使用scrapy-redis,需要将SCHEDULER设置成“<模块>.Scheduler”

__init__(构造函数)

类Scheduler构造函数,用来实例化调度器。

注释已经把构造函数的字段介绍的很清楚了,这里翻译成中文好了。

  • server: 这是与 Redis 数据库建立的连接实例。所有后续的数据库操作(如读写队列)都通过这个对象来执行。
  • persist: 一个布尔值,用于决定爬虫关闭时是否“持久化”队列。如果设为 True,请求队列和去重记录会保留在 Redis 中,方便下次启动时继续爬取。如果为 False (默认值),爬虫关闭时会清空这些数据。
  • flush_on_start: 一个布尔值,用于决定爬虫启动时是否清空之前的队列。如果设为 True,每次启动都会忽略上次遗留的请求,进行一次全新的爬取。
  • queue_key: 字符串类型,定义了在 Redis 中存储请求队列的键(key)的格式。默认值是 %(spider)s:requests,这意味着每个爬虫都有自己独立的请求队列。
  • queue_cls: 字符串类型,指向一个实现了队列功能的类的完整路径。这允许你更换不同的排队策略,例如先进先出(FIFO)、后进先出(LIFO)或默认的优先级队列(PriorityQueue)。
  • dupefilter: 这是一个去重过滤器实例。通常情况下,调度器会根据 dupefilter_cls 自己创建一个实例。但这个字段允许你传入一个已经手动配置好的实例,提供了更高的灵活性。
  • dupefilter_key: 字符串类型,定义了在 Redis 中存储请求指纹(用于去重)的键的格式。默认值是%(spider)s:dupefilter
  • dupefilter_cls: 字符串类型,指向一个实现了去重逻辑的类的完整路径。默认是scrapy_redis.dupefilter.RFPDupeFilter
  • idle_before_close: 整数类型,表示在队列变空后,调度器在关闭爬虫前应等待多少秒。如果在这期间有新的请求加入队列,爬虫会继续工作。这对于等待动态添加任务的持续性爬虫非常重要。
  • serializer: 这是一个序列化器对象。它负责将 Scrapy 的请求对象(Request)转换成可以存入 Redis 的格式(通常是字符串或字节),以及在从 Redis 取出时进行反向转换。
  • stats: 用于存储一个统计信息收集器实例,它会在调度器工作时记录相关数据(如入队/出队的请求数)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
self,
server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter=None,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None,
):
"""Initialize scheduler.

Parameters
----------
server : Redis
The redis server instance.
persist : bool
Whether to flush requests when closing. Default is False.
flush_on_start : bool
Whether to flush requests on start. Default is False.
queue_key : str
Requests queue key.
queue_cls : str
Importable path to the queue class.
dupefilter: Dupefilter
Custom dupefilter instance.
dupefilter_key : str
Duplicates filter key.
dupefilter_cls : str
Importable path to the dupefilter class.
idle_before_close : int
Timeout before giving up.

"""
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")

self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.df = dupefilter
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None

from_settings

@classmethod修饰

这个函数使用了@classmethod修饰,是说明这个方法是一个工厂方法,它的作用是创建并返回一个类的实例,而不是操作一个已存在的实例,@classmethod修饰的方法,其第一个参数需要是类本身,而不是实例self。

行为

加载核心设置: 它首先从 settings 中读取 SCHEDULER_PERSISTSCHEDULER_FLUSH_ON_STARTSCHEDULER_IDLE_BEFORE_CLOSE 设置,并将它们存入一个名为 kwargs 的字典中 。

加载可选设置: 它会遍历一个预定义的 optional 字典,检查 settings 中是否存在如 SCHEDULER_QUEUE_CLASSDUPEFILTER_CLASS 等可选配置 。如果存在,就将它们也添加到 kwargs 字典中 。这使得用户可以只配置需要覆盖的选项,而其它选项则使用默认值。

处理去重过滤器: 它使用 load_object 函数加载去重过滤器类 。接着,它会检查这个类是否需要基于 spider 对象进行初始化(通过检查有无 from_spider 方法)。如果不需要,它会立刻用 from_settings 方法创建一个实例,并存入 kwargs

处理序列化器: 如果 serializer 是以字符串路径的形式提供的,它会使用 importlib.import_module 将其作为一个模块加载进来 。

建立并验证 Redis 连接: 它调用 connection.from_settings 来创建一个 Redis 连接实例,然后立刻执行 server.ping() 以确保该连接是真实有效的 。

创建并返回调度器实例: 最后,它使用 cls(server=server, **kwargs),将 Redis 连接和所有从配置中收集到的参数传递给 Scheduler 的构造函数,创建一个完全配置好的实例并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@classmethod
def from_settings(cls, settings):
kwargs = {
"persist": settings.getbool("SCHEDULER_PERSIST"),
"flush_on_start": settings.getbool("SCHEDULER_FLUSH_ON_START"),
"idle_before_close": settings.getint("SCHEDULER_IDLE_BEFORE_CLOSE"),
}

# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
"queue_key": "SCHEDULER_QUEUE_KEY",
"queue_cls": "SCHEDULER_QUEUE_CLASS",
"dupefilter_key": "SCHEDULER_DUPEFILTER_KEY",
# We use the default setting name to keep compatibility.
"dupefilter_cls": "DUPEFILTER_CLASS",
"serializer": "SCHEDULER_SERIALIZER",
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val

dupefilter_cls = load_object(kwargs["dupefilter_cls"])
if not hasattr(dupefilter_cls, "from_spider"):
kwargs["dupefilter"] = dupefilter_cls.from_settings(settings)

# Support serializer as a path to a module.
if isinstance(kwargs.get("serializer"), str):
kwargs["serializer"] = importlib.import_module(kwargs["serializer"])

server = connection.from_settings(settings)
# Ensure the connection is working.
server.ping()

return cls(server=server, **kwargs)

from_crawler

对象Crawler是Scrapy框架的核心,它持有对所有关键组件和设置引用,这里便是通过from_crawler的设置信息,初始化一个调度器实例。

这里的cls是SCHEDULER中指向的类,也就是这里的Scheduler。

1
2
3
4
5
6
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance

而from_crawler也是Scrapy框架中,调度器最小接口的函数之一,由框架代码调用。

open

函数open用于获得请求队列去重过滤器的key,之后通过其它函数获得这些key的具体值。

哦,对了,函数open也是调度器最小接口的函数之一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def open(self, spider):
self.spider = spider

try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {"spider": spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError(
f"Failed to instantiate queue class '{self.queue_cls}': {e}"
)

if not self.df:
self.df = load_object(self.dupefilter_cls).from_spider(spider)

if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log(f"Resuming crawl ({len(self.queue)} requests scheduled)")

self.queue通过类self.queue_cls和函数load_object获得一个队列实例,类self.queue_cls实际上就是字符串SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.PriorityQueue",函数load_object通过类的路径,对类进行加载,然后以类()的方式调用构造函数进行实例化。self.queue指向字符串SCHEDULER_QUEUE_KEY = "%(spider)s:requests",这个队列对象内部保存了要操作的 Redis 键名 (key),这个键名才是由 SCHEDULER_QUEUE_KEY 格式化而来的字符串。key为”{spider.name}:requests”的value,如果爬虫名字设置为”x14nuy”,这里的key的值就是”x14nuy:requests”。

(然后取出这个键中存储的requests请求对象,如何取出呢?通过self.serializer进行反序列化)

当然,构造函数只是设置了请求队列去重过滤器的key,还没从redis中取数据,我这里为了方便描述,把取的过程也算入了构造函数中,这是不对的。

真正取值的函数是enqueue_request/next_request。

enqueue_request/next_request(入队/出队)

enqueue是入队,next是出队。

request.dont_filter是判断当前request请求,是否被特殊标记为“不过滤”,df即DupeFilter,用来判断当前的request是否出现过。

这里的self.stats是用来统计数据的,指向的是类RedisStatsCollector的实例,用来收集信息。

self.queue.push(request)入队一个请求。

request = self.queue.pop(block_pop_timeout)出队一个请求,block_pop_timeout用于当队列为空时,阻塞一段时机再返回——这段阻塞的时候,可能会获得新请求,如果没用获得新请求,爬虫进程就结束了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value("scheduler/enqueued/redis", spider=self.spider)
self.queue.push(request)
return True

def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value("scheduler/dequeued/redis", spider=self.spider)
return request

去重过滤器(源文件DuperFilter)

在阅读调度器的代码的时候,出现了self.df.request_seen(request),这里的df是类DuperFilter的实例。

这里主要看看如何去重的。

锁定函数request_fingerprint。

request_fingerprint

将请求的method、url、body构成字典,然后转换成json文本,通过sha1算法得到哈希值。

from_crawler:用于创建一次性的redis键,用来记录requests产生的哈希值,下次这些哈希值就用不上了。——因为key使用的是时间函数。

而from_spider:用于持续性的收集requests的哈希值,即便这次进程退出了,redis存储的哈希值在程序下一次跑起来时,仍然存在并筛选重复的requests。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def request_fingerprint(self, request):
"""Returns a fingerprint for a given request.

Parameters
----------
request : scrapy.http.Request

Returns
-------
str

"""
fingerprint_data = {
"method": to_unicode(request.method),
"url": canonicalize_url(request.url),
"body": (request.body or b"").hex(),
}
fingerprint_json = json.dumps(fingerprint_data, sort_keys=True)
return hashlib.sha1(fingerprint_json.encode()).hexdigest()

爬虫(源文件Spiders)

在文件spiders.py中,定义了3个类:RedisMixin、RedisSpider、RedisCrawlSpider。

在原来的Scrapy框架中,也存在2个类,SpiderCrawlSpider,后者比前者多了基于 Rule 的链接提取和自动跟进能力。

RedisMixin是一个混合类,相当于一个集成了redis的插件,任何类继承了它,就拥有了从redis读取url的能力。

start_request(RedisMixin)

这个函数是Scrapy框架中,爬虫启动调用的第一个方法,直接封装了self.next_requests()来获得request。

setup_redis(RedisMixin)

这个函数负责建立与redis的连接,其它的组件可以通过self.server获得redis的数据。

**获得配置:**从settings.py中读取与Redis相关的配置,如redis_key(URL存储在Redis中的键名)、redis_batch_size(每次从Redis中取出的URL数量)、redis_encoding(编码格式)等。如果用户没有在自己的爬虫里定义这些属性,就使用配置文件中的默认值。——crawler.settings对应着文件settings.py

**建立连接:**它使用connection.from_settings()根据Scrapy项目中的settings.py配置来建立与Redis服务器的连接——redis的类是:REDIS_CLS = redis.StrictRedis,先加载类,然后通过from_url或者构造函数建立连接。

确定数据类型:根据配置判断URL在Redis中是作为列表(list)、**集合(set)还是有序集合(zset)**存储的。并据此将self.fetch_data(获取数据的方法)指向对应的弹出操作函数(如pop_list_queue)。这使得爬虫可以灵活地处理不同类型的Redis数据结构。

**处理信号:**当没有待处理的请求时,会触发信号spider_idle,将这个信号和self.spider_idle方法关联起来,避免直接关闭爬虫。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.

This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return

if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, "crawler", None)

if crawler is None:
raise ValueError("crawler is required")

settings = crawler.settings

if self.redis_key is None:
self.redis_key = settings.get(
"REDIS_START_URLS_KEY",
defaults.START_URLS_KEY,
)

self.redis_key = self.redis_key % {"name": self.name}

if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")

if self.redis_batch_size is None:
self.redis_batch_size = settings.getint(
"CONCURRENT_REQUESTS", defaults.REDIS_CONCURRENT_REQUESTS
)

try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")

if self.redis_encoding is None:
self.redis_encoding = settings.get(
"REDIS_ENCODING", defaults.REDIS_ENCODING
)

self.logger.info(
"Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s)",
self.__dict__,
)

self.server = connection.from_settings(crawler.settings)

if settings.getbool("REDIS_START_URLS_AS_SET", defaults.START_URLS_AS_SET):
self.fetch_data = self.server.spop
self.count_size = self.server.scard
elif settings.getbool("REDIS_START_URLS_AS_ZSET", defaults.START_URLS_AS_ZSET):
self.fetch_data = self.pop_priority_queue
self.count_size = self.server.zcard
else:
self.fetch_data = self.pop_list_queue
self.count_size = self.server.llen

if self.max_idle_time is None:
self.max_idle_time = settings.get(
"MAX_IDLE_TIME_BEFORE_CLOSE", defaults.MAX_IDLE_TIME
)

try:
self.max_idle_time = int(self.max_idle_time)
except (TypeError, ValueError):
raise ValueError("max_idle_time must be an integer")

# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

pop_list_queue/pop_priority_queue(RedisMixin)

获取redis的request数据。

pop_list_queue: 如果URL存储在**列表(list)**中,此方法会被调用。它原子性地(通过pipeline)获取列表头部的batch_size个URL,并从列表中删除它们。

pop_priority_queue: 如果URL存储在**有序集合(zset)**中(用于优先级队列),此方法会被调用。它获取分数最高的batch_size个URL,并从集合中删除它们。

这两个函数在setup_redis中,会传地址给函数fetch_data。

1
2
3
4
5
6
7
8
9
10
11
12
13
def pop_list_queue(self, redis_key, batch_size):
with self.server.pipeline() as pipe:
pipe.lrange(redis_key, 0, batch_size - 1)
pipe.ltrim(redis_key, batch_size, -1)
datas, _ = pipe.execute()
return datas

def pop_priority_queue(self, redis_key, batch_size):
with self.server.pipeline() as pipe:
pipe.zrevrange(redis_key, 0, batch_size - 1)
pipe.zremrangebyrank(redis_key, -batch_size, -1)
datas, _ = pipe.execute()
return datas

next_requests(RedisMixin)

调用fecth_data获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def next_requests(self):
"""Returns a request to be scheduled or none."""
# XXX: Do we need to use a timeout here?
found = 0
datas = self.fetch_data(self.redis_key, self.redis_batch_size)
for data in datas:
reqs = self.make_request_from_data(data)
if isinstance(reqs, Iterable):
for req in reqs:
yield req
# XXX: should be here?
found += 1
self.logger.info(f"start req url:{req.url}")
elif reqs:
yield reqs
found += 1
else:
self.logger.debug(f"Request not made from data: {data}")

if found:
self.logger.debug(f"Read {found} requests from '{self.redis_key}'")

make_request_from_data(RedisMixin)

这是一个把redis的字节数据转换成请求的转换函数,是定制化请求的关键。

解析数据:它接收从Redis取出的原始字节数据data,并尝试将其解码和解析。它优先处理JSON格式的字符串。

创建请求:如果数据是JSON,它可以解析出urlmeta(元数据)、method(如’POST’)等多个参数,并创建一个功能更丰富的FormRequest对象。这允许你通过Redis传递非常复杂的请求信息,而不仅仅是一个URL。

兼容旧版:如果数据只是一个普通的字符串(即一个URL),它会打印一个警告,但仍然会创建一个基本的FormRequest来兼容旧的用法。

错误处理:如果解析后的数据中没有url字段,它会发出警告并返回空,防止程序出错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def make_request_from_data(self, data):
"""Returns a `Request` instance for data coming from Redis.

Overriding this function to support the `json` requested `data` that contains
`url` ,`meta` and other optional parameters. `meta` is a nested json which contains sub-data.

Along with:
After accessing the data, sending the FormRequest with `url`, `meta` and addition `formdata`, `method`

For example:

.. code:: json

{
"url": "https://example.com",
"meta": {
"job-id":"123xsd",
"start-date":"dd/mm/yy",
},
"url_cookie_key":"fertxsas",
"method":"POST",
}

If `url` is empty, return `[]`. So you should verify the `url` in the data.
If `method` is empty, the request object will set method to 'GET', optional.
If `meta` is empty, the request object will set `meta` to an empty dictionary, optional.

This json supported data can be accessed from 'scrapy.spider' through response.
'request.url', 'request.meta', 'request.cookies', 'request.method'

Parameters
----------
data : bytes
Message from redis.

"""
formatted_data = bytes_to_str(data, self.redis_encoding)

if is_dict(formatted_data):
parameter = json.loads(formatted_data)
else:
self.logger.warning(
f"{TextColor.WARNING}WARNING: String request is deprecated, please use JSON data format. "
f"Detail information, please check https://github.com/rmax/scrapy-redis#features{TextColor.ENDC}"
)
return FormRequest(formatted_data, dont_filter=True)

if parameter.get("url", None) is None:
self.logger.warning(
f"{TextColor.WARNING}The data from Redis has no url key in push data{TextColor.ENDC}"
)
return []

url = parameter.pop("url")
method = parameter.pop("method").upper() if "method" in parameter else "GET"
metadata = parameter.pop("meta") if "meta" in parameter else {}

return FormRequest(
url, dont_filter=True, method=method, formdata=parameter, meta=metadata
)

schedule_next_requests(RedisMixin)

再次从redis中获得一批request数据,然后将这些请求逐一交给Scrapy引擎(self.crawler.engine.crawl(req)),让引擎去安排下载。

这个函数是给idle函数调用的,当爬虫处于空闲状态,就会调用这个函数。

1
2
3
4
5
6
7
8
9
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
# see https://github.com/scrapy/scrapy/issues/5994
if scrapy_version >= (2, 6):
self.crawler.engine.crawl(req)
else:
self.crawler.engine.crawl(req, spider=self)

spider_idle(RedisMixin)

处理爬虫空闲状态,实现“永不停止”的分布式爬取。

检查Redis:当Scrapy引擎变为空闲时,此方法被触发。它会首先检查Redis队列中是否还有剩余的URL。

调度新请求:如果Redis中还有URL,它会调用self.schedule_next_requests()来获取并调度新的请求。

防止关闭:最关键的一步是,在最后raise DontCloseSpider。这个异常会告诉Scrapy引擎:“请不要关闭我,我可能很快就会有新的任务。” 这样,只要Redis中还有URL,或者有新的URL被不断添加进来,爬虫就会一直运行下去。

超时关闭:它也会检查空闲时间是否超过了设定的max_idle_time。如果长时间没有从Redis获取到新任务,它就不会抛出DontCloseSpider异常,从而允许爬虫正常关闭,避免了资源的无限期占用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def spider_idle(self):
"""
Schedules a request if available, otherwise waits.
or close spider when waiting seconds > MAX_IDLE_TIME_BEFORE_CLOSE.
MAX_IDLE_TIME_BEFORE_CLOSE will not affect SCHEDULER_IDLE_BEFORE_CLOSE.
"""
if self.server is not None and self.count_size(self.redis_key) > 0:
self.spider_idle_start_time = int(time.time())

self.schedule_next_requests()

idle_time = int(time.time()) - self.spider_idle_start_time
if self.max_idle_time != 0 and idle_time >= self.max_idle_time:
return
raise DontCloseSpider

类RedisSpider/RedisCrawlSpider的函数

直接继承父类RedisMixin和Spider/CrawlSpider。

相当于获得了一个实现了Redis接口的Spider/CrawlSpider。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle.

Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.

Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: False)
Use SET operations to retrieve messages from the redis queue. If False,
the messages are retrieve using the LPOP command.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.

"""

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
obj = super().from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle.

Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.

Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: True)
Use SET operations to retrieve messages from the redis queue.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.

"""

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
obj = super().from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj

管道数据存储(源文件pipelines)

只提2个地方——process_item和from_crawler。

from_crawler

这个函数在很多地方都看到了,这是一个工厂模式的对象创建函数。

from_crawler总会先获得一个类,然后将settings.py或者是其它来源(其它文件或者redis数据库)的配置信息,用于这个类的构造函数,以此创建一个实例。这样子创建出的实例,跟配置文件相关。

而工厂模式是:将所有组件的对象的实例,统一交给工厂类来完成,而非直接调用它们的构造函数。

Scrapy引擎在启动后,会通过settings.py获得类RedisPipeline,然后调用它的from_crawler进行实例化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults

default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline:
"""Pushes serialized item into a redis list/queue

Settings
--------
REDIS_ITEMS_KEY : str
Redis key where to store items.
REDIS_ITEMS_SERIALIZER : str
Object path to serializer function.

"""

def __init__(
self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize
):
"""Initialize pipeline.

Parameters
----------
server : StrictRedis
Redis client instance.
key : str
Redis key where to store items.
serialize_func : callable
Items serializer function.

"""
self.server = server
self.key = key
self.serialize = serialize_func

@classmethod
def from_settings(cls, settings):
params = {
"server": connection.from_settings(settings),
}
if settings.get("REDIS_ITEMS_KEY"):
params["key"] = settings["REDIS_ITEMS_KEY"]
if settings.get("REDIS_ITEMS_SERIALIZER"):
params["serialize_func"] = load_object(settings["REDIS_ITEMS_SERIALIZER"])

return cls(**params)

@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)

def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)

def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item

def item_key(self, item, spider):
"""Returns redis key based on given spider.

Override this function to use a different key depending on the item
and/or spider.

"""
return self.key % {"spider": spider.name}

process_item

Scrapy引擎会将item发送给process_item,而scrapy-redit的process_item只做一件事。

1
return deferToThread(self._process_item, item, spider)

deferToThread 是这里的关键! 它的意思是“把 _process_item 这个任务扔到另一个线程去做”。

为什么这么做? 因为数据库操作(比如写入Redis)可能会有网络延迟,是阻塞操作。Scrapy是基于Twisted构建的异步框架,主线程非常宝贵,不能被任何耗时操作卡住。通过将写入操作移到子线程,主线程可以毫无延迟地继续去处理其他网络请求,极大地提高了爬虫的整体效率。这是一种异步非阻塞的设计。

在新的线程里,_process_item 方法开始执行实际的存储工作:

  1. key = self.item_key(item, spider): 调用 item_key 方法生成最终的Redis键名,默认会把爬虫的名字嵌入进去,例如 my_spider:items。这个方法可以被重写,以实现更复杂的逻辑(比如根据item内容存到不同的key)。
  2. data = self.serialize(item): 调用初始化时设置的序列化函数,将 item 这个Python字典转换成 data (一个JSON字符串)。
  3. self.server.rpush(key, data): 使用Redis的 rpush 命令,将这个JSON字符串推入指定 key 的列表末尾。
  4. return item: 将原始的 item 返回。这很重要,因为如果后面还有其他的Pipeline,item 可以继续被传递下去。

更多scrapy扩展参考

https://docs.scrapy.net.cn/en/latest/topics/extensions.html#module-scrapy.extensions.spiderstate