penumbra_sdk_tendermint_proxy/
tendermint_proxy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
use crate::TendermintProxy;
use penumbra_sdk_proto::{
    util::tendermint_proxy::v1::{
        tendermint_proxy_service_server::TendermintProxyService, AbciQueryRequest,
        AbciQueryResponse, BroadcastTxAsyncRequest, BroadcastTxAsyncResponse,
        BroadcastTxSyncRequest, BroadcastTxSyncResponse, GetBlockByHeightRequest,
        GetBlockByHeightResponse, GetStatusRequest, GetStatusResponse, GetTxRequest, GetTxResponse,
    },
    DomainType,
};
use penumbra_sdk_transaction::Transaction;
use tap::TapFallible;
use tendermint::{abci::Code, block::Height};
use tendermint_rpc::{Client, HttpClient};
use tonic::Status;
use tracing::instrument;

#[tonic::async_trait]
impl TendermintProxyService for TendermintProxy {
    // Note: the conversions that take place in here could be moved to
    // from/try_from impls, but they're not used anywhere else, so it's
    // unimportant right now, and would require additional wrappers
    // since none of the structs are defined in our crates :(
    // TODO: move those to proto/src/protobuf.rs

    /// Fetches a transaction by hash.
    ///
    /// Returns a [`GetTxResponse`] information about the requested transaction.
    #[instrument(level = "info", skip_all)]
    async fn get_tx(
        &self,
        req: tonic::Request<GetTxRequest>,
    ) -> Result<tonic::Response<GetTxResponse>, Status> {
        // Create an HTTP client, connecting to tendermint.
        let client = HttpClient::new(self.tendermint_url.as_ref()).map_err(|e| {
            Status::unavailable(format!("error creating tendermint http client: {e:#?}"))
        })?;

        // Parse the inbound transaction hash from the client request.
        let GetTxRequest { hash, prove } = req.into_inner();
        let hash = hash
            .try_into()
            .map_err(|e| Status::invalid_argument(format!("invalid transaction hash: {e:#?}")))?;

        // Send the request to Tendermint.
        let rsp = client
            .tx(hash, prove)
            .await
            .map(GetTxResponse::from)
            .map_err(|e| Status::unavailable(format!("error getting tx: {e}")))?;

        // Before forwarding along the response, verify that the transaction can be
        // successfully decoded into our domain type.
        Transaction::decode(rsp.tx.as_ref())
            .map_err(|e| Status::unavailable(format!("error decoding tx: {e}")))?;

        Ok(tonic::Response::new(rsp))
    }

    /// Broadcasts a transaction asynchronously.
    #[instrument(
        level = "info",
        skip_all,
        fields(req_id = tracing::field::Empty),
    )]
    async fn broadcast_tx_async(
        &self,
        req: tonic::Request<BroadcastTxAsyncRequest>,
    ) -> Result<tonic::Response<BroadcastTxAsyncResponse>, Status> {
        // Create an HTTP client, connecting to tendermint.
        let client = HttpClient::new(self.tendermint_url.as_ref()).map_err(|e| {
            Status::unavailable(format!("error creating tendermint http client: {e:#?}"))
        })?;

        // Process the inbound request, recording the request ID in the tracing span.
        let BroadcastTxAsyncRequest { req_id, params } = req.into_inner();
        tracing::Span::current().record("req_id", req_id);

        // Broadcast the transaction parameters.
        client
            .broadcast_tx_async(params)
            .await
            .map(BroadcastTxAsyncResponse::from)
            .map(tonic::Response::new)
            .map_err(|e| Status::unavailable(format!("error broadcasting tx async: {e}")))
    }

    // Broadcasts a transaction synchronously.
    #[instrument(
        level = "info",
        skip_all,
        fields(req_id = tracing::field::Empty),
    )]
    async fn broadcast_tx_sync(
        &self,
        req: tonic::Request<BroadcastTxSyncRequest>,
    ) -> Result<tonic::Response<BroadcastTxSyncResponse>, Status> {
        // Create an HTTP client, connecting to tendermint.
        let client = HttpClient::new(self.tendermint_url.as_ref()).map_err(|e| {
            Status::unavailable(format!("error creating tendermint http client: {e:#?}"))
        })?;

        // Process the inbound request, recording the request ID in the tracing span.
        let BroadcastTxSyncRequest { req_id, params } = req.into_inner();
        tracing::Span::current().record("req_id", req_id);

        // Broadcast the transaction parameters.
        client
            .broadcast_tx_sync(params)
            .await
            .map(BroadcastTxSyncResponse::from)
            .map(tonic::Response::new)
            .map_err(|e| tonic::Status::unavailable(format!("error broadcasting tx sync: {e}")))
            .tap_ok(|res| tracing::debug!("{:?}", res))
    }

    // Queries the current status.
    #[instrument(level = "info", skip_all)]
    async fn get_status(
        &self,
        _req: tonic::Request<GetStatusRequest>,
    ) -> Result<tonic::Response<GetStatusResponse>, Status> {
        // generic bounds on HttpClient::new are not well-constructed, so we have to
        // render the URL as a String, then borrow it, then re-parse the borrowed &str
        let client = HttpClient::new(self.tendermint_url.as_ref()).map_err(|e| {
            tonic::Status::unavailable(format!("error creating tendermint http client: {e:#?}"))
        })?;

        // Send the status request.
        client
            .status()
            .await
            .map(GetStatusResponse::from)
            .map(tonic::Response::new)
            .map_err(|e| tonic::Status::unavailable(format!("error querying status: {e}")))
    }

    #[instrument(level = "info", skip_all)]
    async fn abci_query(
        &self,
        req: tonic::Request<AbciQueryRequest>,
    ) -> Result<tonic::Response<AbciQueryResponse>, Status> {
        // generic bounds on HttpClient::new are not well-constructed, so we have to
        // render the URL as a String, then borrow it, then re-parse the borrowed &str
        let client = HttpClient::new(self.tendermint_url.to_string().as_ref()).map_err(|e| {
            tonic::Status::unavailable(format!("error creating tendermint http client: {e:#?}"))
        })?;

        // Parse the inbound request, confirm that the height provided is valid.
        // TODO: how does path validation work on tendermint-rs@29
        let AbciQueryRequest {
            data,
            path,
            height,
            prove,
        } = req.into_inner();
        let height: Height = height
            .try_into()
            .map_err(|_| Status::invalid_argument("invalid height"))?;

        // Send the ABCI query to Tendermint.
        let rsp = client
            .abci_query(Some(path), data, Some(height), prove)
            .await
            .map_err(|e| Status::unavailable(format!("error querying abci: {e}")))
            // Confirm that the response code is 0, or return an error response.
            .and_then(|rsp| match rsp.code {
                Code::Ok => Ok(rsp),
                tendermint::abci::Code::Err(e) => {
                    Err(Status::unavailable(format!("error querying abci: {e}")))
                }
            })?;

        AbciQueryResponse::try_from(rsp)
            .map(tonic::Response::new)
            .map_err(|error| Status::internal(format!("{error}")))
    }

    #[instrument(level = "info", skip_all)]
    async fn get_block_by_height(
        &self,
        req: tonic::Request<GetBlockByHeightRequest>,
    ) -> Result<tonic::Response<GetBlockByHeightResponse>, Status> {
        // generic bounds on HttpClient::new are not well-constructed, so we have to
        // render the URL as a String, then borrow it, then re-parse the borrowed &str
        let client = HttpClient::new(self.tendermint_url.to_string().as_ref()).map_err(|e| {
            tonic::Status::unavailable(format!("error creating tendermint http client: {e:#?}"))
        })?;

        // Parse the height from the inbound client request.
        let GetBlockByHeightRequest { height } = req.into_inner();
        let height =
            tendermint::block::Height::try_from(height).expect("height should be less than 2^63");

        // Fetch the block and forward Tendermint's response back to the client.
        client
            .block(height)
            .await
            .map_err(|e| tonic::Status::unavailable(format!("error querying abci: {e}")))
            .and_then(|b| {
                match GetBlockByHeightResponse::try_from(b) {
                    Ok(b) => Ok(b),
                    Err(e) => {
                        tracing::warn!(?height, error = ?e, "proxy: error deserializing GetBlockByHeightResponse");
                        Err(tonic::Status::internal("error deserializing GetBlockByHeightResponse"))
                    }
                }
            })
            .map(tonic::Response::new)
    }
}