Problem description
I am trying to use pika and Scrapy together, running an MQ setup where the consumer invokes the spider. I have a consumer.py and a Scrapy spider spider.py.
The spider runs inside the consumer, with arguments sent by the producer. I call used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) to delete (acknowledge) the message.
I expected the message to be deleted when the spider finishes its work, and requeued if an error occurs. When the spider runs normally, everything looks fine: the message is deleted and the job completes. However, if an error occurs while the spider is running, the message is still deleted; the job is not finished, but the message is lost.
Watching the RabbitMQ management UI, I saw the message count drop to 0 while the spider was still running (the console had not yet reported the job as done).
I wonder whether this is because Scrapy is asynchronous: while the line run_spider(message=decodebody) is still running, the next line used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) does not wait for the spider to finish.
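For reference, CrawlerRunner.crawl() is documented to only schedule the crawl on Twisted's reactor and to return a twisted Deferred immediately, rather than blocking until the spider is done. A minimal sketch showing the timing (MySpider and settings as in the code below):

from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

settings = get_project_settings()

def run_spider(message):
    crawler = CrawlerRunner(settings)
    # crawl() only schedules the spider and returns a twisted Deferred
    d = crawler.crawl(MySpider, message=message)
    # this callback fires when the spider really finishes...
    d.addBoth(lambda _: print('spider finished'))
    # ...but control reaches this line (and any basic_ack after it) immediately
    print('crawl() already returned')
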
Here is consumer.py:

import pika
from crochet import setup  # assumed: setup() below is crochet's, which runs Twisted's reactor in a background thread
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

# MySpider, logger and the rabbit_host / rabbit_exchange / rabbit_queue /
# routing_key settings are defined elsewhere.

setup()  # for CrawlerRunner
settings = get_project_settings()


def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)
    try:
        run_spider(message=decodebody)
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except Exception:
        # reject the message on failure so it is requeued
        used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag, requeue=True)


def run_spider(message):
    crawler = CrawlerRunner(settings)
    crawler.crawl(MySpider, message=message)


while True:
    try:
        # blocking connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
        channel = connection.channel()
        # declare the exchange; the settings must match the producer's
        channel.exchange_declare(
            exchange=rabbit_exchange, exchange_type='direct', durable=True, auto_delete=False
        )
        # declare the queue; the settings must match the producer's
        channel.queue_declare(
            queue=rabbit_queue, exclusive=False, auto_delete=False
        )
        # bind the queue to the exchange
        channel.queue_bind(
            exchange=rabbit_exchange, queue=rabbit_queue, routing_key=routing_key
        )
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=rabbit_queue, on_message_callback=get_message, auto_ack=False
        )
        logger.info(' [*] Waiting for messages. To exit press CTRL+C')
        # start consuming
        channel.start_consuming()
    except pika.exceptions.ConnectionClosed as err:
        print('ConnectionClosed error:', err)
        continue
    # do not recover on channel errors
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
        break
    # recover on all other connection errors
    except pika.exceptions.AMQPConnectionError as err:
        print("Connection was closed, retrying...", err)
        continue
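The producer is not shown in the question; for context, a minimal sketch of a matching publisher, assuming the same rabbit_* / routing_key settings and a plain-text body (the persistent delivery_mode=2 flag is an assumption, chosen to match the durable exchange):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
channel = connection.channel()
# the exchange settings must match the consumer's declaration above
channel.exchange_declare(
    exchange=rabbit_exchange, exchange_type='direct', durable=True, auto_delete=False
)
channel.basic_publish(
    exchange=rabbit_exchange,
    routing_key=routing_key,
    body='message for the spider',
    properties=pika.BasicProperties(delivery_mode=2),  # persistent message
)
connection.close()
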
Solution
I found someone handling multithreading with the pika library who used .is_alive to check whether a worker thread had finished, so I followed the same idea. Scrapy runs the crawl in the background, and CrawlerRunner keeps its running crawls in crawler._active, so I now return the crawler from run_spider and check crawler._active before deleting the message.
See the source code for scrapy.crawler for how _active is maintained. The updated code:
import time

def run_spider(news_info):
    # run the spider with CrawlerRunner and return the runner to the caller
    crawler = CrawlerRunner(settings)
    # run the spider script
    crawler.crawl(UrlSpider, news_info=news_info)
    return crawler

# inside the consumer callback:
crawler = run_spider(news_info=decodebody)

# wait until the crawler is done: CrawlerRunner stores the Deferred of each
# running crawl in crawler._active, and the set empties when crawling finishes
while len(crawler._active) > 0:
    time.sleep(1)

used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
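As a design note, crawler._active is a private attribute and the one-second loop is a busy wait. If setup() above is indeed crochet's (as its comment suggests), a possibly cleaner variant is to let crochet block until the Deferred returned by crawl() fires; an exception raised inside the crawl then propagates to the caller, so the except branch can reject the message. A sketch, where the 600-second timeout is an assumption to be tuned to the spider's runtime:

from crochet import setup, wait_for
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

setup()
settings = get_project_settings()

@wait_for(timeout=600.0)  # assumed ceiling; crochet raises TimeoutError beyond it
def run_spider(news_info):
    crawler = CrawlerRunner(settings)
    # crawl() returns a Deferred; wait_for blocks this thread until it fires
    return crawler.crawl(UrlSpider, news_info=news_info)

With this, run_spider(news_info=decodebody) in get_message only returns once the spider has finished, so the existing basic_ack / basic_reject logic works unchanged.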