solana_unified_scheduler_logic/
lib.rs

#![allow(rustdoc::private_intra_doc_links)]
//! The task (transaction) scheduling code for the unified scheduler
//!
//! ### High-level API and design
//!
//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactions) and
//! may return them if they are runnable via
//! [`::schedule_task()`](SchedulingStateMachine::schedule_task) while maintaining the account
//! readonly/writable lock rules. Those returned runnable tasks are guaranteed to be safe to
//! execute in parallel. Lastly, `SchedulingStateMachine` should be notified about the completion
//! of the execution via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that
//! conflicting tasks can be returned from
//! [`::schedule_next_unblocked_task()`](SchedulingStateMachine::schedule_next_unblocked_task) as
//! newly-unblocked runnable ones.
//!
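//! The following is a minimal sketch of that call pattern (illustrative only; the `transactions`
//! iterable and the `UsageQueue` caching strategy are this example's assumptions, and actual
//! execution/threading is elided):
//!
//! ```ignore
//! let mut scheduler = unsafe {
//!     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
//! };
//! let mut usage_queues = std::collections::HashMap::<Pubkey, UsageQueue>::new();
//! for (index, transaction) in transactions.into_iter().enumerate() {
//!     let task = SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| {
//!         usage_queues.entry(pubkey).or_default().clone()
//!     });
//!     if let Some(task) = scheduler.schedule_task(task) {
//!         // Runnable right away; execute it, then report the completion.
//!         scheduler.deschedule_task(&task);
//!     }
//!     // Descheduling may have unblocked previously conflicting tasks.
//!     while let Some(task) = scheduler.schedule_next_unblocked_task() {
//!         scheduler.deschedule_task(&task);
//!     }
//! }
//! ```
//!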
//! The design principle of this crate (`solana-unified-scheduler-logic`) is simplicity for the
//! sake of separation of concerns. `solana-unified-scheduler-pool` interacts with it through only
//! a handful of public APIs. This crate knows nothing about banks, slots, solana-runtime, threads,
//! or crossbeam-channel. Because of this, it's deterministic, easy to unit-test, and its
//! performance footprint is well understood. It focuses on its single job: sorting transactions
//! into an executable order.
//!
//! ### Algorithm
//!
//! The algorithm is based on per-address FIFO queues, which are updated every time a new task
//! arrives (= _scheduling_) and every time a runnable (= _post-scheduling_) task finishes (=
//! _descheduling_).
//!
//! For the _non-conflicting scheduling_ case, the story is very simple: it just remembers that
//! each accessed address is write-locked, or read-locked with the number of active (=
//! _currently-scheduled-and-not-yet-descheduled_) tasks. Correspondingly, descheduling does the
//! opposite book-keeping, regardless of whether the finished task was conflicting or not.
//!
//! For the _conflicting scheduling_ case, it records each **non-conflicting address** like in the
//! non-conflicting case above. As for the **conflicting addresses**, the task is recorded in the
//! respective FIFO queues attached to those (conflicting) addresses. Importantly, the number of
//! conflicting addresses of the conflicting task is also remembered.
//!
//! The last missing piece is that the scheduler actually tries to reschedule previously blocked
//! tasks while descheduling, in addition to the above-mentioned book-keeping. Namely, when a
//! given address becomes ready for fresh locking as a result of descheduling a task (i.e. its
//! write lock is released or its read lock count has reached zero), it pops the first element off
//! the address's FIFO blocked-task queue. Then, it immediately marks the address as relocked. It
//! also decrements the number of conflicting addresses of the popped-out task. As the final step,
//! if that number reaches zero, the task has finished locking all of its addresses and is routed
//! directly to be runnable. Lastly, if the next element at the front of the blocked-task queue is
//! trying to read-lock the address like the popped-out one, this rescheduling is repeated as an
//! optimization to increase the parallelism of task execution.
//!
//! Put differently, this algorithm tries to gradually lock all of a task's addresses at different
//! times without deviating the execution order from the original task ingestion order. This
//! implies there are no locking retries in general, which is the primary source of non-linear
//! performance degradation.
//!
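//! The core per-address locking rule can be sketched in isolation roughly like this (illustrative
//! only; the real per-address state lives in [`UsageQueueInner`] together with its blocked-task
//! queue):
//!
//! ```ignore
//! #[derive(Clone, Copy)]
//! enum Lock {
//!     Unused,
//!     Readonly(u32), // number of active readers
//!     Writable,
//! }
//!
//! fn try_lock(lock: &mut Lock, writable: bool) -> bool {
//!     *lock = match (*lock, writable) {
//!         (Lock::Unused, true) => Lock::Writable,
//!         (Lock::Unused, false) => Lock::Readonly(1),
//!         (Lock::Readonly(count), false) => Lock::Readonly(count + 1),
//!         // Any other combination conflicts; the task is pushed to the
//!         // address's FIFO queue instead of being made runnable.
//!         _ => return false,
//!     };
//!     true
//! }
//! ```
//!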
//! As a ballpark number from a synthesized micro benchmark on a typical CPU for `mainnet-beta`
//! validators, it takes roughly 100ns to schedule and deschedule a transaction with 10 accounts,
//! and 1us for a transaction with 100 accounts. Note that this entirely excludes crossbeam
//! communication overhead. That said, it's not unrealistic to say the whole unified scheduler can
//! attain 100k-1m tps overall, assuming those transaction executions aren't bottlenecked.
//!
//! ### Runtime performance characteristics and data structure arrangement
//!
//! The algorithm is very fast for high throughput and real-time for low latency. The whole
//! unified-scheduler architecture is designed from the ground up to support the fastest execution
//! of this scheduling code. To that end, the unified scheduler pre-loads the address-specific
//! locking state data structures (called [`UsageQueue`]) for all of a transaction's accounts, in
//! order to offload that job from the scheduler thread to other threads. This preloading is done
//! inside [`create_task()`](SchedulingStateMachine::create_task). In this way, the computational
//! complexity of task scheduling is basically reduced to several word-sized loads and stores in
//! the scheduler thread (i.e. constant; no allocations nor syscalls), while being proportional to
//! the number of addresses in a given transaction. Note that this holds true regardless of
//! conflicts. This is because the preloading also pre-allocates a scratch-pad area
//! ([`blocked_usages_from_tasks`](UsageQueueInner::blocked_usages_from_tasks)) to stash blocked
//! tasks. So, a conflict only incurs an additional fixed number of memory stores, within the
//! error margin of the constant complexity. And any additional memory allocation for the
//! scratchpad can be said to be amortized, should such an unusual event occur.
//!
//! [`Arc`] is used to implement this preloading mechanism, because `UsageQueue`s are shared
//! across tasks accessing the same account, and among threads due to the preloading. Also,
//! interior mutability is needed. However, `SchedulingStateMachine` doesn't use conventional
//! locks like RwLock. Leveraging the fact that it runs on the single exclusive state-mutating
//! thread, it instead uses `UnsafeCell`, which is sugar-coated by a tailored wrapper called
//! [`TokenCell`]. `TokenCell` imposes an overly restrictive aliasing rule via the Rust type
//! system to maintain memory safety. By localizing any synchronization to the message passing,
//! the scheduling code itself attains the maximum possible single-threaded execution speed
//! without stalling CPU pipelines, constrained only by memory access latency, while efficiently
//! utilizing the L1-L3 CPU caches full of `UsageQueue`s.
//!
//! ### Buffer bloat insignificance
//!
//! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in the
//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely
//! hampered by a very large number of interleaved runnable tasks alongside it. The reason is,
//! again, separation of concerns. This is acceptable because the scheduling code itself isn't
//! susceptible to the buffer bloat problem, as explained by the description above and validated
//! by the mentioned benchmark. Thus, it should be solved elsewhere, specifically at the scheduler
//! pool.
use {
    crate::utils::{ShortCounter, Token, TokenCell},
    assert_matches::assert_matches,
    solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction},
    static_assertions::const_assert_eq,
    std::{collections::VecDeque, mem, sync::Arc},
};

106/// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`].
107mod utils {
108    use std::{
109        any::{self, TypeId},
110        cell::{RefCell, UnsafeCell},
111        collections::BTreeSet,
112        marker::PhantomData,
113        thread,
114    };
115
    /// A really tiny counter to hide `.checked_{add,sub}` all over the place.
    ///
    /// It's the caller's responsibility to ensure this (backed by [`u32`]) never overflows.
119    #[derive(Debug, Clone, Copy)]
120    pub(super) struct ShortCounter(u32);
121
122    impl ShortCounter {
123        pub(super) fn zero() -> Self {
124            Self(0)
125        }
126
127        pub(super) fn one() -> Self {
128            Self(1)
129        }
130
131        pub(super) fn is_one(&self) -> bool {
132            self.0 == 1
133        }
134
135        pub(super) fn is_zero(&self) -> bool {
136            self.0 == 0
137        }
138
139        pub(super) fn current(&self) -> u32 {
140            self.0
141        }
142
143        #[must_use]
144        pub(super) fn increment(self) -> Self {
145            Self(self.0.checked_add(1).unwrap())
146        }
147
148        #[must_use]
149        pub(super) fn decrement(self) -> Self {
150            Self(self.0.checked_sub(1).unwrap())
151        }
152
153        pub(super) fn increment_self(&mut self) -> &mut Self {
154            *self = self.increment();
155            self
156        }
157
158        pub(super) fn decrement_self(&mut self) -> &mut Self {
159            *self = self.decrement();
160            self
161        }
162
163        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
164            self.0 = 0;
165            self
166        }
167    }
168
169    /// A conditionally [`Send`]-able and [`Sync`]-able cell leveraging scheduler's one-by-one data
170    /// access pattern with zero runtime synchronization cost.
171    ///
172    /// To comply with Rust's aliasing rules, these cells require a carefully-created [`Token`] to
173    /// be passed around to access the inner values. The token is a special-purpose phantom object
174    /// to get rid of its inherent `unsafe`-ness in [`UnsafeCell`], which is internally used for
175    /// the interior mutability.
176    ///
177    /// The final objective of [`Token`] is to ensure there's only one mutable reference to the
178    /// [`TokenCell`] at most _at any given moment_. To that end, it's `unsafe` to create it,
179    /// shifting the responsibility of binding the only singleton instance to a particular thread
180    /// and not creating more than one, onto the API consumers. And its constructor is non-`const`,
181    /// and the type is `!Clone` (and `!Copy` as well), `!Default`, `!Send` and `!Sync` to make it
182    /// relatively hard to cross thread boundaries accidentally.
183    ///
    /// In other words, the token semantically _owns_ all of its associated instances of
    /// [`TokenCell`]s. And `&mut Token` is needed to access one of them, as if the value were
    /// exposed via one of [`Token`]'s `*_mut()` getters. Thus, the Rust aliasing rule for
    /// `UnsafeCell` can transitively be proven to be satisfied simply based on the usual borrow
    /// checking of the `&mut` reference of [`Token`] itself via
    /// [`::with_borrow_mut()`](TokenCell::with_borrow_mut).
190    ///
191    /// By extension, it's allowed to create _multiple_ tokens in a _single_ process as long as no
192    /// instance of [`TokenCell`] is shared by multiple instances of [`Token`].
193    ///
    /// Note that this is overly restrictive in that it's forbidden, yet technically possible,
    /// to _have multiple mutable references to the inner values at the same time, if and only
    /// if the respective cells aren't aliased to each other (i.e. different instances)_. This
    /// artificial restriction is acceptable for its intended use by the unified scheduler's code
    /// because its algorithm only needs to access each instance of [`TokenCell`]-ed data once at
    /// a time. Finally, this restriction is traded off for restoring the Rust aliasing rule at
    /// zero runtime cost. Without this token mechanism, there's no way to realize this.
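    ///
    /// A minimal sketch of the intended access pattern (illustrative only; real usage threads
    /// the token through the scheduler's methods):
    ///
    /// ```ignore
    /// // Mint the per-thread singleton token once (unsafe: the caller promises exclusivity).
    /// let mut token = unsafe { Token::<u32>::assume_exclusive_mutating_thread() };
    /// let cell = TokenCell::new(0_u32);
    /// // Mutable access is only possible while the token itself is mutably borrowed.
    /// cell.with_borrow_mut(&mut token, |value| *value += 1);
    /// ```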
201    #[derive(Debug, Default)]
202    pub(super) struct TokenCell<V>(UnsafeCell<V>);
203
204    impl<V> TokenCell<V> {
205        /// Creates a new `TokenCell` with the `value` typed as `V`.
206        ///
        /// Note that this isn't parametric over its accompanying `Token`'s lifetime to avoid
        /// complex handling of non-`'static` heaped data in general. Instead, it's manually
        /// required to ensure this instance is accessed only via its associated Token for its
        /// entire lifetime.
211        ///
212        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
213        /// variables among threads.
214        pub(super) fn new(value: V) -> Self {
215            Self(UnsafeCell::new(value))
216        }
217
218        /// Acquires a mutable reference inside a given closure, while borrowing the mutable
219        /// reference of the given token.
220        ///
221        /// In this way, any additional reborrow can never happen at the same time across all
222        /// instances of [`TokenCell<V>`] conceptually owned by the instance of [`Token<V>`] (a
223        /// particular thread), unless previous borrow is released. After the release, the used
224        /// singleton token should be free to be reused for reborrows.
225        ///
226        /// Note that lifetime of the acquired reference is still restricted to 'self, not
227        /// 'token, in order to avoid use-after-free undefined behaviors.
228        pub(super) fn with_borrow_mut<R>(
229            &self,
230            _token: &mut Token<V>,
231            f: impl FnOnce(&mut V) -> R,
232        ) -> R {
233            f(unsafe { &mut *self.0.get() })
234        }
235    }
236
    // Safety: Once a (`Send`-able) `TokenCell` has been transferred to a thread from other
    // threads, access to it is assumed to happen only from that single thread, by proper use of
    // Token. Thereby, implementing `Sync` can be considered safe, and doing so is needed for the
    // particular implementation pattern in the unified scheduler (multi-threaded off-loading).
    //
    // In other words, TokenCell is conceptually still `!Sync`. But there should be no
    // legalized usage which depends on real `Sync`, to avoid undefined behaviors.
244    unsafe impl<V> Sync for TokenCell<V> {}
245
    /// An auxiliary zero-sized type to enforce the aliasing rule on [`TokenCell`] via the Rust
    /// type system.
    ///
    /// Token semantically owns a collection of `TokenCell` objects and governs the _unique_
    /// existence of mutable access over them by requiring the token itself to be mutably borrowed
    /// to get a mutable reference to the internal value of `TokenCell`.
251    // *mut is used to make this type !Send and !Sync
252    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);
253
254    impl<V> Token<V> {
255        /// Returns the token to acquire a mutable reference to the inner value of [TokenCell].
256        ///
257        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
258        /// variables among threads.
259        ///
260        /// # Panics
261        ///
262        /// This function will `panic!()` if called multiple times with same type `V` from the same
263        /// thread to detect potential misuses.
264        ///
265        /// # Safety
266        ///
267        /// This method should be called exactly once for each thread at most to avoid undefined
268        /// behavior when used with [`Token`].
269        #[must_use]
270        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
271            thread_local! {
272                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
273            }
            // TOKENS.with_borrow_mut can't panic because it's the only non-overlapping
            // bound-to-local-variable borrow of the _thread local_ variable.
276            assert!(
277                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
278                "{:?} is wrongly initialized twice on {:?}",
279                any::type_name::<Self>(),
280                thread::current()
281            );
282
283            Self(PhantomData)
284        }
285    }
286
287    #[cfg(test)]
288    mod tests {
289        use {
290            super::{Token, TokenCell},
291            std::{mem, sync::Arc, thread},
292        };
293
294        #[test]
295        #[should_panic(
296            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
297                        initialized twice on Thread"
298        )]
299        fn test_second_creation_of_tokens_in_a_thread() {
300            unsafe {
301                let _ = Token::<usize>::assume_exclusive_mutating_thread();
302                let _ = Token::<usize>::assume_exclusive_mutating_thread();
303            }
304        }
305
306        #[derive(Debug)]
307        struct FakeQueue {
308            v: Vec<u8>,
309        }
310
311        // As documented above, it's illegal to create multiple tokens inside a single thread to
312        // acquire multiple mutable references to the same TokenCell at the same time.
313        #[test]
314        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
315        // confirming false-positive result to conversely show the merit of miri!
316        #[cfg_attr(miri, ignore)]
317        fn test_ub_illegally_created_multiple_tokens() {
318            // Unauthorized token minting!
319            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
320            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
321
322            let queue = TokenCell::new(FakeQueue {
323                v: Vec::with_capacity(20),
324            });
325            queue.with_borrow_mut(&mut token1, |queue_mut1| {
326                queue_mut1.v.push(1);
327                queue.with_borrow_mut(&mut token2, |queue_mut2| {
328                    queue_mut2.v.push(2);
329                    queue_mut1.v.push(3);
330                });
331                queue_mut1.v.push(4);
332            });
333
            // It's already UB, so we can't assert reliably; dbg!(...) just for fun.
335            #[cfg(not(miri))]
336            dbg!(queue.0.into_inner());
337
338            // Return successfully to indicate an unexpected outcome, because this test should
339            // have aborted by now.
340        }
341
        // As documented above, it's illegal to share (= co-own) the same instance of TokenCell
        // across threads. Unfortunately, we can't prevent this from happening with some
        // type-safety magic to cause compile errors... So, as a sanity check, this test fails due
        // to a runtime error of the known UB, when run under miri.
346        #[test]
347        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
348        // confirming false-positive result to conversely show the merit of miri!
349        #[cfg_attr(miri, ignore)]
350        fn test_ub_illegally_shared_token_cell() {
351            let queue1 = Arc::new(TokenCell::new(FakeQueue {
352                v: Vec::with_capacity(20),
353            }));
354            let queue2 = queue1.clone();
355            #[cfg(not(miri))]
356            let queue3 = queue1.clone();
357
            // Usually miri immediately detects the data race; but just repeat enough times to
            // avoid being flaky.
360            for _ in 0..10 {
361                let (queue1, queue2) = (queue1.clone(), queue2.clone());
362                let thread1 = thread::spawn(move || {
363                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
364                    queue1.with_borrow_mut(&mut token, |queue| {
365                        // this is UB
366                        queue.v.push(3);
367                    });
368                });
                // Immediately spawn the next thread without joining thread1 to ensure there's
                // definitely a data race. Otherwise, joining here wouldn't cause UB.
371                let thread2 = thread::spawn(move || {
372                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
373                    queue2.with_borrow_mut(&mut token, |queue| {
374                        // this is UB
375                        queue.v.push(4);
376                    });
377                });
378
379                thread1.join().unwrap();
380                thread2.join().unwrap();
381            }
382
            // It's already UB, so we can't assert reliably; dbg!(...) just for fun.
384            #[cfg(not(miri))]
385            {
386                drop((queue1, queue2));
387                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
388            }
389
390            // Return successfully to indicate an unexpected outcome, because this test should
391            // have aborted by now
392        }
393    }
394}
395
396/// [`Result`] for locking a [usage_queue](UsageQueue) with particular
397/// [current_usage](RequestedUsage).
398type LockResult = Result<(), ()>;
399const_assert_eq!(mem::size_of::<LockResult>(), 1);
400
401/// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`].
402pub type Task = Arc<TaskInner>;
403const_assert_eq!(mem::size_of::<Task>(), 8);
404
405/// [`Token`] for [`UsageQueue`].
406type UsageQueueToken = Token<UsageQueueInner>;
407const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
408
409/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_usage_count`).
410type BlockedUsageCountToken = Token<ShortCounter>;
411const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
412
413/// Internal scheduling data about a particular task.
414#[derive(Debug)]
415pub struct TaskInner {
416    transaction: SanitizedTransaction,
    /// The index of a transaction in ledger entries; not used by SchedulingStateMachine itself.
    /// Carrying it along with the transaction is needed to properly record its execution result.
420    index: usize,
421    lock_contexts: Vec<LockContext>,
422    blocked_usage_count: TokenCell<ShortCounter>,
423}
424
425impl TaskInner {
426    pub fn task_index(&self) -> usize {
427        self.index
428    }
429
430    pub fn transaction(&self) -> &SanitizedTransaction {
431        &self.transaction
432    }
433
434    fn lock_contexts(&self) -> &[LockContext] {
435        &self.lock_contexts
436    }
437
438    fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
439        self.blocked_usage_count
440            .with_borrow_mut(token, |usage_count| {
441                *usage_count = count;
442            })
443    }
444
445    #[must_use]
446    fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
447        let did_unblock = self
448            .blocked_usage_count
449            .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
450        did_unblock.then_some(self)
451    }
452}
453
454/// [`Task`]'s per-address context to lock a [usage_queue](UsageQueue) with [certain kind of
455/// request](RequestedUsage).
456#[derive(Debug)]
457struct LockContext {
458    usage_queue: UsageQueue,
459    requested_usage: RequestedUsage,
460}
461const_assert_eq!(mem::size_of::<LockContext>(), 16);
462
463impl LockContext {
464    fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
465        Self {
466            usage_queue,
467            requested_usage,
468        }
469    }
470
471    fn with_usage_queue_mut<R>(
472        &self,
473        usage_queue_token: &mut UsageQueueToken,
474        f: impl FnOnce(&mut UsageQueueInner) -> R,
475    ) -> R {
476        self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
477    }
478}
479
480/// Status about how the [`UsageQueue`] is used currently.
481#[derive(Copy, Clone, Debug)]
482enum Usage {
483    Readonly(ShortCounter),
484    Writable,
485}
486const_assert_eq!(mem::size_of::<Usage>(), 8);
487
488impl From<RequestedUsage> for Usage {
489    fn from(requested_usage: RequestedUsage) -> Self {
490        match requested_usage {
491            RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
492            RequestedUsage::Writable => Usage::Writable,
493        }
494    }
495}
496
497/// Status about how a task is requesting to use a particular [`UsageQueue`].
498#[derive(Clone, Copy, Debug)]
499enum RequestedUsage {
500    Readonly,
501    Writable,
502}
503
/// Internal scheduling data about a particular address.
///
/// Specifically, it holds the current [`Usage`] (or `None` if the address is unused) and which
/// [`Task`]s are blocked from being executed until the current task is notified to be finished
/// via [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`).
509#[derive(Debug)]
510struct UsageQueueInner {
511    current_usage: Option<Usage>,
512    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
513}
514
515type UsageFromTask = (RequestedUsage, Task);
516
517impl Default for UsageQueueInner {
518    fn default() -> Self {
519        Self {
520            current_usage: None,
            // The capacity should be made configurable so that a large capacity like 1024 can be
            // used inside the (multi-threaded) closures passed to create_task(). In this way,
            // reallocs can be avoided in the scheduler thread. Also, this configurability is
            // desired for unified-scheduler-logic's motto: separation of concerns (the pure logic
            // should be sufficiently distanced from any random knob constants needed for messy
            // reality, per the author's personal preference...).
527            //
528            // Note that large cap should be accompanied with proper scheduler cleaning after use,
529            // which should be handled by higher layers (i.e. scheduler pool).
530            blocked_usages_from_tasks: VecDeque::with_capacity(128),
531        }
532    }
533}
534
535impl UsageQueueInner {
536    fn try_lock(&mut self, requested_usage: RequestedUsage) -> LockResult {
537        match self.current_usage {
538            None => Some(Usage::from(requested_usage)),
539            Some(Usage::Readonly(count)) => match requested_usage {
540                RequestedUsage::Readonly => Some(Usage::Readonly(count.increment())),
541                RequestedUsage::Writable => None,
542            },
543            Some(Usage::Writable) => None,
544        }
545        .inspect(|&new_usage| {
546            self.current_usage = Some(new_usage);
547        })
548        .map(|_| ())
549        .ok_or(())
550    }
551
552    #[must_use]
553    fn unlock(&mut self, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
554        let mut is_unused_now = false;
555        match &mut self.current_usage {
556            Some(Usage::Readonly(ref mut count)) => match requested_usage {
557                RequestedUsage::Readonly => {
558                    if count.is_one() {
559                        is_unused_now = true;
560                    } else {
561                        count.decrement_self();
562                    }
563                }
564                RequestedUsage::Writable => unreachable!(),
565            },
566            Some(Usage::Writable) => match requested_usage {
567                RequestedUsage::Writable => {
568                    is_unused_now = true;
569                }
570                RequestedUsage::Readonly => unreachable!(),
571            },
572            None => unreachable!(),
573        }
574
575        if is_unused_now {
576            self.current_usage = None;
577            self.blocked_usages_from_tasks.pop_front()
578        } else {
579            None
580        }
581    }
582
583    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
584        assert_matches!(self.current_usage, Some(_));
585        self.blocked_usages_from_tasks.push_back(usage_from_task);
586    }
587
588    #[must_use]
589    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
590        if matches!(
591            self.blocked_usages_from_tasks.front(),
592            Some((RequestedUsage::Readonly, _))
593        ) {
594            assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
595            self.blocked_usages_from_tasks.pop_front()
596        } else {
597            None
598        }
599    }
600
601    fn has_no_blocked_usage(&self) -> bool {
602        self.blocked_usages_from_tasks.is_empty()
603    }
604}
605
606const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);
607
/// Scheduler's internal data for each address ([`Pubkey`](`solana_sdk::pubkey::Pubkey`)). A very
/// opaque wrapper type; it exposes no methods other than [`::clone()`](Clone::clone) and
/// [`::default()`](Default::default).
611#[derive(Debug, Clone, Default)]
612pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
613const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
614
615/// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by
616/// `solana-unified-scheduler-pool`.
617pub struct SchedulingStateMachine {
618    unblocked_task_queue: VecDeque<Task>,
619    active_task_count: ShortCounter,
620    handled_task_count: ShortCounter,
621    unblocked_task_count: ShortCounter,
622    total_task_count: ShortCounter,
623    count_token: BlockedUsageCountToken,
624    usage_queue_token: UsageQueueToken,
625}
626const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 48);
627
628impl SchedulingStateMachine {
629    pub fn has_no_active_task(&self) -> bool {
630        self.active_task_count.is_zero()
631    }
632
633    pub fn has_unblocked_task(&self) -> bool {
634        !self.unblocked_task_queue.is_empty()
635    }
636
637    pub fn unblocked_task_queue_count(&self) -> usize {
638        self.unblocked_task_queue.len()
639    }
640
641    pub fn active_task_count(&self) -> u32 {
642        self.active_task_count.current()
643    }
644
645    pub fn handled_task_count(&self) -> u32 {
646        self.handled_task_count.current()
647    }
648
649    pub fn unblocked_task_count(&self) -> u32 {
650        self.unblocked_task_count.current()
651    }
652
653    pub fn total_task_count(&self) -> u32 {
654        self.total_task_count.current()
655    }
656
    /// Schedules the given `task`, returning it if successful.
    ///
    /// Returns `Some(task)` if it's immediately scheduled. Otherwise, returns `None`,
    /// indicating the scheduled task is currently blocked.
    ///
    /// Note that this function takes ownership of the task to allow for future optimizations.
663    #[must_use]
664    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
665        self.total_task_count.increment_self();
666        self.active_task_count.increment_self();
667        self.try_lock_usage_queues(task)
668    }
669
670    #[must_use]
671    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
672        self.unblocked_task_queue.pop_front().inspect(|_| {
673            self.unblocked_task_count.increment_self();
674        })
675    }
676
    /// Deschedules the given scheduled `task`.
    ///
    /// This must be called exactly once for every scheduled task to uphold the internal state
    /// consistency of both `SchedulingStateMachine` and `UsageQueue` at any given moment in time.
    /// It's a serious logic error to call this twice with the same task, or not at all after
    /// scheduling. Similarly, calling this with a task that was never scheduled is also forbidden.
    ///
    /// Note that this function intentionally doesn't take ownership of the task to avoid dropping
    /// tasks inside `SchedulingStateMachine`, providing an offloading-based optimization
    /// opportunity for callers.
687    pub fn deschedule_task(&mut self, task: &Task) {
688        self.active_task_count.decrement_self();
689        self.handled_task_count.increment_self();
690        self.unlock_usage_queues(task);
691    }
692
693    #[must_use]
694    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
695        let mut blocked_usage_count = ShortCounter::zero();
696
697        for context in task.lock_contexts() {
698            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
699                let lock_result = if usage_queue.has_no_blocked_usage() {
700                    usage_queue.try_lock(context.requested_usage)
701                } else {
702                    LockResult::Err(())
703                };
704                if let Err(()) = lock_result {
705                    blocked_usage_count.increment_self();
706                    let usage_from_task = (context.requested_usage, task.clone());
707                    usage_queue.push_blocked_usage_from_task(usage_from_task);
708                }
709            });
710        }
711
712        // no blocked usage count means success
713        if blocked_usage_count.is_zero() {
714            Some(task)
715        } else {
716            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
717            None
718        }
719    }
720
721    fn unlock_usage_queues(&mut self, task: &Task) {
722        for context in task.lock_contexts() {
723            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
724                let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);
725
726                while let Some((requested_usage, task_with_unblocked_queue)) =
727                    unblocked_task_from_queue
728                {
                    // When `try_unblock()` returns `None`, meaning the unblocking failed this
                    // time, the task is still blocked by other active tasks' usages. So, don't
                    // push the task into unblocked_task_queue yet. It can be assumed that every
                    // task will eventually succeed in being unblocked, and enter this conditional
                    // branch, as long as `SchedulingStateMachine` is used correctly.
734                    if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
735                    {
736                        self.unblocked_task_queue.push_back(task);
737                    }
738
739                    match usage_queue.try_lock(requested_usage) {
740                        LockResult::Ok(()) => {
741                            // Try to further schedule blocked task for parallelism in the case of
742                            // readonly usages
743                            unblocked_task_from_queue =
744                                if matches!(requested_usage, RequestedUsage::Readonly) {
745                                    usage_queue.pop_unblocked_readonly_usage_from_task()
746                                } else {
747                                    None
748                                };
749                        }
750                        LockResult::Err(()) => panic!("should never fail in this context"),
751                    }
752                }
753            });
754        }
755    }
756
    /// Creates a new task from a [`SanitizedTransaction`] with all of its corresponding
    /// [`UsageQueue`]s preloaded.
    ///
    /// The closure (`usage_queue_loader`) is used to delegate the (possibly multi-threaded)
    /// implementation of [`UsageQueue`] look-up by [`pubkey`](Pubkey) to callers. It's the
    /// caller's responsibility to ensure the same instance is returned from the closure, given a
    /// particular pubkey.
    ///
    /// The closure is also used to delegate the responsibility of primary ownership of
    /// `UsageQueue` (and caching/pruning, if any) to the caller. `SchedulingStateMachine`
    /// guarantees that all shared ownership of `UsageQueue`s is released and each `UsageQueue`'s
    /// state is identical to just after creation, whenever `has_no_active_task()` is `true`.
    /// Also note that this is desired for separation of concerns.
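    ///
    /// For example, a single-threaded caller might back the loader with a simple map (a minimal
    /// sketch mirroring this crate's tests; the `usage_queues` map, `transaction`, and `index`
    /// here are this example's assumptions, not part of this crate):
    ///
    /// ```ignore
    /// let mut usage_queues = std::collections::HashMap::<Pubkey, UsageQueue>::new();
    /// let task = SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| {
    ///     // Return the same UsageQueue instance for the same pubkey across tasks.
    ///     usage_queues.entry(pubkey).or_default().clone()
    /// });
    /// ```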
770    pub fn create_task(
771        transaction: SanitizedTransaction,
772        index: usize,
773        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
774    ) -> Task {
775        // It's crucial for tasks to be validated with
776        // `account_locks::validate_account_locks()` prior to the creation.
777        // That's because it's part of protocol consensus regarding the
778        // rejection of blocks containing malformed transactions
779        // (`AccountLoadedTwice` and `TooManyAccountLocks`). Even more,
780        // `SchedulingStateMachine` can't properly handle transactions with
781        // duplicate addresses (those falling under `AccountLoadedTwice`).
782        //
783        // However, it's okay for now not to call `::validate_account_locks()`
784        // here.
785        //
786        // Currently `replay_stage` is always calling
787        //`::validate_account_locks()` regardless of whether unified-scheduler
788        // is enabled or not at the blockstore
789        // (`Bank::prepare_sanitized_batch()` is called in
790        // `process_entries()`). This verification will be hoisted for
791        // optimization when removing
792        // `--block-verification-method=blockstore-processor`.
793        //
794        // As for `banking_stage` with unified scheduler, it will need to run
795        // `validate_account_locks()` at least once somewhere in the code path.
796        // In the distant future, this function (`create_task()`) should be
797        // adjusted so that both stages do the checks before calling this or do
798        // the checks here, to simplify the two code paths regarding the
799        // essential `validate_account_locks` validation.
800        //
801        // Lastly, `validate_account_locks()` is currently called in
802        // `DefaultTransactionHandler::handle()` via
803        // `Bank::prepare_unlocked_batch_from_single_tx()` as well.
804        // This redundancy is known. It was just left as-is out of abundance
805        // of caution.
806        let lock_contexts = transaction
807            .message()
808            .account_keys()
809            .iter()
810            .enumerate()
811            .map(|(index, address)| {
812                LockContext::new(
813                    usage_queue_loader(*address),
814                    if transaction.message().is_writable(index) {
815                        RequestedUsage::Writable
816                    } else {
817                        RequestedUsage::Readonly
818                    },
819                )
820            })
821            .collect();
822
823        Task::new(TaskInner {
824            transaction,
825            index,
826            lock_contexts,
827            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
828        })
829    }
830
    /// Rewinds the inactive state machine back to its initialized state.
    ///
    /// This isn't called _reset_, to indicate that it isn't safe to call at any arbitrary moment.
    /// This panics if the state machine hasn't properly been finished (i.e. there should be no
    /// active task), to uphold the invariants of [`UsageQueue`]s.
    ///
    /// This method is intended to reuse a SchedulingStateMachine instance (to avoid its `unsafe`
    /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling)
    /// as much as possible) and its (possibly cached) associated [`UsageQueue`]s for processing
    /// other slots.
841    pub fn reinitialize(&mut self) {
842        assert!(self.has_no_active_task());
843        assert_eq!(self.unblocked_task_queue.len(), 0);
        // nice trick to ensure all fields are handled here if a new one is added.
845        let Self {
846            unblocked_task_queue: _,
847            active_task_count,
848            handled_task_count,
849            unblocked_task_count,
850            total_task_count,
851            count_token: _,
852            usage_queue_token: _,
853            // don't add ".." here
854        } = self;
855        active_task_count.reset_to_zero();
856        handled_task_count.reset_to_zero();
857        unblocked_task_count.reset_to_zero();
858        total_task_count.reset_to_zero();
859    }
860
861    /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as
862    /// well, thus carrying over `unsafe`.
863    ///
864    /// # Safety
865    /// Call this exactly once for each thread. See [`TokenCell`] for details.
866    #[must_use]
867    pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self {
868        Self {
            // It's very unlikely this is desired to be configurable, unlike
            // `UsageQueueInner::blocked_usages_from_tasks`'s cap.
871            unblocked_task_queue: VecDeque::with_capacity(1024),
872            active_task_count: ShortCounter::zero(),
873            handled_task_count: ShortCounter::zero(),
874            unblocked_task_count: ShortCounter::zero(),
875            total_task_count: ShortCounter::zero(),
876            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
877            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
878        }
879    }
880}
881
882#[cfg(test)]
883mod tests {
884    use {
885        super::*,
886        solana_sdk::{
887            instruction::{AccountMeta, Instruction},
888            message::Message,
889            pubkey::Pubkey,
890            signature::Signer,
891            signer::keypair::Keypair,
892            transaction::{SanitizedTransaction, Transaction},
893        },
894        std::{cell::RefCell, collections::HashMap, rc::Rc},
895    };
896
897    fn simplest_transaction() -> SanitizedTransaction {
898        let payer = Keypair::new();
899        let message = Message::new(&[], Some(&payer.pubkey()));
900        let unsigned = Transaction::new_unsigned(message);
901        SanitizedTransaction::from_transaction_for_tests(unsigned)
902    }
903
904    fn transaction_with_readonly_address(address: Pubkey) -> SanitizedTransaction {
905        let instruction = Instruction {
906            program_id: Pubkey::default(),
907            accounts: vec![AccountMeta::new_readonly(address, false)],
908            data: vec![],
909        };
910        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
911        let unsigned = Transaction::new_unsigned(message);
912        SanitizedTransaction::from_transaction_for_tests(unsigned)
913    }
914
915    fn transaction_with_writable_address(address: Pubkey) -> SanitizedTransaction {
916        let instruction = Instruction {
917            program_id: Pubkey::default(),
918            accounts: vec![AccountMeta::new(address, false)],
919            data: vec![],
920        };
921        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
922        let unsigned = Transaction::new_unsigned(message);
923        SanitizedTransaction::from_transaction_for_tests(unsigned)
924    }
925
926    fn create_address_loader(
927        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
928    ) -> impl FnMut(Pubkey) -> UsageQueue {
929        let usage_queues = usage_queues.unwrap_or_default();
930        move |address| {
931            usage_queues
932                .borrow_mut()
933                .entry(address)
934                .or_default()
935                .clone()
936        }
937    }
938
939    #[test]
940    fn test_scheduling_state_machine_creation() {
941        let state_machine = unsafe {
942            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
943        };
944        assert_eq!(state_machine.active_task_count(), 0);
945        assert_eq!(state_machine.total_task_count(), 0);
946        assert!(state_machine.has_no_active_task());
947    }
948
949    #[test]
950    fn test_scheduling_state_machine_good_reinitialization() {
951        let mut state_machine = unsafe {
952            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
953        };
954        state_machine.total_task_count.increment_self();
955        assert_eq!(state_machine.total_task_count(), 1);
956        state_machine.reinitialize();
957        assert_eq!(state_machine.total_task_count(), 0);
958    }
959
960    #[test]
961    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
962    fn test_scheduling_state_machine_bad_reinitialization() {
963        let mut state_machine = unsafe {
964            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
965        };
966        let address_loader = &mut create_address_loader(None);
967        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
968        state_machine.schedule_task(task).unwrap();
969        state_machine.reinitialize();
970    }
971
972    #[test]
973    fn test_create_task() {
974        let sanitized = simplest_transaction();
975        let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| {
976            UsageQueue::default()
977        });
978        assert_eq!(task.task_index(), 3);
979        assert_eq!(task.transaction(), &sanitized);
980    }
981
982    #[test]
983    fn test_non_conflicting_task_related_counts() {
984        let sanitized = simplest_transaction();
985        let address_loader = &mut create_address_loader(None);
986        let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, address_loader);
987
988        let mut state_machine = unsafe {
989            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
990        };
991        let task = state_machine.schedule_task(task).unwrap();
992        assert_eq!(state_machine.active_task_count(), 1);
993        assert_eq!(state_machine.total_task_count(), 1);
994        state_machine.deschedule_task(&task);
995        assert_eq!(state_machine.active_task_count(), 0);
996        assert_eq!(state_machine.total_task_count(), 1);
997        assert!(state_machine.has_no_active_task());
998    }
999
1000    #[test]
1001    fn test_conflicting_task_related_counts() {
1002        let sanitized = simplest_transaction();
1003        let address_loader = &mut create_address_loader(None);
1004        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
1005        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
1006        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);
1007
1008        let mut state_machine = unsafe {
1009            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1010        };
1011        assert_matches!(
1012            state_machine
1013                .schedule_task(task1.clone())
1014                .map(|t| t.task_index()),
1015            Some(101)
1016        );
1017        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1018
1019        state_machine.deschedule_task(&task1);
1020        assert!(state_machine.has_unblocked_task());
1021        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1022
        // unblocked_task_count() should be incremented by schedule_next_unblocked_task()
1024        assert_eq!(state_machine.unblocked_task_count(), 0);
1025        assert_eq!(
1026            state_machine
1027                .schedule_next_unblocked_task()
1028                .map(|t| t.task_index()),
1029            Some(102)
1030        );
1031        assert_eq!(state_machine.unblocked_task_count(), 1);
1032
        // there's no blocked task anymore; calling schedule_next_unblocked_task should be a no-op
        // and shouldn't increment the unblocked_task_count().
1035        assert!(!state_machine.has_unblocked_task());
1036        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1037        assert_eq!(state_machine.unblocked_task_count(), 1);
1038
1039        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1040        state_machine.deschedule_task(&task2);
1041
1042        assert_matches!(
1043            state_machine
1044                .schedule_task(task3.clone())
1045                .map(|task| task.task_index()),
1046            Some(103)
1047        );
1048        state_machine.deschedule_task(&task3);
1049        assert!(state_machine.has_no_active_task());
1050    }
1051
1052    #[test]
1053    fn test_existing_blocking_task_then_newly_scheduled_task() {
1054        let sanitized = simplest_transaction();
1055        let address_loader = &mut create_address_loader(None);
1056        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
1057        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
1058        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);
1059
1060        let mut state_machine = unsafe {
1061            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1062        };
1063        assert_matches!(
1064            state_machine
1065                .schedule_task(task1.clone())
1066                .map(|t| t.task_index()),
1067            Some(101)
1068        );
1069        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1070
1071        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1072        state_machine.deschedule_task(&task1);
1073        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1074
        // a new task arrives after task1 is already descheduled and task2 got unblocked
1076        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1077
1078        assert_eq!(state_machine.unblocked_task_count(), 0);
1079        assert_matches!(
1080            state_machine
1081                .schedule_next_unblocked_task()
1082                .map(|t| t.task_index()),
1083            Some(102)
1084        );
1085        assert_eq!(state_machine.unblocked_task_count(), 1);
1086
1087        state_machine.deschedule_task(&task2);
1088
1089        assert_matches!(
1090            state_machine
1091                .schedule_next_unblocked_task()
1092                .map(|t| t.task_index()),
1093            Some(103)
1094        );
1095        assert_eq!(state_machine.unblocked_task_count(), 2);
1096
1097        state_machine.deschedule_task(&task3);
1098        assert!(state_machine.has_no_active_task());
1099    }
1100
1101    #[test]
1102    fn test_multiple_readonly_task_and_counts() {
1103        let conflicting_address = Pubkey::new_unique();
1104        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1105        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1106        let address_loader = &mut create_address_loader(None);
1107        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1108        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1109
1110        let mut state_machine = unsafe {
1111            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1112        };
        // both of the read-only tasks should be immediately runnable
1114        assert_matches!(
1115            state_machine
1116                .schedule_task(task1.clone())
1117                .map(|t| t.task_index()),
1118            Some(101)
1119        );
1120        assert_matches!(
1121            state_machine
1122                .schedule_task(task2.clone())
1123                .map(|t| t.task_index()),
1124            Some(102)
1125        );
1126
1127        assert_eq!(state_machine.active_task_count(), 2);
1128        assert_eq!(state_machine.handled_task_count(), 0);
1129        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1130        state_machine.deschedule_task(&task1);
1131        assert_eq!(state_machine.active_task_count(), 1);
1132        assert_eq!(state_machine.handled_task_count(), 1);
1133        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1134        state_machine.deschedule_task(&task2);
1135        assert_eq!(state_machine.active_task_count(), 0);
1136        assert_eq!(state_machine.handled_task_count(), 2);
1137        assert!(state_machine.has_no_active_task());
1138    }
1139
1140    #[test]
1141    fn test_all_blocking_readable_tasks_block_writable_task() {
1142        let conflicting_address = Pubkey::new_unique();
1143        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1144        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1145        let sanitized3 = transaction_with_writable_address(conflicting_address);
1146        let address_loader = &mut create_address_loader(None);
1147        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1148        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1149        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
1150
1151        let mut state_machine = unsafe {
1152            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1153        };
1154        assert_matches!(
1155            state_machine
1156                .schedule_task(task1.clone())
1157                .map(|t| t.task_index()),
1158            Some(101)
1159        );
1160        assert_matches!(
1161            state_machine
1162                .schedule_task(task2.clone())
1163                .map(|t| t.task_index()),
1164            Some(102)
1165        );
1166        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1167
1168        assert_eq!(state_machine.active_task_count(), 3);
1169        assert_eq!(state_machine.handled_task_count(), 0);
1170        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1171        state_machine.deschedule_task(&task1);
1172        assert_eq!(state_machine.active_task_count(), 2);
1173        assert_eq!(state_machine.handled_task_count(), 1);
1174        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1175        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1176        state_machine.deschedule_task(&task2);
1177        assert_eq!(state_machine.active_task_count(), 1);
1178        assert_eq!(state_machine.handled_task_count(), 2);
1179        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        // task3 is finally unblocked after all of the readable tasks (task1 and task2) are finished.
1181        assert_matches!(
1182            state_machine
1183                .schedule_next_unblocked_task()
1184                .map(|t| t.task_index()),
1185            Some(103)
1186        );
1187        state_machine.deschedule_task(&task3);
1188        assert!(state_machine.has_no_active_task());
1189    }
1190
    #[test]
    fn test_readonly_then_writable_then_readonly_linearized() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

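    // Descheduling the sole read-locking task hands the write lock over to the blocked writer.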
    #[test]
    fn test_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        // descheduling the read-locking task1 should unblock the write-locking task2
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

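    // A writer followed by two readers and another writer: releasing the first write lock
    // unblocks both readers at once (read locks can be granted together), while the trailing
    // writer must wait until every read lock has been released.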
    #[test]
    fn test_blocked_tasks_writable_2_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let sanitized4 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);
        assert_matches!(state_machine.schedule_task(task4.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        // the above deschedule_task(task1) call should unblock only task2 and task3 because they
        // are read-locking; it shouldn't unblock task4, which is write-locking
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task2);
        // task4 is still blocked...
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task3);
        // finally, task4 should be unblocked
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(104)
        );
        state_machine.deschedule_task(&task4);
        assert!(state_machine.has_no_active_task());
    }

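    // Even while blocked on the conflicting address, a task should have already taken its
    // non-conflicting locks (here, task2's fee payer): locking happens gradually, address by
    // address.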
    #[test]
    fn test_gradual_locking() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
        let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        let usage_queues = usage_queues.borrow_mut();
        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        // task2's fee payer should have been locked already, even though task2 is still blocked
        // by the schedule_task(task2) call above
        let fee_payer = task2.transaction().message().fee_payer();
        let usage_queue = usage_queues.get(fee_payer).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

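    // Unlocking a usage queue that holds no lock at all violates an internal invariant and must
    // hit the unreachable code path.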
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions1() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }

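    // A read-unlock against a write-locked usage queue must likewise be unreachable.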
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions2() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Writable);
                let _ = usage_queue.unlock(RequestedUsage::Readonly);
            });
    }

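    // A write-unlock against a read-locked usage queue must likewise be unreachable.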
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions3() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Readonly(ShortCounter::one()));
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }
}