// radicle_ci_broker/queueadd.rs
use std::thread::{spawn, JoinHandle};

use radicle::Profile;

use crate::{
    ci_event::CiEvent,
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError},
    filter::EventFilter,
    logger,
    notif::NotificationSender,
};

/// Builder for [`QueueAdder`].
///
/// Every field starts out unset (`None` via `Default`) and must be supplied
/// through the corresponding setter before calling
/// [`QueueAdderBuilder::build`]; `build` fails with
/// [`AdderError::Missing`] otherwise.
#[derive(Default)]
pub struct QueueAdderBuilder {
    /// Database handle the adder will push queued CI events into.
    db: Option<Db>,
    /// Filters used to decide which events get queued.
    filters: Option<Vec<EventFilter>>,
    /// Sender used to notify after an event has been pushed to the database.
    events_tx: Option<NotificationSender>,
}

impl QueueAdderBuilder {
    /// Consumes the builder and produces a [`QueueAdder`].
    ///
    /// # Errors
    ///
    /// Returns [`AdderError::Missing`] naming the first field that was never
    /// set, checked in the order `db`, `filters`, `events_tx`.
    pub fn build(self) -> Result<QueueAdder, AdderError> {
        let Self {
            db,
            filters,
            events_tx,
        } = self;

        let db = db.ok_or(AdderError::Missing("db"))?;
        let filters = filters.ok_or(AdderError::Missing("filters"))?;
        let events_tx = events_tx.ok_or(AdderError::Missing("events_tx"))?;

        Ok(QueueAdder {
            db,
            filters,
            events_tx,
        })
    }

    /// Sets the sender used to notify about newly queued events.
    pub fn events_tx(self, tx: NotificationSender) -> Self {
        Self {
            events_tx: Some(tx),
            ..self
        }
    }

    /// Sets the database handle that queued events are written to.
    pub fn db(self, db: Db) -> Self {
        Self {
            db: Some(db),
            ..self
        }
    }

    /// Sets the event filters; the slice is copied into the builder.
    pub fn filters(self, filters: &[EventFilter]) -> Self {
        Self {
            filters: Some(Vec::from(filters)),
            ..self
        }
    }
}

/// Reads CI events from an event source, keeps the ones allowed by the
/// configured filters, and pushes them to the database queue.
///
/// Constructed via [`QueueAdderBuilder`].
pub struct QueueAdder {
    /// Filters consulted for every incoming event.
    filters: Vec<EventFilter>,
    /// Database handle that queued CI events are pushed into.
    db: Db,
    /// Sender used to signal that the database has changed after a push.
    events_tx: NotificationSender,
}

impl QueueAdder {
    /// Spawns a thread that runs [`QueueAdder::add_events`], consuming the
    /// adder.
    ///
    /// Joining the returned handle yields the result of `add_events`.
    pub fn add_events_in_thread(self) -> JoinHandle<Result<(), AdderError>> {
        spawn(move || self.add_events())
    }

    /// Reads CI events from a [`CiEventSource`] and queues every event that
    /// at least one of the configured filters allows.
    ///
    /// Each allowed event is queued exactly once, no matter how many filters
    /// allow it.
    ///
    /// Returns `Ok(())` once the event source reports that there are no more
    /// events (`Ok(None)`).
    ///
    /// # Errors
    ///
    /// * [`AdderError::Profile`] if the Radicle profile cannot be loaded.
    /// * [`AdderError::CiEvent`] if the event source cannot be created or
    ///   reading from it fails.
    /// * [`AdderError::Db`] or [`AdderError::Send`] if queuing an event
    ///   fails; see `push_event`.
    pub fn add_events(&self) -> Result<(), AdderError> {
        logger::queueadd_start();

        let profile = Profile::load()?;

        let mut source = CiEventSource::new(&profile)?;

        // This loop ends when the source has no more events (`Ok(None)`) or
        // when there's an error, e.g., failure to read an event from the
        // node.
        loop {
            let events = source.event();
            logger::debug2(format!("queueadd: events={events:?}"));
            match events {
                Err(e) => {
                    logger::queueadd_control_socket_close();
                    return Err(e.into());
                }
                Ok(None) => break,
                Ok(Some(events)) => {
                    for e in events {
                        // Queue the event once if any filter allows it;
                        // pushing once per matching filter would put
                        // duplicates of the same event into the queue.
                        if self.is_allowed(&e) {
                            logger::queueadd_push_event(&e);
                            self.push_event(e)?;
                        }
                    }
                }
            }
        }

        logger::queueadd_end();
        Ok(())
    }

    /// Returns `true` if at least one configured filter allows the event.
    ///
    /// Evaluation stops at the first filter that allows it. With no filters
    /// configured, nothing is allowed.
    fn is_allowed(&self, e: &CiEvent) -> bool {
        self.filters.iter().any(|filter| filter.allows(e))
    }

    /// Pushes one event to the database queue and then sends a notification
    /// that the database has changed.
    ///
    /// # Errors
    ///
    /// * [`AdderError::Db`] if the database push fails.
    /// * [`AdderError::Send`] if the notification cannot be sent; the
    ///   underlying error is discarded.
    fn push_event(&self, e: CiEvent) -> Result<(), AdderError> {
        self.db.push_queued_ci_event(e)?;
        self.events_tx.notify().map_err(|_| AdderError::Send)?;
        Ok(())
    }
}

/// Errors returned by [`QueueAdderBuilder`] and [`QueueAdder`].
#[derive(Debug, thiserror::Error)]
pub enum AdderError {
    /// A required builder field was not set before calling `build`; the
    /// payload is the field's name.
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
    Missing(&'static str),

    /// Loading the Radicle profile failed.
    #[error(transparent)]
    Profile(#[from] radicle::profile::Error),

    /// Creating the CI event source, or reading events from it, failed.
    #[error(transparent)]
    CiEvent(#[from] CiEventSourceError),

    /// A database operation failed.
    #[error(transparent)]
    Db(#[from] DbError),

    /// Sending the notification after a database push failed.
    #[error("failed to notify other thread about database change")]
    Send,
}