1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
use crate::BlockSource;
use async_trait::async_trait;
use bitcoin::{Block, BlockHash, OutPoint, Txid};
use bitcoin::blockdata::block::Header as BlockHeader;
use txoo::spv::SpvProof;

/// An error produced while following the chain.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// printed for users and propagated through `Box<dyn std::error::Error>`
/// or similar generic error containers.
#[derive(Debug)]
pub enum Error {
    /// The block source reported a failure.
    /// Carries the textual rendering of the underlying source error.
    SourceError(String),
}

impl core::fmt::Display for Error {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Error::SourceError(msg) => write!(f, "block source error: {}", msg),
        }
    }
}

impl std::error::Error for Error {}

impl From<crate::Error> for Error {
    fn from(e: crate::Error) -> Error {
        Error::SourceError(e.to_string())
    }
}

/// The next action to take when following the chain.
///
/// Derives `Debug` so values can be used with `assert_eq!`, `{:?}` formatting
/// and `Result::unwrap_err`.
#[derive(Debug, PartialEq)]
pub enum FollowAction {
    /// No action required, synced to chain tip.
    None,
    /// A block has been added to the chain.
    /// Provides the new block.
    BlockAdded(Block),
    /// The current block has been reorganized out of the chain.
    /// Provides the block that was reorged out and the previous block header.
    BlockReorged(Block, BlockHeader),
}

/// A follower for BlockSource.
///
/// Owns a boxed [`BlockSource`] and queries it to decide how the caller's
/// view of the chain should advance or rewind.
pub struct SourceFollower {
    // The block source that every lookup is sent to.
    source: Box<dyn BlockSource>,
}

impl SourceFollower {
    /// Build a follower that reads chain data from `source`.
    pub fn new(source: Box<dyn BlockSource>) -> Self {
        Self { source }
    }
}

impl SourceFollower {
    /// Follow the chain, returning the next action to take
    pub async fn follow(
        &self,
        current_height: u32,
        current_hash: BlockHash,
    ) -> Result<FollowAction, Error> {
        match self.source.get_block_hash(current_height + 1).await? {
            None => {
                // No new block, but check if the current block has been reorged
                match self.source.get_block_hash(current_height).await? {
                    None => {
                        // The current block has been reorged out of the chain
                        let current_block = self.source.get_block(&current_hash).await?;
                        let prev_block_header = self
                            .source
                            .get_header(&current_block.header.prev_blockhash)
                            .await?
                            .header;
                        Ok(FollowAction::BlockReorged(current_block, prev_block_header))
                    }
                    Some(check_hash) => {
                        if check_hash == current_hash {
                            // No action required, synced to chain tip
                            Ok(FollowAction::None)
                        } else {
                            // The current block has been reorged out of the chain
                            let current_block = self.source.get_block(&current_hash).await?;
                            let prev_block_header = self
                                .source
                                .get_header(&current_block.header.prev_blockhash)
                                .await?
                                .header;
                            Ok(FollowAction::BlockReorged(current_block, prev_block_header))
                        }
                    }
                }
            }
            Some(new_hash) => {
                let block = self.source.get_block(&new_hash).await?;
                if block.header.prev_blockhash == current_hash {
                    // A block has been added to the chain
                    Ok(FollowAction::BlockAdded(block))
                } else {
                    // The new block actually extends a different chain
                    let current_block = self.source.get_block(&current_hash).await?;
                    let prev_block_header = self
                        .source
                        .get_header(&current_block.header.prev_blockhash)
                        .await?
                        .header;
                    Ok(FollowAction::BlockReorged(current_block, prev_block_header))
                }
            }
        }
    }
}

/// The next action to take when following the chain, with SPV proofs.
///
/// Mirrors [`FollowAction`], with an [`SpvProof`] attached to each variant
/// that carries a block.
pub enum FollowWithProofAction {
    /// No action required, synced to chain tip.
    None,
    /// A block has been added to the chain.
    /// Provides the new block and the proof built for it.
    BlockAdded(Block, SpvProof),
    /// The current block has been reorganized out of the chain.
    /// Provides the block that was reorged out and the proof built for it.
    /// Note that the transactions should be "un-processed" in reverse order
    /// in case they have inter-dependencies.
    ///
    /// Also provides the previous block header.
    BlockReorged(Block, SpvProof, BlockHeader),
}

/// A callback trait returning which transactions and outpoints should be included in proofs.
///
/// Both methods return a `(txids, outpoints)` pair.
#[async_trait]
pub trait Tracker {
    /// Returns all Txid and OutPoints to watch for in future blocks.
    /// Used when a block is being added.
    async fn forward_watches(&self) -> (Vec<Txid>, Vec<OutPoint>);

    /// Returns all Txid and OutPoint watches used for prior blocks.
    /// Used when removing blocks during reorg.
    async fn reverse_watches(&self) -> (Vec<Txid>, Vec<OutPoint>);
}

/// A follower for BlockSource with SPV proofs.
///
/// Newtype around [`SourceFollower`], which does the actual chain following.
pub struct SourceWithProofFollower(SourceFollower);

impl SourceWithProofFollower {
    /// Build a proof-producing follower that reads chain data from `source`.
    pub fn new(source: Box<dyn BlockSource>) -> Self {
        Self(SourceFollower::new(source))
    }

    /// Follow the chain, returning the next action together with an SPV proof.
    ///
    /// Delegates to the wrapped [`SourceFollower::follow`], then asks `tracker`
    /// which txids and outpoints to watch: the forward watches for an added
    /// block, the reverse watches for a reorged-out block. The proof attached
    /// to the action is the first component of what `SpvProof::build` returns
    /// for that block and those watches.
    pub async fn follow_with_proof(
        &self,
        current_height: u32,
        current_hash: BlockHash,
        tracker: &impl Tracker,
    ) -> Result<FollowWithProofAction, Error> {
        let action = self.0.follow(current_height, current_hash).await?;
        let with_proof = match action {
            FollowAction::None => FollowWithProofAction::None,
            FollowAction::BlockAdded(added) => {
                let (watched_txids, watched_outpoints) = tracker.forward_watches().await;
                let built = SpvProof::build(&added, &watched_txids, &watched_outpoints);
                FollowWithProofAction::BlockAdded(added, built.0)
            }
            FollowAction::BlockReorged(removed, parent_header) => {
                let (watched_txids, watched_outpoints) = tracker.reverse_watches().await;
                let built = SpvProof::build(&removed, &watched_txids, &watched_outpoints);
                FollowWithProofAction::BlockReorged(removed, built.0, parent_header)
            }
        };
        Ok(with_proof)
    }
}

// Unit tests for the followers, driven by the in-crate dummy block source and tracker.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{DummyBlockSource, DummyTracker};
    use crate::BlockSource;
    use bitcoin::BlockHash;

    // Sanity check of the DummyBlockSource helper itself: three `add` calls put
    // the best block at height 3, and removing the tip leaves its parent as the
    // best block at height 2.
    #[tokio::test]
    async fn dummy_test() {
        let mut source = DummyBlockSource::new();
        source.add();
        source.add();
        source.add();
        let tip = source.get_best_block().await.unwrap();
        assert_eq!(tip.1, 3);
        let tip_block = source.get_block(&tip.0).await.unwrap();
        source.remove();
        let prev_tip = source.get_best_block().await.unwrap();
        assert_eq!(prev_tip.1, 2);
        assert_eq!(tip_block.header.prev_blockhash, prev_tip.0);
    }

    // Walks SourceFollower through: idle at genesis, two additions, idle at the
    // new tip, a two-block reorg unwound one block per call, and a re-add.
    #[tokio::test]
    async fn follow_test() {
        let mut source = DummyBlockSource::new();
        // NOTE(review): the follower gets a clone yet observes later add/remove
        // calls on `source`, so the clone presumably shares chain state - confirm
        // against DummyBlockSource.
        let follower = SourceFollower::new(Box::new(source.clone()));
        let genesis_hash = source.genesis_hash();
        // Nothing beyond genesis yet.
        assert!(follower.follow(0, genesis_hash).await.unwrap() == FollowAction::None);
        source.add();
        let hash1 = assert_add(follower.follow(0, genesis_hash).await.unwrap());
        source.add();
        let hash2 = assert_add(follower.follow(1, hash1).await.unwrap());
        assert!(follower.follow(2, hash2).await.unwrap() == FollowAction::None);
        // Remove both blocks; the follower must report them one at a time,
        // newest first.
        source.remove();
        source.remove();
        let action = follower.follow(2, hash2).await.unwrap();
        if let FollowAction::BlockReorged(block, _prev_block_header) = action {
            assert_eq!(block.block_hash(), hash2);
        } else {
            panic!("expected reorg");
        }
        let action = follower.follow(1, hash1).await.unwrap();
        if let FollowAction::BlockReorged(block, _prev_block_header) = action {
            assert_eq!(block.block_hash(), hash1);
        } else {
            panic!("expected reorg");
        }
        // Adding a block again yields the same hash as before for height 1.
        source.add();
        let hash1a = assert_add(follower.follow(0, genesis_hash).await.unwrap());
        assert_eq!(hash1a, hash1);
        assert!(follower.follow(1, hash1).await.unwrap() == FollowAction::None);
    }

    // Same idea for SourceWithProofFollower: idle, one addition, idle, one reorg.
    // NOTE(review): the `proof.proof.is_none()` assertions presumably hold
    // because DummyTracker watches nothing - confirm against test_utils.
    #[tokio::test]
    async fn follow_with_proof_test() {
        let tracker = DummyTracker();
        let mut source = DummyBlockSource::new();
        let follower = SourceWithProofFollower::new(Box::new(source.clone()));
        let genesis_hash = source.genesis_hash();
        // FollowWithProofAction has no PartialEq, hence the pattern matches
        // instead of equality assertions.
        if let FollowWithProofAction::None = follower
            .follow_with_proof(0, genesis_hash, &tracker)
            .await
            .unwrap()
        {
        } else {
            panic!("expected None");
        }
        source.add();
        let hash1 = match follower
            .follow_with_proof(0, genesis_hash, &tracker)
            .await
            .unwrap()
        {
            FollowWithProofAction::BlockAdded(block, proof) => {
                assert!(proof.proof.is_none());
                block.block_hash()
            }
            _ => panic!("expected block added with proof"),
        };
        if let FollowWithProofAction::None = follower
            .follow_with_proof(1, hash1, &tracker)
            .await
            .unwrap()
        {
        } else {
            panic!("expected None");
        }
        source.remove();
        let action = follower
            .follow_with_proof(1, hash1, &tracker)
            .await
            .unwrap();
        if let FollowWithProofAction::BlockReorged(block, proof, _prev_block_header) = action {
            assert!(proof.proof.is_none());
            assert_eq!(block.block_hash(), hash1);
        } else {
            panic!("expected reorg");
        }
    }

    // Unwraps a `BlockAdded` action and returns the added block's hash;
    // panics on any other action.
    fn assert_add(action: FollowAction) -> BlockHash {
        if let FollowAction::BlockAdded(block) = action {
            block.block_hash()
        } else {
            panic!("wrong follow action");
        }
    }
}