Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,7 +45,6 @@ class RecordWriter {
private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;
private @Nullable FileIO io;

RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
Expand All @@ -74,11 +71,9 @@ class RecordWriter {
}
OutputFile outputFile;
EncryptionKeyMetadata keyMetadata;
// Keep FileIO open for the lifetime of this writer to avoid
// premature shutdown of underlying client pools (e.g., S3),
// which manifests as "Connection pool shut down" (Issue #36438).
this.io = table.io();
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
// table.io() may return a shared FileIO instance.
// FileIO lifecycle is managed by RecordWriterManager.close().
OutputFile tmpFile = table.io().newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();
Expand Down Expand Up @@ -135,20 +130,6 @@ public void close() throws IOException {
fileFormat, table.name(), absoluteFilename),
e);
} finally {
// Always attempt to close FileIO and decrement metrics
if (io != null) {
try {
io.close();
} catch (Exception ioCloseError) {
if (closeError != null) {
closeError.addSuppressed(ioCloseError);
} else {
closeError = new IOException("Failed to close FileIO", ioCloseError);
}
} finally {
io = null;
}
}
activeIcebergWriters.dec();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -58,6 +60,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.transforms.Transforms;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -403,33 +406,50 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
*/
@Override
public void close() throws IOException {
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
windowedDestinationAndState : destinations.entrySet()) {
DestinationState state = windowedDestinationAndState.getValue();
try {
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
windowedDestinationAndState : destinations.entrySet()) {
DestinationState state = windowedDestinationAndState.getValue();

// removing writers from the state's cache will trigger the logic to collect each writer's
// data file.
state.writers.invalidateAll();
// first check for any exceptions swallowed by the cache
if (!state.exceptions.isEmpty()) {
IllegalStateException exception =
new IllegalStateException(
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
for (Exception e : state.exceptions) {
exception.addSuppressed(e);
// removing writers from the state's cache will trigger the logic to collect each writer's
// data file.
state.writers.invalidateAll();
// first check for any exceptions swallowed by the cache
if (!state.exceptions.isEmpty()) {
IllegalStateException exception =
new IllegalStateException(
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
for (Exception e : state.exceptions) {
exception.addSuppressed(e);
}
throw exception;
}
throw exception;
}

if (state.dataFiles.isEmpty()) {
continue;
}
if (state.dataFiles.isEmpty()) {
continue;
}

totalSerializableDataFiles.put(
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
state.dataFiles.clear();
totalSerializableDataFiles.put(
windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
state.dataFiles.clear();
}
} finally {
// Close each distinct FileIO now that all writers are done.
// table.io() may return a shared FileIO; the HashSet below deduplicates via equals()/hashCode()
// (i.e. by identity only when the FileIO does not override them), so each one is closed once.
Set<FileIO> closedIOs = new HashSet<>();
for (DestinationState state : destinations.values()) {
FileIO io = state.table.io();
if (io != null && closedIOs.add(io)) {
try {
io.close();
} catch (Exception e) {
LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
}
}
}
destinations.clear();
}
destinations.clear();
checkArgument(
openWriters == 0,
"Expected all data writers to be closed, but found %s data writer(s) still open",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -957,7 +958,7 @@ public void testDefaultMetrics() throws IOException {
}

@Test
public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
TableIdentifier tableId =
TableIdentifier.of(
"default",
Expand All @@ -980,7 +981,104 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
writer.close();

assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
// RecordWriter must NOT close FileIO — it may be a shared instance.
assertFalse("RecordWriter.close() must not close the shared FileIO", trackingFileIO.closed);
}

/**
 * Two writers whose tables hand out one and the same FileIO must each be closable without the
 * FileIO itself being closed; releasing it is left to RecordWriterManager.close().
 */
@Test
public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOException {
  // Two separate tables stand in for dynamic destinations served by a single catalog.
  String suffixA = UUID.randomUUID().toString().replace("-", "").substring(0, 6);
  TableIdentifier firstId = TableIdentifier.of("default", "table_batch_close_a_" + suffixA);
  String suffixB = UUID.randomUUID().toString().replace("-", "").substring(0, 6);
  TableIdentifier secondId = TableIdentifier.of("default", "table_batch_close_b_" + suffixB);
  Table firstTable = warehouse.createTable(firstId, ICEBERG_SCHEMA);
  Table secondTable = warehouse.createTable(secondId, ICEBERG_SCHEMA);

  // A single close-tracking FileIO, built from the first table's FileIO, is returned by both
  // spied tables so the two writers end up sharing it.
  CloseTrackingFileIO commonIO = new CloseTrackingFileIO(firstTable.io());
  Table firstSpy = Mockito.spy(firstTable);
  Table secondSpy = Mockito.spy(secondTable);
  Mockito.doReturn(commonIO).when(firstSpy).io();
  Mockito.doReturn(commonIO).when(secondSpy).io();

  PartitionKey firstKey = new PartitionKey(firstSpy.spec(), firstSpy.schema());
  PartitionKey secondKey = new PartitionKey(secondSpy.spec(), secondSpy.schema());

  RecordWriter firstWriter =
      new RecordWriter(firstSpy, FileFormat.PARQUET, "file1.parquet", firstKey);
  RecordWriter secondWriter =
      new RecordWriter(secondSpy, FileFormat.PARQUET, "file2.parquet", secondKey);

  // The same converted record is written through each writer.
  Row beamRow = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
  Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, beamRow);
  firstWriter.write(icebergRecord);
  secondWriter.write(icebergRecord);

  // Closing the writers one after the other must leave the shared FileIO untouched.
  firstWriter.close();
  assertFalse("FileIO must remain open between batch writer closes", commonIO.closed);
  secondWriter.close();
  assertFalse("FileIO must remain open after all writers close", commonIO.closed);

  // Each writer must still report a data file after being closed.
  assertNotNull(firstWriter.getDataFile());
  assertNotNull(secondWriter.getDataFile());
}

/**
 * Exercises RecordWriterManager.close() with two destinations whose tables return the same
 * FileIO: data files must be reported for both destinations, and the shared FileIO must end up
 * closed.
 */
@Test
public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException {
  String firstName =
      "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
  String secondName =
      "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
  TableIdentifier firstId = TableIdentifier.of("default", firstName);
  TableIdentifier secondId = TableIdentifier.of("default", secondName);

  Table firstTable = warehouse.createTable(firstId, ICEBERG_SCHEMA);
  Table secondTable = warehouse.createTable(secondId, ICEBERG_SCHEMA);

  // Both spied tables return one close-tracking FileIO built from the first table's FileIO.
  CloseTrackingFileIO commonIO = new CloseTrackingFileIO(firstTable.io());
  Table firstSpy = Mockito.spy(firstTable);
  Table secondSpy = Mockito.spy(secondTable);
  Mockito.doReturn(commonIO).when(firstSpy).io();
  Mockito.doReturn(commonIO).when(secondSpy).io();

  // The catalog spy serves the spied tables, so the manager sees the shared FileIO.
  Catalog catalogSpy = Mockito.spy(catalog);
  Mockito.doReturn(firstSpy).when(catalogSpy).loadTable(firstId);
  Mockito.doReturn(secondSpy).when(catalogSpy).loadTable(secondId);

  WindowedValue<IcebergDestination> firstDestination = getWindowedDestination(firstName, null);
  WindowedValue<IcebergDestination> secondDestination =
      getWindowedDestination(secondName, null);

  RecordWriterManager manager = new RecordWriterManager(catalogSpy, "test_file_name", 1000, 3);

  // One row per destination leaves two writers open.
  Row beamRow = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
  assertTrue(manager.write(firstDestination, beamRow));
  assertTrue(manager.write(secondDestination, beamRow));
  assertEquals(2, manager.openWriters);

  manager.close();

  // After close(), both destinations have data files and the shared FileIO has been closed.
  Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> filesByDestination =
      manager.getSerializableDataFiles();
  assertTrue("Should have data files for dest1", filesByDestination.containsKey(firstDestination));
  assertTrue("Should have data files for dest2", filesByDestination.containsKey(secondDestination));
  assertTrue("Shared FileIO should be closed", commonIO.closed);
}

private static final class CloseTrackingFileIO implements FileIO {
Expand Down
Loading