//! Database connection and connection-pool abstractions for the indexer.
//!
//! Only a Postgres backend is implemented in this file.
#![deny(unused_crate_dependencies)]

pub mod queries;
/// Re-exports every public item of the `fuel_indexer_database_types` crate so
/// callers can reach them through this crate as `types::*`.
pub mod types {
    pub use fuel_indexer_database_types::*;
}

pub use fuel_indexer_database_types::DbType;
use fuel_indexer_lib::utils::{attempt_database_connection, ServiceStatus};
use fuel_indexer_postgres as postgres;
use sqlx::{
    pool::PoolConnection, postgres::PgConnectOptions, ConnectOptions, Error as SqlxError,
};
use std::{cmp::Ordering, collections::HashMap, str::FromStr};
use thiserror::Error;

/// Errors produced by the indexer database layer.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute (via `thiserror`).
#[derive(Debug, Error)]
pub enum IndexerDatabaseError {
    /// The supplied connection string could not be parsed as a URL.
    /// Carries the offending connection string.
    #[error("Invalid connection string: {0:?}")]
    InvalidConnectionString(String),
    /// The URL scheme names a backend that is not supported.
    /// Carries the unsupported scheme.
    #[error("Database backend not supported: {0:?}")]
    BackendNotSupported(String),
    /// An operation required an open transaction but none was open.
    #[error("No transaction is open.")]
    NoTransactionError,
    /// An error bubbled up from `sqlx`; `#[from]` lets `?` convert it
    /// automatically.
    #[error("Error from sqlx: {0:#?}")]
    SqlxError(#[from] SqlxError),
    /// Catch-all for failures with no more specific variant.
    #[error("Unknown error")]
    Unknown,
    /// The caller is not the owner of the indexer being acted upon.
    #[error("You don't own this indexer.")]
    NotYourIndexer,
    /// No table mapping is registered for the given type id.
    #[error("No table mapping exists for TypeId({0:?})")]
    TableMappingDoesNotExist(i64),
}

/// A single database connection checked out of an [`IndexerConnectionPool`].
///
/// One variant per supported backend; currently only Postgres.
#[derive(Debug)]
pub enum IndexerConnection {
    /// A pooled Postgres connection, stored behind a `Box`.
    Postgres(Box<PoolConnection<sqlx::Postgres>>),
}

impl IndexerConnection {
    /// Reports which database backend this connection belongs to.
    pub fn database_type(&self) -> DbType {
        // Deliberately exhaustive (no wildcard arm) so that adding a backend
        // variant forces this mapping to be updated.
        match *self {
            Self::Postgres(_) => DbType::Postgres,
        }
    }
}

/// A pool of database connections for the indexer.
///
/// One variant per supported backend; currently only Postgres.
#[derive(Clone, Debug)]
pub enum IndexerConnectionPool {
    /// A `sqlx` connection pool for Postgres.
    Postgres(sqlx::Pool<sqlx::Postgres>),
}

impl IndexerConnectionPool {
    pub fn database_type(&self) -> DbType {
        match self {
            IndexerConnectionPool::Postgres(_) => DbType::Postgres,
        }
    }

    pub async fn connect(
        database_url: &str,
        max_db_connections: u32,
    ) -> Result<IndexerConnectionPool, IndexerDatabaseError> {
        let url = url::Url::parse(database_url);
        if url.is_err() {
            return Err(IndexerDatabaseError::InvalidConnectionString(
                database_url.into(),
            ));
        }
        let mut url = url.expect("Database URL should be correctly formed");
        let query: HashMap<_, _> = url.query_pairs().into_owned().collect();

        let query = query
            .iter()
            .map(|(k, v)| format!("{k}={v}"))
            .collect::<Vec<String>>()
            .join("&");

        url.set_query(Some(&query));

        match url.scheme() {
            "postgres" => {
                let mut opts = PgConnectOptions::from_str(url.as_str())?;
                opts.disable_statement_logging();

                let pool = attempt_database_connection(|| {
                    sqlx::postgres::PgPoolOptions::new()
                        .max_connections(max_db_connections)
                        .connect_with(opts.clone())
                })
                .await;

                let result = IndexerConnectionPool::Postgres(pool);
                let backend_max_connections = result.max_connections().await?;
                if backend_max_connections < max_db_connections {
                    tracing::warn!("Indexer --max-db-connections `{max_db_connections}` exceeds `{backend_max_connections}` value set by db backend")
                };
                Ok(result)
            }
            err => Err(IndexerDatabaseError::BackendNotSupported(err.into())),
        }
    }

    pub async fn is_connected(&self) -> sqlx::Result<ServiceStatus> {
        match self {
            IndexerConnectionPool::Postgres(p) => {
                let mut conn = p.acquire().await?;
                let result =
                    postgres::execute_query(&mut conn, "SELECT true;".to_string())
                        .await?;

                match result.cmp(&1) {
                    Ordering::Equal => Ok(ServiceStatus::OK),
                    _ => Ok(ServiceStatus::NotOk),
                }
            }
        }
    }

    pub async fn acquire(&self) -> sqlx::Result<IndexerConnection> {
        match self {
            IndexerConnectionPool::Postgres(p) => {
                Ok(IndexerConnection::Postgres(Box::new(p.acquire().await?)))
            }
        }
    }

    pub async fn max_connections(&self) -> sqlx::Result<u32> {
        match self {
            IndexerConnectionPool::Postgres(pool) => {
                let max_connections: i32 = sqlx::query_scalar(
                    "SELECT setting::int FROM pg_settings WHERE name = 'max_connections'",
                )
                .fetch_one(pool)
                .await?;
                Ok(max_connections as u32)
            }
        }
    }
}