solana_unified_scheduler_logic/
lib.rs

#![allow(rustdoc::private_intra_doc_links)]
//! The task (transaction) scheduling code for the unified scheduler
//!
//! ### High-level API and design
//!
//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactions) and
//! may return them if they are runnable via
//! [`::schedule_task()`](SchedulingStateMachine::schedule_task) while maintaining the account
//! readonly/writable lock rules. Those returned runnable tasks are guaranteed to be safe to
//! execute in parallel. Lastly, `SchedulingStateMachine` should be notified about the completion
//! of the execution via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that
//! conflicting tasks can be returned from
//! [`::schedule_next_unblocked_task()`](SchedulingStateMachine::schedule_next_unblocked_task) as
//! newly-unblocked runnable ones.
//!
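//! A minimal sketch of the intended call pattern follows (the `transaction`, `index`,
//! `usage_queue_loader` and `execute` names are hypothetical placeholders, not part of this
//! crate's API; not a doctest):
//!
//! ```ignore
//! let mut state_machine = unsafe {
//!     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
//! };
//! let task = SchedulingStateMachine::create_task(transaction, index, &mut usage_queue_loader);
//! if let Some(task) = state_machine.schedule_task(task) {
//!     execute(&task); // safe to run in parallel with other returned tasks
//!     state_machine.deschedule_task(&task);
//! }
//! while let Some(task) = state_machine.schedule_next_unblocked_task() {
//!     execute(&task);
//!     state_machine.deschedule_task(&task);
//! }
//! ```
//!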
//! The design principle of this crate (`solana-unified-scheduler-logic`) is simplicity, for the
//! sake of separation of concerns. It is interacted with by `solana-unified-scheduler-pool` only
//! through a handful of its public APIs. This crate doesn't know about banks, slots,
//! solana-runtime, threads, or crossbeam-channel at all. Because of this, it's deterministic,
//! easy to unit-test, and its perf footprint is well understood. It really focuses on its single
//! job: sorting transactions into executable order.
//!
//! ### Algorithm
//!
//! The algorithm can be said to be based on per-address FIFO queues, which are updated both every
//! time a new task comes in (= called _scheduling_) and every time a runnable (= _post-scheduling_)
//! task is finished (= called _descheduling_).
//!
//! For the _non-conflicting scheduling_ case, the story is very simple; it just remembers that all
//! of the accessed addresses are write-locked or read-locked, along with the number of active (=
//! _currently-scheduled-and-not-descheduled-yet_) tasks. Correspondingly, descheduling does the
//! opposite book-keeping process, regardless of whether the finished task was conflicting or not.
//!
//! For the _conflicting scheduling_ case, it remembers each of the **non-conflicting addresses**
//! like in the non-conflicting case above. As for the **conflicting addresses**, the task is
//! recorded into the respective FIFO queues attached to those (conflicting) addresses.
//! Importantly, the number of conflicting addresses of the conflicting task is also remembered.
//!
//! The last missing piece is that the scheduler actually tries to reschedule previously blocked
//! tasks while descheduling, in addition to the above-mentioned book-keeping processing. Namely,
//! when a given address becomes ready for fresh locking as a result of descheduling a task (i.e.
//! the write lock is released or the read lock count has reached zero), it pops out the first
//! element of the FIFO blocked-task queue of the address. Then, it immediately marks the address
//! as relocked. It also decrements the number of conflicting addresses of the popped-out task. As
//! the final step, if that number reaches zero, it means the task has fully finished locking all
//! of its addresses and is directly routed to be runnable. Lastly, if the new first element of the
//! blocked-task queue is trying to read-lock the address like the popped-out one, this
//! rescheduling is repeated as an optimization to increase parallelism of task execution.
//!
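//! The following sketch illustrates the blocking and unblocking flow for two tasks writing to the
//! same (hypothetical) address; it mirrors the unit tests at the bottom of this file and is not a
//! doctest:
//!
//! ```ignore
//! // task1 and task2 both write-lock the same address
//! let task1 = state_machine.schedule_task(task1).unwrap(); // immediately runnable
//! assert!(state_machine.schedule_task(task2).is_none());   // blocked behind task1
//!
//! state_machine.deschedule_task(&task1);                   // releases the write lock
//! let task2 = state_machine.schedule_next_unblocked_task().unwrap(); // task2 is now runnable
//! state_machine.deschedule_task(&task2);
//! ```
//!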
//! Put differently, this algorithm tries to gradually lock all of the addresses of tasks at
//! different timings while not deviating the execution order from the original task ingestion
//! order. This implies there are no locking retries in general, which would otherwise be the
//! primary source of non-linear perf degradation.
//!
//! As a ballpark number from a synthesized micro benchmark on a usual CPU for `mainnet-beta`
//! validators, it takes roughly 100ns to schedule and deschedule a transaction with 10 accounts,
//! and 1us for a transaction with 100 accounts. In other words, the scheduler thread alone can
//! sustain on the order of 1M-10M schedulings per second at those per-transaction costs. Note
//! that this excludes crossbeam communication overhead entirely. That said, it's not unrealistic
//! to say the whole unified scheduler can attain 100k-1m tps overall, assuming those transaction
//! executions aren't bottlenecked.
//!
//! ### Runtime performance characteristics and data structure arrangement
//!
//! The algorithm is very fast for high throughput and real-time for low latency. The whole
//! unified-scheduler architecture is designed from the ground up to support the fastest execution
//! of this scheduling code. To that end, the unified scheduler pre-loads address-specific locking
//! state data structures (called [`UsageQueue`]) for all of a transaction's accounts, in order to
//! offload that job from the scheduler thread to other threads. This preloading is done inside
//! [`create_task()`](SchedulingStateMachine::create_task). In this way, task scheduling
//! computational complexity is basically reduced to several word-sized loads and stores in the
//! scheduler thread (i.e. constant; no allocations nor syscalls), while being proportional to the
//! number of addresses in a given transaction. Note that this statement holds true regardless
//! of conflicts. This is because the preloading also pre-allocates some scratch-pad area
//! ([`blocked_usages_from_tasks`](UsageQueueInner::blocked_usages_from_tasks)) to stash blocked
//! ones. So, a conflict only incurs some additional fixed number of mem stores, within the error
//! margin of the constant complexity. And the additional memory allocation for the scratchpad
//! could be said to be amortized, if such an unusual event should ever occur.
//!
//! [`Arc`] is used to implement this preloading mechanism, because `UsageQueue`s are shared across
//! tasks accessing the same account, and among threads due to the preloading. Also, interior
//! mutability is needed. However, `SchedulingStateMachine` doesn't use conventional locks like
//! RwLock. Leveraging the fact that it's the only state-mutating exclusive thread, it instead uses
//! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell`
//! imposes an overly restrictive aliasing rule via the Rust type system to maintain memory safety.
//! By localizing any synchronization to the message passing, the scheduling code itself attains
//! maximally possible single-threaded execution without stalling cpu pipelines at all, constrained
//! only by memory access latency, while efficiently utilizing the L1-L3 cpu caches filled with
//! `UsageQueue`s.
//!
//! ### Buffer bloat insignificance
//!
//! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in the
//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely
//! hampered by a very large number of interleaved runnable tasks alongside. The reason is, again,
//! separation of concerns. This is acceptable because the scheduling code itself isn't susceptible
//! to the buffer bloat problem by itself, as explained by the description and validated by the
//! benchmark mentioned above. Thus, this should be solved elsewhere, specifically at the
//! scheduler pool.
98use {
99    crate::utils::{ShortCounter, Token, TokenCell},
100    assert_matches::assert_matches,
101    solana_pubkey::Pubkey,
102    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
103    solana_transaction::sanitized::SanitizedTransaction,
104    static_assertions::const_assert_eq,
105    std::{collections::VecDeque, mem, sync::Arc},
106};
107
108#[derive(Clone, Copy, Debug)]
109pub enum SchedulingMode {
110    BlockVerification,
111    BlockProduction,
112}
113
114/// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`].
115mod utils {
116    use std::{
117        any::{self, TypeId},
118        cell::{RefCell, UnsafeCell},
119        collections::BTreeSet,
120        marker::PhantomData,
121        thread,
122    };
123
124    /// A really tiny counter to hide `.checked_{add,sub}` all over the place.
125    ///
    /// It's the caller's responsibility to ensure this (backed by [`u32`]) never overflows.
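    ///
    /// A tiny sketch of the counting behavior (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// let count = ShortCounter::zero().increment(); // 1
    /// assert!(count.is_one());
    /// assert_eq!(count.decrement().current(), 0);
    /// ```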
127    #[derive(Debug, Clone, Copy)]
128    pub(super) struct ShortCounter(u32);
129
130    impl ShortCounter {
131        pub(super) fn zero() -> Self {
132            Self(0)
133        }
134
135        pub(super) fn one() -> Self {
136            Self(1)
137        }
138
139        pub(super) fn is_one(&self) -> bool {
140            self.0 == 1
141        }
142
143        pub(super) fn is_zero(&self) -> bool {
144            self.0 == 0
145        }
146
147        pub(super) fn current(&self) -> u32 {
148            self.0
149        }
150
151        #[must_use]
152        pub(super) fn increment(self) -> Self {
153            Self(self.0.checked_add(1).unwrap())
154        }
155
156        #[must_use]
157        pub(super) fn decrement(self) -> Self {
158            Self(self.0.checked_sub(1).unwrap())
159        }
160
161        pub(super) fn increment_self(&mut self) -> &mut Self {
162            *self = self.increment();
163            self
164        }
165
166        pub(super) fn decrement_self(&mut self) -> &mut Self {
167            *self = self.decrement();
168            self
169        }
170
171        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
172            self.0 = 0;
173            self
174        }
175    }
176
177    /// A conditionally [`Send`]-able and [`Sync`]-able cell leveraging scheduler's one-by-one data
178    /// access pattern with zero runtime synchronization cost.
179    ///
180    /// To comply with Rust's aliasing rules, these cells require a carefully-created [`Token`] to
181    /// be passed around to access the inner values. The token is a special-purpose phantom object
182    /// to get rid of its inherent `unsafe`-ness in [`UnsafeCell`], which is internally used for
183    /// the interior mutability.
184    ///
185    /// The final objective of [`Token`] is to ensure there's only one mutable reference to the
186    /// [`TokenCell`] at most _at any given moment_. To that end, it's `unsafe` to create it,
187    /// shifting the responsibility of binding the only singleton instance to a particular thread
188    /// and not creating more than one, onto the API consumers. And its constructor is non-`const`,
189    /// and the type is `!Clone` (and `!Copy` as well), `!Default`, `!Send` and `!Sync` to make it
190    /// relatively hard to cross thread boundaries accidentally.
191    ///
192    /// In other words, the token semantically _owns_ all of its associated instances of
    /// [`TokenCell`]s. And `&mut Token` is needed to access one of them, as if going through one
    /// of [`Token`]'s `*_mut()` getters. Thus, the Rust aliasing rule for `UnsafeCell` can
195    /// transitively be proven to be satisfied simply based on the usual borrow checking of the
196    /// `&mut` reference of [`Token`] itself via
197    /// [`::with_borrow_mut()`](TokenCell::with_borrow_mut).
198    ///
199    /// By extension, it's allowed to create _multiple_ tokens in a _single_ process as long as no
200    /// instance of [`TokenCell`] is shared by multiple instances of [`Token`].
201    ///
    /// Note that this is overly restrictive in that it's forbidden, yet technically possible,
    /// to _have multiple mutable references to the inner values at the same time, if and only
204    /// if the respective cells aren't aliased to each other (i.e. different instances)_. This
205    /// artificial restriction is acceptable for its intended use by the unified scheduler's code
206    /// because its algorithm only needs to access each instance of [`TokenCell`]-ed data once at a
207    /// time. Finally, this restriction is traded off for restoration of Rust aliasing rule at zero
    /// runtime cost. Without this token mechanism, there's no way to realize this.
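    ///
    /// A minimal sketch of the intended access pattern (illustrative only, not a doctest; the
    /// token must have been created on the current thread beforehand):
    ///
    /// ```ignore
    /// let mut token = unsafe { Token::<Vec<u8>>::assume_exclusive_mutating_thread() };
    /// let cell = TokenCell::new(Vec::new());
    /// cell.with_borrow_mut(&mut token, |v| v.push(42));
    /// ```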
209    #[derive(Debug, Default)]
210    pub(super) struct TokenCell<V>(UnsafeCell<V>);
211
212    impl<V> TokenCell<V> {
213        /// Creates a new `TokenCell` with the `value` typed as `V`.
214        ///
        /// Note that this isn't parametric over its accompanying `Token`'s lifetime to avoid
216        /// complex handling of non-`'static` heaped data in general. Instead, it's manually
217        /// required to ensure this instance is accessed only via its associated Token for the
218        /// entire lifetime.
219        ///
220        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
221        /// variables among threads.
222        pub(super) fn new(value: V) -> Self {
223            Self(UnsafeCell::new(value))
224        }
225
226        /// Acquires a mutable reference inside a given closure, while borrowing the mutable
227        /// reference of the given token.
228        ///
        /// In this way, no additional reborrow can ever happen at the same time across all
        /// instances of [`TokenCell<V>`] conceptually owned by the instance of [`Token<V>`] (a
        /// particular thread), unless the previous borrow is released. After the release, the used
        /// singleton token should be free to be reused for reborrows.
        ///
        /// Note that the lifetime of the acquired reference is still restricted to `self`'s
        /// lifetime, not the token's, in order to avoid use-after-free undefined behaviors.
236        pub(super) fn with_borrow_mut<R>(
237            &self,
238            _token: &mut Token<V>,
239            f: impl FnOnce(&mut V) -> R,
240        ) -> R {
241            f(unsafe { &mut *self.0.get() })
242        }
243    }
244
    // Safety: Once a (`Send`-able) `TokenCell` is transferred to a thread from other
    // threads, access to `TokenCell` is assumed to be only from the single thread by proper use of
    // Token. Thereby, implementing `Sync` can be thought of as safe and doing so is needed for the
    // particular implementation pattern in the unified scheduler (multi-threaded off-loading).
249    //
250    // In other words, TokenCell is technically still `!Sync`. But there should be no
251    // legalized usage which depends on real `Sync` to avoid undefined behaviors.
252    unsafe impl<V> Sync for TokenCell<V> {}
253
    /// An auxiliary zero-sized type to enforce the aliasing rule for [`TokenCell`] via the Rust type system
255    ///
256    /// Token semantically owns a collection of `TokenCell` objects and governs the _unique_
257    /// existence of mutable access over them by requiring the token itself to be mutably borrowed
258    /// to get a mutable reference to the internal value of `TokenCell`.
259    // *mut is used to make this type !Send and !Sync
260    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);
261
262    impl<V> Token<V> {
263        /// Returns the token to acquire a mutable reference to the inner value of [TokenCell].
264        ///
265        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
266        /// variables among threads.
267        ///
268        /// # Panics
269        ///
        /// This function will `panic!()` if called multiple times with the same type `V` from the same
271        /// thread to detect potential misuses.
272        ///
273        /// # Safety
274        ///
        /// This method should be called at most once for each thread to avoid undefined
        /// behavior when used with [`Token`].
277        #[must_use]
278        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
279            thread_local! {
280                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
281            }
            // TOKENS.with_borrow_mut can't panic because it's the only non-overlapping
283            // bound-to-local-variable borrow of the _thread local_ variable.
284            assert!(
285                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
286                "{:?} is wrongly initialized twice on {:?}",
287                any::type_name::<Self>(),
288                thread::current()
289            );
290
291            Self(PhantomData)
292        }
293    }
294
295    #[cfg(test)]
296    mod tests {
297        use {
298            super::{Token, TokenCell},
299            std::{mem, sync::Arc, thread},
300        };
301
302        #[test]
303        #[should_panic(
304            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
305                        initialized twice on Thread"
306        )]
307        fn test_second_creation_of_tokens_in_a_thread() {
308            unsafe {
309                let _ = Token::<usize>::assume_exclusive_mutating_thread();
310                let _ = Token::<usize>::assume_exclusive_mutating_thread();
311            }
312        }
313
314        #[derive(Debug)]
315        struct FakeQueue {
316            v: Vec<u8>,
317        }
318
319        // As documented above, it's illegal to create multiple tokens inside a single thread to
320        // acquire multiple mutable references to the same TokenCell at the same time.
321        #[test]
322        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
323        // confirming false-positive result to conversely show the merit of miri!
324        #[cfg_attr(miri, ignore)]
325        fn test_ub_illegally_created_multiple_tokens() {
326            // Unauthorized token minting!
327            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
328            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
329
330            let queue = TokenCell::new(FakeQueue {
331                v: Vec::with_capacity(20),
332            });
333            queue.with_borrow_mut(&mut token1, |queue_mut1| {
334                queue_mut1.v.push(1);
335                queue.with_borrow_mut(&mut token2, |queue_mut2| {
336                    queue_mut2.v.push(2);
337                    queue_mut1.v.push(3);
338                });
339                queue_mut1.v.push(4);
340            });
341
            // It's in UB already, so we can't assert reliably; dbg!(...) just for fun
343            #[cfg(not(miri))]
344            dbg!(queue.0.into_inner());
345
346            // Return successfully to indicate an unexpected outcome, because this test should
347            // have aborted by now.
348        }
349
350        // As documented above, it's illegal to share (= co-own) the same instance of TokenCell
351        // across threads. Unfortunately, we can't prevent this from happening with some
        // type-safety magic to cause compile errors... So, as a sanity check, this test fails due
        // to a runtime error of the known UB, when run under miri.
354        #[test]
355        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
356        // confirming false-positive result to conversely show the merit of miri!
357        #[cfg_attr(miri, ignore)]
358        fn test_ub_illegally_shared_token_cell() {
359            let queue1 = Arc::new(TokenCell::new(FakeQueue {
360                v: Vec::with_capacity(20),
361            }));
362            let queue2 = queue1.clone();
363            #[cfg(not(miri))]
364            let queue3 = queue1.clone();
365
            // Usually miri immediately detects the data race; but just repeat enough times to avoid
367            // being flaky
368            for _ in 0..10 {
369                let (queue1, queue2) = (queue1.clone(), queue2.clone());
370                let thread1 = thread::spawn(move || {
371                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
372                    queue1.with_borrow_mut(&mut token, |queue| {
373                        // this is UB
374                        queue.v.push(3);
375                    });
376                });
                // Immediately spawn the next thread without joining thread1 to make sure there's
                // definitely a data race. Otherwise, joining here wouldn't cause UB.
379                let thread2 = thread::spawn(move || {
380                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
381                    queue2.with_borrow_mut(&mut token, |queue| {
382                        // this is UB
383                        queue.v.push(4);
384                    });
385                });
386
387                thread1.join().unwrap();
388                thread2.join().unwrap();
389            }
390
            // It's in UB already, so we can't assert reliably; dbg!(...) just for fun
392            #[cfg(not(miri))]
393            {
394                drop((queue1, queue2));
395                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
396            }
397
398            // Return successfully to indicate an unexpected outcome, because this test should
399            // have aborted by now
400        }
401    }
402}
403
/// [`Result`] for locking a [usage_queue](UsageQueue) with a particular
405/// [current_usage](RequestedUsage).
406type LockResult = Result<(), ()>;
407const_assert_eq!(mem::size_of::<LockResult>(), 1);
408
409/// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`].
410pub type Task = Arc<TaskInner>;
411const_assert_eq!(mem::size_of::<Task>(), 8);
412
413/// [`Token`] for [`UsageQueue`].
414type UsageQueueToken = Token<UsageQueueInner>;
415const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
416
417/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_usage_count`).
418type BlockedUsageCountToken = Token<ShortCounter>;
419const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
420
421/// Internal scheduling data about a particular task.
422#[derive(Debug)]
423pub struct TaskInner {
424    transaction: RuntimeTransaction<SanitizedTransaction>,
    /// The index of a transaction in ledger entries; not used by SchedulingStateMachine itself.
426    /// Carrying this along with the transaction is needed to properly record the execution result
427    /// of it.
428    index: usize,
429    lock_contexts: Vec<LockContext>,
430    blocked_usage_count: TokenCell<ShortCounter>,
431}
432
433impl TaskInner {
434    pub fn task_index(&self) -> usize {
435        self.index
436    }
437
438    pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
439        &self.transaction
440    }
441
442    fn lock_contexts(&self) -> &[LockContext] {
443        &self.lock_contexts
444    }
445
446    fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
447        self.blocked_usage_count
448            .with_borrow_mut(token, |usage_count| {
449                *usage_count = count;
450            })
451    }
452
453    #[must_use]
454    fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
455        let did_unblock = self
456            .blocked_usage_count
457            .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
458        did_unblock.then_some(self)
459    }
460}
461
462/// [`Task`]'s per-address context to lock a [usage_queue](UsageQueue) with [certain kind of
463/// request](RequestedUsage).
464#[derive(Debug)]
465struct LockContext {
466    usage_queue: UsageQueue,
467    requested_usage: RequestedUsage,
468}
469const_assert_eq!(mem::size_of::<LockContext>(), 16);
470
471impl LockContext {
472    fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
473        Self {
474            usage_queue,
475            requested_usage,
476        }
477    }
478
479    fn with_usage_queue_mut<R>(
480        &self,
481        usage_queue_token: &mut UsageQueueToken,
482        f: impl FnOnce(&mut UsageQueueInner) -> R,
483    ) -> R {
484        self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
485    }
486}
487
488/// Status about how the [`UsageQueue`] is used currently.
489#[derive(Copy, Clone, Debug)]
490enum Usage {
491    Readonly(ShortCounter),
492    Writable,
493}
494const_assert_eq!(mem::size_of::<Usage>(), 8);
495
496impl From<RequestedUsage> for Usage {
497    fn from(requested_usage: RequestedUsage) -> Self {
498        match requested_usage {
499            RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
500            RequestedUsage::Writable => Usage::Writable,
501        }
502    }
503}
504
505/// Status about how a task is requesting to use a particular [`UsageQueue`].
506#[derive(Clone, Copy, Debug)]
507enum RequestedUsage {
508    Readonly,
509    Writable,
510}
511
512/// Internal scheduling data about a particular address.
513///
/// Specifically, it holds the current [`Usage`] (or no usage with `None`) and which
/// [`Task`]s are blocked from being executed until the current task is notified to be finished via
/// [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`).
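///
/// A minimal sketch of the per-address lock bookkeeping, assuming an otherwise empty queue
/// (illustrative only, not a doctest):
///
/// ```ignore
/// let mut queue = UsageQueueInner::default();
/// assert_eq!(queue.try_lock(RequestedUsage::Readonly), Ok(()));  // Usage::Readonly(1)
/// assert_eq!(queue.try_lock(RequestedUsage::Readonly), Ok(()));  // Usage::Readonly(2)
/// assert_eq!(queue.try_lock(RequestedUsage::Writable), Err(())); // writers must wait
/// assert!(queue.unlock(RequestedUsage::Readonly).is_none());     // back to Usage::Readonly(1)
/// assert!(queue.unlock(RequestedUsage::Readonly).is_none());     // unused again; nothing blocked
/// ```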
517#[derive(Debug)]
518struct UsageQueueInner {
519    current_usage: Option<Usage>,
520    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
521}
522
523type UsageFromTask = (RequestedUsage, Task);
524
525impl Default for UsageQueueInner {
526    fn default() -> Self {
527        Self {
528            current_usage: None,
            // Capacity should be configurable to create with a large capacity like 1024 inside the
            // (multi-threaded) closures passed to create_task(). In this way, reallocs can be
            // avoided from happening in the scheduler thread. Also, this configurability is
            // desired for unified-scheduler-logic's motto: separation of concerns (the pure logic
            // should be sufficiently distanced from any random knob constants needed for messy
            // reality, as a matter of the author's personal preference...).
            //
            // Note that a large cap should be accompanied by proper scheduler cleaning after use,
            // which should be handled by higher layers (i.e. scheduler pool).
538            blocked_usages_from_tasks: VecDeque::with_capacity(128),
539        }
540    }
541}
542
543impl UsageQueueInner {
544    fn try_lock(&mut self, requested_usage: RequestedUsage) -> LockResult {
545        match self.current_usage {
546            None => Some(Usage::from(requested_usage)),
547            Some(Usage::Readonly(count)) => match requested_usage {
548                RequestedUsage::Readonly => Some(Usage::Readonly(count.increment())),
549                RequestedUsage::Writable => None,
550            },
551            Some(Usage::Writable) => None,
552        }
553        .inspect(|&new_usage| {
554            self.current_usage = Some(new_usage);
555        })
556        .map(|_| ())
557        .ok_or(())
558    }
559
560    #[must_use]
561    fn unlock(&mut self, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
562        let mut is_unused_now = false;
563        match &mut self.current_usage {
564            Some(Usage::Readonly(ref mut count)) => match requested_usage {
565                RequestedUsage::Readonly => {
566                    if count.is_one() {
567                        is_unused_now = true;
568                    } else {
569                        count.decrement_self();
570                    }
571                }
572                RequestedUsage::Writable => unreachable!(),
573            },
574            Some(Usage::Writable) => match requested_usage {
575                RequestedUsage::Writable => {
576                    is_unused_now = true;
577                }
578                RequestedUsage::Readonly => unreachable!(),
579            },
580            None => unreachable!(),
581        }
582
583        if is_unused_now {
584            self.current_usage = None;
585            self.blocked_usages_from_tasks.pop_front()
586        } else {
587            None
588        }
589    }
590
591    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
592        assert_matches!(self.current_usage, Some(_));
593        self.blocked_usages_from_tasks.push_back(usage_from_task);
594    }
595
596    #[must_use]
597    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
598        if matches!(
599            self.blocked_usages_from_tasks.front(),
600            Some((RequestedUsage::Readonly, _))
601        ) {
602            assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
603            self.blocked_usages_from_tasks.pop_front()
604        } else {
605            None
606        }
607    }
608
609    fn has_no_blocked_usage(&self) -> bool {
610        self.blocked_usages_from_tasks.is_empty()
611    }
612}
613
614const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);
615
/// Scheduler's internal data for each address ([`Pubkey`]). Very opaque wrapper type; no methods
/// other than [`::clone()`](Clone::clone) and [`::default()`](Default::default).
619#[derive(Debug, Clone, Default)]
620pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
621const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
622
623/// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by
624/// `solana-unified-scheduler-pool`.
625pub struct SchedulingStateMachine {
626    unblocked_task_queue: VecDeque<Task>,
627    active_task_count: ShortCounter,
628    handled_task_count: ShortCounter,
629    unblocked_task_count: ShortCounter,
630    total_task_count: ShortCounter,
631    count_token: BlockedUsageCountToken,
632    usage_queue_token: UsageQueueToken,
633}
634const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 48);
635
636impl SchedulingStateMachine {
637    pub fn has_no_active_task(&self) -> bool {
638        self.active_task_count.is_zero()
639    }
640
641    pub fn has_unblocked_task(&self) -> bool {
642        !self.unblocked_task_queue.is_empty()
643    }
644
645    pub fn unblocked_task_queue_count(&self) -> usize {
646        self.unblocked_task_queue.len()
647    }
648
649    pub fn active_task_count(&self) -> u32 {
650        self.active_task_count.current()
651    }
652
653    pub fn handled_task_count(&self) -> u32 {
654        self.handled_task_count.current()
655    }
656
657    pub fn unblocked_task_count(&self) -> u32 {
658        self.unblocked_task_count.current()
659    }
660
661    pub fn total_task_count(&self) -> u32 {
662        self.total_task_count.current()
663    }
664
    /// Schedules the given `task`, returning it if successful.
    ///
    /// Returns `Some(task)` if it's immediately scheduled. Otherwise, returns `None`,
    /// indicating the scheduled task is currently blocked.
669    ///
670    /// Note that this function takes ownership of the task to allow for future optimizations.
671    #[must_use]
672    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
673        self.total_task_count.increment_self();
674        self.active_task_count.increment_self();
675        self.try_lock_usage_queues(task)
676    }
677
678    #[must_use]
679    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
680        self.unblocked_task_queue.pop_front().inspect(|_| {
681            self.unblocked_task_count.increment_self();
682        })
683    }
684
    /// Deschedules the given scheduled `task`.
    ///
    /// This must be called exactly once for each scheduled task to uphold both
    /// `SchedulingStateMachine` and `UsageQueue` internal state consistency at any given moment of
    /// time. It's a serious logic error to call this twice with the same task or not at all after
    /// scheduling. Similarly, calling this with a task that hasn't been scheduled is also
    /// forbidden.
691    ///
692    /// Note that this function intentionally doesn't take ownership of the task to avoid dropping
693    /// tasks inside `SchedulingStateMachine` to provide an offloading-based optimization
694    /// opportunity for callers.
695    pub fn deschedule_task(&mut self, task: &Task) {
696        self.active_task_count.decrement_self();
697        self.handled_task_count.increment_self();
698        self.unlock_usage_queues(task);
699    }
700
701    #[must_use]
702    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
703        let mut blocked_usage_count = ShortCounter::zero();
704
705        for context in task.lock_contexts() {
706            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
707                let lock_result = if usage_queue.has_no_blocked_usage() {
708                    usage_queue.try_lock(context.requested_usage)
709                } else {
710                    LockResult::Err(())
711                };
712                if let Err(()) = lock_result {
713                    blocked_usage_count.increment_self();
714                    let usage_from_task = (context.requested_usage, task.clone());
715                    usage_queue.push_blocked_usage_from_task(usage_from_task);
716                }
717            });
718        }
719
720        // no blocked usage count means success
721        if blocked_usage_count.is_zero() {
722            Some(task)
723        } else {
724            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
725            None
726        }
727    }
728
729    fn unlock_usage_queues(&mut self, task: &Task) {
730        for context in task.lock_contexts() {
731            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
732                let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);
733
734                while let Some((requested_usage, task_with_unblocked_queue)) =
735                    unblocked_task_from_queue
736                {
                    // When `try_unblock()` returns `None` as a failure of unblocking this time,
                    // this means the task is still blocked by other active tasks' usages. So,
                    // don't push the task into unblocked_task_queue yet. It can be assumed that
                    // every task will eventually succeed in being unblocked, and enter this
                    // condition clause, as long as `SchedulingStateMachine` is used correctly.
742                    if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
743                    {
744                        self.unblocked_task_queue.push_back(task);
745                    }
746
747                    match usage_queue.try_lock(requested_usage) {
748                        LockResult::Ok(()) => {
749                            // Try to further schedule blocked task for parallelism in the case of
750                            // readonly usages
751                            unblocked_task_from_queue =
752                                if matches!(requested_usage, RequestedUsage::Readonly) {
753                                    usage_queue.pop_unblocked_readonly_usage_from_task()
754                                } else {
755                                    None
756                                };
757                        }
758                        LockResult::Err(()) => panic!("should never fail in this context"),
759                    }
760                }
761            });
762        }
763    }
764
    /// Creates a new task for the given [`RuntimeTransaction<SanitizedTransaction>`], with all
    /// of its corresponding [`UsageQueue`]s preloaded.
    ///
    /// The closure (`usage_queue_loader`) is used to delegate the (possibly multi-threaded)
    /// implementation of [`UsageQueue`] look-up by [`pubkey`](Pubkey) to callers. It's the
    /// caller's responsibility to ensure the same instance is returned from the closure, given a
    /// particular pubkey.
    ///
    /// A closure is used here to delegate the responsibility of primary ownership of `UsageQueue`
    /// (and caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all
    /// shared ownership of `UsageQueue`s is released and the `UsageQueue` state is identical to
    /// just after creation, if `has_no_active_task()` is `true`. Also note that this is desired
    /// for separation of concerns.
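    ///
    /// A minimal sketch of a loader closure (a `HashMap`-backed cache mirroring the test helper in
    /// this file; `transaction` and `index` are hypothetical placeholders, and this is not a
    /// doctest):
    ///
    /// ```ignore
    /// let mut usage_queues = HashMap::<Pubkey, UsageQueue>::new();
    /// let task = SchedulingStateMachine::create_task(transaction, index, &mut |address| {
    ///     usage_queues.entry(address).or_default().clone()
    /// });
    /// ```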
778    pub fn create_task(
779        transaction: RuntimeTransaction<SanitizedTransaction>,
780        index: usize,
781        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
782    ) -> Task {
783        // It's crucial for tasks to be validated with
784        // `account_locks::validate_account_locks()` prior to the creation.
785        // That's because it's part of protocol consensus regarding the
786        // rejection of blocks containing malformed transactions
787        // (`AccountLoadedTwice` and `TooManyAccountLocks`). Even more,
788        // `SchedulingStateMachine` can't properly handle transactions with
789        // duplicate addresses (those falling under `AccountLoadedTwice`).
790        //
791        // However, it's okay for now not to call `::validate_account_locks()`
792        // here.
793        //
794        // Currently `replay_stage` is always calling
795        //`::validate_account_locks()` regardless of whether unified-scheduler
796        // is enabled or not at the blockstore
797        // (`Bank::prepare_sanitized_batch()` is called in
798        // `process_entries()`). This verification will be hoisted for
799        // optimization when removing
800        // `--block-verification-method=blockstore-processor`.
801        //
802        // As for `banking_stage` with unified scheduler, it will need to run
803        // `validate_account_locks()` at least once somewhere in the code path.
804        // In the distant future, this function (`create_task()`) should be
805        // adjusted so that both stages do the checks before calling this or do
806        // the checks here, to simplify the two code paths regarding the
807        // essential `validate_account_locks` validation.
808        //
809        // Lastly, `validate_account_locks()` is currently called in
810        // `DefaultTransactionHandler::handle()` via
811        // `Bank::prepare_unlocked_batch_from_single_tx()` as well.
812        // This redundancy is known. It was just left as-is out of abundance
813        // of caution.
814        let lock_contexts = transaction
815            .message()
816            .account_keys()
817            .iter()
818            .enumerate()
819            .map(|(index, address)| {
820                LockContext::new(
821                    usage_queue_loader(*address),
822                    if transaction.message().is_writable(index) {
823                        RequestedUsage::Writable
824                    } else {
825                        RequestedUsage::Readonly
826                    },
827                )
828            })
829            .collect();
830
831        Task::new(TaskInner {
832            transaction,
833            index,
834            lock_contexts,
835            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
836        })
837    }
838
    /// Rewinds the inactive state machine back to its initialized state.
    ///
    /// This isn't called _reset_ to indicate that it isn't safe to call at any given moment.
    /// This panics if the state machine hasn't properly been finished (i.e. there should be no
    /// active task) in order to uphold the invariants of [`UsageQueue`]s.
    ///
    /// This method is intended for reusing a SchedulingStateMachine instance (to avoid its `unsafe`
    /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling)
    /// as much as possible) and its (possibly cached) associated [`UsageQueue`]s for processing
    /// other slots.
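    ///
    /// A minimal sketch of the intended reuse pattern across slots (illustrative only, not a
    /// doctest):
    ///
    /// ```ignore
    /// // ...after all tasks for the current slot have been descheduled:
    /// assert!(state_machine.has_no_active_task());
    /// state_machine.reinitialize();
    /// // the same instance (and its cached UsageQueues) can now serve the next slot
    /// ```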
849    pub fn reinitialize(&mut self) {
850        assert!(self.has_no_active_task());
851        assert_eq!(self.unblocked_task_queue.len(), 0);
        // nice trick to ensure all fields are handled here if a new one is added.
853        let Self {
854            unblocked_task_queue: _,
855            active_task_count,
856            handled_task_count,
857            unblocked_task_count,
858            total_task_count,
859            count_token: _,
860            usage_queue_token: _,
861            // don't add ".." here
862        } = self;
863        active_task_count.reset_to_zero();
864        handled_task_count.reset_to_zero();
865        unblocked_task_count.reset_to_zero();
866        total_task_count.reset_to_zero();
867    }
868
869    /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as
870    /// well, thus carrying over `unsafe`.
871    ///
872    /// # Safety
873    /// Call this exactly once for each thread. See [`TokenCell`] for details.
874    #[must_use]
875    pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self {
876        Self {
877            // It's very unlikely this is desired to be configurable, like
878            // `UsageQueueInner::blocked_usages_from_tasks`'s cap.
879            unblocked_task_queue: VecDeque::with_capacity(1024),
880            active_task_count: ShortCounter::zero(),
881            handled_task_count: ShortCounter::zero(),
882            unblocked_task_count: ShortCounter::zero(),
883            total_task_count: ShortCounter::zero(),
884            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
885            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
886        }
887    }
888}
889
890#[cfg(test)]
891mod tests {
892    use {
893        super::*,
894        solana_instruction::{AccountMeta, Instruction},
895        solana_message::Message,
896        solana_pubkey::Pubkey,
897        solana_transaction::{sanitized::SanitizedTransaction, Transaction},
898        std::{cell::RefCell, collections::HashMap, rc::Rc},
899    };
900
901    fn simplest_transaction() -> RuntimeTransaction<SanitizedTransaction> {
902        let message = Message::new(&[], Some(&Pubkey::new_unique()));
903        let unsigned = Transaction::new_unsigned(message);
904        RuntimeTransaction::from_transaction_for_tests(unsigned)
905    }
906
907    fn transaction_with_readonly_address(
908        address: Pubkey,
909    ) -> RuntimeTransaction<SanitizedTransaction> {
910        let instruction = Instruction {
911            program_id: Pubkey::default(),
912            accounts: vec![AccountMeta::new_readonly(address, false)],
913            data: vec![],
914        };
915        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
916        let unsigned = Transaction::new_unsigned(message);
917        RuntimeTransaction::from_transaction_for_tests(unsigned)
918    }
919
920    fn transaction_with_writable_address(
921        address: Pubkey,
922    ) -> RuntimeTransaction<SanitizedTransaction> {
923        let instruction = Instruction {
924            program_id: Pubkey::default(),
925            accounts: vec![AccountMeta::new(address, false)],
926            data: vec![],
927        };
928        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
929        let unsigned = Transaction::new_unsigned(message);
930        RuntimeTransaction::from_transaction_for_tests(unsigned)
931    }
932
933    fn create_address_loader(
934        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
935    ) -> impl FnMut(Pubkey) -> UsageQueue {
936        let usage_queues = usage_queues.unwrap_or_default();
937        move |address| {
938            usage_queues
939                .borrow_mut()
940                .entry(address)
941                .or_default()
942                .clone()
943        }
944    }
945
946    #[test]
947    fn test_scheduling_state_machine_creation() {
948        let state_machine = unsafe {
949            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
950        };
951        assert_eq!(state_machine.active_task_count(), 0);
952        assert_eq!(state_machine.total_task_count(), 0);
953        assert!(state_machine.has_no_active_task());
954    }
955
956    #[test]
957    fn test_scheduling_state_machine_good_reinitialization() {
958        let mut state_machine = unsafe {
959            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
960        };
961        state_machine.total_task_count.increment_self();
962        assert_eq!(state_machine.total_task_count(), 1);
963        state_machine.reinitialize();
964        assert_eq!(state_machine.total_task_count(), 0);
965    }
966
967    #[test]
968    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
969    fn test_scheduling_state_machine_bad_reinitialization() {
970        let mut state_machine = unsafe {
971            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
972        };
973        let address_loader = &mut create_address_loader(None);
974        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
975        state_machine.schedule_task(task).unwrap();
976        state_machine.reinitialize();
977    }
978
979    #[test]
980    fn test_create_task() {
981        let sanitized = simplest_transaction();
982        let signature = *sanitized.signature();
983        let task =
984            SchedulingStateMachine::create_task(sanitized, 3, &mut |_| UsageQueue::default());
985        assert_eq!(task.task_index(), 3);
986        assert_eq!(task.transaction().signature(), &signature);
987    }
988
989    #[test]
990    fn test_non_conflicting_task_related_counts() {
991        let sanitized = simplest_transaction();
992        let address_loader = &mut create_address_loader(None);
993        let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);
994
995        let mut state_machine = unsafe {
996            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
997        };
998        let task = state_machine.schedule_task(task).unwrap();
999        assert_eq!(state_machine.active_task_count(), 1);
1000        assert_eq!(state_machine.total_task_count(), 1);
1001        state_machine.deschedule_task(&task);
1002        assert_eq!(state_machine.active_task_count(), 0);
1003        assert_eq!(state_machine.total_task_count(), 1);
1004        assert!(state_machine.has_no_active_task());
1005    }
1006
1007    #[test]
1008    fn test_conflicting_task_related_counts() {
1009        let sanitized = simplest_transaction();
1010        let address_loader = &mut create_address_loader(None);
1011        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
1012        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
1013        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);
1014
1015        let mut state_machine = unsafe {
1016            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1017        };
1018        assert_matches!(
1019            state_machine
1020                .schedule_task(task1.clone())
1021                .map(|t| t.task_index()),
1022            Some(101)
1023        );
1024        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1025
1026        state_machine.deschedule_task(&task1);
1027        assert!(state_machine.has_unblocked_task());
1028        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1029
1030        // unblocked_task_count() should be incremented
1031        assert_eq!(state_machine.unblocked_task_count(), 0);
1032        assert_eq!(
1033            state_machine
1034                .schedule_next_unblocked_task()
1035                .map(|t| t.task_index()),
1036            Some(102)
1037        );
1038        assert_eq!(state_machine.unblocked_task_count(), 1);
1039
        // there's no blocked task anymore; calling schedule_next_unblocked_task should be a no-op
        // and shouldn't increment the unblocked_task_count().
1042        assert!(!state_machine.has_unblocked_task());
1043        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1044        assert_eq!(state_machine.unblocked_task_count(), 1);
1045
1046        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1047        state_machine.deschedule_task(&task2);
1048
1049        assert_matches!(
1050            state_machine
1051                .schedule_task(task3.clone())
1052                .map(|task| task.task_index()),
1053            Some(103)
1054        );
1055        state_machine.deschedule_task(&task3);
1056        assert!(state_machine.has_no_active_task());
1057    }
1058
1059    #[test]
1060    fn test_existing_blocking_task_then_newly_scheduled_task() {
1061        let sanitized = simplest_transaction();
1062        let address_loader = &mut create_address_loader(None);
1063        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
1064        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
1065        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);
1066
1067        let mut state_machine = unsafe {
1068            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1069        };
1070        assert_matches!(
1071            state_machine
1072                .schedule_task(task1.clone())
1073                .map(|t| t.task_index()),
1074            Some(101)
1075        );
1076        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1077
1078        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1079        state_machine.deschedule_task(&task1);
1080        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1081
        // a new task arrives after task1 is already descheduled and task2 got unblocked
1083        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1084
1085        assert_eq!(state_machine.unblocked_task_count(), 0);
1086        assert_matches!(
1087            state_machine
1088                .schedule_next_unblocked_task()
1089                .map(|t| t.task_index()),
1090            Some(102)
1091        );
1092        assert_eq!(state_machine.unblocked_task_count(), 1);
1093
1094        state_machine.deschedule_task(&task2);
1095
1096        assert_matches!(
1097            state_machine
1098                .schedule_next_unblocked_task()
1099                .map(|t| t.task_index()),
1100            Some(103)
1101        );
1102        assert_eq!(state_machine.unblocked_task_count(), 2);
1103
1104        state_machine.deschedule_task(&task3);
1105        assert!(state_machine.has_no_active_task());
1106    }
1107
1108    #[test]
1109    fn test_multiple_readonly_task_and_counts() {
1110        let conflicting_address = Pubkey::new_unique();
1111        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1112        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1113        let address_loader = &mut create_address_loader(None);
1114        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1115        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1116
1117        let mut state_machine = unsafe {
1118            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1119        };
        // both read-only tasks should be immediately runnable
1121        assert_matches!(
1122            state_machine
1123                .schedule_task(task1.clone())
1124                .map(|t| t.task_index()),
1125            Some(101)
1126        );
1127        assert_matches!(
1128            state_machine
1129                .schedule_task(task2.clone())
1130                .map(|t| t.task_index()),
1131            Some(102)
1132        );
1133
1134        assert_eq!(state_machine.active_task_count(), 2);
1135        assert_eq!(state_machine.handled_task_count(), 0);
1136        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1137        state_machine.deschedule_task(&task1);
1138        assert_eq!(state_machine.active_task_count(), 1);
1139        assert_eq!(state_machine.handled_task_count(), 1);
1140        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1141        state_machine.deschedule_task(&task2);
1142        assert_eq!(state_machine.active_task_count(), 0);
1143        assert_eq!(state_machine.handled_task_count(), 2);
1144        assert!(state_machine.has_no_active_task());
1145    }
1146
1147    #[test]
1148    fn test_all_blocking_readable_tasks_block_writable_task() {
1149        let conflicting_address = Pubkey::new_unique();
1150        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1151        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1152        let sanitized3 = transaction_with_writable_address(conflicting_address);
1153        let address_loader = &mut create_address_loader(None);
1154        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1155        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1156        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
1157
1158        let mut state_machine = unsafe {
1159            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
1160        };
1161        assert_matches!(
1162            state_machine
1163                .schedule_task(task1.clone())
1164                .map(|t| t.task_index()),
1165            Some(101)
1166        );
1167        assert_matches!(
1168            state_machine
1169                .schedule_task(task2.clone())
1170                .map(|t| t.task_index()),
1171            Some(102)
1172        );
1173        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1174
1175        assert_eq!(state_machine.active_task_count(), 3);
1176        assert_eq!(state_machine.handled_task_count(), 0);
1177        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1178        state_machine.deschedule_task(&task1);
1179        assert_eq!(state_machine.active_task_count(), 2);
1180        assert_eq!(state_machine.handled_task_count(), 1);
1181        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1182        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1183        state_machine.deschedule_task(&task2);
1184        assert_eq!(state_machine.active_task_count(), 1);
1185        assert_eq!(state_machine.handled_task_count(), 2);
1186        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        // task3 is finally unblocked after all of the readable tasks (task1 and task2) are finished.
1188        assert_matches!(
1189            state_machine
1190                .schedule_next_unblocked_task()
1191                .map(|t| t.task_index()),
1192            Some(103)
1193        );
1194        state_machine.deschedule_task(&task3);
1195        assert!(state_machine.has_no_active_task());
1196    }
1197
    #[test]
    fn test_readonly_then_writable_then_readonly_linearized() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

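    // A write-locking task queued behind a single read-locking task becomes runnable as soon as
    // that reader is descheduled.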
    #[test]
    fn test_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        // descheduling the read-locking task1 should unblock the write-locking task2
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

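    // Descheduling the blocking write-locking task unblocks both queued read-locking tasks at
    // once, while the trailing write-locking task stays blocked until every one of those readers
    // has been descheduled.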
    #[test]
    fn test_blocked_tasks_writable_2_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let sanitized4 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);
        assert_matches!(state_machine.schedule_task(task4.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        // the above deschedule_task(task1) call should unblock only task2 and task3 because they
        // are read-locking; it shouldn't unblock task4 because task4 is write-locking
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task2);
        // task4 is still blocked...
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task3);
        // finally task4 should be unblocked
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(104)
        );
        state_machine.deschedule_task(&task4);
        assert!(state_machine.has_no_active_task());
    }

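    // A task blocked on the conflicting address should nevertheless already hold the locks for
    // its non-conflicting addresses (here, task2's fee payer).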
    #[test]
    fn test_gradual_locking() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
        let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        let usage_queues = usage_queues.borrow_mut();
        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        // task2's fee payer should have been locked already even though task2 is still blocked by
        // the above schedule_task(task2) call
        let fee_payer = task2.transaction().message().fee_payer();
        let usage_queue = usage_queues.get(fee_payer).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

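    // Unlocking a usage queue that holds no usage at all is a logic error and must panic.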
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions1() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }

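    // Read-unlocking a write-locked usage queue is a logic error and must panic.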
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions2() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Writable);
                let _ = usage_queue.unlock(RequestedUsage::Readonly);
            });
    }

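    // Write-unlocking a read-locked usage queue is a logic error and must panic.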
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions3() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Readonly(ShortCounter::one()));
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }
}