我刚开始学习ZeroMQ,并希望在学习的同时构建一个分布式webcrawler作为示例.
我的想法是有一个用PHP编写的“服务器”,它接受一个应该开始爬行的URL.
Worker(C#cli)必须抓取该URL,提取链接,并将它们推回到服务器上的堆栈中.服务器不断将堆栈中的URL发送给工作人员.
也许redis会跟踪所有已爬网的网址,因此我们不会多次抓取网站,并且能够提取当前进程的统计信息.
我想让服务器均匀地分配任务,注意新工作/缺少工作人员,并在工人没有响应时重新分配网址.
为什么PHP服务器:我对PHP非常熟悉,就是这样.我不想让示例/测试项目更复杂.
为什么C#为minions:因为它在大多数Windows机器上运行.我可以将可执行文件提供给各种朋友,他们可以执行它并帮助我测试我的项目.
我的第一种方法是PUSH / PULL模式,它通常适用于我的场景,但不知道它的仆从.我想我需要一个中间经销商/经纪人经纪人,并且必须为自己处理工人意识.
我找到了this question,但我不确定我是否理解答案……
我想要一些提示如何强制zmq的东西.经销商的方法是否正确?有没有办法获得自动工人意识?我想我需要一些资源/示例,或者你认为我只需要深入研究zmq指南?
但是,朝正确方向的一些提示会很棒:)
干杯
解决方法:
我正在建立一个工作/任务经销商,至少与你的爬虫一样.这是我学到的一些东西:
定义所有事件
服务器和爬网程序之间的通信将基于系统中发生的不同事情,例如从服务器到爬网程序的调度工作,或者向服务器发送心跳消息的爬网程序.定义系统的事件类型;他们是用例:
disPATCH_WORK_TO_CRAWLER_EVENT
CRAWLER_NODE_STATUS_EVENT
...
定义消息标准
服务器和爬虫之间的所有通信都应该使用ZMsg来完成,因此定义一个组织框架的标准,如下所示:
Frame1: "Crawler v1.0" //this is a static header
Frame2: <event type> //ex: "CRAWLER_NODE_STATUS_EVENT"
Frame3: <content xml/json/binary> //content that applies to this event (if any)
现在,您可以创建消息验证器来验证在对等体之间收到的ZMsgs,因为您有一个标准约定,所有消息都必须遵循.
服务器
在服务器上使用单个ROUTER进行与爬网程序的异步和双向通信.另外,使用PUB套接字来广播心跳消息.
不要阻塞ROUTER套接字,使用POLLER循环每5秒或其他任何东西,这允许服务器定期做其他事情,比如向抓取者广播心跳事件;这样的事情:
Socket rtr = .. //ZMQ.ROUTER
Socket pub = .. //ZMQ.PUB
ZMQ.Poller poller = new ZMQ.Poller(2)
poller.register( rtr, ZMQ.Poller.POLLIN)
poller.register( pub, ZMQ.Poller.POLLIN)
while (true) {
ZMsg msg = null
poller.poll(5000)
if( poller.pollin(0)){
//messages from crawlers
msg = ZMsg.recvMsg(rtr)
}
//send heartbeat messages
ZMsg hearbeatMsg = ...
//create message content here,
//publish to all crawlers
heartbeatMsg.send(pub)
}
为了解决有关工作人员意识的问题,一种简单有效的方法使用FIFO堆栈和心跳消息;这样的事情:
>服务器在内存中维护一个简单的FIFO堆栈
>服务器发出心跳;抓取工具以其节点名称响应; ROUTER也自动将节点的地址放在消息中(阅读消息enveloping)
>将1个对象推入包含节点名称和节点地址的堆栈
>当服务器想要将工作分派给爬虫时,只需从堆栈中弹出下一个对象,正确创建消息和地址(使用节点地址),然后关闭它到该工作者
>以同样的方式向其他爬虫发送更多工作;当爬虫响应回服务器时,只需将具有节点名称/地址的另一个对象推回堆栈;其他工人在回应之前将无法使用,所以我们不会打扰他们.
这是一种基于工作人员可用性而不是盲目地发送工作来分配工作的简单但有效的方法.检查lbbroker.php的例子,概念是一样的.
履带(工人)
工作人员应使用单个DEALER套接字和SUB. DEALER是异步通信的主套接字,SUB订阅来自服务器的心跳消息.当工作程序收到心跳消息时,它会响应DEALER套接字上的服务器.
Socket dlr = .. //ZMQ.DEALER
Socket sub = .. //ZMQ.SUB
ZMQ.Poller poller = new ZMQ.Poller(2)
poller.register( dlr, ZMQ.Poller.POLLIN)
poller.register( sub, ZMQ.Poller.POLLIN)
while (true) {
ZMsg msg = null
poller.poll(5000)
if( poller.pollin(0)){
//message from server
msg = ZMsg.recvMsg(dlr)
}
if( poller.pollin(1)){
//heartbeat message from server
msg = ZMsg.recvMsg(sub)
//reply back with status
ZMsg statusMsg = ...
statusMsg.send(dlr)
}
其余的你可以自己搞清楚.完成PHP示例,构建内容,打破它,构建更多内容,这是您学习的唯一方法!
玩得开心,希望它有所帮助!