diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 4d06f3b1aab..b7eb6cfc671 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -526,6 +526,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t */ ConditionalTabletMutator requireFiles(Set files); + /* + * Require that a tablet contain all the files in the set with the exact same DataFileValue + */ + ConditionalTabletMutator requireFiles(Map files); + /** * Require that a tablet have less than or equals the specified number of files. */ diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java index 78bbb8d8ef8..92c26e1a9d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java @@ -32,17 +32,34 @@ public class DataFileValue { private final long size; private final long numEntries; private long time = -1; + private boolean shared; + + public DataFileValue(long size, long numEntries, long time, boolean shared) { + this.size = size; + this.numEntries = numEntries; + this.time = time; + this.shared = shared; + } + + public DataFileValue(long size, long numEntries, boolean shared) { + this.size = size; + this.numEntries = numEntries; + this.time = -1; + this.shared = shared; + } public DataFileValue(long size, long numEntries, long time) { this.size = size; this.numEntries = numEntries; this.time = time; + this.shared = false; } public DataFileValue(long size, long numEntries) { this.size = size; this.numEntries = numEntries; this.time = -1; + this.shared = false; } public DataFileValue(String encodedDFV) { @@ -50,11 +67,19 @@ public DataFileValue(String encodedDFV) { size = Long.parseLong(ba[0]); numEntries = Long.parseLong(ba[1]); + time = -1; + shared = false; if (ba.length == 3) { - time = Long.parseLong(ba[2]); - } else { - time = -1; + // Could be old format with time, or new format with shared + try { + time = Long.parseLong(ba[2]); + } catch (NumberFormatException e) { + shared = Boolean.parseBoolean(ba[2]); + } + } else if (ba.length == 4) { + shared = Boolean.parseBoolean(ba[2]); + time = Long.parseLong(ba[3]); } } @@ -78,15 +103,19 @@ public long getTime() { return time; } + public boolean isShared() { + return shared; + } + public byte[] encode() { return encodeAsString().getBytes(UTF_8); } public String encodeAsString() { if (time >= 0) { - return ("" + size + "," + numEntries + "," + time); + return ("" + size + "," + numEntries + "," + shared + "," + time); } - return ("" + size + "," + numEntries); + return ("" + size + "," + numEntries + "," + shared); } public Value encodeAsValue() { @@ -97,7 +126,8 @@ public Value encodeAsValue() { public boolean equals(Object o) { if (o instanceof DataFileValue odfv) { - return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time; + return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time + && shared == odfv.shared; } return false; @@ -105,12 +135,12 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Long.valueOf(size + numEntries).hashCode(); + return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode(); } @Override public String toString() { - return size + " " + numEntries; + return size + " " + numEntries + " " + shared; } public void setTime(long time) { @@ -120,6 +150,10 @@ public void setTime(long time) { this.time = time; } + public void setShared(boolean shared) { + this.shared = shared; + } + /** * @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise. */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 50acebc727d..617bba2cb57 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -32,6 +32,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; @@ -52,6 +53,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily; @@ -363,6 +365,17 @@ public ConditionalTabletMutator requireFiles(Set files) { return this; } + @Override + public ConditionalTabletMutator requireFiles(Map files) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = SetEncodingIterator.createConditionWithVal(files.entrySet(), + entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8), + entry.getValue().encode()), + DataFileColumnFamily.NAME); + mutation.addCondition(c); + return this; + } + @Override public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 92f0c035987..7c467a4b9d0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; @@ -179,6 +180,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon return new Pair<>(result, sizes); } + private static void markTabletFilesAsShared(ServerContext ctx, TabletMetadata srcTablet) { + if (srcTablet.getFiles().isEmpty()) { + return; + } + + Map currentFilesMap = srcTablet.getFilesMap(); + + // Skip if all files are already shared + boolean anyNonShared = currentFilesMap.values().stream().anyMatch(dfv -> !dfv.isShared()); + if (!anyNonShared) { + return; + } + + try (var conditionalMutator = ctx.getAmple().conditionallyMutateTablets()) { + var tabletMutator = conditionalMutator.mutateTablet(srcTablet.getExtent()) + .requireAbsentOperation().requireFiles(currentFilesMap); + + // Write updated DataFileValues with shared=true for any non-shared files + for (var entry : currentFilesMap.entrySet()) { + StoredTabletFile file = entry.getKey(); + DataFileValue dfv = entry.getValue(); + if (!dfv.isShared()) { + DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(), + dfv.isTimeSet() ? dfv.getTime() : -1, true); + tabletMutator.putFile(file, sharedDfv); + log.debug("Marking file {} as shared in source tablet {} (conditional)", + file.getFileName(), srcTablet.getExtent()); + } + } + + tabletMutator.submit(tm -> false, () -> "mark source files as shared for clone"); + + var result = conditionalMutator.process().get(srcTablet.getExtent()); + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + log.debug( + "Conditional mutation to mark files as shared was rejected for tablet {} — " + + "tablet changed concurrently, clone will retry for this tablet", + srcTablet.getExtent()); + } + } + } + private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, Iterable> tablet) { @@ -191,7 +234,12 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, if (!cf.startsWith("../") && !cf.contains(":")) { cf = "../" + srcTableId + entry.getKey().getColumnQualifier(); } - m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue()); + + DataFileValue ogVal = new DataFileValue(entry.getValue().get()); + DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(), + ogVal.isTimeSet() ? ogVal.getTime() : -1, true); + + m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue()); } else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue()); } else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) { @@ -350,6 +398,11 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId while (true) { try { + try (TabletsMetadata tabletsMetadata = createCloneScanner(null, srcTableId, context)) { + for (TabletMetadata tablet : tabletsMetadata) { + markTabletFilesAsShared(context, tablet); + } + } initializeClone(null, srcTableId, tableId, context, bw); // the following loop looks changes in the file that occurred during the copy.. if files diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index bf54f9b836f..04e6a9b6162 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -26,6 +26,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import java.time.Duration; +import java.util.ArrayList; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -67,6 +68,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) { this.newDatafile = newDatafile; } + private record CompactionFileResult(TabletMetadata tabletMetadata, + ArrayList filesToDeleteViaGc) { + } + @Override public Repo call(FateId fateId, FateEnv env) throws Exception { var ecid = ExternalCompactionId.of(commitData.ecid); @@ -79,25 +84,38 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { // process died and now its running again. In this case commit should do nothing, but its // important to still carry on with the rest of the steps after commit. This code ignores a that // fact that a commit may not have happened in the current call and continues for this reason. - TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile); + CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile); String loc = null; - if (tabletMetadata != null && tabletMetadata.getLocation() != null) { - loc = tabletMetadata.getLocation().getHostPortSession(); + if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) { + loc = fileResult.tabletMetadata.getLocation().getHostPortSession(); } // This will causes the tablet to be reexamined to see if it needs any more compactions. var extent = KeyExtent.fromThrift(commitData.textent); env.getEventPublisher().event(extent, "Compaction completed %s", extent); - return new PutGcCandidates(commitData, loc); + return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc()); } KeyExtent getExtent() { return KeyExtent.fromThrift(commitData.textent); } - private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, + private ArrayList computeNonSharedFiles(TabletMetadata tablet, + CompactionMetadata ecm) { + ArrayList nonSharedFiles = new ArrayList<>(); + var tabletFilesMap = tablet.getFilesMap(); + for (StoredTabletFile file : ecm.getJobFiles()) { + DataFileValue dfv = tabletFilesMap.get(file); + if (dfv == null || !dfv.isShared()) { + nonSharedFiles.add(file); + } + } + return nonSharedFiles; + } + + private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid, Optional newDatafile) { var tablet = @@ -107,6 +125,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) .logInterval(Duration.ofMinutes(3)).createRetry(); + ArrayList filesToDeleteViaGc = new ArrayList<>(); + while (canCommitCompaction(ecid, tablet)) { CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); @@ -119,7 +139,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation() .requireCompaction(ecid).requireSame(tablet, LOCATION) - .requireFiles(commitData.getJobFiles()); + .requireFiles(tablet.getFilesMap()); if (ecm.getKind() == CompactionKind.USER) { tabletMutator.requireSame(tablet, SELECTED, COMPACTED); @@ -140,6 +160,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId var result = tabletsMutator.process().get(getExtent()); if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + filesToDeleteViaGc = computeNonSharedFiles(tablet, ecm); // Compaction was successfully committed to the tablet so log it TabletLogger.compacted(getExtent(), ecid, commitData.kind, commitData.getJobFiles(), newDatafile); @@ -156,7 +177,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId } } - return tablet; + return new CompactionFileResult(tablet, filesToDeleteViaGc); } private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, @@ -212,11 +233,14 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio ecm.getJobFiles().forEach(tabletMutator::putScan); } ecm.getJobFiles().forEach(tabletMutator::deleteFile); + tabletMutator.deleteExternalCompaction(ecid); if (newDatafile.isPresent()) { - tabletMutator.putFile(newDatafile.orElseThrow(), - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + // Mark new compaction files as not shared. + DataFileValue newFileValue = + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false); + tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java index 89cdedab8ec..cb583dfdf39 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java @@ -18,26 +18,78 @@ */ package org.apache.accumulo.manager.compaction.coordinator.commit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.gc.ReferenceFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; import org.apache.accumulo.manager.tableOps.FateEnv; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PutGcCandidates extends AbstractFateOperation { private static final long serialVersionUID = 1L; private final CompactionCommitData commitData; private final String refreshLocation; + private final ArrayList filesToDeleteViaGc; + private static final Logger LOG = LoggerFactory.getLogger(PutGcCandidates.class); - public PutGcCandidates(CompactionCommitData commitData, String refreshLocation) { + public PutGcCandidates(CompactionCommitData commitData, String refreshLocation, + ArrayList filesToDeleteViaGc) { this.commitData = commitData; this.refreshLocation = refreshLocation; + this.filesToDeleteViaGc = new ArrayList<>(); + for (StoredTabletFile file : filesToDeleteViaGc) { + this.filesToDeleteViaGc.add(file.getMetadataPath()); + } } @Override public Repo call(FateId fateId, FateEnv env) throws Exception { + var extent = KeyExtent.fromThrift(commitData.textent); + Set nonSharedSet = new HashSet<>(filesToDeleteViaGc); + List gcCandidates = new ArrayList<>(); - // add the GC candidates - env.getContext().getAmple().putGcCandidates(commitData.getTableId(), commitData.getJobFiles()); + for (StoredTabletFile jobFile : commitData.getJobFiles()) { + if (nonSharedSet.contains(jobFile.getMetadataPath())) { + // File is confirmed non-shared, attempt direct filesystem deletion + try { + Path filePath = new Path(jobFile.getMetadataPath()); + boolean deleted = env.getContext().getVolumeManager().deleteRecursively(filePath); + if (deleted) { + LOG.debug("Directly deleted non-shared compaction input file: {}", + jobFile.getFileName()); + } else { + LOG.debug("Non-shared file {} was already absent (likely retry), no GC marker needed", + jobFile.getFileName()); + } + } catch (IOException e) { + // Direct deletion failed, fall back to GC marker to ensure eventual cleanup + LOG.warn("Failed to directly delete non-shared file {}, falling back to GC marker: {}", + jobFile.getFileName(), e.getMessage()); + gcCandidates.add(ReferenceFile.forFile(extent.tableId(), jobFile)); + } + } else { + LOG.debug("File {} is shared or shared status unknown, will create GC delete marker", + jobFile.getFileName()); + gcCandidates.add(ReferenceFile.forFile(extent.tableId(), jobFile)); + } + } + + // Write GC delete markers for any shared files or direct-delete fallbacks + if (!gcCandidates.isEmpty()) { + env.getContext().getAmple().putGcFileAndDirCandidates(extent.tableId(), gcCandidates); + LOG.debug("Created {} GC delete markers for compaction", gcCandidates.size()); + } if (refreshLocation == null) { return null; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 75c71f02229..08f947c751c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -109,7 +109,16 @@ public long isReady(FateId fateId, FateEnv env) throws Exception { } VolumeManager fs = env.getVolumeManager(); final Path bulkDir = new Path(bulkInfo.bulkDir); - try (LoadMappingIterator lmi = + env.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING); + + // Compute which files appear in more than one tablet range + Set sharedFiles; + try (LoadMappingIterator lmiPass1 = + BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) { + sharedFiles = computeSharedFiles(fateId, lmiPass1); + } + + try (LoadMappingIterator lmiPass2 = BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) { Loader loader = new Loader(env, bulkInfo.tableId); @@ -125,8 +134,26 @@ public long isReady(FateId fateId, FateEnv env) throws Exception { int skip = env.getContext().getTableConfiguration(bulkInfo.tableId) .getCount(Property.TABLE_BULK_SKIP_THRESHOLD); - return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, fateId, skip); + return loadFiles(loader, bulkInfo, bulkDir, lmiPass2, tmf, fateId, skip, sharedFiles); + } + } + + /** + * Iterate the load mapping once to compute which file names appear in more than one tablet range, + * returns the set of "shared" file names + */ + static Set computeSharedFiles(FateId fateId, LoadMappingIterator lmi) throws Exception { + Map fileTabletCount = new HashMap<>(); + while (lmi.hasNext()) { + for (var fileInfo : lmi.next().getValue()) { + fileTabletCount.merge(fileInfo.getFileName(), 1, Integer::sum); + } } + Set sharedFiles = fileTabletCount.entrySet().stream().filter(e -> e.getValue() > 1) + .map(Map.Entry::getKey).collect(Collectors.toSet()); + log.debug("{}: Detected {} shared files out of {} total files", fateId.getTxUUIDStr(), + sharedFiles.size(), fileTabletCount.size()); + return sharedFiles; } @Override @@ -163,7 +190,7 @@ void start(Path bulkDir, TableId tableId, FateId fateId, boolean setTime) throws this.loadingFiles = new HashMap<>(); } - void load(List tablets, Files files) { + void load(List tablets, Files files, Set sharedFiles) { Map toLoad = new HashMap<>(); for (var fileInfo : files) { @@ -241,16 +268,18 @@ void load(List tablets, Files files) { ReferencedTabletFile refTabFile = entry.getKey(); Bulk.FileInfo fileInfo = entry.getValue(); + boolean isShared = sharedFiles.contains(fileInfo.getFileName()); DataFileValue dfv; if (setTime) { // This should always be set outside the loop when setTime is true and should not be // null at this point Preconditions.checkState(fileTime != null); - dfv = - new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime); + dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), + fileTime, isShared); } else { - dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()); + dfv = + new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), isShared); } filesToLoad.put(refTabFile, dfv); @@ -389,11 +418,15 @@ static class ImportTimingStats { // visible for testing static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, LoadMappingIterator loadMapIter, TabletsMetadataFactory factory, FateId fateId, - int skipDistance) throws Exception { + int skipDistance, Set sharedFiles) throws Exception { PeekingIterator> lmi = new PeekingIterator<>(loadMapIter); - Map.Entry loadMapEntry = lmi.peek(); - Text startRow = loadMapEntry.getKey().prevEndRow(); + if (!lmi.hasNext()) { + log.warn("{}: No files to load", fateId.getTxUUIDStr()); + return 0; + } + + Text startRow = lmi.peek().getKey().prevEndRow(); String fmtTid = fateId.getTxUUIDStr(); log.trace("{}: Started loading files at row: {}", fmtTid, startRow); @@ -407,11 +440,11 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, try { PeekingIterator pi = new PeekingIterator<>(tabletsMetadata.iterator()); while (lmi.hasNext()) { - loadMapEntry = lmi.next(); + var entry = lmi.next(); // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look // at the next skipDistance tablets before recreating the iterator if (skipDistance > 0) { - final KeyExtent loadMapKey = loadMapEntry.getKey(); + final KeyExtent loadMapKey = entry.getKey(); if (!pi.findWithin( tm -> PREV_COMP.compare(tm.getPrevEndRow(), loadMapKey.prevEndRow()) >= 0, skipDistance)) { @@ -424,8 +457,8 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, } } List tablets = - findOverlappingTablets(fmtTid, loadMapEntry.getKey(), pi, importTimingStats); - loader.load(tablets, loadMapEntry.getValue()); + findOverlappingTablets(fmtTid, entry.getKey(), pi, importTimingStats); + loader.load(tablets, entry.getValue(), sharedFiles); } } finally { tabletsMetadata.close(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index c12386a3721..00d4851f019 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -132,6 +132,7 @@ static Map> getNewTabletFiles(Fate Function fileInfoProvider) { Map> tabletsFiles = new TreeMap<>(); + Map fileTabletCount = new HashMap<>(); newTablets.keySet().forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); @@ -175,6 +176,8 @@ static Map> getNewTabletFiles(Fate double numOverlapping = newTablets.keySet().stream().map(KeyExtent::toDataRange).filter(overlapPredicate).count(); + fileTabletCount.put(file, (int) numOverlapping); + if (numOverlapping == 0) { log.debug("{} File {} with range {} that does not overlap tablet {}", fateId, file, fileRange, tabletMetadata.getExtent()); @@ -186,8 +189,9 @@ static Map> getNewTabletFiles(Fate // add the file to the tablets it overlaps newTablets.keySet().forEach(newTablet -> { if (overlapPredicate.apply(newTablet.toDataRange())) { + boolean isShared = numOverlapping > 1; DataFileValue ndfv = new DataFileValue((long) sizePerTablet, (long) entriesPerTablet, - dataFileValue.getTime()); + dataFileValue.getTime(), isShared); tabletsFiles.get(newTablet).put(file, ndfv); } }); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java index 3782572bacf..3b171be9291 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo; @@ -95,7 +96,7 @@ void start(Path bulkDir, TableId tableId, FateId fateId, boolean setTime) throws } @Override - void load(List tablets, Files files) { + void load(List tablets, Files files, Set sharedFiles) { results.add(new LoadResult(tablets, files)); } @@ -184,7 +185,10 @@ private Map> runLoadFilesLoad(Map lo TabletsMetadataFactory tmf = (startRow) -> tabletMeta; FateId txid = FateId.from(FateInstanceType.USER, UUID.randomUUID()); - LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, txid, 0); + Set sharedFiles = LoadFiles.computeSharedFiles(txid, lmi); + LoadMappingIterator lmi2 = PrepBulkImportTest.createLoadMappingIter(loadRanges); + + LoadFiles.loadFiles(cl, info, bulkDir, lmi2, tmf, txid, 0, sharedFiles); EasyMock.verify(env, ctx, tconf, bulkDir); List results = cl.getLoadResults(); assertEquals(loadRanges.size(), results.size()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index dad40a0cb13..784c9172eed 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -136,14 +136,14 @@ public void testFileParitioning() { var firstAndLastKeys = Map.of(file2, newFileInfo("m", "r"), file3, newFileInfo("g", "x"), file4, newFileInfo("s", "v")); - var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2, - new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100)); - var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2, - new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100)); - var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20), file3, - new DataFileValue(1000, 100), file4, new DataFileValue(4000, 400)); - var ke4Expected = - Map.of(file1, new DataFileValue(250, 25, 20), file3, new DataFileValue(1000, 100)); + var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file2, + new DataFileValue(1000, 100, 50, true), file3, new DataFileValue(1000, 100, true)); + var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file2, + new DataFileValue(1000, 100, 50, true), file3, new DataFileValue(1000, 100, true)); + var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file3, + new DataFileValue(1000, 100, true), file4, new DataFileValue(4000, 400, false)); + var ke4Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file3, + new DataFileValue(1000, 100, true)); var expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); @@ -166,9 +166,9 @@ public void testFileParitioning() { // Test a tablet with no files going to it var tabletFiles2 = Map.of(file2, tabletFiles.get(file2), file4, tabletFiles.get(file4)); - ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50)); - ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50)); - ke3Expected = Map.of(file4, new DataFileValue(4000, 400)); + ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50, true)); + ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50, true)); + ke3Expected = Map.of(file4, new DataFileValue(4000, 400, false)); ke4Expected = Map.of(); expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); @@ -210,10 +210,10 @@ public void testManyColumns() throws Exception { var flid2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); var loaded = Map.of(loaded1, flid1, loaded2, flid2); - var dfv1 = new DataFileValue(1000, 100, 20); - var dfv2 = new DataFileValue(500, 50, 20); - var dfv3 = new DataFileValue(4000, 400); - var dfv4 = new DataFileValue(2000, 200); + var dfv1 = new DataFileValue(1000, 100, 20, true); + var dfv2 = new DataFileValue(500, 50, 20, false); + var dfv3 = new DataFileValue(4000, 400, false); + var dfv4 = new DataFileValue(2000, 200, true); var tabletFiles = Map.of(file1, dfv1, file2, dfv2, file3, dfv3, file4, dfv4); @@ -312,7 +312,7 @@ public void testManyColumns() throws Exception { .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) .andReturn(tablet1Mutator); - EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putFile(file2, dfv2)).andReturn(tablet1Mutator); // SplitInfo marked as system generated so should be set to ALWAYS (0 delay) @@ -345,10 +345,10 @@ public void testManyColumns() throws Exception { .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) .andReturn(tablet2Mutator); - EasyMock.expect(tablet2Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet2Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putFile(file3, dfv3)).andReturn(tablet2Mutator); - EasyMock.expect(tablet2Mutator.putFile(file4, new DataFileValue(1000, 100))) + EasyMock.expect(tablet2Mutator.putFile(file4, new DataFileValue(1000, 100, true))) .andReturn(tablet2Mutator); tablet2Mutator.submit(EasyMock.anyObject()); EasyMock.expectLastCall().once(); @@ -361,9 +361,9 @@ public void testManyColumns() throws Exception { EasyMock.expect(tablet3Mutator.requireAbsentLogs()).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.putPrevEndRow(newExtent3.prevEndRow())) .andReturn(tablet3Mutator); - EasyMock.expect(tablet3Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet3Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet3Mutator); - EasyMock.expect(tablet3Mutator.putFile(file4, new DataFileValue(1000, 100))) + EasyMock.expect(tablet3Mutator.putFile(file4, new DataFileValue(1000, 100, true))) .andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteFile(file2)).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteFile(file3)).andReturn(tablet3Mutator); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index fa38dceeb8e..ea7f32578d4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -408,7 +408,7 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil Span span2 = TraceUtil.startSpan(this.getClass(), "minorCompact::bringOnline"); try (Scope scope = span2.makeCurrent()) { bringMinorCompactionOnline(tmpDatafile, newDatafile, - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession, + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false), commitSession, flushId, mincReason); } catch (RuntimeException e) { final ServiceLock tserverLock = tabletServer.getLock(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java new file mode 100644 index 00000000000..09c465eced0 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompactionFileDeleteIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(CompactionFileDeleteIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + /** + * Test that files created by minor compaction are marked as not shared + */ + @Test + public void testMinorCompactionCreatesNonSharedFiles() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + + client.tableOperations().flush(tableName, null, null, true); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + Map filesSharedStatus = getFilesSharedStatus(client, tableId); + + assertFalse(filesSharedStatus.isEmpty(), "Expected at least one file after flush"); + filesSharedStatus.forEach((file, isShared) -> { + assertFalse(isShared, "File created by minor compaction should not be marked as shared: " + + file.getFileName()); + log.info("Verified file {} is not shared", file.getFileName()); + }); + } + } + + /** + * Test that files are marked as shared during table clone + */ + @Test + public void testCloneMarksFilesAsShared() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String[] names = getUniqueNames(2); + String sourceTable = names[0]; + String cloneTable = names[1]; + + client.tableOperations().create(sourceTable); + TableId sourceTableId = TableId.of(client.tableOperations().tableIdMap().get(sourceTable)); + + try (BatchWriter bw = client.createBatchWriter(sourceTable)) { + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(sourceTable, null, null, true); + + Map sourceFilesBefore = getFilesSharedStatus(client, sourceTableId); + assertFalse(sourceFilesBefore.isEmpty(), "Expected files in source table"); + sourceFilesBefore + .forEach((file, isShared) -> assertFalse(isShared, "Source files should not be shared")); + + client.tableOperations().clone(sourceTable, cloneTable, true, null, null); + TableId cloneTableId = TableId.of(client.tableOperations().tableIdMap().get(cloneTable)); + + Map sourceFilesAfter = getFilesSharedStatus(client, sourceTableId); + sourceFilesAfter.forEach((file, isShared) -> { + assertTrue(isShared, + "Source file " + file.getFileName() + " should be marked as shared after clone"); + log.info("Verified source file {} is shared after clone", file.getFileName()); + }); + + Map cloneFiles = getFilesSharedStatus(client, cloneTableId); + cloneFiles.forEach((file, isShared) -> { + assertTrue(isShared, "Clone file " + file.getFileName() + " should be marked as shared"); + log.info("Verified clone file {} is shared", file.getFileName()); + }); + } + } + + /** + * Test that bulk import marks files as shared or not based on how many tablets they go to + */ + @Test + public void testBulkImportMarksFilesCorrectly() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + NewTableConfiguration ntc = new NewTableConfiguration(); + SortedSet splits = new TreeSet<>(); + splits.add(new Text("row0500")); + ntc.withSplits(splits); + client.tableOperations().create(tableName, ntc); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + + Map> tabletFiles = getFilesPerTablet(client, tableId); + assertEquals(2, tabletFiles.size(), "Expected 2 tablets"); + + tabletFiles.forEach((tablet, files) -> { + files.forEach((file, isShared) -> { + long tabletCount = tabletFiles.values().stream() + .filter(tabletFileMap -> tabletFileMap.containsKey(file)).count(); + + if (tabletCount == 1) { + assertFalse(isShared, "File in single tablet should not be shared"); + } + }); + }); + } + } + + /** + * Test that compaction processes non-shared files correctly + */ + @Test + public void testCompactionDeletesNonSharedFiles() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + for (int batch = 0; batch < 4; batch++) { + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value_" + batch + "_" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + } + + Map before = getFilesSharedStatus(client, tableId); + int beforeCount = before.size(); + log.info("Files before compaction: {}", beforeCount); + assertTrue(beforeCount >= 3, "Expected at least 3 files"); + before.forEach((f, s) -> assertFalse(s, "Files should not be shared")); + + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + Map after = getFilesSharedStatus(client, tableId); + int afterCount = after.size(); + log.info("Files after compaction: {}", afterCount); + + assertTrue(afterCount <= beforeCount, "Expected same or fewer files after compaction"); + after.forEach((f, s) -> assertFalse(s, "Compaction output should not be shared")); + + } + } + + /** + * Get all files and their shared status for a table + */ + private Map getFilesSharedStatus(AccumuloClient client, + TableId tableId) { + Map result = new HashMap<>(); + + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + for (var entry : scanner) { + String row = entry.getKey().getRow().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + try { + StoredTabletFile file = new StoredTabletFile(cq); + DataFileValue dfv = new DataFileValue(value); + result.put(file, dfv.isShared()); + + log.trace("File: {}, Size: {}, Entries: {}, Shared: {}", file.getFileName(), + dfv.getSize(), dfv.getNumEntries(), dfv.isShared()); + } catch (Exception e) { + log.warn("Error parsing file entry", e); + } + } + } catch (Exception e) { + log.error("Error reading files for table {}", tableId, e); + } + + return result; + } + + /** + * Get files per tablet with their shared status + */ + private Map> getFilesPerTablet(AccumuloClient client, + TableId tableId) { + Map> result = new HashMap<>(); + + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME); + + String currentTablet = null; + Map currentFiles = new HashMap<>(); + + for (var entry : scanner) { + String row = entry.getKey().getRow().toString(); + String cf = entry.getKey().getColumnFamily().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (currentTablet == null || !currentTablet.equals(row)) { + if (currentTablet != null && !currentFiles.isEmpty()) { + result.put(currentTablet, currentFiles); + } + currentTablet = row; + currentFiles = new HashMap<>(); + } + + if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.STR_NAME)) { + try { + StoredTabletFile file = new StoredTabletFile(cq); + DataFileValue dfv = new DataFileValue(value); + currentFiles.put(file, dfv.isShared()); + } catch (Exception e) { + log.warn("Error parsing file entry for tablet {}", row, e); + } + } + } + + if (currentTablet != null && !currentFiles.isEmpty()) { + result.put(currentTablet, currentFiles); + } + } catch (Exception e) { + log.error("Error reading files per tablet for table {}", tableId, e); + } + + return result; + } + + /** + * Get GC file candidates for a table + */ + private Set getGcFileCandidates(AccumuloClient client, TableId tableId) { + Set candidates = new HashSet<>(); + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + + scanner.setRange(MetadataSchema.DeletesSection.getRange()); + + scanner.forEach(entry -> { + try { + String row = entry.getKey().getRow().toString(); + String decodedPath = MetadataSchema.DeletesSection.decodeRow(row); + + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (cq.equals(tableId.canonical()) || value.equals(tableId.canonical())) { + candidates.add(decodedPath); + log.debug("Found GC delete marker for table {}: {}", tableId, decodedPath); + } + } catch (Exception e) { + log.warn("Error parsing GC delete marker entry", e); + } + }); + } catch (Exception e) { + log.warn("Error reading GC candidates", e); + } + return candidates; + } +}