1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::delta_ext::{addition, deserialize, serialize, subtraction, DeltaOp};
use aptos_crypto::hash::DefaultHasher;
use aptos_types::vm_status::StatusCode;
use better_any::{Tid, TidAble};
use move_deps::{
    move_binary_format::errors::{PartialVMError, PartialVMResult},
    move_core_types::account_address::AccountAddress,
    move_table_extension::{TableHandle, TableResolver},
    move_vm_runtime::{
        native_functions,
        native_functions::{NativeContext, NativeFunctionTable},
    },
    move_vm_types::{
        loaded_data::runtime_types::Type,
        natives::function::NativeResult,
        pop_arg,
        values::{Reference, Struct, StructRef, Value},
    },
};
use smallvec::smallvec;
use std::{
    cell::RefCell,
    collections::{BTreeMap, BTreeSet, VecDeque},
    convert::TryInto,
    sync::Arc,
};

/// Describes the state of each aggregator instance.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AggregatorState {
    // If aggregator stores a known value.
    Data,
    // If aggregator stores a non-negative delta.
    PositiveDelta,
}

/// Uniquely identifies each aggregator instance in storage.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct AggregatorID {
    // A handle that is shared accross all aggregator instances created by the
    // same `AggregatorFactory` and which is used for fine-grained storage
    // access.
    pub handle: u128,
    // Unique key associated with each aggregator instance. Generated by
    // taking the hash of transaction which creates an aggregator and the
    // number of aggregators that were created by this transaction so far.
    pub key: u128,
}

impl AggregatorID {
    fn new(handle: u128, key: u128) -> Self {
        AggregatorID { handle, key }
    }
}

/// Internal aggregator data structure.
struct Aggregator {
    // Describes a value of an aggregator.
    value: u128,
    // Describes a state of an aggregator.
    state: AggregatorState,
    // Describes an upper bound of an aggregator. If `value` exceeds it, the
    // aggregator overflows.
    // TODO: Currently this is a single u128 value since we use 0 as a trivial
    // lower bound. If we want to support custom lower bounds, or have more
    // complex postconditions, we should factor this out in its own struct.
    limit: u128,
}

impl Aggregator {
    /// Implements logic for adding to an aggregator.
    fn add(&mut self, value: u128) -> PartialVMResult<()> {
        // At this point, aggregator holds a positive delta or knows the value.
        // Hence, we can add, of course checking for overflow.
        self.value = addition(self.value, value, self.limit)?;
        Ok(())
    }

    /// Implements logic for subtracting from an aggregator.
    fn sub(&mut self, value: u128) -> PartialVMResult<()> {
        match self.state {
            AggregatorState::Data => {
                // Aggregator knows the value, therefore we can subtract
                // checking we don't drop below zero.
                self.value = subtraction(self.value, value)?;
                Ok(())
            }
            // For now, `aggregator::sub` always materializes the value, so
            // this should be unreachable.
            // TODO: support non-materialized subtractions.
            AggregatorState::PositiveDelta => {
                unreachable!("subtraction always materializes the value")
            }
        }
    }

    /// Implements logic for materializing the value of an aggregator. As a
    /// result, the aggregator knows it value (i.e. its state changed to
    /// `Data`).
    fn materialize(
        &mut self,
        context: &NativeAggregatorContext,
        id: &AggregatorID,
    ) -> PartialVMResult<()> {
        // If aggregator has already been materialized, return immediately.
        if self.state == AggregatorState::Data {
            return Ok(());
        }

        // Otherwise, we have a delta and have to go to storage and apply it.
        // In theory, any delta will be applied to existing value. However,
        // something may go wrong, so we guard by throwing an error in
        // extension.
        let key_bytes = serialize(&id.key);
        context
            .resolver
            .resolve_table_entry(&TableHandle(id.handle), &key_bytes)
            .map_err(|_| extension_error("could not find the value of the aggregator"))?
            .map_or(
                Err(extension_error(
                    "could not find the value of the aggregator",
                )),
                |bytes| {
                    // The only remaining case is PositiveDelta. Assert just in
                    // case.
                    debug_assert!(self.state == AggregatorState::PositiveDelta);

                    // Get the value from the storage and try to apply the delta
                    // to it. If application succeeds, we change the state of the
                    // aggregator. Otherwise the error is propagated to the caller.
                    let base = deserialize(&bytes);
                    self.value = addition(base, self.value, self.limit)?;
                    self.state = AggregatorState::Data;
                    Ok(())
                },
            )
    }
}

/// Stores all information about aggregators (how many have been created or
/// removed), what are their states, etc. per context (i.e. single
/// transaction).
#[derive(Default)]
struct AggregatorData {
    // All aggregators that were created in the current context, stored as ids.
    // Used to filter out aggregators that were created and destroyed in the
    // same context (i.e. within a single transaction).
    new_aggregators: BTreeSet<AggregatorID>,
    // All aggregators that were destroyed in the current context, stored as ids.
    destroyed_aggregators: BTreeSet<AggregatorID>,
    // All aggregator instances that exist in the current context.
    aggregators: BTreeMap<AggregatorID, Aggregator>,
}

impl AggregatorData {
    /// Returns a mutable reference to an aggregator with `id` and a `limit`.
    /// If it does not exist in the current context (i.e. transaction that is
    /// currently executing did not initilize it), a new aggregator instance is
    /// created, with a zero-initialized value and in a delta state.
    /// Note: when we say "aggregator instance" here we refer to Rust struct and
    /// not to the Move aggregator.
    fn get_aggregator(&mut self, id: AggregatorID, limit: u128) -> &mut Aggregator {
        self.aggregators.entry(id).or_insert_with(|| Aggregator {
            value: 0,
            state: AggregatorState::PositiveDelta,
            limit,
        });
        self.aggregators.get_mut(&id).unwrap()
    }

    /// Returns the number of aggregators that are used in the current context.
    fn num_aggregators(&self) -> u128 {
        self.aggregators.len() as u128
    }

    /// Creates and a new Aggregator with a given `id` and a `limit`. The value
    /// of a new aggregator is always known, therefore it is created in a data
    /// state, with a zero-initialized value.
    fn create_new_aggregator(&mut self, id: AggregatorID, limit: u128) {
        let aggregator = Aggregator {
            value: 0,
            state: AggregatorState::Data,
            limit,
        };
        self.aggregators.insert(id, aggregator);
        self.new_aggregators.insert(id);
    }

    /// If aggregator has been used in this context, it is removed. Otherwise,
    /// it is marked for deletion.
    fn remove_aggregator(&mut self, id: AggregatorID) {
        // Aggregator no longer in use during this context: remove it.
        self.aggregators.remove(&id);

        if self.new_aggregators.contains(&id) {
            // Aggregator has been created in the same context. Therefore, no
            // side-effects.
            self.new_aggregators.remove(&id);
        } else {
            // Otherwise, aggregator has been created somewhere else.
            self.destroyed_aggregators.insert(id);
        }
    }
}

/// Represents a single aggregator change.
#[derive(Debug)]
pub enum AggregatorChange {
    // A value should be written to storage.
    Write(u128),
    // A delta should be merged with the value from storage.
    Merge(DeltaOp),
    // A value should be deleted from the storage.
    Delete,
}

/// Represents changes made by all aggregators during this context. This change
/// set can be converted into appropriate `WriteSet` and `DeltaChangeSet` by the
/// user, e.g. VM session.
pub struct AggregatorChangeSet {
    pub changes: BTreeMap<AggregatorID, AggregatorChange>,
}

/// Native context that can be attached to VM `NativeContextExtensions`.
///
/// Note: table resolver is reused for fine-grained storage access.
#[derive(Tid)]
pub struct NativeAggregatorContext<'a> {
    txn_hash: u128,
    resolver: &'a dyn TableResolver,
    aggregator_data: RefCell<AggregatorData>,
}

impl<'a> NativeAggregatorContext<'a> {
    /// Creates a new instance of a native aggregator context. This must be
    /// passed into VM session.
    pub fn new(txn_hash: u128, resolver: &'a dyn TableResolver) -> Self {
        Self {
            txn_hash,
            resolver,
            aggregator_data: Default::default(),
        }
    }

    /// Returns all changes made within this context (i.e. by a single
    /// transaction).
    pub fn into_change_set(self) -> AggregatorChangeSet {
        let NativeAggregatorContext {
            aggregator_data, ..
        } = self;
        let AggregatorData {
            destroyed_aggregators,
            aggregators,
            ..
        } = aggregator_data.into_inner();

        let mut changes = BTreeMap::new();

        // First, process all writes and deltas.
        for (id, aggregator) in aggregators {
            let Aggregator {
                value,
                state,
                limit,
            } = aggregator;

            let change = match state {
                AggregatorState::Data => AggregatorChange::Write(value),
                AggregatorState::PositiveDelta => {
                    let delta_op = DeltaOp::Addition { value, limit };
                    AggregatorChange::Merge(delta_op)
                }
            };
            changes.insert(id, change);
        }

        // Additionally, do not forget to delete destroyed values from storage.
        for id in destroyed_aggregators {
            changes.insert(id, AggregatorChange::Delete);
        }

        AggregatorChangeSet { changes }
    }
}

// ================================= Natives =================================

/// All aggregator native functions. For more details, refer to code in
/// `aggregator_factory.move` and `aggregator.move`.
pub fn aggregator_natives(aggregator_addr: AccountAddress) -> NativeFunctionTable {
    native_functions::make_table(
        aggregator_addr,
        &[
            ("aggregator", "add", Arc::new(native_add)),
            ("aggregator", "read", Arc::new(native_read)),
            ("aggregator", "destroy", Arc::new(native_destroy)),
            ("aggregator", "sub", Arc::new(native_sub)),
            (
                "aggregator_factory",
                "new_aggregator",
                Arc::new(native_new_aggregator),
            ),
        ],
    )
}

/// Move signature:
/// fun new_aggregator(
///   aggregator_factory: &mut AggregatorFactory,
///   limit: u128
/// ): Aggregator;
fn native_new_aggregator(
    context: &mut NativeContext,
    _ty_args: Vec<Type>,
    mut args: VecDeque<Value>,
) -> PartialVMResult<NativeResult> {
    if !cfg!(any(test, feature = "aggregator-extension")) {
        return Err(not_supported_error());
    }
    assert!(args.len() == 2);

    // Extract fields: `limit` of the new aggregator and a `phantom_handle` of
    // the parent factory.
    let limit = pop_arg!(args, u128);
    let handle = get_handle(&pop_arg!(args, StructRef))?;

    // Get the current aggregator data.
    let aggregator_context = context.extensions().get::<NativeAggregatorContext>();
    let mut aggregator_data = aggregator_context.aggregator_data.borrow_mut();

    // Every aggregator instance uses a unique key in its id. Here we can reuse
    // the strategy from `table` implementation: taking hash of transaction and
    // number of aggregator instances created so far and truncating them to
    // 128 bits.
    let txn_hash_buffer = u128::to_be_bytes(aggregator_context.txn_hash);
    let num_aggregators_buffer = u128::to_be_bytes(aggregator_data.num_aggregators());

    let mut hasher = DefaultHasher::new(&[0_u8; 0]);
    hasher.update(&txn_hash_buffer);
    hasher.update(&num_aggregators_buffer);
    let hash = hasher.finish();

    // TODO: Using u128 is not enough, and it should be u256 instead. For now,
    // just take first 16 bytes of the hash.
    let bytes = &hash.to_vec()[..16];
    let key = u128::from_be_bytes(bytes.try_into().expect("not enough bytes"));

    let id = AggregatorID::new(handle, key);
    aggregator_data.create_new_aggregator(id, limit);

    // TODO: charge gas properly.
    Ok(NativeResult::ok(
        0,
        smallvec![Value::struct_(Struct::pack(vec![
            Value::u128(handle),
            Value::u128(key),
            Value::u128(limit),
        ]))],
    ))
}

/// Move signature:
/// fun add(aggregator: &mut Aggregator, value: u128);
fn native_add(
    context: &mut NativeContext,
    _ty_args: Vec<Type>,
    mut args: VecDeque<Value>,
) -> PartialVMResult<NativeResult> {
    if !cfg!(any(test, feature = "aggregator-extension")) {
        return Err(not_supported_error());
    }
    assert!(args.len() == 2);

    // Get aggregator fields and a value to add.
    let value = pop_arg!(args, u128);
    let aggregator_ref = pop_arg!(args, StructRef);
    let (handle, key, limit) = get_aggregator_fields(&aggregator_ref)?;
    let id = AggregatorID::new(handle, key);

    // Get aggregator.
    let aggregator_context = context.extensions().get::<NativeAggregatorContext>();
    let mut aggregator_data = aggregator_context.aggregator_data.borrow_mut();
    let aggregator = aggregator_data.get_aggregator(id, limit);

    aggregator.add(value)?;

    // TODO: charge gas properly.
    Ok(NativeResult::ok(0, smallvec![]))
}

/// Move signature:
/// fun read(aggregator: &Aggregator): u128;
fn native_read(
    context: &mut NativeContext,
    _ty_args: Vec<Type>,
    mut args: VecDeque<Value>,
) -> PartialVMResult<NativeResult> {
    if !cfg!(any(test, feature = "aggregator-extension")) {
        return Err(not_supported_error());
    }
    assert!(args.len() == 1);
    let aggregator_ref = pop_arg!(args, StructRef);

    // Extract fields from aggregator struct reference.
    let (handle, key, limit) = get_aggregator_fields(&aggregator_ref)?;
    let id = AggregatorID::new(handle, key);

    // Get aggregator.
    let aggregator_context = context.extensions().get::<NativeAggregatorContext>();
    let mut aggregator_data = aggregator_context.aggregator_data.borrow_mut();
    let aggregator = aggregator_data.get_aggregator(id, limit);

    // Materialize the value.
    aggregator.materialize(aggregator_context, &id)?;

    // TODO: charge gas properly.
    Ok(NativeResult::ok(
        0,
        smallvec![Value::u128(aggregator.value)],
    ))
}

/// Move signature:
/// fun sub(aggregator: &mut Aggregator, value: u128);
fn native_sub(
    context: &mut NativeContext,
    _ty_args: Vec<Type>,
    mut args: VecDeque<Value>,
) -> PartialVMResult<NativeResult> {
    if !cfg!(any(test, feature = "aggregator-extension")) {
        return Err(not_supported_error());
    }
    assert!(args.len() == 2);

    // Get aggregator fields and a value to subtract.
    let value = pop_arg!(args, u128);
    let aggregator_ref = pop_arg!(args, StructRef);
    let (handle, key, limit) = get_aggregator_fields(&aggregator_ref)?;
    let id = AggregatorID::new(handle, key);

    // Get aggregator.
    let aggregator_context = context.extensions().get::<NativeAggregatorContext>();
    let mut aggregator_data = aggregator_context.aggregator_data.borrow_mut();
    let aggregator = aggregator_data.get_aggregator(id, limit);

    // For first version of `Aggregator` (V1), subtraction always materializes
    // the value first. While this limits commutativity, it is sufficient for
    // now.
    // TODO: change this when we implement commutative subtraction.
    aggregator.materialize(aggregator_context, &id)?;
    aggregator.sub(value)?;

    // TODO: charge gas properly.
    Ok(NativeResult::ok(0, smallvec![]))
}

/// Move signature:
/// fun destroy(aggregator: Aggregator);
fn native_destroy(
    context: &mut NativeContext,
    _ty_args: Vec<Type>,
    mut args: VecDeque<Value>,
) -> PartialVMResult<NativeResult> {
    if !cfg!(any(test, feature = "aggregator-extension")) {
        return Err(not_supported_error());
    }
    assert!(args.len() == 1);

    // First, unpack the struct.
    let aggregator_struct = pop_arg!(args, Struct);
    let (handle, key, _) = unpack_aggregator_struct(aggregator_struct)?;

    // Get aggregator data.
    let aggregator_context = context.extensions().get::<NativeAggregatorContext>();
    let mut aggregator_data = aggregator_context.aggregator_data.borrow_mut();

    // Actually remove the aggregator.
    let id = AggregatorID::new(handle, key);
    aggregator_data.remove_aggregator(id);

    // TODO: charge gas properly.
    Ok(NativeResult::ok(0, smallvec![]))
}

// ================================ Utilities ================================

/// The index of the `phantom_table` field in the `AggregatorFactory` Move
/// struct.
const PHANTOM_TABLE_FIELD_INDEX: usize = 0;

/// The index of the `handle` field in the `Table` Move struct.
const TABLE_HANDLE_FIELD_INDEX: usize = 0;

/// Indices of `handle`, `key` and `limit` fields in the `Aggregator` Move
/// struct.
const HANDLE_FIELD_INDEX: usize = 0;
const KEY_FIELD_INDEX: usize = 1;
const LIMIT_FIELD_INDEX: usize = 2;

/// Given a reference to `AggregatorFactory` Move struct, returns the value of
/// `handle` field (from underlying `Table` struct).
fn get_handle(aggregator_table: &StructRef) -> PartialVMResult<u128> {
    aggregator_table
        .borrow_field(PHANTOM_TABLE_FIELD_INDEX)?
        .value_as::<StructRef>()?
        .borrow_field(TABLE_HANDLE_FIELD_INDEX)?
        .value_as::<Reference>()?
        .read_ref()?
        .value_as::<u128>()
}

/// Given a reference to `Aggregator` Move struct returns a field value at `index`.
fn get_aggregator_field(aggregator: &StructRef, index: usize) -> PartialVMResult<Value> {
    let field_ref = aggregator.borrow_field(index)?.value_as::<Reference>()?;
    field_ref.read_ref()
}

/// Given a reference to `Aggregator` Move struct, returns a tuple of its
/// fields: (`handle`, `key`, `limit`).
fn get_aggregator_fields(aggregator: &StructRef) -> PartialVMResult<(u128, u128, u128)> {
    let handle = get_aggregator_field(aggregator, HANDLE_FIELD_INDEX)?.value_as::<u128>()?;
    let key = get_aggregator_field(aggregator, KEY_FIELD_INDEX)?.value_as::<u128>()?;
    let limit = get_aggregator_field(aggregator, LIMIT_FIELD_INDEX)?.value_as::<u128>()?;
    Ok((handle, key, limit))
}

/// Given an `Aggregator` Move struct, unpacks it into fields: (`handle`, `key`, `limit`).
fn unpack_aggregator_struct(aggregator_struct: Struct) -> PartialVMResult<(u128, u128, u128)> {
    let mut fields: Vec<Value> = aggregator_struct.unpack()?.collect();
    assert!(fields.len() == 3);

    let pop_with_err = |vec: &mut Vec<Value>, msg: &str| {
        vec.pop()
            .map_or(Err(extension_error(msg)), |v| v.value_as::<u128>())
    };

    let limit = pop_with_err(&mut fields, "unable to pop 'limit' field")?;
    let key = pop_with_err(&mut fields, "unable to pop 'key' field")?;
    let handle = pop_with_err(&mut fields, "unable to pop 'handle' field")?;
    Ok((handle, key, limit))
}

/// Returns partial VM error on extension failure.
fn extension_error(message: impl ToString) -> PartialVMError {
    PartialVMError::new(StatusCode::VM_EXTENSION_ERROR).with_message(message.to_string())
}

/// When aggregator feature is not supported.
const ENOT_SUPPORTED: u64 = 0x0C_0003;

/// Returns partial VM error when experimental feature is not supported.
fn not_supported_error() -> PartialVMError {
    PartialVMError::new(StatusCode::ABORTED)
        .with_message("this experimental feature is not supported".to_string())
        .with_sub_status(ENOT_SUPPORTED)
}

// ================================= Tests =================================

#[cfg(test)]
mod test {
    use super::*;
    use aptos_state_view::StateView;
    use aptos_types::state_store::{state_key::StateKey, table::TableHandle as AptosTableHandle};
    use claim::{assert_err, assert_matches, assert_ok};
    use move_deps::move_table_extension::TableOperation;
    use once_cell::sync::Lazy;
    use std::collections::HashMap;

    #[derive(Default)]
    pub struct FakeTestStorage {
        data: HashMap<StateKey, Vec<u8>>,
    }

    impl FakeTestStorage {
        fn new() -> Self {
            let mut data = HashMap::new();

            // Initialize storage with some test data.
            data.insert(id_to_state_key(test_id(4)), serialize(&900));
            data.insert(id_to_state_key(test_id(5)), serialize(&5));
            FakeTestStorage { data }
        }
    }

    impl StateView for FakeTestStorage {
        fn get_state_value(&self, state_key: &StateKey) -> anyhow::Result<Option<Vec<u8>>> {
            Ok(self.data.get(state_key).cloned())
        }

        fn is_genesis(&self) -> bool {
            self.data.is_empty()
        }
    }

    impl TableResolver for FakeTestStorage {
        fn resolve_table_entry(
            &self,
            handle: &TableHandle,
            key: &[u8],
        ) -> Result<Option<Vec<u8>>, anyhow::Error> {
            let state_key = StateKey::table_item(AptosTableHandle::from(*handle), key.to_vec());
            self.get_state_value(&state_key)
        }

        fn operation_cost(&self, _op: TableOperation, _key_size: usize, _val_size: usize) -> u64 {
            1
        }
    }

    fn test_id(key: u128) -> AggregatorID {
        AggregatorID::new(0, key)
    }

    fn id_to_state_key(id: AggregatorID) -> StateKey {
        let key_bytes = serialize(&id.key);
        StateKey::table_item(AptosTableHandle(id.handle), key_bytes)
    }

    fn test_set_up(context: &NativeAggregatorContext) {
        let mut aggregator_data = context.aggregator_data.borrow_mut();

        // Aggregators with data.
        aggregator_data.create_new_aggregator(test_id(0), 1000);
        aggregator_data.create_new_aggregator(test_id(1), 1000);
        aggregator_data.create_new_aggregator(test_id(2), 1000);

        // Aggregators with delta.
        aggregator_data.get_aggregator(test_id(3), 1000);
        aggregator_data.get_aggregator(test_id(4), 1000);
        aggregator_data.get_aggregator(test_id(5), 10);

        // Different cases of aggregator removal.
        aggregator_data.remove_aggregator(test_id(0));
        aggregator_data.remove_aggregator(test_id(3));
        aggregator_data.remove_aggregator(test_id(6));
    }

    #[allow(clippy::redundant_closure)]
    static TEST_RESOLVER: Lazy<FakeTestStorage> = Lazy::new(|| FakeTestStorage::new());

    #[test]
    fn test_into_change_set() {
        let context = NativeAggregatorContext::new(0, &*TEST_RESOLVER);
        test_set_up(&context);

        let AggregatorChangeSet { changes } = context.into_change_set();

        assert!(!changes.contains_key(&test_id(0)));

        assert_matches!(
            changes.get(&test_id(1)).unwrap(),
            AggregatorChange::Write(0)
        );
        assert_matches!(
            changes.get(&test_id(2)).unwrap(),
            AggregatorChange::Write(0)
        );

        assert_matches!(changes.get(&test_id(3)).unwrap(), AggregatorChange::Delete);

        assert_matches!(
            changes.get(&test_id(4)).unwrap(),
            AggregatorChange::Merge(DeltaOp::Addition {
                value: 0,
                limit: 1000
            })
        );
        assert_matches!(
            changes.get(&test_id(5)).unwrap(),
            AggregatorChange::Merge(DeltaOp::Addition {
                value: 0,
                limit: 10
            })
        );

        assert_matches!(changes.get(&test_id(6)).unwrap(), AggregatorChange::Delete);
    }

    #[test]
    fn test_aggregator_natives() {
        let context = NativeAggregatorContext::new(0, &*TEST_RESOLVER);
        test_set_up(&context);

        let mut aggregator_data = context.aggregator_data.borrow_mut();

        // This aggregator has been created during this context, hence the
        // value is known.
        let aggregator = aggregator_data.get_aggregator(test_id(1), 1000);
        assert_matches!(aggregator.state, AggregatorState::Data);
        assert_eq!(aggregator.value, 0);

        assert_ok!(aggregator.add(100));
        assert_ok!(aggregator.add(900));
        assert_matches!(aggregator.state, AggregatorState::Data);
        assert_eq!(aggregator.value, 1000);

        // Overflow!
        assert_err!(aggregator.add(1));

        // This aggregator has not been created during this context, and contains
        // an unknown value.
        let aggregator = aggregator_data.get_aggregator(test_id(4), 1000);
        assert_matches!(aggregator.state, AggregatorState::PositiveDelta);
        assert_eq!(aggregator.value, 0);

        assert_ok!(aggregator.add(100));
        assert_ok!(aggregator.add(100));
        assert_matches!(aggregator.state, AggregatorState::PositiveDelta);
        assert_eq!(aggregator.value, 200);

        // 900 + 200 > 1000!
        assert_err!(aggregator.materialize(&context, &test_id(4)));

        // This aggregator also has not been created during this context, and
        // contains an unknown value.
        let aggregator = aggregator_data.get_aggregator(test_id(5), 10);
        assert_matches!(aggregator.state, AggregatorState::PositiveDelta);
        assert_eq!(aggregator.value, 0);

        assert_ok!(aggregator.add(2));
        assert_matches!(aggregator.state, AggregatorState::PositiveDelta);
        assert_eq!(aggregator.value, 2);

        assert_ok!(aggregator.materialize(&context, &test_id(5)));
        assert_matches!(aggregator.state, AggregatorState::Data);
        assert_eq!(aggregator.value, 7);

        assert_ok!(aggregator.sub(7));
        assert_matches!(aggregator.state, AggregatorState::Data);
        assert_eq!(aggregator.value, 0);
    }
}