ambient_ecs/
stream.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::Display,
4    sync::Arc,
5};
6
7use itertools::Itertools;
8use serde::{Deserialize, Serialize};
9
10use super::{
11    ArchetypeFilter, Component, ComponentValue, Entity, EntityId, FramedEventsReader, Query,
12    QueryState, World,
13};
14use crate::{ComponentDesc, ComponentEntry, Serializable};
15
16#[derive(Serialize, Deserialize, Debug, Default, Clone)]
17pub struct WorldDiff {
18    pub changes: Vec<WorldChange>,
19}
20impl WorldDiff {
21    pub fn new() -> Self {
22        Self {
23            changes: Vec::new(),
24        }
25    }
26    pub fn set<T: ComponentValue>(self, id: EntityId, component: Component<T>, value: T) -> Self {
27        self.set_entry(id, ComponentEntry::new(component, value))
28    }
29    pub fn add_component<T: ComponentValue>(
30        self,
31        id: EntityId,
32        component: Component<T>,
33        value: T,
34    ) -> Self {
35        self.add_entry(id, ComponentEntry::new(component, value))
36    }
37
38    pub fn remove_component(mut self, id: EntityId, component: ComponentDesc) -> Self {
39        self.changes
40            .push(WorldChange::RemoveComponents(id, vec![component]));
41        self
42    }
43
44    pub fn remove_components_raw(mut self, id: EntityId, components: Vec<ComponentDesc>) -> Self {
45        self.changes
46            .push(WorldChange::RemoveComponents(id, components));
47        self
48    }
49    pub fn set_entry(mut self, id: EntityId, entry: ComponentEntry) -> Self {
50        self.changes.push(WorldChange::Set(id, entry));
51        self
52    }
53    pub fn add_entry(mut self, id: EntityId, entry: ComponentEntry) -> Self {
54        let mut data = Entity::new();
55        data.set_entry(entry);
56        self.changes.push(WorldChange::AddComponents(id, data));
57        self
58    }
59    pub fn despawn(mut self, ids: Vec<EntityId>) -> Self {
60        self.changes
61            .extend(ids.into_iter().map(WorldChange::Despawn));
62        self
63    }
64    pub fn apply(
65        self,
66        world: &mut World,
67        spanwed_extra_data: Entity,
68        create_revert: bool,
69    ) -> Option<Self> {
70        let revert_changes = self
71            .changes
72            .into_iter()
73            .map(|change| change.apply(world, &spanwed_extra_data, false, create_revert))
74            .collect_vec();
75        if create_revert {
76            Some(Self {
77                changes: revert_changes.into_iter().rev().flatten().collect_vec(),
78            })
79        } else {
80            None
81        }
82    }
83    pub fn is_empty(&self) -> bool {
84        self.changes.len() == 0
85    }
86    /// This creates a list of changes that would take you from the `from` world to the `to` world, if applied to the `from` world.
87    pub fn from_a_to_b(filter: WorldStreamFilter, from: &World, to: &World) -> Self {
88        let from_entities: HashSet<EntityId> = filter.all_entities(from).collect();
89        let to_entities: HashSet<EntityId> = filter.all_entities(to).collect();
90        let spawned = to_entities
91            .iter()
92            .filter(|id| !from_entities.contains(id))
93            .cloned()
94            .collect_vec();
95        let despawned = from_entities
96            .iter()
97            .filter(|id| !to_entities.contains(id))
98            .cloned()
99            .collect_vec();
100        let in_both = to_entities
101            .iter()
102            .filter(|id| from_entities.contains(id))
103            .cloned()
104            .collect_vec();
105
106        let spawned = spawned
107            .into_iter()
108            .map(|id| WorldChange::Spawn(Some(id), filter.read_entity_components(to, id).into()));
109        let despanwed = despawned.into_iter().map(WorldChange::Despawn);
110        let updated = in_both.into_iter().flat_map(|id| {
111            let from_comps: HashMap<_, _> = filter
112                .get_entity_components(from, id)
113                .into_iter()
114                .map(|v| (v.index(), v))
115                .collect();
116            let to_comps: HashMap<_, _> = filter
117                .get_entity_components(to, id)
118                .into_iter()
119                .map(|v| (v.index(), v))
120                .collect();
121
122            let added = to_comps
123                .iter()
124                .filter(|c| !from_comps.contains_key(c.0))
125                .map(|v| *v.1)
126                .collect_vec();
127            let removed = from_comps
128                .iter()
129                .filter(|c| !to_comps.contains_key(c.0))
130                .map(|v| *v.1)
131                .collect_vec();
132            let in_both = to_comps
133                .iter()
134                .filter(|c| from_comps.contains_key(c.0))
135                .collect_vec();
136
137            let changed = in_both
138                .iter()
139                .filter(|&c| {
140                    from.get_component_content_version(id, *c.0).unwrap()
141                        != to.get_component_content_version(id, *c.0).unwrap()
142                })
143                .collect_vec();
144
145            let added: Entity = added
146                .iter()
147                .map(|&comp| to.get_entry(id, comp).unwrap())
148                .collect();
149
150            let added = if !added.is_empty() {
151                vec![WorldChange::AddComponents(id, added)]
152            } else {
153                vec![]
154            };
155            let removed = if !removed.is_empty() {
156                vec![WorldChange::RemoveComponents(id, removed)]
157            } else {
158                vec![]
159            };
160            let changed = changed
161                .into_iter()
162                .map(|(_, &comp)| {
163                    let entry = to.get_entry(id, comp).unwrap();
164                    WorldChange::Set(id, entry)
165                })
166                .collect_vec();
167            added
168                .into_iter()
169                .chain(removed.into_iter())
170                .chain(changed.into_iter())
171        });
172
173        Self {
174            changes: despanwed.chain(spawned).chain(updated).collect_vec(),
175        }
176    }
177}
178impl Display for WorldDiff {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        for change in self.changes.iter().take(3) {
181            change.fmt(f)?;
182            write!(f, " ").unwrap();
183        }
184        if self.changes.len() > 3 {
185            write!(f, "...{} more", self.changes.len() - 3).unwrap();
186        }
187        Ok(())
188    }
189}
190
191#[derive(Clone, Copy, PartialEq, Eq, Debug)]
192pub enum WorldStreamCompEvent {
193    Init,
194    Set,
195    Spawn,
196    AddComponent,
197    RemoveComponent,
198}
199
200#[derive(Clone)]
201pub struct WorldStreamFilter {
202    arch_filter: ArchetypeFilter,
203    component_filter: Arc<dyn Fn(ComponentDesc, WorldStreamCompEvent) -> bool + Sync + Send>,
204}
205impl WorldStreamFilter {
206    pub fn new(
207        arch_filter: ArchetypeFilter,
208        component_filter: Arc<dyn Fn(ComponentDesc, WorldStreamCompEvent) -> bool + Sync + Send>,
209    ) -> Self {
210        Self {
211            arch_filter,
212            component_filter,
213        }
214    }
215    pub fn initial_diff(&self, world: &World) -> WorldDiff {
216        WorldDiff {
217            changes: self
218                .all_entities(world)
219                .map(|id| {
220                    WorldChange::Spawn(Some(id), self.read_entity_components(world, id).into())
221                })
222                .collect_vec(),
223        }
224    }
225    pub fn all_entities<'a>(&self, world: &'a World) -> impl Iterator<Item = EntityId> + 'a {
226        Query::all()
227            .filter(&self.arch_filter)
228            .iter(world, None)
229            .map(|x| x.id())
230    }
231    pub fn get_entity_components(&self, world: &World, id: EntityId) -> Vec<ComponentDesc> {
232        world
233            .get_components(id)
234            .unwrap()
235            .into_iter()
236            .filter(|&comp| (self.component_filter)(comp, WorldStreamCompEvent::Init))
237            .collect_vec()
238    }
239    fn read_entity_components(&self, world: &World, id: EntityId) -> Vec<ComponentEntry> {
240        self.get_entity_components(world, id)
241            .into_iter()
242            .map(|comp| world.get_entry(id, comp).unwrap())
243            .collect_vec()
244    }
245}
246impl Default for WorldStreamFilter {
247    fn default() -> Self {
248        Self {
249            arch_filter: Default::default(),
250            component_filter: Arc::new(|_, _| true),
251        }
252    }
253}
254
255#[derive(Serialize, Deserialize, Debug, Clone)]
256pub enum WorldChange {
257    Spawn(Option<EntityId>, Entity),
258    Despawn(EntityId),
259    AddComponents(EntityId, Entity),
260    RemoveComponents(EntityId, Vec<ComponentDesc>),
261    Set(EntityId, ComponentEntry),
262}
263
264impl WorldChange {
265    pub fn is_set(&self) -> bool {
266        matches!(self, Self::Set(_, _))
267    }
268
269    pub fn is_remove_components(&self) -> bool {
270        matches!(self, Self::RemoveComponents(_, _))
271    }
272
273    fn apply(
274        self,
275        world: &mut World,
276        spanwed_extra_data: &Entity,
277        panic_on_error: bool,
278        create_revert: bool,
279    ) -> Option<Self> {
280        match self {
281            Self::Spawn(id, data) => {
282                if let Some(id) = id {
283                    if !world.spawn_with_id(id, data.with_merge(spanwed_extra_data.clone())) {
284                        if panic_on_error {
285                            panic!("WorldChange::apply spawn_mirror entity already exists: {id:?}");
286                        } else {
287                            log::error!(
288                                "WorldChange::apply spawn_mirror entity already exists: {id:?}"
289                            );
290                        }
291                    }
292                    if create_revert {
293                        return Some(Self::Despawn(id));
294                    }
295                } else {
296                    let id = world.spawn(data.with_merge(spanwed_extra_data.clone()));
297                    if create_revert {
298                        return Some(Self::Despawn(id));
299                    }
300                }
301            }
302            Self::Despawn(id) => {
303                let res = if create_revert {
304                    world.get_components(id).ok().map(|components| {
305                        let mut ed = Entity::new();
306                        for comp in components {
307                            // Only serializable components
308                            if comp.has_attribute::<Serializable>() {
309                                ed.set_entry(world.get_entry(id, comp).unwrap());
310                            }
311                        }
312                        Self::Spawn(Some(id), ed)
313                    })
314                } else {
315                    None
316                };
317                world.despawn(id);
318                return res;
319            }
320            Self::AddComponents(id, data) => {
321                let res = if create_revert {
322                    Some(Self::RemoveComponents(id, data.components()))
323                } else {
324                    None
325                };
326                world.add_components(id, data).unwrap();
327                return res;
328            }
329            Self::RemoveComponents(id, comps) => {
330                let res = if create_revert {
331                    Some(Self::AddComponents(
332                        id,
333                        comps
334                            .iter()
335                            .filter_map(|&comp| world.get_entry(id, comp).ok())
336                            .collect(),
337                    ))
338                } else {
339                    None
340                };
341                for comp in comps {
342                    world.remove_component(id, comp).unwrap();
343                }
344                return res;
345            }
346            Self::Set(id, entry) => {
347                // let prev = match entry.set_at_entity(world, id) {
348                let prev = match world.set_entry(id, entry) {
349                    Ok(entry) => entry,
350                    Err(err) => {
351                        if panic_on_error {
352                            panic!("Failed to set: {err:?}");
353                        } else {
354                            return None;
355                        }
356                    }
357                };
358                if create_revert {
359                    return Some(Self::Set(id, prev));
360                }
361            }
362        }
363        None
364    }
365    fn filter(&self, world: &World, filter: &WorldStreamFilter) -> Option<Self> {
366        match self {
367            Self::Spawn(id, data) => {
368                if !filter.arch_filter.matches_entity(world, (*id).unwrap()) {
369                    return None;
370                }
371                let mut data = data.clone();
372                data.filter(&|comp| (filter.component_filter)(comp, WorldStreamCompEvent::Spawn));
373                Some(Self::Spawn(*id, data.clone()))
374            }
375            Self::Despawn(id) => {
376                // TODO: Right now we don't filter despawns, because the spawns are filtered, so the "bad" despawns will just be
377                // ignored on the client side. Maybe should filter them on the server side too
378                Some(Self::Despawn(*id))
379            }
380            Self::AddComponents(id, data) => {
381                if !filter.arch_filter.matches_entity(world, *id) {
382                    return None;
383                }
384                let mut data = data.clone();
385                data.filter(&|comp| {
386                    (filter.component_filter)(comp, WorldStreamCompEvent::AddComponent)
387                });
388                if data.is_empty() {
389                    return None;
390                }
391                Some(Self::AddComponents(*id, data.clone()))
392            }
393            Self::RemoveComponents(id, comps) => {
394                if !filter.arch_filter.matches_entity(world, *id) {
395                    return None;
396                }
397                Some(Self::RemoveComponents(
398                    *id,
399                    comps
400                        .iter()
401                        .filter_map(|&comp| {
402                            if (filter.component_filter)(
403                                comp,
404                                WorldStreamCompEvent::RemoveComponent,
405                            ) {
406                                Some(comp)
407                            } else {
408                                None
409                            }
410                        })
411                        .collect_vec(),
412                ))
413            }
414            Self::Set(_id, _entry) => Some(self.clone()),
415        }
416    }
417}
418impl Display for WorldChange {
419    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420        match self {
421            WorldChange::Spawn(id, data) => write!(
422                f,
423                "spawn({}, {})",
424                id.unwrap_or(EntityId::null()),
425                data.len()
426            ),
427            WorldChange::Despawn(id) => write!(f, "despawn({id})"),
428            WorldChange::AddComponents(id, data) => write!(f, "add_components({id}, {data:?})"),
429            WorldChange::RemoveComponents(id, _) => write!(f, "remove_components({id})"),
430            WorldChange::Set(id, data) => write!(f, "set({id}, {data:?})"),
431        }
432    }
433}
434
435#[derive(Clone)]
436pub struct WorldStream {
437    changed_qs: QueryState,
438    shape_stream_reader: FramedEventsReader<WorldChange>,
439    filter: WorldStreamFilter,
440    version: u64,
441}
442impl WorldStream {
443    pub fn new(filter: WorldStreamFilter) -> Self {
444        Self {
445            changed_qs: QueryState::new(),
446            shape_stream_reader: FramedEventsReader::new(),
447            filter,
448            version: 0,
449        }
450    }
451    pub fn filter(&self) -> &WorldStreamFilter {
452        &self.filter
453    }
454    #[ambient_profiling::function]
455    pub fn next_diff(&mut self, world: &World) -> WorldDiff {
456        let shape_changes = self
457            .shape_stream_reader
458            .iter(world.shape_change_events.as_ref().unwrap())
459            .filter_map(|(_, change)| change.filter(world, &self.filter))
460            .collect_vec();
461
462        let mut sets = HashMap::new();
463        for arch in world.archetypes.iter() {
464            if self.filter.arch_filter.matches(&arch.active_components) {
465                for arch_comp in arch.components.iter() {
466                    if (self.filter.component_filter)(
467                        arch_comp.component,
468                        WorldStreamCompEvent::Set,
469                    ) {
470                        let reader = self
471                            .changed_qs
472                            .change_readers
473                            .get(arch.id, arch_comp.component.index() as _);
474
475                        for (_, &entity_id) in reader.iter(&*arch_comp.changes.borrow()) {
476                            if let Some(loc) = world.entity_loc(entity_id) {
477                                if loc.archetype == arch.id
478                                    && arch_comp.get_content_version(loc.index) > self.version
479                                {
480                                    let entry = sets.entry(entity_id).or_insert_with(Vec::new);
481                                    entry.push(
482                                        world.get_entry(entity_id, arch_comp.component).unwrap(),
483                                    );
484                                }
485                            }
486                        }
487                    }
488                }
489            }
490        }
491        self.version = world.version();
492        let mut changes = shape_changes;
493        changes.extend(sets.into_iter().flat_map(|(id, entrys)| {
494            entrys
495                .into_iter()
496                .map(move |entry| WorldChange::Set(id, entry))
497        }));
498        WorldDiff { changes }
499    }
500}