Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions core/src/main/java/org/apache/iceberg/DVUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -188,8 +190,12 @@ private static List<DeleteFile> writeDVs(
FileIO fileIO,
String dvOutputLocation,
Map<String, Pair<PartitionSpec, StructLike>> partitions) {
OutputFile dvOutputFile = fileIO.newOutputFile(dvOutputLocation);
try (DVFileWriter dvFileWriter = new BaseDVFileWriter(() -> dvOutputFile, path -> null)) {
EncryptedOutputFile dvOutputFile =
fileIO instanceof EncryptingFileIO encryptingFileIO
? encryptingFileIO.newEncryptingOutputFile(dvOutputLocation)
: EncryptedFiles.plainAsEncryptedOutput(fileIO.newOutputFile(dvOutputLocation));

try (DVFileWriter dvFileWriter = Deletes.writeDVs(dvOutputFile, path -> null)) {
for (Map.Entry<String, PositionDeleteIndex> entry : mergedIndexByFile.entrySet()) {
String referencedLocation = entry.getKey();
PositionDeleteIndex mergedPositions = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.deletes;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -30,6 +31,10 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
Expand All @@ -52,20 +57,26 @@ public class BaseDVFileWriter implements DVFileWriter {
// String constants; their use sites are outside this excerpt (presumably blob property keys —
// TODO confirm against toBlob).
private static final String REFERENCED_DATA_FILE_KEY = "referenced-data-file";
private static final String CARDINALITY_KEY = "cardinality";

// NOTE(review): the next two lines declare the same field with different types. This excerpt
// looks like a rendered diff without +/- markers, so presumably the Supplier<OutputFile> line is
// the pre-change declaration and the Supplier<EncryptedOutputFile> line replaces it — confirm.
// close() assigns dvOutputFile.get() to an EncryptedOutputFile, which matches the second form.
private final Supplier<OutputFile> dvOutputFile;
private final Supplier<EncryptedOutputFile> dvOutputFile;
// Function from a String to a PositionDeleteIndex, stored by the constructor.
private final Function<String, PositionDeleteIndex> loadPreviousDeletes;
// Looked up by referenced data file in createDV; close() iterates its values and key set.
private final Map<String, Deletes> deletesByPath = Maps.newHashMap();
// Filled in by write(...) and read back by createDV for the blob offset and length.
private final Map<String, BlobMetadata> blobsByPath = Maps.newHashMap();
// Stays null until close() assigns a DeleteWriteResult.
private DeleteWriteResult result = null;

/**
 * Creates a writer that obtains its output file from an {@link OutputFileFactory}.
 *
 * @param fileFactory factory whose {@code newOutputFile()} provides the file to write
 * @param loadPreviousDeletes function from a String to a {@link PositionDeleteIndex}
 */
public BaseDVFileWriter(
OutputFileFactory fileFactory, Function<String, PositionDeleteIndex> loadPreviousDeletes) {
// NOTE(review): two delegating calls appear back to back, which is not valid Java. This excerpt
// looks like a rendered diff without +/- markers: presumably the first call (which unwraps
// encryptingOutputFile() and so drops the key metadata) is the pre-change line and the second
// (which passes the factory's output file through as is) is its replacement — confirm.
this(() -> fileFactory.newOutputFile().encryptingOutputFile(), loadPreviousDeletes);
this(loadPreviousDeletes, fileFactory::newOutputFile);
}

/**
 * Creates a writer over a supplier of plain {@link OutputFile}s.
 *
 * <p>Each file the supplier returns is wrapped with {@code EncryptedFiles.plainAsEncryptedOutput}
 * and the work is delegated to the constructor that takes a
 * {@code Supplier<EncryptedOutputFile>}.
 *
 * @param dvOutputFile supplier of the output file; not invoked by this constructor itself
 * @param loadPreviousDeletes function from a String to a {@link PositionDeleteIndex}
 */
public BaseDVFileWriter(
Supplier<OutputFile> dvOutputFile,
Function<String, PositionDeleteIndex> loadPreviousDeletes) {
// The delegate takes the same two kinds of argument in the opposite order — presumably because
// constructors differing only in the Supplier's type argument would share one erasure.
this(loadPreviousDeletes, () -> EncryptedFiles.plainAsEncryptedOutput(dvOutputFile.get()));
}

/**
 * Creates a writer from a supplier of {@link EncryptedOutputFile}s.
 *
 * <p>This constructor only stores its arguments; the supplier is not invoked here.
 *
 * @param loadPreviousDeletes function from a String to a {@link PositionDeleteIndex}
 * @param dvOutputFile supplier of the encrypted output file
 */
BaseDVFileWriter(
    Function<String, PositionDeleteIndex> loadPreviousDeletes,
    Supplier<EncryptedOutputFile> dvOutputFile) {
  this.loadPreviousDeletes = loadPreviousDeletes;
  this.dvOutputFile = dvOutputFile;
}
Expand Down Expand Up @@ -108,7 +119,12 @@ public void close() throws IOException {
return;
}

PuffinWriter writer = newWriter();
EncryptedOutputFile outputFile = dvOutputFile.get();
EncryptionKeyMetadata keyMetadata = outputFile.keyMetadata();
PuffinWriter writer =
Puffin.write(outputFile.encryptingOutputFile())
.createdBy(IcebergBuild.fullVersion())
.build();

try (PuffinWriter closeableWriter = writer) {
for (Deletes deletes : deletesByPath.values()) {
Expand All @@ -134,15 +150,16 @@ public void close() throws IOException {
long puffinFileSize = writer.fileSize();

for (String path : deletesByPath.keySet()) {
DeleteFile dv = createDV(puffinPath, puffinFileSize, path);
DeleteFile dv = createDV(puffinPath, puffinFileSize, path, keyMetadata);
dvs.add(dv);
}

this.result = new DeleteWriteResult(dvs, referencedDataFiles, rewrittenDeleteFiles);
}
}

private DeleteFile createDV(String path, long size, String referencedDataFile) {
private DeleteFile createDV(
String path, long size, String referencedDataFile, EncryptionKeyMetadata keyMetadata) {
Deletes deletes = deletesByPath.get(referencedDataFile);
BlobMetadata blobMetadata = blobsByPath.get(referencedDataFile);
return FileMetadata.deleteFileBuilder(deletes.spec())
Expand All @@ -151,6 +168,7 @@ private DeleteFile createDV(String path, long size, String referencedDataFile) {
.withPath(path)
.withPartition(deletes.partition())
.withFileSizeInBytes(size)
.withEncryptionKeyMetadata(encryptionKeyMetadata(size, keyMetadata))
.withReferencedDataFile(referencedDataFile)
.withContentOffset(blobMetadata.offset())
.withContentSizeInBytes(blobMetadata.length())
Expand All @@ -165,9 +183,13 @@ private void write(PuffinWriter writer, Deletes deletes) {
blobsByPath.put(path, blobMetadata);
}

private PuffinWriter newWriter() {
OutputFile outputFile = dvOutputFile.get();
return Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build();
/**
 * Returns the serialized form of the given key metadata.
 *
 * <p>When the metadata is a {@link NativeEncryptionKeyMetadata}, the buffer is taken from a copy
 * created with {@code copyWithLength(fileSizeInBytes)}; any other metadata is serialized as is.
 *
 * @param fileSizeInBytes length passed to {@code copyWithLength} for native key metadata
 * @param keyMetadata key metadata of the written output file
 * @return whatever {@code buffer()} returns for the selected metadata
 */
private ByteBuffer encryptionKeyMetadata(
    long fileSizeInBytes, EncryptionKeyMetadata keyMetadata) {
  return keyMetadata instanceof NativeEncryptionKeyMetadata nativeKeyMetadata
      ? nativeKeyMetadata.copyWithLength(fileSizeInBytes).buffer()
      : keyMetadata.buffer();
}

private Blob toBlob(PositionDeleteIndex positions, String path) {
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand All @@ -52,6 +53,11 @@ public class Deletes {

private Deletes() {}

/**
 * Creates a {@link DVFileWriter} backed by the given encrypted output file.
 *
 * <p>The returned writer is a {@link BaseDVFileWriter} whose output-file supplier always yields
 * the same {@code outputFile} instance.
 *
 * @param outputFile output file handed to the writer
 * @param loadPreviousDeletes function from a String to a {@link PositionDeleteIndex}, passed
 *     through unchanged
 * @return a new writer
 */
public static DVFileWriter writeDVs(
EncryptedOutputFile outputFile, Function<String, PositionDeleteIndex> loadPreviousDeletes) {
return new BaseDVFileWriter(loadPreviousDeletes, () -> outputFile);
}

public static <T> CloseableIterable<T> filter(
CloseableIterable<T> rows, Function<T, StructLike> rowToDeleteKey, StructLikeSet deleteSet) {
if (deleteSet.isEmpty()) {
Expand Down
73 changes: 70 additions & 3 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionTestHelpers;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -1958,6 +1961,35 @@ public void testDuplicateDVsAreMerged() throws IOException {
assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 8).boxed()::iterator);
}

/**
 * Commits two DVs for the same data file to a table with an encrypting FileIO and checks that
 * the snapshot adds a single DV that carries key metadata and covers positions 0 through 3.
 */
@TestTemplate
public void testDuplicateDVsAreMergedWithEncryption() throws IOException {
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

  // Work against a dedicated table rather than the shared one.
  TestTables.TestTable encrypted = createEncryptedTable();
  DataFile dataFile = newDataFile("data_bucket=0");
  commit(encrypted, encrypted.newRowDelta().addRows(dataFile), branch);

  OutputFileFactory puffinFactory =
      OutputFileFactory.builderFor(encrypted, 1, 1).format(FileFormat.PUFFIN).build();

  // Two DVs for the same data file: positions [0, 2) and [2, 4).
  DeleteFile firstDV = dvWithPositions(encrypted, dataFile, puffinFactory, 0, 2);
  DeleteFile secondDV = dvWithPositions(encrypted, dataFile, puffinFactory, 2, 4);
  commit(encrypted, encrypted.newRowDelta().addDeletes(firstDV).addDeletes(secondDV), branch);

  // Exactly one delete file is expected to be added by the latest snapshot.
  DeleteFile mergedDV =
      Iterables.getOnlyElement(
          SnapshotChanges.builderFor(encrypted)
              .snapshot(latestSnapshot(encrypted, branch))
              .build()
              .addedDeleteFiles());

  assertThat(mergedDV.keyMetadata()).isNotNull();
  Iterable<Long> expectedPositions = LongStream.range(0, 4).boxed()::iterator;
  assertDVHasDeletedPositions(encrypted, mergedDV, expectedPositions);
}

@TestTemplate
public void testDuplicateDVsMergedMultipleSpecs() throws IOException {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Expand Down Expand Up @@ -2567,18 +2599,33 @@ public void testCannotMergeDVsMismatchedPartitionTuples() {
/**
 * Convenience overload that forwards to the five-argument {@code dvWithPositions}, passing the
 * {@code table} member (declared outside this excerpt) as the target table.
 */
private DeleteFile dvWithPositions(
DataFile dataFile, OutputFileFactory fileFactory, int fromInclusive, int toExclusive)
throws IOException {
return dvWithPositions(table, dataFile, fileFactory, fromInclusive, toExclusive);
}

/**
 * Builds one {@link PositionDelete} per position in {@code [fromInclusive, toExclusive)} for
 * {@code dataFile.location()} and hands them to {@code writeDV}, returning its result.
 */
private DeleteFile dvWithPositions(
Table targetTable,
DataFile dataFile,
OutputFileFactory fileFactory,
int fromInclusive,
int toExclusive)
throws IOException {

List<PositionDelete<?>> deletes = Lists.newArrayList();
for (int i = fromInclusive; i < toExclusive; i++) {
deletes.add(PositionDelete.create().set(dataFile.location(), i));
}

// NOTE(review): two return statements appear back to back, which is not valid Java. This
// excerpt looks like a rendered diff without +/- markers: presumably the call without
// targetTable is the pre-change line and the call with targetTable replaces it — confirm.
return writeDV(deletes, dataFile.specId(), dataFile.partition(), fileFactory);
return writeDV(targetTable, deletes, dataFile.specId(), dataFile.partition(), fileFactory);
}

/**
 * Convenience overload that forwards to the three-argument {@code assertDVHasDeletedPositions},
 * passing the {@code table} member (declared outside this excerpt) as the target table.
 */
private void assertDVHasDeletedPositions(DeleteFile dv, Iterable<Long> positions) {
assertDVHasDeletedPositions(table, dv, positions);
}

private void assertDVHasDeletedPositions(
Table targetTable, DeleteFile dv, Iterable<Long> positions) {
assertThat(dv).isNotNull();
PositionDeleteIndex index = DVUtil.readDV(dv, table.io());
PositionDeleteIndex index = DVUtil.readDV(dv, targetTable.io());
assertThat(positions)
.allSatisfy(
pos ->
Expand All @@ -2588,6 +2635,7 @@ private void assertDVHasDeletedPositions(DeleteFile dv, Iterable<Long> positions
}

private DeleteFile writeDV(
Table targetTable,
List<PositionDelete<?>> deletes,
int specId,
StructLike partition,
Expand All @@ -2598,10 +2646,29 @@ private DeleteFile writeDV(
try (DVFileWriter closeableWriter = writer) {
for (PositionDelete<?> delete : deletes) {
closeableWriter.delete(
delete.path().toString(), delete.pos(), table.specs().get(specId), partition);
delete.path().toString(), delete.pos(), targetTable.specs().get(specId), partition);
}
}

return Iterables.getOnlyElement(writer.result().deleteFiles());
}

/**
 * Creates a test table named {@code "encrypted-" + branch} under {@code temp}, whose operations
 * use a local FileIO combined with a test encryption manager and report that same manager from
 * {@code encryption()}.
 */
private TestTables.TestTable createEncryptedTable() {
  EncryptionManager manager = EncryptionTestHelpers.createEncryptionManager();
  String name = "encrypted-" + branch;
  java.io.File tableDir = temp.resolve(name).toFile();

  // The anonymous subclass exposes the manager that the FileIO was combined with.
  TestTables.TestTableOperations encryptedOps =
      new TestTables.TestTableOperations(
          name, tableDir, EncryptingFileIO.combine(new TestTables.LocalFileIO(), manager)) {
        @Override
        public EncryptionManager encryption() {
          return manager;
        }
      };

  return TestTables.create(
      tableDir, name, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, encryptedOps);
}
}
Loading