当spider有scrapy和Rabbitmqpika错误时如何重新排队消息

问题描述

我正在尝试使用 pika 和 scrapy 来运行 MQ,并让消费者调用蜘蛛。我有一个 consumer.py一个爬虫蜘蛛 spider.py

蜘蛛在消费者中运行,并带有生产者发送的参数。我用 used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) 删除消息。

我预计当蜘蛛完成工作时消息会被删除,如果出现错误,消息应该重新排队。当蜘蛛正常运行时,一切看起来都很好;消息被删除,工作完成。但是,如果在运行爬虫时发生错误,消息仍会被删除,作业未完成但消息丢失。

我看了Rabbitmq的管理界面,发现spider还在运行的时候消息变成了0(控制台还没有显示工作完成)。

我想知道是因为scrapy是异步的吗?因此,当这一行 run_spider(message=decodebody) 仍在运行时,下一行 used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) 不会等到蜘蛛完成。

我该如何解决这个问题?我想在蜘蛛正常完成工作后删除消息。

from scrapy.utils.project import get_project_settings

setup() # for CrawlerRunner
settings = get_project_settings()

def get_message(used_channel,basic_deliver,properties,body):
    decodebody = bytes.decode(body)

    try:
        run_spider(message=decodebody)
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)

    except: 
        channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)


def run_spider(message):
    crawler = CrawlerRunner(settings)
    crawler.crawl(MySpider,message=message)


while(True):
    try: 
        # blocking connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
        channel = connection.channel()
        # declare exchange,the setting must be same as producer
        channel.exchange_declare(
            exchange=rabbit_exchange,exchange_type='direct',durable=True,auto_delete=False        
        )
        # declare queue,the setting must be same as producer
        channel.queue_declare(
            queue=rabbit_queue,exclusive=False,auto_delete=False
        )
        # bind the setting
        channel.queue_bind(
            exchange=rabbit_exchange,queue=rabbit_queue,routing_key=routing_key
        )

        channel.basic_qos(prefetch_count=1) 
        channel.basic_consume(
            queue=rabbit_queue,on_message_callback=get_message,auto_ack=False
        )

        logger.info(' [*] Waiting for messages. To exit press CTRL+C')
        # start crawler
        channel.start_consuming()
    
    except pika.exceptions.ConnectionClosed as err:
        print('ConnectionClosed error:',err)
        continue
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {},stopping...".format(err))
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError as err:    
        print("Connection was closed,retrying...",err)
        continue



解决方法

我发现有人用 MQ 处理 pika 库的多线程。他使用 .is_alive 来检查线程是否完成。所以,我遵循这个想法。 Scrapy 是多线程的,我添加了返回 crawler,并在删除消息之前检查 crawler._active

Source code for scrapy.crawler

def run_spider(news_info):
    # run spider with CrawlerRunner
    crawler = CrawlerRunner(settings)
    # run the spider script
    crawler.crawl(UrlSpider,news_info=news_info)

    return crawler
crawler = run_spider(news_info=decodebody)
        
# wait until the crawler is done
while (len(crawler._active) > 0):
    time.sleep(1)

used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)