ih_muse_core/buffer/
element_buffer.rs

1use std::collections::HashMap;
2
3use ih_muse_proto::*;
4use tokio::sync::Mutex;
5
6#[derive(Clone)]
7pub struct BufferEntry {
8    pub id: LocalElementId,
9    pub registration: ElementRegistration,
10}
11
12impl BufferEntry {
13    pub fn new(id: LocalElementId, registration: ElementRegistration) -> Self {
14        Self { id, registration }
15    }
16}
17
18// Buffer to manage element registration attempts
19pub struct ElementBuffer {
20    pending: Mutex<Vec<BufferEntry>>,
21    retry_counts: Mutex<HashMap<LocalElementId, usize>>,
22    max_retries: usize,
23}
24
25impl ElementBuffer {
26    pub fn new(max_retries: usize) -> Self {
27        Self {
28            pending: Mutex::new(Vec::new()),
29            retry_counts: Mutex::new(HashMap::new()),
30            max_retries,
31        }
32    }
33
34    /// Adds an element to the pending queue.
35    pub async fn add_element(
36        &self,
37        element_id: LocalElementId,
38        element_registration: ElementRegistration,
39    ) {
40        let mut pending = self.pending.lock().await;
41        pending.push(BufferEntry::new(element_id, element_registration));
42    }
43
44    /// Retrieves and removes all pending elements.
45    pub async fn get_pending_elements(&self) -> Vec<BufferEntry> {
46        let mut pending = self.pending.lock().await;
47        pending.drain(..).collect()
48    }
49
50    /// Marks an element as failed and handles retries.
51    /// Returns `true` if the element will be retried, `false` otherwise.
52    pub async fn mark_failed(&self, element: BufferEntry) {
53        let mut retry_counts = self.retry_counts.lock().await;
54        let count = retry_counts.entry(element.id).or_insert(0);
55        *count += 1;
56
57        if *count >= self.max_retries {
58            retry_counts.remove(&element.id);
59            log::warn!(
60                "Element {:?} won't be retried after {} failed attempts",
61                element.id,
62                self.max_retries
63            );
64        } else {
65            let mut pending = self.pending.lock().await;
66            pending.push(element);
67        }
68    }
69
70    /// Marks an element as successfully registered.
71    pub async fn mark_succeeded(&self, element_id: &LocalElementId) {
72        let mut retry_counts = self.retry_counts.lock().await;
73        retry_counts.remove(element_id);
74    }
75}