fuel_core_client/
client.rs

1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4    client::{
5        schema::{
6            block::BlockByHeightArgs,
7            coins::{
8                ExcludeInput,
9                SpendQueryElementInput,
10            },
11            contract::ContractBalanceQueryArgs,
12            gas_price::EstimateGasPrice,
13            message::MessageStatusArgs,
14            relayed_tx::RelayedTransactionStatusArgs,
15            tx::{
16                DryRunArg,
17                TxWithEstimatedPredicatesArg,
18            },
19            Tai64Timestamp,
20            TransactionId,
21        },
22        types::{
23            asset::AssetDetail,
24            gas_price::LatestGasPrice,
25            message::MessageStatus,
26            primitives::{
27                Address,
28                AssetId,
29                BlockId,
30                ContractId,
31                UtxoId,
32            },
33            upgrades::StateTransitionBytecode,
34            RelayedTransactionStatus,
35        },
36    },
37    reqwest_ext::{
38        FuelGraphQlResponse,
39        FuelOperation,
40        ReqwestExt,
41    },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46    Engine as _,
47    BASE64_STANDARD,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52    Id,
53    MutationBuilder,
54    Operation,
55    QueryBuilder,
56};
57use fuel_core_types::{
58    blockchain::header::{
59        ConsensusParametersVersion,
60        StateTransitionBytecodeVersion,
61    },
62    fuel_asm::{
63        Instruction,
64        Word,
65    },
66    fuel_tx::{
67        BlobId,
68        Bytes32,
69        ConsensusParameters,
70        Receipt,
71        Transaction,
72        TxId,
73    },
74    fuel_types::{
75        self,
76        canonical::Serialize,
77        BlockHeight,
78        Nonce,
79    },
80    services::executor::{
81        StorageReadReplayEvent,
82        TransactionExecutionStatus,
83    },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87    Stream,
88    StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92    PageDirection,
93    PaginatedResult,
94    PaginationRequest,
95};
96use schema::{
97    assets::AssetInfoArg,
98    balance::BalanceArgs,
99    blob::BlobByIdArgs,
100    block::BlockByIdArgs,
101    coins::{
102        CoinByIdArgs,
103        CoinsConnectionArgs,
104    },
105    contract::{
106        ContractBalancesConnectionArgs,
107        ContractByIdArgs,
108    },
109    da_compressed::DaCompressedBlockByHeightArgs,
110    gas_price::BlockHorizonArgs,
111    storage_read_replay::{
112        StorageReadReplay,
113        StorageReadReplayArgs,
114    },
115    tx::{
116        AssembleTxArg,
117        TransactionsByOwnerConnectionArgs,
118        TxArg,
119        TxIdArgs,
120    },
121    Bytes,
122    ContinueTx,
123    ContinueTxArgs,
124    ConversionError,
125    HexString,
126    IdArg,
127    MemoryArgs,
128    RegisterArgs,
129    RunResult,
130    SetBreakpoint,
131    SetBreakpointArgs,
132    SetSingleStepping,
133    SetSingleSteppingArgs,
134    StartTx,
135    StartTxArgs,
136    U32,
137    U64,
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142    convert::TryInto,
143    io::{
144        self,
145        ErrorKind,
146    },
147    net,
148    str::{
149        self,
150        FromStr,
151    },
152    sync::{
153        Arc,
154        Mutex,
155    },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160    assemble_tx::{
161        AssembleTransactionResult,
162        RequiredBalance,
163    },
164    TransactionResponse,
165    TransactionStatus,
166};
167
168use self::schema::{
169    block::ProduceBlockArgs,
170    message::{
171        MessageProofArgs,
172        NonceArgs,
173    },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
180type RegisterId = u32;
181
182#[derive(Debug, derive_more::Display, derive_more::From)]
183#[non_exhaustive]
184/// Error occurring during interaction with the FuelClient
185// anyhow::Error is wrapped inside a custom Error type,
186// so that we can specific error variants in the future.
187pub enum Error {
188    /// Unknown or not expected(by architecture) error.
189    #[from]
190    Other(anyhow::Error),
191}
192
193/// Consistency policy for the [`FuelClient`] to define the strategy
194/// for the required height feature.
195#[derive(Debug)]
196pub enum ConsistencyPolicy {
197    /// Automatically fetch the next block height from the response and
198    /// use it as an input to the next query to guarantee consistency
199    /// of the results for the queries.
200    Auto {
201        /// The required block height for the queries.
202        height: Arc<Mutex<Option<BlockHeight>>>,
203    },
204    /// Use manually sets the block height for all queries
205    /// via the [`FuelClient::with_required_fuel_block_height`].
206    Manual {
207        /// The required block height for the queries.
208        height: Option<BlockHeight>,
209    },
210}
211
212impl Clone for ConsistencyPolicy {
213    fn clone(&self) -> Self {
214        match self {
215            Self::Auto { height } => Self::Auto {
216                // We don't want to share the same mutex between the different
217                // instances of the `FuelClient`.
218                height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219            },
220            Self::Manual { height } => Self::Manual { height: *height },
221        }
222    }
223}
224
225#[derive(Debug, Default)]
226struct ChainStateInfo {
227    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
228    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
229}
230
231impl Clone for ChainStateInfo {
232    fn clone(&self) -> Self {
233        Self {
234            current_stf_version: Arc::new(Mutex::new(
235                self.current_stf_version.lock().ok().and_then(|v| *v),
236            )),
237            current_consensus_parameters_version: Arc::new(Mutex::new(
238                self.current_consensus_parameters_version
239                    .lock()
240                    .ok()
241                    .and_then(|v| *v),
242            )),
243        }
244    }
245}
246
247#[derive(Debug, Clone)]
248pub struct FuelClient {
249    client: reqwest::Client,
250    #[cfg(feature = "subscriptions")]
251    cookie: std::sync::Arc<reqwest::cookie::Jar>,
252    url: reqwest::Url,
253    require_height: ConsistencyPolicy,
254    chain_state_info: ChainStateInfo,
255}
256
257impl FromStr for FuelClient {
258    type Err = anyhow::Error;
259
260    fn from_str(str: &str) -> Result<Self, Self::Err> {
261        let mut raw_url = str.to_string();
262        if !raw_url.starts_with("http") {
263            raw_url = format!("http://{raw_url}");
264        }
265
266        let mut url = reqwest::Url::parse(&raw_url)
267            .map_err(anyhow::Error::msg)
268            .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269        url.set_path("/v1/graphql");
270
271        #[cfg(feature = "subscriptions")]
272        {
273            let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274            let client = reqwest::Client::builder()
275                .cookie_provider(cookie.clone())
276                .build()
277                .map_err(anyhow::Error::msg)?;
278            Ok(Self {
279                client,
280                cookie,
281                url,
282                require_height: ConsistencyPolicy::Auto {
283                    height: Arc::new(Mutex::new(None)),
284                },
285                chain_state_info: Default::default(),
286            })
287        }
288
289        #[cfg(not(feature = "subscriptions"))]
290        {
291            let client = reqwest::Client::new();
292            Ok(Self {
293                client,
294                url,
295                require_height: ConsistencyPolicy::Auto {
296                    height: Arc::new(Mutex::new(None)),
297                },
298                chain_state_info: Default::default(),
299            })
300        }
301    }
302}
303
304impl<S> From<S> for FuelClient
305where
306    S: Into<net::SocketAddr>,
307{
308    fn from(socket: S) -> Self {
309        format!("http://{}", socket.into())
310            .as_str()
311            .parse()
312            .unwrap()
313    }
314}
315
316pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
317    let e = errors
318        .into_iter()
319        .fold(String::from("Response errors"), |mut s, e| {
320            s.push_str("; ");
321            s.push_str(e.as_str());
322            s
323        });
324    io::Error::new(io::ErrorKind::Other, e)
325}
326
327impl FuelClient {
328    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329        Self::from_str(url.as_ref())
330    }
331
332    pub fn with_required_fuel_block_height(
333        &mut self,
334        new_height: Option<BlockHeight>,
335    ) -> &mut Self {
336        match &mut self.require_height {
337            ConsistencyPolicy::Auto { height } => {
338                *height.lock().expect("Mutex poisoned") = new_height;
339            }
340            ConsistencyPolicy::Manual { height } => {
341                *height = new_height;
342            }
343        }
344        self
345    }
346
347    pub fn use_manual_consistency_policy(
348        &mut self,
349        height: Option<BlockHeight>,
350    ) -> &mut Self {
351        self.require_height = ConsistencyPolicy::Manual { height };
352        self
353    }
354
355    pub fn required_block_height(&self) -> Option<BlockHeight> {
356        match &self.require_height {
357            ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358            ConsistencyPolicy::Manual { height } => *height,
359        }
360    }
361
362    fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363        if let Some(current_sft_version) = response
364            .extensions
365            .as_ref()
366            .and_then(|e| e.current_stf_version)
367        {
368            if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
369                *c = Some(current_sft_version);
370            }
371        }
372
373        if let Some(current_consensus_parameters_version) = response
374            .extensions
375            .as_ref()
376            .and_then(|e| e.current_consensus_parameters_version)
377        {
378            if let Ok(mut c) = self
379                .chain_state_info
380                .current_consensus_parameters_version
381                .lock()
382            {
383                *c = Some(current_consensus_parameters_version);
384            }
385        }
386
387        let inner_required_height = match &self.require_height {
388            ConsistencyPolicy::Auto { height } => Some(height.clone()),
389            ConsistencyPolicy::Manual { .. } => None,
390        };
391
392        if let Some(inner_required_height) = inner_required_height {
393            if let Some(current_fuel_block_height) = response
394                .extensions
395                .as_ref()
396                .and_then(|e| e.current_fuel_block_height)
397            {
398                let mut lock = inner_required_height.lock().expect("Mutex poisoned");
399
400                if current_fuel_block_height >= lock.unwrap_or_default() {
401                    *lock = Some(current_fuel_block_height);
402                }
403            }
404        }
405    }
406
407    /// Send the GraphQL query to the client.
408    pub async fn query<ResponseData, Vars>(
409        &self,
410        q: Operation<ResponseData, Vars>,
411    ) -> io::Result<ResponseData>
412    where
413        Vars: serde::Serialize,
414        ResponseData: serde::de::DeserializeOwned + 'static,
415    {
416        let required_fuel_block_height = self.required_block_height();
417        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
418        let response = self
419            .client
420            .post(self.url.clone())
421            .run_fuel_graphql(fuel_operation)
422            .await
423            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425        self.decode_response(response)
426    }
427
428    fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
429    where
430        R: serde::de::DeserializeOwned + 'static,
431    {
432        self.update_chain_state_info(&response);
433
434        if let Some(failed) = response
435            .extensions
436            .as_ref()
437            .and_then(|e| e.fuel_block_height_precondition_failed)
438        {
439            if failed {
440                return Err(io::Error::new(
441                    io::ErrorKind::Other,
442                    "The required block height was not met",
443                ));
444            }
445        }
446
447        let response = response.response;
448
449        match (response.data, response.errors) {
450            (Some(d), _) => Ok(d),
451            (_, Some(e)) => Err(from_strings_errors_to_std_error(
452                e.into_iter().map(|e| e.message).collect(),
453            )),
454            _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
455        }
456    }
457
458    #[tracing::instrument(skip_all)]
459    #[cfg(feature = "subscriptions")]
460    async fn subscribe<ResponseData, Vars>(
461        &self,
462        q: StreamingOperation<ResponseData, Vars>,
463    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
464    where
465        Vars: serde::Serialize,
466        ResponseData: serde::de::DeserializeOwned + 'static,
467    {
468        use core::ops::Deref;
469        use eventsource_client as es;
470        use hyper_rustls as _;
471        use reqwest::cookie::CookieStore;
472        let mut url = self.url.clone();
473        url.set_path("/v1/graphql-sub");
474
475        let required_fuel_block_height = self.required_block_height();
476        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
477
478        let json_query = serde_json::to_string(&fuel_operation)?;
479        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
480            .map_err(|e| {
481                io::Error::new(
482                    io::ErrorKind::Other,
483                    format!("Failed to start client {e:?}"),
484                )
485            })?
486            .body(json_query)
487            .method("POST".to_string())
488            .header("content-type", "application/json")
489            .map_err(|e| {
490                io::Error::new(
491                    io::ErrorKind::Other,
492                    format!("Failed to add header to client {e:?}"),
493                )
494            })?;
495        if let Some(password) = url.password() {
496            let username = url.username();
497            let credentials = format!("{}:{}", username, password);
498            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
499            client_builder = client_builder
500                .header("Authorization", &authorization)
501                .map_err(|e| {
502                    io::Error::new(
503                        io::ErrorKind::Other,
504                        format!("Failed to add header to client {e:?}"),
505                    )
506                })?;
507        }
508
509        if let Some(value) = self.cookie.deref().cookies(&self.url) {
510            let value = value.to_str().map_err(|e| {
511                io::Error::new(
512                    io::ErrorKind::Other,
513                    format!("Unable convert header value to string {e:?}"),
514                )
515            })?;
516            client_builder = client_builder
517                .header(reqwest::header::COOKIE.as_str(), value)
518                .map_err(|e| {
519                    io::Error::new(
520                        io::ErrorKind::Other,
521                        format!("Failed to add header from `reqwest` to client {e:?}"),
522                    )
523                })?;
524        }
525
526        let client = client_builder.build_with_conn(
527            hyper_rustls::HttpsConnectorBuilder::new()
528                .with_webpki_roots()
529                .https_or_http()
530                .enable_http1()
531                .build(),
532        );
533
534        let mut last = None;
535
536        let stream = es::Client::stream(&client)
537            .take_while(|result| {
538                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
539            })
540            .filter_map(move |result| {
541                tracing::debug!("Got result: {result:?}");
542                let r = match result {
543                    Ok(es::SSE::Event(es::Event { data, .. })) => {
544                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
545                            &data,
546                        ) {
547                            Ok(resp) => {
548                                match self.decode_response(resp) {
549                                    Ok(resp) => {
550                                        match last.replace(data) {
551                                            // Remove duplicates
552                                            Some(l)
553                                                if l == *last.as_ref().expect(
554                                                    "Safe because of the replace above",
555                                                ) =>
556                                            {
557                                                None
558                                            }
559                                            _ => Some(Ok(resp)),
560                                        }
561                                    }
562                                    Err(e) => Some(Err(io::Error::new(
563                                        io::ErrorKind::Other,
564                                        format!("Decode error: {e:?}"),
565                                    ))),
566                                }
567                            }
568                            Err(e) => Some(Err(io::Error::new(
569                                io::ErrorKind::Other,
570                                format!("Json error: {e:?}"),
571                            ))),
572                        }
573                    }
574                    Ok(_) => None,
575                    Err(e) => Some(Err(io::Error::new(
576                        io::ErrorKind::Other,
577                        format!("Graphql error: {e:?}"),
578                    ))),
579                };
580                futures::future::ready(r)
581            });
582
583        Ok(stream)
584    }
585
586    pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
587        self.chain_state_info
588            .current_stf_version
589            .lock()
590            .ok()
591            .and_then(|value| *value)
592    }
593
594    pub fn latest_consensus_parameters_version(
595        &self,
596    ) -> Option<ConsensusParametersVersion> {
597        self.chain_state_info
598            .current_consensus_parameters_version
599            .lock()
600            .ok()
601            .and_then(|value| *value)
602    }
603
604    pub async fn health(&self) -> io::Result<bool> {
605        let query = schema::Health::build(());
606        self.query(query).await.map(|r| r.health)
607    }
608
609    pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
610        let query = schema::node_info::QueryNodeInfo::build(());
611        self.query(query).await.map(|r| r.node_info.into())
612    }
613
614    pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
615        let query = schema::gas_price::QueryLatestGasPrice::build(());
616        self.query(query).await.map(|r| r.latest_gas_price.into())
617    }
618
619    pub async fn estimate_gas_price(
620        &self,
621        block_horizon: u32,
622    ) -> io::Result<EstimateGasPrice> {
623        let args = BlockHorizonArgs {
624            block_horizon: Some(block_horizon.into()),
625        };
626        let query = schema::gas_price::QueryEstimateGasPrice::build(args);
627        self.query(query).await.map(|r| r.estimate_gas_price)
628    }
629
630    #[cfg(feature = "std")]
631    pub async fn connected_peers_info(
632        &self,
633    ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
634        let query = schema::node_info::QueryPeersInfo::build(());
635        self.query(query)
636            .await
637            .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
638    }
639
640    pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
641        let query = schema::chain::ChainQuery::build(());
642        self.query(query).await.and_then(|r| {
643            let result = r.chain.try_into()?;
644            Ok(result)
645        })
646    }
647
648    pub async fn consensus_parameters(
649        &self,
650        version: i32,
651    ) -> io::Result<Option<ConsensusParameters>> {
652        let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
653        let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
654
655        let result = self
656            .query(query)
657            .await?
658            .consensus_parameters
659            .map(TryInto::try_into)
660            .transpose()?;
661
662        Ok(result)
663    }
664
665    pub async fn state_transition_byte_code_by_version(
666        &self,
667        version: i32,
668    ) -> io::Result<Option<StateTransitionBytecode>> {
669        let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
670        let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
671
672        let result = self
673            .query(query)
674            .await?
675            .state_transition_bytecode_by_version
676            .map(TryInto::try_into)
677            .transpose()?;
678
679        Ok(result)
680    }
681
682    pub async fn state_transition_byte_code_by_root(
683        &self,
684        root: Bytes32,
685    ) -> io::Result<Option<StateTransitionBytecode>> {
686        let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
687            root: HexString(Bytes(root.to_vec())),
688        };
689        let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
690
691        let result = self
692            .query(query)
693            .await?
694            .state_transition_bytecode_by_root
695            .map(TryInto::try_into)
696            .transpose()?;
697
698        Ok(result)
699    }
700
701    /// Default dry run, matching the exact configuration as the node
702    pub async fn dry_run(
703        &self,
704        txs: &[Transaction],
705    ) -> io::Result<Vec<TransactionExecutionStatus>> {
706        self.dry_run_opt(txs, None, None, None).await
707    }
708
709    /// Dry run with options to override the node behavior
710    pub async fn dry_run_opt(
711        &self,
712        txs: &[Transaction],
713        // Disable utxo input checks (exists, unspent, and valid signature)
714        utxo_validation: Option<bool>,
715        gas_price: Option<u64>,
716        at_height: Option<BlockHeight>,
717    ) -> io::Result<Vec<TransactionExecutionStatus>> {
718        let txs = txs
719            .iter()
720            .map(|tx| HexString(Bytes(tx.to_bytes())))
721            .collect::<Vec<HexString>>();
722        let query: Operation<schema::tx::DryRun, DryRunArg> =
723            schema::tx::DryRun::build(DryRunArg {
724                txs,
725                utxo_validation,
726                gas_price: gas_price.map(|gp| gp.into()),
727                block_height: at_height.map(|bh| bh.into()),
728            });
729        let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
730        tx_statuses
731            .into_iter()
732            .map(|tx_status| tx_status.try_into().map_err(Into::into))
733            .collect()
734    }
735
736    /// Like `dry_run_opt`, but also returns the storage reads
737    pub async fn dry_run_opt_record_storage_reads(
738        &self,
739        txs: &[Transaction],
740        // Disable utxo input checks (exists, unspent, and valid signature)
741        utxo_validation: Option<bool>,
742        gas_price: Option<u64>,
743        at_height: Option<BlockHeight>,
744    ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
745        let txs = txs
746            .iter()
747            .map(|tx| HexString(Bytes(tx.to_bytes())))
748            .collect::<Vec<HexString>>();
749        let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
750            schema::tx::DryRunRecordStorageReads::build(DryRunArg {
751                txs,
752                utxo_validation,
753                gas_price: gas_price.map(|gp| gp.into()),
754                block_height: at_height.map(|bh| bh.into()),
755            });
756        let result = self
757            .query(query)
758            .await
759            .map(|r| r.dry_run_record_storage_reads)?;
760        let tx_statuses = result
761            .tx_statuses
762            .into_iter()
763            .map(|tx_status| tx_status.try_into().map_err(Into::into))
764            .collect::<io::Result<Vec<_>>>()?;
765        let storage_reads = result
766            .storage_reads
767            .into_iter()
768            .map(Into::into)
769            .collect::<Vec<_>>();
770        Ok((tx_statuses, storage_reads))
771    }
772
773    /// Get storage read replay for a block
774    pub async fn storage_read_replay(
775        &self,
776        height: &BlockHeight,
777    ) -> io::Result<Vec<StorageReadReplayEvent>> {
778        let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
779            StorageReadReplay::build(StorageReadReplayArgs {
780                height: (*height).into(),
781            });
782        Ok(self
783            .query(query)
784            .await
785            .map(|r| r.storage_read_replay)?
786            .into_iter()
787            .map(Into::into)
788            .collect())
789    }
790
791    /// Assembles the transaction based on the provided requirements.
792    /// The return transaction contains:
793    /// - Input coins to cover `required_balances`
794    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
795    /// - `Change` or `Destroy` outputs for all assets from the inputs
796    /// - `Variable` outputs in the case they are required during the execution
797    /// - `Contract` inputs and outputs in the case they are required during the execution
798    /// - Reserved witness slots for signed coins filled with `64` zeroes
799    /// - Set script gas limit(unless `script` is empty)
800    /// - Estimated predicates, if `estimate_predicates == true`
801    ///
802    /// Returns an error if:
803    /// - The number of required balances exceeds the maximum number of inputs allowed.
804    /// - The fee address index is out of bounds.
805    /// - The same asset has multiple change policies(either the receiver of
806    ///     the change is different, or one of the policies states about the destruction
807    ///     of the token while the other does not). The `Change` output from the transaction
808    ///     also count as a `ChangePolicy`.
809    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
810    /// - Required assets have multiple entries.
811    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
812    /// - If a constructed transaction breaks the rules defined by consensus parameters.
813    #[allow(clippy::too_many_arguments)]
814    pub async fn assemble_tx(
815        &self,
816        tx: &Transaction,
817        block_horizon: u32,
818        required_balances: Vec<RequiredBalance>,
819        fee_address_index: u16,
820        exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
821        estimate_predicates: bool,
822        reserve_gas: Option<u64>,
823    ) -> io::Result<AssembleTransactionResult> {
824        let tx = HexString(Bytes(tx.to_bytes()));
825        let block_horizon = block_horizon.into();
826
827        let required_balances: Vec<_> = required_balances
828            .into_iter()
829            .map(schema::tx::RequiredBalance::try_from)
830            .collect::<Result<Vec<_>, _>>()?;
831
832        let fee_address_index = fee_address_index.into();
833
834        let exclude_input = exclude.map(Into::into);
835
836        let reserve_gas = reserve_gas.map(U64::from);
837
838        let query_arg = AssembleTxArg {
839            tx,
840            block_horizon,
841            required_balances,
842            fee_address_index,
843            exclude_input,
844            estimate_predicates,
845            reserve_gas,
846        };
847
848        let query = schema::tx::AssembleTx::build(query_arg);
849        let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
850        Ok(assemble_tx_result.try_into()?)
851    }
852
853    /// Estimate predicates for the transaction
854    pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
855        let serialized_tx = tx.to_bytes();
856        let query = schema::tx::EstimatePredicates::build(TxArg {
857            tx: HexString(Bytes(serialized_tx)),
858        });
859        let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
860        let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
861        *tx = tx_with_predicate;
862        Ok(())
863    }
864
865    pub async fn submit(
866        &self,
867        tx: &Transaction,
868    ) -> io::Result<types::primitives::TransactionId> {
869        self.submit_opt(tx, None).await
870    }
871
872    pub async fn submit_opt(
873        &self,
874        tx: &Transaction,
875        estimate_predicates: Option<bool>,
876    ) -> io::Result<types::primitives::TransactionId> {
877        let tx = tx.clone().to_bytes();
878        let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
879            tx: HexString(Bytes(tx)),
880            estimate_predicates,
881        });
882
883        let id = self.query(query).await.map(|r| r.submit)?.id.into();
884        Ok(id)
885    }
886
887    /// Similar to [`Self::submit_and_await_commit_opt`], but with default options.
888    #[cfg(feature = "subscriptions")]
889    pub async fn submit_and_await_commit(
890        &self,
891        tx: &Transaction,
892    ) -> io::Result<TransactionStatus> {
893        self.submit_and_await_commit_opt(tx, None).await
894    }
895
896    /// Submit the transaction and wait for it either to be included in
897    /// a block or removed from `TxPool`.
898    ///
899    /// If `estimate_predicates` is set, the predicates will be estimated before
900    /// the transaction is inserted into transaction pool.
901    ///
902    /// This will wait forever if needed, so consider wrapping this call
903    /// with a `tokio::time::timeout`.
904    #[cfg(feature = "subscriptions")]
905    pub async fn submit_and_await_commit_opt(
906        &self,
907        tx: &Transaction,
908        estimate_predicates: Option<bool>,
909    ) -> io::Result<TransactionStatus> {
910        use cynic::SubscriptionBuilder;
911        let tx = tx.clone().to_bytes();
912        let s =
913            schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
914                tx: HexString(Bytes(tx)),
915                estimate_predicates,
916            });
917
918        let mut stream = self.subscribe(s).await?.map(
919            |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
920                let status: TransactionStatus = r?.submit_and_await.try_into()?;
921                Result::<_, io::Error>::Ok(status)
922            },
923        );
924
925        let status = stream.next().await.ok_or(io::Error::new(
926            io::ErrorKind::Other,
927            "Failed to get status from the submission",
928        ))??;
929
930        Ok(status)
931    }
932
933    /// Similar to [`Self::submit_and_await_commit`], but the status also contains transaction.
934    #[cfg(feature = "subscriptions")]
935    pub async fn submit_and_await_commit_with_tx(
936        &self,
937        tx: &Transaction,
938    ) -> io::Result<StatusWithTransaction> {
939        self.submit_and_await_commit_with_tx_opt(tx, None).await
940    }
941
942    /// Similar to [`Self::submit_and_await_commit_opt`], but the status also contains transaction.
943    #[cfg(feature = "subscriptions")]
944    pub async fn submit_and_await_commit_with_tx_opt(
945        &self,
946        tx: &Transaction,
947        estimate_predicates: Option<bool>,
948    ) -> io::Result<StatusWithTransaction> {
949        use cynic::SubscriptionBuilder;
950        let tx = tx.clone().to_bytes();
951        let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
952            TxWithEstimatedPredicatesArg {
953                tx: HexString(Bytes(tx)),
954                estimate_predicates,
955            },
956        );
957
958        let mut stream = self.subscribe(s).await?.map(
959            |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
960                let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
961                Result::<_, io::Error>::Ok(status)
962            },
963        );
964
965        let status = stream.next().await.ok_or(io::Error::new(
966            io::ErrorKind::Other,
967            "Failed to get status from the submission",
968        ))??;
969
970        Ok(status)
971    }
972
973    /// Similar to [`Self::submit_and_await_commit`], but includes all intermediate states.
974    #[cfg(feature = "subscriptions")]
975    pub async fn submit_and_await_status<'a>(
976        &'a self,
977        tx: &'a Transaction,
978    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
979        self.submit_and_await_status_opt(tx, None, None).await
980    }
981
982    /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
983    #[cfg(feature = "subscriptions")]
984    pub async fn submit_and_await_status_opt<'a>(
985        &'a self,
986        tx: &'a Transaction,
987        estimate_predicates: Option<bool>,
988        include_preconfirmation: Option<bool>,
989    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
990        use cynic::SubscriptionBuilder;
991        use schema::tx::SubmitAndAwaitStatusArg;
992        let tx = tx.clone().to_bytes();
993        let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
994            SubmitAndAwaitStatusArg {
995                tx: HexString(Bytes(tx)),
996                estimate_predicates,
997                include_preconfirmation,
998            },
999        );
1000
1001        let stream = self.subscribe(s).await?.map(
1002            |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1003                let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1004                Result::<_, io::Error>::Ok(status)
1005            },
1006        );
1007
1008        Ok(stream)
1009    }
1010
1011    /// Requests all storage slots for the `contract_id`.
1012    #[cfg(feature = "subscriptions")]
1013    pub async fn contract_storage_slots<'a>(
1014        &'a self,
1015        contract_id: &'a ContractId,
1016    ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + 'a> {
1017        use cynic::SubscriptionBuilder;
1018        use schema::storage::ContractStorageSlotsArgs;
1019        let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
1020            contract_id: (*contract_id).into(),
1021        });
1022
1023        let stream = self.subscribe(s).await?.map(
1024            |result: io::Result<schema::storage::ContractStorageSlots>| {
1025                let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1026                Result::<_, io::Error>::Ok(result)
1027            },
1028        );
1029
1030        Ok(stream)
1031    }
1032
1033    /// Requests all storage balances for the `contract_id`.
1034    #[cfg(feature = "subscriptions")]
1035    pub async fn contract_storage_balances<'a>(
1036        &'a self,
1037        contract_id: &'a ContractId,
1038    ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + 'a>
1039    {
1040        use cynic::SubscriptionBuilder;
1041        use schema::{
1042            contract::ContractBalance,
1043            storage::ContractStorageBalancesArgs,
1044        };
1045        let s = schema::storage::ContractStorageBalances::build(
1046            ContractStorageBalancesArgs {
1047                contract_id: (*contract_id).into(),
1048            },
1049        );
1050
1051        let stream = self.subscribe(s).await?.map(
1052            |result: io::Result<schema::storage::ContractStorageBalances>| {
1053                let result: ContractBalance = result?.contract_storage_balances;
1054                Result::<_, io::Error>::Ok(result)
1055            },
1056        );
1057
1058        Ok(stream)
1059    }
1060
1061    pub async fn contract_slots_values(
1062        &self,
1063        contract_id: &ContractId,
1064        block_height: Option<BlockHeight>,
1065        requested_storage_slots: Vec<Bytes32>,
1066    ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1067        let query = schema::storage::ContractSlotValues::build(
1068            schema::storage::ContractSlotValuesArgs {
1069                contract_id: (*contract_id).into(),
1070                block_height: block_height.map(|b| (*b).into()),
1071                storage_slots: requested_storage_slots
1072                    .into_iter()
1073                    .map(Into::into)
1074                    .collect(),
1075            },
1076        );
1077
1078        self.query(query)
1079            .await
1080            .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1081    }
1082
1083    pub async fn contract_balance_values(
1084        &self,
1085        contract_id: &ContractId,
1086        block_height: Option<BlockHeight>,
1087        requested_storage_slots: Vec<AssetId>,
1088    ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1089        let query = schema::storage::ContractBalanceValues::build(
1090            schema::storage::ContractBalanceValuesArgs {
1091                contract_id: (*contract_id).into(),
1092                block_height: block_height.map(|b| (*b).into()),
1093                assets: requested_storage_slots
1094                    .into_iter()
1095                    .map(Into::into)
1096                    .collect(),
1097            },
1098        );
1099
1100        self.query(query).await.map(|r| {
1101            r.contract_balance_values
1102                .into_iter()
1103                .map(Into::into)
1104                .collect()
1105        })
1106    }
1107
1108    pub async fn start_session(&self) -> io::Result<String> {
1109        let query = schema::StartSession::build(());
1110
1111        self.query(query)
1112            .await
1113            .map(|r| r.start_session.into_inner())
1114    }
1115
1116    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1117        let query = schema::EndSession::build(IdArg { id: id.into() });
1118
1119        self.query(query).await.map(|r| r.end_session)
1120    }
1121
1122    pub async fn reset(&self, id: &str) -> io::Result<bool> {
1123        let query = schema::Reset::build(IdArg { id: id.into() });
1124
1125        self.query(query).await.map(|r| r.reset)
1126    }
1127
1128    pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1129        let op = serde_json::to_string(op)?;
1130        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1131
1132        self.query(query).await.map(|r| r.execute)
1133    }
1134
1135    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1136        let query = schema::Register::build(RegisterArgs {
1137            id: id.into(),
1138            register: register.into(),
1139        });
1140
1141        Ok(self.query(query).await?.register.0 as Word)
1142    }
1143
1144    pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1145        let query = schema::Memory::build(MemoryArgs {
1146            id: id.into(),
1147            start: start.into(),
1148            size: size.into(),
1149        });
1150
1151        let memory = self.query(query).await?.memory;
1152
1153        Ok(serde_json::from_str(memory.as_str())?)
1154    }
1155
1156    pub async fn set_breakpoint(
1157        &self,
1158        session_id: &str,
1159        contract: fuel_types::ContractId,
1160        pc: u64,
1161    ) -> io::Result<()> {
1162        let operation = SetBreakpoint::build(SetBreakpointArgs {
1163            id: Id::new(session_id),
1164            bp: schema::Breakpoint {
1165                contract: contract.into(),
1166                pc: U64(pc),
1167            },
1168        });
1169
1170        let response = self.query(operation).await?;
1171        assert!(
1172            response.set_breakpoint,
1173            "Setting breakpoint returned invalid reply"
1174        );
1175        Ok(())
1176    }
1177
1178    pub async fn set_single_stepping(
1179        &self,
1180        session_id: &str,
1181        enable: bool,
1182    ) -> io::Result<()> {
1183        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1184            id: Id::new(session_id),
1185            enable,
1186        });
1187        self.query(operation).await?;
1188        Ok(())
1189    }
1190
1191    pub async fn start_tx(
1192        &self,
1193        session_id: &str,
1194        tx: &Transaction,
1195    ) -> io::Result<RunResult> {
1196        let operation = StartTx::build(StartTxArgs {
1197            id: Id::new(session_id),
1198            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1199        });
1200        let response = self.query(operation).await?.start_tx;
1201        Ok(response)
1202    }
1203
1204    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1205        let operation = ContinueTx::build(ContinueTxArgs {
1206            id: Id::new(session_id),
1207        });
1208        let response = self.query(operation).await?.continue_tx;
1209        Ok(response)
1210    }
1211
1212    pub async fn transaction(
1213        &self,
1214        id: &TxId,
1215    ) -> io::Result<Option<TransactionResponse>> {
1216        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1217
1218        let transaction = self.query(query).await?.transaction;
1219
1220        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1221    }
1222
1223    /// Get the status of a transaction
1224    pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1225        let query =
1226            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1227
1228        let status = self.query(query).await?.transaction.ok_or_else(|| {
1229            io::Error::new(
1230                ErrorKind::NotFound,
1231                format!("status not found for transaction {id} "),
1232            )
1233        })?;
1234
1235        let status = status
1236            .status
1237            .ok_or_else(|| {
1238                io::Error::new(
1239                    ErrorKind::NotFound,
1240                    format!("status not found for transaction {id}"),
1241                )
1242            })?
1243            .try_into()?;
1244        Ok(status)
1245    }
1246
1247    #[tracing::instrument(skip(self), level = "debug")]
1248    #[cfg(feature = "subscriptions")]
1249    /// Similar to [`Self::subscribe_transaction_status_opt`], but with default options.
1250    pub async fn subscribe_transaction_status<'a>(
1251        &'a self,
1252        id: &'a TxId,
1253    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + 'a> {
1254        self.subscribe_transaction_status_opt(id, None).await
1255    }
1256
1257    #[cfg(feature = "subscriptions")]
1258    /// Subscribe to the status of a transaction
1259    pub async fn subscribe_transaction_status_opt<'a>(
1260        &'a self,
1261        id: &'a TxId,
1262        include_preconfirmation: Option<bool>,
1263    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
1264        use cynic::SubscriptionBuilder;
1265        use schema::tx::StatusChangeSubscriptionArgs;
1266        let tx_id: TransactionId = (*id).into();
1267        let s =
1268            schema::tx::StatusChangeSubscription::build(StatusChangeSubscriptionArgs {
1269                id: tx_id,
1270                include_preconfirmation,
1271            });
1272
1273        tracing::debug!("subscribing");
1274        let stream = self.subscribe(s).await?.map(|tx| {
1275            tracing::debug!("received {tx:?}");
1276            let tx = tx?;
1277            let status = tx.status_change.try_into()?;
1278            Ok(status)
1279        });
1280
1281        Ok(stream)
1282    }
1283
1284    #[cfg(feature = "subscriptions")]
1285    /// Awaits for the transaction to be committed into a block
1286    ///
1287    /// This will wait forever if needed, so consider wrapping this call
1288    /// with a `tokio::time::timeout`.
1289    pub async fn await_transaction_commit(
1290        &self,
1291        id: &TxId,
1292    ) -> io::Result<TransactionStatus> {
1293        // skip until we've reached a final status and then stop consuming the stream
1294        // to avoid an EOF which the eventsource client considers as an error.
1295        let status_result = self
1296            .subscribe_transaction_status(id)
1297            .await?
1298            .skip_while(|status| {
1299                future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1300            })
1301            .next()
1302            .await;
1303
1304        if let Some(Ok(status)) = status_result {
1305            Ok(status)
1306        } else {
1307            Err(io::Error::new(
1308                io::ErrorKind::Other,
1309                format!("Failed to get status for transaction {status_result:?}"),
1310            ))
1311        }
1312    }
1313
1314    /// returns a paginated set of transactions sorted by block height
1315    pub async fn transactions(
1316        &self,
1317        request: PaginationRequest<String>,
1318    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1319        let args = schema::ConnectionArgs::from(request);
1320        let query = schema::tx::TransactionsQuery::build(args);
1321        let transactions = self.query(query).await?.transactions.try_into()?;
1322        Ok(transactions)
1323    }
1324
1325    /// Returns a paginated set of transactions associated with a txo owner address.
1326    pub async fn transactions_by_owner(
1327        &self,
1328        owner: &Address,
1329        request: PaginationRequest<String>,
1330    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1331        let owner: schema::Address = (*owner).into();
1332        let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1333        let query = schema::tx::TransactionsByOwnerQuery::build(args);
1334
1335        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1336        Ok(transactions)
1337    }
1338
1339    pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1340        let query =
1341            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1342
1343        let tx = self.query(query).await?.transaction.ok_or_else(|| {
1344            io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1345        })?;
1346
1347        let receipts = match tx.status {
1348            Some(status) => match status {
1349                schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1350                    s.receipts
1351                        .into_iter()
1352                        .map(TryInto::<Receipt>::try_into)
1353                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1354                )
1355                .transpose()?,
1356                schema::tx::TransactionStatus::FailureStatus(s) => Some(
1357                    s.receipts
1358                        .into_iter()
1359                        .map(TryInto::<Receipt>::try_into)
1360                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1361                )
1362                .transpose()?,
1363                _ => None,
1364            },
1365            _ => None,
1366        };
1367
1368        Ok(receipts)
1369    }
1370
1371    #[cfg(feature = "test-helpers")]
1372    pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1373        let query = schema::tx::AllReceipts::build(());
1374        let receipts = self.query(query).await?.all_receipts;
1375
1376        let vec: Result<Vec<Receipt>, ConversionError> = receipts
1377            .into_iter()
1378            .map(TryInto::<Receipt>::try_into)
1379            .collect();
1380
1381        Ok(vec?)
1382    }
1383
1384    pub async fn produce_blocks(
1385        &self,
1386        blocks_to_produce: u32,
1387        start_timestamp: Option<u64>,
1388    ) -> io::Result<BlockHeight> {
1389        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1390            blocks_to_produce: blocks_to_produce.into(),
1391            start_timestamp: start_timestamp
1392                .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1393        });
1394
1395        let new_height = self.query(query).await?.produce_blocks;
1396
1397        Ok(new_height.into())
1398    }
1399
1400    pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1401        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1402            id: Some((*id).into()),
1403        });
1404
1405        let block = self
1406            .query(query)
1407            .await?
1408            .block
1409            .map(TryInto::try_into)
1410            .transpose()?;
1411
1412        Ok(block)
1413    }
1414
1415    pub async fn block_by_height(
1416        &self,
1417        height: BlockHeight,
1418    ) -> io::Result<Option<types::Block>> {
1419        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1420            height: Some(U32(height.into())),
1421        });
1422
1423        let block = self
1424            .query(query)
1425            .await?
1426            .block
1427            .map(TryInto::try_into)
1428            .transpose()?;
1429
1430        Ok(block)
1431    }
1432
1433    pub async fn da_compressed_block(
1434        &self,
1435        height: BlockHeight,
1436    ) -> io::Result<Option<Vec<u8>>> {
1437        let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1438            DaCompressedBlockByHeightArgs {
1439                height: U32(height.into()),
1440            },
1441        );
1442
1443        Ok(self
1444            .query(query)
1445            .await?
1446            .da_compressed_block
1447            .map(|b| b.bytes.into()))
1448    }
1449
1450    /// Retrieve a blob by its ID
1451    pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1452        let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1453        let blob = self.query(query).await?.blob.map(Into::into);
1454        Ok(blob)
1455    }
1456
1457    /// Check whether a blob with ID exists
1458    pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1459        let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1460        Ok(self.query(query).await?.blob.is_some())
1461    }
1462
1463    /// Retrieve multiple blocks
1464    pub async fn blocks(
1465        &self,
1466        request: PaginationRequest<String>,
1467    ) -> io::Result<PaginatedResult<types::Block, String>> {
1468        let args = schema::ConnectionArgs::from(request);
1469        let query = schema::block::BlocksQuery::build(args);
1470
1471        let blocks = self.query(query).await?.blocks.try_into()?;
1472
1473        Ok(blocks)
1474    }
1475
1476    pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1477        let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1478            utxo_id: (*id).into(),
1479        });
1480        let coin = self.query(query).await?.coin.map(Into::into);
1481        Ok(coin)
1482    }
1483
1484    /// Retrieve a page of coins by their owner
1485    pub async fn coins(
1486        &self,
1487        owner: &Address,
1488        asset_id: Option<&AssetId>,
1489        request: PaginationRequest<String>,
1490    ) -> io::Result<PaginatedResult<types::Coin, String>> {
1491        let owner: schema::Address = (*owner).into();
1492        let asset_id: schema::AssetId = match asset_id {
1493            Some(asset_id) => (*asset_id).into(),
1494            None => schema::AssetId::default(),
1495        };
1496        let args = CoinsConnectionArgs::from((owner, asset_id, request));
1497        let query = schema::coins::CoinsQuery::build(args);
1498
1499        let coins = self.query(query).await?.coins.into();
1500        Ok(coins)
1501    }
1502
1503    /// Retrieve coins to spend in a transaction
1504    pub async fn coins_to_spend(
1505        &self,
1506        owner: &Address,
1507        spend_query: Vec<(AssetId, u128, Option<u16>)>,
1508        // (Utxos, Messages Nonce)
1509        excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1510    ) -> io::Result<Vec<Vec<types::CoinType>>> {
1511        let owner: schema::Address = (*owner).into();
1512        let spend_query: Vec<SpendQueryElementInput> = spend_query
1513            .iter()
1514            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1515                Ok(SpendQueryElementInput {
1516                    asset_id: (*asset_id).into(),
1517                    amount: (*amount).into(),
1518                    max: (*max).map(|max| max.into()),
1519                })
1520            })
1521            .try_collect()?;
1522        let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1523        let args =
1524            schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1525        let query = schema::coins::CoinsToSpendQuery::build(args);
1526
1527        let coins_per_asset = self
1528            .query(query)
1529            .await?
1530            .coins_to_spend
1531            .into_iter()
1532            .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1533            .collect::<Vec<_>>();
1534        Ok(coins_per_asset)
1535    }
1536
1537    pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1538        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1539            id: (*id).into(),
1540        });
1541        let contract = self.query(query).await?.contract.map(Into::into);
1542        Ok(contract)
1543    }
1544
1545    pub async fn contract_balance(
1546        &self,
1547        id: &ContractId,
1548        asset: Option<&AssetId>,
1549    ) -> io::Result<u64> {
1550        let asset_id: schema::AssetId = match asset {
1551            Some(asset) => (*asset).into(),
1552            None => schema::AssetId::default(),
1553        };
1554
1555        let query =
1556            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1557                id: (*id).into(),
1558                asset: asset_id,
1559            });
1560
1561        let balance: types::ContractBalance =
1562            self.query(query).await?.contract_balance.into();
1563        Ok(balance.amount)
1564    }
1565
1566    pub async fn balance(
1567        &self,
1568        owner: &Address,
1569        asset_id: Option<&AssetId>,
1570    ) -> io::Result<u64> {
1571        let owner: schema::Address = (*owner).into();
1572        let asset_id: schema::AssetId = match asset_id {
1573            Some(asset_id) => (*asset_id).into(),
1574            None => schema::AssetId::default(),
1575        };
1576        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1577        let balance: types::Balance = self.query(query).await?.balance.into();
1578        Ok(balance.amount.try_into().unwrap_or(u64::MAX))
1579    }
1580
1581    // Retrieve a page of balances by their owner
1582    pub async fn balances(
1583        &self,
1584        owner: &Address,
1585        request: PaginationRequest<String>,
1586    ) -> io::Result<PaginatedResult<types::Balance, String>> {
1587        let owner: schema::Address = (*owner).into();
1588        let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1589        let query = schema::balance::BalancesQuery::build(args);
1590
1591        let balances = self.query(query).await?.balances.into();
1592        Ok(balances)
1593    }
1594
1595    pub async fn contract_balances(
1596        &self,
1597        contract: &ContractId,
1598        request: PaginationRequest<String>,
1599    ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1600        let contract_id: schema::ContractId = (*contract).into();
1601        let args = ContractBalancesConnectionArgs::from((contract_id, request));
1602        let query = schema::contract::ContractBalancesQuery::build(args);
1603
1604        let balances = self.query(query).await?.contract_balances.into();
1605
1606        Ok(balances)
1607    }
1608
1609    // Retrieve a message by its nonce
1610    pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1611        let query = schema::message::MessageQuery::build(NonceArgs {
1612            nonce: (*nonce).into(),
1613        });
1614        let message = self.query(query).await?.message.map(Into::into);
1615        Ok(message)
1616    }
1617
1618    pub async fn messages(
1619        &self,
1620        owner: Option<&Address>,
1621        request: PaginationRequest<String>,
1622    ) -> io::Result<PaginatedResult<types::Message, String>> {
1623        let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1624        let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1625        let query = schema::message::OwnedMessageQuery::build(args);
1626
1627        let messages = self.query(query).await?.messages.into();
1628
1629        Ok(messages)
1630    }
1631
1632    pub async fn contract_info(
1633        &self,
1634        contract: &ContractId,
1635    ) -> io::Result<Option<types::Contract>> {
1636        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1637            id: (*contract).into(),
1638        });
1639        let contract_info = self.query(query).await?.contract.map(Into::into);
1640        Ok(contract_info)
1641    }
1642
1643    pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1644        let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1645            nonce: (*nonce).into(),
1646        });
1647        let status = self.query(query).await?.message_status.into();
1648
1649        Ok(status)
1650    }
1651
1652    /// Request a merkle proof of an output message.
1653    pub async fn message_proof(
1654        &self,
1655        transaction_id: &TxId,
1656        nonce: &Nonce,
1657        commit_block_id: Option<&BlockId>,
1658        commit_block_height: Option<BlockHeight>,
1659    ) -> io::Result<types::MessageProof> {
1660        let transaction_id: TransactionId = (*transaction_id).into();
1661        let nonce: schema::Nonce = (*nonce).into();
1662        let commit_block_id: Option<schema::BlockId> =
1663            commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1664        let commit_block_height = commit_block_height.map(Into::into);
1665        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1666            transaction_id,
1667            nonce,
1668            commit_block_id,
1669            commit_block_height,
1670        });
1671        let proof = self.query(query).await?.message_proof.try_into()?;
1672        Ok(proof)
1673    }
1674
1675    pub async fn relayed_transaction_status(
1676        &self,
1677        id: &Bytes32,
1678    ) -> io::Result<Option<RelayedTransactionStatus>> {
1679        let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1680            RelayedTransactionStatusArgs {
1681                id: id.to_owned().into(),
1682            },
1683        );
1684        let status = self
1685            .query(query)
1686            .await?
1687            .relayed_transaction_status
1688            .map(|status| status.try_into())
1689            .transpose()?;
1690        Ok(status)
1691    }
1692
1693    pub async fn asset_info(&self, asset_id: &AssetId) -> io::Result<AssetDetail> {
1694        let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1695            id: (*asset_id).into(),
1696        });
1697        let asset_info = self.query(query).await?.asset_details.into();
1698        Ok(asset_info)
1699    }
1700}
1701
1702#[cfg(any(test, feature = "test-helpers"))]
1703impl FuelClient {
1704    pub async fn transparent_transaction(
1705        &self,
1706        id: &TxId,
1707    ) -> io::Result<Option<types::TransactionType>> {
1708        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1709
1710        let transaction = self.query(query).await?.transaction;
1711
1712        Ok(transaction
1713            .map(|tx| {
1714                let response: TransactionResponse = tx.try_into()?;
1715                Ok::<_, ConversionError>(response.transaction)
1716            })
1717            .transpose()?)
1718    }
1719}