Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 17 additions & 94 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,16 @@ void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
}

std::string BlockFileCache::clear_file_cache_async() {
LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
return clear_file_cache_impl(false);
}

std::string BlockFileCache::clear_file_cache_sync() {
return clear_file_cache_impl(true);
}

std::string BlockFileCache::clear_file_cache_impl(bool sync_remove) {
const char* action = sync_remove ? "clear_file_cache_sync" : "clear_file_cache_async";
LOG(INFO) << "start " << action << ", path=" << _cache_base_path;
_lru_dumper->remove_lru_dump_files();
int64_t num_cells_all = 0;
int64_t num_cells_to_delete = 0;
Expand All @@ -733,7 +742,11 @@ std::string BlockFileCache::clear_file_cache_async() {
}
}

// we cannot delete the element in the loop above, because it will break the iterator
// Do not erase while walking _files above: remove() may erase the current map element.
//
// sync_remove only changes how already releasable DOWNLOADED blocks are deleted from
// storage. Busy blocks keep the existing holder lifecycle: mark them deleting and leave
// them in _files until the last holder releases them.
for (auto& cell : deleting_cells) {
if (!cell->releasable()) {
LOG(INFO) << "cell is not releasable, hash="
Expand All @@ -745,15 +758,15 @@ std::string BlockFileCache::clear_file_cache_async() {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, false);
remove(file_block, cache_lock, block_lock, sync_remove);
++num_cells_to_delete;
}
}
clear_need_update_lru_blocks();
}

std::stringstream ss;
ss << "finish clear_file_cache_async, path=" << _cache_base_path
ss << "finish " << action << ", path=" << _cache_base_path << " sync_remove=" << sync_remove
<< " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
<< " num_cells_to_delete=" << num_cells_to_delete
<< " num_cells_wait_recycle=" << num_cells_wait_recycle;
Expand Down Expand Up @@ -2243,96 +2256,6 @@ void BlockFileCache::clear_need_update_lru_blocks() {
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}

void BlockFileCache::pause_ttl_manager() {
if (_ttl_mgr) {
_ttl_mgr->stop();
}
}

void BlockFileCache::resume_ttl_manager() {
if (_ttl_mgr) {
_ttl_mgr->resume();
}
}

std::string BlockFileCache::clear_file_cache_directly() {
pause_ttl_manager();
_lru_dumper->remove_lru_dump_files();
using namespace std::chrono;
std::stringstream ss;
auto start = steady_clock::now();
std::string result;
{
SCOPED_CACHE_LOCK(_mutex, this);
LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);

std::string clear_msg;
auto s = _storage->clear(clear_msg);
if (!s.ok()) {
result = clear_msg;
} else {
int64_t num_files = _files.size();
int64_t cache_size = _cur_cache_size;
int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);

int64_t clear_fd_duration = 0;
{
// clear FDCache to release fd
SCOPED_RAW_TIMER(&clear_fd_duration);
for (const auto& [file_key, file_blocks] : _files) {
for (const auto& [offset, file_block_cell] : file_blocks) {
AccessKeyAndOffset access_key_and_offset(file_key, offset);
FDCache::instance()->remove_file_reader(access_key_and_offset);
}
}
}

_files.clear();
_cur_cache_size = 0;
_cur_ttl_size = 0;
_time_to_key.clear();
_key_to_time.clear();
_index_queue.clear(cache_lock);
_normal_queue.clear(cache_lock);
_disposable_queue.clear(cache_lock);
_ttl_queue.clear(cache_lock);

// Update cache metrics immediately so consumers observe the cleared state
// without waiting for the next background monitor round.
_cur_cache_size_metrics->set_value(0);
_cur_ttl_cache_size_metrics->set_value(0);
_cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
_cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
_cur_normal_queue_cache_size_metrics->set_value(0);
_cur_normal_queue_element_count_metrics->set_value(0);
_cur_index_queue_cache_size_metrics->set_value(0);
_cur_index_queue_element_count_metrics->set_value(0);
_cur_disposable_queue_cache_size_metrics->set_value(0);
_cur_disposable_queue_element_count_metrics->set_value(0);

clear_need_update_lru_blocks();

ss << "finish clear_file_cache_directly"
<< " path=" << _cache_base_path << " time_elapsed_ms="
<< duration_cast<milliseconds>(steady_clock::now() - start).count()
<< " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
<< " num_files=" << num_files << " cache_size=" << cache_size
<< " index_queue_size=" << index_queue_size
<< " normal_queue_size=" << normal_queue_size
<< " disposible_queue_size=" << disposible_queue_size
<< "ttl_queue_size=" << ttl_queue_size;
result = ss.str();
LOG(INFO) << result;
}
}
_lru_dumper->remove_lru_dump_files();
resume_ttl_manager();
return result;
}

std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
std::map<size_t, FileBlockSPtr> offset_to_block;
SCOPED_CACHE_LOCK(_mutex, this);
Expand Down
10 changes: 6 additions & 4 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class BlockFileCache {
* @returns summary message
*/
std::string clear_file_cache_async();
std::string clear_file_cache_directly();
std::string clear_file_cache_sync();

/**
* Reset the cache capacity. If the new_capacity is smaller than _capacity, the redundant data will be remove async.
Expand Down Expand Up @@ -311,9 +311,6 @@ class BlockFileCache {

void update_ttl_atime(const UInt128Wrapper& hash);

void pause_ttl_manager();
void resume_ttl_manager();

std::map<std::string, double> get_stats();

// for be UTs
Expand Down Expand Up @@ -393,6 +390,11 @@ class BlockFileCache {
}

private:
// Shared scan used by both clear modes. It keeps the FileBlock holder lifecycle intact:
// releasable blocks are removed immediately, while blocks held by readers are only marked
// deleting and are later removed by FileBlocksHolder destruction.
std::string clear_file_cache_impl(bool sync_remove);

LRUQueue& get_queue(FileCacheType type);
const LRUQueue& get_queue(FileCacheType type) const;

Expand Down
45 changes: 37 additions & 8 deletions be/src/io/cache/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#endif

#include <algorithm>
#include <atomic>
#include <execution>
#include <ostream>
#include <utility>
Expand Down Expand Up @@ -243,27 +244,55 @@ FileCacheFactory::get_query_context_holders(const TUniqueId& query_id,
return holders;
}

std::string FileCacheFactory::clear_file_caches(bool sync) {
Status FileCacheFactory::clear_file_caches(bool sync, std::string* ret) {
DCHECK(ret != nullptr);

// Sync clear is an operational action and can synchronously remove many files. Keep a single
// process-wide sync clear in flight, so a second HTTP request fails fast instead of piling onto
// the same cache instances. Async clear keeps the previous behavior and is not gated here.
static std::atomic_bool sync_clear_running {false};
struct SyncClearRunningGuard {
std::atomic_bool* running = nullptr;
~SyncClearRunningGuard() {
if (running != nullptr) {
running->store(false, std::memory_order_release);
}
}
} sync_clear_guard;
if (sync) {
bool expected = false;
if (!sync_clear_running.compare_exchange_strong(expected, true, std::memory_order_acq_rel,
std::memory_order_acquire)) {
return Status::InvalidArgument("sync clear_file_caches is already running");
}
sync_clear_guard.running = &sync_clear_running;
}

std::vector<std::string> results(_caches.size());
#ifndef USE_LIBCPP
std::for_each(std::execution::par, _caches.begin(), _caches.end(), [&](const auto& cache) {
size_t index = &cache - &_caches[0];
results[index] =
sync ? cache->clear_file_cache_directly() : cache->clear_file_cache_async();
results[index] = sync ? cache->clear_file_cache_sync() : cache->clear_file_cache_async();
});
#else
// libcpp do not support std::execution::par
std::for_each(_caches.begin(), _caches.end(), [&](const auto& cache) {
size_t index = &cache - &_caches[0];
results[index] =
sync ? cache->clear_file_cache_directly() : cache->clear_file_cache_async();
results[index] = sync ? cache->clear_file_cache_sync() : cache->clear_file_cache_async();
});
#endif
std::stringstream ss;
for (const auto& result : results) {
ss << result << "\n";
for (const auto& cache_result : results) {
ss << cache_result << "\n";
}
return ss.str();
*ret = ss.str();
return Status::OK();
}

std::string FileCacheFactory::clear_file_caches(bool sync) {
std::string result;
auto st = clear_file_caches(sync, &result);
return st.ok() ? result : st.to_string();
}

void FileCacheFactory::dump_all_caches() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gen_cpp/internal_service.pb.h>

#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -85,6 +86,7 @@ class FileCacheFactory {
* @return summary message
*/
std::string clear_file_caches(bool sync);
Status clear_file_caches(bool sync, std::string* result);

/**
* dump lru queue info for all file cache instances
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl
}

Status FSFileCacheStorage::remove(const FileCacheKey& key) {
// Large clear-cache tests only need to verify the synchronous remove handoff and in-memory
// index cleanup. They can return early here to avoid test-only disk churn.
TEST_SYNC_POINT_RETURN_WITH_VALUE("FSFileCacheStorage::remove", Status::OK(), &key);

const std::string v3_dir = get_path_in_local_cache_v3(key.hash);
const std::string v3_file = get_path_in_local_cache_v3(v3_dir, key.offset);
FDCache::instance()->remove_file_reader(std::make_pair(key.hash, key.offset));
Expand Down
24 changes: 14 additions & 10 deletions be/src/service/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ constexpr static std::string_view RELEASED_ELEMENTS = "released_elements";
constexpr static std::string_view DUMP = "dump";
constexpr static std::string_view VALUE = "value";
constexpr static std::string_view RELOAD = "reload";
constexpr static std::string_view SYNC_CLEAR_UNSUPPORTED_MSG =
"sync clear_file_cache is no longer supported in http api, running async clear instead";

Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
const std::string header_json(HEADER_JSON);
Expand All @@ -85,18 +83,21 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
const std::string& sync = req->param(std::string(SYNC));
const std::string& segment_path = req->param(std::string(VALUE));
if (segment_path.empty()) {
io::FileCacheFactory::instance()->clear_file_caches(false);
const bool sync_clear = to_lower(sync) == "true";
std::string clear_msg;
RETURN_IF_ERROR(
io::FileCacheFactory::instance()->clear_file_caches(sync_clear, &clear_msg));
if (sync_clear) {
EasyJson json;
json["status"] = "OK";
json["msg"] = clear_msg;
*json_metrics = json.ToString();
}
} else {
io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
cache->remove_if_cached_async(hash);
}
if (to_lower(sync) == "true") {
EasyJson json;
json["status"] = "OK";
json["msg"] = std::string(SYNC_CLEAR_UNSUPPORTED_MSG);
*json_metrics = json.ToString();
}
} else if (operation == RESET) {
std::string capacity = req->param(std::string(CAPACITY));
int64_t new_capacity = 0;
Expand Down Expand Up @@ -194,7 +195,10 @@ void FileCacheAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, HttpStatus::OK,
json_metrics.empty() ? status.to_json() : json_metrics);
} else {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status_result);
const auto http_status = status.is<ErrorCode::INVALID_ARGUMENT>()
? HttpStatus::BAD_REQUEST
: HttpStatus::INTERNAL_SERVER_ERROR;
HttpChannel::send_reply(req, http_status, status_result);
}
}

Expand Down
Loading
Loading