use crate::{
middleware::AuthenticationMiddleware,
uses::{
get_nonce, graphql_playground, health_check, indexer_status, query_graph,
register_indexer_assets, remove_indexer, sql_query, verify_signature,
},
};
#[cfg(feature = "metrics")]
use crate::middleware::MetricsMiddleware;
use async_std::sync::{Arc, RwLock};
use axum::{
error_handling::HandleErrorLayer,
extract::{Extension, Json},
http::StatusCode,
response::{IntoResponse, Response},
routing::{delete, get, post},
Router,
};
use fuel_indexer_database::{IndexerConnectionPool, IndexerDatabaseError};
use fuel_indexer_graphql::graphql::GraphqlError;
use fuel_indexer_lib::{config::IndexerConfig, defaults, utils::ServiceRequest};
use fuel_indexer_schema::db::{manager::SchemaManager, IndexerSchemaDbError};
use hyper::Method;
use serde_json::json;
use std::{
net::SocketAddr,
str::FromStr,
time::{Duration, Instant},
};
use thiserror::Error;
use tokio::sync::mpsc::{error::SendError, Sender};
use tower::ServiceBuilder;
use tower::{buffer::BufferLayer, limit::RateLimitLayer};
use tower_http::{
cors::{Any, CorsLayer},
limit::RequestBodyLimitLayer,
trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use tracing::{error, Level};
pub type ApiResult<T> = core::result::Result<T, ApiError>;
const BUFFER_SIZE: usize = 1024;
#[derive(Debug, Error)]
pub enum HttpError {
#[error("Bad request.")]
BadRequest,
#[error("Conflict. {0:#?}")]
Conflict(String),
#[error("Unauthorized request.")]
Unauthorized,
#[error("Not not found. {0:#?}")]
NotFound(String),
#[error("Error.")]
InternalServer,
#[error("HTTP error: {0:?}")]
Http(http::Error),
}
impl From<http::Error> for HttpError {
fn from(err: http::Error) -> Self {
HttpError::Http(err)
}
}
#[derive(Debug, Error)]
pub enum ApiError {
#[error("Query builder error {0:?}")]
Graphql(#[from] GraphqlError),
#[error("Serialization error {0:?}")]
Serde(#[from] serde_json::Error),
#[error("Database error {0:?}")]
Database(#[from] IndexerDatabaseError),
#[error("Sqlx error {0:?}")]
Sqlx(#[from] sqlx::Error),
#[error("Http error {0:?}")]
Http(#[from] HttpError),
#[error("Schema error {0:?}")]
SchemaError(#[from] IndexerSchemaDbError),
#[error("Channel send error: {0:?}")]
ChannelSend(#[from] SendError<ServiceRequest>),
#[error("Axum error: {0:?}")]
Axum(#[from] axum::Error),
#[error("Hyper error: {0:?}")]
HyperError(#[from] hyper::Error),
#[error("FuelCrypto error: {0:?}")]
FuelCrypto(#[from] fuel_crypto::Error),
#[error("JsonWebToken: {0:?}")]
JsonWebToken(#[from] jsonwebtoken::errors::Error),
#[error("HexError: {0:?}")]
HexError(#[from] hex::FromHexError),
#[error("BoxError: {0:?}")]
BoxError(#[from] axum::BoxError),
#[error("Sql validator error: {0:?}")]
SqlValidator(#[from] crate::sql::SqlValidatorError),
#[error("ParseError: {0:?}")]
ParseError(#[from] strum::ParseError),
#[error("The forc-index version {toolchain_version} does not match the fuel-indexer version {fuel_indexer_version}.")]
ToolchainVersionMismatch {
toolchain_version: String,
fuel_indexer_version: String,
},
#[error("Other error: {0}")]
OtherError(String),
}
impl Default for ApiError {
fn default() -> Self {
ApiError::Http(HttpError::InternalServer)
}
}
impl From<http::Error> for ApiError {
fn from(err: http::Error) -> Self {
ApiError::Http(HttpError::from(err))
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let generic_details = "Internal server error.".to_string();
error!("ApiError: {self:?}");
let (status, details) = match self {
Self::JsonWebToken(e) => (
StatusCode::BAD_REQUEST,
format!("Could not process JWT: {e}"),
),
Self::Http(HttpError::Conflict(e)) => {
(StatusCode::CONFLICT, format!("Conflict: {e}"))
}
Self::Http(HttpError::Unauthorized) => {
(StatusCode::UNAUTHORIZED, "Unauthorized.".to_string())
}
Self::Http(HttpError::NotFound(e)) => {
(StatusCode::NOT_FOUND, format!("Not found: {e}."))
}
Self::Sqlx(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Database error: {e}."),
),
Self::Database(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Database error: {e}."),
),
Self::FuelCrypto(e) => {
(StatusCode::BAD_REQUEST, format!("Crypto error: {e}."))
}
Self::Graphql(e) => (StatusCode::BAD_REQUEST, format!("GraphQL error: {e}.")),
Self::SchemaError(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Schema error: {e}."),
),
ApiError::BoxError(e) => {
error!("Generic BoxError: {e:?}");
(StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {e}"))
}
ApiError::SqlValidator(e) => {
error!("SqlValidatorError: {e:?}");
(StatusCode::BAD_REQUEST, format!("Error: {e}"))
}
ApiError::ParseError(e) => {
error!("ParseError: {e:?}");
(StatusCode::BAD_REQUEST, format!("Invalid asset type: {e}"))
}
ApiError::ToolchainVersionMismatch{fuel_indexer_version, toolchain_version} => {
(StatusCode::METHOD_NOT_ALLOWED, format!("WASM module toolchain version `{toolchain_version}` does not match fuel-indexer version `{fuel_indexer_version}`"))
}
_ => (StatusCode::INTERNAL_SERVER_ERROR, generic_details),
};
error!("{status:?} - {details}");
(
status,
Json(json!({
"success": "false",
"details": details,
})),
)
.into_response()
}
}
pub struct WebApi;
impl WebApi {
pub async fn build(
config: IndexerConfig,
pool: IndexerConnectionPool,
tx: Sender<ServiceRequest>,
) -> ApiResult<Router> {
let sm = SchemaManager::new(pool.clone());
let schema_manager = Arc::new(RwLock::new(sm));
let max_body_size = config.web_api.max_body_size;
let start_time = Arc::new(Instant::now());
let log_level =
Level::from_str(config.log_level.as_ref()).expect("Invalid log level.");
let mut graph_routes = Router::new()
.route("/:namespace/:identifier", post(query_graph))
.layer(Extension(schema_manager.clone()))
.layer(Extension(pool.clone()))
.layer(RequestBodyLimitLayer::new(max_body_size));
let mut sql_routes = Router::new();
if config.accept_sql_queries {
sql_routes = Router::new()
.route("/:namespace/:identifier", post(sql_query))
.layer(AuthenticationMiddleware::from(&config))
.layer(Extension(pool.clone()))
.layer(Extension(config.clone()))
.layer(RequestBodyLimitLayer::new(max_body_size));
}
if config.rate_limit.enabled {
graph_routes = graph_routes.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|e: axum::BoxError| async move {
ApiError::from(e).into_response()
}))
.layer(BufferLayer::new(BUFFER_SIZE))
.layer(RateLimitLayer::new(
config
.rate_limit
.request_count
.unwrap_or(defaults::RATE_LIMIT_REQUEST_COUNT),
Duration::from_secs(
config
.rate_limit
.window_size
.unwrap_or(defaults::RATE_LIMIT_WINDOW_SIZE),
),
)),
);
}
#[cfg(feature = "metrics")]
let graph_routes = graph_routes.layer(MetricsMiddleware::default());
let indexer_routes = Router::new()
.route("/:namespace/:identifier", post(register_indexer_assets))
.layer(AuthenticationMiddleware::from(&config))
.layer(Extension(tx.clone()))
.layer(Extension(schema_manager.clone()))
.layer(Extension(pool.clone()))
.layer(Extension(config.clone()))
.route("/:namespace/:identifier", delete(remove_indexer))
.layer(AuthenticationMiddleware::from(&config))
.layer(Extension(tx))
.layer(Extension(pool.clone()))
.layer(Extension(config.clone()))
.layer(RequestBodyLimitLayer::new(max_body_size));
#[cfg(feature = "metrics")]
let indexer_routes = indexer_routes.layer(MetricsMiddleware::default());
let root_routes = Router::new()
.route("/status", get(indexer_status))
.layer(Extension(pool.clone()))
.layer(AuthenticationMiddleware::from(&config))
.layer(Extension(config.clone()))
.route("/health", get(health_check))
.layer(Extension(config.clone()))
.layer(Extension(pool.clone()))
.layer(Extension(start_time));
#[cfg(feature = "metrics")]
let root_routes = root_routes
.route("/metrics", get(crate::uses::get_metrics))
.layer(MetricsMiddleware::default());
let auth_routes = Router::new()
.route("/nonce", get(get_nonce))
.layer(Extension(pool.clone()))
.route("/signature", post(verify_signature))
.layer(Extension(pool.clone()))
.layer(Extension(config));
#[cfg(feature = "metrics")]
let auth_routes = auth_routes.layer(MetricsMiddleware::default());
let playground_route = Router::new()
.route("/:namespace/:identifier", get(graphql_playground))
.layer(Extension(schema_manager))
.layer(Extension(pool))
.layer(RequestBodyLimitLayer::new(max_body_size));
#[cfg(feature = "metrics")]
let playground_route = playground_route.layer(MetricsMiddleware::default());
let api_routes = Router::new()
.nest("/", root_routes)
.nest("/playground", playground_route)
.nest("/index", indexer_routes)
.nest("/graph", graph_routes)
.nest("/sql", sql_routes)
.nest("/auth", auth_routes);
let app = Router::new()
.nest("/api", api_routes)
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().include_headers(true))
.on_request(DefaultOnRequest::new().level(log_level))
.on_response(
DefaultOnResponse::new()
.level(log_level)
.latency_unit(LatencyUnit::Micros),
),
)
.layer(
CorsLayer::new()
.allow_methods(vec![
Method::GET,
Method::POST,
Method::OPTIONS,
Method::DELETE,
])
.allow_origin(Any {})
.allow_headers(Any {}),
);
Ok(app)
}
pub async fn run(config: IndexerConfig, app: Router) -> ApiResult<()> {
let listen_on: SocketAddr = config.web_api.into();
axum::Server::bind(&listen_on)
.serve(app.into_make_service())
.await?;
Ok(())
}
pub async fn build_and_run(
config: IndexerConfig,
pool: IndexerConnectionPool,
tx: Sender<ServiceRequest>,
) -> ApiResult<()> {
let listen_on: SocketAddr = config.web_api.clone().into();
let app = WebApi::build(config, pool, tx).await?;
axum::Server::bind(&listen_on)
.serve(app.into_make_service())
.await?;
Ok(())
}
}