fuel_gql_client/
client.rs

1use crate::client::schema::contract::ContractBalanceQueryArgs;
2use anyhow::Context;
3use cynic::{
4    http::ReqwestExt,
5    GraphQlResponse,
6    Id,
7    MutationBuilder,
8    Operation,
9    QueryBuilder,
10    StreamingOperation,
11};
12use eventsource_client::HttpsConnector;
13use fuel_vm::prelude::*;
14use futures::StreamExt;
15use itertools::Itertools;
16use schema::{
17    balance::BalanceArgs,
18    block::BlockByIdArgs,
19    coin::{
20        Coin,
21        CoinByIdArgs,
22    },
23    contract::{
24        Contract,
25        ContractByIdArgs,
26    },
27    resource::SpendQueryElementInput,
28    tx::{
29        TxArg,
30        TxIdArgs,
31    },
32    Bytes,
33    ContinueTx,
34    ContinueTxArgs,
35    ConversionError,
36    HexString,
37    IdArg,
38    MemoryArgs,
39    RegisterArgs,
40    RunResult,
41    SetBreakpoint,
42    SetBreakpointArgs,
43    SetSingleStepping,
44    SetSingleSteppingArgs,
45    StartTx,
46    StartTxArgs,
47    TransactionId,
48    U64,
49};
50use std::{
51    convert::TryInto,
52    future,
53    io::{
54        self,
55        ErrorKind,
56    },
57    net,
58    str::{
59        self,
60        FromStr,
61    },
62};
63use types::{
64    TransactionResponse,
65    TransactionStatus,
66};
67
68use crate::client::schema::{
69    block::BlockByHeightArgs,
70    resource::ExcludeInput,
71    tx::DryRunArg,
72};
73pub use schema::{
74    PageDirection,
75    PaginatedResult,
76    PaginationRequest,
77};
78
79use self::schema::{
80    block::{
81        ProduceBlockArgs,
82        TimeParameters,
83    },
84    message::MessageProofArgs,
85};
86
87pub mod schema;
88pub mod types;
89
90#[derive(Debug, Clone, PartialEq, Eq, Hash)]
91pub struct FuelClient {
92    url: reqwest::Url,
93}
94
95impl FromStr for FuelClient {
96    type Err = anyhow::Error;
97
98    fn from_str(str: &str) -> Result<Self, Self::Err> {
99        let mut raw_url = str.to_string();
100        if !raw_url.starts_with("http") {
101            raw_url = format!("http://{}", raw_url);
102        }
103
104        let mut url = reqwest::Url::parse(&raw_url)
105            .with_context(|| format!("Invalid fuel-core URL: {}", str))?;
106        url.set_path("/graphql");
107        Ok(Self { url })
108    }
109}
110
111impl<S> From<S> for FuelClient
112where
113    S: Into<net::SocketAddr>,
114{
115    fn from(socket: S) -> Self {
116        format!("http://{}", socket.into())
117            .as_str()
118            .parse()
119            .unwrap()
120    }
121}
122
123pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
124    let e = errors
125        .into_iter()
126        .fold(String::from("Response errors"), |mut s, e| {
127            s.push_str("; ");
128            s.push_str(e.as_str());
129            s
130        });
131    io::Error::new(io::ErrorKind::Other, e)
132}
133
134impl FuelClient {
135    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
136        Self::from_str(url.as_ref())
137    }
138
139    async fn query<ResponseData, Vars>(
140        &self,
141        q: Operation<ResponseData, Vars>,
142    ) -> io::Result<ResponseData>
143    where
144        Vars: serde::Serialize,
145        ResponseData: serde::de::DeserializeOwned + 'static,
146    {
147        let response = reqwest::Client::new()
148            .post(self.url.clone())
149            .run_graphql(q)
150            .await
151            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
152
153        Self::decode_response(response)
154    }
155
156    fn decode_response<R>(response: GraphQlResponse<R>) -> io::Result<R>
157    where
158        R: serde::de::DeserializeOwned + 'static,
159    {
160        match (response.data, response.errors) {
161            (Some(d), _) => Ok(d),
162            (_, Some(e)) => Err(from_strings_errors_to_std_error(
163                e.into_iter().map(|e| e.message).collect(),
164            )),
165            _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
166        }
167    }
168
169    async fn subscribe<ResponseData, Vars>(
170        &self,
171        q: StreamingOperation<ResponseData, Vars>,
172    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>>>
173    where
174        Vars: serde::Serialize,
175        ResponseData: serde::de::DeserializeOwned + 'static,
176    {
177        use eventsource_client as es;
178        let mut url = self.url.clone();
179        url.set_path("/graphql-sub");
180        let json_query = serde_json::to_string(&q)?;
181        let client = es::ClientBuilder::for_url(url.as_str())
182            .map_err(|e| {
183                io::Error::new(
184                    io::ErrorKind::Other,
185                    format!("Failed to start client {:?}", e),
186                )
187            })?
188            .body(json_query)
189            .method("POST".to_string())
190            .header("content-type", "application/json")
191            .map_err(|e| {
192                io::Error::new(
193                    io::ErrorKind::Other,
194                    format!("Failed to add header to client {:?}", e),
195                )
196            })?
197            .build_with_conn(HttpsConnector::with_webpki_roots());
198
199        let mut last = None;
200
201        let stream = es::Client::stream(&client)
202            .take_while(|result| {
203                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
204            })
205            .filter_map(move |result| {
206                let r = match result {
207                    Ok(es::SSE::Event(es::Event { data, .. })) => {
208                        match serde_json::from_str::<GraphQlResponse<ResponseData>>(&data)
209                        {
210                            Ok(resp) => {
211                                match Self::decode_response(resp) {
212                                    Ok(resp) => {
213                                        match last.replace(data) {
214                                            // Remove duplicates
215                                            Some(l)
216                                                if l == *last.as_ref().expect(
217                                                    "Safe because of the replace above",
218                                                ) =>
219                                            {
220                                                None
221                                            }
222                                            _ => Some(Ok(resp)),
223                                        }
224                                    }
225                                    Err(e) => Some(Err(io::Error::new(
226                                        io::ErrorKind::Other,
227                                        format!("Decode error: {:?}", e),
228                                    ))),
229                                }
230                            }
231                            Err(e) => Some(Err(io::Error::new(
232                                io::ErrorKind::Other,
233                                format!("Json error: {:?}", e),
234                            ))),
235                        }
236                    }
237                    Ok(_) => None,
238                    Err(e) => Some(Err(io::Error::new(
239                        io::ErrorKind::Other,
240                        format!("Graphql error: {:?}", e),
241                    ))),
242                };
243                futures::future::ready(r)
244            });
245
246        Ok(stream)
247    }
248
249    pub async fn health(&self) -> io::Result<bool> {
250        let query = schema::Health::build(());
251        self.query(query).await.map(|r| r.health)
252    }
253
254    pub async fn node_info(&self) -> io::Result<schema::node_info::NodeInfo> {
255        let query = schema::node_info::QueryNodeInfo::build(());
256        self.query(query).await.map(|r| r.node_info)
257    }
258
259    pub async fn chain_info(&self) -> io::Result<schema::chain::ChainInfo> {
260        let query = schema::chain::ChainQuery::build(());
261        self.query(query).await.map(|r| r.chain)
262    }
263
264    /// Default dry run, matching the exact configuration as the node
265    pub async fn dry_run(&self, tx: &Transaction) -> io::Result<Vec<Receipt>> {
266        self.dry_run_opt(tx, None).await
267    }
268
269    /// Dry run with options to override the node behavior
270    pub async fn dry_run_opt(
271        &self,
272        tx: &Transaction,
273        // Disable utxo input checks (exists, unspent, and valid signature)
274        utxo_validation: Option<bool>,
275    ) -> io::Result<Vec<Receipt>> {
276        let tx = tx.clone().to_bytes();
277        let query = schema::tx::DryRun::build(DryRunArg {
278            tx: HexString(Bytes(tx)),
279            utxo_validation,
280        });
281        let receipts = self.query(query).await.map(|r| r.dry_run)?;
282        receipts
283            .into_iter()
284            .map(|receipt| receipt.try_into().map_err(Into::into))
285            .collect()
286    }
287
288    pub async fn submit(&self, tx: &Transaction) -> io::Result<TransactionId> {
289        let tx = tx.clone().to_bytes();
290        let query = schema::tx::Submit::build(TxArg {
291            tx: HexString(Bytes(tx)),
292        });
293
294        let id = self.query(query).await.map(|r| r.submit)?.id;
295        Ok(id)
296    }
297
298    /// Submit the transaction and wait for it to be included into a block.
299    ///
300    /// This will wait forever if needed, so consider wrapping this call
301    /// with a `tokio::time::timeout`.
302    pub async fn submit_and_await_commit(
303        &self,
304        tx: &Transaction,
305    ) -> io::Result<TransactionStatus> {
306        let tx_id = self.submit(tx).await?;
307        self.await_transaction_commit(&tx_id.to_string()).await
308    }
309
310    pub async fn start_session(&self) -> io::Result<String> {
311        let query = schema::StartSession::build(());
312
313        self.query(query)
314            .await
315            .map(|r| r.start_session.into_inner())
316    }
317
318    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
319        let query = schema::EndSession::build(IdArg { id: id.into() });
320
321        self.query(query).await.map(|r| r.end_session)
322    }
323
324    pub async fn reset(&self, id: &str) -> io::Result<bool> {
325        let query = schema::Reset::build(IdArg { id: id.into() });
326
327        self.query(query).await.map(|r| r.reset)
328    }
329
330    pub async fn execute(&self, id: &str, op: &Opcode) -> io::Result<bool> {
331        let op = serde_json::to_string(op)?;
332        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
333
334        self.query(query).await.map(|r| r.execute)
335    }
336
337    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
338        let query = schema::Register::build(RegisterArgs {
339            id: id.into(),
340            register: register.into(),
341        });
342
343        Ok(self.query(query).await?.register.0 as Word)
344    }
345
346    pub async fn memory(
347        &self,
348        id: &str,
349        start: usize,
350        size: usize,
351    ) -> io::Result<Vec<u8>> {
352        let query = schema::Memory::build(MemoryArgs {
353            id: id.into(),
354            start: start.into(),
355            size: size.into(),
356        });
357
358        let memory = self.query(query).await?.memory;
359
360        Ok(serde_json::from_str(memory.as_str())?)
361    }
362
363    pub async fn set_breakpoint(
364        &self,
365        session_id: &str,
366        contract: ::fuel_vm::fuel_types::ContractId,
367        pc: u64,
368    ) -> io::Result<()> {
369        let operation = SetBreakpoint::build(SetBreakpointArgs {
370            id: Id::new(session_id),
371            bp: schema::Breakpoint {
372                contract: contract.into(),
373                pc: U64(pc),
374            },
375        });
376
377        let response = self.query(operation).await?;
378        assert!(
379            response.set_breakpoint,
380            "Setting breakpoint returned invalid reply"
381        );
382        Ok(())
383    }
384
385    pub async fn set_single_stepping(
386        &self,
387        session_id: &str,
388        enable: bool,
389    ) -> io::Result<()> {
390        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
391            id: Id::new(session_id),
392            enable,
393        });
394        self.query(operation).await?;
395        Ok(())
396    }
397
398    pub async fn start_tx(
399        &self,
400        session_id: &str,
401        tx: &Transaction,
402    ) -> io::Result<RunResult> {
403        let operation = StartTx::build(StartTxArgs {
404            id: Id::new(session_id),
405            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
406        });
407        let response = self.query(operation).await?.start_tx;
408        Ok(response)
409    }
410
411    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
412        let operation = ContinueTx::build(ContinueTxArgs {
413            id: Id::new(session_id),
414        });
415        let response = self.query(operation).await?.continue_tx;
416        Ok(response)
417    }
418
419    pub async fn transaction(&self, id: &str) -> io::Result<Option<TransactionResponse>> {
420        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
421
422        let transaction = self.query(query).await?.transaction;
423
424        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
425    }
426
427    /// Get the status of a transaction
428    pub async fn transaction_status(&self, id: &str) -> io::Result<TransactionStatus> {
429        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
430
431        let tx = self.query(query).await?.transaction.ok_or_else(|| {
432            io::Error::new(
433                ErrorKind::NotFound,
434                format!("status not found for transaction {} ", id),
435            )
436        })?;
437
438        let status = tx
439            .status
440            .ok_or_else(|| {
441                io::Error::new(
442                    ErrorKind::NotFound,
443                    format!("status not found for transaction {}", id),
444                )
445            })?
446            .try_into()?;
447        Ok(status)
448    }
449
450    /// Subscribe to the status of a transaction
451    pub async fn subscribe_transaction_status(
452        &self,
453        id: &str,
454    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>>> {
455        use cynic::SubscriptionBuilder;
456        let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: id.parse()? });
457
458        let stream = self.subscribe(s).await?.map(|tx| {
459            let tx = tx?;
460            let status = tx.status_change.try_into()?;
461            Ok(status)
462        });
463
464        Ok(stream)
465    }
466
467    /// Awaits for the transaction to be committed into a block
468    ///
469    /// This will wait forever if needed, so consider wrapping this call
470    /// with a `tokio::time::timeout`.
471    pub async fn await_transaction_commit(
472        &self,
473        id: &str,
474    ) -> io::Result<TransactionStatus> {
475        // skip until we've reached a final status and then stop consuming the stream
476        // to avoid an EOF which the eventsource client considers as an error.
477        let status_result = self
478            .subscribe_transaction_status(id)
479            .await?
480            .skip_while(|status| {
481                future::ready(matches!(status, Ok(TransactionStatus::Submitted { .. })))
482            })
483            .next()
484            .await;
485
486        if let Some(Ok(status)) = status_result {
487            Ok(status)
488        } else {
489            Err(io::Error::new(
490                io::ErrorKind::Other,
491                "Failed to get status for transaction",
492            ))
493        }
494    }
495
496    /// returns a paginated set of transactions sorted by block height
497    pub async fn transactions(
498        &self,
499        request: PaginationRequest<String>,
500    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
501        let query = schema::tx::TransactionsQuery::build(request.into());
502        let transactions = self.query(query).await?.transactions.try_into()?;
503        Ok(transactions)
504    }
505
506    /// Returns a paginated set of transactions associated with a txo owner address.
507    pub async fn transactions_by_owner(
508        &self,
509        owner: &str,
510        request: PaginationRequest<String>,
511    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
512        let owner: schema::Address = owner.parse()?;
513        let query = schema::tx::TransactionsByOwnerQuery::build((owner, request).into());
514
515        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
516        Ok(transactions)
517    }
518
519    pub async fn receipts(
520        &self,
521        id: &str,
522    ) -> io::Result<Vec<::fuel_vm::fuel_tx::Receipt>> {
523        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
524
525        let tx = self.query(query).await?.transaction.ok_or_else(|| {
526            io::Error::new(ErrorKind::NotFound, format!("transaction {} not found", id))
527        })?;
528
529        let receipts: Result<Vec<::fuel_vm::fuel_tx::Receipt>, ConversionError> = tx
530            .receipts
531            .unwrap_or_default()
532            .into_iter()
533            .map(|r| r.try_into())
534            .collect();
535
536        Ok(receipts?)
537    }
538
539    pub async fn produce_blocks(
540        &self,
541        blocks_to_produce: u64,
542        time: Option<TimeParameters>,
543    ) -> io::Result<u64> {
544        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
545            blocks_to_produce: blocks_to_produce.into(),
546            time,
547        });
548
549        let new_height = self.query(query).await?.produce_blocks;
550
551        Ok(new_height.into())
552    }
553
554    pub async fn block(&self, id: &str) -> io::Result<Option<schema::block::Block>> {
555        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
556            id: Some(id.parse()?),
557        });
558
559        let block = self.query(query).await?.block;
560
561        Ok(block)
562    }
563
564    pub async fn block_by_height(
565        &self,
566        height: u64,
567    ) -> io::Result<Option<schema::block::Block>> {
568        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
569            height: Some(U64(height)),
570        });
571
572        let block = self.query(query).await?.block;
573
574        Ok(block)
575    }
576
577    /// Retrieve multiple blocks
578    pub async fn blocks(
579        &self,
580        request: PaginationRequest<String>,
581    ) -> io::Result<PaginatedResult<schema::block::Block, String>> {
582        let query = schema::block::BlocksQuery::build(request.into());
583
584        let blocks = self.query(query).await?.blocks.into();
585
586        Ok(blocks)
587    }
588
589    pub async fn coin(&self, id: &str) -> io::Result<Option<Coin>> {
590        let query = schema::coin::CoinByIdQuery::build(CoinByIdArgs {
591            utxo_id: id.parse()?,
592        });
593        let coin = self.query(query).await?.coin;
594        Ok(coin)
595    }
596
597    /// Retrieve a page of coins by their owner
598    pub async fn coins(
599        &self,
600        owner: &str,
601        asset_id: Option<&str>,
602        request: PaginationRequest<String>,
603    ) -> io::Result<PaginatedResult<schema::coin::Coin, String>> {
604        let owner: schema::Address = owner.parse()?;
605        let asset_id: schema::AssetId = match asset_id {
606            Some(asset_id) => asset_id.parse()?,
607            None => schema::AssetId::default(),
608        };
609        let query = schema::coin::CoinsQuery::build((owner, asset_id, request).into());
610
611        let coins = self.query(query).await?.coins.into();
612        Ok(coins)
613    }
614
615    /// Retrieve resources to spend in a transaction
616    pub async fn resources_to_spend(
617        &self,
618        owner: &str,
619        spend_query: Vec<(&str, u64, Option<u64>)>,
620        // (Utxos, messages)
621        excluded_ids: Option<(Vec<&str>, Vec<&str>)>,
622    ) -> io::Result<Vec<Vec<schema::resource::Resource>>> {
623        let owner: schema::Address = owner.parse()?;
624        let spend_query: Vec<SpendQueryElementInput> = spend_query
625            .iter()
626            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
627                Ok(SpendQueryElementInput {
628                    asset_id: asset_id.parse()?,
629                    amount: (*amount).into(),
630                    max: (*max).map(|max| max.into()),
631                })
632            })
633            .try_collect()?;
634        let excluded_ids: Option<ExcludeInput> =
635            excluded_ids.map(ExcludeInput::from_tuple).transpose()?;
636        let query = schema::resource::ResourcesToSpendQuery::build(
637            (owner, spend_query, excluded_ids).into(),
638        );
639
640        let resources_per_asset = self.query(query).await?.resources_to_spend;
641        Ok(resources_per_asset)
642    }
643
644    pub async fn contract(&self, id: &str) -> io::Result<Option<Contract>> {
645        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
646            id: id.parse()?,
647        });
648        let contract = self.query(query).await?.contract;
649        Ok(contract)
650    }
651
652    pub async fn contract_balance(
653        &self,
654        id: &str,
655        asset: Option<&str>,
656    ) -> io::Result<u64> {
657        let asset_id: schema::AssetId = match asset {
658            Some(asset) => asset.parse()?,
659            None => schema::AssetId::default(),
660        };
661
662        let query =
663            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
664                id: id.parse()?,
665                asset: asset_id,
666            });
667
668        let balance = self.query(query).await.unwrap().contract_balance.amount;
669        Ok(balance.into())
670    }
671
672    pub async fn balance(&self, owner: &str, asset_id: Option<&str>) -> io::Result<u64> {
673        let owner: schema::Address = owner.parse()?;
674        let asset_id: schema::AssetId = match asset_id {
675            Some(asset_id) => asset_id.parse()?,
676            None => schema::AssetId::default(),
677        };
678        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
679        let balance = self.query(query).await?.balance;
680        Ok(balance.amount.into())
681    }
682
683    // Retrieve a page of balances by their owner
684    pub async fn balances(
685        &self,
686        owner: &str,
687        request: PaginationRequest<String>,
688    ) -> io::Result<PaginatedResult<schema::balance::Balance, String>> {
689        let owner: schema::Address = owner.parse()?;
690        let query = schema::balance::BalancesQuery::build((owner, request).into());
691
692        let balances = self.query(query).await?.balances.into();
693        Ok(balances)
694    }
695
696    pub async fn contract_balances(
697        &self,
698        contract: &str,
699        request: PaginationRequest<String>,
700    ) -> io::Result<PaginatedResult<schema::contract::ContractBalance, String>> {
701        let contract_id: schema::ContractId = contract.parse()?;
702        let query =
703            schema::contract::ContractBalancesQuery::build((contract_id, request).into());
704
705        let balances = self.query(query).await?.contract_balances.into();
706
707        Ok(balances)
708    }
709
710    pub async fn messages(
711        &self,
712        owner: Option<&str>,
713        request: PaginationRequest<String>,
714    ) -> io::Result<PaginatedResult<schema::message::Message, String>> {
715        let owner: Option<schema::Address> =
716            owner.map(|owner| owner.parse()).transpose()?;
717        let query = schema::message::OwnedMessageQuery::build((owner, request).into());
718
719        let messages = self.query(query).await?.messages.into();
720
721        Ok(messages)
722    }
723
724    /// Request a merkle proof of an output message.
725    pub async fn message_proof(
726        &self,
727        transaction_id: &str,
728        message_id: &str,
729    ) -> io::Result<Option<schema::message::MessageProof>> {
730        let transaction_id: schema::TransactionId = transaction_id.parse()?;
731        let message_id: schema::MessageId = message_id.parse()?;
732        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
733            transaction_id,
734            message_id,
735        });
736
737        let proof = self.query(query).await?.message_proof;
738
739        Ok(proof)
740    }
741}
742
743#[cfg(any(test, feature = "test-helpers"))]
744impl FuelClient {
745    pub async fn transparent_transaction(
746        &self,
747        id: &str,
748    ) -> io::Result<Option<::fuel_vm::fuel_tx::Transaction>> {
749        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
750
751        let transaction = self.query(query).await?.transaction;
752
753        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
754    }
755}