ed_journals/modules/state/models/
state_container.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::state::models::feed_result::FeedResult;
use crate::state::traits::state_resolver::StateResolver;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Debug, Formatter};
use std::mem;
use std::ops::{Deref, DerefMut};

/// This is what is used internally take care of input and manage flushing and handling retries.
pub struct StateContainer<S, T>
where
    S: StateResolver<T>,
    T: Clone,
{
    /// The inner resolver which is what actually takes input and does something with it. The state
    /// container calls the methods on the given [StateResolver].
    inner: S,

    /// Some events might get queued for later processing because not all information might be
    /// available at the time.
    later: Vec<T>,
}

impl<S, T> StateContainer<S, T>
where
    S: StateResolver<T>,
    T: Clone,
{
    /// Takes the log events and processes it in the state. Note that it does not guarantee that the
    /// event will be processed immediately. In some situations the event will be queued when the
    /// state things it is better able to process the event, but it doesn't do this automatically.
    /// For those events to be processed, you need to call [StateContainer::flush]. This will go through
    /// the remaining events and tries to process them.
    pub fn feed(&mut self, input: &T) {
        let handle_result = self.inner.feed(input);

        if let FeedResult::Later = handle_result {
            self.later.push(input.clone());
        }
    }

    /// Processes any left-over events that were scheduled for later processing. Call this sparingly
    /// especially not while you're also still reading a lot of events through
    /// [StateContainer::feed] as that will likely cause performance issues.
    pub fn flush(&mut self) {
        let queued = mem::take(&mut self.later);

        for item in queued {
            if let FeedResult::Later = self.inner.feed(&item) {
                self.later.push(item);
            }
        }

        self.inner.flush_inner();
    }
}

impl<S, T> Deref for StateContainer<S, T>
where
    S: StateResolver<T>,
    T: Clone,
{
    type Target = S;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<S, T> DerefMut for StateContainer<S, T>
where
    S: StateResolver<T>,
    T: Clone,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

impl<S, T> Default for StateContainer<S, T>
where
    S: StateResolver<T> + Default,
    T: Clone,
{
    fn default() -> Self {
        StateContainer {
            inner: S::default(),
            later: Vec::new(),
        }
    }
}

impl<S, T> Serialize for StateContainer<S, T>
where
    S: StateResolver<T> + Serialize,
    T: Clone,
{
    fn serialize<Se>(&self, serializer: Se) -> Result<Se::Ok, Se::Error>
    where
        Se: Serializer,
    {
        self.inner.serialize(serializer)
    }
}

impl<'de, S, T> Deserialize<'de> for StateContainer<S, T>
where
    S: StateResolver<T> + Deserialize<'de>,
    T: Clone,
{
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(StateContainer {
            inner: S::deserialize(deserializer)?,
            later: Vec::new(),
        })
    }
}

impl<S, T> From<S> for StateContainer<S, T>
where
    S: StateResolver<T>,
    T: Clone,
{
    fn from(value: S) -> Self {
        StateContainer {
            inner: value,
            later: Vec::new(),
        }
    }
}

impl<S, T> Debug for StateContainer<S, T>
where
    S: StateResolver<T> + Debug,
    T: Clone,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(&self.inner, f)
    }
}