[Source Code Study][Zhiliao Dev] The Four Core Components of WebMagic: Downloader, Pipeline, PageProcessor

Preface

This should be my last post about WebMagic, and it is a relatively simple one.

Pipeline & PageProcessor

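These two are the parts the programmer is expected to implement: PageProcessor decides how a page is parsed, while Pipeline handles storage. I recommend using OOSpider, i.e. the annotation-based programming model.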
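
A minimal self-contained sketch of that division of labour follows. `Page`, `ResultItems`, `PageProcessor` and `Pipeline` here are simplified stand-ins written for this example, not WebMagic's real classes (the real `PageProcessor` also returns a `Site` from `getSite()`, and the real `Pipeline.process` also receives a `Task`); `TitleProcessor` and `CollectingPipeline` are hypothetical names. The processor only extracts data, the pipeline only stores it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in: a downloaded page plus the fields extracted from it.
class Page {
    final String url;
    final String rawText;
    final ResultItems resultItems = new ResultItems();

    Page(String url, String rawText) {
        this.url = url;
        this.rawText = rawText;
    }

    // Hands extracted data over to the pipelines.
    void putField(String key, Object value) {
        resultItems.put(key, value);
    }
}

// Simplified stand-in: a key/value bag of extracted fields.
class ResultItems {
    private final Map<String, Object> fields = new HashMap<>();

    void put(String key, Object value) { fields.put(key, value); }

    Object get(String key) { return fields.get(key); }
}

// Parsing role: decides what to extract from a page.
interface PageProcessor {
    void process(Page page);
}

// Storage role: decides where extracted results go.
interface Pipeline {
    void process(ResultItems resultItems);
}

// Hypothetical processor that extracts the <title> text with a regex.
class TitleProcessor implements PageProcessor {
    private static final Pattern TITLE =
            Pattern.compile("<title>(.*?)</title>", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    @Override
    public void process(Page page) {
        Matcher m = TITLE.matcher(page.rawText);
        if (m.find()) {
            page.putField("title", m.group(1).trim());
        }
        page.putField("url", page.url);
    }
}

// Hypothetical pipeline that keeps results in memory instead of a file or database.
class CollectingPipeline implements Pipeline {
    final List<ResultItems> stored = new ArrayList<>();

    @Override
    public void process(ResultItems resultItems) {
        stored.add(resultItems);
    }
}
```

Because the processor never touches storage, the same `TitleProcessor` could feed a console, file or database pipeline without changes.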

Downloader

public interface Downloader {

    /**
     * Downloads web pages and store in Page object.
     *
     * @param request request
     * @param task task
     * @return page
     */
    public Page download(Request request, Task task);

    /**
     * Tell the downloader how many threads the spider used.
     *
     * @param threadNum number of threads
     */
    public void setThread(int threadNum);
}
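
Since the contract is only "turn a Request into a Page" plus a thread-count hint, any class honouring it can be swapped in. A minimal self-contained sketch, assuming simplified stand-in types (`Request`, `Task` and `Page` below are not WebMagic's classes, and `InMemoryDownloader` is a hypothetical name): a downloader that serves canned HTML from a map, which can be handy for exercising a PageProcessor offline. Returning null for an unknown URL mirrors what HttpClientDownloader does when a status code is not accepted.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for WebMagic's Request / Task / Page (not the real classes).
class Request {
    final String url;
    Request(String url) { this.url = url; }
}

// Empty marker here; the real Task carries crawler-level information.
interface Task { }

class Page {
    final String url;
    final String rawText;
    final int statusCode;

    Page(String url, String rawText, int statusCode) {
        this.url = url;
        this.rawText = rawText;
        this.statusCode = statusCode;
    }
}

// Same shape as the Downloader interface quoted above.
interface Downloader {
    Page download(Request request, Task task);
    void setThread(int threadNum);
}

// Hypothetical downloader that answers from a map instead of the network.
class InMemoryDownloader implements Downloader {
    private final Map<String, String> pages = new HashMap<>();
    private int threadNum = 1;

    InMemoryDownloader put(String url, String html) {
        pages.put(url, html);
        return this;
    }

    @Override
    public Page download(Request request, Task task) {
        String html = pages.get(request.url);
        // Unknown URL: return null, like a page whose status code was rejected.
        return html == null ? null : new Page(request.url, html, 200);
    }

    @Override
    public void setThread(int threadNum) {
        // No connection pool to size here; just remember the value.
        this.threadNum = threadNum;
    }

    int getThread() { return threadNum; }
}
```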

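There are three main implementation classes. I will only go through HttpClientDownloader in detail; if you are interested in the others, have a look at the source yourself.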

@ThreadSafe
public class HttpClientDownloader extends AbstractDownloader {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final Map<String,CloseableHttpClient> httpClients = new HashMap<String,CloseableHttpClient>();

    private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();

    private CloseableHttpClient getHttpClient(Site site,Proxy proxy) {
        if (site == null) {
            return httpClientGenerator.getClient(null,proxy);
        }
        String domain = site.getDomain();
        CloseableHttpClient httpClient = httpClients.get(domain);
        if (httpClient == null) {
            synchronized (this) {
                httpClient = httpClients.get(domain);
                if (httpClient == null) {
                    httpClient = httpClientGenerator.getClient(site,proxy);
                    httpClients.put(domain,httpClient);
                }
            }
        }
        return httpClient;
    }

    @Override
    public Page download(Request request,Task task) {
        Site site = null;
        if (task != null) {
            site = task.getSite();
        }
        Set<Integer> acceptStatCode;
        String charset = null;
        Map<String,String> headers = null;
        if (site != null) {
            acceptStatCode = site.getAcceptStatCode();
            charset = site.getCharset();
            headers = site.getHeaders();
        } else {
            acceptStatCode = Sets.newHashSet(200);
        }
        logger.info("downloading page {}",request.getUrl());
        CloseableHttpResponse httpResponse = null;
        int statusCode=0;
        try {
            HttpHost proxyHost = null;
            Proxy proxy = null; //Todo
            if (site.getHttpProxyPool() != null && site.getHttpProxyPool().isEnable()) {
                proxy = site.getHttpProxyFromPool();
                proxyHost = proxy.getHttpHost();
            } else if(site.getHttpProxy()!= null){
                proxyHost = site.getHttpProxy();
            }

            HttpUriRequest httpUriRequest = getHttpUriRequest(request,site,headers,proxyHost);
            httpResponse = getHttpClient(site,proxy).execute(httpUriRequest);
            statusCode = httpResponse.getStatusLine().getStatusCode();
            request.putExtra(Request.STATUS_CODE,statusCode);
            if (statusAccept(acceptStatCode,statusCode)) {
                Page page = handleResponse(request,charset,httpResponse,task);
                onSuccess(request);
                return page;
            } else {
                logger.warn("code error " + statusCode + "\t" + request.getUrl());
                return null;
            }
        } catch (IOException e) {
            logger.warn("download page " + request.getUrl() + " error",e);
            if (site.getCycleRetryTimes() > 0) {
                return addToCycleRetry(request,site);
            }
            onError(request);
            return null;
        } finally {
            request.putExtra(Request.STATUS_CODE,statusCode);
            if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
                site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY),(Integer) request
                        .getExtra(Request.STATUS_CODE));
            }
            try {
                if (httpResponse != null) {
                    //ensure the connection is released back to pool
                    EntityUtils.consume(httpResponse.getEntity());
                }
            } catch (IOException e) {
                logger.warn("close response fail",e);
            }
        }
    }

    @Override
    public void setThread(int thread) {
        httpClientGenerator.setPoolSize(thread);
    }

    protected boolean statusAccept(Set<Integer> acceptStatCode,int statusCode) {
        return acceptStatCode.contains(statusCode);
    }

    protected HttpUriRequest getHttpUriRequest(Request request,Site site,Map<String,String> headers,HttpHost proxy) {
        RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
        if (headers != null) {
            for (Map.Entry<String,String> headerEntry : headers.entrySet()) {
                requestBuilder.addHeader(headerEntry.getKey(),headerEntry.getValue());
            }
        }
        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                .setConnectionRequestTimeout(site.getTimeOut())
                .setSocketTimeout(site.getTimeOut())
                .setConnectTimeout(site.getTimeOut())
                .setCookieSpec(CookieSpecs.BEST_MATCH);
        if (proxy !=null) {
            requestConfigBuilder.setProxy(proxy);
            request.putExtra(Request.PROXY,proxy);
        }
        requestBuilder.setConfig(requestConfigBuilder.build());
        return requestBuilder.build();
    }

    protected RequestBuilder selectRequestMethod(Request request) {
        String method = request.getMethod();
        if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
            //default get
            return RequestBuilder.get();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
            RequestBuilder requestBuilder = RequestBuilder.post();
            NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
            if (nameValuePair != null && nameValuePair.length > 0) {
                requestBuilder.addParameters(nameValuePair);
            }
            return requestBuilder;
        } else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
            return RequestBuilder.head();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
            return RequestBuilder.put();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
            return RequestBuilder.delete();
        } else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
            return RequestBuilder.trace();
        }
        throw new IllegalArgumentException("Illegal HTTP Method " + method);
    }

    protected Page handleResponse(Request request,String charset,HttpResponse httpResponse,Task task) throws IOException {
        String content = getContent(charset,httpResponse);
        Page page = new Page();
        page.setRawText(content);
        page.setUrl(new PlainText(request.getUrl()));
        page.setRequest(request);
        page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
        return page;
    }

    protected String getContent(String charset,HttpResponse httpResponse) throws IOException {
        if (charset == null) {
            byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
            String htmlCharset = getHtmlCharset(httpResponse,contentBytes);
            if (htmlCharset != null) {
                return new String(contentBytes,htmlCharset);
            } else {
                logger.warn("Charset autodetect Failed,use {} as charset. Please specify charset in Site.setCharset()",Charset.defaultCharset());
                return new String(contentBytes);
            }
        } else {
            return IOUtils.toString(httpResponse.getEntity().getContent(),charset);
        }
    }

    protected String getHtmlCharset(HttpResponse httpResponse,byte[] contentBytes) throws IOException {
        String charset;
        // charset
        // 1. encoding in http header Content-Type
        String value = httpResponse.getEntity().getContentType().getValue();
        charset = UrlUtils.getCharset(value);
        if (StringUtils.isNotBlank(charset)) {
            logger.debug("Auto get charset: {}",charset);
            return charset;
        }
        // use default charset to decode first time
        Charset defaultCharset = Charset.defaultCharset();
        String content = new String(contentBytes,defaultCharset.name());
        // 2. charset in meta
        if (StringUtils.isNotEmpty(content)) {
            Document document = Jsoup.parse(content);
            Elements links = document.select("meta");
            for (Element link : links) {
                // 2.1 html4.01 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
                String metaContent = link.attr("content");
                String metaCharset = link.attr("charset");
                if (metaContent.indexOf("charset") != -1) {
                    metaContent = metaContent.substring(metaContent.indexOf("charset"),metaContent.length());
                    charset = metaContent.split("=")[1];
                    break;
                }
                // 2.2 html5 <meta charset="UTF-8" />
                else if (StringUtils.isNotEmpty(metaCharset)) {
                    charset = metaCharset;
                    break;
                }
            }
        }
        logger.debug("Auto get charset: {}",charset);
        // 3. todo use tools such as cpdetector for content decode
        return charset;
    }
}
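
The detection order in getHtmlCharset (Content-Type header first, then the `<meta>` tags, otherwise return null so the caller falls back to the platform default) can be sketched without HttpClient or Jsoup. This is a simplified, self-contained approximation, not the library's code: `CharsetSniffer` is a hypothetical name, and the regular expressions stand in for `UrlUtils.getCharset` and the Jsoup `meta` scan. Unlike the original, the first decoding pass uses ISO-8859-1 rather than the platform default, because it maps every byte to one char, so an ASCII meta tag stays readable in any ASCII-compatible encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper approximating the detection order used by getHtmlCharset.
class CharsetSniffer {
    // "charset=xxx" as found in "text/html; charset=GBK" or <meta charset="UTF-8">
    private static final Pattern CHARSET =
            Pattern.compile("charset\\s*=\\s*[\"']?([\\w.:-]+)", Pattern.CASE_INSENSITIVE);
    // Any <meta ...> tag
    private static final Pattern META_TAG =
            Pattern.compile("<meta\\b[^>]*>", Pattern.CASE_INSENSITIVE);

    /** Returns the detected charset name, or null if none was found. */
    static String detect(String contentTypeHeader, byte[] body) {
        // 1. Charset declared in the HTTP Content-Type header.
        if (contentTypeHeader != null) {
            Matcher m = CHARSET.matcher(contentTypeHeader);
            if (m.find()) {
                return m.group(1);
            }
        }
        // 2. Decode once with ISO-8859-1 and scan the meta tags.
        String html = new String(body, StandardCharsets.ISO_8859_1);
        Matcher tags = META_TAG.matcher(html);
        while (tags.find()) {
            // Covers both the html4 http-equiv form and the html5 charset attribute.
            Matcher m = CHARSET.matcher(tags.group());
            if (m.find()) {
                return m.group(1);
            }
        }
        // 3. Nothing found: the caller picks the fallback.
        return null;
    }
}
```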

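The code above also contains the HTTP proxy handling, which the official documentation does not cover at all; if you need it, you will have to read the source yourself - -b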
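
For reference, the proxy decision in download() boils down to: take a proxy from the pool if a pool is configured and enabled, otherwise use the single fixed proxy, otherwise connect directly; then, in finally, hand a pooled proxy back together with the status code. A self-contained sketch of that order, using hypothetical stand-ins (`ProxyPool`, `SiteConfig`, `ProxyChoice`, proxies as "host:port" strings) instead of WebMagic's `Site` and `HttpHost`. Unlike the quoted code, it also tolerates a null site: download() checks `site != null` at the top but later calls `site.getHttpProxyPool()` unconditionally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a proxy pool handing out "host:port" strings.
class ProxyPool {
    private final Deque<String> idle = new ArrayDeque<>();
    private final boolean enabled;
    int returned = 0;
    int lastStatus = -1;

    ProxyPool(boolean enabled, String... proxies) {
        this.enabled = enabled;
        for (String p : proxies) idle.add(p);
    }

    boolean isEnable() { return enabled; }

    String borrow() { return idle.poll(); }

    // The quoted code passes the status code back to the pool; this sketch only records it.
    void giveBack(String proxy, int statusCode) {
        if (proxy != null) idle.add(proxy);
        returned++;
        lastStatus = statusCode;
    }
}

// Hypothetical stand-in for the proxy-related part of Site.
class SiteConfig {
    ProxyPool pool;      // may be null
    String fixedProxy;   // may be null
}

class ProxyChoice {
    /** Same order as download(): enabled pool, then fixed proxy, then none (null). */
    static String choose(SiteConfig site) {
        if (site == null) return null; // guard the quoted code lacks
        if (site.pool != null && site.pool.isEnable()) return site.pool.borrow();
        return site.fixedProxy;
    }

    /** Mirrors the finally block: only pooled proxies are handed back. */
    static void release(SiteConfig site, String proxy, int statusCode) {
        if (site != null && site.pool != null && site.pool.isEnable()) {
            site.pool.giveBack(proxy, statusCode);
        }
    }
}
```
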
Now look at this part of the code:

if (statusAccept(acceptStatCode,statusCode)) {
    Page page = handleResponse(request,charset,httpResponse,task);
    onSuccess(request);
    return page;
} else {
    logger.warn("code error " + statusCode + "\t" + request.getUrl());
    return null;
}

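acceptStatCode defaults to {200}. For any other status code, download() logs a warning and returns null immediately, so the page is dropped without calling onError and without going through the cycle-retry logic (only the IOException branch retries). That can fairly be called a flaw. The resources are still released, though: in Java a finally block runs even when the try block returns, so the finally block below still executes, consumes the entity and releases the connection back to the pool.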

finally {
    request.putExtra(Request.STATUS_CODE,statusCode);
    if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
        site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY),(Integer) request
                .getExtra(Request.STATUS_CODE));
    }
    try {
        if (httpResponse != null) {
            //ensure the connection is released back to pool
            EntityUtils.consume(httpResponse.getEntity());
        }
    } catch (IOException e) {
        logger.warn("close response fail",e);
    }
}
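
Whether this finally block runs after the try block has already executed `return null` is plain Java semantics, not anything WebMagic-specific, so it can be checked in isolation. The self-contained sketch below mirrors the control flow of download() with hypothetical names (`FakeResponse`, `FinallyCheck`): a rejected status code returns null from inside try, and finally marks the response as consumed in both branches.

```java
import java.util.Set;

// Hypothetical stand-in for an HTTP response whose entity must be consumed.
class FakeResponse {
    final int statusCode;
    boolean consumed = false;

    FakeResponse(int statusCode) { this.statusCode = statusCode; }
}

class FinallyCheck {
    // Same shape as download(): accepted -> page, otherwise return null; always clean up.
    static String download(FakeResponse response, Set<Integer> acceptStatCode) {
        try {
            if (acceptStatCode.contains(response.statusCode)) {
                return "page";
            } else {
                return null; // early return, like the "code error" branch
            }
        } finally {
            // Executes in both branches, after the return value is computed
            // but before control actually leaves the method.
            response.consumed = true;
        }
    }
}
```

So on a rejected status code the connection is still released; what the early return skips is onError(request) and the cycle-retry path.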

Wrapping Up

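That covers the main body of WebMagic's source code. If you just want to use it, what we have covered is already enough. When you hit a bug you will still have to fix it yourself, but fortunately WebMagic provides interfaces for swapping out components, which makes that quite convenient.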
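
As one example of the kind of fix you might make in a Downloader of your own: getHttpClient above reads a plain HashMap outside the synchronized block, and an unsynchronized read of a HashMap that another thread may be writing is not safe under the Java Memory Model. A per-domain cache built on ConcurrentHashMap.computeIfAbsent avoids both the race and the hand-written double-checked locking. This is a generic, self-contained sketch: `PerDomainCache` is a hypothetical name, the factory function stands in for `HttpClientGenerator.getClient`, and since ConcurrentHashMap rejects null keys, the "no site" case still has to be handled before the lookup, as the original does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical thread-safe replacement for the HashMap + double-checked locking in getHttpClient.
class PerDomainCache<C> {
    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    PerDomainCache(Function<String, C> factory) {
        this.factory = factory;
    }

    // computeIfAbsent applies the factory at most once per domain and publishes the result safely.
    C get(String domain) {
        return clients.computeIfAbsent(domain, factory);
    }
}
```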
