Skip to content

Commit

Permalink
Fetcher, set number of threads via metadata, fix #1111. Clarify varia…
Browse files Browse the repository at this point in the history
…ble for custom minCrawlDelay

Signed-off-by: Julien Nioche <[email protected]>
  • Loading branch information
jnioche committed Oct 20, 2023
1 parent e233c85 commit c313b4f
Showing 1 changed file with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ public class FetcherBolt extends StatusEmitterBolt {
*/
public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue";

/** Key name of the custom crawl delay for a page that may be present in metadata */
/** Key name of the custom crawl delay for a queue that may be present in the metadata */
private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";

/** Key name of the custom max number of threads that may be present in the metadata */
private static final String CRAWL_MAX_THREAD_KEY_NAME = "max.threads.queue";

private final AtomicInteger activeThreads = new AtomicInteger(0);
private final AtomicInteger spinWaiting = new AtomicInteger(0);

Expand Down Expand Up @@ -318,11 +321,12 @@ public synchronized void finishFetchItem(FetchItem it, boolean asap) {

public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadata) {
FetchItemQueue fiq = queues.get(id);
// custom crawl delay from metadata?
final long customCrawlDelay =
// custom min crawl delay from metadata?
final long minDelay =
metadata != null && metadata.getFirstValue(CRAWL_DELAY_KEY_NAME) != null
? Long.parseLong(metadata.getFirstValue(CRAWL_DELAY_KEY_NAME))
: minCrawlDelay;

if (fiq == null) {
int customThreadVal = defaultMaxThread;
// custom maxThread value?
Expand All @@ -332,16 +336,24 @@ public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadat
break;
}
}

// overridden at URL level
// custom thread number from metadata?
if (metadata != null) {
final String val = metadata.getFirstValue(CRAWL_MAX_THREAD_KEY_NAME);
if (val != null) {
customThreadVal = Integer.parseInt(val);
}
}

// initialize queue
fiq =
new FetchItemQueue(
customThreadVal, crawlDelay, customCrawlDelay, maxQueueSize);
fiq = new FetchItemQueue(customThreadVal, crawlDelay, minDelay, maxQueueSize);
queues.put(id, fiq);
}
// in cases where we have different pages with the same key that will fall in the same
// queue, each one with a custom crawl delay, we take the less aggressive
if (fiq.minCrawlDelay < customCrawlDelay) {
fiq.minCrawlDelay = customCrawlDelay;
if (fiq.minCrawlDelay < minDelay) {
fiq.minCrawlDelay = minDelay;
}
return fiq;
}
Expand Down

0 comments on commit c313b4f

Please sign in to comment.