use crate::{
fuel_core_graphql_api::{
api_service::{
BlockProducer,
ConsensusProvider,
TxPool,
},
database::ReadView,
ports::OffChainDatabase,
IntoApiResult,
},
query::{
transaction_status_change,
BlockQueryData,
SimpleTransactionData,
TransactionQueryData,
},
schema::{
scalars::{
Address,
HexString,
SortedTxCursor,
TransactionId,
TxPointer,
},
tx::types::TransactionStatus,
},
service::adapters::SharedMemoryPool,
};
use async_graphql::{
connection::{
Connection,
EmptyFields,
},
Context,
Object,
Subscription,
};
use fuel_core_storage::{
iter::IterDirection,
Error as StorageError,
Result as StorageResult,
};
use fuel_core_txpool::{
ports::MemoryPool,
service::TxStatusMessage,
};
use fuel_core_types::{
fuel_tx::{
Cacheable,
Transaction as FuelTx,
UniqueIdentifier,
},
fuel_types,
fuel_types::canonical::Deserialize,
fuel_vm::checked_transaction::{
CheckPredicateParams,
EstimatePredicates,
},
services::txpool,
};
use futures::{
Stream,
TryStreamExt,
};
use itertools::Itertools;
use std::{
iter,
sync::Arc,
};
use tokio_stream::StreamExt;
use types::{
DryRunTransactionExecutionStatus,
Transaction,
};
use super::scalars::U64;
pub mod input;
pub mod output;
pub mod receipt;
pub mod types;
pub mod upgrade_purpose;
/// Marker type that carries the transaction-related GraphQL query resolvers
/// (implemented in the `#[Object]` impl for this type).
#[derive(Default)]
pub struct TxQuery;
// Resolvers are documented with plain `//` comments rather than `///`:
// async-graphql exposes doc comments as schema descriptions, so `///` here
// would alter the published GraphQL schema.
#[Object]
impl TxQuery {
    // Fetches a single transaction by id.
    //
    // The transaction pool is consulted first; only when the pool has no
    // entry for `id` is the read view queried. The read-view result is passed
    // through `into_api_result` to produce the optional API value
    // (presumably mapping "not found" to `None` -- confirm in `IntoApiResult`).
    async fn transaction(
        &self,
        ctx: &Context<'_>,
        #[graphql(desc = "The ID of the transaction")] id: TransactionId,
    ) -> async_graphql::Result<Option<Transaction>> {
        let query: &ReadView = ctx.data_unchecked();
        let id = id.0;
        let txpool = ctx.data_unchecked::<TxPool>();

        if let Some(transaction) = txpool.transaction(id) {
            Ok(Some(Transaction(transaction, id)))
        } else {
            query
                .transaction(&id)
                .map(|tx| Transaction::from_tx(id, tx))
                .into_api_result()
        }
    }

    // Paginates over all transactions, using `SortedTxCursor`
    // (block height + transaction id) as the cursor.
    //
    // `first`/`after`/`last`/`before` are forwarded unchanged to
    // `query_pagination`, which calls the closure with the decoded start
    // cursor and the iteration direction.
    async fn transactions(
        &self,
        ctx: &Context<'_>,
        first: Option<i32>,
        after: Option<String>,
        last: Option<i32>,
        before: Option<String>,
    ) -> async_graphql::Result<
        Connection<SortedTxCursor, Transaction, EmptyFields, EmptyFields>,
    > {
        let query: &ReadView = ctx.data_unchecked();
        crate::schema::query_pagination(
            after,
            before,
            first,
            last,
            |start: &Option<SortedTxCursor>, direction| {
                let start = *start;
                // Block iteration begins at the block named by the cursor, if any.
                let block_id = start.map(|sorted| sorted.block_height);
                let all_block_ids = query.compressed_blocks(block_id, direction);
                let all_txs = all_block_ids
                    .map(move |block| {
                        block.map(|fuel_block| {
                            let (header, mut txs) = fuel_block.into_inner();
                            // Blocks arrive in `direction` order, but the ids
                            // inside a block need to be flipped explicitly
                            // when iterating in reverse.
                            if direction == IterDirection::Reverse {
                                txs.reverse();
                            }

                            // Pair every transaction id with its block height.
                            txs.into_iter().zip(iter::repeat(*header.height()))
                        })
                    })
                    .flatten_ok()
                    .map(|result| {
                        result.map(|(tx_id, block_height)| {
                            SortedTxCursor::new(block_height, tx_id.into())
                        })
                    })
                    // With a start cursor, drop entries until the cursor
                    // itself is reached (that entry is kept). An `Err` item
                    // ends the skipping so the error is yielded downstream.
                    .skip_while(move |result| {
                        if let Ok(sorted) = result {
                            if let Some(start) = start {
                                return sorted != &start
                            }
                        }
                        false
                    });

                // Resolve every cursor into the full transaction.
                let all_txs = all_txs.map(|result: StorageResult<SortedTxCursor>| {
                    result.and_then(|sorted| {
                        let tx = query.transaction(&sorted.tx_id.0)?;
                        Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx)))
                    })
                });

                Ok(all_txs)
            },
        )
        .await
    }

    // Paginates over the transactions returned by
    // `ReadView::owned_transactions` for `owner`, using `TxPointer` cursors.
    //
    // Each transaction id is computed with the chain id taken from the latest
    // consensus parameters.
    async fn transactions_by_owner(
        &self,
        ctx: &Context<'_>,
        owner: Address,
        first: Option<i32>,
        after: Option<String>,
        last: Option<i32>,
        before: Option<String>,
    ) -> async_graphql::Result<Connection<TxPointer, Transaction, EmptyFields, EmptyFields>>
    {
        let query: &ReadView = ctx.data_unchecked();
        let params = ctx
            .data_unchecked::<ConsensusProvider>()
            .latest_consensus_params();
        let owner = fuel_types::Address::from(owner);

        crate::schema::query_pagination(
            after,
            before,
            first,
            last,
            |start: &Option<TxPointer>, direction| {
                let start = (*start).map(Into::into);
                let txs =
                    query
                        .owned_transactions(owner, start, direction)
                        .map(|result| {
                            result.map(|(cursor, tx)| {
                                let tx_id = tx.id(&params.chain_id());
                                (cursor.into(), Transaction::from_tx(tx_id, tx))
                            })
                        });
                Ok(txs)
            },
        )
        .await
    }

    // Decodes `tx` from its byte representation, runs predicate estimation on
    // it and returns the resulting transaction.
    //
    // Fails if the bytes cannot be decoded or if estimation returns an error
    // (the estimation error is reported through its `Debug` representation).
    async fn estimate_predicates(
        &self,
        ctx: &Context<'_>,
        tx: HexString,
    ) -> async_graphql::Result<Transaction> {
        let mut tx = FuelTx::from_bytes(&tx.0)?;

        let params = ctx
            .data_unchecked::<ConsensusProvider>()
            .latest_consensus_params();

        let memory_pool = ctx.data_unchecked::<SharedMemoryPool>();
        let memory = memory_pool.get_memory().await;

        let parameters = CheckPredicateParams::from(params.as_ref());
        // Estimation is handed to `tokio_rayon::spawn_fifo` instead of being
        // run inline in this async fn; the closure hands the (mutated)
        // transaction back on success.
        let tx = tokio_rayon::spawn_fifo(move || {
            let result = tx.estimate_predicates(&parameters, memory);
            result.map(|_| tx)
        })
        .await
        .map_err(|err| anyhow::anyhow!("{:?}", err))?;

        Ok(Transaction::from_tx(tx.id(&params.chain_id()), tx))
    }

    // Only compiled with the `test-helpers` feature: returns everything
    // produced by `receipt::all_receipts()`, converted to the schema type.
    #[cfg(feature = "test-helpers")]
    async fn all_receipts(&self) -> Vec<receipt::Receipt> {
        receipt::all_receipts()
            .into_iter()
            .map(Into::into)
            .collect()
    }
}
/// Marker type that carries the transaction-related GraphQL mutation
/// resolvers (implemented in the `#[Object]` impl for this type).
#[derive(Default)]
pub struct TxMutation;
// Resolvers are documented with plain `//` comments rather than `///`:
// async-graphql exposes doc comments as schema descriptions, so `///` here
// would alter the published GraphQL schema.
#[Object]
impl TxMutation {
    // Decodes every entry of `txs`, precomputes each transaction against the
    // current chain id and passes the batch to the block producer's
    // `dry_run_txs`.
    //
    // `utxo_validation` and `gas_price` are forwarded as given (the gas price
    // is converted from the `U64` scalar). Returns one execution status per
    // status reported by the block producer. Fails on the first transaction
    // that cannot be decoded or precomputed, or if the dry run itself fails.
    async fn dry_run(
        &self,
        ctx: &Context<'_>,
        txs: Vec<HexString>,
        utxo_validation: Option<bool>,
        gas_price: Option<U64>,
    ) -> async_graphql::Result<Vec<DryRunTransactionExecutionStatus>> {
        let block_producer = ctx.data_unchecked::<BlockProducer>();
        let params = ctx
            .data_unchecked::<ConsensusProvider>()
            .latest_consensus_params();

        let mut transactions = txs
            .iter()
            .map(|tx| FuelTx::from_bytes(&tx.0))
            .collect::<Result<Vec<FuelTx>, _>>()?;

        // The chain id is the same for every transaction; read it once.
        let chain_id = params.chain_id();
        for transaction in &mut transactions {
            transaction.precompute(&chain_id)?;
        }

        let tx_statuses = block_producer
            .dry_run_txs(
                transactions,
                None,
                utxo_validation,
                gas_price.map(Into::into),
            )
            .await?;

        let tx_statuses = tx_statuses
            .into_iter()
            .map(DryRunTransactionExecutionStatus)
            .collect();

        Ok(tx_statuses)
    }

    // Decodes `tx`, inserts it into the transaction pool and returns it
    // together with its id.
    //
    // Fails if the bytes cannot be decoded or if the pool reports an error
    // for the insertion.
    async fn submit(
        &self,
        ctx: &Context<'_>,
        tx: HexString,
    ) -> async_graphql::Result<Transaction> {
        let txpool = ctx.data_unchecked::<TxPool>();
        let params = ctx
            .data_unchecked::<ConsensusProvider>()
            .latest_consensus_params();
        let tx = FuelTx::from_bytes(&tx.0)?;

        // `insert` yields one result per submitted transaction; collecting
        // them with `?` surfaces the first failure. The successful values are
        // not needed here.
        let _: Vec<_> = txpool
            .insert(vec![Arc::new(tx.clone())])
            .await
            .into_iter()
            .try_collect()?;

        let id = tx.id(&params.chain_id());
        let tx = Transaction(tx, id);
        Ok(tx)
    }
}
/// Marker type that carries the transaction-status GraphQL subscription
/// resolvers (implemented in the `#[Subscription]` impl for this type).
#[derive(Default)]
pub struct TxStatusSubscription;
#[Subscription]
impl TxStatusSubscription {
async fn status_change<'a>(
&self,
ctx: &Context<'a>,
#[graphql(desc = "The ID of the transaction")] id: TransactionId,
) -> anyhow::Result<impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a>
{
let txpool = ctx.data_unchecked::<TxPool>();
let query: &ReadView = ctx.data_unchecked();
let rx = txpool.tx_update_subscribe(id.into())?;
Ok(transaction_status_change(
move |id| match query.tx_status(&id) {
Ok(status) => Ok(Some(status)),
Err(StorageError::NotFound(_, _)) => Ok(txpool
.submission_time(id)
.map(|time| txpool::TransactionStatus::Submitted { time })),
Err(err) => Err(err),
},
rx,
id.into(),
)
.map_err(async_graphql::Error::from))
}
async fn submit_and_await<'a>(
&self,
ctx: &Context<'a>,
tx: HexString,
) -> async_graphql::Result<
impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a,
> {
let txpool = ctx.data_unchecked::<TxPool>();
let params = ctx
.data_unchecked::<ConsensusProvider>()
.latest_consensus_params();
let tx = FuelTx::from_bytes(&tx.0)?;
let tx_id = tx.id(¶ms.chain_id());
let subscription = txpool.tx_update_subscribe(tx_id)?;
let _: Vec<_> = txpool
.insert(vec![Arc::new(tx)])
.await
.into_iter()
.try_collect()?;
Ok(subscription
.skip_while(|event| {
matches!(
event,
TxStatusMessage::Status(txpool::TransactionStatus::Submitted { .. })
)
})
.map(move |event| match event {
TxStatusMessage::Status(status) => {
let status = TransactionStatus::new(tx_id, status);
Ok(status)
}
TxStatusMessage::FailedStatus => {
Err(anyhow::anyhow!("Failed to get transaction status").into())
}
})
.take(1))
}
}