diff --git a/src/java/org/apache/nutch/crawl/CrawlDbFilter.java b/src/java/org/apache/nutch/crawl/CrawlDbFilter.java index 7f28a3a85a..912c6e4abf 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbFilter.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbFilter.java @@ -22,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.conf.Configuration; import org.apache.nutch.metrics.NutchMetrics; @@ -50,6 +51,11 @@ public class CrawlDbFilter extends private String scope; + // Cached counter references for performance + private Counter goneRecordsRemovedCounter; + private Counter orphanRecordsRemovedCounter; + private Counter urlsFilteredCounter; + private static final Logger LOG = LoggerFactory .getLogger(MethodHandles.lookup().lookupClass()); @@ -68,6 +74,21 @@ public void setup(Mapper.Context context) { scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB); normalizers = new URLNormalizers(conf, scope); } + + // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + goneRecordsRemovedCounter = context.getCounter( + NutchMetrics.GROUP_CRAWLDB_FILTER, NutchMetrics.CRAWLDB_GONE_RECORDS_REMOVED_TOTAL); + orphanRecordsRemovedCounter = context.getCounter( + NutchMetrics.GROUP_CRAWLDB_FILTER, NutchMetrics.CRAWLDB_ORPHAN_RECORDS_REMOVED_TOTAL); + urlsFilteredCounter = context.getCounter( + NutchMetrics.GROUP_CRAWLDB_FILTER, NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL); } private Text newKey = new Text(); @@ -81,15 +102,13 @@ public void map(Text key, CrawlDatum value, // https://issues.apache.org/jira/browse/NUTCH-1101 check status first, // cheaper than normalizing or filtering if (url404Purging && CrawlDatum.STATUS_DB_GONE == value.getStatus()) { - context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER, - NutchMetrics.CRAWLDB_GONE_RECORDS_REMOVED_TOTAL).increment(1); + goneRecordsRemovedCounter.increment(1); return; } // Whether to remove orphaned pages // https://issues.apache.org/jira/browse/NUTCH-1932 if (purgeOrphans && CrawlDatum.STATUS_DB_ORPHAN == value.getStatus()) { - context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER, - NutchMetrics.CRAWLDB_ORPHAN_RECORDS_REMOVED_TOTAL).increment(1); + orphanRecordsRemovedCounter.increment(1); return; } if (url != null && urlNormalizers) { @@ -109,8 +128,7 @@ public void map(Text key, CrawlDatum value, } } if (url == null) { - context.getCounter(NutchMetrics.GROUP_CRAWLDB_FILTER, - NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL).increment(1); + urlsFilteredCounter.increment(1); } else { // URL has passed filters newKey.set(url); // collect it diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java index 3ba1734478..3454116575 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java @@ -18,13 +18,16 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -52,6 +55,9 @@ public class CrawlDbReducer extends private FetchSchedule schedule; private ErrorTracker errorTracker; + // Cached counter references for status-based metrics + private Map statusCounters = new HashMap<>(); + @Override public void setup(Reducer.Context context) { Configuration conf = context.getConfiguration(); @@ -66,6 +72,15 @@ public void setup(Reducer.Context context) { errorTracker = new ErrorTracker(NutchMetrics.GROUP_CRAWLDB, context); } + /** + * Get counter for status, caching for subsequent lookups. + */ + private Counter getStatusCounter(byte status, Context context) { + return statusCounters.computeIfAbsent(status, + s -> context.getCounter(NutchMetrics.GROUP_CRAWLDB, + CrawlDatum.getStatusName(s))); + } + @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { @@ -170,8 +185,7 @@ public void reduce(Text key, Iterable values, } context.write(key, old); // Dynamic counter based on status name - context.getCounter(NutchMetrics.GROUP_CRAWLDB, - CrawlDatum.getStatusName(old.getStatus())).increment(1); + getStatusCounter(old.getStatus(), context).increment(1); } else { LOG.warn("Missing fetch and old value, signature={}", StringUtil.toHexString(signature)); @@ -329,8 +343,7 @@ public void reduce(Text key, Iterable values, result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY); context.write(key, result); // Dynamic counter based on status name - context.getCounter(NutchMetrics.GROUP_CRAWLDB, - CrawlDatum.getStatusName(result.getStatus())).increment(1); + getStatusCounter(result.getStatus(), context).increment(1); } } diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java b/src/java/org/apache/nutch/crawl/DeduplicationJob.java index d5f983a273..50aa4cd7bd 100644 --- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java +++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java @@ -128,11 +128,25 @@ public static class DedupReducer protected String[] compareOrder; + // Cached counter reference for performance + private Counter documentsMarkedDuplicateCounter; + @Override public void setup( Reducer.Context context) { Configuration conf = context.getConfiguration(); compareOrder = conf.get(DEDUPLICATION_COMPARE_ORDER).split(","); + + // Initialize cached counter reference + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + documentsMarkedDuplicateCounter = context.getCounter( + NutchMetrics.GROUP_DEDUP, NutchMetrics.DEDUP_DOCUMENTS_MARKED_DUPLICATE_TOTAL); } protected void writeOutAsDuplicate(CrawlDatum datum, @@ -140,8 +154,7 @@ protected void writeOutAsDuplicate(CrawlDatum datum, throws IOException, InterruptedException { datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE); Text key = (Text) datum.getMetaData().remove(urlKey); - context.getCounter(NutchMetrics.GROUP_DEDUP, - NutchMetrics.DEDUP_DOCUMENTS_MARKED_DUPLICATE_TOTAL).increment(1); + documentsMarkedDuplicateCounter.increment(1); context.write(key, datum); } diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java index 456ba689a9..57bf7f4766 100644 --- a/src/java/org/apache/nutch/crawl/Generator.java +++ b/src/java/org/apache/nutch/crawl/Generator.java @@ -194,6 +194,17 @@ public static class SelectorMapper private JexlScript expr = null; private ErrorTracker errorTracker; + // Cached counter references for performance + private Counter urlFiltersRejectedCounter; + private Counter scheduleRejectedCounter; + private Counter waitForUpdateCounter; + private Counter exprRejectedCounter; + private Counter statusRejectedCounter; + private Counter scoreTooLowCounter; + private Counter intervalRejectedCounter; + private Counter hostsAffectedPerHostOverflowCounter; + private Counter urlsSkippedPerHostOverflowCounter; + @Override public void setup( Mapper.Context context) @@ -219,6 +230,32 @@ public void setup( expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null)); // Initialize error tracker with cached counters errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context); + // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + urlFiltersRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL); + scheduleRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL); + waitForUpdateCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL); + exprRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL); + statusRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL); + scoreTooLowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL); + intervalRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL); + hostsAffectedPerHostOverflowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL); + urlsSkippedPerHostOverflowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL); } @Override @@ -230,8 +267,7 @@ public void map(Text key, CrawlDatum value, Context context) // URLFilters try { if (filters.filter(url.toString()) == null) { - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL).increment(1); + urlFiltersRejectedCounter.increment(1); return; } } catch (URLFilterException e) { @@ -245,8 +281,7 @@ public void map(Text key, CrawlDatum value, Context context) if (!schedule.shouldFetch(url, crawlDatum, curTime)) { LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url, crawlDatum.getFetchTime(), curTime); - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL).increment(1); + scheduleRejectedCounter.increment(1); return; } @@ -255,8 +290,7 @@ public void map(Text key, CrawlDatum value, Context context) if (oldGenTime != null) { // awaiting fetch & update if (oldGenTime.get() + genDelay > curTime) { // still wait for // update - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL).increment(1); + waitForUpdateCounter.increment(1); return; } } @@ -271,22 +305,19 @@ public void map(Text key, CrawlDatum value, Context context) // check expr if (expr != null) { if (!crawlDatum.execute(expr, key.toString())) { - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL).increment(1); + exprRejectedCounter.increment(1); return; } } if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) { - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL).increment(1); + statusRejectedCounter.increment(1); return; } // consider only entries with a score superior to the threshold if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) { - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL).increment(1); + scoreTooLowCounter.increment(1); return; } @@ -294,8 +325,7 @@ public void map(Text key, CrawlDatum value, Context context) // threshold if (intervalThreshold != -1 && crawlDatum.getFetchInterval() > intervalThreshold) { - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL).increment(1); + intervalRejectedCounter.increment(1); return; } @@ -332,6 +362,10 @@ public static class SelectorReducer extends private Map hostDatumCache = new HashMap<>(); private ErrorTracker errorTracker; + // Cached counter references for performance + private Counter hostsAffectedPerHostOverflowCounter; + private Counter urlsSkippedPerHostOverflowCounter; + public void readHostDb() throws IOException { if (conf.get(GENERATOR_HOSTDB) == null) { return; @@ -426,10 +460,22 @@ public void setup(Context context) throws IOException { } // Initialize error tracker with cached counters errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context); + // Initialize cached counter references + initReducerCounters(context); readHostDb(); } + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initReducerCounters(Context context) { + hostsAffectedPerHostOverflowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL); + urlsSkippedPerHostOverflowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL); + } + @Override public void cleanup(Context context) throws IOException, InterruptedException { @@ -555,15 +601,13 @@ public void reduce(FloatWritable key, Iterable values, hostCount[1] = 1; } else { if (hostCount[1] == (maxCount+1)) { - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL).increment(1); + hostsAffectedPerHostOverflowCounter.increment(1); LOG.info( "Host or domain {} has more than {} URLs for all {} segments. Additional URLs won't be included in the fetchlist.", hostordomain, maxCount, maxNumSegments); } // skip this entry - context.getCounter(NutchMetrics.GROUP_GENERATOR, - NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL).increment(1); + urlsSkippedPerHostOverflowCounter.increment(1); continue; } } diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java index f84366c2c1..3fe4ce9cec 100644 --- a/src/java/org/apache/nutch/crawl/Injector.java +++ b/src/java/org/apache/nutch/crawl/Injector.java @@ -24,6 +24,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; @@ -130,6 +131,12 @@ public static class InjectMapper private boolean filterNormalizeAll = false; private ErrorTracker errorTracker; + // Cached counter references for performance + private Counter urlsFilteredCounter; + private Counter urlsInjectedCounter; + private Counter urlsPurged404Counter; + private Counter urlsPurgedFilterCounter; + @Override public void setup(Context context) { Configuration conf = context.getConfiguration(); @@ -151,6 +158,22 @@ public void setup(Context context) { url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false); // Initialize error tracker with cached counters errorTracker = new ErrorTracker(NutchMetrics.GROUP_INJECTOR, context); + // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + urlsFilteredCounter = context.getCounter( + NutchMetrics.GROUP_INJECTOR, NutchMetrics.INJECTOR_URLS_FILTERED_TOTAL); + urlsInjectedCounter = context.getCounter( + NutchMetrics.GROUP_INJECTOR, NutchMetrics.INJECTOR_URLS_INJECTED_TOTAL); + urlsPurged404Counter = context.getCounter( + NutchMetrics.GROUP_INJECTOR, NutchMetrics.INJECTOR_URLS_PURGED_404_TOTAL); + urlsPurgedFilterCounter = context.getCounter( + NutchMetrics.GROUP_INJECTOR, NutchMetrics.INJECTOR_URLS_PURGED_FILTER_TOTAL); } /* Filter and normalize the input url */ @@ -223,8 +246,7 @@ public void map(Text key, Writable value, Context context) url = filterNormalize(url); if (url == null) { - context.getCounter(NutchMetrics.GROUP_INJECTOR, - NutchMetrics.INJECTOR_URLS_FILTERED_TOTAL).increment(1); + urlsFilteredCounter.increment(1); } else { CrawlDatum datum = new CrawlDatum(); datum.setStatus(CrawlDatum.STATUS_INJECTED); @@ -245,8 +267,7 @@ public void map(Text key, Writable value, Context context) url, e.getMessage()); errorTracker.incrementCounters(e); } - context.getCounter(NutchMetrics.GROUP_INJECTOR, - NutchMetrics.INJECTOR_URLS_INJECTED_TOTAL).increment(1); + urlsInjectedCounter.increment(1); context.write(key, datum); } } else if (value instanceof CrawlDatum) { @@ -256,16 +277,14 @@ public void map(Text key, Writable value, Context context) // remove 404 urls if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus()) { - context.getCounter(NutchMetrics.GROUP_INJECTOR, - NutchMetrics.INJECTOR_URLS_PURGED_404_TOTAL).increment(1); + urlsPurged404Counter.increment(1); return; } if (filterNormalizeAll) { String url = filterNormalize(key.toString()); if (url == null) { - context.getCounter(NutchMetrics.GROUP_INJECTOR, - NutchMetrics.INJECTOR_URLS_PURGED_FILTER_TOTAL).increment(1); + urlsPurgedFilterCounter.increment(1); } else { key.set(url); context.write(key, datum); @@ -287,6 +306,10 @@ public static class InjectReducer private CrawlDatum old = new CrawlDatum(); private CrawlDatum injected = new CrawlDatum(); + // Cached counter references for performance + private Counter urlsInjectedUniqueCounter; + private Counter urlsMergedCounter; + @Override public void setup(Context context) { Configuration conf = context.getConfiguration(); @@ -296,6 +319,19 @@ public void setup(Context context) { update = conf.getBoolean("db.injector.update", false); LOG.info("Injector: overwrite: {}", overwrite); LOG.info("Injector: update: {}", update); + + // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + urlsInjectedUniqueCounter = context.getCounter( + NutchMetrics.GROUP_INJECTOR, NutchMetrics.INJECTOR_URLS_INJECTED_UNIQUE_TOTAL); + urlsMergedCounter = context.getCounter( + NutchMetrics.GROUP_INJECTOR, NutchMetrics.INJECTOR_URLS_MERGED_TOTAL); } /** @@ -351,11 +387,9 @@ public void reduce(Text key, Iterable values, Context context) } } if (injectedSet) { - context.getCounter(NutchMetrics.GROUP_INJECTOR, - NutchMetrics.INJECTOR_URLS_INJECTED_UNIQUE_TOTAL).increment(1); + urlsInjectedUniqueCounter.increment(1); if (oldSet) { - context.getCounter(NutchMetrics.GROUP_INJECTOR, - NutchMetrics.INJECTOR_URLS_MERGED_TOTAL).increment(1); + urlsMergedCounter.increment(1); } } context.write(key, result); diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java index 01144f4935..029d95ff7c 100644 --- a/src/java/org/apache/nutch/fetcher/Fetcher.java +++ b/src/java/org/apache/nutch/fetcher/Fetcher.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -155,6 +156,13 @@ public static class FetcherRun extends private boolean storingContent; private boolean parsing; + // Cached counter references for performance + private Counter bytesDownloadedCounter; + private Counter hitByThroughputThresholdCounter; + private Counter hitByTimelimitCounter; + private Counter hungThreadsCounter; + private Counter hitByTimeoutCounter; + private AtomicInteger getActiveThreads() { return activeThreads; } @@ -193,11 +201,28 @@ public void setup(Mapper.Context context) parsing = isParsing(conf); } + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + bytesDownloadedCounter = context.getCounter( + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_BYTES_DOWNLOADED_TOTAL); + hitByThroughputThresholdCounter = context.getCounter( + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_HIT_BY_THROUGHPUT_THRESHOLD_TOTAL); + hitByTimelimitCounter = context.getCounter( + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_HIT_BY_TIMELIMIT_TOTAL); + hungThreadsCounter = context.getCounter( + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_HUNG_THREADS_TOTAL); + hitByTimeoutCounter = context.getCounter( + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_HIT_BY_TIMEOUT_TOTAL); + } + @Override public void run(Context innerContext) throws IOException, InterruptedException { setup(innerContext); + initCounters(innerContext); try { Configuration conf = innerContext.getConfiguration(); LinkedList fetcherThreads = new LinkedList<>(); @@ -292,8 +317,7 @@ public void run(Context innerContext) pagesLastSec = pages.get() - pagesLastSec; bytesLastSec = (int) bytes.get() - bytesLastSec; - innerContext.getCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_BYTES_DOWNLOADED_TOTAL).increment(bytesLastSec); + bytesDownloadedCounter.increment(bytesLastSec); reportStatus(innerContext, fetchQueues, pagesLastSec, bytesLastSec); @@ -331,9 +355,7 @@ public void run(Context innerContext) int hitByThrougputThreshold = fetchQueues.emptyQueues(); if (hitByThrougputThreshold != 0) - innerContext.getCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_HIT_BY_THROUGHPUT_THRESHOLD_TOTAL) - .increment(hitByThrougputThreshold); + hitByThroughputThresholdCounter.increment(hitByThrougputThreshold); } } } @@ -414,8 +436,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { if (!feeder.isAlive()) { int hitByTimeLimit = fetchQueues.checkTimelimit(); if (hitByTimeLimit != 0) - innerContext.getCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_HIT_BY_TIMELIMIT_TOTAL).increment(hitByTimeLimit); + hitByTimelimitCounter.increment(hitByTimeLimit); } /* @@ -431,8 +452,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { timeout); LOG.warn("Aborting with {} hung threads{}.", activeThreads, feeder.isAlive() ? " (queue feeder still alive)" : ""); - innerContext.getCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_HUNG_THREADS_TOTAL).increment(activeThreads.get()); + hungThreadsCounter.increment(activeThreads.get()); for (int i = 0; i < fetcherThreads.size(); i++) { FetcherThread thread = fetcherThreads.get(i); if (thread.isAlive()) { @@ -467,8 +487,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { fetchQueues.getTotalSize(), fetchQueues.getQueueCount(), feeder.isAlive() ? " (queue feeder still alive)" : ""); int hitByTimeout = fetchQueues.emptyQueues(); - innerContext.getCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_HIT_BY_TIMEOUT_TOTAL).increment(hitByTimeout); + hitByTimeoutCounter.increment(hitByTimeout); return; } diff --git a/src/java/org/apache/nutch/hostdb/ResolverThread.java b/src/java/org/apache/nutch/hostdb/ResolverThread.java index 4c42c02b4b..05e4a940c8 100644 --- a/src/java/org/apache/nutch/hostdb/ResolverThread.java +++ b/src/java/org/apache/nutch/hostdb/ResolverThread.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.util.StringUtils; @@ -44,6 +45,17 @@ public class ResolverThread implements Runnable { protected Context context; protected int purgeFailedHostsThreshold; + // Cached counter references for performance + private Counter newKnownHostCounter; + private Counter rediscoveredHostCounter; + private Counter existingKnownHostCounter; + private Counter newUnknownHostCounter; + private Counter existingUnknownHostCounter; + private Counter purgedUnknownHostCounter; + private Counter checkedHostsCounter; + private Counter errorsCounter; + private Counter errorsNetworkCounter; + /** * Overloaded constructor. * @param host name of the host to lookup @@ -61,6 +73,33 @@ public ResolverThread(String host, HostDatum datum, this.datum = datum; this.context = context; this.purgeFailedHostsThreshold = purgeFailedHostsThreshold; + + // Initialize cached counters for performance + initCounters(); + } + + /** + * Initialize cached counter references to avoid repeated lookups. + */ + private void initCounters() { + newKnownHostCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_NEW_KNOWN_HOST_TOTAL); + rediscoveredHostCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_REDISCOVERED_HOST_TOTAL); + existingKnownHostCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_EXISTING_KNOWN_HOST_TOTAL); + newUnknownHostCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_NEW_UNKNOWN_HOST_TOTAL); + existingUnknownHostCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_EXISTING_UNKNOWN_HOST_TOTAL); + purgedUnknownHostCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_PURGED_UNKNOWN_HOST_TOTAL); + checkedHostsCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_CHECKED_HOSTS_TOTAL); + errorsCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.ERROR_TOTAL); + errorsNetworkCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.ERROR_NETWORK_TOTAL); } /** @@ -75,19 +114,16 @@ public void run() { InetAddress inetAddr = InetAddress.getByName(host); if (datum.isEmpty()) { - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_NEW_KNOWN_HOST_TOTAL).increment(1); + newKnownHostCounter.increment(1); datum.setLastCheck(); LOG.info("{}: new_known_host {}", host, datum); } else if (datum.getDnsFailures() > 0) { - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_REDISCOVERED_HOST_TOTAL).increment(1); + rediscoveredHostCounter.increment(1); datum.setLastCheck(); datum.setDnsFailures(0l); LOG.info("{}: rediscovered_host {}", host, datum); } else { - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_EXISTING_KNOWN_HOST_TOTAL).increment(1); + existingKnownHostCounter.increment(1); datum.setLastCheck(); LOG.info("{}: existing_known_host {}", host, datum); } @@ -101,8 +137,7 @@ public void run() { datum.setLastCheck(); datum.setDnsFailures(1l); context.write(hostText, datum); - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_NEW_UNKNOWN_HOST_TOTAL).increment(1); + newUnknownHostCounter.increment(1); LOG.info("{}: new_unknown_host {}", host, datum); } else { datum.setLastCheck(); @@ -113,12 +148,10 @@ public void run() { purgeFailedHostsThreshold < datum.getDnsFailures()) { context.write(hostText, datum); - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_EXISTING_UNKNOWN_HOST_TOTAL).increment(1); + existingUnknownHostCounter.increment(1); LOG.info("{}: existing_unknown_host {}", host, datum); } else { - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_PURGED_UNKNOWN_HOST_TOTAL).increment(1); + purgedUnknownHostCounter.increment(1); LOG.info("{}: purged_unknown_host {}", host, datum); } } @@ -126,10 +159,8 @@ public void run() { // Dynamic counter based on failure count - can't cache context.getCounter(NutchMetrics.GROUP_HOSTDB, createFailureCounterLabel(datum)).increment(1); // Common error counters for consistency - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.ERROR_TOTAL).increment(1); - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.ERROR_NETWORK_TOTAL).increment(1); + errorsCounter.increment(1); + errorsNetworkCounter.increment(1); } catch (Exception ioe) { LOG.warn(StringUtils.stringifyException(ioe)); context.getCounter(NutchMetrics.GROUP_HOSTDB, @@ -139,14 +170,12 @@ public void run() { } } catch (Exception e) { LOG.warn(StringUtils.stringifyException(e)); - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.ERROR_TOTAL).increment(1); + errorsCounter.increment(1); context.getCounter(NutchMetrics.GROUP_HOSTDB, ErrorTracker.getCounterName(e)).increment(1); } - context.getCounter(NutchMetrics.GROUP_HOSTDB, - NutchMetrics.HOSTDB_CHECKED_HOSTS_TOTAL).increment(1); + checkedHostsCounter.increment(1); } private String createFailureCounterLabel(HostDatum datum) { diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java index 10a08d55a0..b1736348b8 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java @@ -80,12 +80,19 @@ public void setup(Mapper.Context context) { normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT); // Initialize cached counter references - filteredRecordsCounter = context.getCounter( - NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL); + initCounters(context); // Initialize error tracker with cached counters errorTracker = new ErrorTracker(NutchMetrics.GROUP_HOSTDB, context); } + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + filteredRecordsCounter = context.getCounter( + NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL); + } + /** * Filters and or normalizes the input hostname by applying the configured URL * filters and normalizers the URL "http://hostname/". diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java index 6c979f222e..878216b3c6 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java @@ -154,6 +154,13 @@ public void setup(Reducer.Context context) } // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Reducer.Context context) { urlLimitNotReachedCounter = context.getCounter( NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_URL_LIMIT_NOT_REACHED_TOTAL); totalHostsCounter = context.getCounter( diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java b/src/java/org/apache/nutch/indexer/CleaningJob.java index ae01e4b0d1..dc466dad06 100644 --- a/src/java/org/apache/nutch/indexer/CleaningJob.java +++ b/src/java/org/apache/nutch/indexer/CleaningJob.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; @@ -89,6 +90,9 @@ public static class DeleterReducer extends IndexWriters writers = null; + // Cached counter reference for performance + private Counter deletedDocumentsCounter; + @Override public void setup(Reducer.Context context) { Configuration conf = context.getConfiguration(); @@ -99,6 +103,17 @@ public void setup(Reducer.Context contex throw new RuntimeException(e); } noCommit = conf.getBoolean("noCommit", false); + + // Initialize cached counter reference + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + deletedDocumentsCounter = context.getCounter( + NutchMetrics.GROUP_CLEANING, NutchMetrics.CLEANING_DELETED_DOCUMENTS_TOTAL); } @Override @@ -119,8 +134,7 @@ public void reduce(ByteWritable key, Iterable values, for (Text document : values) { writers.delete(document.toString()); totalDeleted++; - context.getCounter(NutchMetrics.GROUP_CLEANING, - NutchMetrics.CLEANING_DELETED_DOCUMENTS_TOTAL).increment(1); + deletedDocumentsCounter.increment(1); } } } diff --git a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java index 0b728a588c..fee0921d0a 100644 --- a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java +++ b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java @@ -48,6 +48,7 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; import org.apache.hadoop.mapreduce.Mapper; @@ -328,6 +329,10 @@ public static class OutlinkDbReducer extends // url normalizers, filters and job configuration private Configuration conf; + // Cached counter references for performance + private Counter addedLinksCounter; + private Counter removedLinksCounter; + /** * Configures the OutlinkDb job reducer. Sets up internal links and link limiting. */ @@ -340,6 +345,18 @@ public void setup(Reducer.Context context) limitPages = conf.getBoolean("link.ignore.limit.page", true); limitDomains = conf.getBoolean("link.ignore.limit.domain", true); + // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + addedLinksCounter = context.getCounter( + NutchMetrics.GROUP_WEBGRAPH, NutchMetrics.WEBGRAPH_ADDED_LINKS_TOTAL); + removedLinksCounter = context.getCounter( + NutchMetrics.GROUP_WEBGRAPH, NutchMetrics.WEBGRAPH_REMOVED_LINKS_TOTAL); } @Override @@ -362,16 +379,14 @@ public void reduce(Text key, Iterable values, mostRecent = timestamp; } outlinkList.add(WritableUtils.clone(next, conf)); - context.getCounter(NutchMetrics.GROUP_WEBGRAPH, - NutchMetrics.WEBGRAPH_ADDED_LINKS_TOTAL).increment(1); + addedLinksCounter.increment(1); } else if (value instanceof BooleanWritable) { BooleanWritable delete = (BooleanWritable) value; // Actually, delete is always true, otherwise we don't emit it in the // mapper in the first place if (delete.get() == true) { // This page is gone, do not emit it's outlinks - context.getCounter(NutchMetrics.GROUP_WEBGRAPH, - NutchMetrics.WEBGRAPH_REMOVED_LINKS_TOTAL).increment(1); + removedLinksCounter.increment(1); return; } } diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java b/src/java/org/apache/nutch/tools/warc/WARCExporter.java index f271adfe94..14b59ac85c 100644 --- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java +++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java @@ -124,6 +124,15 @@ public static class WARCReducer @Override public void setup(Context context) { // Initialize cached counter references + initCounters(context); + // Initialize error tracker with cached counters + errorTracker = new ErrorTracker(NutchMetrics.GROUP_WARC_EXPORTER, context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { missingContentCounter = context.getCounter( NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_MISSING_CONTENT_TOTAL); missingMetadataCounter = context.getCounter( @@ -132,8 +141,6 @@ public void setup(Context context) { NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_OMITTED_EMPTY_RESPONSE_TOTAL); recordsGeneratedCounter = context.getCounter( NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_RECORDS_GENERATED_TOTAL); - // Initialize error tracker with cached counters - errorTracker = new ErrorTracker(NutchMetrics.GROUP_WARC_EXPORTER, context); } @Override diff --git a/src/java/org/apache/nutch/util/DomainStatistics.java b/src/java/org/apache/nutch/util/DomainStatistics.java index 5ee09c846a..4057795d52 100644 --- a/src/java/org/apache/nutch/util/DomainStatistics.java +++ b/src/java/org/apache/nutch/util/DomainStatistics.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; @@ -38,6 +39,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.metrics.NutchMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,10 +54,6 @@ public class DomainStatistics extends Configured implements Tool { private static final Text FETCHED_TEXT = new Text("FETCHED"); private static final Text NOT_FETCHED_TEXT = new Text("NOT_FETCHED"); - public static enum MyCounter { - FETCHED, NOT_FETCHED, EMPTY_RESULT - }; - private static final int MODE_HOST = 1; private static final int MODE_DOMAIN = 2; private static final int MODE_SUFFIX = 3; @@ -158,10 +156,29 @@ static class DomainStatisticsMapper extends Mapper { int mode = 0; + // Cached counter references for performance + private Counter fetchedCounter; + private Counter notFetchedCounter; + private Counter emptyResultCounter; + @Override public void setup(Context context) { mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN); + // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { + fetchedCounter = context.getCounter( + NutchMetrics.GROUP_DOMAIN_STATS, NutchMetrics.DOMAIN_STATS_FETCHED_TOTAL); + notFetchedCounter = context.getCounter( + NutchMetrics.GROUP_DOMAIN_STATS, NutchMetrics.DOMAIN_STATS_NOT_FETCHED_TOTAL); + emptyResultCounter = context.getCounter( + NutchMetrics.GROUP_DOMAIN_STATS, NutchMetrics.DOMAIN_STATS_EMPTY_RESULT_TOTAL); } @Override @@ -197,17 +214,17 @@ public void map(Text urlText, CrawlDatum datum, Context context) } if (out.trim().equals("")) { LOG.info("url : {}", url); - context.getCounter(MyCounter.EMPTY_RESULT).increment(1); + emptyResultCounter.increment(1); } context.write(new Text(out), new LongWritable(1)); } catch (Exception ex) { } - context.getCounter(MyCounter.FETCHED).increment(1); + fetchedCounter.increment(1); context.write(FETCHED_TEXT, new LongWritable(1)); } else { - context.getCounter(MyCounter.NOT_FETCHED).increment(1); + notFetchedCounter.increment(1); context.write(NOT_FETCHED_TEXT, new LongWritable(1)); } } diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java index 4b55a72ebb..21362223cd 100644 --- a/src/java/org/apache/nutch/util/SitemapProcessor.java +++ b/src/java/org/apache/nutch/util/SitemapProcessor.java @@ -151,6 +151,15 @@ public void setup(Context context) { } // Initialize cached counter references + initCounters(context); + // Initialize error tracker with cached counters + errorTracker = new ErrorTracker(NutchMetrics.GROUP_SITEMAP, context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { filteredRecordsCounter = context.getCounter( NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_FILTERED_RECORDS_TOTAL); seedsCounter = context.getCounter( @@ -161,8 +170,6 @@ public void setup(Context context) { NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_FILTERED_FROM_HOSTNAME_TOTAL); failedFetchesCounter = context.getCounter( NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_FAILED_FETCHES_TOTAL); - // Initialize error tracker with cached counters - errorTracker = new ErrorTracker(NutchMetrics.GROUP_SITEMAP, context); } @Override @@ -377,6 +384,13 @@ public void setup(Context context) { this.overwriteExisting = conf.getBoolean(SITEMAP_OVERWRITE_EXISTING, false); // Initialize cached counter references + initCounters(context); + } + + /** + * Initialize cached counter references to avoid repeated lookups in hot paths. + */ + private void initCounters(Context context) { existingEntriesCounter = context.getCounter( NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_EXISTING_ENTRIES_TOTAL); newEntriesCounter = context.getCounter(