Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/binary_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ bytemuck = { workspace = true }
bytes = { workspace = true }
compio-buf = { workspace = true }
enumset = { workspace = true }
secrecy = { workspace = true }
smallvec = { workspace = true }
thiserror = { workspace = true }

Expand Down
4 changes: 4 additions & 0 deletions core/binary_protocol/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub const UPDATE_PERMISSIONS_CODE: u32 = 36;
pub const CHANGE_PASSWORD_CODE: u32 = 37;
pub const LOGIN_USER_CODE: u32 = 38;
pub const LOGOUT_USER_CODE: u32 = 39;
pub const LOGIN_REGISTER_CODE: u32 = 40;
pub const LOGIN_REGISTER_WITH_PAT_CODE: u32 = 45;

// -- Personal Access Tokens --
pub const GET_PERSONAL_ACCESS_TOKENS_CODE: u32 = 41;
Expand Down Expand Up @@ -119,6 +121,8 @@ mod tests {
CHANGE_PASSWORD_CODE,
LOGIN_USER_CODE,
LOGOUT_USER_CODE,
LOGIN_REGISTER_CODE,
LOGIN_REGISTER_WITH_PAT_CODE,
GET_PERSONAL_ACCESS_TOKENS_CODE,
CREATE_PERSONAL_ACCESS_TOKEN_CODE,
DELETE_PERSONAL_ACCESS_TOKEN_CODE,
Expand Down
98 changes: 93 additions & 5 deletions core/binary_protocol/src/consensus/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,16 @@ pub struct RequestHeader {
pub operation: Operation,
pub operation_padding: [u8; 7],
pub namespace: u64,
pub reserved: [u8; 64],
pub session: u64,
pub reserved: [u8; 56],
}
const _: () = {
assert!(size_of::<RequestHeader>() == HEADER_SIZE);
assert!(
offset_of!(RequestHeader, client)
== offset_of!(RequestHeader, reserved_frame) + size_of::<[u8; 66]>()
);
assert!(offset_of!(RequestHeader, reserved) + size_of::<[u8; 64]>() == HEADER_SIZE);
assert!(offset_of!(RequestHeader, reserved) + size_of::<[u8; 56]>() == HEADER_SIZE);
};

impl Default for RequestHeader {
Expand All @@ -135,7 +136,8 @@ impl Default for RequestHeader {
operation: Operation::Reserved,
operation_padding: [0; 7],
namespace: 0,
reserved: [0; 64],
session: 0,
reserved: [0; 56],
}
}
}
Expand All @@ -159,6 +161,31 @@ impl ConsensusHeader for RequestHeader {
found: self.command,
});
}
// Register: session must be 0, request must be 0.
// Non-register: session must be > 0, request must be > 0.
if self.operation == Operation::Register {
if self.session != 0 {
return Err(ConsensusError::InvalidField(
"register: session must be 0".to_string(),
));
}
if self.request != 0 {
return Err(ConsensusError::InvalidField(
"register: request must be 0".to_string(),
));
}
} else if self.operation != Operation::Reserved {
if self.session == 0 {
return Err(ConsensusError::InvalidField(
"non-register: session must be > 0".to_string(),
));
}
if self.request == 0 {
return Err(ConsensusError::InvalidField(
"non-register: request must be > 0".to_string(),
));
}
}
Ok(())
}
}
Expand Down Expand Up @@ -670,8 +697,9 @@ impl ConsensusHeader for StartViewHeader {
#[cfg(test)]
mod tests {
use super::{
Command2, CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, PrepareHeader,
PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
Command2, CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, Operation,
PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader,
StartViewHeader,
};

fn aligned_zeroed(size: usize) -> bytes::BytesMut {
Expand Down Expand Up @@ -715,6 +743,66 @@ mod tests {
assert!(header.validate().is_err());
}

#[test]
fn request_register_nonzero_session_rejected() {
let header = RequestHeader {
command: Command2::Request,
operation: Operation::Register,
session: 5,
request: 0,
..RequestHeader::default()
};
assert!(header.validate().is_err());
}

#[test]
fn request_register_nonzero_request_rejected() {
let header = RequestHeader {
command: Command2::Request,
operation: Operation::Register,
session: 0,
request: 1,
..RequestHeader::default()
};
assert!(header.validate().is_err());
}

#[test]
fn request_non_register_valid() {
let header = RequestHeader {
command: Command2::Request,
operation: Operation::SendMessages,
session: 10,
request: 1,
..RequestHeader::default()
};
assert!(header.validate().is_ok());
}

#[test]
fn request_non_register_zero_session_rejected() {
let header = RequestHeader {
command: Command2::Request,
operation: Operation::SendMessages,
session: 0,
request: 1,
..RequestHeader::default()
};
assert!(header.validate().is_err());
}

#[test]
fn request_non_register_zero_request_rejected() {
let header = RequestHeader {
command: Command2::Request,
operation: Operation::SendMessages,
session: 10,
request: 0,
..RequestHeader::default()
};
assert!(header.validate().is_err());
}

#[test]
fn reply_header_zero_copy() {
let mut buf = aligned_zeroed(256);
Expand Down
31 changes: 29 additions & 2 deletions core/binary_protocol/src/consensus/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,17 @@ use bytemuck::{CheckedBitPattern, NoUninit};
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, NoUninit, CheckedBitPattern)]
#[repr(u8)]
pub enum Operation {
/// The value 0 is reserved to prevent a spurious zero from being
/// interpreted as a valid operation.
#[default]
Reserved = 0,

/// Register a client session with the cluster. Goes through the same
/// consensus pipeline (prepare/replicate/commit) as normal operations
/// but skips state machine dispatch at commit time, the metadata
/// plane calls `commit_register` directly. Session number = commit op.
Register = 1,

// Metadata operations (shard 0)
CreateStream = 128,
UpdateStream = 129,
Expand Down Expand Up @@ -85,6 +93,14 @@ impl Operation {
)
}

/// VSR protocol-level operations that go through consensus but skip
/// state machine dispatch at commit time.
#[must_use]
#[inline]
pub const fn is_vsr_reserved(&self) -> bool {
matches!(self, Self::Reserved | Self::Register)
}

/// Data-plane operations routed to the shard owning the partition.
#[must_use]
#[inline]
Expand All @@ -104,7 +120,7 @@ impl Operation {
#[must_use]
pub const fn to_command_code(&self) -> Option<u32> {
match self {
Self::Reserved => None,
Self::Reserved | Self::Register => None,
Self::CreateStream
| Self::UpdateStream
| Self::DeleteStream
Expand Down Expand Up @@ -188,8 +204,19 @@ mod tests {
}

#[test]
fn reserved_has_no_code() {
fn vsr_reserved_have_no_code() {
assert_eq!(Operation::Reserved.to_command_code(), None);
assert_eq!(Operation::Register.to_command_code(), None);
}

#[test]
fn vsr_reserved_classification() {
assert!(Operation::Reserved.is_vsr_reserved());
assert!(Operation::Register.is_vsr_reserved());
assert!(!Operation::CreateStream.is_vsr_reserved());
assert!(!Operation::SendMessages.is_vsr_reserved());
assert!(!Operation::Register.is_metadata());
assert!(!Operation::Register.is_partition());
}

#[test]
Expand Down
107 changes: 57 additions & 50 deletions core/binary_protocol/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
),
CommandMeta::non_replicated(LOGIN_USER_CODE, "user.login"),
CommandMeta::non_replicated(LOGOUT_USER_CODE, "user.logout"),
CommandMeta::non_replicated(LOGIN_REGISTER_CODE, "user.login_register"),
// Personal Access Tokens
CommandMeta::non_replicated(
GET_PERSONAL_ACCESS_TOKENS_CODE,
Expand Down Expand Up @@ -171,6 +172,8 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
),
CommandMeta::non_replicated(JOIN_CONSUMER_GROUP_CODE, "consumer_group.join"),
CommandMeta::non_replicated(LEAVE_CONSUMER_GROUP_CODE, "consumer_group.leave"),
// Login + Register (PAT - Personal Access Token variant)
CommandMeta::non_replicated(LOGIN_REGISTER_WITH_PAT_CODE, "user.login_register_with_pat"),
];

/// Lookup command metadata by command code.
Expand Down Expand Up @@ -198,37 +201,39 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
CHANGE_PASSWORD_CODE => 13,
LOGIN_USER_CODE => 14,
LOGOUT_USER_CODE => 15,
GET_PERSONAL_ACCESS_TOKENS_CODE => 16,
CREATE_PERSONAL_ACCESS_TOKEN_CODE => 17,
DELETE_PERSONAL_ACCESS_TOKEN_CODE => 18,
LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => 19,
POLL_MESSAGES_CODE => 20,
SEND_MESSAGES_CODE => 21,
FLUSH_UNSAVED_BUFFER_CODE => 22,
GET_CONSUMER_OFFSET_CODE => 23,
STORE_CONSUMER_OFFSET_CODE => 24,
DELETE_CONSUMER_OFFSET_CODE => 25,
GET_STREAM_CODE => 26,
GET_STREAMS_CODE => 27,
CREATE_STREAM_CODE => 28,
DELETE_STREAM_CODE => 29,
UPDATE_STREAM_CODE => 30,
PURGE_STREAM_CODE => 31,
GET_TOPIC_CODE => 32,
GET_TOPICS_CODE => 33,
CREATE_TOPIC_CODE => 34,
DELETE_TOPIC_CODE => 35,
UPDATE_TOPIC_CODE => 36,
PURGE_TOPIC_CODE => 37,
CREATE_PARTITIONS_CODE => 38,
DELETE_PARTITIONS_CODE => 39,
DELETE_SEGMENTS_CODE => 40,
GET_CONSUMER_GROUP_CODE => 41,
GET_CONSUMER_GROUPS_CODE => 42,
CREATE_CONSUMER_GROUP_CODE => 43,
DELETE_CONSUMER_GROUP_CODE => 44,
JOIN_CONSUMER_GROUP_CODE => 45,
LEAVE_CONSUMER_GROUP_CODE => 46,
LOGIN_REGISTER_CODE => 16,
GET_PERSONAL_ACCESS_TOKENS_CODE => 17,
CREATE_PERSONAL_ACCESS_TOKEN_CODE => 18,
DELETE_PERSONAL_ACCESS_TOKEN_CODE => 19,
LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => 20,
POLL_MESSAGES_CODE => 21,
SEND_MESSAGES_CODE => 22,
FLUSH_UNSAVED_BUFFER_CODE => 23,
GET_CONSUMER_OFFSET_CODE => 24,
STORE_CONSUMER_OFFSET_CODE => 25,
DELETE_CONSUMER_OFFSET_CODE => 26,
GET_STREAM_CODE => 27,
GET_STREAMS_CODE => 28,
CREATE_STREAM_CODE => 29,
DELETE_STREAM_CODE => 30,
UPDATE_STREAM_CODE => 31,
PURGE_STREAM_CODE => 32,
GET_TOPIC_CODE => 33,
GET_TOPICS_CODE => 34,
CREATE_TOPIC_CODE => 35,
DELETE_TOPIC_CODE => 36,
UPDATE_TOPIC_CODE => 37,
PURGE_TOPIC_CODE => 38,
CREATE_PARTITIONS_CODE => 39,
DELETE_PARTITIONS_CODE => 40,
DELETE_SEGMENTS_CODE => 41,
GET_CONSUMER_GROUP_CODE => 42,
GET_CONSUMER_GROUPS_CODE => 43,
CREATE_CONSUMER_GROUP_CODE => 44,
DELETE_CONSUMER_GROUP_CODE => 45,
JOIN_CONSUMER_GROUP_CODE => 46,
LEAVE_CONSUMER_GROUP_CODE => 47,
LOGIN_REGISTER_WITH_PAT_CODE => 48,
_ => return None,
};
Some(&COMMAND_TABLE[idx])
Expand All @@ -242,30 +247,30 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> {
// Indices must match the order of entries in COMMAND_TABLE above.
let idx = match op {
Operation::CreateStream => 28,
Operation::UpdateStream => 30,
Operation::DeleteStream => 29,
Operation::PurgeStream => 31,
Operation::CreateTopic => 34,
Operation::UpdateTopic => 36,
Operation::DeleteTopic => 35,
Operation::PurgeTopic => 37,
Operation::CreatePartitions => 38,
Operation::DeletePartitions => 39,
Operation::DeleteSegments => 40,
Operation::CreateConsumerGroup => 43,
Operation::DeleteConsumerGroup => 44,
Operation::CreateStream => 29,
Operation::UpdateStream => 31,
Operation::DeleteStream => 30,
Operation::PurgeStream => 32,
Operation::CreateTopic => 35,
Operation::UpdateTopic => 37,
Operation::DeleteTopic => 36,
Operation::PurgeTopic => 38,
Operation::CreatePartitions => 39,
Operation::DeletePartitions => 40,
Operation::DeleteSegments => 41,
Operation::CreateConsumerGroup => 44,
Operation::DeleteConsumerGroup => 45,
Operation::CreateUser => 9,
Operation::UpdateUser => 11,
Operation::DeleteUser => 10,
Operation::ChangePassword => 13,
Operation::UpdatePermissions => 12,
Operation::CreatePersonalAccessToken => 17,
Operation::DeletePersonalAccessToken => 18,
Operation::SendMessages => 21,
Operation::StoreConsumerOffset => 24,
Operation::DeleteConsumerOffset => 25,
Operation::Reserved => return None,
Operation::CreatePersonalAccessToken => 18,
Operation::DeletePersonalAccessToken => 19,
Operation::SendMessages => 22,
Operation::StoreConsumerOffset => 25,
Operation::DeleteConsumerOffset => 26,
Operation::Reserved | Operation::Register => return None,
};
Some(&COMMAND_TABLE[idx])
}
Expand Down Expand Up @@ -293,6 +298,8 @@ mod tests {
CHANGE_PASSWORD_CODE,
LOGIN_USER_CODE,
LOGOUT_USER_CODE,
LOGIN_REGISTER_CODE,
LOGIN_REGISTER_WITH_PAT_CODE,
GET_PERSONAL_ACCESS_TOKENS_CODE,
CREATE_PERSONAL_ACCESS_TOKEN_CODE,
DELETE_PERSONAL_ACCESS_TOKEN_CODE,
Expand Down
Loading
Loading