Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ target
.devbox
/.envrc
*.credentials
/.idea
/examples/docker-compose/data
devbox.lock
/var/cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash
# Integration test: deleting individual partitions by value via the ice CLI.
# {{...}} placeholders (ICE_CLI, CLI_CONFIG, SCENARIO_DIR) are substituted by the test runner
# before execution; ${...} variables come from the scenario's `env` section.
set -e

echo "Running delete partition test..."

# Create namespace
{{ICE_CLI}} --config {{CLI_CONFIG}} create-namespace "${NAMESPACE_NAME}"
echo "OK Created namespace: ${NAMESPACE_NAME}"

# Get the full path to the input file
SCENARIO_DIR="{{SCENARIO_DIR}}"
INPUT_PATH="${SCENARIO_DIR}/${INPUT_FILE}"

# Create table with partitioning and insert data.
# Quoting all expansions prevents word splitting/globbing if values ever contain spaces (SC2086).
{{ICE_CLI}} --config {{CLI_CONFIG}} insert --create-table "${TABLE_NAME}" "${INPUT_PATH}" --partition="${PARTITION_SPEC}"
echo "OK Inserted data with partitioning into table ${TABLE_NAME}"

# Delete partition for a specific day; capture stdout+stderr to assert on it below.
output=$({{ICE_CLI}} --config {{CLI_CONFIG}} delete "${TABLE_NAME}" --partition "[{\"name\": \"${TRANSFORMED_PARTITION_COLUMN}\", \"values\": [\"${DELETE_PARTITION_DAY}\"]}]" 2>&1)

echo "$output"

# Exactly one line of output is expected: the single deleted partition entry.
line_count=$(echo "$output" | wc -l)

if [ "$line_count" -ne 1 ]; then
  echo "FAIL Expected 1 line, got $line_count"
  exit 1
fi

# -F: fixed-string match — the expected text contains no regex metacharacters, but this keeps
# the check literal regardless of the partition value's contents.
if ! echo "$output" | grep -qF "${TRANSFORMED_PARTITION_COLUMN}=${DELETE_PARTITION_DAY}"; then
  echo "FAIL Expected output to contain '${TRANSFORMED_PARTITION_COLUMN}=${DELETE_PARTITION_DAY}'"
  exit 1
fi

echo "OK Deleted partition with value ${DELETE_PARTITION_DAY}"

# Cleanup
{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table "${TABLE_NAME}"
echo "OK Deleted table: ${TABLE_NAME}"

{{ICE_CLI}} --config {{CLI_CONFIG}} delete-namespace "${NAMESPACE_NAME}"
echo "OK Deleted namespace: ${NAMESPACE_NAME}"

echo "Delete partition test completed successfully"
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: Delete partition
description: Tests deleting individual partitions by value

catalogConfig:
  warehouse: s3://test-bucket/warehouse

env:
  NAMESPACE_NAME: test_delete_partition
  TABLE_NAME: test_delete_partition.tripdata_p_by_day
  INPUT_FILE: tripdata.parquet
  # Day-granularity partitioning on the pickup timestamp column.
  PARTITION_SPEC: '[{"column":"tpep_pickup_datetime","transform":"day"}]'
  PARTITION_COLUMN: tpep_pickup_datetime
  # Name of the derived partition field produced by the "day" transform.
  TRANSFORMED_PARTITION_COLUMN: tpep_pickup_datetime_day
  # Quoted: YAML 1.1 parsers (e.g. SnakeYAML) resolve a bare ISO date as a !!timestamp scalar,
  # not a string; quoting guarantees it reaches the script verbatim as "2025-01-02".
  DELETE_PARTITION_DAY: "2025-01-02"
Binary file not shown.
73 changes: 55 additions & 18 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,25 @@
*/
package com.altinity.ice.cli.internal.cmd;

import com.altinity.ice.cli.Main.PartitionFilter;
import com.altinity.ice.cli.internal.iceberg.Partitioning;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,30 +41,57 @@ private Delete() {}
public static void run(
RESTCatalog catalog,
TableIdentifier tableId,
List<com.altinity.ice.cli.Main.PartitionFilter> partitions,
List<PartitionFilter> partitions,
boolean dryRun)
throws IOException, URISyntaxException {

Table table = catalog.loadTable(tableId);
TableScan scan = table.newScan();
if (partitions != null && !partitions.isEmpty()) {
org.apache.iceberg.expressions.Expression expr = null;
for (com.altinity.ice.cli.Main.PartitionFilter pf : partitions) {
org.apache.iceberg.expressions.Expression e = null;

Snapshot currentSnapshot = table.currentSnapshot();
if (currentSnapshot == null) {
logger.error("There are no snapshots in this table");
return;
}

FileIO io = table.io();
Map<Integer, PartitionSpec> specsById = table.specs();

List<ManifestFile> dataManifests = currentSnapshot.dataManifests(io);
List<DataFile> filesToDelete = new ArrayList<>();

Expression expression = null;

if (partitions != null) {
for (PartitionFilter pf : partitions) {
String fieldName = pf.name();

Expression fieldExpr = null;
for (Object value : pf.values()) {
org.apache.iceberg.expressions.Expression valueExpr =
org.apache.iceberg.expressions.Expressions.equal(pf.name(), value);
e = (e == null) ? valueExpr : org.apache.iceberg.expressions.Expressions.or(e, valueExpr);
Integer transformed = Partitioning.applyTimestampTransform(table, fieldName, value);
if (transformed != null) {
value = transformed;
}

Expression singleValueExpr = Expressions.equal(fieldName, value);
fieldExpr =
fieldExpr == null ? singleValueExpr : Expressions.or(fieldExpr, singleValueExpr);
}
if (fieldExpr == null) {
continue;
}
expr = (expr == null) ? e : org.apache.iceberg.expressions.Expressions.and(expr, e);
expression = expression == null ? fieldExpr : Expressions.and(expression, fieldExpr);
}
scan = scan.filter(expr);
}
List<DataFile> filesToDelete = new ArrayList<>();

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
for (FileScanTask task : tasks) {
filesToDelete.add(task.file());
for (ManifestFile manifest : dataManifests) {
ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById);
if (expression != null) {
reader.filterPartitions(expression);
}
try (reader) {
for (DataFile dataFile : reader) {
filesToDelete.add(dataFile);
}
}
}

Expand All @@ -73,6 +108,8 @@ public static void run(
}
rewrite.commit();
}
} else {
logger.info("No files to delete");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -27,6 +30,7 @@
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
Expand All @@ -37,6 +41,7 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
Expand All @@ -49,6 +54,23 @@ public final class Partitioning {

private Partitioning() {}

// Accepts an ISO-8601 date with an optional time component and an optional UTC offset, e.g.
// "2025-01-01", "2025-01-01T00:00:00", or "2025-01-01T00:00:00+05:00". Hour/minute/second/nanos
// default to midnight via parseDefaulting when only a date is supplied.
// NOTE(review): an offset, when present, is parsed here but is dropped by callers that use
// LocalDateTime.parse with this formatter — confirm that is intended for tz-aware columns.
// DateTimeFormatter is immutable and thread-safe, so caching it as a static final is correct.
private static final DateTimeFormatter DATE_TIME_INPUT_FORMATTER =
new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral("T")
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalEnd()
.optionalStart()
.appendOffsetId()
.optionalEnd()
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
.toFormatter();

public record InferPartitionKeyResult(
@Nullable PartitionKey partitionKey, @Nullable String failureReason) {
public boolean success() {
Expand Down Expand Up @@ -347,4 +369,48 @@ public static long toEpochMicros(Object tsValue) {
throw new UnsupportedOperationException("unexpected value type: " + tsValue.getClass());
}
}

/**
 * Converts a datetime string to the Iceberg internal representation expected by the table's
 * timestamp partition transform (e.g. days since the Unix epoch for a "day" transform).
 *
 * <p>Returns {@code null} whenever the conversion does not apply: {@code fieldName} is not a
 * partition field of the table, the transform is identity, {@code value} is not a non-empty
 * string, or the source column is not a timestamp type.
 *
 * @param table table whose current partition spec supplies the transform
 * @param fieldName partition field name (e.g. "tpep_pickup_datetime_day")
 * @param value raw filter value; only non-empty strings are converted
 * @return the value converted to the partition unit as an integer, or null if not convertible
 */
@Nullable
public static Integer applyTimestampTransform(Table table, String fieldName, Object value) {
  PartitionField field = getPartitionField(table, fieldName);
  if (field == null) {
    return null;
  }

  Transform<?, ?> transform = field.transform();
  if (transform.isIdentity()) {
    // Identity-partitioned columns keep the caller-supplied value as-is.
    return null;
  }
  if (!(value instanceof String input) || input.isEmpty()) {
    return null;
  }

  Type sourceType = table.schema().findType(field.sourceId());
  if (!(sourceType instanceof Types.TimestampType)) {
    return null;
  }

  // NOTE(review): an explicit offset in the input (e.g. "...+05:00") is accepted by the
  // formatter but discarded by LocalDateTime.parse — confirm this is intended for tz-aware
  // columns. A malformed string propagates DateTimeParseException to the caller.
  LocalDateTime parsed = LocalDateTime.parse(input, DATE_TIME_INPUT_FORMATTER);
  long micros = toEpochMicros(parsed);

  // Timestamp transforms (year/month/day/hour) map long micros to an int bucket.
  @SuppressWarnings("unchecked")
  Transform<Long, Integer> timestampTransform = (Transform<Long, Integer>) transform;
  return timestampTransform.bind(sourceType).apply(micros);
}

/** Looks up a field by name in the table's current partition spec; null when absent. */
@Nullable
private static PartitionField getPartitionField(Table table, String fieldName) {
  // Linear scan is fine: partition specs contain only a handful of fields.
  return table.spec().fields().stream()
      .filter(f -> f.name().equals(fieldName))
      .findFirst()
      .orElse(null);
}
}