        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for these settings to note that they
            # are specific to scrapy-redis.
            "queue_key": "SCHEDULER_QUEUE_KEY",
            "queue_cls": "SCHEDULER_QUEUE_CLASS",
            "dupefilter_key": "SCHEDULER_DUPEFILTER_KEY",
            # We use the default setting name to keep compatibility.
            "dupefilter_cls": "DUPEFILTER_CLASS",
            "serializer": "SCHEDULER_SERIALIZER",
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val
        # Support serializer as a path to a module.
        if isinstance(kwargs.get("serializer"), str):
            kwargs["serializer"] = importlib.import_module(kwargs["serializer"])
        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)
    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance
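# A minimal configuration sketch (not part of this module): the optional settings
# read by from_settings() would normally live in a project's settings.py. The
# concrete values below are illustrative assumptions, not the shipped defaults.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.PriorityQueue"
SCHEDULER_SERIALIZER = "json"  # imported via importlib.import_module() above
REDIS_URL = "redis://localhost:6379"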
        if self.flush_on_start:
            self.flush()
        # Notice if there are requests already in the queue to resume the crawl.
        if len(self.queue):
            spider.log(f"Resuming crawl ({len(self.queue)} requests scheduled)")
    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return
        if crawler is None:
            # We allow an optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, "crawler", None)
        if crawler is None:
            raise ValueError("crawler is required")
        try:
            self.max_idle_time = int(self.max_idle_time)
        except (TypeError, ValueError):
            raise ValueError("max_idle_time must be an integer")
        # The idle signal is called when the spider has no requests left;
        # that's when we will schedule new requests from the redis queue.
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
    def next_requests(self):
        """Yields requests to be scheduled from the redis queue, if any."""
        # XXX: Do we need to use a timeout here?
        found = 0
        datas = self.fetch_data(self.redis_key, self.redis_batch_size)
        for data in datas:
            reqs = self.make_request_from_data(data)
            if isinstance(reqs, Iterable):
                for req in reqs:
                    yield req
                    # XXX: should be here?
                    found += 1
                    self.logger.info(f"start req url: {req.url}")
            elif reqs:
                yield reqs
                found += 1
            else:
                self.logger.debug(f"Request not made from data: {data}")
        if found:
            self.logger.debug(f"Read {found} requests from '{self.redis_key}'")
    def make_request_from_data(self, data):
        """Returns a `Request` instance for data coming from Redis.

        This implementation supports JSON-encoded `data` containing a `url`,
        a `meta` dict and other optional parameters; `meta` is a nested JSON
        object carrying sub-data. After the data is parsed, a `FormRequest`
        is built from `url`, `meta` and the optional `formdata` and `method`
        fields.

        For example:

        .. code:: json

            {
                "url": "https://example.com",
                "meta": {
                    "job-id": "123xsd",
                    "start-date": "dd/mm/yy"
                },
                "url_cookie_key": "fertxsas",
                "method": "POST"
            }

        If `url` is missing, `[]` is returned, so make sure the pushed data
        contains a `url`. If `method` is missing, the request defaults to
        'GET'. If `meta` is missing, the request's `meta` defaults to an
        empty dictionary.

        The parsed values can be accessed from spider callbacks through the
        response, e.g. `request.url`, `request.meta`, `request.cookies` and
        `request.method`.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        formatted_data = bytes_to_str(data, self.redis_encoding)
        if is_dict(formatted_data):
            parameter = json.loads(formatted_data)
        else:
            self.logger.warning(
                f"{TextColor.WARNING}WARNING: String request is deprecated, please use JSON data format. "
                f"For details, please check https://github.com/rmax/scrapy-redis#features{TextColor.ENDC}"
            )
            return FormRequest(formatted_data, dont_filter=True)
if parameter.get("url", None) isNone: self.logger.warning( f"{TextColor.WARNING}The data from Redis has no url key in push data{TextColor.ENDC}" ) return []
    def schedule_next_requests(self):
        """Schedules a request if available."""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            # See https://github.com/scrapy/scrapy/issues/5994
            if scrapy_version >= (2, 6):
                self.crawler.engine.crawl(req)
            else:
                self.crawler.engine.crawl(req, spider=self)
    def spider_idle(self):
        """Schedules a request if available, otherwise waits.

        Closes the spider once it has been waiting for more than
        MAX_IDLE_TIME_BEFORE_CLOSE seconds. MAX_IDLE_TIME_BEFORE_CLOSE does
        not affect SCHEDULER_IDLE_BEFORE_CLOSE.
        """
        if self.server is not None and self.count_size(self.redis_key) > 0:
            self.spider_idle_start_time = int(time.time())
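# A hedged settings sketch for the idle timeout described above; the 300-second
# value is an illustrative assumption, not a recommended default.
MAX_IDLE_TIME_BEFORE_CLOSE = 300  # close the spider after five idle minutes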
class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from a redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from.
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from.
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If
        False, the messages are retrieved using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """
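# A minimal usage sketch (assumed names, not part of this module): a spider whose
# start URLs are taken from the redis list "myspider:start_urls" instead of a
# hard-coded start_urls list.
from scrapy_redis.spiders import RedisSpider


class MySpider(RedisSpider):
    name = "myspider"
    redis_key = "myspider:start_urls"

    def parse(self, response):
        # Toy callback: record the page URL and title; real spiders would yield items.
        yield {"url": response.url, "title": response.css("title::text").get()}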
class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from a redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from.
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from.
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """
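# A similar hedged sketch for RedisCrawlSpider: start URLs still come from redis,
# while ordinary CrawlSpider rules drive link extraction. The class name, key and
# allow pattern are illustrative assumptions.
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule

from scrapy_redis.spiders import RedisCrawlSpider


class MyCrawlSpider(RedisCrawlSpider):
    name = "mycrawler"
    redis_key = "mycrawler:start_urls"
    rules = (
        # Follow category pages and parse every page they link to.
        Rule(LinkExtractor(allow=r"/category/"), callback="parse_page", follow=True),
    )

    def parse_page(self, response):
        yield {"url": response.url}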
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults
default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline:
    """Pushes serialized items into a redis list/queue.

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """
    def __init__(
        self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize
    ):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func
    @classmethod
    def from_settings(cls, settings):
        params = {
            "server": connection.from_settings(settings),
        }
        if settings.get("REDIS_ITEMS_KEY"):
            params["key"] = settings["REDIS_ITEMS_KEY"]
        if settings.get("REDIS_ITEMS_SERIALIZER"):
            params["serialize_func"] = load_object(settings["REDIS_ITEMS_SERIALIZER"])

        return cls(**params)
    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {"spider": spider.name}
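# A hedged usage sketch (not part of this module): enabling the pipeline from a
# project's settings.py and overriding item_key() in a subclass. The priority
# (300), key template and subclass name are illustrative assumptions.
import datetime

ITEM_PIPELINES = {
    "scrapy_redis.pipelines.RedisPipeline": 300,
}
REDIS_ITEMS_KEY = "%(spider)s:items"


class DatedItemsPipeline(RedisPipeline):
    def item_key(self, item, spider):
        # Partition items per spider and per day instead of using one static key.
        return f"{spider.name}:items:{datetime.date.today().isoformat()}"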