rust_nebula/storage/
transport_response_handler.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind};

use crate::fbthrift::{
    binary_protocol::BinaryProtocolDeserializer, ApplicationException, Deserialize, MessageType,
    ProtocolReader,
};
use crate::fbthrift_transport_response_handler::ResponseHandler;
use crate::nebula_fbthrift_storage_v3::services::graph_storage_service::{ScanEdgeExn, ScanVertexExn};

#[derive(Clone)]
pub struct StorageTransportResponseHandler;

impl ResponseHandler for StorageTransportResponseHandler {
    fn try_make_static_response_bytes(
        &mut self,
        _service_name: &'static [u8],
        fn_name: &'static [u8],
        _request_bytes: &[u8],
    ) -> Result<Option<Vec<u8>>, IoError> {
        match fn_name {
            b"GraphStorageService.scanVertex" | b"GraphStorageService.scanEdge" => Ok(None),
            _ => Err(IoError::new(
                IoErrorKind::Other,
                format!("Unknown method {}", String::from_utf8_lossy(fn_name)),
            )),
        }
    }

    fn parse_response_bytes(&mut self, response_bytes: &[u8]) -> Result<Option<usize>, IoError> {
        let mut des = BinaryProtocolDeserializer::new(Cursor::new(response_bytes));
        let (name, message_type, _) = match des.read_message_begin(|v| v.to_vec()) {
            Ok(v) => v,
            Err(_) => return Ok(None),
        };

        match &name[..] {
            b"scanVertex" | b"scanEdge" => {}
            _ => return Ok(None),
        };

        match message_type {
            MessageType::Reply => {
                match &name[..] {
                    b"scanVertex" => {
                        let _: ScanVertexExn = match Deserialize::read(&mut des) {
                            Ok(v) => v,
                            Err(_) => return Ok(None),
                        };
                    }
                    b"scanEdge" => {
                        let _: ScanEdgeExn = match Deserialize::read(&mut des) {
                            Ok(v) => v,
                            Err(_) => return Ok(None),
                        };
                    }
                    _ => unreachable!(),
                };
            }
            MessageType::Exception => {
                let _: ApplicationException = match Deserialize::read(&mut des) {
                    Ok(v) => v,
                    Err(_) => return Ok(None),
                };
            }
            MessageType::Call | MessageType::Oneway | MessageType::InvalidMessageType => {}
        }

        match des.read_message_end() {
            Ok(v) => v,
            Err(_) => return Ok(None),
        };

        Ok(Some(des.into_inner().position() as usize))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_try_make_static_response_bytes() -> Result<(), Box<dyn std::error::Error>> {
        let mut handler = StorageTransportResponseHandler;

        assert_eq!(
            handler.try_make_static_response_bytes(
                b"StorageService",
                b"StorageService.scanVertex",
                b"FOO"
            )?,
            None
        );
        assert_eq!(
            handler.try_make_static_response_bytes(
                b"StorageService",
                b"StorageService.scanEdge",
                b"FOO"
            )?,
            None
        );
        match handler.try_make_static_response_bytes(
            b"StorageService",
            b"StorageService.foo",
            b"FOO",
        ) {
            Ok(_) => panic!(),
            Err(err) => {
                assert_eq!(err.kind(), IoErrorKind::Other);

                assert_eq!(err.to_string(), "Unknown method StorageService.foo");
            }
        }

        Ok(())
    }
}