Commit 08a2dcb

[lake] Use file to store lake snapshot

1 parent a0abed0

19 files changed: +551 -56 lines changed

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 24 additions & 0 deletions
@@ -91,6 +91,8 @@ public class FlussPaths {
     /** The name of the directory for shared remote snapshot kv files. */
     public static final String REMOTE_KV_SNAPSHOT_SHARED_DIR = "shared";
 
+    private static final String REMOTE_LAKE_DIR_NAME = "lake";
+
     // ----------------------------------------------------------------------------------------
     // LOG/KV Tablet Paths
     // ----------------------------------------------------------------------------------------
@@ -681,6 +683,28 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
         return new FsPath(remoteKvTabletDir, REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId);
     }
 
+    /**
+     * Returns the remote path for storing lake snapshot metadata required by Fluss for a table.
+     *
+     * <p>The path contract:
+     *
+     * <pre>
+     * {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}
+     * </pre>
+     */
+    public static FsPath remoteLakeTableSnapshotPath(
+            Configuration conf, TablePath tablePath, long tableId, long snapshotId) {
+        return new FsPath(
+                String.format(
+                        "%s/%s/%s/%s-%d/snapshot/%d",
+                        conf.get(ConfigOptions.REMOTE_DATA_DIR),
+                        REMOTE_LAKE_DIR_NAME,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName(),
+                        tableId,
+                        snapshotId));
+    }
+
     /**
      * Returns the remote directory path for storing kv snapshot shared files (SST files with UUID
      * prefix).
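
A minimal usage sketch of the new helper follows. The table name, ids, and directory value are illustrative, and the Configuration setter and TablePath.of factory are assumed here rather than taken from this commit:

    // Illustrative only: values and accessors below are assumptions, not part of the commit.
    Configuration conf = new Configuration();
    conf.set(ConfigOptions.REMOTE_DATA_DIR, "/warehouse/fluss");

    FsPath snapshotPath =
            FlussPaths.remoteLakeTableSnapshotPath(
                    conf, TablePath.of("fluss_db", "orders"), 42L, 3L);

    // Per the path contract above, this resolves to:
    // /warehouse/fluss/lake/fluss_db/orders-42/snapshot/3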

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java

Lines changed: 5 additions & 2 deletions
@@ -39,7 +39,7 @@
 public class TestingLakeTieringFactory
         implements LakeTieringFactory<TestingWriteResult, TestingCommittable> {
 
-    @Nullable private final TestingLakeCommitter testingLakeCommitter;
+    @Nullable private TestingLakeCommitter testingLakeCommitter;
 
     public TestingLakeTieringFactory(@Nullable TestingLakeCommitter testingLakeCommitter) {
         this.testingLakeCommitter = testingLakeCommitter;
@@ -63,7 +63,10 @@ public SimpleVersionedSerializer<TestingWriteResult> getWriteResultSerializer()
     @Override
     public LakeCommitter<TestingWriteResult, TestingCommittable> createLakeCommitter(
             CommitterInitContext committerInitContext) throws IOException {
-        return testingLakeCommitter == null ? new TestingLakeCommitter() : testingLakeCommitter;
+        if (testingLakeCommitter == null) {
+            this.testingLakeCommitter = new TestingLakeCommitter();
+        }
+        return testingLakeCommitter;
     }
 
     @Override
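
Because createLakeCommitter now caches the lazily created committer, repeated calls return the same instance, so its committed-snapshot counter keeps advancing across tiering rounds instead of resetting; this appears to be why the expected snapshot ids in TieringCommitOperatorTest below move from 1 to 2 and 3. A short illustrative fragment, assuming committerInitContext is supplied by the surrounding test harness:

    // Illustrative fragment (inside a test method that declares throws IOException).
    TestingLakeTieringFactory factory = new TestingLakeTieringFactory(null);
    LakeCommitter<TestingWriteResult, TestingCommittable> first =
            factory.createLakeCommitter(committerInitContext);
    LakeCommitter<TestingWriteResult, TestingCommittable> second =
            factory.createLakeCommitter(committerInitContext);
    // first == second now holds; before this change each call built a fresh committer.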

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java

Lines changed: 3 additions & 3 deletions
@@ -30,7 +30,7 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.zk.ZooKeeperClient;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.configuration.Configuration;
@@ -161,7 +161,7 @@ void testCommitNonPartitionedTable() throws Exception {
         expectedMaxTimestamps.put(t2b0, 31L);
         expectedMaxTimestamps.put(t2b1, 32L);
         expectedMaxTimestamps.put(t2b2, 33L);
-        verifyLakeSnapshot(tablePath2, tableId2, 1, expectedLogEndOffsets, expectedMaxTimestamps);
+        verifyLakeSnapshot(tablePath2, tableId2, 2, expectedLogEndOffsets, expectedMaxTimestamps);
 
         // let's process one round of TableBucketWriteResult again
         expectedLogEndOffsets = new HashMap<>();
@@ -181,7 +181,7 @@ void testCommitNonPartitionedTable() throws Exception {
             expectedLogEndOffsets.put(tableBucket, offset);
             expectedMaxTimestamps.put(tableBucket, timestamp);
         }
-        verifyLakeSnapshot(tablePath1, tableId1, 1, expectedLogEndOffsets, expectedMaxTimestamps);
+        verifyLakeSnapshot(tablePath1, tableId1, 3, expectedLogEndOffsets, expectedMaxTimestamps);
     }
 
     @Test

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 5 additions & 4 deletions
@@ -393,6 +393,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
 
         // mock finished tiered this round, check second round
         context.getSplitsAssignmentSequence().clear();
+        long snapshotId = 1;
         final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite = new HashMap<>();
         for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
             Map<Integer, Long> partitionBucketOffsetOfEarliest = new HashMap<>();
@@ -409,7 +410,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
                             genCommitLakeTableSnapshotRequest(
                                     tableId,
                                     partitionNameById.getValue(),
-                                    1,
+                                    snapshotId++,
                                     partitionBucketOffsetOfEarliest,
                                     bucketOffsetOfInitialWrite.get(
                                             partitionNameById.getValue())))
@@ -421,8 +422,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
         Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
                 upsertRowForPartitionedTable(
                         tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, partitionNameByIds, 10, 20);
-        long snapshotId = 0;
-        waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, snapshotId);
+        waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, 0);
 
         // request tiering table splits
         for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
@@ -518,6 +518,7 @@ void testPartitionedLogTableSplits() throws Throwable {
         // mock finished tiered this round, check second round
         context.getSplitsAssignmentSequence().clear();
         final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite = new HashMap<>();
+        long snapshot = 1;
         for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
             long partitionId = partitionNameById.getValue();
             Map<Integer, Long> partitionInitialBucketOffsets = new HashMap<>();
@@ -537,7 +538,7 @@ void testPartitionedLogTableSplits() throws Throwable {
                     genCommitLakeTableSnapshotRequest(
                             tableId,
                             partitionId,
-                            1,
+                            snapshot++,
                             partitionInitialBucketOffsets,
                             bucketOffsetOfInitialWrite.get(partitionId)))
                     .get();

fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@
 import org.apache.fluss.server.utils.ServerRpcMessageUtils;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.BucketSnapshot;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 8 additions & 2 deletions
@@ -79,14 +79,14 @@
 import org.apache.fluss.server.utils.ServerRpcMessageUtils;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.BucketAssignment;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.server.zk.data.PartitionAssignment;
 import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
 import org.apache.fluss.server.zk.data.TableAssignment;
 import org.apache.fluss.server.zk.data.TabletServerRegistration;
 import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
 import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.slf4j.Logger;
@@ -1161,7 +1161,13 @@ private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
             tableResp.setTableId(tableId);
 
             try {
-                zooKeeperClient.upsertLakeTableSnapshot(tableId, lakeTableSnapshotEntry.getValue());
+                TablePath tablePath = coordinatorContext.getTablePathById(tableId);
+                if (tablePath == null) {
+                    throw new RuntimeException(
+                            String.format("Failed to find table path for table id: %d", tableId));
+                }
+                zooKeeperClient.upsertLakeTableSnapshot(
+                        tableId, tablePath, lakeTableSnapshotEntry.getValue());
             } catch (Exception e) {
                 ApiError error = ApiError.fromThrowable(e);
                 tableResp.setError(error.error().code(), error.message());

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 1 addition & 1 deletion
@@ -43,8 +43,8 @@
 import org.apache.fluss.server.metadata.BucketMetadata;
 import org.apache.fluss.server.metadata.PartitionMetadata;
 import org.apache.fluss.server.metadata.TableMetadata;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 package org.apache.fluss.server.entity;
 
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 
 import java.util.Map;
 import java.util.Objects;

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@
 import org.apache.fluss.server.replica.fetcher.ReplicaFetcherManager;
 import org.apache.fluss.server.utils.FatalErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.MapUtils;

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 1 addition & 1 deletion
@@ -158,8 +158,8 @@
 import org.apache.fluss.server.metadata.ServerInfo;
 import org.apache.fluss.server.metadata.TableMetadata;
 import org.apache.fluss.server.zk.data.BucketSnapshot;
-import org.apache.fluss.server.zk.data.LakeTableSnapshot;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 
 import javax.annotation.Nullable;
 
