//! Abstractions for indexer task execution.
use crate::{
    database::Database, ffi, queries::ClientExt, IndexerConfig, IndexerError,
    IndexerResult,
};
use anyhow::Context;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use fuel_core_client::client::{
    pagination::{PageDirection, PaginatedResult, PaginationRequest},
    schema::block::{Consensus as ClientConsensus, Genesis as ClientGenesis},
    types::TransactionStatus as ClientTransactionStatus,
    FuelClient,
};
use fuel_indexer_database::{queries, types::IndexerStatus, IndexerConnectionPool};
use fuel_indexer_lib::{
    defaults::*, manifest::Manifest, utils::serialize, WasmIndexerError,
};
#[cfg(feature = "metrics")]
use fuel_indexer_metrics::METRICS;
use fuel_indexer_types::{
    fuel::{field::*, *},
    scalar::{Bytes, Bytes32},
};
use fuel_tx::UniqueIdentifier;
use fuel_vm::prelude::Deserializable;
use fuel_vm::state::ProgramState as ClientProgramState;
use futures::Future;
use itertools::Itertools;
use std::{
    marker::{Send, Sync},
    path::Path,
    str::FromStr,
    sync::atomic::{AtomicBool, Ordering},
};
use tokio::{
    task::spawn_blocking,
    time::{sleep, Duration},
};
use tracing::{debug, error, info, warn};
use wasmer::{
    imports, CompilerConfig, Cranelift, FunctionEnv, Instance, Memory, Module, Store,
    TypedFunction,
};
use wasmer_middlewares::metering::MeteringPoints;

#[cfg(feature = "metrics")]
use tokio::time::Instant;

/// The source from which an indexer executor is created.
#[derive(Debug, Clone)]
pub enum ExecutorSource {
    /// The executor was created from a manifest file.
    Manifest,

    /// The executor was created from indexer bytes stored in the database.
    Registry(Vec<u8>),
}

impl AsRef<[u8]> for ExecutorSource {
    fn as_ref(&self) -> &[u8] {
        match self {
            ExecutorSource::Manifest => &[],
            ExecutorSource::Registry(b) => b,
        }
    }
}

impl From<ExecutorSource> for Vec<u8> {
    fn from(source: ExecutorSource) -> Self {
        match source {
            ExecutorSource::Manifest => vec![],
            ExecutorSource::Registry(bytes) => bytes,
        }
    }
}

/// Run the executor task until the kill switch is flipped, or until some other
/// stop criterion (such as the configured end block) is met.
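///
/// # Example
///
/// A minimal sketch of driving the returned future, assuming a loaded `IndexerConfig`,
/// an `IndexerConnectionPool`, and a constructed executor (`config`, `pool`, and `exec`
/// are hypothetical bindings):
///
/// ```ignore
/// // Build the long-running indexing task and hand it to the runtime. The task
/// // resolves when the kill switch flips or another stop criterion is met.
/// let task = run_executor(&config, pool.clone(), exec)?;
/// let handle = tokio::spawn(task);
/// ```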
//
// In general the logic in this function isn't very idiomatic, but that's because
// types in `fuel_core_client` don't compile to WASM.
pub fn run_executor<T: 'static + Executor + Send + Sync>(
    config: &IndexerConfig,
    pool: IndexerConnectionPool,
    mut executor: T,
) -> anyhow::Result<impl Future<Output = IndexerResult<()>>> {
    // TODO: https://github.com/FuelLabs/fuel-indexer/issues/286

    let end_block = executor.manifest().end_block();
    let stop_idle_indexers = config.stop_idle_indexers;
    let indexer_uid = executor.manifest().uid();
    let block_page_size = config.block_page_size;

    let fuel_node_addr = executor
        .manifest()
        .fuel_client()
        .map(|x| x.to_string())
        .unwrap_or(config.fuel_node.to_string());

    // Where should we initially start when fetching blocks from the client?
    let mut cursor = executor.manifest().start_block().map(|x| {
        if x > 1 {
            let decremented = x - 1;
            decremented.to_string()
        } else {
            "0".to_string()
        }
    });

    info!("Indexer({indexer_uid}) subscribing to Fuel node at {fuel_node_addr}");

    let client = FuelClient::from_str(&fuel_node_addr)
        .with_context(|| "Client node connection failed".to_string())?;

    if let Some(end_block) = end_block {
        info!("Indexer({indexer_uid}) will stop at block #{end_block}.");
    } else {
        warn!("No end_block specified in the manifest. Indexer({indexer_uid}) will run forever.");
    }

    let allow_non_sequential_blocks = config.allow_non_sequential_blocks;
    let client_request_delay = config.client_request_delay;

    let task = async move {
        let mut conn = pool
            .acquire()
            .await
            .with_context(|| "Unable to acquire a database connection".to_string())?;

        if allow_non_sequential_blocks {
            queries::remove_ensure_block_height_consecutive_trigger(
                &mut conn,
                executor.manifest().namespace(),
                executor.manifest().identifier(),
            )
            .await
            .with_context(|| {
                "Unable to remove the sequential blocks trigger".to_string()
            })?;
        } else {
            queries::create_ensure_block_height_consecutive_trigger(
                &mut conn,
                executor.manifest().namespace(),
                executor.manifest().identifier(),
            )
            .await
            .with_context(|| {
                "Unable to create the sequential blocks trigger".to_string()
            })?;
        }

        // If we hit an issue that keeps failing, we'll retry a few times before giving up:
        // we don't want to quit on the first error, but we also don't want to waste CPU.
        //
        // Note that this count tracks _consecutive_ failed calls.
        let mut consecutive_retries = 0;

        // When testing or running on CI, we don't want indexers to run forever. In production,
        // the indexer service operator decides whether idle indexers should be stopped.
        //
        // Maybe we can eventually make this MAX_CONSECUTIVE_EMPTY_BLOCK_RESPONSES value configurable.
        //
        // Also note that this count tracks _consecutive_ empty block responses.
        let max_empty_block_reqs = if stop_idle_indexers {
            MAX_CONSECUTIVE_EMPTY_BLOCK_RESPONSES
        } else {
            usize::MAX
        };

        // Keep track of how many empty pages we've received from the client.
        let mut num_empty_block_reqs = 0;

        loop {
            // If something else has signaled that this indexer should stop, then stop.
            if executor.kill_switch().load(Ordering::SeqCst) {
                return Err(IndexerError::KillSwitch);
            }

            #[cfg(feature = "metrics")]
            let start = Instant::now();

            // Fetch the next page of blocks, along with the starting cursor for the subsequent page.
            let (block_info, next_cursor, _has_next_page) =
                match retrieve_blocks_from_node(
                    &client,
                    block_page_size,
                    &cursor,
                    end_block,
                    &indexer_uid,
                )
                .await
                {
                    Ok((block_info, next_cursor, has_next_page)) => {
                        (block_info, next_cursor, has_next_page)
                    }
                    Err(e) => {
                        if let IndexerError::EndBlockMet = e {
                            info!("Indexer({indexer_uid}) has met its end block; beginning indexer shutdown process.");
                            executor.kill_switch().store(true, Ordering::SeqCst);
                            continue;
                        } else {
                            error!(
                                "Indexer({indexer_uid}) failed to fetch blocks: {e:?}",
                            );
                            sleep(Duration::from_secs(DELAY_FOR_SERVICE_ERROR)).await;
                            continue;
                        }
                    }
                };

            #[cfg(feature = "metrics")]
            {
                METRICS
                    .exec
                    .web
                    .record(&indexer_uid, start.elapsed().as_millis() as f64);
            }

            // If our block page request from the client returns empty, we sleep for a bit, and then continue.
            if block_info.is_empty() {
                num_empty_block_reqs += 1;

                info!(
                    "Indexer({indexer_uid}) has no new blocks to process, sleeping zzZZ. (Empty response #{num_empty_block_reqs})"
                );

                if num_empty_block_reqs == max_empty_block_reqs {
                    return Err(anyhow::format_err!(
                        "No blocks being produced after {num_empty_block_reqs} empty responses. Indexer({indexer_uid}) giving up. <('.')>"
                    ).into());
                }

                // There is no work to do, so we sleep for a bit, then continue without updating our cursor.
                sleep(Duration::from_secs(IDLE_SERVICE_WAIT_SECS)).await;
                continue;
            }

            // The client responded with actual blocks, so attempt to index them.
            let result = executor.handle_events(block_info).await;

            // If the kill switch has been triggered, the executor exits early.
            if executor.kill_switch().load(Ordering::SeqCst) {
                return Err(IndexerError::KillSwitch);
            }

            if let Err(e) = result {
                if let IndexerError::RuntimeError(ref e) = e {
                    match e.downcast_ref::<WasmIndexerError>() {
                        Some(&WasmIndexerError::MissingBlocksError) => {
                            return Err(anyhow::anyhow!("{e}").into());
                        }
                        Some(&WasmIndexerError::Panic)
                        | Some(&WasmIndexerError::GeneralError) => {
                            let message = executor.get_error_message().await.unwrap_or(
                                "unable to extract the error message".to_string(),
                            );
                            return Err(anyhow::anyhow!("{message}").into());
                        }
                        _ => (),
                    }
                }
                // Run time metering is deterministic. There is no point in retrying.
                if let IndexerError::RunTimeLimitExceededError = e {
                    return Err(anyhow::format_err!(
                        "Indexer({indexer_uid}) executor run time limit exceeded. Giving up. <('.')>. Consider increasing metering points"
                    ).into());
                }

                // We don't want to retry forever as that eats resources.
                if consecutive_retries >= INDEXER_FAILED_CALLS {
                    return Err(anyhow::format_err!(
                        "Indexer({indexer_uid}) failed after too many retries, giving up. <('.')>"
                    ).into());
                }

                if let IndexerError::SqlxError(sqlx::Error::Database(inner)) = e {
                    // TODO: https://github.com/FuelLabs/fuel-indexer/issues/1093
                    if inner.constraint().is_some() {
                        // Just bump the cursor and keep going. These errors do not count towards `INDEXER_FAILED_CALLS`.
                        warn!("Constraint violation: {inner:?}. This is not a retry-able error. Continuing...");
                        cursor = next_cursor;
                        continue;
                    }
                }

                // If we get here, this must be an error that allows us to retry.
                warn!("Indexer({indexer_uid}) retrying handler after {consecutive_retries}/{INDEXER_FAILED_CALLS} failed attempts.");

                consecutive_retries += 1;

                // Since there was some type of error, we call `retrieve_blocks_from_node` again
                // with the same cursor.
                continue;
            }

            // The response was non-empty and handled successfully, so reset the empty-response counter.
            num_empty_block_reqs = 0;

            // If we make it this far, we always go to the next page.
            cursor = next_cursor;

            queries::set_indexer_status(
                &mut conn,
                executor.manifest().namespace(),
                executor.manifest().identifier(),
                IndexerStatus::running(format!(
                    "Indexed {} blocks",
                    cursor.clone().unwrap_or("0".to_string())
                )),
            )
            .await?;

            // Again, check if something else has signaled that this indexer should stop, then stop.
            if executor.kill_switch().load(Ordering::SeqCst) {
                return Err(IndexerError::KillSwitch);
            }

            // Since the call succeeded, reset the retry count.
            consecutive_retries = 0;

            if let Some(delay) = client_request_delay {
                sleep(Duration::from_secs(delay)).await;
            }
        }
    };

    Ok(task)
}

/// Retrieve blocks from a Fuel client node.
///
/// This was split out of `run_executor` so that the benchmarking suite can reuse it
/// and obtain consistent timings.
///
/// If there is an issue fetching blocks, an empty cursor and an empty list of blocks are
/// returned; `run_executor` then decides whether to sleep for a bit and try again.
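///
/// # Example
///
/// A sketch of fetching a single page of blocks, assuming a reachable Fuel node and a
/// hypothetical, already-constructed `client` binding:
///
/// ```ignore
/// // Fetch up to 10 blocks starting from the beginning of the chain.
/// let (blocks, next_cursor, has_next_page) =
///     retrieve_blocks_from_node(&client, 10, &None, None, "my_indexer").await?;
/// ```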
pub async fn retrieve_blocks_from_node(
    client: &FuelClient,
    block_page_size: usize,
    cursor: &Option<String>,
    end_block: Option<u32>,
    indexer_uid: &str,
) -> IndexerResult<(Vec<BlockData>, Option<String>, bool)> {
    // Check whether we need fewer blocks than `block_page_size`.
    let page_size = if let (Some(start), Some(end)) = (cursor, end_block) {
        if let Ok(start) = start.parse::<u32>() {
            if start >= end {
                return Err(IndexerError::EndBlockMet);
            }

            std::cmp::min((end - start) as usize, block_page_size)
        } else {
            block_page_size
        }
    } else {
        block_page_size
    };

    debug!("Fetching paginated results from {cursor:?}");

    let PaginatedResult {
        cursor,
        results,
        has_next_page,
        ..
    } = client
        .full_blocks(PaginationRequest {
            cursor: cursor.clone(),
            results: page_size,
            direction: PageDirection::Forward,
        })
        .await
        .unwrap_or_else(|e| {
            error!("Indexer({indexer_uid}) failed to retrieve blocks: {e:?}");
            // Returning an empty result set causes the indexer to sleep for a bit and try again.
            PaginatedResult {
                cursor: None,
                results: vec![],
                has_next_page: false,
                has_previous_page: false,
            }
        });

    let chain_id = client.chain_info().await?.consensus_parameters.chain_id;

    let mut block_info = Vec::new();
    for block in results.into_iter() {
        let producer: Option<Bytes32> = block.block_producer().map(|pk| pk.hash());

        let mut transactions = Vec::new();

        for trans in block.transactions {
            let receipts = trans
                .receipts
                .unwrap_or_default()
                .into_iter()
                .map(TryInto::try_into)
                .try_collect()
                .expect("Bad receipts.");

            let status = trans.status.expect("Bad transaction status.");
            // NOTE: https://github.com/FuelLabs/fuel-indexer/issues/286
            let status = match status.try_into().unwrap() {
                ClientTransactionStatus::Success {
                    block_id,
                    time,
                    program_state,
                } => {
                    let program_state = program_state.map(|p| match p {
                        ClientProgramState::Return(w) => ProgramState {
                            return_type: ReturnType::Return,
                            data: Bytes::from(w.to_le_bytes().to_vec()),
                        },
                        ClientProgramState::ReturnData(d) => ProgramState {
                            return_type: ReturnType::ReturnData,
                            data: Bytes::from(d.to_vec()),
                        },
                        ClientProgramState::Revert(w) => ProgramState {
                            return_type: ReturnType::Revert,
                            data: Bytes::from(w.to_le_bytes().to_vec()),
                        },
                        // Either `cargo watch` complains that this is unreachable, or `clippy` complains
                        // that not all patterns are matched. These other program states are only used in
                        // debug modes.
                        #[allow(unreachable_patterns)]
                        _ => unreachable!("Bad program state."),
                    });
                    TransactionStatus::Success {
                        block: block_id.parse().expect("Bad block ID."),
                        time: time.to_unix() as u64,
                        program_state,
                    }
                }
                ClientTransactionStatus::Failure {
                    block_id,
                    time,
                    reason,
                    program_state,
                } => {
                    let program_state = program_state.map(|p| match p {
                        ClientProgramState::Return(w) => ProgramState {
                            return_type: ReturnType::Return,
                            data: Bytes::from(w.to_le_bytes().to_vec()),
                        },
                        ClientProgramState::ReturnData(d) => ProgramState {
                            return_type: ReturnType::ReturnData,
                            data: Bytes::from(d.to_vec()),
                        },
                        ClientProgramState::Revert(w) => ProgramState {
                            return_type: ReturnType::Revert,
                            data: Bytes::from(w.to_le_bytes().to_vec()),
                        },
                        // Either `cargo watch` complains that this is unreachable, or `clippy` complains
                        // that not all patterns are matched. These other program states are only used in
                        // debug modes.
                        #[allow(unreachable_patterns)]
                        _ => unreachable!("Bad program state."),
                    });
                    TransactionStatus::Failure {
                        block: block_id.parse().expect("Bad block ID."),
                        time: time.to_unix() as u64,
                        program_state,
                        reason,
                    }
                }
                ClientTransactionStatus::Submitted { submitted_at } => {
                    TransactionStatus::Submitted {
                        submitted_at: submitted_at.to_unix() as u64,
                    }
                }
                ClientTransactionStatus::SqueezedOut { reason } => {
                    TransactionStatus::SqueezedOut { reason }
                }
            };

            let transaction: fuel_tx::Transaction =
                fuel_tx::Transaction::from_bytes(trans.raw_payload.0 .0.as_slice())
                    .expect("Bad transaction.");

            let id = transaction.id(&chain_id);

            let transaction = match transaction {
                ClientTransaction::Create(tx) => Transaction::Create(Create {
                    gas_price: *tx.gas_price(),
                    gas_limit: *tx.gas_limit(),
                    maturity: *tx.maturity(),
                    bytecode_length: *tx.bytecode_length(),
                    bytecode_witness_index: *tx.bytecode_witness_index(),
                    storage_slots: tx
                        .storage_slots()
                        .iter()
                        .map(|x| StorageSlot {
                            key: <[u8; 32]>::from(*x.key()).into(),
                            value: <[u8; 32]>::from(*x.value()).into(),
                        })
                        .collect(),
                    inputs: tx.inputs().iter().map(|i| i.to_owned().into()).collect(),
                    outputs: tx.outputs().iter().map(|o| o.to_owned().into()).collect(),
                    witnesses: tx.witnesses().to_vec(),
                    salt: <[u8; 32]>::from(*tx.salt()).into(),
                    metadata: None,
                }),
                ClientTransaction::Script(tx) => Transaction::Script(Script {
                    gas_price: *tx.gas_price(),
                    gas_limit: *tx.gas_limit(),
                    maturity: *tx.maturity(),
                    script: (*tx.script().clone()).to_vec(),
                    script_data: (*tx.script_data().clone()).to_vec(),
                    inputs: tx.inputs().iter().map(|i| i.to_owned().into()).collect(),
                    outputs: tx.outputs().iter().map(|o| o.to_owned().into()).collect(),
                    witnesses: tx.witnesses().to_vec(),
                    receipts_root: <[u8; 32]>::from(*tx.receipts_root()).into(),
                    metadata: None,
                }),
                ClientTransaction::Mint(tx) => Transaction::Mint(Mint {
                    tx_pointer: tx.tx_pointer().to_owned().into(),
                    outputs: tx.outputs().iter().map(|o| o.to_owned().into()).collect(),
                    metadata: None,
                }),
            };

            let tx_data = TransactionData {
                receipts,
                status,
                transaction,
                id,
            };

            transactions.push(tx_data);
        }

        // TODO: https://github.com/FuelLabs/fuel-indexer/issues/286
        let consensus = match &block.consensus {
            ClientConsensus::Unknown => Consensus::Unknown,
            ClientConsensus::Genesis(g) => {
                let ClientGenesis {
                    chain_config_hash,
                    coins_root,
                    contracts_root,
                    messages_root,
                } = g.to_owned();

                Consensus::Genesis(Genesis {
                    chain_config_hash: <[u8; 32]>::from(
                        chain_config_hash.to_owned().0 .0,
                    )
                    .into(),
                    coins_root: <[u8; 32]>::from(coins_root.0 .0.to_owned()).into(),
                    contracts_root: <[u8; 32]>::from(contracts_root.0 .0.to_owned())
                        .into(),
                    messages_root: <[u8; 32]>::from(messages_root.0 .0.to_owned()).into(),
                })
            }
            ClientConsensus::PoAConsensus(poa) => Consensus::PoA(PoA {
                signature: <[u8; 64]>::from(poa.signature.0 .0.to_owned()).into(),
            }),
        };

        // TODO: https://github.com/FuelLabs/fuel-indexer/issues/286
        let block = BlockData {
            height: block.header.height.clone().into(),
            id: Bytes32::from(<[u8; 32]>::from(block.id.0 .0)),
            producer,
            time: block.header.time.0.to_unix(),
            consensus,
            header: Header {
                id: block.header.id.into(),
                da_height: block.header.da_height.0,
                transactions_count: block.header.transactions_count.into(),
                message_receipt_count: block.header.message_receipt_count.into(),
                transactions_root: block.header.transactions_root.into(),
                message_receipt_root: block.header.message_receipt_root.into(),
                height: block.header.height.into(),
                prev_root: block.header.prev_root.into(),
                time: block.header.time.0.to_unix(),
                application_hash: block.header.application_hash.into(),
            },
            transactions,
        };

        block_info.push(block);
    }

    Ok((block_info, cursor, has_next_page))
}

/// Executors are responsible for the actual indexing of data.
#[async_trait]
pub trait Executor
where
    Self: Sized,
{
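    /// Index a page of blocks by applying the indexer's handlers to each one.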
    async fn handle_events(&mut self, blocks: Vec<BlockData>) -> IndexerResult<()>;

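    /// Return the manifest this executor was created from.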
    fn manifest(&self) -> &Manifest;

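    /// Return the kill switch used to signal that this executor should stop.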
    fn kill_switch(&self) -> &Arc<AtomicBool>;

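    /// Fetch the most recent error message recorded by the indexer runtime.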
    async fn get_error_message(&self) -> IndexerResult<String>;
}

/// WASM indexer runtime environment responsible for fetching/saving data to and from the database.
#[derive(Clone)]
pub struct IndexEnv {
    /// Memory allocated to this runtime.
    pub memory: Option<Memory>,

    /// Allocator function used to allocate memory for calls that fetch items from the database.
    pub alloc: Option<TypedFunction<u32, u32>>,

    /// Deallocator function used to deallocate memory after the associated `WasmArg` is dropped.
    pub dealloc: Option<TypedFunction<(u32, u32), ()>>,

    /// Reference to the connected database.
    pub db: Arc<Mutex<Database>>,

    /// Kill switch for this indexer. When set to true, the indexer service has
    /// signaled that the indexer is being terminated.
    pub kill_switch: Arc<AtomicBool>,
}

impl IndexEnv {
    /// Create a new `IndexEnv`.
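    ///
    /// # Example
    ///
    /// A sketch, assuming a connection pool, manifest, and config are already in scope
    /// (`pool`, `manifest`, and `config` are hypothetical bindings):
    ///
    /// ```ignore
    /// let kill_switch = Arc::new(AtomicBool::new(false));
    /// let env = IndexEnv::new(pool.clone(), &manifest, &config, kill_switch).await?;
    /// ```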
    pub async fn new(
        pool: IndexerConnectionPool,
        manifest: &Manifest,
        config: &IndexerConfig,
        kill_switch: Arc<AtomicBool>,
    ) -> IndexerResult<IndexEnv> {
        let db = Database::new(pool, manifest, config).await;
        Ok(IndexEnv {
            memory: None,
            alloc: None,
            dealloc: None,
            db: Arc::new(Mutex::new(db)),
            kill_switch,
        })
    }
}

/// WASM executors are the primary means of executing indexers.
///
/// A WASM executor wraps a WASM module that is instantiated and executed by the indexer service on a
/// long-running tokio task loop. These executors are responsible for allocating/deallocating their
/// own memory for calls in and out of the runtime.
#[derive(Debug)]
pub struct WasmIndexExecutor {
    /// Associated wasmer module instance.
    instance: Instance,

    /// Associated wasmer module.
    _module: Module,

    /// Associated wasmer store.
    store: Arc<Mutex<Store>>,

    /// Reference to the connected database.
    db: Arc<Mutex<Database>>,

    /// Number of metering points to use for this executor.
    metering_points: Option<u64>,

    /// Manifest of the indexer.
    manifest: Manifest,

    /// Kill switch. When set to true, the indexer must stop execution.
    kill_switch: Arc<AtomicBool>,
}

impl WasmIndexExecutor {
    /// Create a new `WasmIndexExecutor`.
    pub async fn new(
        config: &IndexerConfig,
        manifest: &Manifest,
        wasm_bytes: impl AsRef<[u8]>,
        pool: IndexerConnectionPool,
        schema_version: String,
    ) -> IndexerResult<Self> {
        let mut compiler_config = Cranelift::new();

        if let Some(metering_points) = config.metering_points {
            // `Metering` needs to be configured with a limit and a cost
            // function. For each `Operator`, the metering middleware will call
            // the cost function and subtract the cost from the remaining points.
            let metering =
                Arc::new(wasmer_middlewares::Metering::new(metering_points, |_| 1));
            compiler_config.push_middleware(metering);
        }

        let kill_switch = Arc::new(AtomicBool::new(false));

        let idx_env = IndexEnv::new(pool, manifest, config, kill_switch.clone()).await?;

        let db: Arc<Mutex<Database>> = idx_env.db.clone();

        let mut store = Store::new(compiler_config);

        let module = Module::new(&store, &wasm_bytes)?;

        let env = FunctionEnv::new(&mut store, idx_env);

        let mut imports = imports! {};
        for (export_name, export) in ffi::get_exports(&mut store, &env) {
            imports.define("env", &export_name, export.clone());
        }

        let instance = Instance::new(&mut store, &module, &imports)?;

        if !instance
            .exports
            .contains(ffi::MODULE_ENTRYPOINT.to_string())
        {
            return Err(IndexerError::MissingHandler);
        }

        // `FunctionEnvMut` and `StoreMut` must be scoped because they cannot be
        // held across an `await` point.
        {
            let schema_version_from_wasm = ffi::get_version(&mut store, &instance)?;

            let mut env_mut = env.clone().into_mut(&mut store);

            let (data_mut, store_mut) = env_mut.data_and_store_mut();

            if schema_version_from_wasm != schema_version {
                return Err(IndexerError::SchemaVersionMismatch(format!(
                    "Schema version from WASM {} does not match schema version from database {}",
                    schema_version_from_wasm, schema_version
                )));
            }

            data_mut.memory = Some(instance.exports.get_memory("memory")?.clone());
            data_mut.alloc = Some(
                instance
                    .exports
                    .get_typed_function(&store_mut, "alloc_fn")?,
            );
            data_mut.dealloc = Some(
                instance
                    .exports
                    .get_typed_function(&store_mut, "dealloc_fn")?,
            );
        }

        db.lock().await.load_schema(schema_version).await?;

        Ok(WasmIndexExecutor {
            instance,
            _module: module,
            store: Arc::new(Mutex::new(store)),
            db: db.clone(),
            metering_points: config.metering_points,
            manifest: manifest.clone(),
            kill_switch,
        })
    }

    /// Restore an indexer from its manifest file, loading the WASM module referenced by the manifest.
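    ///
    /// # Example
    ///
    /// A sketch, assuming an existing connection pool and a manifest at a hypothetical
    /// path `my_indexer.manifest.yaml`:
    ///
    /// ```ignore
    /// let executor =
    ///     WasmIndexExecutor::from_file("my_indexer.manifest.yaml", None, pool.clone())
    ///         .await?;
    /// ```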
    pub async fn from_file(
        p: impl AsRef<Path>,
        config: Option<IndexerConfig>,
        pool: IndexerConnectionPool,
    ) -> IndexerResult<WasmIndexExecutor> {
        let config = config.unwrap_or_default();
        let manifest = Manifest::from_file(p)?;
        let bytes = manifest.module_bytes()?;
        let schema_version = manifest.graphql_schema_content()?.version().to_string();
        Self::new(&config, &manifest, bytes, pool, schema_version).await
    }

    /// Create a new `WasmIndexExecutor`, updating the indexer's status in the database
    /// and returning an instantiation error if construction fails.
    pub async fn create(
        config: &IndexerConfig,
        manifest: &Manifest,
        pool: IndexerConnectionPool,
        schema_version: String,
        wasm_bytes: impl AsRef<[u8]>,
    ) -> IndexerResult<Self> {
        let uid = manifest.uid();

        let mut conn = pool.acquire().await?;
        queries::set_indexer_status(
            &mut conn,
            manifest.namespace(),
            manifest.identifier(),
            IndexerStatus::instantiating(),
        )
        .await?;

        match WasmIndexExecutor::new(
            config,
            manifest,
            wasm_bytes,
            pool.clone(),
            schema_version,
        )
        .await
        {
            Ok(executor) => Ok(executor),
            Err(e) => {
                error!("Could not instantiate WasmIndexExecutor({uid}): {e:?}.");
                let mut conn = pool.acquire().await?;
                queries::set_indexer_status(
                    &mut conn,
                    manifest.namespace(),
                    manifest.identifier(),
                    IndexerStatus::error(format!("{e}")),
                )
                .await?;
                Err(IndexerError::WasmExecutionInstantiationError)
            }
        }
    }

    /// Returns true if metering is enabled.
    pub fn metering_enabled(&self) -> bool {
        self.metering_points.is_some()
    }

    /// Returns true if metering is enabled and the metering points are exhausted.
    /// Otherwise returns false.
    pub async fn metering_points_exhausted(&self) -> bool {
        if self.metering_enabled() {
            self.get_remaining_metering_points().await.unwrap()
                == MeteringPoints::Exhausted
        } else {
            false
        }
    }

    /// Returns remaining metering points if metering is enabled. Otherwise, returns None.
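    ///
    /// # Example
    ///
    /// A sketch of inspecting the remaining budget after a handler run, assuming a
    /// hypothetical `executor` built with metering enabled:
    ///
    /// ```ignore
    /// if let Some(MeteringPoints::Remaining(points)) =
    ///     executor.get_remaining_metering_points().await
    /// {
    ///     println!("{points} metering points remaining");
    /// }
    /// ```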
    pub async fn get_remaining_metering_points(&self) -> Option<MeteringPoints> {
        if self.metering_enabled() {
            let mut store_guard = self.store.lock().await;
            let result = wasmer_middlewares::metering::get_remaining_points(
                &mut store_guard,
                &self.instance,
            );
            Some(result)
        } else {
            None
        }
    }

    /// Sets the remaining metering points if metering is enabled. Otherwise, returns an error.
    pub async fn set_metering_points(&self, metering_points: u64) -> IndexerResult<()> {
        if self.metering_enabled() {
            let mut store_guard = self.store.lock().await;
            wasmer_middlewares::metering::set_remaining_points(
                &mut store_guard,
                &self.instance,
                metering_points,
            );
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "Attempting to set metering points when metering is not enabled"
            )
            .into())
        }
    }
}

#[async_trait]
impl Executor for WasmIndexExecutor {
    /// Trigger the WASM handler entrypoint, passing in the serialized list of blocks.
    async fn handle_events(&mut self, blocks: Vec<BlockData>) -> IndexerResult<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        if let Some(metering_points) = self.metering_points {
            self.set_metering_points(metering_points).await?
        }
        let bytes = serialize(&blocks);
        let uid = self.manifest.uid();

        let fun = {
            let store_guard = self.store.lock().await;
            self.instance.exports.get_typed_function::<(u32, u32), ()>(
                &store_guard,
                ffi::MODULE_ENTRYPOINT,
            )?
        };

        let _ = self.db.lock().await.start_transaction().await?;

        #[cfg(feature = "metrics")]
        let start = Instant::now();

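        // The WASM call is CPU-bound and potentially long-running, so it runs on a
        // blocking thread to avoid stalling the async runtime.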
        let res = spawn_blocking({
            let store = self.store.clone();
            let instance = self.instance.clone();
            let metering_enabled = self.metering_enabled();
            move || {
                let store_guard =
                    tokio::runtime::Handle::current().block_on(store.lock());
                let mut arg =
                    ffi::WasmArg::new(store_guard, instance, bytes, metering_enabled)
                        .unwrap();

                let ptr = arg.get_ptr();
                let len = arg.get_len();

                fun.call(&mut arg.store(), ptr, len)
            }
        })
        .await?;

        #[cfg(feature = "metrics")]
        {
            METRICS
                .exec
                .handler
                .record(&self.manifest.uid(), start.elapsed().as_millis() as f64);
        }

        if let Err(e) = res {
            if self.metering_points_exhausted().await {
                self.db.lock().await.revert_transaction().await?;
                return Err(IndexerError::RunTimeLimitExceededError);
            } else {
                if let Some(e) = e.downcast_ref::<WasmIndexerError>() {
                    match e {
                        // Termination due to kill switch is an expected behavior.
                        WasmIndexerError::KillSwitch => {
                            info!("Indexer({uid}) WASM execution terminated: {e}.")
                        }
                        _ => {
                            error!("Indexer({uid}) WASM execution failed: {e}.")
                        }
                    }
                } else {
                    error!("Indexer({uid}) WASM execution failed: {e:?}.");
                };
                self.db.lock().await.revert_transaction().await?;
                return Err(IndexerError::from(e));
            }
        } else {
            // Do not commit if the kill switch has been triggered.
            if self.kill_switch.load(Ordering::SeqCst) {
                self.db.lock().await.revert_transaction().await?;
            } else {
                self.db.lock().await.commit_transaction().await?;
            }
        }

        Ok(())
    }

    fn kill_switch(&self) -> &Arc<AtomicBool> {
        &self.kill_switch
    }

    fn manifest(&self) -> &Manifest {
        &self.manifest
    }

    async fn get_error_message(&self) -> IndexerResult<String> {
        let mut store = self.store.lock().await;
        let result = ffi::get_error_message(&mut store, &self.instance)?;
        Ok(result)
    }
}