use std::collections::BTreeMap;
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
#[cfg(feature = "e2e-encryption")]
use ruma::events::AnyToDeviceEvent;
use ruma::{
api::client::{
push::get_notifications::v3::Notification,
sync::sync_events::{
v3::{self, InvitedRoom, RoomSummary},
v4,
},
},
events::{AnySyncStateEvent, AnySyncTimelineEvent},
serde::Raw,
OwnedRoomId, RoomId,
};
use tracing::{instrument, trace, warn};
use super::BaseClient;
#[cfg(feature = "e2e-encryption")]
use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
#[cfg(feature = "e2e-encryption")]
use crate::RoomMemberships;
use crate::{
deserialized_responses::AmbiguityChanges,
error::Result,
read_receipts::{compute_notifications, PreviousEventsProvider},
rooms::RoomState,
store::{ambiguity_map::AmbiguityCache, StateChanges, Store},
sync::{JoinedRoom, LeftRoom, Rooms, SyncResponse},
Room, RoomInfo,
};
impl BaseClient {
/// Process the `to_device` and `e2ee` extensions of a sliding sync response.
///
/// Returns early with an empty list when `extensions` reports itself as
/// empty. Otherwise the to-device events, device list changes and key counts
/// are handed to `preprocess_to_device_events`; the resulting state changes
/// are saved to the store and applied to the client.
///
/// # Returns
///
/// The list of to-device events produced by `preprocess_to_device_events`.
///
/// # Errors
///
/// Propagates any error from the preprocessing step or from saving the
/// changes to the store.
#[cfg(feature = "e2e-encryption")]
pub async fn process_sliding_sync_e2ee(
    &self,
    extensions: &v4::Extensions,
) -> Result<Vec<Raw<AnyToDeviceEvent>>> {
    if extensions.is_empty() {
        return Ok(Vec::new());
    }

    let e2ee = &extensions.e2ee;
    let to_device_ext = extensions.to_device.as_ref();

    // A missing to-device extension is treated like an empty list of events.
    let to_device_events = match to_device_ext {
        Some(ext) => ext.events.clone(),
        None => Vec::new(),
    };

    trace!(
        to_device_events = to_device_events.len(),
        device_one_time_keys_count = e2ee.device_one_time_keys_count.len(),
        device_unused_fallback_key_types =
            e2ee.device_unused_fallback_key_types.as_ref().map(|v| v.len()),
        "Processing sliding sync e2ee events",
    );

    let encryption_changes = matrix_sdk_crypto::EncryptionSyncChanges {
        to_device_events,
        changed_devices: &e2ee.device_lists,
        one_time_keys_counts: &e2ee.device_one_time_keys_count,
        unused_fallback_keys: e2ee.device_unused_fallback_key_types.as_deref(),
        next_batch_token: to_device_ext.map(|ext| ext.next_batch.clone()),
    };

    let mut state_changes = StateChanges::default();
    let processed_events =
        self.preprocess_to_device_events(encryption_changes, &mut state_changes).await?;

    trace!("ready to submit changes to store");
    self.store.save_changes(&state_changes).await?;
    self.apply_changes(&state_changes);
    trace!("applied changes");

    Ok(processed_events)
}
/// Process a sliding sync (`v4`) response and turn it into a `SyncResponse`.
///
/// The rooms of the response are processed one by one, then the receipts and
/// typing extensions are attached to the joined rooms, notifications/read
/// receipts are recomputed for every joined room of this response, and
/// finally all accumulated `StateChanges` are saved to the store and applied
/// to the client.
///
/// # Arguments
///
/// * `response` - The sliding sync response to process.
/// * `previous_events_provider` - Forwarded to `compute_notifications` for
///   every joined room.
///
/// # Errors
///
/// Propagates errors from room processing, from `compute_notifications` and
/// from saving the changes to the store.
///
/// # Panics
///
/// Panics with "logged in user" if the client has no session metadata once
/// the rooms have been processed.
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync<PEP: PreviousEventsProvider>(
&self,
response: &v4::Response,
previous_events_provider: &PEP,
) -> Result<SyncResponse> {
// Only these three parts of the response are consumed here.
let v4::Response {
rooms,
lists,
extensions,
..
} = response;
trace!(
rooms = rooms.len(),
lists = lists.len(),
extensions = !extensions.is_empty(),
"Processing sliding sync room events"
);
// Neither room data nor extension data: there is nothing to process.
if rooms.is_empty() && extensions.is_empty() {
return Ok(SyncResponse::default());
};
let mut changes = StateChanges::default();
let store = self.store.clone();
let mut ambiguity_cache = AmbiguityCache::new(store.inner.clone());
// First pass over the global account data, before any room is processed.
let account_data = &extensions.account_data;
if !account_data.is_empty() {
self.handle_account_data(&account_data.global, &mut changes).await;
}
let mut new_rooms = Rooms::default();
let mut notifications = Default::default();
// Process every room of the response and sort the result into the
// join/leave/invite maps of the returned `Rooms`.
for (room_id, response_room_data) in rooms {
let (room_info, joined_room, left_room, invited_room) = self
.process_sliding_sync_room(
room_id,
response_room_data,
account_data,
&store,
&mut changes,
&mut notifications,
&mut ambiguity_cache,
)
.await?;
changes.add_room(room_info);
if let Some(joined_room) = joined_room {
new_rooms.join.insert(room_id.clone(), joined_room);
}
if let Some(left_room) = left_room {
new_rooms.leave.insert(room_id.clone(), left_room);
}
if let Some(invited_room) = invited_room {
new_rooms.invite.insert(room_id.clone(), invited_room);
}
}
// Receipts come from the extensions rather than from the per-room data, so
// a room may show up here without having an entry in `rooms`.
for (room_id, raw) in &extensions.receipts.rooms {
match raw.deserialize() {
Ok(event) => {
changes.add_receipts(room_id, event.content);
}
Err(e) => {
// Best effort: only log, including the event ID when it can be read.
let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
#[rustfmt::skip]
warn!(
?room_id, event_id,
"Failed to deserialize read receipt room event: {e}"
);
}
}
// The raw receipt is forwarded as an ephemeral event of the joined room
// even when it failed to deserialize; a joined-room entry is created if
// the room has none yet.
new_rooms
.join
.entry(room_id.to_owned())
.or_insert_with(JoinedRoom::default)
.ephemeral
.push(raw.clone().cast());
}
// Typing notifications are forwarded the same way, without being stored in
// `changes`.
for (room_id, raw) in &extensions.typing.rooms {
new_rooms
.join
.entry(room_id.to_owned())
.or_insert_with(JoinedRoom::default)
.ephemeral
.push(raw.clone().cast());
}
let user_id = &self.session_meta().expect("logged in user").user_id;
// Recompute notifications/read receipts for every joined room, using the
// room info from this sync's changes when present and the one of the
// already known room otherwise.
for (room_id, joined_room_update) in &mut new_rooms.join {
if let Some(mut room_info) = changes
.room_infos
.get(room_id)
.cloned()
.or_else(|| self.get_room(room_id).map(|r| r.clone_info()))
{
// Only re-register the room info when `compute_notifications` returns
// `true`.
if compute_notifications(
user_id,
room_id,
changes.receipts.get(room_id),
previous_events_provider,
&joined_room_update.timeline.events,
&mut room_info.read_receipts,
)? {
changes.add_room(room_info);
}
}
}
// Second pass over the same global account data, now that the rooms have
// been processed.
// NOTE(review): presumably required because some account data can only be
// applied once the rooms exist — confirm before removing either pass.
if !account_data.is_empty() {
self.handle_account_data(&account_data.global, &mut changes).await;
}
changes.ambiguity_maps = ambiguity_cache.cache;
trace!("ready to submit changes to store");
store.save_changes(&changes).await?;
self.apply_changes(&changes);
trace!("applied changes");
// Presence and to-device events are always left at their default here.
Ok(SyncResponse {
rooms: new_rooms,
ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes },
notifications,
presence: Default::default(),
account_data: account_data.global.clone(),
to_device: Default::default(),
})
}
/// Process a single room of a sliding sync response.
///
/// Collects the state events of the room (from `required_state` and from the
/// timeline), resolves the room and its membership, handles state, room
/// account data and the timeline, and records everything in `changes`.
///
/// # Returns
///
/// The updated `RoomInfo` plus one slot per room kind. Which slot is filled
/// depends on the final `room_info.state()`: a `JoinedRoom` for `Joined`, a
/// `LeftRoom` for `Left`, and for `Invited` whatever invited room
/// `process_sliding_sync_room_membership` returned (possibly `None`).
///
/// # Errors
///
/// Propagates errors from state handling, push rule lookup, timeline
/// handling and, with the `e2e-encryption` feature, from the store lookup
/// and the user tracking update.
#[allow(clippy::too_many_arguments)]
async fn process_sliding_sync_room(
&self,
room_id: &RoomId,
room_data: &v4::SlidingSyncRoom,
account_data: &v4::AccountData,
store: &Store,
changes: &mut StateChanges,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
ambiguity_cache: &mut AmbiguityCache,
) -> Result<(RoomInfo, Option<JoinedRoom>, Option<LeftRoom>, Option<InvitedRoom>)> {
// State events come from two places: the required state and the state
// events found in the timeline (appended last).
let mut state_events = Self::deserialize_state_events(&room_data.required_state);
state_events.extend(Self::deserialize_state_events_from_timeline(&room_data.timeline));
// Split the (raw, deserialized) pairs into two parallel vectors.
let (raw_state_events, state_events): (Vec<_>, Vec<_>) = state_events.into_iter().unzip();
// Get or create the room and work on a copy of its info.
#[allow(unused_mut)] let (mut room, mut room_info, invited_room) = self.process_sliding_sync_room_membership(
room_data,
&state_events,
store,
room_id,
changes,
);
room_info.mark_state_partially_synced();
// `handle_state` is skipped entirely when there is no state event; the
// user ID collection then starts out empty.
let mut user_ids = if !state_events.is_empty() {
self.handle_state(
&raw_state_events,
&state_events,
&mut room_info,
changes,
ambiguity_cache,
)
.await?
} else {
Default::default()
};
// Room-specific account data, if the extension carries any for this room.
let room_account_data = if let Some(events) = account_data.rooms.get(room_id) {
self.handle_room_account_data(room_id, events, changes).await;
Some(events.to_vec())
} else {
None
};
process_room_properties(room_data, &mut room_info);
let push_rules = self.get_push_rules(changes).await?;
let timeline = self
.handle_timeline(
&room,
room_data.limited,
room_data.timeline.clone(),
room_data.prev_batch.clone(),
&push_rules,
&mut user_ids,
&mut room_info,
changes,
notifications,
ambiguity_cache,
)
.await?;
#[cfg(feature = "e2e-encryption")]
cache_latest_events(&room, &mut room_info, &timeline.events, Some(changes), Some(store))
.await;
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
if let Some(o) = self.olm_machine().await.as_ref() {
// The updated info reports encryption while the stored room does not
// yet: track every active member known to the store.
if !room.is_encrypted() {
let user_ids = store.get_user_ids(room_id, RoomMemberships::ACTIVE).await?;
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
}
// Also track the users collected while handling this room's state and
// timeline.
if !user_ids.is_empty() {
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
}
}
}
let notification_count = room_data.unread_notifications.clone().into();
room_info.update_notification_count(notification_count);
match room_info.state() {
RoomState::Joined => {
// Ephemeral events are not part of the room data; the list starts out
// empty here.
let ephemeral = Vec::new();
Ok((
room_info,
Some(JoinedRoom::new(
timeline,
raw_state_events,
room_account_data.unwrap_or_default(),
ephemeral,
notification_count,
)),
None,
None,
))
}
RoomState::Left => Ok((
room_info,
None,
Some(LeftRoom::new(
timeline,
raw_state_events,
room_account_data.unwrap_or_default(),
)),
None,
)),
RoomState::Invited => Ok((room_info, None, None, invited_room)),
}
}
/// Get or create the room for `room_id` and derive its membership state.
///
/// When the room data carries an `invite_state`, the room is looked up (or
/// created) as invited, its info is marked as invited and the stripped state
/// is handled. Otherwise the room is looked up (or created) as joined, its
/// info is marked as joined and then adjusted by
/// `handle_own_room_membership`.
///
/// # Returns
///
/// The room, a modified copy of its info, and the `InvitedRoom` built from
/// the invite state when there is one.
fn process_sliding_sync_room_membership(
    &self,
    room_data: &v4::SlidingSyncRoom,
    state_events: &[AnySyncStateEvent],
    store: &Store,
    room_id: &RoomId,
    changes: &mut StateChanges,
) -> (Room, RoomInfo, Option<InvitedRoom>) {
    match &room_data.invite_state {
        Some(stripped_state) => {
            let room = store.get_or_create_room(room_id, RoomState::Invited);
            let mut info = room.clone_info();
            info.mark_as_invited();
            self.handle_invited_state(stripped_state.as_slice(), &mut info, changes);

            let invite_state = v3::InviteState::from(stripped_state.clone());
            let invited_room = v3::InvitedRoom::from(invite_state);

            (room, info, Some(invited_room))
        }
        None => {
            let room = store.get_or_create_room(room_id, RoomState::Joined);
            let mut info = room.clone_info();
            info.mark_as_joined();
            self.handle_own_room_membership(state_events, &mut info);

            (room, info, None)
        }
    }
}
/// Update the state of `room_info` from the current user's own membership.
///
/// Looks for the most recent `m.room.member` event in `state_events` whose
/// state key is the current user and, if there is one, sets the room state
/// from its membership. Does nothing when the client has no session
/// metadata or when no such event is present.
///
/// The sender of the event is deliberately not taken into account: a
/// membership event describes the user named by its state key, whoever sent
/// it. Requiring the sender to be the current user would ignore membership
/// changes made by someone else (e.g. being kicked or banned), leaving the
/// room in its previous state.
pub(crate) fn handle_own_room_membership(
    &self,
    state_events: &[AnySyncStateEvent],
    room_info: &mut RoomInfo,
) {
    let Some(meta) = self.session_meta() else {
        return;
    };

    // Walk backwards: the first matching event met this way is the last one
    // in regular order, and it is the only one that matters.
    for event in state_events.iter().rev() {
        if let AnySyncStateEvent::RoomMember(member) = &event {
            if member.state_key() == meta.user_id.as_str() {
                room_info.set_state(member.membership().into());
                break;
            }
        }
    }
}
/// Extract the state events contained in a list of raw timeline events.
///
/// A timeline event is considered a state event candidate unless looking up
/// its `state_key` field succeeds and finds nothing. Candidates that fail to
/// deserialize as `AnySyncStateEvent` are logged and dropped.
///
/// # Returns
///
/// One `(raw, deserialized)` pair per state event, in timeline order.
pub(crate) fn deserialize_state_events_from_timeline(
    raw_events: &[Raw<AnySyncTimelineEvent>],
) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
    let mut state_events: Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> = Vec::new();

    for raw in raw_events {
        // Only a successful lookup that finds no `state_key` rules the event
        // out; a failed lookup still makes it a candidate.
        let state_key = raw.get_field::<serde::de::IgnoredAny>("state_key");
        if matches!(state_key, Ok(None)) {
            continue;
        }

        match raw.deserialize_as::<AnySyncStateEvent>() {
            Ok(state_event) => {
                state_events.push((raw.clone().cast(), state_event));
            }
            Err(error) => {
                warn!("Couldn't deserialize state event from timeline: {error}");
            }
        }
    }

    state_events
}
}
/// Refresh the cached "latest event" data of a room from a batch of timeline
/// events.
///
/// `events` is walked from its last element to its first:
///
/// * The first event classified as `YesRoomMessage` or `YesPoll` is stored
///   in `room_info.latest_event` together with the sender details that could
///   be found. The room's buffer of latest encrypted events is then cleared
///   and the walk stops.
/// * Events classified as `NoEncrypted` that are met before that point are
///   collected, up to the capacity of the room's `latest_encrypted_events`
///   buffer, and appended to that buffer afterwards in their original order.
/// * Every other kind of event is ignored.
/// * Events that cannot be deserialized are logged and skipped; this never
///   panics, even when the event has no ID.
///
/// # Arguments
///
/// * `room` - The room whose encrypted event buffer is updated.
/// * `room_info` - The room info receiving the latest event.
/// * `events` - The timeline events to inspect.
/// * `changes` - When present, looked up first for the sender's profile and
///   for the ambiguity of their display name.
/// * `store` - When present, used as a fallback to find the sender's profile.
#[cfg(feature = "e2e-encryption")]
async fn cache_latest_events(
    room: &Room,
    room_info: &mut RoomInfo,
    events: &[SyncTimelineEvent],
    changes: Option<&StateChanges>,
    store: Option<&Store>,
) {
    // Keep the limit in a local: a `Vec` may allocate more than requested, so
    // its own capacity is not a reliable bound.
    let max_encrypted_events = room.latest_encrypted_events.read().unwrap().capacity();
    let mut encrypted_events = Vec::with_capacity(max_encrypted_events);

    for event in events.iter().rev() {
        let timeline_event = match event.event.deserialize() {
            Ok(timeline_event) => timeline_event,
            Err(_) => {
                // A malformed event must not bring the whole sync down, not
                // even when its ID is missing too.
                match event.event_id() {
                    Some(event_id) => {
                        warn!("Failed to deserialize event as AnySyncTimelineEvent. ID={event_id}");
                    }
                    None => {
                        warn!(
                            "Failed to deserialize event as AnySyncTimelineEvent. \
                             The event has no ID."
                        );
                    }
                }
                continue;
            }
        };

        match is_suitable_for_latest_event(&timeline_event) {
            PossibleLatestEvent::YesRoomMessage(_) | PossibleLatestEvent::YesPoll(_) => {
                let mut sender_profile = None;
                let mut sender_name_is_ambiguous = None;

                // Look at the changes of the current sync first.
                if let Some(changes) = changes {
                    sender_profile = changes
                        .profiles
                        .get(room.room_id())
                        .and_then(|profiles_by_user| profiles_by_user.get(timeline_event.sender()))
                        .cloned();

                    if let Some(sender_profile) = sender_profile.as_ref() {
                        sender_name_is_ambiguous = sender_profile
                            .as_original()
                            .and_then(|profile| profile.content.displayname.as_ref())
                            .and_then(|display_name| {
                                changes.ambiguity_maps.get(room.room_id()).and_then(
                                    |map_for_room| {
                                        map_for_room
                                            .get(display_name)
                                            .map(|user_ids| user_ids.len() > 1)
                                    },
                                )
                            });
                    }
                }

                // Fall back to the store; lookup errors are treated like a
                // missing profile, and the ambiguity stays unknown.
                if sender_profile.is_none() {
                    if let Some(store) = store {
                        sender_profile = store
                            .get_profile(room.room_id(), timeline_event.sender())
                            .await
                            .ok()
                            .flatten();
                    }
                }

                let latest_event = Box::new(LatestEvent::new_with_sender_details(
                    event.clone(),
                    sender_profile,
                    sender_name_is_ambiguous,
                ));
                room_info.latest_event = Some(latest_event);

                // Encrypted events stored earlier are dropped now that a
                // suitable latest event has been found.
                room.latest_encrypted_events.write().unwrap().clear();

                // Everything before this event in `events` is left alone.
                break;
            }
            PossibleLatestEvent::NoEncrypted => {
                if encrypted_events.len() < max_encrypted_events {
                    encrypted_events.push(event.event.clone());
                }
            }
            _ => {
                // Not a candidate for the latest event: ignore it.
            }
        }
    }

    // The events were collected last-to-first; restore their original order
    // so that the last one of the batch ends up last in the buffer.
    room.latest_encrypted_events.write().unwrap().extend(encrypted_events.into_iter().rev());
}
/// Copy the room properties carried by a sliding sync room into `room_info`.
///
/// Updates the name (only when the response provides one), the joined and
/// invited member counts through a fresh `RoomSummary`, and the `prev_batch`
/// token. When the room data is flagged as `limited`, the members are marked
/// as missing.
fn process_room_properties(room_data: &v4::SlidingSyncRoom, room_info: &mut RoomInfo) {
    if let Some(room_name) = room_data.name.as_ref() {
        room_info.update_name(room_name.clone());
    }

    // Only the member counts are filled in; the rest of the summary keeps the
    // values given by `RoomSummary::new()`.
    let mut member_counts = RoomSummary::new();
    member_counts.joined_member_count = room_data.joined_count;
    member_counts.invited_member_count = room_data.invited_count;
    room_info.update_summary(&member_counts);

    let prev_batch = room_data.prev_batch.as_deref();
    room_info.set_prev_batch(prev_batch);

    if room_data.limited {
        room_info.mark_members_missing();
    }
}
#[cfg(test)]
mod tests {
use std::{
collections::{BTreeMap, HashSet},
sync::{Arc, RwLock as SyncRwLock},
};
use assert_matches::assert_matches;
use matrix_sdk_common::{deserialized_responses::SyncTimelineEvent, ring_buffer::RingBuffer};
use matrix_sdk_test::async_test;
use ruma::{
api::client::sync::sync_events::{v4, UnreadNotificationsCount},
assign, device_id, event_id,
events::{
direct::DirectEventContent,
room::{
avatar::RoomAvatarEventContent,
canonical_alias::RoomCanonicalAliasEventContent,
member::{MembershipState, RoomMemberEventContent},
message::SyncRoomMessageEvent,
},
AnySyncMessageLikeEvent, AnySyncTimelineEvent, GlobalAccountDataEventContent,
StateEventContent,
},
mxc_uri, room_alias_id, room_id,
serde::Raw,
uint, user_id, MxcUri, OwnedRoomId, OwnedUserId, RoomAliasId, RoomId, UserId,
};
use serde_json::json;
use super::cache_latest_events;
use crate::{store::MemoryStore, BaseClient, Room, RoomState, SessionMeta};
#[async_test]
async fn test_notification_count_set() {
let client = logged_in_client().await;
let mut response = v4::Response::new("42".to_owned());
let room_id = room_id!("!room:example.org");
let count = assign!(UnreadNotificationsCount::default(), {
highlight_count: Some(uint!(13)),
notification_count: Some(uint!(37)),
});
response.rooms.insert(
room_id.to_owned(),
assign!(v4::SlidingSyncRoom::new(), {
unread_notifications: count.clone()
}),
);
let sync_response =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let room = sync_response.rooms.join.get(room_id).unwrap();
assert_eq!(room.unread_notifications, count.clone().into());
let room = client.get_room(room_id).expect("found room");
assert_eq!(room.unread_notification_counts(), count.into());
}
#[async_test]
async fn can_process_empty_sliding_sync_response() {
let client = logged_in_client().await;
let empty_response = v4::Response::new("5".to_owned());
client.process_sliding_sync(&empty_response, &()).await.expect("Failed to process sync");
}
#[async_test]
async fn room_with_unspecified_state_is_added_to_client_and_joined_list() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let mut room = v4::SlidingSyncRoom::new();
room.joined_count = Some(uint!(41));
let response = response_with_room(room_id, room).await;
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(client_room.room_id(), room_id);
assert_eq!(client_room.joined_members_count(), 41);
assert_eq!(client_room.state(), RoomState::Joined);
assert!(sync_resp.rooms.join.get(room_id).is_some());
assert!(sync_resp.rooms.leave.get(room_id).is_none());
assert!(sync_resp.rooms.invite.get(room_id).is_none());
}
#[async_test]
async fn room_name_is_found_when_processing_sliding_sync_response() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let mut room = v4::SlidingSyncRoom::new();
room.name = Some("little room".to_owned());
let response = response_with_room(room_id, room).await;
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(client_room.name(), Some("little room".to_owned()));
assert_eq!(client_room.state(), RoomState::Joined);
assert!(sync_resp.rooms.join.get(room_id).is_some());
assert!(sync_resp.rooms.leave.get(room_id).is_none());
assert!(sync_resp.rooms.invite.get(room_id).is_none());
}
#[async_test]
async fn invited_room_name_is_found_when_processing_sliding_sync_response() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@w:e.uk");
let mut room = v4::SlidingSyncRoom::new();
set_room_invited(&mut room, user_id);
room.name = Some("little room".to_owned());
let response = response_with_room(room_id, room).await;
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(client_room.name(), Some("little room".to_owned()));
assert_eq!(client_room.state(), RoomState::Invited);
assert!(sync_resp.rooms.join.get(room_id).is_none());
assert!(sync_resp.rooms.leave.get(room_id).is_none());
assert!(sync_resp.rooms.invite.get(room_id).is_some());
}
#[async_test]
async fn left_a_room_from_required_state_event() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let mut room = v4::SlidingSyncRoom::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
let mut room = v4::SlidingSyncRoom::new();
set_room_left(&mut room, user_id);
let response = response_with_room(room_id, room).await;
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
assert!(sync_resp.rooms.join.get(room_id).is_none());
assert!(sync_resp.rooms.leave.get(room_id).is_some());
assert!(sync_resp.rooms.invite.get(room_id).is_none());
}
#[async_test]
async fn left_a_room_from_timeline_state_event() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let mut room = v4::SlidingSyncRoom::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
let mut room = v4::SlidingSyncRoom::new();
set_room_left_as_timeline_event(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
}
#[async_test]
async fn can_be_reinvited_to_a_left_room() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let mut room = v4::SlidingSyncRoom::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
let mut room = v4::SlidingSyncRoom::new();
set_room_left(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
let mut room = v4::SlidingSyncRoom::new();
set_room_invited(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
}
#[async_test]
async fn other_person_leaving_a_dm_is_reflected_in_their_membership_and_direct_targets() {
let room_id = room_id!("!r:e.uk");
let user_a_id = user_id!("@a:e.uk");
let user_b_id = user_id!("@b:e.uk");
let client = logged_in_client().await;
create_dm(&client, room_id, user_a_id, user_b_id, MembershipState::Join).await;
assert!(direct_targets(&client, room_id).contains(user_b_id));
assert_eq!(membership(&client, room_id, user_b_id).await, MembershipState::Join);
update_room_membership(&client, room_id, user_b_id, MembershipState::Leave).await;
assert!(direct_targets(&client, room_id).contains(user_b_id));
assert_eq!(membership(&client, room_id, user_b_id).await, MembershipState::Leave);
}
#[async_test]
async fn other_person_refusing_invite_to_a_dm_is_reflected_in_their_membership_and_direct_targets(
) {
let room_id = room_id!("!r:e.uk");
let user_a_id = user_id!("@a:e.uk");
let user_b_id = user_id!("@b:e.uk");
let client = logged_in_client().await;
create_dm(&client, room_id, user_a_id, user_b_id, MembershipState::Invite).await;
assert!(direct_targets(&client, room_id).contains(user_b_id));
assert_eq!(membership(&client, room_id, user_b_id).await, MembershipState::Invite);
update_room_membership(&client, room_id, user_b_id, MembershipState::Leave).await;
assert!(direct_targets(&client, room_id).contains(user_b_id));
assert_eq!(membership(&client, room_id, user_b_id).await, MembershipState::Leave);
}
#[async_test]
async fn members_count_in_a_dm_where_other_person_has_joined() {
let room_id = room_id!("!r:bar.org");
let user_a_id = user_id!("@a:bar.org");
let user_b_id = user_id!("@b:bar.org");
let client = logged_in_client().await;
create_dm(&client, room_id, user_a_id, user_b_id, MembershipState::Join).await;
assert_eq!(membership(&client, room_id, user_a_id).await, MembershipState::Join);
assert!(direct_targets(&client, room_id).contains(user_b_id));
assert_eq!(membership(&client, room_id, user_b_id).await, MembershipState::Join);
let room = client.get_room(room_id).unwrap();
assert_eq!(room.active_members_count(), 2);
assert_eq!(room.joined_members_count(), 2);
assert_eq!(room.invited_members_count(), 0);
}
#[async_test]
async fn members_count_in_a_dm_where_other_person_is_invited() {
let room_id = room_id!("!r:bar.org");
let user_a_id = user_id!("@a:bar.org");
let user_b_id = user_id!("@b:bar.org");
let client = logged_in_client().await;
create_dm(&client, room_id, user_a_id, user_b_id, MembershipState::Invite).await;
assert_eq!(membership(&client, room_id, user_a_id).await, MembershipState::Join);
assert!(direct_targets(&client, room_id).contains(user_b_id));
assert_eq!(membership(&client, room_id, user_b_id).await, MembershipState::Invite);
let room = client.get_room(room_id).unwrap();
assert_eq!(room.active_members_count(), 2);
assert_eq!(room.joined_members_count(), 1);
assert_eq!(room.invited_members_count(), 1);
}
#[async_test]
async fn avatar_is_found_when_processing_sliding_sync_response() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(
client_room.avatar_url().expect("No avatar URL").media_id().expect("No media ID"),
"med1"
);
}
#[async_test]
async fn invitation_room_is_added_to_client_and_invite_list() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let mut room = v4::SlidingSyncRoom::new();
set_room_invited(&mut room, user_id);
let response = response_with_room(room_id, room).await;
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(client_room.room_id(), room_id);
assert_eq!(client_room.state(), RoomState::Invited);
assert!(!sync_resp.rooms.invite[room_id].invite_state.is_empty());
assert!(sync_resp.rooms.join.get(room_id).is_none());
}
#[async_test]
async fn avatar_is_found_in_invitation_room_when_processing_sliding_sync_response() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let mut room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id);
set_room_invited(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(
client_room.avatar_url().expect("No avatar URL").media_id().expect("No media ID"),
"med1"
);
}
#[async_test]
async fn canonical_alias_is_found_in_invitation_room_when_processing_sliding_sync_response() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let room_alias_id = room_alias_id!("#myroom:e.uk");
let mut room = room_with_canonical_alias(room_alias_id, user_id);
set_room_invited(&mut room, user_id);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(client_room.canonical_alias(), Some(room_alias_id.to_owned()));
}
#[async_test]
async fn display_name_from_sliding_sync_overrides_alias() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
let room_alias_id = room_alias_id!("#myroom:e.uk");
let mut room = room_with_canonical_alias(room_alias_id, user_id);
room.name = Some("This came from the server".to_owned());
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(
client_room.display_name().await.unwrap().to_string(),
"This came from the server"
);
}
#[async_test]
async fn last_event_from_sliding_sync_is_cached() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let event_a = json!({
"sender":"@alice:example.com",
"type":"m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content":{"body":"A", "msgtype": "m.text"}
});
let event_b = json!({
"sender":"@alice:example.com",
"type":"m.room.message",
"event_id": "$idb",
"origin_server_ts": 12344447,
"content":{"body":"B", "msgtype": "m.text"}
});
let events = &[event_a, event_b.clone()];
let room = room_with_timeline(events);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(
ev_id(client_room.latest_event().map(|latest_event| latest_event.event().clone())),
"$idb"
);
}
#[async_test]
async fn cached_latest_event_can_be_redacted() {
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let event_a = json!({
"sender": "@alice:example.com",
"type": "m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content": { "body":"A", "msgtype": "m.text" },
});
let room = room_with_timeline(&[event_a]);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(
ev_id(client_room.latest_event().map(|latest_event| latest_event.event().clone())),
"$ida"
);
let redaction = json!({
"sender": "@alice:example.com",
"type": "m.room.redaction",
"event_id": "$idb",
"redacts": "$ida",
"origin_server_ts": 12344448,
"content": {},
});
let room = room_with_timeline(&[redaction]);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let client_room = client.get_room(room_id).expect("No room found");
let latest_event = client_room.latest_event().unwrap();
assert_eq!(latest_event.event_id().unwrap(), "$ida");
assert_matches!(
latest_event.event().event.deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
SyncRoomMessageEvent::Redacted(_)
))
);
}
#[async_test]
async fn when_no_events_we_dont_cache_any() {
let events = &[];
let chosen = choose_event_to_cache(events).await;
assert!(chosen.is_none());
}
#[async_test]
async fn when_only_one_event_we_cache_it() {
let event1 = make_event("m.room.message", "$1");
let events = &[event1.clone()];
let chosen = choose_event_to_cache(events).await;
assert_eq!(ev_id(chosen), rawev_id(event1));
}
#[async_test]
async fn with_multiple_events_we_cache_the_last_one() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let events = &[event1, event2.clone()];
let chosen = choose_event_to_cache(events).await;
assert_eq!(ev_id(chosen), rawev_id(event2));
}
#[async_test]
async fn cache_the_latest_relevant_event_and_ignore_irrelevant_ones_even_if_later() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_event("m.room.powerlevels", "$3");
let event4 = make_event("m.room.powerlevels", "$5");
let events = &[event1, event2.clone(), event3, event4];
let chosen = choose_event_to_cache(events).await;
assert_eq!(ev_id(chosen), rawev_id(event2));
}
#[async_test]
async fn prefer_to_cache_nothing_rather_than_irrelevant_events() {
let event1 = make_event("m.room.power_levels", "$1");
let events = &[event1];
let chosen = choose_event_to_cache(events).await;
assert!(chosen.is_none());
}
#[async_test]
async fn cache_encrypted_events_that_are_after_latest_message() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_encrypted_event("$3");
let event4 = make_encrypted_event("$4");
let events = &[event1, event2.clone(), event3.clone(), event4.clone()];
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events, None, None).await;
assert_eq!(
ev_id(room_info.latest_event.as_ref().map(|latest_event| latest_event.event().clone())),
rawev_id(event2.clone())
);
room.set_room_info(room_info);
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(event2)
);
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3, event4]));
}
#[async_test]
async fn dont_cache_encrypted_events_that_are_before_latest_message() {
let event1 = make_encrypted_event("$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_encrypted_event("$3");
let events = &[event1, event2.clone(), event3.clone()];
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events, None, None).await;
room.set_room_info(room_info);
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(event2)
);
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3]));
}
#[async_test]
async fn skip_irrelevant_events_eg_receipts_even_if_after_message() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_encrypted_event("$3");
let event4 = make_event("m.read", "$4");
let event5 = make_encrypted_event("$5");
let events = &[event1, event2.clone(), event3.clone(), event4, event5.clone()];
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events, None, None).await;
room.set_room_info(room_info);
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(event2)
);
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3, event5]));
}
#[async_test]
async fn only_store_the_max_number_of_encrypted_events() {
    // Given two plain messages followed by eleven encrypted events, with two
    // `m.read` events mixed in
    let plain_e = make_event("m.room.message", "$e");
    let plain_d = make_event("m.room.message", "$d");
    let enc_c = make_encrypted_event("$c");
    let enc_9 = make_encrypted_event("$9");
    let enc_8 = make_encrypted_event("$8");
    let enc_7 = make_encrypted_event("$7");
    let read_b = make_event("m.read", "$b");
    let enc_6 = make_encrypted_event("$6");
    let enc_5 = make_encrypted_event("$5");
    let enc_4 = make_encrypted_event("$4");
    let enc_3 = make_encrypted_event("$3");
    let enc_2 = make_encrypted_event("$2");
    let read_a = make_event("m.read", "$a");
    let enc_1 = make_encrypted_event("$1");
    let enc_0 = make_encrypted_event("$0");
    let timeline = [
        plain_e,
        plain_d.clone(),
        enc_c,
        enc_9.clone(),
        enc_8.clone(),
        enc_7.clone(),
        read_b,
        enc_6.clone(),
        enc_5.clone(),
        enc_4.clone(),
        enc_3.clone(),
        enc_2.clone(),
        read_a,
        enc_1.clone(),
        enc_0.clone(),
    ];

    // When the timeline is run through the latest-event cache
    let room = make_room();
    let mut info = room.clone_info();
    cache_latest_events(&room, &mut info, &timeline, None, None).await;
    room.set_room_info(info);

    // Then the last plain message is the latest event...
    let cached = room.latest_event().map(|latest| latest.event().clone());
    assert_eq!(ev_id(cached), rawev_id(plain_d));

    // ...and only the ten most recent encrypted events are kept (`$c` is gone)
    let expected_encrypted =
        evs_ids(&[enc_9, enc_8, enc_7, enc_6, enc_5, enc_4, enc_3, enc_2, enc_1, enc_0]);
    assert_eq!(rawevs_ids(&room.latest_encrypted_events), expected_encrypted);
}
#[async_test]
async fn dont_overflow_capacity_if_previous_encrypted_events_exist() {
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(
&room,
&mut room_info,
&[
make_encrypted_event("$0"),
make_encrypted_event("$1"),
make_encrypted_event("$2"),
make_encrypted_event("$3"),
make_encrypted_event("$4"),
make_encrypted_event("$5"),
make_encrypted_event("$6"),
make_encrypted_event("$7"),
make_encrypted_event("$8"),
make_encrypted_event("$9"),
],
None,
None,
)
.await;
room.set_room_info(room_info);
assert_eq!(room.latest_encrypted_events.read().unwrap().len(), 10);
let eventa = make_encrypted_event("$a");
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, &[eventa], None, None).await;
room.set_room_info(room_info);
assert!(!rawevs_ids(&room.latest_encrypted_events).contains(&"$0".to_owned()));
assert_eq!(rawevs_ids(&room.latest_encrypted_events)[9], "$a");
}
#[async_test]
async fn existing_encrypted_events_are_deleted_if_we_receive_unencrypted() {
    let room = make_room();
    let mut info = room.clone_info();

    // Given a room that has cached three encrypted events
    let encrypted_backlog =
        [make_encrypted_event("$0"), make_encrypted_event("$1"), make_encrypted_event("$2")];
    cache_latest_events(&room, &mut info, &encrypted_backlog, None, None).await;
    room.set_room_info(info.clone());

    // When a plain message arrives, followed by one more encrypted event
    let plain = make_event("m.room.message", "$a");
    let encrypted = make_encrypted_event("$b");
    cache_latest_events(&room, &mut info, &[plain, encrypted], None, None).await;
    room.set_room_info(info);

    // Then only the encrypted event that came after the plain message is
    // stored...
    assert_eq!(rawevs_ids(&room.latest_encrypted_events), &["$b"]);

    // ...and the plain message is the latest event
    let latest = room.latest_event().unwrap().event().clone();
    assert_eq!(rawev_id(latest), "$a");
}
/// Runs `events` through `cache_latest_events` on a freshly made room and
/// returns a clone of the event that room then reports as its latest, if
/// there is one.
async fn choose_event_to_cache(events: &[SyncTimelineEvent]) -> Option<SyncTimelineEvent> {
    let fresh_room = make_room();
    let mut info = fresh_room.clone_info();
    cache_latest_events(&fresh_room, &mut info, events, None, None).await;
    fresh_room.set_room_info(info);

    let latest = fresh_room.latest_event()?;
    Some(latest.event().clone())
}
/// Returns the event ID of `event` as a `String`.
///
/// Panics if the event has no event ID.
fn rawev_id(event: SyncTimelineEvent) -> String {
    let id = event.event_id().unwrap();
    id.to_string()
}
/// Returns the event ID of the contained event as a `String`.
///
/// Panics if `event` is `None` or if the event has no event ID.
fn ev_id(event: Option<SyncTimelineEvent>) -> String {
    let present = event.unwrap();
    let id = present.event_id().unwrap();
    id.to_string()
}
/// Collects the `event_id` field of every raw event in the locked ring
/// buffer, in iteration order.
///
/// Panics if the lock is poisoned, or if an event's `event_id` field is
/// missing or cannot be deserialized.
fn rawevs_ids(events: &Arc<SyncRwLock<RingBuffer<Raw<AnySyncTimelineEvent>>>>) -> Vec<String> {
    let buffer = events.read().unwrap();
    let mut ids = Vec::new();
    for raw in buffer.iter() {
        let id: String = raw.get_field("event_id").unwrap().unwrap();
        ids.push(id);
    }
    ids
}
/// Returns the event IDs of `events` as strings, in the same order.
///
/// Panics if any event has no event ID.
fn evs_ids(events: &[SyncTimelineEvent]) -> Vec<String> {
    let mut ids = Vec::with_capacity(events.len());
    for event in events {
        ids.push(event.event_id().unwrap().to_string());
    }
    ids
}
/// Builds a `Room` in the `Joined` state with room ID `!r:e.co`, user ID
/// `@u:e.co`, and a fresh `MemoryStore`.
fn make_room() -> Room {
    let user = user_id!("@u:e.co");
    let room = room_id!("!r:e.co");
    Room::new(user, Arc::new(MemoryStore::new()), room, RoomState::Joined)
}
/// Builds a `SyncTimelineEvent` of type `typ` with event ID `id`, carrying a
/// fixed text-message content, sender and timestamp.
fn make_event(typ: &str, id: &str) -> SyncTimelineEvent {
    let payload = json!({
        "type": typ,
        "event_id": id,
        "content": { "msgtype": "m.text", "body": "my msg" },
        "sender": "@u:h.uk",
        "origin_server_ts": 12344445,
    });
    let raw = Raw::from_json_string(payload.to_string()).unwrap();
    SyncTimelineEvent::new(raw)
}
/// Builds an `m.room.encrypted` `SyncTimelineEvent` with event ID `id`.
///
/// The content names the `m.megolm.v1.aes-sha2` algorithm and leaves every
/// other content field as an empty string.
fn make_encrypted_event(id: &str) -> SyncTimelineEvent {
    let payload = json!({
        "type": "m.room.encrypted",
        "event_id": id,
        "content": {
            "algorithm": "m.megolm.v1.aes-sha2",
            "ciphertext": "",
            "sender_key": "",
            "device_id": "",
            "session_id": "",
        },
        "sender": "@u:h.uk",
        "origin_server_ts": 12344445,
    });
    let raw = Raw::from_json_string(payload.to_string()).unwrap();
    SyncTimelineEvent::new(raw)
}
/// Looks up `user_id` in the room `room_id` known to `client` and returns a
/// clone of that member's membership state.
///
/// Panics if the room is unknown, if the member lookup fails, or if the
/// user is not a member of the room.
async fn membership(
    client: &BaseClient,
    room_id: &RoomId,
    user_id: &UserId,
) -> MembershipState {
    let found = client
        .get_room(room_id)
        .expect("Room not found!")
        .get_member(user_id)
        .await
        .unwrap()
        .expect("B not in room");
    found.membership().clone()
}
/// Returns the direct targets of the room `room_id` known to `client`.
///
/// Panics if the client does not know the room.
fn direct_targets(client: &BaseClient, room_id: &RoomId) -> HashSet<OwnedUserId> {
    client.get_room(room_id).expect("Room not found!").direct_targets()
}
/// Feeds `client` a sliding sync response describing a direct-message room.
///
/// The room carries a join membership event for `my_id`, a membership event
/// for `their_id` in `other_state`, and joined/invited counts chosen from
/// `other_state`. The response also carries global account data listing
/// `room_id` as a direct room with `their_id`.
///
/// Panics if processing the response fails.
async fn create_dm(
    client: &BaseClient,
    room_id: &RoomId,
    my_id: &UserId,
    their_id: &UserId,
    other_state: MembershipState,
) {
    let mut dm_room = v4::SlidingSyncRoom::new();
    set_room_joined(&mut dm_room, my_id);

    // Pick the member counts that match the other user's membership state.
    let (joined, invited) = match other_state {
        MembershipState::Join => (uint!(2), None),
        MembershipState::Invite => (uint!(1), Some(uint!(1))),
        _ => (uint!(1), None),
    };
    dm_room.joined_count = Some(joined);
    dm_room.invited_count = invited;

    let their_membership = make_membership_event(their_id, other_state);
    dm_room.required_state.push(their_membership);

    let mut sync_response = response_with_room(room_id, dm_room).await;
    set_direct_with(&mut sync_response, their_id.to_owned(), vec![room_id.to_owned()]);
    client.process_sliding_sync(&sync_response, &()).await.expect("Failed to process sync");
}
/// Feeds `client` a sliding sync response for `room_id` whose required
/// state holds a single membership event putting `user_id` in `new_state`.
///
/// Panics if processing the response fails.
async fn update_room_membership(
    client: &BaseClient,
    room_id: &RoomId,
    user_id: &UserId,
    new_state: MembershipState,
) {
    let membership_event = make_membership_event(user_id, new_state);
    let mut updated_room = v4::SlidingSyncRoom::new();
    updated_room.required_state.push(membership_event);

    let sync_response = response_with_room(room_id, updated_room).await;
    client.process_sliding_sync(&sync_response, &()).await.expect("Failed to process sync");
}
/// Appends a global account data event to `response` whose
/// `DirectEventContent` maps `user_id` to `room_ids`.
fn set_direct_with(
    response: &mut v4::Response,
    user_id: OwnedUserId,
    room_ids: Vec<OwnedRoomId>,
) {
    let content = DirectEventContent(BTreeMap::from([(user_id, room_ids)]));
    let direct_event = make_global_account_data_event(content);
    response.extensions.account_data.global.push(direct_event);
}
/// Creates a `BaseClient` and sets its session meta to user `@u:e.uk` on
/// device `XYZ`.
///
/// Panics if setting the session meta fails.
async fn logged_in_client() -> BaseClient {
    let client = BaseClient::new();
    let session = SessionMeta {
        user_id: user_id!("@u:e.uk").to_owned(),
        device_id: device_id!("XYZ").to_owned(),
    };
    client.set_session_meta(session).await.expect("Failed to set session meta");
    client
}
/// Builds a `v4::Response` (constructed with the string `"5"`) whose room
/// map contains `room` under `room_id`.
async fn response_with_room(room_id: &RoomId, room: v4::SlidingSyncRoom) -> v4::Response {
    let mut sync_response = v4::Response::new(String::from("5"));
    sync_response.rooms.insert(room_id.to_owned(), room);
    sync_response
}
/// Builds a sliding sync room whose required state holds one room avatar
/// state event, sent by `user_id`, with its URL set to `avatar_uri`.
fn room_with_avatar(avatar_uri: &MxcUri, user_id: &UserId) -> v4::SlidingSyncRoom {
    let mut content = RoomAvatarEventContent::new();
    content.url = Some(avatar_uri.to_owned());
    let avatar_state = make_state_event(user_id, "", content, None);

    let mut room = v4::SlidingSyncRoom::new();
    room.required_state.push(avatar_state);
    room
}
/// Builds a sliding sync room whose required state holds one canonical
/// alias state event, sent by `user_id`, with its alias set to
/// `room_alias_id`.
fn room_with_canonical_alias(
    room_alias_id: &RoomAliasId,
    user_id: &UserId,
) -> v4::SlidingSyncRoom {
    let mut content = RoomCanonicalAliasEventContent::new();
    content.alias = Some(room_alias_id.to_owned());
    let alias_state = make_state_event(user_id, "", content, None);

    let mut room = v4::SlidingSyncRoom::new();
    room.required_state.push(alias_state);
    room
}
/// Builds a sliding sync room whose timeline holds each of `events`
/// converted to a raw event, in the same order.
///
/// Panics if any value cannot be turned into a raw event.
fn room_with_timeline(events: &[serde_json::Value]) -> v4::SlidingSyncRoom {
    let mut room = v4::SlidingSyncRoom::new();
    // `extend` consumes the iterator directly, so no intermediate `Vec` is
    // allocated just to be drained again.
    room.timeline.extend(events.iter().map(|e| Raw::from_json_string(e.to_string()).unwrap()));
    room
}
/// Sets `room.invite_state` to a single bare `m.room.member` event and
/// pushes an `Invite` membership event for `user_id` onto the required
/// state.
///
/// Panics if the raw event cannot be created.
fn set_room_invited(room: &mut v4::SlidingSyncRoom, user_id: &UserId) {
    let payload = json!({
        "type": "m.room.member",
    });
    let stripped_member = Raw::new(&payload).expect("Failed to make raw event").cast();
    room.invite_state = Some(vec![stripped_member]);

    let invite = make_membership_event(user_id, MembershipState::Invite);
    room.required_state.push(invite);
}
/// Pushes a `Join` membership event for `user_id` onto the room's required
/// state.
fn set_room_joined(room: &mut v4::SlidingSyncRoom, user_id: &UserId) {
    let join = make_membership_event(user_id, MembershipState::Join);
    room.required_state.push(join);
}
/// Pushes a `Leave` membership event for `user_id` onto the room's required
/// state.
fn set_room_left(room: &mut v4::SlidingSyncRoom, user_id: &UserId) {
    let leave = make_membership_event(user_id, MembershipState::Leave);
    room.required_state.push(leave);
}
/// Pushes a `Leave` membership event for `user_id` onto the room's timeline
/// (rather than its required state).
fn set_room_left_as_timeline_event(room: &mut v4::SlidingSyncRoom, user_id: &UserId) {
    let leave = make_membership_event(user_id, MembershipState::Leave);
    room.timeline.push(leave);
}
/// Builds a raw membership state event with `state` as its membership.
///
/// `user_id` is used both as the sender and as the state key; there is no
/// previous content.
fn make_membership_event<K>(user_id: &UserId, state: MembershipState) -> Raw<K> {
    let content = RoomMemberEventContent::new(state);
    make_state_event(user_id, user_id.as_str(), content, None)
}
/// Builds a raw global account data event from `content`, with the `type`
/// field taken from the content's event type.
///
/// Panics if the raw event cannot be created.
fn make_global_account_data_event<C: GlobalAccountDataEventContent, E>(content: C) -> Raw<E> {
    let payload = json!({
        "type": content.event_type(),
        "content": content,
    });
    let raw = Raw::new(&payload).expect("Failed to create account data event");
    raw.cast()
}
/// Builds a raw state event from `content`.
///
/// The event uses `sender` and `state_key` as given, a fixed event ID
/// (`$evt`) and a fixed timestamp. When `prev_content` is provided it is
/// placed under `unsigned.prev_content`; otherwise `unsigned` is an empty
/// object.
///
/// Panics if the raw event cannot be created.
fn make_state_event<C: StateEventContent, E>(
    sender: &UserId,
    state_key: &str,
    content: C,
    prev_content: Option<C>,
) -> Raw<E> {
    let unsigned = match prev_content {
        Some(previous) => json!({ "prev_content": previous }),
        None => json!({}),
    };

    let payload = json!({
        "type": content.event_type(),
        "state_key": state_key,
        "content": content,
        "event_id": event_id!("$evt"),
        "sender": sender,
        "origin_server_ts": 10,
        "unsigned": unsigned,
    });
    Raw::new(&payload).expect("Failed to create state event").cast()
}
}