Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion include/cucascade/data/data_batch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,56 @@ class data_batch : public std::enable_shared_from_this<data_batch> {
* The CV is notified outside of the batch mutex.
*/
void set_state_change_cv(std::condition_variable* cv);

/**
* @brief Blocking call: wait until the batch can accept a new task, then create it.
*
* Blocks until the batch leaves in_transit state, then performs the same transition
* as try_to_create_task(). Always succeeds when it returns.
*/
void wait_to_create_task();

/**
* @brief Blocking call: wait until a created task can be cancelled, then cancel it.
*
* Blocks until the batch is in task_created or processing state, then performs
* the same transition as try_to_cancel_task(). Always succeeds when it returns.
*/
void wait_to_cancel_task();

/**
* @brief Blocking call: wait until the batch can be locked for processing, then lock it.
*
* Blocks until the batch is in task_created or processing state with a pending
* task_created_count. Non-waitable failures (missing_data, memory_space_mismatch)
* are returned immediately with the appropriate status.
*
* @param requested_memory_space The memory space the caller expects to process from.
* @return lock_for_processing_result success=true with handle on success; success=false
* with status describing non-waitable failure.
*/
lock_for_processing_result wait_to_lock_for_processing(
memory::memory_space_id requested_memory_space);

/**
* @brief Blocking call: wait until the batch can be locked for in-transit, then lock it.
*
* Blocks until processing_count == 0 and the batch is in idle or task_created state,
* then performs the same transition as try_to_lock_for_in_transit(). Always succeeds
* when it returns.
*/
void wait_to_lock_for_in_transit();

/**
* @brief Blocking call: wait until the batch is in in_transit state, then release it.
*
* Blocks until the batch is in in_transit state, then performs the same transition
* as try_to_release_in_transit(). Always succeeds when it returns.
*
* @param target_state Optional state to transition to when releasing in_transit. If not set,
* the batch returns to idle.
*/
void wait_to_release_in_transit(std::optional<batch_state> target_state = std::nullopt);
/**
* @brief Replace the underlying data representation.
* Requires no active processing.
Expand Down Expand Up @@ -417,7 +467,8 @@ class data_batch : public std::enable_shared_from_this<data_batch> {
void decrement_processing_count();

mutable std::mutex _mutex; ///< Mutex for thread-safe access to state and processing count
uint64_t _batch_id; ///< Unique identifier for this data batch
std::condition_variable _internal_cv; ///< CV used by blocking wait_to_* calls
uint64_t _batch_id; ///< Unique identifier for this data batch
std::unique_ptr<idata_representation> _data; ///< Pointer to the actual data representation
size_t _processing_count = 0; ///< Count of active processing handles
size_t _task_created_count = 0; ///< Count of pending task_created requests
Expand Down
129 changes: 129 additions & 0 deletions src/data/data_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,32 @@ bool data_batch::try_to_create_task()
success = true;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
return success;
}

void data_batch::wait_to_create_task()
{
std::condition_variable* cv_to_notify = nullptr;
bool should_notify = false;
{
std::unique_lock<std::mutex> lock(_mutex);
_internal_cv.wait(lock, [&] { return _state != batch_state::in_transit; });
if (_state == batch_state::idle) {
_state = batch_state::task_created;
++_task_created_count;
should_notify = true;
cv_to_notify = _state_change_cv;
} else {
// task_created or processing: just increment counter
++_task_created_count;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
}

size_t data_batch::get_task_created_count() const
{
std::lock_guard<std::mutex> lock(_mutex);
Expand Down Expand Up @@ -162,9 +184,35 @@ bool data_batch::try_to_cancel_task()
success = true;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
return success;
}

void data_batch::wait_to_cancel_task()
{
std::condition_variable* cv_to_notify = nullptr;
bool should_notify = false;
{
std::unique_lock<std::mutex> lock(_mutex);
_internal_cv.wait(lock, [&] {
return _state == batch_state::task_created || _state == batch_state::processing;
});
if (_task_created_count == 0) {
throw std::runtime_error(
"Cannot cancel task: task_created_count is zero. "
"try_to_create_task() must be called before wait_to_cancel_task()");
}
--_task_created_count;
if (_task_created_count == 0 && _processing_count == 0) {
_state = batch_state::idle;
should_notify = true;
cv_to_notify = _state_change_cv;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
}
lock_for_processing_result data_batch::try_to_lock_for_processing(
memory::memory_space_id requested_memory_space)
{
Expand Down Expand Up @@ -202,10 +250,58 @@ lock_for_processing_result data_batch::try_to_lock_for_processing(
result = {
true, data_batch_processing_handle{shared_from_this()}, lock_for_processing_status::success};
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
return result;
}

lock_for_processing_result data_batch::wait_to_lock_for_processing(
memory::memory_space_id requested_memory_space)
{
std::condition_variable* cv_to_notify = nullptr;
lock_for_processing_result result{
false, data_batch_processing_handle{}, lock_for_processing_status::not_attempted};
{
std::unique_lock<std::mutex> lock(_mutex);

// Return immediately for failures that cannot be resolved by waiting
if (_data == nullptr) {
result.status = lock_for_processing_status::missing_data;
return result;
}
if (_data->get_memory_space().get_id() != requested_memory_space) {
result.status = lock_for_processing_status::memory_space_mismatch;
return result;
}

// Wait until the state allows locking for processing
_internal_cv.wait(lock, [&] {
return (_state == batch_state::task_created || _state == batch_state::processing) &&
_task_created_count > 0;
});

// Re-check non-waitable conditions after waking (data may have changed)
if (_data == nullptr) {
result.status = lock_for_processing_status::missing_data;
return result;
}
if (_data->get_memory_space().get_id() != requested_memory_space) {
result.status = lock_for_processing_status::memory_space_mismatch;
return result;
}

--_task_created_count;
++_processing_count;
_state = batch_state::processing;
cv_to_notify = _state_change_cv;
result = {
true, data_batch_processing_handle{shared_from_this()}, lock_for_processing_status::success};
}
_internal_cv.notify_all();
if (cv_to_notify) { cv_to_notify->notify_all(); }
return result;
}

bool data_batch::try_to_lock_for_in_transit()
{
std::condition_variable* cv_to_notify = nullptr;
Expand All @@ -222,10 +318,28 @@ bool data_batch::try_to_lock_for_in_transit()
success = true;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
return success;
}

void data_batch::wait_to_lock_for_in_transit()
{
std::condition_variable* cv_to_notify = nullptr;
{
std::unique_lock<std::mutex> lock(_mutex);
_internal_cv.wait(lock, [&] {
return _processing_count == 0 &&
((_state == batch_state::idle) ||
(_state == batch_state::task_created && _task_created_count > 0));
});
_state = batch_state::in_transit;
cv_to_notify = _state_change_cv;
}
_internal_cv.notify_all();
if (cv_to_notify) { cv_to_notify->notify_all(); }
}

bool data_batch::try_to_release_in_transit(std::optional<batch_state> target_state)
{
std::condition_variable* cv_to_notify = nullptr;
Expand All @@ -245,10 +359,24 @@ bool data_batch::try_to_release_in_transit(std::optional<batch_state> target_sta
success = true;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
return success;
}

void data_batch::wait_to_release_in_transit(std::optional<batch_state> target_state)
{
std::condition_variable* cv_to_notify = nullptr;
{
std::unique_lock<std::mutex> lock(_mutex);
_internal_cv.wait(lock, [&] { return _state == batch_state::in_transit; });
_state = target_state.has_value() ? *target_state : batch_state::idle;
cv_to_notify = _state_change_cv;
}
_internal_cv.notify_all();
if (cv_to_notify) { cv_to_notify->notify_all(); }
}

void data_batch::decrement_processing_count()
{
std::condition_variable* cv_to_notify = nullptr;
Expand All @@ -272,6 +400,7 @@ void data_batch::decrement_processing_count()
cv_to_notify = _state_change_cv;
}
}
if (should_notify) { _internal_cv.notify_all(); }
if (should_notify && cv_to_notify) { cv_to_notify->notify_all(); }
}

Expand Down
Loading
Loading