diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index d233b0ac05b5..82251c00e72e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -31,10 +31,8 @@
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +45,6 @@ class RecordWriter {
   private final Table table;
   private final String absoluteFilename;
   private final FileFormat fileFormat;
-  private @Nullable FileIO io;
 
   RecordWriter(
       Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -74,11 +71,9 @@ class RecordWriter {
     }
     OutputFile outputFile;
     EncryptionKeyMetadata keyMetadata;
-    // Keep FileIO open for the lifetime of this writer to avoid
-    // premature shutdown of underlying client pools (e.g., S3),
-    // which manifests as "Connection pool shut down" (Issue #36438).
-    this.io = table.io();
-    OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+    // table.io() may return a shared FileIO instance.
+    // FileIO lifecycle is managed by RecordWriterManager.close().
+    OutputFile tmpFile = table.io().newOutputFile(absoluteFilename);
     EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
     outputFile = encryptedOutputFile.encryptingOutputFile();
     keyMetadata = encryptedOutputFile.keyMetadata();
@@ -135,20 +130,6 @@ public void close() throws IOException {
               fileFormat, table.name(), absoluteFilename),
           e);
     } finally {
-      // Always attempt to close FileIO and decrement metrics
-      if (io != null) {
-        try {
-          io.close();
-        } catch (Exception ioCloseError) {
-          if (closeError != null) {
-            closeError.addSuppressed(ioCloseError);
-          } else {
-            closeError = new IOException("Failed to close FileIO", ioCloseError);
-          }
-        } finally {
-          io = null;
-        }
-      }
       activeIcebergWriters.dec();
     }
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index da62fb658846..eb79513df4f9 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -29,8 +29,10 @@
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.schemas.Schema;
@@ -58,6 +60,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.transforms.Transforms;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -403,33 +406,50 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
    */
   @Override
   public void close() throws IOException {
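+    // Flush per-destination state first; FileIO cleanup happens in the finally block
+    // below, so a failed flush cannot leak the underlying connection pools.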
-    for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
-        windowedDestinationAndState : destinations.entrySet()) {
-      DestinationState state = windowedDestinationAndState.getValue();
+    try {
+      for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
+          windowedDestinationAndState : destinations.entrySet()) {
+        DestinationState state = windowedDestinationAndState.getValue();
 
-      // removing writers from the state's cache will trigger the logic to collect each writer's
-      // data file.
-      state.writers.invalidateAll();
-      // first check for any exceptions swallowed by the cache
-      if (!state.exceptions.isEmpty()) {
-        IllegalStateException exception =
-            new IllegalStateException(
-                String.format("Encountered %s failed writer(s).", state.exceptions.size()));
-        for (Exception e : state.exceptions) {
-          exception.addSuppressed(e);
-        }
-        throw exception;
-      }
+        // removing writers from the state's cache will trigger the logic to collect each writer's
+        // data file.
+        state.writers.invalidateAll();
+        // first check for any exceptions swallowed by the cache
+        if (!state.exceptions.isEmpty()) {
+          IllegalStateException exception =
+              new IllegalStateException(
+                  String.format("Encountered %s failed writer(s).", state.exceptions.size()));
+          for (Exception e : state.exceptions) {
+            exception.addSuppressed(e);
+          }
+          throw exception;
+        }
 
-      if (state.dataFiles.isEmpty()) {
-        continue;
-      }
+        if (state.dataFiles.isEmpty()) {
+          continue;
+        }
 
-      totalSerializableDataFiles.put(
-          windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
-      state.dataFiles.clear();
+        totalSerializableDataFiles.put(
+            windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
+        state.dataFiles.clear();
+      }
+    } finally {
+      // Close unique FileIO instances now that all writers are done.
+      // table.io() may return a shared FileIO; we deduplicate by identity
+      // so we close each underlying connection pool exactly once.
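+      // Note: HashSet deduplicates via equals(); FileIO implementations typically
+      // don't override it, so set membership here amounts to reference equality.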
+      Set<FileIO> closedIOs = new HashSet<>();
+      for (DestinationState state : destinations.values()) {
+        FileIO io = state.table.io();
+        if (io != null && closedIOs.add(io)) {
+          try {
+            io.close();
+          } catch (Exception e) {
+            LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
+          }
+        }
+      }
+      destinations.clear();
     }
-    destinations.clear();
 
     checkArgument(
         openWriters == 0,
         "Expected all data writers to be closed, but found %s data writer(s) still open",
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 375d90737117..2672ac70c088 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -65,6 +65,7 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
@@ -957,7 +958,7 @@ public void testDefaultMetrics() throws IOException {
   }
 
   @Test
-  public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
+  public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
     TableIdentifier tableId =
         TableIdentifier.of(
             "default",
@@ -980,7 +981,104 @@ public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
     writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
     writer.close();
 
-    assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
+    // RecordWriter must NOT close FileIO; it may be a shared instance.
+    assertFalse("RecordWriter.close() must not close the shared FileIO", trackingFileIO.closed);
+  }
+
+  /**
+   * Verifies that when multiple writers share the same FileIO, closing any writer does not close
+   * the shared FileIO; that is the responsibility of RecordWriterManager.close().
+   */
+  @Test
+  public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOException {
+    // Create two tables that share the same FileIO (simulating dynamic destinations
+    // backed by the same catalog)
+    TableIdentifier tableId1 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
+    TableIdentifier tableId2 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
+    Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+    Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+    // Both tables share the same CloseTrackingFileIO, mirroring how some catalogs
+    // return a shared FileIO instance across tables
+    CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io());
+    Table spyTable1 = Mockito.spy(table1);
+    Table spyTable2 = Mockito.spy(table2);
+    Mockito.doReturn(sharedFileIO).when(spyTable1).io();
+    Mockito.doReturn(sharedFileIO).when(spyTable2).io();
+
+    PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema());
+    PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema());
+
+    RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET, "file1.parquet", pk1);
+    RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET, "file2.parquet", pk2);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row);
+
+    writer1.write(record);
+    writer2.write(record);
+
+    writer1.close();
+    assertFalse("FileIO must remain open between batch writer closes", sharedFileIO.closed);
+
+    writer2.close();
+    assertFalse("FileIO must remain open after all writers close", sharedFileIO.closed);
+
+    // Both writers produced valid data files
+    assertNotNull(writer1.getDataFile());
+    assertNotNull(writer2.getDataFile());
+  }
+
+  /**
+   * Verifies that RecordWriterManager.close() flushes data files from multiple destinations and
+   * closes the shared FileIO.
+   */
+  @Test
+  public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException {
+    String tableName1 =
+        "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    String tableName2 =
+        "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    TableIdentifier tableId1 = TableIdentifier.of("default", tableName1);
+    TableIdentifier tableId2 = TableIdentifier.of("default", tableName2);
+
+    Table realTable1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+    Table realTable2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedTrackingIO = new CloseTrackingFileIO(realTable1.io());
+    Table spyTable1 = Mockito.spy(realTable1);
+    Table spyTable2 = Mockito.spy(realTable2);
+    Mockito.doReturn(sharedTrackingIO).when(spyTable1).io();
+    Mockito.doReturn(sharedTrackingIO).when(spyTable2).io();
+
+    Catalog spyCatalog = Mockito.spy(catalog);
+    Mockito.doReturn(spyTable1).when(spyCatalog).loadTable(tableId1);
+    Mockito.doReturn(spyTable2).when(spyCatalog).loadTable(tableId2);
+
+    WindowedValue<IcebergDestination> dest1 = getWindowedDestination(tableName1, null);
+    WindowedValue<IcebergDestination> dest2 = getWindowedDestination(tableName2, null);
+
+    RecordWriterManager writerManager =
+        new RecordWriterManager(spyCatalog, "test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    assertTrue(writerManager.write(dest1, row));
+    assertTrue(writerManager.write(dest2, row));
+    assertEquals(2, writerManager.openWriters);
+
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1));
+    assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2));
+    assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
   }
 
   private static final class CloseTrackingFileIO implements FileIO {