Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1a6c642
Implement high-performance sorted writing support in IcebergIO
atognolag May 7, 2026
094c721
Fix multiple-iteration anti-pattern in WriteGroupedRowsToFiles by ext…
atognolag May 7, 2026
f9dd27d
Refactor row sorting calls to be explicitly conditional on table.sort…
atognolag May 7, 2026
ca4ff74
Simplify sorting logic to keep call DRY and use straightforward 'rows…
atognolag May 7, 2026
df8dc69
Rename variable rows to sortedOrUnsortedRows to explicitly communicat…
atognolag May 7, 2026
9120fda
Fix sorted write PR comments: resolve null-ordering bugs and optimize…
atognolag May 7, 2026
75dcc4f
Fix Checkstyle check MissingDeprecated warning in IcebergRowSorter.java
atognolag May 7, 2026
e4c97bf
Remove unused deprecated encodeSortKey method in IcebergRowSorter.java
atognolag May 7, 2026
ecdaf7f
Finalize Hybrid Partitioning and Sorting Architecture in IcebergIO: e…
atognolag May 8, 2026
890fe76
Set HASH as the default distribution mode in IcebergIO and comprehens…
atognolag May 8, 2026
0cecd01
Add testRangeDistribution integration test case for RANGE distributio…
atognolag May 8, 2026
94fa738
Add Javadoc documentation for withAutosharding method in IcebergIO.java
atognolag May 8, 2026
f24183a
Update RANGE sharding code sample to use ID partitioning and document…
atognolag May 8, 2026
670aab1
Document the multi-dimensional distribution mode decision matrix in I…
atognolag May 8, 2026
acf0e56
Add Operational Impact column to distribution modes Javadoc table in …
atognolag May 8, 2026
e22d4dc
Fix ClassCastException in IcebergUtils string field copying by using …
atognolag May 8, 2026
1ab4fcf
Add comprehensive test scenarios for dynamic BigDecimal and Integer t…
atognolag May 8, 2026
1e19180
Add thorough test scenarios for Double, Boolean, and Null value to St…
atognolag May 8, 2026
4d20b11
Use catalog.buildTable to set SortOrder during dynamic table creation…
atognolag May 8, 2026
fca10f8
Adding a note and a warning
atognolag May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:sorter")
implementation library.java.avro
implementation library.java.slf4j_api
implementation library.java.joda_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -51,38 +53,65 @@ class AssignDestinationsAndPartitions

// Resolves the destination Iceberg table for each incoming record.
private final DynamicDestinations dynamicDestinations;
// Catalog connection configuration; used to look up table metadata (e.g. partition specs).
private final IcebergCatalogConfig catalogConfig;
// How records are distributed across workers before writing (NONE / HASH / RANGE).
private final DistributionMode distributionMode;
// User-supplied Row -> shard-id function; only consulted when distributionMode is RANGE.
// May be null for the other modes.
private final @Nullable SerializableFunction<Row, Integer> distributionFunction;

// Field names of the metadata Row emitted as the KV key by this transform.
static final String DESTINATION = "destination";
static final String PARTITION = "partition";
static final String SHARD = "shard";
// Schema of the metadata key: destination table id, partition path, and an optional
// shard id (nullable because it is only populated in RANGE mode).
static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
    org.apache.beam.sdk.schemas.Schema.builder()
        .addStringField(DESTINATION)
        .addStringField(PARTITION)
        .addNullableField(SHARD, org.apache.beam.sdk.schemas.Schema.FieldType.INT32)
        .build();

/**
 * Creates the transform with default distribution settings: {@link DistributionMode#HASH}
 * and no custom distribution function.
 */
public AssignDestinationsAndPartitions(
    DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
  this(dynamicDestinations, catalogConfig, DistributionMode.HASH, null);
}

/**
 * Creates the transform.
 *
 * @param dynamicDestinations resolves the destination table for each record
 * @param catalogConfig catalog configuration used to look up table metadata
 * @param distributionMode how records are distributed across workers before writing
 * @param distributionFunction maps a {@link Row} to an integer shard id; only used when
 *     {@code distributionMode} is {@link DistributionMode#RANGE}, may be null otherwise
 */
public AssignDestinationsAndPartitions(
    DynamicDestinations dynamicDestinations,
    IcebergCatalogConfig catalogConfig,
    DistributionMode distributionMode,
    @Nullable SerializableFunction<Row, Integer> distributionFunction) {
  this.dynamicDestinations = dynamicDestinations;
  this.catalogConfig = catalogConfig;
  this.distributionMode = distributionMode;
  this.distributionFunction = distributionFunction;
}

/**
 * Tags every input {@link Row} with its destination table, partition path, and (in RANGE
 * mode) shard id, emitting {@code KV<metadataRow, dataRow>} pairs.
 */
@Override
public PCollection<KV<Row, Row>> expand(PCollection<Row> input) {
  AssignDoFn assignFn =
      new AssignDoFn(dynamicDestinations, catalogConfig, distributionMode, distributionFunction);
  KvCoder<Row, Row> outputCoder =
      KvCoder.of(RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema()));
  return input.apply(ParDo.of(assignFn)).setCoder(outputCoder);
}

@SuppressWarnings("nullness")
static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
private transient @MonotonicNonNull Map<String, PartitionKey> partitionKeys;
private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
private final DistributionMode distributionMode;
private final @Nullable SerializableFunction<Row, Integer> distributionFunction;

AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
/**
 * Creates the DoFn. All arguments are serializable configuration; the transient per-table
 * caches ({@code partitionKeys}, {@code wrappers}) are initialized outside the constructor.
 *
 * @param distributionFunction only consulted when {@code distributionMode} is
 *     {@link DistributionMode#RANGE}; may be null otherwise
 */
AssignDoFn(
    DynamicDestinations dynamicDestinations,
    IcebergCatalogConfig catalogConfig,
    DistributionMode distributionMode,
    @Nullable SerializableFunction<Row, Integer> distributionFunction) {
  this.dynamicDestinations = dynamicDestinations;
  this.catalogConfig = catalogConfig;
  this.distributionMode = distributionMode;
  this.distributionFunction = distributionFunction;
}

@Setup
Expand Down Expand Up @@ -132,8 +161,17 @@ public void processElement(
partitionKey.partition(wrapper.wrap(data));
String partitionPath = partitionKey.toPath();

Integer shardId = null;
if (distributionMode == DistributionMode.RANGE && distributionFunction != null) {
shardId = distributionFunction.apply(data);
}

Row destAndPartition =
Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
Row.withSchema(OUTPUT_SCHEMA)
.withFieldValue(DESTINATION, tableIdentifier)
.withFieldValue(PARTITION, partitionPath)
.withFieldValue(SHARD, shardId)
.build();
out.output(KV.of(destAndPartition, data));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -384,7 +385,7 @@ public class IcebergIO {
/**
 * Entry point for writing {@link Row}s to Iceberg. Defaults to
 * {@link DistributionMode#HASH} with auto-sharding disabled.
 */
public static WriteRows writeRows(IcebergCatalogConfig catalog) {
  WriteRows.Builder builder =
      new AutoValue_IcebergIO_WriteRows.Builder()
          .setCatalogConfig(catalog)
          .setDistributionMode(DistributionMode.HASH)
          .setAutoSharding(false);
  return builder.build();
}
Expand All @@ -404,6 +405,8 @@ public abstract static class WriteRows extends PTransform<PCollection<Row>, Iceb

abstract DistributionMode getDistributionMode();

abstract @Nullable SerializableFunction<Row, Integer> getDistributionFunction();

abstract boolean getAutoSharding();

abstract Builder toBuilder();
Expand All @@ -422,6 +425,8 @@ abstract static class Builder {

abstract Builder setDistributionMode(DistributionMode mode);

abstract Builder setDistributionFunction(SerializableFunction<Row, Integer> shardFn);

abstract Builder setAutoSharding(boolean autoSharding);

abstract WriteRows build();
Expand Down Expand Up @@ -457,19 +462,193 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
}

/**
 * Defines how write data is distributed across workers. The default distribution mode is
 * {@link DistributionMode#HASH}.
 *
 * <p><b>Warning on HASH mode:</b> Utilizing {@code HASH} distribution mode (with or without
 * auto-sharding) can suffer from large unpartitioned or skewed writes if key spaces are
 * not uniformly distributed. This can bottleneck workers and produce fragmented layout files.
 *
 * <p><b>Note on RANGE mode:</b> When utilizing {@code RANGE} distribution mode, it is
 * recommended that the custom distribution function is designed to produce adequately sized and
 * strictly non-overlapping ranges of the sorting column to optimize downstream read
 * performance.
 *
 * <h3>Comparison of Distribution Modes:</h3>
 *
 * <table border="1">
 * <caption>Comparison of Distribution Modes</caption>
 * <tr>
 * <td><b>Mode</b></td>
 * <td><b>Description</b></td>
 * <td><b>Pros</b></td>
 * <td><b>Cons</b></td>
 * </tr>
 * <tr>
 * <td>{@link DistributionMode#NONE}</td>
 * <td>No network shuffle is performed. Records are sorted locally on workers prior to writing.</td>
 * <td>Highly lightweight with zero shuffle/network overhead. Best for smaller data volumes.</td>
 * <td>Writers on different workers can write to overlapping min/max key ranges across multiple files. Relies heavily on post-fact compaction or query-time merges.</td>
 * </tr>
 * <tr>
 * <td>{@link DistributionMode#HASH}</td>
 * <td>Data is shuffled and consolidated by partition key. All records for a partition are routed to a single worker.</td>
 * <td>Consolidates partition files, eliminating cross-worker file overlapping for partition keys. Excellent worker stability.</td>
 * <td>Can suffer from severe data skew if a single partition contains significantly more data than others (hot partitions).</td>
 * </tr>
 * <tr>
 * <td>{@link DistributionMode#RANGE}</td>
 * <td>Data is shuffled based on a user-provided shard/bucket function (e.g., hashing/binning continuous keys).</td>
 * <td>Distributes writes for hot partitions across multiple workers. Eliminates skew while keeping file min/max key ranges tight and non-overlapping.</td>
 * <td>Requires providing a custom {@link SerializableFunction} mapping rows to integer shard/bucket IDs.</td>
 * </tr>
 * </table>
 *
 * <h3>Recommendation Matrix (Sorting &amp; Partitioning vs. Scale):</h3>
 *
 * <table border="1">
 * <caption>Recommendation Matrix</caption>
 * <tr>
 * <td><b>Partitioning</b></td>
 * <td><b>Sorting</b></td>
 * <td><b>Scale / Volume</b></td>
 * <td><b>Latency Priority</b></td>
 * <td><b>Recommended Mode</b></td>
 * <td><b>Operational Impact</b></td>
 * </tr>
 * <tr>
 * <td>Partitioned</td>
 * <td>Sorted</td>
 * <td>Small</td>
 * <td>Any</td>
 * <td>{@link DistributionMode#HASH}</td>
 * <td>Consolidates partition files and sorts them locally. Avoids file overlaps for small volumes.</td>
 * </tr>
 * <tr>
 * <td>Partitioned</td>
 * <td>Sorted</td>
 * <td>Medium / Large</td>
 * <td>Low Write Latency</td>
 * <td>{@link DistributionMode#NONE}</td>
 * <td>Eliminates shuffle overhead for maximum write speed. Results in overlapping key ranges across files, which requires downstream compaction.</td>
 * </tr>
 * <tr>
 * <td>Partitioned</td>
 * <td>Sorted</td>
 * <td>Medium / Large</td>
 * <td>Low Read Latency</td>
 * <td>{@link DistributionMode#HASH} with auto-sharding OR {@link DistributionMode#RANGE}</td>
 * <td>HASH with auto-sharding scales writes for hot partitions but can result in overlapping file ranges requiring query-time sort merges. RANGE sharding distributes hot partitions into sequential, non-overlapping files to optimize reads.</td>
 * </tr>
 * <tr>
 * <td>Partitioned</td>
 * <td>Unsorted</td>
 * <td>Small</td>
 * <td>Any</td>
 * <td>{@link DistributionMode#HASH}</td>
 * <td>Consolidates data files into single partition directories to prevent file fragmentation.</td>
 * </tr>
 * <tr>
 * <td>Partitioned</td>
 * <td>Unsorted</td>
 * <td>Medium / Large</td>
 * <td>Any</td>
 * <td>{@link DistributionMode#HASH} with auto-sharding</td>
 * <td>Consolidates partition files while dynamically balancing hot partition writes across parallel workers.</td>
 * </tr>
 * <tr>
 * <td>Unpartitioned</td>
 * <td>Sorted</td>
 * <td>Small</td>
 * <td>Any</td>
 * <td>{@link DistributionMode#NONE}</td>
 * <td>Bypasses network shuffle for fast, low-volume local sorting.</td>
 * </tr>
 * <tr>
 * <td>Unpartitioned</td>
 * <td>Sorted</td>
 * <td>Medium / Large</td>
 * <td>Low Write Latency</td>
 * <td>{@link DistributionMode#NONE}</td>
 * <td>Bypasses network shuffle for parallel worker writes. Requires downstream compaction to resolve overlapping file ranges.</td>
 * </tr>
 * <tr>
 * <td>Unpartitioned</td>
 * <td>Sorted</td>
 * <td>Medium / Large</td>
 * <td>Low Read Latency</td>
 * <td>{@link DistributionMode#RANGE} (with custom sharding function)</td>
 * <td>Shards continuous keys into non-overlapping worker ranges. Eliminates single-worker bottlenecks and guarantees zero file overlap for fast queries.</td>
 * </tr>
 * <tr>
 * <td>Unpartitioned</td>
 * <td>Unsorted</td>
 * <td>Any</td>
 * <td>Any</td>
 * <td>{@link DistributionMode#NONE}</td>
 * <td>Direct, parallel worker writes with maximum throughput and zero network shuffle overhead.</td>
 * </tr>
 * </table>
 *
 * <h3>Code Samples:</h3>
 *
 * <pre>{@code
 * // 1. Using the default HASH distribution mode (consolidates by partition key)
 * pipeline
 *     .apply(Create.of(BEAM_ROWS))
 *     .apply(IcebergIO.writeRows(catalogConfig)
 *         .to(tableId));
 *
 * // 2. Using NONE distribution mode (no shuffle, local sorting only)
 * pipeline
 *     .apply(Create.of(BEAM_ROWS))
 *     .apply(IcebergIO.writeRows(catalogConfig)
 *         .to(tableId)
 *         .withDistributionMode(DistributionMode.NONE));
 *
 * // 3. Using RANGE distribution mode with a custom shard/bucket function to avoid data skew
 * pipeline
 *     .apply(Create.of(BEAM_ROWS))
 *     .apply(IcebergIO.writeRows(catalogConfig)
 *         .to(tableId)
 *         .withDistributionMode(DistributionMode.RANGE)
 *         .withDistributionFunction(row -> {
 *           // Group continuous IDs into parallel, non-overlapping shards
 *           long id = row.getInt64("id");
 *           return (int) (id / 10000);
 *         }));
 * }</pre>
 */
public WriteRows withDistributionMode(DistributionMode mode) {
  return toBuilder().setDistributionMode(mode).build();
}

/**
 * Supplies the custom sharding function used for range-distributed writes.
 *
 * <p>The function maps each Beam {@link Row} to an Integer shard/bucket ID. It is only
 * applicable when the distribution mode is set to {@link DistributionMode#RANGE}.
 */
public WriteRows withDistributionFunction(SerializableFunction<Row, Integer> shardFn) {
  Builder updated = toBuilder().setDistributionFunction(shardFn);
  return updated.build();
}

/**
 * Turns on Beam's dynamic auto-sharding for {@link DistributionMode#HASH} writes.
 *
 * <p>Under the hood this relies on {@link
 * org.apache.beam.sdk.transforms.GroupIntoBatches#withShardedKey()}: the runner (such as
 * Dataflow) monitors per-partition-key throughput and, when a partition runs extremely hot,
 * automatically fans its records out to parallel sub-shards on multiple workers. This avoids
 * single-worker bottlenecks and out-of-memory (OOM) errors while keeping the number of data
 * files for cold partitions minimal.
 *
 * <p>Because auto-sharding spreads hot-partition data randomly across worker shards, the
 * written data files cannot guarantee non-overlapping key ranges. Downstream queries may need
 * read-time sort merges over overlapping file segments until an Iceberg compaction job (e.g.
 * `rewriteDataFiles`) is executed.
 *
 * <p>Only applicable when using {@link DistributionMode#HASH}.
 */
public WriteRows withAutosharding() {
  Builder updated = toBuilder().setAutoSharding(true);
  return updated.build();
}
Expand Down Expand Up @@ -514,7 +693,30 @@ public IcebergWriteResult expand(PCollection<Row> input) {
return input
.apply(
"AssignDestinationAndPartition",
new AssignDestinationsAndPartitions(destinations, getCatalogConfig()))
new AssignDestinationsAndPartitions(
destinations,
getCatalogConfig(),
getDistributionMode(),
getDistributionFunction()))
.apply(
"Write Rows to Partitions",
new WriteToPartitions(
getCatalogConfig(),
destinations,
getTriggeringFrequency(),
getAutoSharding()));
case RANGE:
Preconditions.checkArgument(
getDistributionFunction() != null,
"Must provide a distribution function when using RANGE distribution mode.");
return input
.apply(
"AssignDestinationAndPartitionWithRange",
new AssignDestinationsAndPartitions(
destinations,
getCatalogConfig(),
getDistributionMode(),
getDistributionFunction()))
.apply(
"Write Rows to Partitions",
new WriteToPartitions(
Expand Down
Loading
Loading