1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4 client::{
5 schema::{
6 block::BlockByHeightArgs,
7 coins::{
8 ExcludeInput,
9 SpendQueryElementInput,
10 },
11 contract::ContractBalanceQueryArgs,
12 gas_price::EstimateGasPrice,
13 message::MessageStatusArgs,
14 relayed_tx::RelayedTransactionStatusArgs,
15 tx::{
16 DryRunArg,
17 TxWithEstimatedPredicatesArg,
18 },
19 Tai64Timestamp,
20 TransactionId,
21 },
22 types::{
23 asset::AssetDetail,
24 gas_price::LatestGasPrice,
25 message::MessageStatus,
26 primitives::{
27 Address,
28 AssetId,
29 BlockId,
30 ContractId,
31 UtxoId,
32 },
33 upgrades::StateTransitionBytecode,
34 RelayedTransactionStatus,
35 },
36 },
37 reqwest_ext::{
38 FuelGraphQlResponse,
39 FuelOperation,
40 ReqwestExt,
41 },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46 Engine as _,
47 BASE64_STANDARD,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52 Id,
53 MutationBuilder,
54 Operation,
55 QueryBuilder,
56};
57use fuel_core_types::{
58 blockchain::header::{
59 ConsensusParametersVersion,
60 StateTransitionBytecodeVersion,
61 },
62 fuel_asm::{
63 Instruction,
64 Word,
65 },
66 fuel_tx::{
67 BlobId,
68 Bytes32,
69 ConsensusParameters,
70 Receipt,
71 Transaction,
72 TxId,
73 },
74 fuel_types::{
75 self,
76 canonical::Serialize,
77 BlockHeight,
78 Nonce,
79 },
80 services::executor::{
81 StorageReadReplayEvent,
82 TransactionExecutionStatus,
83 },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87 Stream,
88 StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92 PageDirection,
93 PaginatedResult,
94 PaginationRequest,
95};
96use schema::{
97 assets::AssetInfoArg,
98 balance::BalanceArgs,
99 blob::BlobByIdArgs,
100 block::BlockByIdArgs,
101 coins::{
102 CoinByIdArgs,
103 CoinsConnectionArgs,
104 },
105 contract::{
106 ContractBalancesConnectionArgs,
107 ContractByIdArgs,
108 },
109 da_compressed::DaCompressedBlockByHeightArgs,
110 gas_price::BlockHorizonArgs,
111 storage_read_replay::{
112 StorageReadReplay,
113 StorageReadReplayArgs,
114 },
115 tx::{
116 AssembleTxArg,
117 TransactionsByOwnerConnectionArgs,
118 TxArg,
119 TxIdArgs,
120 },
121 Bytes,
122 ContinueTx,
123 ContinueTxArgs,
124 ConversionError,
125 HexString,
126 IdArg,
127 MemoryArgs,
128 RegisterArgs,
129 RunResult,
130 SetBreakpoint,
131 SetBreakpointArgs,
132 SetSingleStepping,
133 SetSingleSteppingArgs,
134 StartTx,
135 StartTxArgs,
136 U32,
137 U64,
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142 convert::TryInto,
143 io::{
144 self,
145 ErrorKind,
146 },
147 net,
148 str::{
149 self,
150 FromStr,
151 },
152 sync::{
153 Arc,
154 Mutex,
155 },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160 assemble_tx::{
161 AssembleTransactionResult,
162 RequiredBalance,
163 },
164 TransactionResponse,
165 TransactionStatus,
166};
167
168use self::schema::{
169 block::ProduceBlockArgs,
170 message::{
171 MessageProofArgs,
172 NonceArgs,
173 },
174};
175
/// Pagination helpers (`PageDirection`, `PaginatedResult`, `PaginationRequest`).
pub mod pagination;
/// GraphQL schema bindings: query/mutation/subscription fragments and their argument structs.
pub mod schema;
/// Client-facing types that schema responses are converted into.
pub mod types;

/// Identifier of a VM register, as accepted by `FuelClient::register`.
type RegisterId = u32;
181
/// Error type exposed by the client module.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
pub enum Error {
    /// Catch-all variant wrapping an arbitrary [`anyhow::Error`]; the derived
    /// `From` implementation converts `anyhow::Error` into this variant.
    #[from]
    Other(anyhow::Error),
}
192
/// Controls which block height is sent along with every request as the
/// "required fuel block height".
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// The height is tracked by the client itself: after each response the
    /// stored value is raised to the block height reported by the node.
    Auto {
        /// Last height observed in a response (or set explicitly); `None`
        /// until a value is known.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The height is fixed by the caller and never updated from responses.
    Manual {
        /// Caller-chosen required height, or `None` for no requirement.
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213 fn clone(&self) -> Self {
214 match self {
215 Self::Auto { height } => Self::Auto {
216 height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219 },
220 Self::Manual { height } => Self::Manual { height: *height },
221 }
222 }
223}
224
/// Chain metadata cached from the `extensions` section of the most recent
/// responses.
#[derive(Debug, Default)]
struct ChainStateInfo {
    /// Last state transition bytecode version reported by the node, if any.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    /// Last consensus parameters version reported by the node, if any.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232 fn clone(&self) -> Self {
233 Self {
234 current_stf_version: Arc::new(Mutex::new(
235 self.current_stf_version.lock().ok().and_then(|v| *v),
236 )),
237 current_consensus_parameters_version: Arc::new(Mutex::new(
238 self.current_consensus_parameters_version
239 .lock()
240 .ok()
241 .and_then(|v| *v),
242 )),
243 }
244 }
245}
246
/// GraphQL client for a fuel-core node.
#[derive(Debug, Clone)]
pub struct FuelClient {
    /// HTTP client used for regular queries and mutations.
    client: reqwest::Client,
    /// Cookie jar installed into `client`; its cookies are forwarded to the
    /// subscription connection.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    /// Endpoint URL that queries are posted to.
    url: reqwest::Url,
    /// Policy that decides which required block height accompanies requests.
    require_height: ConsistencyPolicy,
    /// Versions cached from response extensions.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258 type Err = anyhow::Error;
259
260 fn from_str(str: &str) -> Result<Self, Self::Err> {
261 let mut raw_url = str.to_string();
262 if !raw_url.starts_with("http") {
263 raw_url = format!("http://{raw_url}");
264 }
265
266 let mut url = reqwest::Url::parse(&raw_url)
267 .map_err(anyhow::Error::msg)
268 .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269 url.set_path("/v1/graphql");
270
271 #[cfg(feature = "subscriptions")]
272 {
273 let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274 let client = reqwest::Client::builder()
275 .cookie_provider(cookie.clone())
276 .build()
277 .map_err(anyhow::Error::msg)?;
278 Ok(Self {
279 client,
280 cookie,
281 url,
282 require_height: ConsistencyPolicy::Auto {
283 height: Arc::new(Mutex::new(None)),
284 },
285 chain_state_info: Default::default(),
286 })
287 }
288
289 #[cfg(not(feature = "subscriptions"))]
290 {
291 let client = reqwest::Client::new();
292 Ok(Self {
293 client,
294 url,
295 require_height: ConsistencyPolicy::Auto {
296 height: Arc::new(Mutex::new(None)),
297 },
298 chain_state_info: Default::default(),
299 })
300 }
301 }
302}
303
304impl<S> From<S> for FuelClient
305where
306 S: Into<net::SocketAddr>,
307{
308 fn from(socket: S) -> Self {
309 format!("http://{}", socket.into())
310 .as_str()
311 .parse()
312 .unwrap()
313 }
314}
315
/// Folds a list of error messages into a single [`io::Error`].
///
/// The resulting message is `"Response errors"` followed by every input
/// message, each prefixed with `"; "`. The error kind is always
/// [`io::ErrorKind::Other`].
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::new(io::ErrorKind::Other, message)
}
326
327impl FuelClient {
328 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329 Self::from_str(url.as_ref())
330 }
331
332 pub fn with_required_fuel_block_height(
333 &mut self,
334 new_height: Option<BlockHeight>,
335 ) -> &mut Self {
336 match &mut self.require_height {
337 ConsistencyPolicy::Auto { height } => {
338 *height.lock().expect("Mutex poisoned") = new_height;
339 }
340 ConsistencyPolicy::Manual { height } => {
341 *height = new_height;
342 }
343 }
344 self
345 }
346
347 pub fn use_manual_consistency_policy(
348 &mut self,
349 height: Option<BlockHeight>,
350 ) -> &mut Self {
351 self.require_height = ConsistencyPolicy::Manual { height };
352 self
353 }
354
355 pub fn required_block_height(&self) -> Option<BlockHeight> {
356 match &self.require_height {
357 ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358 ConsistencyPolicy::Manual { height } => *height,
359 }
360 }
361
362 fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363 if let Some(current_sft_version) = response
364 .extensions
365 .as_ref()
366 .and_then(|e| e.current_stf_version)
367 {
368 if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
369 *c = Some(current_sft_version);
370 }
371 }
372
373 if let Some(current_consensus_parameters_version) = response
374 .extensions
375 .as_ref()
376 .and_then(|e| e.current_consensus_parameters_version)
377 {
378 if let Ok(mut c) = self
379 .chain_state_info
380 .current_consensus_parameters_version
381 .lock()
382 {
383 *c = Some(current_consensus_parameters_version);
384 }
385 }
386
387 let inner_required_height = match &self.require_height {
388 ConsistencyPolicy::Auto { height } => Some(height.clone()),
389 ConsistencyPolicy::Manual { .. } => None,
390 };
391
392 if let Some(inner_required_height) = inner_required_height {
393 if let Some(current_fuel_block_height) = response
394 .extensions
395 .as_ref()
396 .and_then(|e| e.current_fuel_block_height)
397 {
398 let mut lock = inner_required_height.lock().expect("Mutex poisoned");
399
400 if current_fuel_block_height >= lock.unwrap_or_default() {
401 *lock = Some(current_fuel_block_height);
402 }
403 }
404 }
405 }
406
407 pub async fn query<ResponseData, Vars>(
409 &self,
410 q: Operation<ResponseData, Vars>,
411 ) -> io::Result<ResponseData>
412 where
413 Vars: serde::Serialize,
414 ResponseData: serde::de::DeserializeOwned + 'static,
415 {
416 let required_fuel_block_height = self.required_block_height();
417 let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
418 let response = self
419 .client
420 .post(self.url.clone())
421 .run_fuel_graphql(fuel_operation)
422 .await
423 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425 self.decode_response(response)
426 }
427
428 fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
429 where
430 R: serde::de::DeserializeOwned + 'static,
431 {
432 self.update_chain_state_info(&response);
433
434 if let Some(failed) = response
435 .extensions
436 .as_ref()
437 .and_then(|e| e.fuel_block_height_precondition_failed)
438 {
439 if failed {
440 return Err(io::Error::new(
441 io::ErrorKind::Other,
442 "The required block height was not met",
443 ));
444 }
445 }
446
447 let response = response.response;
448
449 match (response.data, response.errors) {
450 (Some(d), _) => Ok(d),
451 (_, Some(e)) => Err(from_strings_errors_to_std_error(
452 e.into_iter().map(|e| e.message).collect(),
453 )),
454 _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
455 }
456 }
457
    /// Opens a server-sent-events subscription for the streaming operation `q`.
    ///
    /// The operation (together with the currently required block height) is
    /// serialized to JSON and POSTed to the `/v1/graphql-sub` path of the
    /// client's URL. Every received event is decoded with `decode_response`
    /// and yielded as an item of the returned stream.
    ///
    /// # Errors
    /// Returns an error when the request cannot be serialized or when the
    /// event-source client rejects the URL or one of the headers. Errors that
    /// occur later are delivered as `Err` items of the stream.
    #[tracing::instrument(skip_all)]
    #[cfg(feature = "subscriptions")]
    async fn subscribe<ResponseData, Vars>(
        &self,
        q: StreamingOperation<ResponseData, Vars>,
    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static,
    {
        use core::ops::Deref;
        use eventsource_client as es;
        use hyper_rustls as _;
        use reqwest::cookie::CookieStore;
        // Subscriptions live under a different path than regular queries.
        let mut url = self.url.clone();
        url.set_path("/v1/graphql-sub");

        let required_fuel_block_height = self.required_block_height();
        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

        let json_query = serde_json::to_string(&fuel_operation)?;
        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to start client {e:?}"),
                )
            })?
            .body(json_query)
            .method("POST".to_string())
            .header("content-type", "application/json")
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to add header to client {e:?}"),
                )
            })?;
        // Credentials embedded in the URL are forwarded as a Basic
        // `Authorization` header; this only happens when a password is present.
        if let Some(password) = url.password() {
            let username = url.username();
            let credentials = format!("{}:{}", username, password);
            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
            client_builder = client_builder
                .header("Authorization", &authorization)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header to client {e:?}"),
                    )
                })?;
        }

        // Forward cookies stored in the `reqwest` jar for the base URL.
        if let Some(value) = self.cookie.deref().cookies(&self.url) {
            let value = value.to_str().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Unable convert header value to string {e:?}"),
                )
            })?;
            client_builder = client_builder
                .header(reqwest::header::COOKIE.as_str(), value)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header from `reqwest` to client {e:?}"),
                    )
                })?;
        }

        let client = client_builder.build_with_conn(
            hyper_rustls::HttpsConnectorBuilder::new()
                .with_webpki_roots()
                .https_or_http()
                .enable_http1()
                .build(),
        );

        // Raw payload of the previously yielded event, used to drop an event
        // whose payload is identical to the one right before it.
        let mut last = None;

        let stream = es::Client::stream(&client)
            // An `Eof` error marks the end of the stream rather than a failure.
            .take_while(|result| {
                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
            })
            .filter_map(move |result| {
                tracing::debug!("Got result: {result:?}");
                let r = match result {
                    Ok(es::SSE::Event(es::Event { data, .. })) => {
                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                            &data,
                        ) {
                            Ok(resp) => {
                                match self.decode_response(resp) {
                                    Ok(resp) => {
                                        // `replace` stores the new payload and
                                        // hands back the previous one.
                                        match last.replace(data) {
                                            // Same payload as the previous
                                            // event: suppress the duplicate.
                                            Some(l)
                                                if l == *last.as_ref().expect(
                                                    "Safe because of the replace above",
                                                ) =>
                                            {
                                                None
                                            }
                                            _ => Some(Ok(resp)),
                                        }
                                    }
                                    Err(e) => Some(Err(io::Error::new(
                                        io::ErrorKind::Other,
                                        format!("Decode error: {e:?}"),
                                    ))),
                                }
                            }
                            Err(e) => Some(Err(io::Error::new(
                                io::ErrorKind::Other,
                                format!("Json error: {e:?}"),
                            ))),
                        }
                    }
                    // Any other successfully received SSE item is ignored.
                    Ok(_) => None,
                    Err(e) => Some(Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("Graphql error: {e:?}"),
                    ))),
                };
                futures::future::ready(r)
            });

        Ok(stream)
    }
585
586 pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
587 self.chain_state_info
588 .current_stf_version
589 .lock()
590 .ok()
591 .and_then(|value| *value)
592 }
593
594 pub fn latest_consensus_parameters_version(
595 &self,
596 ) -> Option<ConsensusParametersVersion> {
597 self.chain_state_info
598 .current_consensus_parameters_version
599 .lock()
600 .ok()
601 .and_then(|value| *value)
602 }
603
604 pub async fn health(&self) -> io::Result<bool> {
605 let query = schema::Health::build(());
606 self.query(query).await.map(|r| r.health)
607 }
608
609 pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
610 let query = schema::node_info::QueryNodeInfo::build(());
611 self.query(query).await.map(|r| r.node_info.into())
612 }
613
614 pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
615 let query = schema::gas_price::QueryLatestGasPrice::build(());
616 self.query(query).await.map(|r| r.latest_gas_price.into())
617 }
618
619 pub async fn estimate_gas_price(
620 &self,
621 block_horizon: u32,
622 ) -> io::Result<EstimateGasPrice> {
623 let args = BlockHorizonArgs {
624 block_horizon: Some(block_horizon.into()),
625 };
626 let query = schema::gas_price::QueryEstimateGasPrice::build(args);
627 self.query(query).await.map(|r| r.estimate_gas_price)
628 }
629
630 #[cfg(feature = "std")]
631 pub async fn connected_peers_info(
632 &self,
633 ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
634 let query = schema::node_info::QueryPeersInfo::build(());
635 self.query(query)
636 .await
637 .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
638 }
639
640 pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
641 let query = schema::chain::ChainQuery::build(());
642 self.query(query).await.and_then(|r| {
643 let result = r.chain.try_into()?;
644 Ok(result)
645 })
646 }
647
648 pub async fn consensus_parameters(
649 &self,
650 version: i32,
651 ) -> io::Result<Option<ConsensusParameters>> {
652 let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
653 let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
654
655 let result = self
656 .query(query)
657 .await?
658 .consensus_parameters
659 .map(TryInto::try_into)
660 .transpose()?;
661
662 Ok(result)
663 }
664
665 pub async fn state_transition_byte_code_by_version(
666 &self,
667 version: i32,
668 ) -> io::Result<Option<StateTransitionBytecode>> {
669 let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
670 let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
671
672 let result = self
673 .query(query)
674 .await?
675 .state_transition_bytecode_by_version
676 .map(TryInto::try_into)
677 .transpose()?;
678
679 Ok(result)
680 }
681
682 pub async fn state_transition_byte_code_by_root(
683 &self,
684 root: Bytes32,
685 ) -> io::Result<Option<StateTransitionBytecode>> {
686 let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
687 root: HexString(Bytes(root.to_vec())),
688 };
689 let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
690
691 let result = self
692 .query(query)
693 .await?
694 .state_transition_bytecode_by_root
695 .map(TryInto::try_into)
696 .transpose()?;
697
698 Ok(result)
699 }
700
701 pub async fn dry_run(
703 &self,
704 txs: &[Transaction],
705 ) -> io::Result<Vec<TransactionExecutionStatus>> {
706 self.dry_run_opt(txs, None, None, None).await
707 }
708
709 pub async fn dry_run_opt(
711 &self,
712 txs: &[Transaction],
713 utxo_validation: Option<bool>,
715 gas_price: Option<u64>,
716 at_height: Option<BlockHeight>,
717 ) -> io::Result<Vec<TransactionExecutionStatus>> {
718 let txs = txs
719 .iter()
720 .map(|tx| HexString(Bytes(tx.to_bytes())))
721 .collect::<Vec<HexString>>();
722 let query: Operation<schema::tx::DryRun, DryRunArg> =
723 schema::tx::DryRun::build(DryRunArg {
724 txs,
725 utxo_validation,
726 gas_price: gas_price.map(|gp| gp.into()),
727 block_height: at_height.map(|bh| bh.into()),
728 });
729 let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
730 tx_statuses
731 .into_iter()
732 .map(|tx_status| tx_status.try_into().map_err(Into::into))
733 .collect()
734 }
735
736 pub async fn dry_run_opt_record_storage_reads(
738 &self,
739 txs: &[Transaction],
740 utxo_validation: Option<bool>,
742 gas_price: Option<u64>,
743 at_height: Option<BlockHeight>,
744 ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
745 let txs = txs
746 .iter()
747 .map(|tx| HexString(Bytes(tx.to_bytes())))
748 .collect::<Vec<HexString>>();
749 let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
750 schema::tx::DryRunRecordStorageReads::build(DryRunArg {
751 txs,
752 utxo_validation,
753 gas_price: gas_price.map(|gp| gp.into()),
754 block_height: at_height.map(|bh| bh.into()),
755 });
756 let result = self
757 .query(query)
758 .await
759 .map(|r| r.dry_run_record_storage_reads)?;
760 let tx_statuses = result
761 .tx_statuses
762 .into_iter()
763 .map(|tx_status| tx_status.try_into().map_err(Into::into))
764 .collect::<io::Result<Vec<_>>>()?;
765 let storage_reads = result
766 .storage_reads
767 .into_iter()
768 .map(Into::into)
769 .collect::<Vec<_>>();
770 Ok((tx_statuses, storage_reads))
771 }
772
773 pub async fn storage_read_replay(
775 &self,
776 height: &BlockHeight,
777 ) -> io::Result<Vec<StorageReadReplayEvent>> {
778 let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
779 StorageReadReplay::build(StorageReadReplayArgs {
780 height: (*height).into(),
781 });
782 Ok(self
783 .query(query)
784 .await
785 .map(|r| r.storage_read_replay)?
786 .into_iter()
787 .map(Into::into)
788 .collect())
789 }
790
791 #[allow(clippy::too_many_arguments)]
814 pub async fn assemble_tx(
815 &self,
816 tx: &Transaction,
817 block_horizon: u32,
818 required_balances: Vec<RequiredBalance>,
819 fee_address_index: u16,
820 exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
821 estimate_predicates: bool,
822 reserve_gas: Option<u64>,
823 ) -> io::Result<AssembleTransactionResult> {
824 let tx = HexString(Bytes(tx.to_bytes()));
825 let block_horizon = block_horizon.into();
826
827 let required_balances: Vec<_> = required_balances
828 .into_iter()
829 .map(schema::tx::RequiredBalance::try_from)
830 .collect::<Result<Vec<_>, _>>()?;
831
832 let fee_address_index = fee_address_index.into();
833
834 let exclude_input = exclude.map(Into::into);
835
836 let reserve_gas = reserve_gas.map(U64::from);
837
838 let query_arg = AssembleTxArg {
839 tx,
840 block_horizon,
841 required_balances,
842 fee_address_index,
843 exclude_input,
844 estimate_predicates,
845 reserve_gas,
846 };
847
848 let query = schema::tx::AssembleTx::build(query_arg);
849 let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
850 Ok(assemble_tx_result.try_into()?)
851 }
852
853 pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
855 let serialized_tx = tx.to_bytes();
856 let query = schema::tx::EstimatePredicates::build(TxArg {
857 tx: HexString(Bytes(serialized_tx)),
858 });
859 let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
860 let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
861 *tx = tx_with_predicate;
862 Ok(())
863 }
864
865 pub async fn submit(
866 &self,
867 tx: &Transaction,
868 ) -> io::Result<types::primitives::TransactionId> {
869 self.submit_opt(tx, None).await
870 }
871
872 pub async fn submit_opt(
873 &self,
874 tx: &Transaction,
875 estimate_predicates: Option<bool>,
876 ) -> io::Result<types::primitives::TransactionId> {
877 let tx = tx.clone().to_bytes();
878 let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
879 tx: HexString(Bytes(tx)),
880 estimate_predicates,
881 });
882
883 let id = self.query(query).await.map(|r| r.submit)?.id.into();
884 Ok(id)
885 }
886
887 #[cfg(feature = "subscriptions")]
889 pub async fn submit_and_await_commit(
890 &self,
891 tx: &Transaction,
892 ) -> io::Result<TransactionStatus> {
893 self.submit_and_await_commit_opt(tx, None).await
894 }
895
896 #[cfg(feature = "subscriptions")]
905 pub async fn submit_and_await_commit_opt(
906 &self,
907 tx: &Transaction,
908 estimate_predicates: Option<bool>,
909 ) -> io::Result<TransactionStatus> {
910 use cynic::SubscriptionBuilder;
911 let tx = tx.clone().to_bytes();
912 let s =
913 schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
914 tx: HexString(Bytes(tx)),
915 estimate_predicates,
916 });
917
918 let mut stream = self.subscribe(s).await?.map(
919 |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
920 let status: TransactionStatus = r?.submit_and_await.try_into()?;
921 Result::<_, io::Error>::Ok(status)
922 },
923 );
924
925 let status = stream.next().await.ok_or(io::Error::new(
926 io::ErrorKind::Other,
927 "Failed to get status from the submission",
928 ))??;
929
930 Ok(status)
931 }
932
933 #[cfg(feature = "subscriptions")]
935 pub async fn submit_and_await_commit_with_tx(
936 &self,
937 tx: &Transaction,
938 ) -> io::Result<StatusWithTransaction> {
939 self.submit_and_await_commit_with_tx_opt(tx, None).await
940 }
941
942 #[cfg(feature = "subscriptions")]
944 pub async fn submit_and_await_commit_with_tx_opt(
945 &self,
946 tx: &Transaction,
947 estimate_predicates: Option<bool>,
948 ) -> io::Result<StatusWithTransaction> {
949 use cynic::SubscriptionBuilder;
950 let tx = tx.clone().to_bytes();
951 let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
952 TxWithEstimatedPredicatesArg {
953 tx: HexString(Bytes(tx)),
954 estimate_predicates,
955 },
956 );
957
958 let mut stream = self.subscribe(s).await?.map(
959 |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
960 let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
961 Result::<_, io::Error>::Ok(status)
962 },
963 );
964
965 let status = stream.next().await.ok_or(io::Error::new(
966 io::ErrorKind::Other,
967 "Failed to get status from the submission",
968 ))??;
969
970 Ok(status)
971 }
972
973 #[cfg(feature = "subscriptions")]
975 pub async fn submit_and_await_status<'a>(
976 &'a self,
977 tx: &'a Transaction,
978 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
979 self.submit_and_await_status_opt(tx, None, None).await
980 }
981
982 #[cfg(feature = "subscriptions")]
984 pub async fn submit_and_await_status_opt<'a>(
985 &'a self,
986 tx: &'a Transaction,
987 estimate_predicates: Option<bool>,
988 include_preconfirmation: Option<bool>,
989 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
990 use cynic::SubscriptionBuilder;
991 use schema::tx::SubmitAndAwaitStatusArg;
992 let tx = tx.clone().to_bytes();
993 let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
994 SubmitAndAwaitStatusArg {
995 tx: HexString(Bytes(tx)),
996 estimate_predicates,
997 include_preconfirmation,
998 },
999 );
1000
1001 let stream = self.subscribe(s).await?.map(
1002 |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1003 let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1004 Result::<_, io::Error>::Ok(status)
1005 },
1006 );
1007
1008 Ok(stream)
1009 }
1010
1011 #[cfg(feature = "subscriptions")]
1013 pub async fn contract_storage_slots<'a>(
1014 &'a self,
1015 contract_id: &'a ContractId,
1016 ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + 'a> {
1017 use cynic::SubscriptionBuilder;
1018 use schema::storage::ContractStorageSlotsArgs;
1019 let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
1020 contract_id: (*contract_id).into(),
1021 });
1022
1023 let stream = self.subscribe(s).await?.map(
1024 |result: io::Result<schema::storage::ContractStorageSlots>| {
1025 let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1026 Result::<_, io::Error>::Ok(result)
1027 },
1028 );
1029
1030 Ok(stream)
1031 }
1032
1033 #[cfg(feature = "subscriptions")]
1035 pub async fn contract_storage_balances<'a>(
1036 &'a self,
1037 contract_id: &'a ContractId,
1038 ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + 'a>
1039 {
1040 use cynic::SubscriptionBuilder;
1041 use schema::{
1042 contract::ContractBalance,
1043 storage::ContractStorageBalancesArgs,
1044 };
1045 let s = schema::storage::ContractStorageBalances::build(
1046 ContractStorageBalancesArgs {
1047 contract_id: (*contract_id).into(),
1048 },
1049 );
1050
1051 let stream = self.subscribe(s).await?.map(
1052 |result: io::Result<schema::storage::ContractStorageBalances>| {
1053 let result: ContractBalance = result?.contract_storage_balances;
1054 Result::<_, io::Error>::Ok(result)
1055 },
1056 );
1057
1058 Ok(stream)
1059 }
1060
1061 pub async fn contract_slots_values(
1062 &self,
1063 contract_id: &ContractId,
1064 block_height: Option<BlockHeight>,
1065 requested_storage_slots: Vec<Bytes32>,
1066 ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1067 let query = schema::storage::ContractSlotValues::build(
1068 schema::storage::ContractSlotValuesArgs {
1069 contract_id: (*contract_id).into(),
1070 block_height: block_height.map(|b| (*b).into()),
1071 storage_slots: requested_storage_slots
1072 .into_iter()
1073 .map(Into::into)
1074 .collect(),
1075 },
1076 );
1077
1078 self.query(query)
1079 .await
1080 .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1081 }
1082
1083 pub async fn contract_balance_values(
1084 &self,
1085 contract_id: &ContractId,
1086 block_height: Option<BlockHeight>,
1087 requested_storage_slots: Vec<AssetId>,
1088 ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1089 let query = schema::storage::ContractBalanceValues::build(
1090 schema::storage::ContractBalanceValuesArgs {
1091 contract_id: (*contract_id).into(),
1092 block_height: block_height.map(|b| (*b).into()),
1093 assets: requested_storage_slots
1094 .into_iter()
1095 .map(Into::into)
1096 .collect(),
1097 },
1098 );
1099
1100 self.query(query).await.map(|r| {
1101 r.contract_balance_values
1102 .into_iter()
1103 .map(Into::into)
1104 .collect()
1105 })
1106 }
1107
1108 pub async fn start_session(&self) -> io::Result<String> {
1109 let query = schema::StartSession::build(());
1110
1111 self.query(query)
1112 .await
1113 .map(|r| r.start_session.into_inner())
1114 }
1115
1116 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1117 let query = schema::EndSession::build(IdArg { id: id.into() });
1118
1119 self.query(query).await.map(|r| r.end_session)
1120 }
1121
1122 pub async fn reset(&self, id: &str) -> io::Result<bool> {
1123 let query = schema::Reset::build(IdArg { id: id.into() });
1124
1125 self.query(query).await.map(|r| r.reset)
1126 }
1127
1128 pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1129 let op = serde_json::to_string(op)?;
1130 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1131
1132 self.query(query).await.map(|r| r.execute)
1133 }
1134
1135 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1136 let query = schema::Register::build(RegisterArgs {
1137 id: id.into(),
1138 register: register.into(),
1139 });
1140
1141 Ok(self.query(query).await?.register.0 as Word)
1142 }
1143
1144 pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1145 let query = schema::Memory::build(MemoryArgs {
1146 id: id.into(),
1147 start: start.into(),
1148 size: size.into(),
1149 });
1150
1151 let memory = self.query(query).await?.memory;
1152
1153 Ok(serde_json::from_str(memory.as_str())?)
1154 }
1155
1156 pub async fn set_breakpoint(
1157 &self,
1158 session_id: &str,
1159 contract: fuel_types::ContractId,
1160 pc: u64,
1161 ) -> io::Result<()> {
1162 let operation = SetBreakpoint::build(SetBreakpointArgs {
1163 id: Id::new(session_id),
1164 bp: schema::Breakpoint {
1165 contract: contract.into(),
1166 pc: U64(pc),
1167 },
1168 });
1169
1170 let response = self.query(operation).await?;
1171 assert!(
1172 response.set_breakpoint,
1173 "Setting breakpoint returned invalid reply"
1174 );
1175 Ok(())
1176 }
1177
1178 pub async fn set_single_stepping(
1179 &self,
1180 session_id: &str,
1181 enable: bool,
1182 ) -> io::Result<()> {
1183 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1184 id: Id::new(session_id),
1185 enable,
1186 });
1187 self.query(operation).await?;
1188 Ok(())
1189 }
1190
1191 pub async fn start_tx(
1192 &self,
1193 session_id: &str,
1194 tx: &Transaction,
1195 ) -> io::Result<RunResult> {
1196 let operation = StartTx::build(StartTxArgs {
1197 id: Id::new(session_id),
1198 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1199 });
1200 let response = self.query(operation).await?.start_tx;
1201 Ok(response)
1202 }
1203
1204 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1205 let operation = ContinueTx::build(ContinueTxArgs {
1206 id: Id::new(session_id),
1207 });
1208 let response = self.query(operation).await?.continue_tx;
1209 Ok(response)
1210 }
1211
1212 pub async fn transaction(
1213 &self,
1214 id: &TxId,
1215 ) -> io::Result<Option<TransactionResponse>> {
1216 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1217
1218 let transaction = self.query(query).await?.transaction;
1219
1220 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1221 }
1222
1223 pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1225 let query =
1226 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1227
1228 let status = self.query(query).await?.transaction.ok_or_else(|| {
1229 io::Error::new(
1230 ErrorKind::NotFound,
1231 format!("status not found for transaction {id} "),
1232 )
1233 })?;
1234
1235 let status = status
1236 .status
1237 .ok_or_else(|| {
1238 io::Error::new(
1239 ErrorKind::NotFound,
1240 format!("status not found for transaction {id}"),
1241 )
1242 })?
1243 .try_into()?;
1244 Ok(status)
1245 }
1246
1247 #[tracing::instrument(skip(self), level = "debug")]
1248 #[cfg(feature = "subscriptions")]
1249 pub async fn subscribe_transaction_status<'a>(
1251 &'a self,
1252 id: &'a TxId,
1253 ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + 'a> {
1254 self.subscribe_transaction_status_opt(id, None).await
1255 }
1256
/// Subscribes to status changes of the transaction `id`.
///
/// `include_preconfirmation` is forwarded unchanged into the subscription
/// arguments. Every item received from the subscription is logged at debug
/// level; successful items have their `status_change` converted into a
/// `TransactionStatus`, while stream errors and conversion errors are yielded
/// as `Err` items.
///
/// # Errors
///
/// Fails when `self.subscribe` cannot establish the subscription.
#[cfg(feature = "subscriptions")]
pub async fn subscribe_transaction_status_opt<'a>(
    &'a self,
    id: &'a TxId,
    include_preconfirmation: Option<bool>,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
    use cynic::SubscriptionBuilder;
    use schema::tx::StatusChangeSubscriptionArgs;

    let subscription_id: TransactionId = (*id).into();
    let args = StatusChangeSubscriptionArgs {
        id: subscription_id,
        include_preconfirmation,
    };
    let subscription = schema::tx::StatusChangeSubscription::build(args);

    tracing::debug!("subscribing");
    let statuses = self.subscribe(subscription).await?.map(|tx| {
        tracing::debug!("received {tx:?}");
        let update = tx?;
        Ok(update.status_change.try_into()?)
    });

    Ok(statuses)
}
1283
/// Waits until the transaction `id` reports a final status and returns it.
///
/// Subscribes through `subscribe_transaction_status`, skips every stream item
/// that is an error or a non-final status, and takes the first remaining
/// item. The stream is not polled again after that item.
///
/// # Errors
///
/// Fails when the subscription cannot be established, or with
/// `ErrorKind::Other` when the stream ends without yielding a final status.
#[cfg(feature = "subscriptions")]
pub async fn await_transaction_commit(
    &self,
    id: &TxId,
) -> io::Result<TransactionStatus> {
    let status_result = self
        .subscribe_transaction_status(id)
        .await?
        .skip_while(|status| {
            // Keep skipping while the item is an error or not yet final.
            let reached_final = matches!(status, Ok(status) if status.is_final());
            future::ready(!reached_final)
        })
        .next()
        .await;

    match status_result {
        Some(Ok(status)) => Ok(status),
        // Name the transaction in the message; the leftover stream result
        // alone does not identify which transaction failed.
        other => Err(io::Error::new(
            io::ErrorKind::Other,
            format!("Failed to get status for transaction {id}: {other:?}"),
        )),
    }
}
1313
1314 pub async fn transactions(
1316 &self,
1317 request: PaginationRequest<String>,
1318 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1319 let args = schema::ConnectionArgs::from(request);
1320 let query = schema::tx::TransactionsQuery::build(args);
1321 let transactions = self.query(query).await?.transactions.try_into()?;
1322 Ok(transactions)
1323 }
1324
1325 pub async fn transactions_by_owner(
1327 &self,
1328 owner: &Address,
1329 request: PaginationRequest<String>,
1330 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1331 let owner: schema::Address = (*owner).into();
1332 let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1333 let query = schema::tx::TransactionsByOwnerQuery::build(args);
1334
1335 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1336 Ok(transactions)
1337 }
1338
1339 pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1340 let query =
1341 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1342
1343 let tx = self.query(query).await?.transaction.ok_or_else(|| {
1344 io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1345 })?;
1346
1347 let receipts = match tx.status {
1348 Some(status) => match status {
1349 schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1350 s.receipts
1351 .into_iter()
1352 .map(TryInto::<Receipt>::try_into)
1353 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1354 )
1355 .transpose()?,
1356 schema::tx::TransactionStatus::FailureStatus(s) => Some(
1357 s.receipts
1358 .into_iter()
1359 .map(TryInto::<Receipt>::try_into)
1360 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1361 )
1362 .transpose()?,
1363 _ => None,
1364 },
1365 _ => None,
1366 };
1367
1368 Ok(receipts)
1369 }
1370
/// Runs the `AllReceipts` query and converts every returned entry into a
/// `Receipt`.
///
/// # Errors
///
/// Fails when the query fails or when any entry cannot be converted.
#[cfg(feature = "test-helpers")]
pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
    let data = self.query(schema::tx::AllReceipts::build(())).await?;

    let converted = data
        .all_receipts
        .into_iter()
        .map(TryInto::<Receipt>::try_into)
        .collect::<Result<Vec<Receipt>, ConversionError>>()?;

    Ok(converted)
}
1383
1384 pub async fn produce_blocks(
1385 &self,
1386 blocks_to_produce: u32,
1387 start_timestamp: Option<u64>,
1388 ) -> io::Result<BlockHeight> {
1389 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1390 blocks_to_produce: blocks_to_produce.into(),
1391 start_timestamp: start_timestamp
1392 .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1393 });
1394
1395 let new_height = self.query(query).await?.produce_blocks;
1396
1397 Ok(new_height.into())
1398 }
1399
1400 pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1401 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1402 id: Some((*id).into()),
1403 });
1404
1405 let block = self
1406 .query(query)
1407 .await?
1408 .block
1409 .map(TryInto::try_into)
1410 .transpose()?;
1411
1412 Ok(block)
1413 }
1414
1415 pub async fn block_by_height(
1416 &self,
1417 height: BlockHeight,
1418 ) -> io::Result<Option<types::Block>> {
1419 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1420 height: Some(U32(height.into())),
1421 });
1422
1423 let block = self
1424 .query(query)
1425 .await?
1426 .block
1427 .map(TryInto::try_into)
1428 .transpose()?;
1429
1430 Ok(block)
1431 }
1432
1433 pub async fn da_compressed_block(
1434 &self,
1435 height: BlockHeight,
1436 ) -> io::Result<Option<Vec<u8>>> {
1437 let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1438 DaCompressedBlockByHeightArgs {
1439 height: U32(height.into()),
1440 },
1441 );
1442
1443 Ok(self
1444 .query(query)
1445 .await?
1446 .da_compressed_block
1447 .map(|b| b.bytes.into()))
1448 }
1449
1450 pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1452 let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1453 let blob = self.query(query).await?.blob.map(Into::into);
1454 Ok(blob)
1455 }
1456
1457 pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1459 let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1460 Ok(self.query(query).await?.blob.is_some())
1461 }
1462
1463 pub async fn blocks(
1465 &self,
1466 request: PaginationRequest<String>,
1467 ) -> io::Result<PaginatedResult<types::Block, String>> {
1468 let args = schema::ConnectionArgs::from(request);
1469 let query = schema::block::BlocksQuery::build(args);
1470
1471 let blocks = self.query(query).await?.blocks.try_into()?;
1472
1473 Ok(blocks)
1474 }
1475
1476 pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1477 let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1478 utxo_id: (*id).into(),
1479 });
1480 let coin = self.query(query).await?.coin.map(Into::into);
1481 Ok(coin)
1482 }
1483
1484 pub async fn coins(
1486 &self,
1487 owner: &Address,
1488 asset_id: Option<&AssetId>,
1489 request: PaginationRequest<String>,
1490 ) -> io::Result<PaginatedResult<types::Coin, String>> {
1491 let owner: schema::Address = (*owner).into();
1492 let asset_id: schema::AssetId = match asset_id {
1493 Some(asset_id) => (*asset_id).into(),
1494 None => schema::AssetId::default(),
1495 };
1496 let args = CoinsConnectionArgs::from((owner, asset_id, request));
1497 let query = schema::coins::CoinsQuery::build(args);
1498
1499 let coins = self.query(query).await?.coins.into();
1500 Ok(coins)
1501 }
1502
1503 pub async fn coins_to_spend(
1505 &self,
1506 owner: &Address,
1507 spend_query: Vec<(AssetId, u128, Option<u16>)>,
1508 excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1510 ) -> io::Result<Vec<Vec<types::CoinType>>> {
1511 let owner: schema::Address = (*owner).into();
1512 let spend_query: Vec<SpendQueryElementInput> = spend_query
1513 .iter()
1514 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1515 Ok(SpendQueryElementInput {
1516 asset_id: (*asset_id).into(),
1517 amount: (*amount).into(),
1518 max: (*max).map(|max| max.into()),
1519 })
1520 })
1521 .try_collect()?;
1522 let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1523 let args =
1524 schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1525 let query = schema::coins::CoinsToSpendQuery::build(args);
1526
1527 let coins_per_asset = self
1528 .query(query)
1529 .await?
1530 .coins_to_spend
1531 .into_iter()
1532 .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1533 .collect::<Vec<_>>();
1534 Ok(coins_per_asset)
1535 }
1536
1537 pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1538 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1539 id: (*id).into(),
1540 });
1541 let contract = self.query(query).await?.contract.map(Into::into);
1542 Ok(contract)
1543 }
1544
1545 pub async fn contract_balance(
1546 &self,
1547 id: &ContractId,
1548 asset: Option<&AssetId>,
1549 ) -> io::Result<u64> {
1550 let asset_id: schema::AssetId = match asset {
1551 Some(asset) => (*asset).into(),
1552 None => schema::AssetId::default(),
1553 };
1554
1555 let query =
1556 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1557 id: (*id).into(),
1558 asset: asset_id,
1559 });
1560
1561 let balance: types::ContractBalance =
1562 self.query(query).await?.contract_balance.into();
1563 Ok(balance.amount)
1564 }
1565
1566 pub async fn balance(
1567 &self,
1568 owner: &Address,
1569 asset_id: Option<&AssetId>,
1570 ) -> io::Result<u64> {
1571 let owner: schema::Address = (*owner).into();
1572 let asset_id: schema::AssetId = match asset_id {
1573 Some(asset_id) => (*asset_id).into(),
1574 None => schema::AssetId::default(),
1575 };
1576 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1577 let balance: types::Balance = self.query(query).await?.balance.into();
1578 Ok(balance.amount.try_into().unwrap_or(u64::MAX))
1579 }
1580
1581 pub async fn balances(
1583 &self,
1584 owner: &Address,
1585 request: PaginationRequest<String>,
1586 ) -> io::Result<PaginatedResult<types::Balance, String>> {
1587 let owner: schema::Address = (*owner).into();
1588 let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1589 let query = schema::balance::BalancesQuery::build(args);
1590
1591 let balances = self.query(query).await?.balances.into();
1592 Ok(balances)
1593 }
1594
1595 pub async fn contract_balances(
1596 &self,
1597 contract: &ContractId,
1598 request: PaginationRequest<String>,
1599 ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1600 let contract_id: schema::ContractId = (*contract).into();
1601 let args = ContractBalancesConnectionArgs::from((contract_id, request));
1602 let query = schema::contract::ContractBalancesQuery::build(args);
1603
1604 let balances = self.query(query).await?.contract_balances.into();
1605
1606 Ok(balances)
1607 }
1608
1609 pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1611 let query = schema::message::MessageQuery::build(NonceArgs {
1612 nonce: (*nonce).into(),
1613 });
1614 let message = self.query(query).await?.message.map(Into::into);
1615 Ok(message)
1616 }
1617
1618 pub async fn messages(
1619 &self,
1620 owner: Option<&Address>,
1621 request: PaginationRequest<String>,
1622 ) -> io::Result<PaginatedResult<types::Message, String>> {
1623 let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1624 let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1625 let query = schema::message::OwnedMessageQuery::build(args);
1626
1627 let messages = self.query(query).await?.messages.into();
1628
1629 Ok(messages)
1630 }
1631
1632 pub async fn contract_info(
1633 &self,
1634 contract: &ContractId,
1635 ) -> io::Result<Option<types::Contract>> {
1636 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1637 id: (*contract).into(),
1638 });
1639 let contract_info = self.query(query).await?.contract.map(Into::into);
1640 Ok(contract_info)
1641 }
1642
1643 pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1644 let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1645 nonce: (*nonce).into(),
1646 });
1647 let status = self.query(query).await?.message_status.into();
1648
1649 Ok(status)
1650 }
1651
1652 pub async fn message_proof(
1654 &self,
1655 transaction_id: &TxId,
1656 nonce: &Nonce,
1657 commit_block_id: Option<&BlockId>,
1658 commit_block_height: Option<BlockHeight>,
1659 ) -> io::Result<types::MessageProof> {
1660 let transaction_id: TransactionId = (*transaction_id).into();
1661 let nonce: schema::Nonce = (*nonce).into();
1662 let commit_block_id: Option<schema::BlockId> =
1663 commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1664 let commit_block_height = commit_block_height.map(Into::into);
1665 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1666 transaction_id,
1667 nonce,
1668 commit_block_id,
1669 commit_block_height,
1670 });
1671 let proof = self.query(query).await?.message_proof.try_into()?;
1672 Ok(proof)
1673 }
1674
1675 pub async fn relayed_transaction_status(
1676 &self,
1677 id: &Bytes32,
1678 ) -> io::Result<Option<RelayedTransactionStatus>> {
1679 let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1680 RelayedTransactionStatusArgs {
1681 id: id.to_owned().into(),
1682 },
1683 );
1684 let status = self
1685 .query(query)
1686 .await?
1687 .relayed_transaction_status
1688 .map(|status| status.try_into())
1689 .transpose()?;
1690 Ok(status)
1691 }
1692
1693 pub async fn asset_info(&self, asset_id: &AssetId) -> io::Result<AssetDetail> {
1694 let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1695 id: (*asset_id).into(),
1696 });
1697 let asset_info = self.query(query).await?.asset_details.into();
1698 Ok(asset_info)
1699 }
1700}
1701
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Looks up a transaction by `id` and returns only the `transaction`
    /// field of the converted `TransactionResponse`.
    ///
    /// Returns `Ok(None)` when the response carries no transaction.
    ///
    /// # Errors
    ///
    /// Fails when the query fails or when the conversion into
    /// `TransactionResponse` fails.
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let args = TxIdArgs { id: (*id).into() };
        let data = self
            .query(schema::tx::TransactionQuery::build(args))
            .await?;

        let transparent = data
            .transaction
            .map(|tx| -> Result<_, ConversionError> {
                let response: TransactionResponse = tx.try_into()?;
                Ok(response.transaction)
            })
            .transpose()?;

        Ok(transparent)
    }
}