Fetcher

From the command line, start Fetcher with:

hostname:${NUTCH_HOME}$ bin/nutch Fetcher <segment> [-threads n] [-noParsing]

If -noParsing is set, Fetcher only fetches the URLs and does not parse their text or links.
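
To make the option handling concrete, here is a minimal sketch of how such a command line could be parsed. It is not Nutch's actual parsing code: the class FetcherArgs, its fields, and the default of 10 threads are assumptions made for illustration.

```java
/** Hypothetical holder for the parsed options; not a Nutch class. */
class FetcherArgs {
    final String segment;
    final int threads;
    final boolean parsing;

    FetcherArgs(String segment, int threads, boolean parsing) {
        this.segment = segment;
        this.threads = threads;
        this.parsing = parsing;
    }

    /** Parses: <segment> [-threads n] [-noParsing] */
    static FetcherArgs parse(String[] args) {
        if (args.length < 1) {
            throw new IllegalArgumentException(
                "Usage: Fetcher <segment> [-threads n] [-noParsing]");
        }
        String segment = args[0];
        int threads = 10;       // assumed default thread count
        boolean parsing = true; // parse unless -noParsing is given
        for (int i = 1; i < args.length; i++) {
            if (args[i].equals("-threads")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("-threads needs a number");
                }
                threads = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-noParsing")) {
                parsing = false;
            } else {
                throw new IllegalArgumentException("Unknown option: " + args[i]);
            }
        }
        return new FetcherArgs(segment, threads, parsing);
    }

    public static void main(String[] args) {
        // The segment path here is a made-up example.
        FetcherArgs a = parse(new String[] {"crawl/segments/example", "-threads", "4", "-noParsing"});
        System.out.println(a.segment + " threads=" + a.threads + " parsing=" + a.parsing);
    }
}
```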

org.apache.nutch.fetcher.Fetcher implements org.apache.hadoop.mapred.MapRunnable, so its MapReduce entry point is run(RecordReader, OutputCollector, Reporter). It has neither a map(xxx) nor a reduce(xxx) method.

Job properties:

  • Input path: ${segment}/crawl_generate
  • Input format: InputFormat extends SequenceFileInputFormat
  • Output format: org.apache.nutch.fetcher.FetcherOutputFormat
  • Output path: several directories under the segment

The methods and fields that handle a FetchItem during the fetch process are outlined below (abridged pseudocode):

class Fetcher {
  FetchItemQueues queues;
  void run(RecordReader inputReader, OutputCollector output, Reporter reporter) {
    new QueueFeeder(inputReader, queues).start();
    for(threadNumbers) {
      new FetchThread(xxx).start();
    }
  }
  class FetchThread extends Thread { // request URL...
    void run() {
      while (true) {
        fetchItem = queues.getFetchItem();
        if (fetchItem == null) return; // simplified; see note (1) below
        response = tryToFetch(fetchItem);
        judge(response) {
          // xxx
          queues.finishFetchItem(fetchItem);
          if (someCondition) {
            // modify the current working queue, but not the crawldb
            queues.addFetchItem(anotherFetchItem);
          }
        }
      }
    }
  }
}
class QueueFeeder extends Thread {
  RecordReader reader;
  FetchItemQueues queues;
  void run() {
    while (reader.next(url, datum)) { // read from the input path (crawl_generate on the DFS)
      // add to the current working queue
      queues.addFetchItem(url, datum);
    }
  }
}
class FetchItemQueues {
  Map<String, FetchItemQueue> queues;
  void addFetchItem(Text url, CrawlDatum datum) {
    addFetchItem(FetchItem.create(url, datum, byIP));
  }
  synchronized void addFetchItem(FetchItem it) {
    FetchItemQueue queue = getFetchItemQueue(xxx);
    queue.addFetchItem(it);
  }
  public synchronized FetchItem getFetchItem() {
    // xxx
  }
}
class FetchItemQueue {
  Set<FetchItem> inProgress; // synchronized
  List<FetchItem> queue; // synchronized
  public FetchItem getFetchItem() {
    // xxx
  }
}

Fetcher starts ${threads} FetchThread instances to do the fetching, and one QueueFeeder to read URL/CrawlDatum records from the input paths. FetchItemQueues is the bridge between FetchThread and QueueFeeder. It manages the FetchItemQueue instances in a Map keyed by each URL's hostname.
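
The thread layout described above can be sketched as a small self-contained program. This is only an illustration, not Nutch code: SharedQueues and FetcherSketch are made-up names, the "fetch" is a counter increment instead of an HTTP request, and the check that the feeder thread has finished before a worker exits is an assumption added to keep the sketch free of races.

```java
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical buffer shared by the feeder and the workers: one queue per hostname. */
class SharedQueues {
    private final Map<String, Deque<String>> byHost = new HashMap<>();
    private int total;

    synchronized void add(String url) {
        String host = URI.create(url).getHost();
        byHost.computeIfAbsent(host, h -> new ArrayDeque<>()).addLast(url);
        total++;
    }

    synchronized String poll() {
        for (Deque<String> q : byHost.values()) {
            String url = q.pollFirst();
            if (url != null) {
                total--;
                return url;
            }
        }
        return null;
    }

    synchronized int size() {
        return total;
    }
}

class FetcherSketch {
    /** Starts one feeder and `threads` workers; returns how many URLs were "fetched". */
    static int run(List<String> input, int threads) {
        SharedQueues queues = new SharedQueues();
        AtomicInteger fetched = new AtomicInteger();
        Thread feeder = new Thread(() -> input.forEach(queues::add), "feeder");
        feeder.start();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    String url = queues.poll();
                    if (url == null) {
                        // Exit only when the feeder is done and nothing is left.
                        if (!feeder.isAlive() && queues.size() == 0) return;
                        try {
                            Thread.sleep(5); // wait briefly, then retry
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        continue;
                    }
                    fetched.incrementAndGet(); // stand-in for the real request
                }
            }, "worker-" + i);
            t.start();
            workers.add(t);
        }
        try {
            feeder.join();
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fetched.get();
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
            "http://a.example/1", "http://a.example/2", "http://b.example/1");
        System.out.println("fetched " + run(urls, 3));
    }
}
```

The feeder and the workers never reference each other's data directly; the only shared state is the synchronized buffer, which is the role FetchItemQueues plays in the description above.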

QueueFeeder calls FetchItemQueues.addFetchItem(url, crawlDatum) to put items into the queues as it reads records from the input source.
FetchThread calls the synchronized FetchItemQueues.getFetchItem() to take an item. This method is a key point of the Nutch crawler's politeness mechanism: it keeps each node from sending too many requests to one host.

class FetchItemQueues {
  public synchronized FetchItem getFetchItem() {
    // Iterate over the queues' entry set (Map.Entry<String, FetchItemQueue>).
    // If FetchItemQueue.getFetchItem() returns a non-null object, return it;
    // otherwise move on to the next queue.
    // If no queue yields an item, return null. ---- (1)
  }
}
class FetchItemQueue {
  AtomicLong nextFetchTime = new AtomicLong();
  public FetchItem getFetchItem() {
    // return null if nextFetchTime > now ---- (2)
    // return next item.
  }
}

(1) If this returns null, FetchThread checks the total size of FetchItemQueues and waits if the size is > 0.
(2) FetchItemQueue.nextFetchTime is set as follows:
When FetchThread has fetched an item, it calls FetchItemQueues.finishFetchItem(FetchItem).
FetchItemQueues then calls FetchItemQueue.finishFetchItem(FetchItem, boolean), which sets nextFetchTime.
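
The interplay of notes (1) and (2) can be sketched with a simplified, self-contained model. This is not the Nutch implementation: Item, HostQueue, HostQueues, the fixed per-host delay, the injected clock, and the rule of at most one in-flight request per host are all assumptions chosen to keep the example small and deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for FetchItem: a URL plus the host it belongs to. */
final class Item {
    final String host;
    final String url;
    Item(String host, String url) { this.host = host; this.url = url; }
}

/** Per-host queue, loosely modelled on the FetchItemQueue outline. */
class HostQueue {
    private final Deque<Item> queue = new ArrayDeque<>();
    private final AtomicLong nextFetchTime = new AtomicLong();
    private final long delayMs;      // assumed fixed delay between requests to a host
    private final LongSupplier clock;
    private int inProgress;          // assumption: one in-flight request per host

    HostQueue(long delayMs, LongSupplier clock) {
        this.delayMs = delayMs;
        this.clock = clock;
    }

    synchronized void add(Item it) { queue.addLast(it); }

    synchronized int size() { return queue.size(); }

    synchronized Item get() {
        if (inProgress > 0) return null;                          // host is busy
        if (nextFetchTime.get() > clock.getAsLong()) return null; // note (2): too early
        Item it = queue.pollFirst();
        if (it != null) inProgress++;
        return it;
    }

    synchronized void finish() {
        inProgress--;
        nextFetchTime.set(clock.getAsLong() + delayMs); // earliest time of the next request
    }
}

/** Map of per-host queues, loosely modelled on the FetchItemQueues outline. */
class HostQueues {
    private final Map<String, HostQueue> queues = new LinkedHashMap<>();
    private final long delayMs;
    private final LongSupplier clock;

    HostQueues(long delayMs, LongSupplier clock) {
        this.delayMs = delayMs;
        this.clock = clock;
    }

    synchronized void add(String host, String url) {
        queues.computeIfAbsent(host, h -> new HostQueue(delayMs, clock))
              .add(new Item(host, url));
    }

    synchronized Item get() {
        for (HostQueue q : queues.values()) {
            Item it = q.get();
            if (it != null) return it;
        }
        return null; // note (1): nothing is eligible right now
    }

    synchronized void finish(Item it) { queues.get(it.host).finish(); }

    synchronized int totalSize() {
        int n = 0;
        for (HostQueue q : queues.values()) n += q.size();
        return n;
    }
}

class PolitenessDemo {
    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(1_000); // fake clock, in milliseconds
        HostQueues qs = new HostQueues(500, now::get);
        qs.add("a.example", "http://a.example/1");
        qs.add("a.example", "http://a.example/2");
        Item first = qs.get();
        qs.finish(first);
        System.out.println(qs.get() == null); // the delay has not elapsed yet
        now.addAndGet(500);
        System.out.println(qs.get().url);     // the host is eligible again
    }
}
```

A caller that receives null while totalSize() is still greater than 0 knows that items exist but no host is currently eligible, which is the situation where note (1) says FetchThread waits.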

Pay attention to the OutputFormat class (FetcherOutputFormat) as well; it has some interesting details.


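In summary, Fetcher starts 10 FetchThread threads (the default) to crawl pages, and one QueueFeeder thread to read from the input path (segments/2011xxxxx/crawl_generate). These threads do not talk to each other directly; they go through FetchItemQueues, which is essentially a buffer. FetchItemQueues uses a Map to maintain a set of FetchItemQueue objects, and the Map's key is the hostname of each URL.

After reading a record from the input source, QueueFeeder calls FetchItemQueues.addFetchItem(url, crawlDatum) to add it to the buffer.
FetchThread calls the synchronized method FetchItemQueues.getFetchItem() to take a record out. Note that this method is an important part of the Nutch crawler's politeness mechanism: it ensures that the fetch threads on one node do not send an excessive number of requests to the same host. The code is shown above and is not repeated here.

(1) If FetchItemQueues.getFetchItem() returns null, the fetch thread (FetchThread) checks the total number of items in the buffer to decide whether to exit.
(2) FetchItemQueue.nextFetchTime is set when a fetch thread finishes working on a URL; see the description above for details.

Finally, there is FetcherOutputFormat, which writes to multiple paths. Anyone who needs to inspect or customize Nutch's output should read this class.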
