Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
*/
ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);

/*
* Require that a tablet contain all the files in the set with the exact same DataFileValue
*/
ConditionalTabletMutator requireFiles(Map<StoredTabletFile,DataFileValue> files);

/**
* Require that a tablet have less than or equals the specified number of files.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,54 @@ public class DataFileValue {
private final long size;
private final long numEntries;
private long time = -1;
private boolean shared;

public DataFileValue(long size, long numEntries, long time, boolean shared) {
this.size = size;
this.numEntries = numEntries;
this.time = time;
this.shared = shared;
}

public DataFileValue(long size, long numEntries, boolean shared) {
this.size = size;
this.numEntries = numEntries;
this.time = -1;
this.shared = shared;
}

public DataFileValue(long size, long numEntries, long time) {
this.size = size;
this.numEntries = numEntries;
this.time = time;
this.shared = false;
}

public DataFileValue(long size, long numEntries) {
this.size = size;
this.numEntries = numEntries;
this.time = -1;
this.shared = false;
}

public DataFileValue(String encodedDFV) {
String[] ba = encodedDFV.split(",");

size = Long.parseLong(ba[0]);
numEntries = Long.parseLong(ba[1]);
time = -1;
shared = false;

if (ba.length == 3) {
time = Long.parseLong(ba[2]);
} else {
time = -1;
// Could be old format with time, or new format with shared
try {
time = Long.parseLong(ba[2]);
} catch (NumberFormatException e) {
shared = Boolean.parseBoolean(ba[2]);
}
} else if (ba.length == 4) {
shared = Boolean.parseBoolean(ba[2]);
time = Long.parseLong(ba[3]);
}
}

Expand All @@ -78,15 +103,19 @@ public long getTime() {
return time;
}

public boolean isShared() {
return shared;
}

public byte[] encode() {
return encodeAsString().getBytes(UTF_8);
}

public String encodeAsString() {
if (time >= 0) {
return ("" + size + "," + numEntries + "," + time);
return ("" + size + "," + numEntries + "," + shared + "," + time);
}
return ("" + size + "," + numEntries);
return ("" + size + "," + numEntries + "," + shared);
}

public Value encodeAsValue() {
Expand All @@ -97,20 +126,21 @@ public Value encodeAsValue() {
public boolean equals(Object o) {
if (o instanceof DataFileValue odfv) {

return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time;
return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time
&& shared == odfv.shared;
}

return false;
}

@Override
public int hashCode() {
return Long.valueOf(size + numEntries).hashCode();
return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode();
}

@Override
public String toString() {
return size + " " + numEntries;
return size + " " + numEntries + " " + shared;
}

public void setTime(long time) {
Expand All @@ -120,6 +150,10 @@ public void setTime(long time) {
this.time = time;
}

public void setShared(boolean shared) {
this.shared = shared;
}

/**
* @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -52,6 +53,7 @@
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -363,6 +365,17 @@ public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
return this;
}

@Override
public ConditionalTabletMutator requireFiles(Map<StoredTabletFile,DataFileValue> files) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Condition c = SetEncodingIterator.createConditionWithVal(files.entrySet(),
entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8),
entry.getValue().encode()),
DataFileColumnFamily.NAME);
mutation.addCondition(c);
return this;
}

@Override
public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -179,6 +180,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon
return new Pair<>(result, sizes);
}

private static void markTabletFilesAsShared(ServerContext ctx, TabletMetadata srcTablet) {
if (srcTablet.getFiles().isEmpty()) {
return;
}

Map<StoredTabletFile,DataFileValue> currentFilesMap = srcTablet.getFilesMap();

// Skip if all files are already shared
boolean anyNonShared = currentFilesMap.values().stream().anyMatch(dfv -> !dfv.isShared());
if (!anyNonShared) {
return;
}

try (var conditionalMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = conditionalMutator.mutateTablet(srcTablet.getExtent())
.requireAbsentOperation().requireFiles(currentFilesMap);

// Write updated DataFileValues with shared=true for any non-shared files
for (var entry : currentFilesMap.entrySet()) {
StoredTabletFile file = entry.getKey();
DataFileValue dfv = entry.getValue();
if (!dfv.isShared()) {
DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(),
dfv.isTimeSet() ? dfv.getTime() : -1, true);
tabletMutator.putFile(file, sharedDfv);
log.debug("Marking file {} as shared in source tablet {} (conditional)",
file.getFileName(), srcTablet.getExtent());
}
}

tabletMutator.submit(tm -> false, () -> "mark source files as shared for clone");

var result = conditionalMutator.process().get(srcTablet.getExtent());
if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
log.debug(
"Conditional mutation to mark files as shared was rejected for tablet {} — "
+ "tablet changed concurrently, clone will retry for this tablet",
srcTablet.getExtent());
}
}
}

private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
Iterable<Entry<Key,Value>> tablet) {

Expand All @@ -191,7 +234,12 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
if (!cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
}
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());

DataFileValue ogVal = new DataFileValue(entry.getValue().get());
DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(),
ogVal.isTimeSet() ? ogVal.getTime() : -1, true);

m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue());
} else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) {
Expand Down Expand Up @@ -350,6 +398,11 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId
while (true) {

try {
try (TabletsMetadata tabletsMetadata = createCloneScanner(null, srcTableId, context)) {
for (TabletMetadata tablet : tabletsMetadata) {
markTabletFilesAsShared(context, tablet);
}
}
initializeClone(null, srcTableId, tableId, context, bw);

// the following loop looks changes in the file that occurred during the copy.. if files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -67,6 +68,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) {
this.newDatafile = newDatafile;
}

private record CompactionFileResult(TabletMetadata tabletMetadata,
ArrayList<StoredTabletFile> filesToDeleteViaGc) {
}

@Override
public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
var ecid = ExternalCompactionId.of(commitData.ecid);
Expand All @@ -79,25 +84,38 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
// process died and now its running again. In this case commit should do nothing, but its
// important to still carry on with the rest of the steps after commit. This code ignores a that
// fact that a commit may not have happened in the current call and continues for this reason.
TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile);
CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile);

String loc = null;
if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
loc = tabletMetadata.getLocation().getHostPortSession();
if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) {
loc = fileResult.tabletMetadata.getLocation().getHostPortSession();
}

// This will causes the tablet to be reexamined to see if it needs any more compactions.
var extent = KeyExtent.fromThrift(commitData.textent);
env.getEventPublisher().event(extent, "Compaction completed %s", extent);

return new PutGcCandidates(commitData, loc);
return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc());
}

KeyExtent getExtent() {
return KeyExtent.fromThrift(commitData.textent);
}

private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
private ArrayList<StoredTabletFile> computeNonSharedFiles(TabletMetadata tablet,
CompactionMetadata ecm) {
ArrayList<StoredTabletFile> nonSharedFiles = new ArrayList<>();
var tabletFilesMap = tablet.getFilesMap();
for (StoredTabletFile file : ecm.getJobFiles()) {
DataFileValue dfv = tabletFilesMap.get(file);
if (dfv == null || !dfv.isShared()) {
nonSharedFiles.add(file);
}
}
return nonSharedFiles;
}

private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
Optional<ReferencedTabletFile> newDatafile) {

var tablet =
Expand All @@ -107,6 +125,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
.logInterval(Duration.ofMinutes(3)).createRetry();

ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>();

while (canCommitCompaction(ecid, tablet)) {
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);

Expand All @@ -119,7 +139,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
.requireCompaction(ecid).requireSame(tablet, LOCATION)
.requireFiles(commitData.getJobFiles());
.requireFiles(tablet.getFilesMap());

if (ecm.getKind() == CompactionKind.USER) {
tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
Expand All @@ -140,6 +160,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId

var result = tabletsMutator.process().get(getExtent());
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
filesToDeleteViaGc = computeNonSharedFiles(tablet, ecm);
// Compaction was successfully committed to the tablet so log it
TabletLogger.compacted(getExtent(), ecid, commitData.kind, commitData.getJobFiles(),
newDatafile);
Expand All @@ -156,7 +177,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
}
}

return tablet;
return new CompactionFileResult(tablet, filesToDeleteViaGc);
}

private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
Expand Down Expand Up @@ -212,11 +233,14 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio
ecm.getJobFiles().forEach(tabletMutator::putScan);
}
ecm.getJobFiles().forEach(tabletMutator::deleteFile);

tabletMutator.deleteExternalCompaction(ecid);

if (newDatafile.isPresent()) {
tabletMutator.putFile(newDatafile.orElseThrow(),
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
// Mark new compaction files as not shared.
DataFileValue newFileValue =
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false);
tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue);
}
}

Expand Down
Loading