Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::{IsTerminal, Stdout, Write, stdout};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{env, fmt, io};

Expand All @@ -37,9 +38,9 @@ use quickwit_config::{
TransformConfig,
};
use quickwit_index_management::{IndexService, clear_cache_directory};
use quickwit_indexing::IndexingPipeline;
use quickwit_indexing::actors::IndexingService;
use quickwit_indexing::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline};
use quickwit_indexing::{IndexingPipeline, IndexingSplitCache};
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
Expand Down Expand Up @@ -417,6 +418,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let universe = Universe::new();
let split_cache =
Arc::new(IndexingSplitCache::from_config(&indexer_config, &config.data_dir_path).await?);
let indexing_server = IndexingService::new(
config.node_id.clone(),
config.data_dir_path.clone(),
Expand All @@ -428,6 +431,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
split_cache,
)
.await?;
let (indexing_server_mailbox, indexing_server_handle) =
Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ pub struct PipelineStatusUpdate {
pub index_uid: IndexUid,
pub source_id: SourceId,
pub split_ids: Vec<SplitId>,
pub merge_level: u64,
pub status: PipelineStatus,
}

Expand Down Expand Up @@ -270,7 +269,6 @@ impl CompactionPipeline {
.map(|split| split.split_id().to_string())
.collect(),
status: self.status.clone(),
merge_level: self.merge_operation.merge_level() as u64,
}
}

Expand Down
11 changes: 6 additions & 5 deletions quickwit/quickwit-compaction/src/compactor_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct CompactorSupervisor {
io_throughput_limiter: Option<Limiter>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
split_cache: Arc<IndexingSplitCache>,
max_concurrent_split_uploads: usize,
event_broker: EventBroker,

Expand All @@ -75,6 +76,7 @@ impl CompactorSupervisor {
io_throughput_limiter: Option<Limiter>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
split_cache: Arc<IndexingSplitCache>,
max_concurrent_split_uploads: usize,
event_broker: EventBroker,
compaction_root_directory: TempDirectory,
Expand All @@ -87,6 +89,7 @@ impl CompactorSupervisor {
io_throughput_limiter,
metastore,
storage_resolver,
split_cache,
max_concurrent_split_uploads,
event_broker,
compaction_root_directory,
Expand Down Expand Up @@ -182,8 +185,7 @@ impl CompactorSupervisor {

let index_storage_uri = Uri::from_str(&assignment.index_storage_uri)?;
let index_storage = self.storage_resolver.resolve(&index_storage_uri).await?;
let split_cache = Arc::new(IndexingSplitCache::no_caching());
let split_store = IndexingSplitStore::new(index_storage, split_cache);
let split_store = IndexingSplitStore::new(index_storage, self.split_cache.clone());

let doc_mapper = build_doc_mapper(&doc_mapping, &search_settings)?;
let merge_policy = merge_policy_from_settings(&indexing_settings);
Expand Down Expand Up @@ -332,6 +334,7 @@ mod tests {
None,
metastore,
StorageResolver::for_test(),
Arc::new(IndexingSplitCache::no_caching()),
2,
EventBroker::default(),
TempDirectory::for_test(),
Expand Down Expand Up @@ -541,6 +544,7 @@ mod tests {
None,
metastore,
StorageResolver::for_test(),
Arc::new(IndexingSplitCache::no_caching()),
2,
EventBroker::default(),
TempDirectory::for_test(),
Expand Down Expand Up @@ -581,15 +585,13 @@ mod tests {
source_id: "src".to_string(),
split_ids: vec!["s1".to_string(), "s2".to_string()],
status: PipelineStatus::InProgress,
merge_level: 1,
},
PipelineStatusUpdate {
task_id: "task-2".to_string(),
index_uid: quickwit_proto::types::IndexUid::for_test("test-index", 0),
source_id: "src".to_string(),
split_ids: vec!["s3".to_string()],
status: PipelineStatus::Completed,
merge_level: 1,
},
PipelineStatusUpdate {
task_id: "task-3".to_string(),
Expand All @@ -599,7 +601,6 @@ mod tests {
status: PipelineStatus::Failed {
error: "boom".to_string(),
},
merge_level: 1,
},
];

Expand Down
8 changes: 5 additions & 3 deletions quickwit/quickwit-compaction/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@

#![deny(clippy::disallowed_methods)]

#[allow(dead_code)]
mod compaction_pipeline;
#[allow(dead_code)]
mod compactor_supervisor;
mod metrics;
pub mod planner;

pub type TaskId = String;

use std::sync::Arc;

pub use compactor_supervisor::CompactorSupervisor;
use quickwit_actors::{Mailbox, Universe};
use quickwit_common::io;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_config::CompactorConfig;
use quickwit_indexing::IndexingSplitCache;
use quickwit_proto::compaction::CompactionPlannerServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::types::{IndexUid, NodeId, SourceId};
Expand All @@ -47,11 +48,11 @@ pub async fn start_compactor_service(
compactor_config: &CompactorConfig,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
split_cache: Arc<IndexingSplitCache>,
event_broker: EventBroker,
compaction_root_directory: TempDirectory,
) -> anyhow::Result<Mailbox<CompactorSupervisor>> {
info!("starting compactor service");
// TODO: configure this for real
let io_throughput_limiter = compactor_config.max_merge_write_throughput.map(io::limiter);
let supervisor = CompactorSupervisor::new(
node_id,
Expand All @@ -60,6 +61,7 @@ pub async fn start_compactor_service(
io_throughput_limiter,
metastore,
storage_resolver,
split_cache,
compactor_config.max_concurrent_split_uploads,
event_broker,
compaction_root_directory,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl Default for IndexerConfig {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CompactorConfig {
/// Maximum number of concurrent merge pipelines. Defaults to 2/3 of CPU count.
/// Maximum number of concurrent merge pipelines. Defaults to CPU count.
#[serde(default = "CompactorConfig::default_max_concurrent_pipelines")]
pub max_concurrent_pipelines: NonZeroUsize,
/// Maximum number of concurrent split uploads across all pipelines.
Expand Down
19 changes: 7 additions & 12 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use quickwit_actors::{
Observation,
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir;
use quickwit_config::{
Expand Down Expand Up @@ -56,7 +55,7 @@ use tracing::{debug, error, info, warn};

use crate::models::{DetachIndexingPipeline, ObservePipeline, SpawnPipeline};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{IndexingSplitCache, SplitStoreQuota};
use crate::split_store::IndexingSplitCache;
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};

/// Name of the indexing directory, usually located at `<data_dir_path>/indexing`.
Expand Down Expand Up @@ -94,7 +93,7 @@ pub struct IndexingService {
storage_resolver: StorageResolver,
indexing_pipelines: HashMap<PipelineUid, PipelineHandle>,
counters: IndexingServiceCounters,
local_split_store: Arc<IndexingSplitCache>,
split_cache: Arc<IndexingSplitCache>,
max_concurrent_split_uploads: usize,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
event_broker: EventBroker,
Expand Down Expand Up @@ -124,14 +123,8 @@ impl IndexingService {
ingester_pool: IngesterPool,
storage_resolver: StorageResolver,
event_broker: EventBroker,
split_cache: Arc<IndexingSplitCache>,
) -> anyhow::Result<IndexingService> {
let split_store_space_quota = SplitStoreQuota::try_new(
indexer_config.split_store_max_num_splits,
indexer_config.split_store_max_num_bytes,
)?;
let split_cache_dir_path = get_cache_directory_path(&data_dir_path);
let local_split_store =
IndexingSplitCache::open(split_cache_dir_path, split_store_space_quota).await?;
let indexing_root_directory =
temp_dir::create_or_purge_directory(&data_dir_path.join(INDEXING_DIR_NAME)).await?;
let queue_dir_path = data_dir_path.join(QUEUES_DIR_NAME);
Expand All @@ -149,7 +142,7 @@ impl IndexingService {
ingest_api_service_opt,
ingester_pool,
storage_resolver,
local_split_store: Arc::new(local_split_store),
split_cache,
indexing_pipelines: Default::default(),
counters: Default::default(),
max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
Expand Down Expand Up @@ -245,7 +238,7 @@ impl IndexingService {
})?;
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());
let split_store = IndexingSplitStore::new(storage.clone(), self.split_cache.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
.map_err(|error| IndexingError::Internal(error.to_string()))?;
Expand Down Expand Up @@ -828,6 +821,7 @@ mod tests {
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
Arc::new(IndexingSplitCache::no_caching()),
)
.await
.unwrap();
Expand Down Expand Up @@ -1435,6 +1429,7 @@ mod tests {
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
Arc::new(IndexingSplitCache::no_caching()),
)
.await
.unwrap();
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#![deny(clippy::disallowed_methods)]

use std::sync::Arc;

use quickwit_actors::{Mailbox, Universe};
use quickwit_cluster::Cluster;
use quickwit_common::pubsub::EventBroker;
Expand Down Expand Up @@ -71,6 +73,7 @@ pub async fn start_indexing_service(
ingester_pool: IngesterPool,
storage_resolver: StorageResolver,
event_broker: EventBroker,
indexing_split_cache: Arc<IndexingSplitCache>,
) -> anyhow::Result<Mailbox<IndexingService>> {
info!("starting indexer service");
let ingest_api_service_mailbox = universe.get_one::<IngestApiService>();
Expand All @@ -85,6 +88,7 @@ pub async fn start_indexing_service(
ingester_pool,
storage_resolver,
event_broker,
indexing_split_cache,
)
.await?;
let (indexing_service, _) = universe.spawn_builder().spawn(indexing_service);
Expand Down
76 changes: 76 additions & 0 deletions quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use std::time::{Duration, SystemTime};

use anyhow::Context;
use bytesize::ByteSize;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::split_file;
use quickwit_config::IndexerConfig;
use quickwit_directories::BundleDirectory;
use quickwit_storage::StorageResult;
use tantivy::Directory;
Expand Down Expand Up @@ -364,6 +366,31 @@ impl IndexingSplitCache {
IndexingSplitCache { inner }
}

/// Builds an [`IndexingSplitCache`] from an [`IndexerConfig`].
///
/// If either quota dimension is zero, caching is disabled entirely and a
/// [`IndexingSplitCache::no_caching`] instance is returned — useful when
/// compaction runs on dedicated nodes and indexers no longer benefit from
/// caching freshly produced splits. Otherwise, the cache is opened rooted
/// at `<data_dir>/indexer-split-cache/splits`.
pub async fn from_config(
    indexer_config: &IndexerConfig,
    data_dir_path: &Path,
) -> anyhow::Result<IndexingSplitCache> {
    let max_num_splits = indexer_config.split_store_max_num_splits;
    let max_num_bytes = indexer_config.split_store_max_num_bytes;
    // A zero quota in either dimension means "no caching at all".
    let caching_disabled = max_num_splits == 0 || max_num_bytes.as_u64() == 0;
    if caching_disabled {
        return Ok(IndexingSplitCache::no_caching());
    }
    let split_store_quota = SplitStoreQuota::try_new(max_num_splits, max_num_bytes)?;
    let split_cache_dir = get_cache_directory_path(data_dir_path);
    IndexingSplitCache::open(split_cache_dir, split_store_quota)
        .await
        .context("failed to open indexing split cache")
}

/// Try to open an existing local split store directory.
///
/// If the directory does not exist, it will be created.
Expand Down Expand Up @@ -511,6 +538,7 @@ mod tests {
use std::time::Duration;

use bytesize::ByteSize;
use quickwit_config::IndexerConfig;
use quickwit_directories::BundleDirectory;
use quickwit_storage::{PutPayload, SplitPayloadBuilder};
use tantivy::Directory;
Expand All @@ -533,6 +561,54 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_from_config() {
// A zero quota in either dimension yields a no-caching cache that does
// not touch the filesystem; a positive quota opens (and creates) the
// cache directory at `<data_dir>/indexer-split-cache/splits`.
let zero_bytes = {
let mut config = IndexerConfig::for_test().unwrap();
config.split_store_max_num_bytes = ByteSize(0);
config
};
let zero_splits = {
let mut config = IndexerConfig::for_test().unwrap();
config.split_store_max_num_splits = 0;
config
};
let both_zero = {
let mut config = IndexerConfig::for_test().unwrap();
config.split_store_max_num_bytes = ByteSize(0);
config.split_store_max_num_splits = 0;
config
};
for config in [zero_bytes, zero_splits, both_zero] {
let data_dir = tempdir().unwrap();
let _cache = IndexingSplitCache::from_config(&config, data_dir.path())
.await
.unwrap();
assert!(
!data_dir
.path()
.join("indexer-split-cache")
.try_exists()
.unwrap(),
"no-caching variant must not create the cache directory",
);
}

let data_dir = tempdir().unwrap();
let config = IndexerConfig::for_test().unwrap();
let _cache = IndexingSplitCache::from_config(&config, data_dir.path())
.await
.unwrap();
let cache_dir = data_dir.path().join("indexer-split-cache").join("splits");
assert!(
cache_dir.is_dir(),
"positive quota must open (and create) the cache directory",
);
}

#[tokio::test]
async fn test_local_split_store_load_existing_splits() -> anyhow::Result<()> {
let temp_dir = tempfile::tempdir()?;
Expand Down
Loading
Loading