1use crate::client::schema::contract::ContractBalanceQueryArgs;
2use anyhow::Context;
3use cynic::{
4 http::ReqwestExt,
5 GraphQlResponse,
6 Id,
7 MutationBuilder,
8 Operation,
9 QueryBuilder,
10 StreamingOperation,
11};
12use eventsource_client::HttpsConnector;
13use fuel_vm::prelude::*;
14use futures::StreamExt;
15use itertools::Itertools;
16use schema::{
17 balance::BalanceArgs,
18 block::BlockByIdArgs,
19 coin::{
20 Coin,
21 CoinByIdArgs,
22 },
23 contract::{
24 Contract,
25 ContractByIdArgs,
26 },
27 resource::SpendQueryElementInput,
28 tx::{
29 TxArg,
30 TxIdArgs,
31 },
32 Bytes,
33 ContinueTx,
34 ContinueTxArgs,
35 ConversionError,
36 HexString,
37 IdArg,
38 MemoryArgs,
39 RegisterArgs,
40 RunResult,
41 SetBreakpoint,
42 SetBreakpointArgs,
43 SetSingleStepping,
44 SetSingleSteppingArgs,
45 StartTx,
46 StartTxArgs,
47 TransactionId,
48 U64,
49};
50use std::{
51 convert::TryInto,
52 future,
53 io::{
54 self,
55 ErrorKind,
56 },
57 net,
58 str::{
59 self,
60 FromStr,
61 },
62};
63use types::{
64 TransactionResponse,
65 TransactionStatus,
66};
67
68use crate::client::schema::{
69 block::BlockByHeightArgs,
70 resource::ExcludeInput,
71 tx::DryRunArg,
72};
73pub use schema::{
74 PageDirection,
75 PaginatedResult,
76 PaginationRequest,
77};
78
79use self::schema::{
80 block::{
81 ProduceBlockArgs,
82 TimeParameters,
83 },
84 message::MessageProofArgs,
85};
86
87pub mod schema;
88pub mod types;
89
90#[derive(Debug, Clone, PartialEq, Eq, Hash)]
91pub struct FuelClient {
92 url: reqwest::Url,
93}
94
95impl FromStr for FuelClient {
96 type Err = anyhow::Error;
97
98 fn from_str(str: &str) -> Result<Self, Self::Err> {
99 let mut raw_url = str.to_string();
100 if !raw_url.starts_with("http") {
101 raw_url = format!("http://{}", raw_url);
102 }
103
104 let mut url = reqwest::Url::parse(&raw_url)
105 .with_context(|| format!("Invalid fuel-core URL: {}", str))?;
106 url.set_path("/graphql");
107 Ok(Self { url })
108 }
109}
110
111impl<S> From<S> for FuelClient
112where
113 S: Into<net::SocketAddr>,
114{
115 fn from(socket: S) -> Self {
116 format!("http://{}", socket.into())
117 .as_str()
118 .parse()
119 .unwrap()
120 }
121}
122
123pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
124 let e = errors
125 .into_iter()
126 .fold(String::from("Response errors"), |mut s, e| {
127 s.push_str("; ");
128 s.push_str(e.as_str());
129 s
130 });
131 io::Error::new(io::ErrorKind::Other, e)
132}
133
134impl FuelClient {
135 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
136 Self::from_str(url.as_ref())
137 }
138
139 async fn query<ResponseData, Vars>(
140 &self,
141 q: Operation<ResponseData, Vars>,
142 ) -> io::Result<ResponseData>
143 where
144 Vars: serde::Serialize,
145 ResponseData: serde::de::DeserializeOwned + 'static,
146 {
147 let response = reqwest::Client::new()
148 .post(self.url.clone())
149 .run_graphql(q)
150 .await
151 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
152
153 Self::decode_response(response)
154 }
155
156 fn decode_response<R>(response: GraphQlResponse<R>) -> io::Result<R>
157 where
158 R: serde::de::DeserializeOwned + 'static,
159 {
160 match (response.data, response.errors) {
161 (Some(d), _) => Ok(d),
162 (_, Some(e)) => Err(from_strings_errors_to_std_error(
163 e.into_iter().map(|e| e.message).collect(),
164 )),
165 _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
166 }
167 }
168
169 async fn subscribe<ResponseData, Vars>(
170 &self,
171 q: StreamingOperation<ResponseData, Vars>,
172 ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>>>
173 where
174 Vars: serde::Serialize,
175 ResponseData: serde::de::DeserializeOwned + 'static,
176 {
177 use eventsource_client as es;
178 let mut url = self.url.clone();
179 url.set_path("/graphql-sub");
180 let json_query = serde_json::to_string(&q)?;
181 let client = es::ClientBuilder::for_url(url.as_str())
182 .map_err(|e| {
183 io::Error::new(
184 io::ErrorKind::Other,
185 format!("Failed to start client {:?}", e),
186 )
187 })?
188 .body(json_query)
189 .method("POST".to_string())
190 .header("content-type", "application/json")
191 .map_err(|e| {
192 io::Error::new(
193 io::ErrorKind::Other,
194 format!("Failed to add header to client {:?}", e),
195 )
196 })?
197 .build_with_conn(HttpsConnector::with_webpki_roots());
198
199 let mut last = None;
200
201 let stream = es::Client::stream(&client)
202 .take_while(|result| {
203 futures::future::ready(!matches!(result, Err(es::Error::Eof)))
204 })
205 .filter_map(move |result| {
206 let r = match result {
207 Ok(es::SSE::Event(es::Event { data, .. })) => {
208 match serde_json::from_str::<GraphQlResponse<ResponseData>>(&data)
209 {
210 Ok(resp) => {
211 match Self::decode_response(resp) {
212 Ok(resp) => {
213 match last.replace(data) {
214 Some(l)
216 if l == *last.as_ref().expect(
217 "Safe because of the replace above",
218 ) =>
219 {
220 None
221 }
222 _ => Some(Ok(resp)),
223 }
224 }
225 Err(e) => Some(Err(io::Error::new(
226 io::ErrorKind::Other,
227 format!("Decode error: {:?}", e),
228 ))),
229 }
230 }
231 Err(e) => Some(Err(io::Error::new(
232 io::ErrorKind::Other,
233 format!("Json error: {:?}", e),
234 ))),
235 }
236 }
237 Ok(_) => None,
238 Err(e) => Some(Err(io::Error::new(
239 io::ErrorKind::Other,
240 format!("Graphql error: {:?}", e),
241 ))),
242 };
243 futures::future::ready(r)
244 });
245
246 Ok(stream)
247 }
248
249 pub async fn health(&self) -> io::Result<bool> {
250 let query = schema::Health::build(());
251 self.query(query).await.map(|r| r.health)
252 }
253
254 pub async fn node_info(&self) -> io::Result<schema::node_info::NodeInfo> {
255 let query = schema::node_info::QueryNodeInfo::build(());
256 self.query(query).await.map(|r| r.node_info)
257 }
258
259 pub async fn chain_info(&self) -> io::Result<schema::chain::ChainInfo> {
260 let query = schema::chain::ChainQuery::build(());
261 self.query(query).await.map(|r| r.chain)
262 }
263
264 pub async fn dry_run(&self, tx: &Transaction) -> io::Result<Vec<Receipt>> {
266 self.dry_run_opt(tx, None).await
267 }
268
269 pub async fn dry_run_opt(
271 &self,
272 tx: &Transaction,
273 utxo_validation: Option<bool>,
275 ) -> io::Result<Vec<Receipt>> {
276 let tx = tx.clone().to_bytes();
277 let query = schema::tx::DryRun::build(DryRunArg {
278 tx: HexString(Bytes(tx)),
279 utxo_validation,
280 });
281 let receipts = self.query(query).await.map(|r| r.dry_run)?;
282 receipts
283 .into_iter()
284 .map(|receipt| receipt.try_into().map_err(Into::into))
285 .collect()
286 }
287
288 pub async fn submit(&self, tx: &Transaction) -> io::Result<TransactionId> {
289 let tx = tx.clone().to_bytes();
290 let query = schema::tx::Submit::build(TxArg {
291 tx: HexString(Bytes(tx)),
292 });
293
294 let id = self.query(query).await.map(|r| r.submit)?.id;
295 Ok(id)
296 }
297
298 pub async fn submit_and_await_commit(
303 &self,
304 tx: &Transaction,
305 ) -> io::Result<TransactionStatus> {
306 let tx_id = self.submit(tx).await?;
307 self.await_transaction_commit(&tx_id.to_string()).await
308 }
309
310 pub async fn start_session(&self) -> io::Result<String> {
311 let query = schema::StartSession::build(());
312
313 self.query(query)
314 .await
315 .map(|r| r.start_session.into_inner())
316 }
317
318 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
319 let query = schema::EndSession::build(IdArg { id: id.into() });
320
321 self.query(query).await.map(|r| r.end_session)
322 }
323
324 pub async fn reset(&self, id: &str) -> io::Result<bool> {
325 let query = schema::Reset::build(IdArg { id: id.into() });
326
327 self.query(query).await.map(|r| r.reset)
328 }
329
330 pub async fn execute(&self, id: &str, op: &Opcode) -> io::Result<bool> {
331 let op = serde_json::to_string(op)?;
332 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
333
334 self.query(query).await.map(|r| r.execute)
335 }
336
337 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
338 let query = schema::Register::build(RegisterArgs {
339 id: id.into(),
340 register: register.into(),
341 });
342
343 Ok(self.query(query).await?.register.0 as Word)
344 }
345
346 pub async fn memory(
347 &self,
348 id: &str,
349 start: usize,
350 size: usize,
351 ) -> io::Result<Vec<u8>> {
352 let query = schema::Memory::build(MemoryArgs {
353 id: id.into(),
354 start: start.into(),
355 size: size.into(),
356 });
357
358 let memory = self.query(query).await?.memory;
359
360 Ok(serde_json::from_str(memory.as_str())?)
361 }
362
363 pub async fn set_breakpoint(
364 &self,
365 session_id: &str,
366 contract: ::fuel_vm::fuel_types::ContractId,
367 pc: u64,
368 ) -> io::Result<()> {
369 let operation = SetBreakpoint::build(SetBreakpointArgs {
370 id: Id::new(session_id),
371 bp: schema::Breakpoint {
372 contract: contract.into(),
373 pc: U64(pc),
374 },
375 });
376
377 let response = self.query(operation).await?;
378 assert!(
379 response.set_breakpoint,
380 "Setting breakpoint returned invalid reply"
381 );
382 Ok(())
383 }
384
385 pub async fn set_single_stepping(
386 &self,
387 session_id: &str,
388 enable: bool,
389 ) -> io::Result<()> {
390 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
391 id: Id::new(session_id),
392 enable,
393 });
394 self.query(operation).await?;
395 Ok(())
396 }
397
398 pub async fn start_tx(
399 &self,
400 session_id: &str,
401 tx: &Transaction,
402 ) -> io::Result<RunResult> {
403 let operation = StartTx::build(StartTxArgs {
404 id: Id::new(session_id),
405 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
406 });
407 let response = self.query(operation).await?.start_tx;
408 Ok(response)
409 }
410
411 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
412 let operation = ContinueTx::build(ContinueTxArgs {
413 id: Id::new(session_id),
414 });
415 let response = self.query(operation).await?.continue_tx;
416 Ok(response)
417 }
418
419 pub async fn transaction(&self, id: &str) -> io::Result<Option<TransactionResponse>> {
420 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
421
422 let transaction = self.query(query).await?.transaction;
423
424 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
425 }
426
427 pub async fn transaction_status(&self, id: &str) -> io::Result<TransactionStatus> {
429 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
430
431 let tx = self.query(query).await?.transaction.ok_or_else(|| {
432 io::Error::new(
433 ErrorKind::NotFound,
434 format!("status not found for transaction {} ", id),
435 )
436 })?;
437
438 let status = tx
439 .status
440 .ok_or_else(|| {
441 io::Error::new(
442 ErrorKind::NotFound,
443 format!("status not found for transaction {}", id),
444 )
445 })?
446 .try_into()?;
447 Ok(status)
448 }
449
450 pub async fn subscribe_transaction_status(
452 &self,
453 id: &str,
454 ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>>> {
455 use cynic::SubscriptionBuilder;
456 let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: id.parse()? });
457
458 let stream = self.subscribe(s).await?.map(|tx| {
459 let tx = tx?;
460 let status = tx.status_change.try_into()?;
461 Ok(status)
462 });
463
464 Ok(stream)
465 }
466
467 pub async fn await_transaction_commit(
472 &self,
473 id: &str,
474 ) -> io::Result<TransactionStatus> {
475 let status_result = self
478 .subscribe_transaction_status(id)
479 .await?
480 .skip_while(|status| {
481 future::ready(matches!(status, Ok(TransactionStatus::Submitted { .. })))
482 })
483 .next()
484 .await;
485
486 if let Some(Ok(status)) = status_result {
487 Ok(status)
488 } else {
489 Err(io::Error::new(
490 io::ErrorKind::Other,
491 "Failed to get status for transaction",
492 ))
493 }
494 }
495
496 pub async fn transactions(
498 &self,
499 request: PaginationRequest<String>,
500 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
501 let query = schema::tx::TransactionsQuery::build(request.into());
502 let transactions = self.query(query).await?.transactions.try_into()?;
503 Ok(transactions)
504 }
505
506 pub async fn transactions_by_owner(
508 &self,
509 owner: &str,
510 request: PaginationRequest<String>,
511 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
512 let owner: schema::Address = owner.parse()?;
513 let query = schema::tx::TransactionsByOwnerQuery::build((owner, request).into());
514
515 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
516 Ok(transactions)
517 }
518
519 pub async fn receipts(
520 &self,
521 id: &str,
522 ) -> io::Result<Vec<::fuel_vm::fuel_tx::Receipt>> {
523 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
524
525 let tx = self.query(query).await?.transaction.ok_or_else(|| {
526 io::Error::new(ErrorKind::NotFound, format!("transaction {} not found", id))
527 })?;
528
529 let receipts: Result<Vec<::fuel_vm::fuel_tx::Receipt>, ConversionError> = tx
530 .receipts
531 .unwrap_or_default()
532 .into_iter()
533 .map(|r| r.try_into())
534 .collect();
535
536 Ok(receipts?)
537 }
538
539 pub async fn produce_blocks(
540 &self,
541 blocks_to_produce: u64,
542 time: Option<TimeParameters>,
543 ) -> io::Result<u64> {
544 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
545 blocks_to_produce: blocks_to_produce.into(),
546 time,
547 });
548
549 let new_height = self.query(query).await?.produce_blocks;
550
551 Ok(new_height.into())
552 }
553
554 pub async fn block(&self, id: &str) -> io::Result<Option<schema::block::Block>> {
555 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
556 id: Some(id.parse()?),
557 });
558
559 let block = self.query(query).await?.block;
560
561 Ok(block)
562 }
563
564 pub async fn block_by_height(
565 &self,
566 height: u64,
567 ) -> io::Result<Option<schema::block::Block>> {
568 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
569 height: Some(U64(height)),
570 });
571
572 let block = self.query(query).await?.block;
573
574 Ok(block)
575 }
576
577 pub async fn blocks(
579 &self,
580 request: PaginationRequest<String>,
581 ) -> io::Result<PaginatedResult<schema::block::Block, String>> {
582 let query = schema::block::BlocksQuery::build(request.into());
583
584 let blocks = self.query(query).await?.blocks.into();
585
586 Ok(blocks)
587 }
588
589 pub async fn coin(&self, id: &str) -> io::Result<Option<Coin>> {
590 let query = schema::coin::CoinByIdQuery::build(CoinByIdArgs {
591 utxo_id: id.parse()?,
592 });
593 let coin = self.query(query).await?.coin;
594 Ok(coin)
595 }
596
597 pub async fn coins(
599 &self,
600 owner: &str,
601 asset_id: Option<&str>,
602 request: PaginationRequest<String>,
603 ) -> io::Result<PaginatedResult<schema::coin::Coin, String>> {
604 let owner: schema::Address = owner.parse()?;
605 let asset_id: schema::AssetId = match asset_id {
606 Some(asset_id) => asset_id.parse()?,
607 None => schema::AssetId::default(),
608 };
609 let query = schema::coin::CoinsQuery::build((owner, asset_id, request).into());
610
611 let coins = self.query(query).await?.coins.into();
612 Ok(coins)
613 }
614
615 pub async fn resources_to_spend(
617 &self,
618 owner: &str,
619 spend_query: Vec<(&str, u64, Option<u64>)>,
620 excluded_ids: Option<(Vec<&str>, Vec<&str>)>,
622 ) -> io::Result<Vec<Vec<schema::resource::Resource>>> {
623 let owner: schema::Address = owner.parse()?;
624 let spend_query: Vec<SpendQueryElementInput> = spend_query
625 .iter()
626 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
627 Ok(SpendQueryElementInput {
628 asset_id: asset_id.parse()?,
629 amount: (*amount).into(),
630 max: (*max).map(|max| max.into()),
631 })
632 })
633 .try_collect()?;
634 let excluded_ids: Option<ExcludeInput> =
635 excluded_ids.map(ExcludeInput::from_tuple).transpose()?;
636 let query = schema::resource::ResourcesToSpendQuery::build(
637 (owner, spend_query, excluded_ids).into(),
638 );
639
640 let resources_per_asset = self.query(query).await?.resources_to_spend;
641 Ok(resources_per_asset)
642 }
643
644 pub async fn contract(&self, id: &str) -> io::Result<Option<Contract>> {
645 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
646 id: id.parse()?,
647 });
648 let contract = self.query(query).await?.contract;
649 Ok(contract)
650 }
651
652 pub async fn contract_balance(
653 &self,
654 id: &str,
655 asset: Option<&str>,
656 ) -> io::Result<u64> {
657 let asset_id: schema::AssetId = match asset {
658 Some(asset) => asset.parse()?,
659 None => schema::AssetId::default(),
660 };
661
662 let query =
663 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
664 id: id.parse()?,
665 asset: asset_id,
666 });
667
668 let balance = self.query(query).await.unwrap().contract_balance.amount;
669 Ok(balance.into())
670 }
671
672 pub async fn balance(&self, owner: &str, asset_id: Option<&str>) -> io::Result<u64> {
673 let owner: schema::Address = owner.parse()?;
674 let asset_id: schema::AssetId = match asset_id {
675 Some(asset_id) => asset_id.parse()?,
676 None => schema::AssetId::default(),
677 };
678 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
679 let balance = self.query(query).await?.balance;
680 Ok(balance.amount.into())
681 }
682
683 pub async fn balances(
685 &self,
686 owner: &str,
687 request: PaginationRequest<String>,
688 ) -> io::Result<PaginatedResult<schema::balance::Balance, String>> {
689 let owner: schema::Address = owner.parse()?;
690 let query = schema::balance::BalancesQuery::build((owner, request).into());
691
692 let balances = self.query(query).await?.balances.into();
693 Ok(balances)
694 }
695
696 pub async fn contract_balances(
697 &self,
698 contract: &str,
699 request: PaginationRequest<String>,
700 ) -> io::Result<PaginatedResult<schema::contract::ContractBalance, String>> {
701 let contract_id: schema::ContractId = contract.parse()?;
702 let query =
703 schema::contract::ContractBalancesQuery::build((contract_id, request).into());
704
705 let balances = self.query(query).await?.contract_balances.into();
706
707 Ok(balances)
708 }
709
710 pub async fn messages(
711 &self,
712 owner: Option<&str>,
713 request: PaginationRequest<String>,
714 ) -> io::Result<PaginatedResult<schema::message::Message, String>> {
715 let owner: Option<schema::Address> =
716 owner.map(|owner| owner.parse()).transpose()?;
717 let query = schema::message::OwnedMessageQuery::build((owner, request).into());
718
719 let messages = self.query(query).await?.messages.into();
720
721 Ok(messages)
722 }
723
724 pub async fn message_proof(
726 &self,
727 transaction_id: &str,
728 message_id: &str,
729 ) -> io::Result<Option<schema::message::MessageProof>> {
730 let transaction_id: schema::TransactionId = transaction_id.parse()?;
731 let message_id: schema::MessageId = message_id.parse()?;
732 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
733 transaction_id,
734 message_id,
735 });
736
737 let proof = self.query(query).await?.message_proof;
738
739 Ok(proof)
740 }
741}
742
743#[cfg(any(test, feature = "test-helpers"))]
744impl FuelClient {
745 pub async fn transparent_transaction(
746 &self,
747 id: &str,
748 ) -> io::Result<Option<::fuel_vm::fuel_tx::Transaction>> {
749 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: id.parse()? });
750
751 let transaction = self.query(query).await?.transaction;
752
753 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
754 }
755}