use std::{
collections::{BTreeMap, BTreeSet},
fmt, iter,
};
#[cfg(feature = "e2e-encryption")]
use std::{ops::Deref, sync::Arc};
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_common::instant::Instant;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::{
store::DynCryptoStore, EncryptionSettings, EncryptionSyncChanges, OlmError, OlmMachine,
ToDeviceRequest,
};
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
room::{history_visibility::HistoryVisibility, message::MessageType},
SyncMessageLikeEvent,
};
use ruma::{
api::client::{self as api, push::get_notifications::v3::Notification},
events::{
push_rules::{PushRulesEvent, PushRulesEventContent},
room::{
member::{MembershipState, SyncRoomMemberEvent},
power_levels::{RoomPowerLevelsEvent, RoomPowerLevelsEventContent},
},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncStateEvent,
AnySyncTimelineEvent, GlobalAccountDataEventType, StateEventType,
},
push::{Action, PushConditionRoomCtx, Ruleset},
serde::Raw,
MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
};
use tokio::sync::RwLock;
#[cfg(feature = "e2e-encryption")]
use tokio::sync::RwLockReadGuard;
use tracing::{debug, info, instrument, trace, warn};
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
use crate::{
deserialized_responses::{AmbiguityChanges, MembersResponse, SyncTimelineEvent},
error::Result,
rooms::{Room, RoomInfo, RoomState},
store::{
ambiguity_map::AmbiguityCache, DynStateStore, MemoryStore, Result as StoreResult,
StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, Store, StoreConfig,
},
sync::{JoinedRoom, LeftRoom, Rooms, SyncResponse, Timeline},
RoomStateFilter, SessionMeta,
};
#[cfg(feature = "e2e-encryption")]
use crate::{error::Error, RoomMemberships};
#[derive(Clone)]
pub struct BaseClient {
pub(crate) store: Store,
#[cfg(feature = "e2e-encryption")]
crypto_store: Arc<DynCryptoStore>,
#[cfg(feature = "e2e-encryption")]
olm_machine: Arc<RwLock<Option<OlmMachine>>>,
pub(crate) ignore_user_list_changes: SharedObservable<()>,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for BaseClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("session_meta", &self.store.session_meta())
.field("sync_token", &self.store.sync_token)
.finish_non_exhaustive()
}
}
impl BaseClient {
pub fn new() -> Self {
BaseClient::with_store_config(StoreConfig::default())
}
pub fn with_store_config(config: StoreConfig) -> Self {
BaseClient {
store: Store::new(config.state_store),
#[cfg(feature = "e2e-encryption")]
crypto_store: config.crypto_store,
#[cfg(feature = "e2e-encryption")]
olm_machine: Default::default(),
ignore_user_list_changes: Default::default(),
}
}
pub fn clone_with_in_memory_state_store(&self) -> Self {
let config = StoreConfig::new().state_store(MemoryStore::new());
#[cfg(feature = "e2e-encryption")]
let config = config.crypto_store(self.crypto_store.clone());
Self::with_store_config(config)
}
pub fn session_meta(&self) -> Option<&SessionMeta> {
self.store.session_meta()
}
pub fn get_rooms(&self) -> Vec<Room> {
self.store.get_rooms()
}
pub fn get_rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
self.store.get_rooms_filtered(filter)
}
pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
self.store.get_or_create_room(room_id, room_state)
}
#[deprecated = "Use get_rooms_filtered with RoomStateFilter::INVITED instead."]
pub fn get_stripped_rooms(&self) -> Vec<Room> {
self.get_rooms_filtered(RoomStateFilter::INVITED)
}
#[allow(unknown_lints, clippy::explicit_auto_deref)]
pub fn store(&self) -> &DynStateStore {
&*self.store
}
pub fn logged_in(&self) -> bool {
self.store.session_meta().is_some()
}
pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> {
debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");
self.store.set_session_meta(session_meta.clone()).await?;
#[cfg(feature = "e2e-encryption")]
self.regenerate_olm().await?;
Ok(())
}
#[cfg(feature = "e2e-encryption")]
pub async fn regenerate_olm(&self) -> Result<()> {
tracing::debug!("regenerating OlmMachine");
let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
let olm_machine = OlmMachine::with_store(
&session_meta.user_id,
&session_meta.device_id,
self.crypto_store.clone(),
)
.await
.map_err(OlmError::from)?;
*self.olm_machine.write().await = Some(olm_machine);
Ok(())
}
pub async fn sync_token(&self) -> Option<String> {
self.store.sync_token.read().await.clone()
}
#[cfg(feature = "e2e-encryption")]
async fn handle_verification_event(
&self,
event: &AnySyncMessageLikeEvent,
room_id: &RoomId,
) -> Result<()> {
if let Some(olm) = self.olm_machine().await.as_ref() {
olm.receive_verification_event(&event.clone().into_full_event(room_id.to_owned()))
.await?;
}
Ok(())
}
#[cfg(feature = "e2e-encryption")]
async fn decrypt_sync_room_event(
&self,
event: &Raw<AnySyncTimelineEvent>,
room_id: &RoomId,
) -> Result<Option<SyncTimelineEvent>> {
let olm = self.olm_machine().await;
let Some(olm) = olm.as_ref() else { return Ok(None) };
let event: SyncTimelineEvent =
olm.decrypt_room_event(event.cast_ref(), room_id).await?.into();
if let Ok(AnySyncTimelineEvent::MessageLike(e)) = event.event.deserialize() {
match &e {
AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
original_event,
)) => {
if let MessageType::VerificationRequest(_) = &original_event.content.msgtype {
self.handle_verification_event(&e, room_id).await?;
}
}
_ if e.event_type().to_string().starts_with("m.key.verification") => {
self.handle_verification_event(&e, room_id).await?;
}
_ => (),
}
}
Ok(Some(event))
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_timeline(
&self,
room: &Room,
limited: bool,
events: Vec<Raw<AnySyncTimelineEvent>>,
prev_batch: Option<String>,
push_rules: &Ruleset,
user_ids: &mut BTreeSet<OwnedUserId>,
room_info: &mut RoomInfo,
changes: &mut StateChanges,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
ambiguity_cache: &mut AmbiguityCache,
) -> Result<Timeline> {
let mut timeline = Timeline::new(limited, prev_batch);
let mut push_context = self.get_push_room_context(room, room_info, changes).await?;
for event in events {
let mut event: SyncTimelineEvent = event.into();
match event.event.deserialize() {
Ok(e) => {
#[allow(clippy::single_match)]
match &e {
AnySyncTimelineEvent::State(s) => {
match s {
AnySyncStateEvent::RoomMember(member) => {
Box::pin(ambiguity_cache.handle_event(
changes,
room.room_id(),
member,
))
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => {
user_ids.remove(member.state_key());
}
}
if member.state_key() == member.sender() {
changes
.profiles
.entry(room.room_id().to_owned())
.or_default()
.insert(member.sender().to_owned(), member.into());
}
}
_ => {
room_info.handle_state_event(s);
}
}
let raw_event: Raw<AnySyncStateEvent> = event.event.clone().cast();
changes.add_state_event(room.room_id(), s.clone(), raw_event);
}
AnySyncTimelineEvent::MessageLike(
AnySyncMessageLikeEvent::RoomRedaction(r),
) => {
let room_version =
room_info.room_version().unwrap_or(&RoomVersionId::V1);
if let Some(redacts) = r.redacts(room_version) {
room_info.handle_redaction(r, event.event.cast_ref());
let raw_event = event.event.clone().cast();
changes.add_redaction(room.room_id(), redacts, raw_event);
}
}
#[cfg(feature = "e2e-encryption")]
AnySyncTimelineEvent::MessageLike(e) => match e {
AnySyncMessageLikeEvent::RoomEncrypted(
SyncMessageLikeEvent::Original(_),
) => {
if let Ok(Some(e)) = Box::pin(
self.decrypt_sync_room_event(&event.event, room.room_id()),
)
.await
{
event = e;
}
}
AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Original(original_event),
) => match &original_event.content.msgtype {
MessageType::VerificationRequest(_) => {
Box::pin(self.handle_verification_event(e, room.room_id()))
.await?;
}
_ => (),
},
_ if e.event_type().to_string().starts_with("m.key.verification") => {
Box::pin(self.handle_verification_event(e, room.room_id())).await?;
}
_ => (),
},
#[cfg(not(feature = "e2e-encryption"))]
AnySyncTimelineEvent::MessageLike(_) => (),
}
if let Some(context) = &mut push_context {
self.update_push_room_context(
context,
room.own_user_id(),
room_info,
changes,
)
.await;
} else {
push_context = self.get_push_room_context(room, room_info, changes).await?;
}
if let Some(context) = &push_context {
let actions = push_rules.get_actions(&event.event, context);
if actions.iter().any(Action::should_notify) {
notifications.entry(room.room_id().to_owned()).or_default().push(
Notification::new(
actions.to_owned(),
event.event.clone(),
false,
room.room_id().to_owned(),
MilliSecondsSinceUnixEpoch::now(),
),
);
}
event.push_actions = actions.to_owned();
}
}
Err(e) => {
warn!("Error deserializing event {e:?}");
}
}
timeline.events.push(event);
}
Ok(timeline)
}
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) fn handle_invited_state(
&self,
events: &[Raw<AnyStrippedStateEvent>],
room_info: &mut RoomInfo,
changes: &mut StateChanges,
) {
let mut state_events = BTreeMap::new();
for raw_event in events {
match raw_event.deserialize() {
Ok(e) => {
room_info.handle_stripped_state_event(&e);
state_events
.entry(e.event_type())
.or_insert_with(BTreeMap::new)
.insert(e.state_key().to_owned(), raw_event.clone());
}
Err(err) => {
warn!(
room_id = ?room_info.room_id,
"Couldn't deserialize stripped state event: {err:?}",
);
}
}
}
changes.stripped_state.insert(room_info.room_id().to_owned(), state_events);
}
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_state(
&self,
raw_events: &[Raw<AnySyncStateEvent>],
events: &[AnySyncStateEvent],
room_info: &mut RoomInfo,
changes: &mut StateChanges,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut state_events = BTreeMap::new();
let mut user_ids = BTreeSet::new();
let mut profiles = BTreeMap::new();
assert_eq!(raw_events.len(), events.len());
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = &event {
ambiguity_cache.handle_event(changes, &room_info.room_id, member).await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
if member.state_key() == member.sender() {
profiles.insert(member.sender().to_owned(), member.into());
}
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
changes.profiles.insert((*room_info.room_id).to_owned(), profiles);
changes.state.insert((*room_info.room_id).to_owned(), state_events);
Ok(user_ids)
}
#[instrument(skip_all, fields(?room_id))]
pub(crate) async fn handle_room_account_data(
&self,
room_id: &RoomId,
events: &[Raw<AnyRoomAccountDataEvent>],
changes: &mut StateChanges,
) {
for raw_event in events {
if let Ok(event) = raw_event.deserialize() {
changes.add_room_account_data(room_id, event, raw_event.clone());
}
}
}
#[instrument(skip_all)]
pub(crate) async fn handle_account_data(
&self,
events: &[Raw<AnyGlobalAccountDataEvent>],
changes: &mut StateChanges,
) {
let mut account_data = BTreeMap::new();
for raw_event in events {
let event = match raw_event.deserialize() {
Ok(e) => e,
Err(e) => {
let event_type: Option<String> = raw_event.get_field("type").ok().flatten();
warn!(event_type, "Failed to deserialize a global account data event: {e}");
continue;
}
};
if let AnyGlobalAccountDataEvent::Direct(e) = &event {
for (user_id, rooms) in e.content.iter() {
for room_id in rooms {
trace!(
?room_id, target = ?user_id,
"Marking room as direct room"
);
if let Some(room) = changes.room_infos.get_mut(room_id) {
room.base_info.dm_targets.insert(user_id.clone());
} else if let Some(room) = self.store.get_room(room_id) {
let mut info = room.clone_info();
if info.base_info.dm_targets.insert(user_id.clone()) {
changes.add_room(info);
}
}
}
}
}
account_data.insert(event.event_type(), raw_event.clone());
}
changes.account_data = account_data;
}
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
pub(crate) async fn preprocess_to_device_events(
&self,
encryption_sync_changes: EncryptionSyncChanges<'_>,
#[cfg(feature = "experimental-sliding-sync")] changes: &mut StateChanges,
#[cfg(not(feature = "experimental-sliding-sync"))] _changes: &mut StateChanges,
) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
if let Some(o) = self.olm_machine().await.as_ref() {
let (events, room_key_updates) =
o.receive_sync_changes(encryption_sync_changes).await?;
#[cfg(feature = "experimental-sliding-sync")]
for room_key_update in room_key_updates {
if let Some(room) = self.get_room(&room_key_update.room_id) {
self.decrypt_latest_events(&room, changes).await;
}
}
#[cfg(not(feature = "experimental-sliding-sync"))]
drop(room_key_updates); Ok(events)
} else {
Ok(encryption_sync_changes.to_device_events)
}
}
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_events(&self, room: &Room, changes: &mut StateChanges) {
if let Some((found, found_index)) = self.decrypt_latest_suitable_event(room).await {
room.on_latest_event_decrypted(found, found_index, changes);
}
}
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_suitable_event(
&self,
room: &Room,
) -> Option<(Box<LatestEvent>, usize)> {
let enc_events = room.latest_encrypted_events();
for (i, event) in enc_events.iter().enumerate().rev() {
let decrypt_sync_room_event =
Box::pin(self.decrypt_sync_room_event(event, room.room_id()));
if let Ok(Some(decrypted)) = decrypt_sync_room_event.await {
if let Ok(any_sync_event) = decrypted.event.deserialize() {
if let PossibleLatestEvent::YesRoomMessage(_) =
is_suitable_for_latest_event(&any_sync_event)
{
return Some((Box::new(LatestEvent::new(decrypted)), i));
}
}
}
}
None
}
pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
let room = self.store.get_or_create_room(room_id, RoomState::Joined);
if room.state() != RoomState::Joined {
let _sync_lock = self.sync_lock().read().await;
let mut room_info = room.clone_info();
room_info.mark_as_joined();
room_info.mark_state_partially_synced();
room_info.mark_members_missing(); let mut changes = StateChanges::default();
changes.add_room(room_info.clone());
self.store.save_changes(&changes).await?; room.set_room_info(room_info); }
Ok(room)
}
pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
let room = self.store.get_or_create_room(room_id, RoomState::Left);
if room.state() != RoomState::Left {
let _sync_lock = self.sync_lock().read().await;
let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
room_info.mark_members_missing(); let mut changes = StateChanges::default();
changes.add_room(room_info.clone());
self.store.save_changes(&changes).await?; room.set_room_info(room_info); }
Ok(())
}
pub fn sync_lock(&self) -> &RwLock<()> {
self.store.sync_lock()
}
#[instrument(skip_all)]
pub async fn receive_sync_response(
&self,
response: api::sync::sync_events::v3::Response,
) -> Result<SyncResponse> {
if self.store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
info!("Got the same sync response twice");
return Ok(SyncResponse::default());
}
let now = Instant::now();
let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));
#[cfg(feature = "e2e-encryption")]
let to_device = self
.preprocess_to_device_events(
EncryptionSyncChanges {
to_device_events: response.to_device.events,
changed_devices: &response.device_lists,
one_time_keys_counts: &response.device_one_time_keys_count,
unused_fallback_keys: response.device_unused_fallback_key_types.as_deref(),
next_batch_token: Some(response.next_batch.clone()),
},
&mut changes,
)
.await?;
#[cfg(not(feature = "e2e-encryption"))]
let to_device = response.to_device.events;
let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone());
self.handle_account_data(&response.account_data.events, &mut changes).await;
let push_rules = self.get_push_rules(&changes).await?;
let mut new_rooms = Rooms::default();
let mut notifications = Default::default();
for (room_id, new_info) in response.rooms.join {
let room = self.store.get_or_create_room(&room_id, RoomState::Joined);
let mut room_info = room.clone_info();
room_info.mark_as_joined();
room_info.update_summary(&new_info.summary);
room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
room_info.mark_state_fully_synced();
let state_events = Self::deserialize_state_events(&new_info.state.events);
let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
state_events.into_iter().unzip();
let mut user_ids = self
.handle_state(
&raw_state_events,
&state_events,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
)
.await?;
for raw in &new_info.ephemeral.events {
match raw.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
changes.add_receipts(&room_id, event.content);
}
Ok(_) => {}
Err(e) => {
let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
#[rustfmt::skip]
info!(
?room_id, event_id,
"Failed to deserialize ephemeral room event: {e}"
);
}
}
}
if new_info.timeline.limited {
room_info.mark_members_missing();
}
let timeline = self
.handle_timeline(
&room,
new_info.timeline.limited,
new_info.timeline.events,
new_info.timeline.prev_batch,
&push_rules,
&mut user_ids,
&mut room_info,
&mut changes,
&mut notifications,
&mut ambiguity_cache,
)
.await?;
self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
.await;
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
if let Some(o) = self.olm_machine().await.as_ref() {
if !room.is_encrypted() {
let user_ids =
self.store.get_user_ids(&room_id, RoomMemberships::ACTIVE).await?;
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
}
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
}
}
let notification_count = new_info.unread_notifications.into();
room_info.update_notification_count(notification_count);
new_rooms.join.insert(
room_id,
JoinedRoom::new(
timeline,
new_info.state.events,
new_info.account_data.events,
new_info.ephemeral.events,
notification_count,
),
);
changes.add_room(room_info);
}
for (room_id, new_info) in response.rooms.leave {
let room = self.store.get_or_create_room(&room_id, RoomState::Left);
let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
let state_events = Self::deserialize_state_events(&new_info.state.events);
let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
state_events.into_iter().unzip();
let mut user_ids = self
.handle_state(
&raw_state_events,
&state_events,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
)
.await?;
let timeline = self
.handle_timeline(
&room,
new_info.timeline.limited,
new_info.timeline.events,
new_info.timeline.prev_batch,
&push_rules,
&mut user_ids,
&mut room_info,
&mut changes,
&mut notifications,
&mut ambiguity_cache,
)
.await?;
self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
.await;
changes.add_room(room_info);
new_rooms.leave.insert(
room_id,
LeftRoom::new(timeline, new_info.state.events, new_info.account_data.events),
);
}
for (room_id, new_info) in response.rooms.invite {
let room = self.store.get_or_create_room(&room_id, RoomState::Invited);
let mut room_info = room.clone_info();
room_info.mark_as_invited();
room_info.mark_state_fully_synced();
self.handle_invited_state(&new_info.invite_state.events, &mut room_info, &mut changes);
changes.add_room(room_info);
new_rooms.invite.insert(room_id, new_info);
}
self.handle_account_data(&response.account_data.events, &mut changes).await;
changes.presence = response
.presence
.events
.iter()
.filter_map(|e| {
let event = e.deserialize().ok()?;
Some((event.sender, e.clone()))
})
.collect();
changes.ambiguity_maps = ambiguity_cache.cache;
let sync_lock = self.sync_lock().write().await;
self.store.save_changes(&changes).await?;
*self.store.sync_token.write().await = Some(response.next_batch.clone());
self.apply_changes(&changes);
drop(sync_lock);
info!("Processed a sync response in {:?}", now.elapsed());
let response = SyncResponse {
rooms: new_rooms,
presence: response.presence.events,
account_data: response.account_data.events,
to_device,
ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes },
notifications,
};
Ok(response)
}
pub(crate) fn apply_changes(&self, changes: &StateChanges) {
if changes.account_data.contains_key(&GlobalAccountDataEventType::IgnoredUserList) {
self.ignore_user_list_changes.set(());
}
for (room_id, room_info) in &changes.room_infos {
if let Some(room) = self.store.get_room(room_id) {
room.set_room_info(room_info.clone())
}
}
}
#[instrument(skip_all, fields(?room_id))]
pub async fn receive_members(
&self,
room_id: &RoomId,
response: &api::membership::get_member_events::v3::Response,
) -> Result<MembersResponse> {
let mut chunk = Vec::with_capacity(response.chunk.len());
let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone());
if let Some(room) = self.store.get_room(room_id) {
let mut changes = StateChanges::default();
#[cfg(feature = "e2e-encryption")]
let mut user_ids = BTreeSet::new();
for raw_event in &response.chunk {
let member = match raw_event.deserialize() {
Ok(ev) => ev,
Err(e) => {
let event_id: Option<String> =
raw_event.get_field("event_id").ok().flatten();
debug!(event_id, "Failed to deserialize member event: {e}");
continue;
}
};
#[cfg(feature = "e2e-encryption")]
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
let sync_member: SyncRoomMemberEvent = member.clone().into();
ambiguity_cache.handle_event(&changes, room_id, &sync_member).await?;
if member.state_key() == member.sender() {
changes
.profiles
.entry(room_id.to_owned())
.or_default()
.insert(member.sender().to_owned(), sync_member.into());
}
changes
.state
.entry(room_id.to_owned())
.or_default()
.entry(member.event_type())
.or_default()
.insert(member.state_key().to_string(), raw_event.clone().cast());
chunk.push(member);
}
#[cfg(feature = "e2e-encryption")]
if room.is_encrypted() {
if let Some(o) = self.olm_machine().await.as_ref() {
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
}
}
changes.ambiguity_maps = ambiguity_cache.cache;
let _sync_lock = self.sync_lock().write().await;
let mut room_info = room.clone_info();
room_info.mark_members_synced();
changes.add_room(room_info);
self.store.save_changes(&changes).await?;
self.apply_changes(&changes);
}
Ok(MembersResponse {
chunk,
ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes },
})
}
pub async fn receive_filter_upload(
&self,
filter_name: &str,
response: &api::filter::create_filter::v3::Response,
) -> Result<()> {
Ok(self
.store
.set_kv_data(
StateStoreDataKey::Filter(filter_name),
StateStoreDataValue::Filter(response.filter_id.clone()),
)
.await?)
}
pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
let filter = self
.store
.get_kv_data(StateStoreDataKey::Filter(filter_name))
.await?
.map(|d| d.into_filter().expect("State store data not a filter"));
Ok(filter)
}
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
match self.olm_machine().await.as_ref() {
Some(o) => {
let (history_visibility, settings) = self
.get_room(room_id)
.map(|r| (r.history_visibility(), r.encryption_settings()))
.unwrap_or((HistoryVisibility::Joined, None));
let filter = if history_visibility == HistoryVisibility::Joined {
RoomMemberships::JOIN
} else {
RoomMemberships::ACTIVE
};
let members = self.store.get_user_ids(room_id, filter).await?;
let settings = settings.ok_or(Error::EncryptionNotEnabled)?;
let settings = EncryptionSettings::new(settings, history_visibility, false);
Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
}
None => panic!("Olm machine wasn't started"),
}
}
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
self.store.get_room(room_id)
}
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
self.olm_machine.read().await
}
pub async fn get_push_rules(&self, changes: &StateChanges) -> Result<Ruleset> {
if let Some(event) = changes
.account_data
.get(&GlobalAccountDataEventType::PushRules)
.and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
{
Ok(event.content.global)
} else if let Some(event) = self
.store
.get_account_data_event_static::<PushRulesEventContent>()
.await?
.and_then(|ev| ev.deserialize().ok())
{
Ok(event.content.global)
} else if let Some(session_meta) = self.store.session_meta() {
Ok(Ruleset::server_default(&session_meta.user_id))
} else {
Ok(Ruleset::new())
}
}
pub async fn get_push_room_context(
&self,
room: &Room,
room_info: &RoomInfo,
changes: &StateChanges,
) -> Result<Option<PushConditionRoomCtx>> {
let room_id = room.room_id();
let user_id = room.own_user_id();
let member_count = room_info.active_members_count();
let user_display_name = if let Some(Ok(AnySyncStateEvent::RoomMember(member))) = changes
.state
.get(room_id)
.and_then(|events| events.get(&StateEventType::RoomMember))
.and_then(|members| members.get(user_id.as_str()))
.map(Raw::deserialize)
{
member
.as_original()
.and_then(|ev| ev.content.displayname.clone())
.unwrap_or_else(|| user_id.localpart().to_owned())
} else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
member.name().to_owned()
} else {
return Ok(None);
};
let room_power_levels = if let Some(event) = changes
.state
.get(room_id)
.and_then(|types| types.get(&StateEventType::RoomPowerLevels)?.get(""))
.and_then(|e| e.deserialize_as::<RoomPowerLevelsEvent>().ok())
{
event.power_levels()
} else if let Some(event) = self
.store
.get_state_event_static::<RoomPowerLevelsEventContent>(room_id)
.await?
.and_then(|e| e.deserialize().ok())
{
event.power_levels()
} else {
return Ok(None);
};
Ok(Some(PushConditionRoomCtx {
user_id: user_id.to_owned(),
room_id: room_id.to_owned(),
member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
user_display_name,
users_power_levels: room_power_levels.users,
default_power_level: room_power_levels.users_default,
notification_power_levels: room_power_levels.notifications,
}))
}
pub async fn update_push_room_context(
&self,
push_rules: &mut PushConditionRoomCtx,
user_id: &UserId,
room_info: &RoomInfo,
changes: &StateChanges,
) {
let room_id = &*room_info.room_id;
push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
if let Some(Ok(AnySyncStateEvent::RoomMember(member))) = changes
.state
.get(room_id)
.and_then(|events| events.get(&StateEventType::RoomMember))
.and_then(|members| members.get(user_id.as_str()))
.map(Raw::deserialize)
{
push_rules.user_display_name = member
.as_original()
.and_then(|ev| ev.content.displayname.clone())
.unwrap_or_else(|| user_id.localpart().to_owned())
}
if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = changes
.state
.get(room_id)
.and_then(|types| types.get(&StateEventType::RoomPowerLevels)?.get(""))
.and_then(|e| e.deserialize().ok())
{
let room_power_levels = event.power_levels();
push_rules.users_power_levels = room_power_levels.users;
push_rules.default_power_level = room_power_levels.users_default;
push_rules.notification_power_levels = room_power_levels.notifications;
}
}
pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<()> {
self.ignore_user_list_changes.subscribe()
}
pub(crate) fn deserialize_state_events(
raw_events: &[Raw<AnySyncStateEvent>],
) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
raw_events
.iter()
.filter_map(|raw_event| match raw_event.deserialize() {
Ok(event) => Some((raw_event.clone(), event)),
Err(e) => {
warn!("Couldn't deserialize state event: {e}");
None
}
})
.collect()
}
}
impl Default for BaseClient {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use matrix_sdk_test::{
async_test, response_from_file, sync_timeline_event, InvitedRoomBuilder, JoinedRoomBuilder,
LeftRoomBuilder, StrippedStateTestEvent, SyncResponseBuilder,
};
use ruma::{
api::{client as api, IncomingResponse},
room_id, user_id, RoomId, UserId,
};
use serde_json::json;
use super::BaseClient;
use crate::{store::StateStoreExt, DisplayName, Room, RoomState, SessionMeta, StateChanges};
#[async_test]
async fn invite_after_leaving() {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!test:example.org");
let client = logged_in_client(user_id).await;
let mut ev_builder = SyncResponseBuilder::new();
let response = ev_builder
.add_left_room(LeftRoomBuilder::new(room_id).add_timeline_event(sync_timeline_event!({
"content": {
"displayname": "Alice",
"membership": "left",
},
"event_id": "$994173582443PhrSn:example.org",
"origin_server_ts": 1432135524678u64,
"sender": user_id,
"state_key": user_id,
"type": "m.room.member",
})))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
let response = ev_builder
.add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
StrippedStateTestEvent::Custom(json!({
"content": {
"displayname": "Alice",
"membership": "invite",
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653u64,
"sender": "@example:example.org",
"state_key": user_id,
"type": "m.room.member",
})),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
}
#[async_test]
async fn invite_displayname_integration_test() {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = logged_in_client(user_id).await;
let response = api::sync::sync_events::v3::Response::try_from_http_response(response_from_file(&json!({
"next_batch": "asdkl;fjasdkl;fj;asdkl;f",
"device_one_time_keys_count": {
"signed_curve25519": 50u64
},
"device_unused_fallback_key_types": [
"signed_curve25519"
],
"rooms": {
"invite": {
"!ithpyNKDtmhneaTQja:example.org": {
"invite_state": {
"events": [
{
"content": {
"creator": "@test:example.org",
"room_version": "9"
},
"sender": "@test:example.org",
"state_key": "",
"type": "m.room.create"
},
{
"content": {
"join_rule": "invite"
},
"sender": "@test:example.org",
"state_key": "",
"type": "m.room.join_rules"
},
{
"content": {
"algorithm": "m.megolm.v1.aes-sha2"
},
"sender": "@test:example.org",
"state_key": "",
"type": "m.room.encryption"
},
{
"content": {
"avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
"displayname": "Kyra",
"membership": "join"
},
"sender": "@test:example.org",
"state_key": "@test:example.org",
"type": "m.room.member"
},
{
"content": {
"avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
"displayname": "alice",
"is_direct": true,
"membership": "invite"
},
"origin_server_ts": 1650878657984u64,
"sender": "@test:example.org",
"state_key": "@alice:example.org",
"type": "m.room.member",
"unsigned": {
"age": 14u64
},
"event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
}
]
}
}
}
}
}))).expect("static json doesn't fail to parse");
client.receive_sync_response(response).await.unwrap();
let room = client.get_room(room_id).expect("Room not found");
assert_eq!(room.state(), RoomState::Invited);
assert_eq!(
room.display_name().await.expect("fetching display name failed"),
DisplayName::Calculated("Kyra".to_owned())
);
}
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
#[async_test]
async fn when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
let user_id = user_id!("@u:u.to");
let room_id = room_id!("!r:u.to");
let client = logged_in_client(user_id).await;
let room = process_room_join(&client, room_id, "$1", user_id).await;
assert!(room.latest_encrypted_events().is_empty());
assert!(room.latest_event().is_none());
let mut changes = StateChanges::default();
client.decrypt_latest_events(&room, &mut changes).await;
assert!(room.latest_encrypted_events().is_empty());
assert!(room.latest_event().is_none());
assert!(changes.room_infos.is_empty());
}
async fn logged_in_client(user_id: &UserId) -> BaseClient {
let client = BaseClient::new();
client
.set_session_meta(SessionMeta {
user_id: user_id.to_owned(),
device_id: "FOOBAR".into(),
})
.await
.expect("set_session_meta failed!");
client
}
#[cfg(feature = "e2e-encryption")]
async fn process_room_join(
client: &BaseClient,
room_id: &RoomId,
event_id: &str,
user_id: &UserId,
) -> Room {
let mut ev_builder = SyncResponseBuilder::new();
let response = ev_builder
.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
sync_timeline_event!({
"content": {
"displayname": "Alice",
"membership": "join",
},
"event_id": event_id,
"origin_server_ts": 1432135524678u64,
"sender": user_id,
"state_key": user_id,
"type": "m.room.member",
}),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();
client.get_room(room_id).expect("Just-created room not found!")
}
#[async_test]
async fn deserialization_failure_test() {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = BaseClient::new();
client
.set_session_meta(SessionMeta {
user_id: user_id.to_owned(),
device_id: "FOOBAR".into(),
})
.await
.unwrap();
let response = api::sync::sync_events::v3::Response::try_from_http_response(
response_from_file(&json!({
"next_batch": "asdkl;fjasdkl;fj;asdkl;f",
"rooms": {
"join": {
"!ithpyNKDtmhneaTQja:example.org": {
"state": {
"events": [
{
"invalid": "invalid",
},
{
"content": {
"name": "The room name"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653u64,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 1234
}
},
]
}
}
}
}
})),
)
.expect("static json doesn't fail to parse");
client.receive_sync_response(response).await.unwrap();
client
.store()
.get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
.await
.expect("Failed to fetch state event")
.expect("State event not found")
.deserialize()
.expect("Failed to deserialize state event");
}
}