diff --git a/crates/matrix-sdk-crypto/CHANGELOG.md b/crates/matrix-sdk-crypto/CHANGELOG.md index d73f45c04b5..367072cc4fb 100644 --- a/crates/matrix-sdk-crypto/CHANGELOG.md +++ b/crates/matrix-sdk-crypto/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file. ### Features +- When we receive an inbound Megolm session from two different sources, merge the two copies together to get the best of both. + ([#5865](https://github.com/matrix-org/matrix-rust-sdk/pull/5865) - When constructing a key bundle for history sharing, if we had received a key bundle ourselves, in which one or more sessions was marked as "history not shared", pass that on to the new user. ([#5820](https://github.com/matrix-org/matrix-rust-sdk/pull/5820) - Expose new method `CryptoStore::get_withheld_sessions_by_room_id`. diff --git a/crates/matrix-sdk-crypto/src/gossiping/machine.rs b/crates/matrix-sdk-crypto/src/gossiping/machine.rs index 68aff8f65eb..d25fd8e58e6 100644 --- a/crates/matrix-sdk-crypto/src/gossiping/machine.rs +++ b/crates/matrix-sdk-crypto/src/gossiping/machine.rs @@ -39,7 +39,7 @@ use ruma::{ TransactionId, UserId, }; use tracing::{debug, field::debug, info, instrument, trace, warn, Span}; -use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey}; +use vodozemac::Curve25519PublicKey; use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue}; use crate::{ @@ -964,6 +964,7 @@ impl GossipMachine { Ok(name) } + #[tracing::instrument(skip_all)] async fn accept_forwarded_room_key( &self, info: &GossipRequest, @@ -972,33 +973,11 @@ impl GossipMachine { ) -> Result, CryptoStoreError> { match InboundGroupSession::try_from(event) { Ok(session) => { - if self.inner.store.compare_group_session(&session).await? - == SessionOrdering::Better - { + let new_session = self.inner.store.merge_received_group_session(session).await?; + if new_session.is_some() { self.mark_as_done(info).await?; - - info!( - ?sender_key, - claimed_sender_key = ?session.sender_key(), - room_id = ?session.room_id(), - session_id = session.session_id(), - algorithm = ?session.algorithm(), - "Received a forwarded room key", - ); - - Ok(Some(session)) - } else { - info!( - ?sender_key, - claimed_sender_key = ?session.sender_key(), - room_id = ?session.room_id(), - session_id = session.session_id(), - algorithm = ?session.algorithm(), - "Received a forwarded room key but we already have a better version of it", - ); - - Ok(None) } + Ok(new_session) } Err(e) => { warn!(?sender_key, "Couldn't create a group session from a received room key"); diff --git a/crates/matrix-sdk-crypto/src/machine/mod.rs b/crates/matrix-sdk-crypto/src/machine/mod.rs index bbb3ec31fdf..140febbd33a 100644 --- a/crates/matrix-sdk-crypto/src/machine/mod.rs +++ b/crates/matrix-sdk-crypto/src/machine/mod.rs @@ -62,10 +62,7 @@ use tracing::{ field::{debug, display}, info, instrument, trace, warn, Span, }; -use vodozemac::{ - megolm::{DecryptionError, SessionOrdering}, - Curve25519PublicKey, Ed25519Signature, -}; +use vodozemac::{megolm::DecryptionError, Curve25519PublicKey, Ed25519Signature}; #[cfg(feature = "experimental-send-custom-to-device")] use crate::session_manager::split_devices_for_share_strategy; @@ -916,25 +913,9 @@ impl OlmMachine { let sender_data = SenderDataFinder::find_using_event(self.store(), sender_key, event, &session) .await?; - session.sender_data = sender_data; - match self.store().compare_group_session(&session).await? { - SessionOrdering::Better => { - info!("Received a new megolm room key"); - - Ok(Some(session)) - } - comparison_result => { - warn!( - ?comparison_result, - "Received a megolm room key that we already have a better version \ - of, discarding" - ); - - Ok(None) - } - } + Ok(self.store().merge_received_group_session(session).await?) } Err(e) => { Span::current().record("session_id", &content.session_id); diff --git a/crates/matrix-sdk-crypto/src/olm/group_sessions/inbound.rs b/crates/matrix-sdk-crypto/src/olm/group_sessions/inbound.rs index 755426efe0f..13667259e2f 100644 --- a/crates/matrix-sdk-crypto/src/olm/group_sessions/inbound.rs +++ b/crates/matrix-sdk-crypto/src/olm/group_sessions/inbound.rs @@ -340,6 +340,36 @@ impl InboundGroupSession { Self::try_from(exported_session) } + /// Create a new [`InboundGroupSession`] which is a copy of this one, except + /// that its Megolm ratchet is replaced with a copy of that from another + /// [`InboundGroupSession`]. + /// + /// This can be useful, for example, when we receive a new copy of the room + /// key, but at an earlier ratchet index. + /// + /// # Panics + /// + /// If the two sessions are for different room IDs, or have different + /// session IDs, this function will panic. It is up to the caller to ensure + /// that it only attempts to merge related sessions. + pub(crate) fn with_ratchet(mut self, other: &InboundGroupSession) -> Self { + if self.session_id != other.session_id { + panic!( + "Attempt to merge Megolm sessions with different session IDs: {} vs {}", + self.session_id, other.session_id + ); + } + if self.room_id != other.room_id { + panic!( + "Attempt to merge Megolm sessions with different room IDs: {} vs {}", + self.room_id, other.room_id, + ); + } + self.inner = other.inner.clone(); + self.first_known_index = other.first_known_index; + self + } + /// Convert the [`InboundGroupSession`] into a /// [`PickledInboundGroupSession`] which can be serialized. pub async fn pickle(&self) -> PickledInboundGroupSession { @@ -488,7 +518,34 @@ impl InboundGroupSession { /// Check if the [`InboundGroupSession`] is better than the given other /// [`InboundGroupSession`] + #[deprecated( + note = "Sessions cannot be compared on a linear scale. Consider calling `compare_ratchet`, as well as comparing the `sender_data`." + )] pub async fn compare(&self, other: &InboundGroupSession) -> SessionOrdering { + match self.compare_ratchet(other).await { + SessionOrdering::Equal => { + match self.sender_data.compare_trust_level(&other.sender_data) { + Ordering::Less => SessionOrdering::Worse, + Ordering::Equal => SessionOrdering::Equal, + Ordering::Greater => SessionOrdering::Better, + } + } + result => result, + } + } + + /// Check if the [`InboundGroupSession`]'s ratchet index is better than that + /// of the given other [`InboundGroupSession`]. + /// + /// If the two sessions are not connected (i.e., they are from different + /// senders, or if advancing the ratchets to the same index does not + /// give the same ratchet value), returns [`SessionOrdering::Unconnected`]. + /// + /// Otherwise, returns [`SessionOrdering::Equal`], + /// [`SessionOrdering::Better`], or [`SessionOrdering::Worse`] respectively + /// depending on whether this session's first known index is equal to, + /// lower than, or higher than, that of `other`. + pub async fn compare_ratchet(&self, other: &InboundGroupSession) -> SessionOrdering { // If this is the same object the ordering is the same, we can't compare because // we would deadlock while trying to acquire the same lock twice. if Arc::ptr_eq(&self.inner, &other.inner) { @@ -501,17 +558,7 @@ impl InboundGroupSession { SessionOrdering::Unconnected } else { let mut other_inner = other.inner.lock().await; - - match self.inner.lock().await.compare(&mut other_inner) { - SessionOrdering::Equal => { - match self.sender_data.compare_trust_level(&other.sender_data) { - Ordering::Less => SessionOrdering::Worse, - Ordering::Equal => SessionOrdering::Equal, - Ordering::Greater => SessionOrdering::Better, - } - } - result => result, - } + self.inner.lock().await.compare(&mut other_inner) } } @@ -1057,6 +1104,7 @@ mod tests { } #[async_test] + #[allow(deprecated)] async fn test_session_comparison() { let alice = Account::with_device_id(alice_id(), alice_device_id()); let room_id = room_id!("!test:localhost"); @@ -1067,18 +1115,24 @@ mod tests { let mut copy = InboundGroupSession::from_pickle(inbound.pickle().await).unwrap(); assert_eq!(inbound.compare(&worse).await, SessionOrdering::Better); + assert_eq!(inbound.compare_ratchet(&worse).await, SessionOrdering::Better); assert_eq!(worse.compare(&inbound).await, SessionOrdering::Worse); + assert_eq!(worse.compare_ratchet(&inbound).await, SessionOrdering::Worse); assert_eq!(inbound.compare(&inbound).await, SessionOrdering::Equal); + assert_eq!(inbound.compare_ratchet(&inbound).await, SessionOrdering::Equal); assert_eq!(inbound.compare(©).await, SessionOrdering::Equal); + assert_eq!(inbound.compare_ratchet(©).await, SessionOrdering::Equal); copy.creator_info.curve25519_key = Curve25519PublicKey::from_base64("XbmrPa1kMwmdtNYng1B2gsfoo8UtF+NklzsTZiaVKyY") .unwrap(); assert_eq!(inbound.compare(©).await, SessionOrdering::Unconnected); + assert_eq!(inbound.compare_ratchet(©).await, SessionOrdering::Unconnected); } #[async_test] + #[allow(deprecated)] async fn test_session_comparison_sender_data() { let alice = Account::with_device_id(alice_id(), alice_device_id()); let room_id = room_id!("!test:localhost"); diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index 68b0cbd4dec..d98f49cff41 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -634,27 +634,92 @@ impl Store { self.inner.store.save_changes(changes).await } - /// Compare the given `InboundGroupSession` with an existing session we have - /// in the store. - /// - /// This method returns `SessionOrdering::Better` if the given session is - /// better than the one we already have or if we don't have such a - /// session in the store. - pub(crate) async fn compare_group_session( + /// Given an `InboundGroupSession` which we have just received, see if we + /// have a matching session already in the store, and determine how to + /// handle it. + /// + /// If the store already has everything we can gather from the new session, + /// returns `None`. Otherwise, returns a merged session which should be + /// persisted to the store. + pub(crate) async fn merge_received_group_session( &self, - session: &InboundGroupSession, - ) -> Result { + session: InboundGroupSession, + ) -> Result> { let old_session = self .inner .store .get_inbound_group_session(session.room_id(), session.session_id()) .await?; - Ok(if let Some(old_session) = old_session { - session.compare(&old_session).await - } else { - SessionOrdering::Better - }) + // If there is no old session, just use the new session. + let Some(old_session) = old_session else { + info!("Received a new megolm room key"); + return Ok(Some(session)); + }; + + let index_comparison = session.compare_ratchet(&old_session).await; + let trust_level_comparison = + session.sender_data.compare_trust_level(&old_session.sender_data); + + let result = match (index_comparison, trust_level_comparison) { + (SessionOrdering::Unconnected, _) => { + // If this happens, it means that we have two sessions purporting to have the + // same session id, but where the ratchets do not match up. + // In other words, someone is playing silly buggers. + warn!("Received a group session with an ratchet that does not connect to the one in the store, discarding"); + None + } + + (SessionOrdering::Better, std::cmp::Ordering::Greater) + | (SessionOrdering::Better, std::cmp::Ordering::Equal) + | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => { + // The new session is unambiguously better than what we have in the store. + info!( + ?index_comparison, + ?trust_level_comparison, + "Received a megolm room key that we have a worse version of, merging" + ); + Some(session) + } + + (SessionOrdering::Worse, std::cmp::Ordering::Less) + | (SessionOrdering::Worse, std::cmp::Ordering::Equal) + | (SessionOrdering::Equal, std::cmp::Ordering::Less) => { + // The new session is unambiguously worse than the one we have in the store. + warn!( + ?index_comparison, + ?trust_level_comparison, + "Received a megolm room key that we already have a better version \ + of, discarding" + ); + None + } + + (SessionOrdering::Equal, std::cmp::Ordering::Equal) => { + // The new session is the same as what we have. + info!("Received a megolm room key that we already have, discarding"); + None + } + + (SessionOrdering::Better, std::cmp::Ordering::Less) => { + // We need to take the ratchet from the new session, and the + // sender data from the old session. + info!("Upgrading a previously-received megolm session with new ratchet"); + let result = old_session.with_ratchet(&session); + // We'll need to back it up again. + result.reset_backup_state(); + Some(result) + } + + (SessionOrdering::Worse, std::cmp::Ordering::Greater) => { + // We need to take the ratchet from the old session, and the + // sender data from the new session. + info!("Upgrading a previously-received megolm session with new sender data"); + Some(session.with_ratchet(&old_session)) + } + }; + + Ok(result) } #[cfg(test)] @@ -1438,43 +1503,26 @@ impl Store { { let mut sessions = Vec::new(); - async fn new_session_better( - session: &InboundGroupSession, - old_session: Option, - ) -> bool { - if let Some(old_session) = &old_session { - session.compare(old_session).await == SessionOrdering::Better - } else { - true - } - } - let total_count = room_keys.len(); let mut keys = BTreeMap::new(); for (i, key) in room_keys.into_iter().enumerate() { match key.try_into() { Ok(session) => { - let old_session = self - .inner - .store - .get_inbound_group_session(session.room_id(), session.session_id()) - .await?; - // Only import the session if we didn't have this session or // if it's a better version of the same session. - if new_session_better(&session, old_session).await { + if let Some(merged) = self.merge_received_group_session(session).await? { if from_backup_version.is_some() { - session.mark_as_backed_up(); + merged.mark_as_backed_up(); } - keys.entry(session.room_id().to_owned()) + keys.entry(merged.room_id().to_owned()) .or_insert_with(BTreeMap::new) - .entry(session.sender_key().to_base64()) + .entry(merged.sender_key().to_base64()) .or_insert_with(BTreeSet::new) - .insert(session.session_id().to_owned()); + .insert(merged.session_id().to_owned()); - sessions.push(session); + sessions.push(merged); } } Err(e) => { @@ -1799,9 +1847,9 @@ impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore { #[cfg(test)] mod tests { - use std::pin::pin; + use std::{collections::BTreeMap, pin::pin}; - use assert_matches2::assert_matches; + use assert_matches2::{assert_let, assert_matches}; use futures_util::StreamExt; use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath}; use matrix_sdk_test::async_test; @@ -1813,7 +1861,7 @@ mod tests { user_id, RoomId, }; use serde_json::json; - use vodozemac::megolm::SessionKey; + use vodozemac::{megolm::SessionKey, Ed25519Keypair}; use crate::{ machine::test_helpers::get_machine_pair, @@ -1826,9 +1874,144 @@ mod tests { }, EventEncryptionAlgorithm, }, - OlmMachine, + Account, OlmMachine, }; + #[async_test] + async fn test_merge_received_group_session() { + let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC")); + let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await; + + let room_id = room_id!("!test:localhost"); + + let megolm_signing_key = Ed25519Keypair::new(); + let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id); + + // Bob already knows about the session, at index 5, with the device keys. + let mut inbound_at_index_5 = + InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap(); + inbound_at_index_5.sender_data = inbound.sender_data.clone(); + bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap(); + + // No changes if we get a disconnected session. + let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id); + assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None); + + // No changes needed when we receive a worse copy of the session + let mut worse = + InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap(); + worse.sender_data = inbound.sender_data.clone(); + assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None); + + // Nor when we receive an exact copy of what we already have + let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap(); + copy.sender_data = inbound.sender_data.clone(); + assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None); + + // But when we receive a better copy of the session, we should get it back + let mut better = + InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap(); + better.sender_data = inbound.sender_data.clone(); + assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap()); + assert_eq!(update.first_known_index(), 0); + + // A worse copy of the ratchet, but better trust data + { + let mut worse_ratchet_better_trust = + InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap(); + let updated_sender_data = SenderData::sender_verified( + alice_account.user_id(), + alice_account.device_id(), + Ed25519Keypair::new().public_key(), + ); + worse_ratchet_better_trust.sender_data = updated_sender_data.clone(); + assert_let!( + Some(update) = bob + .store() + .merge_received_group_session(worse_ratchet_better_trust) + .await + .unwrap() + ); + assert_eq!(update.sender_data, updated_sender_data); + assert_eq!(update.first_known_index(), 5); + assert_eq!( + update.export_at_index(0).await.session_key.to_bytes(), + inbound.export_at_index(5).await.session_key.to_bytes() + ); + } + + // A better copy of the ratchet, but worse trust data + { + let mut better_ratchet_worse_trust = + InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap(); + let updated_sender_data = SenderData::unknown(); + better_ratchet_worse_trust.sender_data = updated_sender_data.clone(); + assert_let!( + Some(update) = bob + .store() + .merge_received_group_session(better_ratchet_worse_trust) + .await + .unwrap() + ); + assert_eq!(update.sender_data, inbound.sender_data); + assert_eq!(update.first_known_index(), 0); + assert_eq!( + update.export_at_index(0).await.session_key.to_bytes(), + inbound.export_at_index(0).await.session_key.to_bytes() + ); + } + } + + /// Create an [`InboundGroupSession`] for the given room, using the given + /// Ed25519 key as the signing key/session ID. + fn make_inbound_group_session( + sender_account: &Account, + signing_key: &Ed25519Keypair, + room_id: &RoomId, + ) -> InboundGroupSession { + InboundGroupSession::new( + sender_account.identity_keys.curve25519, + sender_account.identity_keys.ed25519, + room_id, + &make_session_key(signing_key), + SenderData::device_info(crate::types::DeviceKeys::new( + sender_account.user_id().to_owned(), + sender_account.device_id().to_owned(), + vec![], + BTreeMap::new(), + crate::types::Signatures::new(), + )), + EventEncryptionAlgorithm::MegolmV1AesSha2, + Some(ruma::events::room::history_visibility::HistoryVisibility::Shared), + true, + ) + .unwrap() + } + + /// Make a Megolm [`SessionKey`] using the given Ed25519 key as a signing + /// key/session ID. + fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey { + use rand::Rng; + + // `SessionKey::new` is not public, so the easiest way to construct a Megolm + // session using a known Ed25519 key is to build a byte array in the export + // format. + + let mut session_key_bytes = vec![0u8; 229]; + // 0: version + session_key_bytes[0] = 2; + // 1..5: index + // 5..133: ratchet key + rand::thread_rng().fill(&mut session_key_bytes[5..133]); + // 133..165: public ed25519 key + session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes()); + // 165..229: signature + let sig = signing_key.sign(&session_key_bytes[0..165]); + session_key_bytes[165..229].copy_from_slice(&sig.to_bytes()); + + SessionKey::from_bytes(&session_key_bytes).unwrap() + } + #[async_test] async fn test_import_room_keys_notifies_stream() { use futures_util::FutureExt; diff --git a/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs b/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs index 9ff287462e1..2d5d951a104 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/e2ee/shared_history.rs @@ -25,7 +25,8 @@ use matrix_sdk::{ }; use matrix_sdk_base::crypto::types::events::UtdCause; use matrix_sdk_common::deserialized_responses::{ - ProcessedToDeviceEvent, UnableToDecryptReason::MissingMegolmSession, WithheldCode, + DeviceLinkProblem, ProcessedToDeviceEvent, UnableToDecryptReason::MissingMegolmSession, + VerificationLevel, VerificationState, WithheldCode, }; use matrix_sdk_ui::{ Timeline, @@ -553,6 +554,148 @@ async fn test_transitive_history_share_with_withhelds() -> Result<()> { Ok(()) } +/// Test megolm session merging with history sharing +/// +/// 1. Alice and Bob share a room +/// 2. Bob sends a message +/// 3. Alice invites Charlie, sharing the history +/// 4. Charlie can see Bob's message, but the sender is unauthenticated. +/// 5. Bob sends another message (on the same session) +/// 6. Charlie can now decrypt both of Bob's messages, with authenticated +/// sender. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_history_sharing_session_merging() -> Result<()> { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); + let charlie_span = tracing::info_span!("charlie"); + + let alice = create_encryption_enabled_client("alice").instrument(alice_span.clone()).await?; + let bob = create_encryption_enabled_client("bob").instrument(bob_span.clone()).await?; + let charlie = + create_encryption_enabled_client("charlie").instrument(charlie_span.clone()).await?; + + // 1. Alice creates a room, and enables encryption + let alice_room = alice + .create_room(assign!(CreateRoomRequest::new(), { + preset: Some(RoomPreset::PublicChat), + })) + .instrument(alice_span.clone()) + .await?; + let alice_timeline = alice_room.timeline().await?; + + alice_room.enable_encryption().instrument(alice_span.clone()).await?; + info!(room_id = ?alice_room.room_id(), "Alice has created and enabled encryption in the room"); + + // ... and invites Bob to the room + alice_room.invite_user_by_id(bob.user_id().unwrap()).instrument(alice_span.clone()).await?; + + // Bob joins + bob.sync_once().instrument(bob_span.clone()).await?; + + let bob_room = bob + .join_room_by_id(alice_room.room_id()) + .instrument(bob_span.clone()) + .await + .expect("Bob should be able to accept the invitation from Alice"); + + // 2. Bob sends a message, which Alice should receive + let bob_send_test_event = async |event_content: &str| { + let bob_event_id = bob_room + .send(RoomMessageEventContent::text_plain(event_content)) + .into_future() + .instrument(bob_span.clone()) + .await + .expect("We should be able to send a message to the room") + .event_id; + + alice + .sync_once() + .instrument(alice_span.clone()) + .await + .expect("Alice should be able to sync"); + + assert_event_received(&alice_timeline, &bob_event_id, event_content).await; + + bob_event_id + }; + + let event_id_1 = bob_send_test_event("Event 1").await; + + // 3. Alice invites Charlie. + alice_room.invite_user_by_id(charlie.user_id().unwrap()).instrument(alice_span.clone()).await?; + + // Workaround for https://github.com/matrix-org/matrix-rust-sdk/issues/5770: Charlie needs a copy of + // Alice's identity. + charlie + .encryption() + .request_user_identity(alice.user_id().unwrap()) + .instrument(charlie_span.clone()) + .await?; + + charlie.sync_once().instrument(charlie_span.clone()).await?; + let charlie_room = charlie + .join_room_by_id(alice_room.room_id()) + .instrument(charlie_span.clone()) + .await + .expect("Charlie should be able to accept the invitation from Alice"); + + // 4. Charlie can see Bob's message, but the sender is unauthenticated. + let charlie_timeline = charlie_room.timeline().await?; + charlie.sync_once().instrument(charlie_span.clone()).await?; + let received_event = assert_event_received(&charlie_timeline, &event_id_1, "Event 1").await; + assert_eq!( + received_event + .as_event() + .unwrap() + .encryption_info() + .expect("Received event should be encrypted") + .verification_state, + VerificationState::Unverified(VerificationLevel::None(DeviceLinkProblem::InsecureSource)) + ); + + // 5. Bob sends another message (on the same session) + bob.sync_once().instrument(bob_span.clone()).await?; + // Sanity: make sure Bob knows that Charlie has joined + bob_room + .get_member_no_sync(charlie.user_id().unwrap()) + .instrument(bob_span.clone()) + .await? + .expect("Bob should see Charlie in the room"); + let event_id_2 = bob_send_test_event("Event 2").await; + + // 6. Charlie can now decrypt both of Bob's messages, with authenticated sender + let mut charlie_room_stream = charlie.encryption().room_keys_received_stream().await.unwrap(); + charlie.sync_once().instrument(charlie_span.clone()).await?; + + // Make sure we're decrypting with the newly-received keys. + assert_next_with_timeout!(&mut charlie_room_stream).expect("charlie should receive room keys"); + + let received_event = assert_event_received(&charlie_timeline, &event_id_2, "Event 2").await; + assert_eq!( + received_event + .as_event() + .unwrap() + .encryption_info() + .expect("Received event should be encrypted") + .verification_state, + VerificationState::Unverified(VerificationLevel::UnverifiedIdentity) + ); + + // The earlier event should now have a better verification status. + let received_event = assert_event_received(&charlie_timeline, &event_id_1, "Event 1").await; + assert_eq!( + received_event + .as_event() + .unwrap() + .encryption_info() + .expect("Received event should be encrypted") + .verification_state, + VerificationState::Unverified(VerificationLevel::UnverifiedIdentity) + ); + + Ok(()) +} + async fn create_encryption_enabled_client(username: &str) -> Result { let encryption_settings = EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; @@ -624,7 +767,11 @@ async fn wait_for_timeline_event( * Wait for the given event to arrive in the timeline, and assert that its * content matches that given. */ -async fn assert_event_received(timeline: &Timeline, event_id: &EventId, expected_content: &str) { +async fn assert_event_received( + timeline: &Timeline, + event_id: &EventId, + expected_content: &str, +) -> Arc { let timeline_item = wait_for_timeline_event(timeline, event_id).await.unwrap_or_else(|| { panic!("Timeout waiting for event {event_id} with content {expected_content} to arrive") }); @@ -639,6 +786,8 @@ async fn assert_event_received(timeline: &Timeline, event_id: &EventId, expected expected_content, "The decrypted event should match the message Bob has sent" ); + + timeline_item } /**