[Flink-39911][historyserver] Support Pluggable Storage Backend for HistoryServer#28402

Open
chenzihao5 wants to merge 4 commits into apache:master from chenzihao5:FLINK-39911

chenzihao5 commented Jun 12, 2026

What is the purpose of the change

This pull request introduces a pluggable archive storage backend for the HistoryServer, so that downloaded job archives no longer have to be unpacked into a large number of small JSON files on the local filesystem. It adds a RocksDB backend that stores all archive entries as key-value pairs inside a single embedded RocksDB instance, and keeps the existing FILE backend as the default to preserve backwards compatibility.

Brief change log

  • Introduce the ArchiveStorage abstraction with two implementations:
    • FileArchiveStorage — existing behavior, one JSON file per entry under historyserver.web.tmpdir (default).
    • RocksDBArchiveStorage — stores all entries as key-value pairs in a single embedded RocksDB instance under historyserver.web.tmpdir/rocksdb.
  • Add new ConfigOptions in HistoryServerOptions:
    • historyserver.archive.storage.type (FILE | ROCKSDB, default FILE)
    • historyserver.archive.rocksdb.native-lib-dir
    • historyserver.archive.rocksdb.compression
    • historyserver.archive.rocksdb.bottommost-compression
  • Wire the selected backend into HistoryServer / HistoryServerArchiveFetcher so archive read/write paths go through ArchiveStorage.
  • Standardize the resource request processing workflow in AbstractHistoryServerHandler.
  • Add documentation for this feature.
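
To make the shape of the abstraction more concrete, here is a minimal sketch of a key-value archive store with prefix operations. The interface name SketchArchiveStorage, its method signatures, and the in-memory implementation are assumptions made for illustration only; the actual ArchiveStorage interface in this PR may look different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical storage contract; the real ArchiveStorage in the PR may differ. */
interface SketchArchiveStorage extends AutoCloseable {
    boolean exists(String key);

    /** Returns the stored JSON, or null when the key is absent. */
    String get(String key);

    void put(String key, String json);

    void delete(String key);

    void deleteByPrefix(String keyPrefix);

    List<String> getByPrefix(String keyPrefix);

    @Override
    void close();
}

/** In-memory stand-in that keeps keys sorted, similar in spirit to an ordered KV store. */
final class InMemoryArchiveStorage implements SketchArchiveStorage {
    private final ConcurrentSkipListMap<String, String> entries = new ConcurrentSkipListMap<>();

    public boolean exists(String key) { return entries.containsKey(key); }

    public String get(String key) { return entries.get(key); }

    public void put(String key, String json) { entries.put(key, json); }

    public void delete(String key) { entries.remove(key); }

    public void deleteByPrefix(String keyPrefix) {
        entries.keySet().removeIf(k -> k.startsWith(keyPrefix));
    }

    public List<String> getByPrefix(String keyPrefix) {
        List<String> result = new ArrayList<>();
        // tailMap starts at the first key >= prefix; keys sharing the prefix are contiguous,
        // so the scan can stop at the first key that no longer starts with it.
        for (Map.Entry<String, String> e : entries.tailMap(keyPrefix, true).entrySet()) {
            if (!e.getKey().startsWith(keyPrefix)) {
                break;
            }
            result.add(e.getValue());
        }
        return result;
    }

    public void close() { entries.clear(); }
}
```

Because callers only see keys and JSON strings, a file-backed or RocksDB-backed implementation could be swapped in behind the same contract.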

Verifying this change

This change adds tests and can be verified as follows:

  • Added unit tests for ArchiveStorage covering exists / get / put / delete / deletePrefix / getByPrefix and the lifecycle of archive data.
  • Added common unit tests in AbstractHistoryServerHandlerTest that run against the different archive storage backends.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

Was generative AI tooling used to co-author this PR?
  • Yes (Claude-4.7-Opus 1M Context)

flinkbot commented Jun 12, 2026

Collaborator

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

reswqa left a comment

Member

Thanks @chenzihao5 for improving the HistoryServer.

I did a rough first pass and left some comments.

Comment thread flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java Outdated
@chenzihao5

Author

@reswqa Thanks for your review. I have updated the code based on the review comments. Please take another look. Thanks.

Comment on lines +85 to +94
void deleteByPrefix(String keyPrefix) throws IOException;

/**
* Returns the entries identified by {@code prefix} from this storage.
*
* @param prefix storage key prefix
* @return the entries
* @throws IOException if the entries cannot be read
*/
List<Entry> getEntriesByPrefix(String prefix) throws IOException;

Member

I suggest either keeping the word Entries in all of these method names or removing it from all of them.

*
* @param prefix storage key prefix
* @return the entries
* @throws IOException if the entries cannot be read

Member

@throws IOException if the entries cannot be read

Any entries or all?


@Override
public boolean exists(String key) throws IOException {
return new File(rootPath, key).exists();

reswqa Jun 17, 2026

Member

I wonder whether we could avoid the allocation here (e.g. by using Files.exists)?

Comment on lines +80 to +81
File target = new File(rootPath, key);
Files.deleteIfExists(target.toPath());

Member

Could we avoid creating the File instance here?
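
A possible shape for the Path-based variant suggested in this comment and in the earlier one on exists is sketched below. The class name PathKeyResolver and its methods are hypothetical, not taken from the PR. One detail worth noting: the storage keys in this PR appear to start with "/", and Path.resolve with an absolute argument returns that argument unchanged, whereas new File(parent, child) appends it to the parent. The sketch strips leading slashes so the key stays under the root.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that maps storage keys to paths without going through java.io.File. */
final class PathKeyResolver {
    private final Path root;

    PathKeyResolver(Path root) {
        this.root = root;
    }

    /** Maps a storage key such as "/jobs/abc.json" to a path under the root. */
    Path resolve(String key) {
        // Strip leading slashes: resolving an absolute path would escape the root.
        int start = 0;
        while (start < key.length() && key.charAt(start) == '/') {
            start++;
        }
        return root.resolve(key.substring(start));
    }

    boolean exists(String key) {
        return Files.exists(resolve(key));
    }

    /** Returns true if a file was deleted, false if it did not exist. */
    boolean delete(String key) throws IOException {
        return Files.deleteIfExists(resolve(key));
    }
}
```

Resolving a key still allocates a Path, so the saving is the intermediate File object and the toPath() conversion rather than all allocation.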

private final FileSystem fs;

- private RefreshLocation(Path path, FileSystem fs) {
+ protected RefreshLocation(Path path, FileSystem fs) {

Member

Why does this need to be protected?

* HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}.
*/
- public class HistoryServerApplicationArchiveFetcher extends HistoryServerArchiveFetcher {
+ public class HistoryServerApplicationArchiveFetcher<Entry>

Member

Please add Javadoc for the generic type parameter Entry.

Comment on lines +91 to 94
File webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR);
Files.createDirectories(webApplicationDir.toPath());
this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR);
File webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR);
Files.createDirectories(webApplicationsOverviewDir.toPath());

Member

It seems that we need the Path rather than the File object itself.

String key;
if (path.equals(JobsOverviewHeaders.URL)) {
target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
key = "/" + JOB_OVERVIEWS_SUBDIR + "/" + jobId + JSON_FILE_ENDING;

Member

I suggest we extract "/" + JOB_OVERVIEWS_SUBDIR + "/" and "/" + JOBS_SUBDIR + "/" into constant String fields (e.g. JOB_OVERVIEWS_KEY_PREFIX and JOBS_KEY_PREFIX, or any other reasonable names).
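
One way the suggested constants could look is sketched here. The holder class ArchiveKeys, the helper jobOverviewKey, and the literal values of the sub-directory constants are assumptions for illustration; only the names JOB_OVERVIEWS_KEY_PREFIX and JOBS_KEY_PREFIX come from the suggestion above.

```java
/** Hypothetical holder for storage key prefixes. */
final class ArchiveKeys {
    // Assumed values; the constants in the PR may be defined elsewhere and differ.
    static final String JOBS_SUBDIR = "jobs";
    static final String JOB_OVERVIEWS_SUBDIR = "overviews";
    static final String JSON_FILE_ENDING = ".json";

    // Built once instead of being concatenated at every call site.
    static final String JOBS_KEY_PREFIX = "/" + JOBS_SUBDIR + "/";
    static final String JOB_OVERVIEWS_KEY_PREFIX = "/" + JOB_OVERVIEWS_SUBDIR + "/";

    private ArchiveKeys() {}

    /** Builds the storage key of a job's overview entry. */
    static String jobOverviewKey(String jobId) {
        return JOB_OVERVIEWS_KEY_PREFIX + jobId + JSON_FILE_ENDING;
    }
}
```

Keeping the prefixes in one place also means prefix scans and prefix deletes use exactly the same strings as the writes.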

Comment on lines +446 to 450
if (overview instanceof File) {
subJobs = mapper.readValue((File) overview, MultipleJobsDetails.class);
} else {
subJobs = mapper.readValue((String) overview, MultipleJobsDetails.class);
}

Member

We assume here that Entry is either a File or a String. If someone introduces a new type of ArchiveStorage, things get worse. I don't think developers should have to add a new branch here.

protected abstract ArchiveStorage<T> createStorage() throws Exception;

/** Reads the textual content of a storage entry. */
protected abstract String readContent(T entry) throws Exception;

Member

If we introduce a method like getAsContent in the ArchiveStorage that directly returns the string, then this method and the generic type of the test class would no longer be necessary. Furthermore, we should be able to convert it into a parameterized test class.
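
A rough sketch of the getAsContent idea follows. The names ContentStore, FileContentStore, MapContentStore, and OverviewReader are hypothetical and exist only to show that callers need no instanceof branch once every backend returns text; none of them are taken from the PR.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical read contract: every backend hands back the entry as text. */
interface ContentStore {
    /** Returns the entry's content, or null if the key is absent. */
    String getAsContent(String key) throws IOException;
}

/** File-backed variant; keys here are relative paths such as "overviews/a.json". */
final class FileContentStore implements ContentStore {
    private final Path root;

    FileContentStore(Path root) {
        this.root = root;
    }

    public String getAsContent(String key) throws IOException {
        Path file = root.resolve(key);
        if (!Files.exists(file)) {
            return null;
        }
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }
}

/** Map-backed variant standing in for a key-value backend. */
final class MapContentStore implements ContentStore {
    private final Map<String, String> entries = new ConcurrentHashMap<>();

    void put(String key, String content) {
        entries.put(key, content);
    }

    public String getAsContent(String key) {
        return entries.get(key);
    }
}

final class OverviewReader {
    private OverviewReader() {}

    /** Caller code is identical for every backend; a JSON mapper would consume the string. */
    static int contentLength(ContentStore store, String key) throws IOException {
        String content = store.getAsContent(key);
        return content == null ? -1 : content.length();
    }
}
```

With a contract like this, a shared test could take the store as a parameter rather than relying on a generic type and a per-backend readContent override.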

final Path uploadDir = Files.createDirectory(tmpDir.resolve("uploadDir"));

AbstractHistoryServerHandler<?> handler = createHandler(webDir.toFile());
Router router = new Router().addGet("/:*", handler);

Member

Raw use of parameterized class 'Router'

* Common HTTP-level tests for {@link AbstractHistoryServerHandler} subclasses. Subclasses only need
* to provide a concrete handler instance via {@link #createHandler(File)}.
*/
abstract class AbstractHistoryServerHandlerTest {

Member

This could be a parameterized test.

+ "extracts the library into a unique sub-directory under this "
+ "directory. Defaults to the JVM 'java.io.tmpdir' when not "
+ "configured. Only applies when "
+ "'historyserver.archive.storage.type' is 'ROCKSDB'.");

Member

I prefer HISTORY_SERVER_ARCHIVE_STORAGE_TYPE.key() over the hard-coded historyserver.archive.storage.type.

Comment on lines +273 to +290
public static final ConfigOption<RocksDBCompressionType>
HISTORY_SERVER_ARCHIVE_ROCKSDB_COMPRESSION =
key("historyserver.archive.rocksdb.compression")
.enumType(RocksDBCompressionType.class)
.defaultValue(RocksDBCompressionType.LZ4_COMPRESSION)
.withDescription(
"Compression type used for the non-bottommost levels of the RocksDB "
+ "SST files. Only applies when 'historyserver.archive.storage.type' is 'ROCKSDB'.");

/** Compression type used for the bottommost level of the RocksDB SST files. */
public static final ConfigOption<RocksDBCompressionType>
HISTORY_SERVER_ARCHIVE_ROCKSDB_BOTTOMMOST_COMPRESSION =
key("historyserver.archive.rocksdb.bottommost-compression")
.enumType(RocksDBCompressionType.class)
.defaultValue(RocksDBCompressionType.ZSTD_COMPRESSION)
.withDescription(
"Compression type used for the bottommost level of the "
+ "RocksDB SST files. Only applies when 'historyserver.archive.storage.type' is 'ROCKSDB'.");

Member

These configurations are too low-level. Do any users actually need to tune these? If so, it's not too late to introduce them later. I'd prefer to remove these two configuration options for now.

byte[] startKey = keyPrefix.getBytes(UTF_8);
byte[] endKey = keyPrefix.getBytes(UTF_8);
// Add 1 to the last byte to get the next lexicographic byte
endKey[endKey.length - 1]++;

Member

IIUC, this trick relies heavily on the fact that the byte array is UTF-8 encoded. It would be best to add some comments to explain why it works.
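
The reasoning behind the increment can be sketched as follows, assuming keys are compared byte-wise as unsigned values (RocksDB's default comparator). The class PrefixBounds and its method names are hypothetical. The key observation is that the last byte of valid UTF-8 is either ASCII (0x00 to 0x7F) or a continuation byte (0x80 to 0xBF), so it is never 0xFF and adding one cannot wrap around. An empty prefix has no last byte and needs separate handling.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper illustrating the exclusive upper bound of a UTF-8 key prefix. */
final class PrefixBounds {
    private PrefixBounds() {}

    /** Returns the smallest key that sorts after every key starting with the prefix. */
    static byte[] exclusiveUpperBound(String prefix) {
        byte[] end = prefix.getBytes(StandardCharsets.UTF_8);
        if (end.length == 0) {
            throw new IllegalArgumentException("an empty prefix has no upper bound");
        }
        // Safe for UTF-8: the last byte is at most 0xBF, so the increment never overflows.
        end[end.length - 1]++;
        return end;
    }

    /** Checks start <= key < end under unsigned lexicographic byte ordering. */
    static boolean inRange(byte[] key, byte[] startInclusive, byte[] endExclusive) {
        return Arrays.compareUnsigned(key, startInclusive) >= 0
                && Arrays.compareUnsigned(key, endExclusive) < 0;
    }
}
```

For example, the prefix "/jobs/" ends in 0x2F, so its exclusive upper bound is the byte sequence of "/jobs0", and every key that starts with "/jobs/" falls between the two.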

@Override
public List<String> getEntriesByPrefix(String prefix) throws IOException {
List<String> result = new ArrayList<>();
if (prefix == null || prefix.isEmpty()) {

Member

StringUtils.isNullOrWhitespaceOnly(prefix)

} else {
break;
}
}

Member

Perhaps we should add a check after the while block, per https://github.com/facebook/rocksdb/wiki/Iterator 🤔

while (iterator.isValid()) {
    xxx
}
iterator.status();

libDir.getAbsolutePath());
} catch (Throwable t) {
LOG.warn("Failed to load RocksDB native library to '{}'.", libDir.getAbsolutePath(), t);
deleteDirectoryQuietly(libDir);

Member

Should we also clean up this libDir when the HistoryServer stops without an error?
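
If cleanup on normal shutdown is wanted, one option is to tie the directory's lifetime to a closeable handle that the storage closes alongside its other resources. The sketch below is only an illustration: the class NativeLibDir is hypothetical, and the PR's own deleteDirectoryQuietly helper may be implemented differently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical handle that removes an extracted native-library directory when closed. */
final class NativeLibDir implements AutoCloseable {
    private final Path dir;

    NativeLibDir(Path dir) {
        this.dir = dir;
    }

    Path path() {
        return dir;
    }

    /** Deletes the directory tree; failures are swallowed because cleanup is best-effort. */
    @Override
    public void close() {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder())
                    .forEach(
                            p -> {
                                try {
                                    Files.deleteIfExists(p);
                                } catch (IOException ignored) {
                                    // Best-effort: leave the file behind.
                                }
                            });
        } catch (IOException ignored) {
            // Best-effort: the directory could not be walked.
        }
    }
}
```

Registering such a handle with the storage's close path would cover both the failure case shown in the diff and a clean stop.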

try (RocksIterator iterator = db.newIterator()) {
byte[] prefixBytes = prefix.getBytes(UTF_8);
iterator.seek(prefixBytes);
while (iterator.isValid()) {

Member

Could we use ReadOptions.setIterateUpperBound(Slice) to set an upper bound here?

        byte[] upper = xxx;
        try (Slice upperSlice = new Slice(upper);
             ReadOptions ro = new ReadOptions().setIterateUpperBound(upperSlice);
             RocksIterator it = db.newIterator(ro)) {
            for (it.seek(prefixBytes); it.isValid(); it.next()) {
                result.add(new String(it.value(), UTF_8));
            }
            try {
                it.status();
            } catch (RocksDBException e) {
                throw new IOException(e);
            }
        }


@Override
public void close() {
handlesToClose.forEach(IOUtils::closeQuietly);

reswqa Jun 18, 2026

Member

I think we should close the db first. The db internally holds references to objects from the options, including the bloom filter. As long as the db is alive, these resources are still in use by background threads, flushes, and compactions. If you free the options before closing the db, it may access freed memory (i.e. use-after-free).

Even without this issue, the general rule of thumb is to close all the handlesToClose in reverse order.
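
A minimal sketch of the reverse-order rule, using only the JDK: the class ReverseCloser is hypothetical and stands in for whatever holds handlesToClose in the PR. Resources are registered in creation order (options first, db last) and closed last-in, first-out, so the db goes away before the options it references.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical registry that closes resources in the reverse of their registration order. */
final class ReverseCloser implements AutoCloseable {
    private final Deque<AutoCloseable> handles = new ArrayDeque<>();

    /** Registers a resource; resources registered later are closed earlier. */
    <T extends AutoCloseable> T register(T handle) {
        handles.push(handle); // push adds to the head, so pop() yields the newest handle first
        return handle;
    }

    @Override
    public void close() {
        while (!handles.isEmpty()) {
            try {
                handles.pop().close();
            } catch (Exception ignored) {
                // Best-effort: keep closing the remaining handles.
            }
        }
    }
}
```

If the existing list is kept instead, iterating it from the last index down to zero gives the same ordering.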


3 participants