ih_muse_core/buffer/
element_buffer.rs1use std::collections::HashMap;
2
3use ih_muse_proto::*;
4use tokio::sync::Mutex;
5
6#[derive(Clone)]
7pub struct BufferEntry {
8 pub id: LocalElementId,
9 pub registration: ElementRegistration,
10}
11
12impl BufferEntry {
13 pub fn new(id: LocalElementId, registration: ElementRegistration) -> Self {
14 Self { id, registration }
15 }
16}
17
18pub struct ElementBuffer {
20 pending: Mutex<Vec<BufferEntry>>,
21 retry_counts: Mutex<HashMap<LocalElementId, usize>>,
22 max_retries: usize,
23}
24
25impl ElementBuffer {
26 pub fn new(max_retries: usize) -> Self {
27 Self {
28 pending: Mutex::new(Vec::new()),
29 retry_counts: Mutex::new(HashMap::new()),
30 max_retries,
31 }
32 }
33
34 pub async fn add_element(
36 &self,
37 element_id: LocalElementId,
38 element_registration: ElementRegistration,
39 ) {
40 let mut pending = self.pending.lock().await;
41 pending.push(BufferEntry::new(element_id, element_registration));
42 }
43
44 pub async fn get_pending_elements(&self) -> Vec<BufferEntry> {
46 let mut pending = self.pending.lock().await;
47 pending.drain(..).collect()
48 }
49
50 pub async fn mark_failed(&self, element: BufferEntry) {
53 let mut retry_counts = self.retry_counts.lock().await;
54 let count = retry_counts.entry(element.id).or_insert(0);
55 *count += 1;
56
57 if *count >= self.max_retries {
58 retry_counts.remove(&element.id);
59 log::warn!(
60 "Element {:?} won't be retried after {} failed attempts",
61 element.id,
62 self.max_retries
63 );
64 } else {
65 let mut pending = self.pending.lock().await;
66 pending.push(element);
67 }
68 }
69
70 pub async fn mark_succeeded(&self, element_id: &LocalElementId) {
72 let mut retry_counts = self.retry_counts.lock().await;
73 retry_counts.remove(element_id);
74 }
75}