solana_runtime/
installed_scheduler_pool.rs

1//! Transaction processing glue code, mainly consisting of Object-safe traits
2//!
3//! [InstalledSchedulerPool] lends one of pooled [InstalledScheduler]s as wrapped in
4//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
5//! execution. After use, the scheduler will be returned to the pool.
6//!
7//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
8//! executions and commits those results into the associated _bank_.
9//!
10//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
11//! parallel transaction processing and there are multiple independent schedulers inside a single
12//! instance of [InstalledSchedulerPool].
13//!
14//! Dynamic dispatch was inevitable due to the desire to piggyback on
15//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
16//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking
17//! actual implementations provided by the dependent crate (`solana-unified-scheduler-pool`, which
18//! in turn depends on `solana-ledger`, which in turn depends on `solana-runtime`), avoiding a
19//! cyclic dependency.
20//!
21//! See [InstalledScheduler] for visualized interaction.
22
23use {
24    crate::bank::Bank,
25    log::*,
26    solana_sdk::{
27        clock::Slot,
28        hash::Hash,
29        transaction::{Result, SanitizedTransaction, TransactionError},
30    },
31    solana_timings::ExecuteTimings,
32    std::{
33        fmt::{self, Debug},
34        mem,
35        ops::Deref,
36        sync::{Arc, RwLock},
37        thread,
38    },
39};
40#[cfg(feature = "dev-context-only-utils")]
41use {mockall::automock, qualifier_attr::qualifiers};
42
43pub fn initialized_result_with_timings() -> ResultWithTimings {
44    (Ok(()), ExecuteTimings::default())
45}
46
47pub trait InstalledSchedulerPool: Send + Sync + Debug {
48    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
49        self.take_resumed_scheduler(context, initialized_result_with_timings())
50    }
51
52    fn take_resumed_scheduler(
53        &self,
54        context: SchedulingContext,
55        result_with_timings: ResultWithTimings,
56    ) -> InstalledSchedulerBox;
57
58    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
59}
60
/// Marker error returned by `InstalledScheduler::schedule_execution()` once the scheduler has
/// been aborted by a previously-scheduled bad transaction.
///
/// It intentionally carries no details. The actual error is obtained separately through
/// `InstalledScheduler::recover_error_after_abort()`.
///
/// Being a field-less marker, it derives the common value traits. This lets callers compare,
/// copy and `assert_eq!` on `ScheduleResult`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SchedulerAborted;

/// Outcome of scheduling one transaction: `Ok(())`, or `Err(SchedulerAborted)` if the scheduler
/// was already aborted.
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
64
65pub struct TimeoutListener {
66    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
67}
68
69impl TimeoutListener {
70    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
71        Self {
72            callback: Box::new(f),
73        }
74    }
75
76    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
77        (self.callback)(pool);
78    }
79}
80
81impl Debug for TimeoutListener {
82    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83        write!(f, "TimeoutListener({self:p})")
84    }
85}
86
87#[cfg_attr(doc, aquamarine::aquamarine)]
88/// Schedules, executes, and commits transactions under encapsulated implementation
89///
90/// The following chart illustrates the ownership/reference interaction between inter-dependent
91/// objects across crates:
92///
93/// ```mermaid
94/// graph TD
95///     Bank["Arc#lt;Bank#gt;"]
96///
97///     subgraph solana-runtime
98///         BankForks;
99///         BankWithScheduler;
100///         Bank;
101///         LoadExecuteAndCommitTransactions(["load_execute_and_commit_transactions()"]);
102///         SchedulingContext;
103///         InstalledSchedulerPool{{InstalledSchedulerPool}};
104///         InstalledScheduler{{InstalledScheduler}};
105///     end
106///
107///     subgraph solana-unified-scheduler-pool
108///         SchedulerPool;
109///         PooledScheduler;
110///         ScheduleExecution(["schedule_execution()"]);
111///     end
112///
113///     subgraph solana-ledger
114///         ExecuteBatch(["execute_batch()"]);
115///     end
116///
117///     ScheduleExecution -. calls .-> ExecuteBatch;
118///     BankWithScheduler -. dyn-calls .-> ScheduleExecution;
119///     ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
120///     linkStyle 0,1,2 stroke:gray,color:gray;
121///
122///     BankForks -- owns --> BankWithScheduler;
123///     BankForks -- owns --> InstalledSchedulerPool;
124///     BankWithScheduler -- refs --> Bank;
125///     BankWithScheduler -- owns --> InstalledScheduler;
126///     SchedulingContext -- refs --> Bank;
127///     InstalledScheduler -- owns --> SchedulingContext;
128///
129///     SchedulerPool -- owns --> PooledScheduler;
130///     SchedulerPool -. impls .-> InstalledSchedulerPool;
131///     PooledScheduler -. impls .-> InstalledScheduler;
132///     PooledScheduler -- refs --> SchedulerPool;
133/// ```
134#[cfg_attr(feature = "dev-context-only-utils", automock)]
135// suppress false clippy complaints arising from mockall-derive:
136//   warning: `#[must_use]` has no effect when applied to a struct field
137#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
138pub trait InstalledScheduler: Send + Sync + Debug + 'static {
139    fn id(&self) -> SchedulerId;
140    fn context(&self) -> &SchedulingContext;
141
142    /// Schedule transaction for execution.
143    ///
144    /// This non-blocking function will return immediately without waiting for actual execution.
145    ///
146    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
147    /// fatal logic error.
148    ///
149    /// Note that the returned result indicates whether the scheduler has been aborted due to a
150    /// previously-scheduled bad transaction, which terminates further block verification. So,
151    /// almost always, the returned error isn't due to the merely scheduling of the current
152    /// transaction itself. At this point, calling this does nothing anymore while it's still safe
153    /// to do. As soon as notified, callers are expected to stop processing upcoming transactions
154    /// of the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will
155    /// be disposed cleanly, not repooled, after `wait_for_termination()` is called like
156    /// not-aborted schedulers.
157    ///
158    /// Caller can acquire the error by calling a separate function called
159    /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. This
160    /// separation and the convoluted returned value semantics explained above are intentional to
161    /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
162    /// cost of far slower error code-path while giving implementors increased flexibility by
163    /// having &mut.
164    fn schedule_execution(&self, transaction: SanitizedTransaction, index: usize)
165        -> ScheduleResult;
166
167    /// Return the error which caused the scheduler to abort.
168    ///
169    /// Note that this must not be called until it's observed that `schedule_execution()` has
170    /// returned `Err(SchedulerAborted)`. Violating this should `panic!()`.
171    ///
172    /// That said, calling this multiple times is completely acceptable after the error observation
173    /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed errors of
174    /// the first bad transaction are usually returned across invocations.
175    fn recover_error_after_abort(&mut self) -> TransactionError;
176
177    /// Wait for a scheduler to terminate after processing.
178    ///
179    /// This function blocks the current thread while waiting for the scheduler to complete all of
180    /// the executions for the scheduled transactions and to return the finalized
181    /// `ResultWithTimings`. This function still blocks for short period of time even in the case
182    /// of aborted schedulers to gracefully shutdown the scheduler (like thread joining).
183    ///
184    /// Along with the result being returned, this function also makes the scheduler itself
185    /// uninstalled from the bank by transforming the consumed self.
186    ///
187    /// If no transaction is scheduled, the result and timing will be `Ok(())` and
188    /// `ExecuteTimings::default()` respectively.
189    fn wait_for_termination(
190        self: Box<Self>,
191        is_dropped: bool,
192    ) -> (ResultWithTimings, UninstalledSchedulerBox);
193
194    /// Pause a scheduler after processing to update bank's recent blockhash.
195    ///
196    /// This function blocks the current thread like wait_for_termination(). However, the scheduler
197    /// won't be consumed. This means the scheduler is responsible to retain the finalized
198    /// `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect the result
199    /// later.
200    fn pause_for_recent_blockhash(&mut self);
201}
202
/// A scheduler detached from its bank by [`InstalledScheduler::wait_for_termination`].
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Consumes this scheduler, handing it back to the pool.
    fn return_to_pool(self: Box<Self>);
}
207
208pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
209pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;
210
211pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;
212
213pub type SchedulerId = u64;
214
215/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
216///
217/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
218/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime.
219/// "Scheduling" part of the context name refers to an abstract slice of time to schedule and
220/// execute all transactions for a given bank for block verification or production. A context is
221/// expected to be used by a particular scheduler only for that duration of the time and to be
222/// disposed by the scheduler. Then, the scheduler may work on different banks with new
223/// `SchedulingContext`s.
224#[derive(Clone, Debug)]
225pub struct SchedulingContext {
226    // mode: SchedulingMode, // this will be added later.
227    bank: Arc<Bank>,
228}
229
230impl SchedulingContext {
231    pub fn new(bank: Arc<Bank>) -> Self {
232        Self { bank }
233    }
234
235    pub fn bank(&self) -> &Arc<Bank> {
236        &self.bank
237    }
238
239    pub fn slot(&self) -> Slot {
240        self.bank().slot()
241    }
242}
243
244pub type ResultWithTimings = (Result<()>, ExecuteTimings);
245
/// A hint from the bank explaining why the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    /// The bank wants its scheduler to terminate once transaction execution completes, so that
    /// the bank can freeze itself right afterwards. This is by far the most common reason.
    ///
    /// `wait_for_termination(TerminatedToFreeze)` must be done explicitly before
    /// `Bank::freeze()`. It can't be done implicitly inside `Bank::freeze()`, because that
    /// would make `freeze()` fallible.
    TerminatedToFreeze,
    /// Terminates just like `TerminatedToFreeze`, additionally indicating that `Drop::drop()`
    /// is the caller.
    DroppedFromBankForks,
    /// The bank wants its scheduler to pause after completion without going back to the pool.
    /// This is for updating the bank's recent blockhash. The scheduler's internally-held
    /// `ResultWithTimings` is collected later.
    PausedForRecentBlockhash,
}

impl WaitReason {
    // Both predicates use exhaustive `match`es rather than `matches!()` on purpose: adding a
    // variant (say, `PausedForFooBar`) then forces an explicit decision to be made here.

    pub fn is_paused(&self) -> bool {
        match *self {
            Self::PausedForRecentBlockhash => true,
            Self::TerminatedToFreeze | Self::DroppedFromBankForks => false,
        }
    }

    pub fn is_dropped(&self) -> bool {
        match *self {
            Self::DroppedFromBankForks => true,
            Self::PausedForRecentBlockhash | Self::TerminatedToFreeze => false,
        }
    }
}
284
285#[allow(clippy::large_enum_variant)]
286#[derive(Debug)]
287pub enum SchedulerStatus {
288    /// Unified scheduler is disabled or installed scheduler is consumed by wait_for_termination().
289    /// Note that transition to Unavailable from {Active, Stale} is one-way (i.e. one-time).
290    /// Also, this variant is transiently used as a placeholder internally when transitioning
291    /// scheduler statuses, which isn't observable unless panic is happening.
292    Unavailable,
293    /// Scheduler is installed into a bank; could be running or just be idling.
294    /// This will be transitioned to Stale after certain time has passed if its bank hasn't been
295    /// frozen.
296    Active(InstalledSchedulerBox),
297    /// Scheduler is idling for long time, returning scheduler back to the pool.
298    /// This will be immediately (i.e. transaparently) transitioned to Active as soon as there's
299    /// new transaction to be executed.
300    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
301}
302
303impl SchedulerStatus {
304    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
305        match scheduler {
306            Some(scheduler) => SchedulerStatus::Active(scheduler),
307            None => SchedulerStatus::Unavailable,
308        }
309    }
310
311    fn transition_from_stale_to_active(
312        &mut self,
313        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
314    ) {
315        let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
316            panic!("transition to Active failed: {self:?}");
317        };
318        *self = Self::Active(f(pool, result_with_timings));
319    }
320
321    fn maybe_transition_from_active_to_stale(
322        &mut self,
323        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
324    ) {
325        if !matches!(self, Self::Active(_scheduler)) {
326            return;
327        }
328        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
329            unreachable!("not active: {self:?}");
330        };
331        let (pool, result_with_timings) = f(scheduler);
332        *self = Self::Stale(pool, result_with_timings);
333    }
334
335    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
336        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
337            panic!("transition to Unavailable failed: {self:?}");
338        };
339        scheduler
340    }
341
342    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
343        let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
344            panic!("transition to Unavailable failed: {self:?}");
345        };
346        result_with_timings
347    }
348
349    fn active_scheduler(&self) -> &InstalledSchedulerBox {
350        let SchedulerStatus::Active(active_scheduler) = self else {
351            panic!("not active: {self:?}");
352        };
353        active_scheduler
354    }
355}
356
357/// Very thin wrapper around Arc<Bank>
358///
359/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
360/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via
361/// ::drop() by DropBankService, which receives Vec<BankWithScheduler> from BankForks::set_root()'s
362/// pruning, mostly matching to Arc<Bank>'s lifetime by piggybacking on the pruning.
363///
364/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
365/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
366/// Arc<Bank>). BankWithScheduler behaves almost like Arc<Bank>. It only adds a few of transaction
367/// scheduling and scheduler management functions. For this reason, `bank` variable names should be
368/// used for `BankWithScheduler` across codebase.
369///
370/// BankWithScheduler even implements Deref for convenience. And Clone is omitted to implement to
371/// avoid ambiguity as to which to clone: BankWithScheduler or Arc<Bank>. Use
372/// clone_without_scheduler() for Arc<Bank>. Otherwise, use clone_with_scheduler() (this should be
373/// unusual outside scheduler code-path)
374#[derive(Debug)]
375pub struct BankWithScheduler {
376    inner: Arc<BankWithSchedulerInner>,
377}
378
379#[derive(Debug)]
380pub struct BankWithSchedulerInner {
381    bank: Arc<Bank>,
382    scheduler: InstalledSchedulerRwLock,
383}
384pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
385
386impl BankWithScheduler {
387    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
388    pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
389        if let Some(bank_in_context) = scheduler
390            .as_ref()
391            .map(|scheduler| scheduler.context().bank())
392        {
393            assert!(Arc::ptr_eq(&bank, bank_in_context));
394        }
395
396        Self {
397            inner: Arc::new(BankWithSchedulerInner {
398                bank,
399                scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
400            }),
401        }
402    }
403
404    pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
405        Self::new(bank, None)
406    }
407
408    pub fn clone_with_scheduler(&self) -> BankWithScheduler {
409        BankWithScheduler {
410            inner: self.inner.clone(),
411        }
412    }
413
414    pub fn clone_without_scheduler(&self) -> Arc<Bank> {
415        self.inner.bank.clone()
416    }
417
418    pub fn register_tick(&self, hash: &Hash) {
419        self.inner.bank.register_tick(hash, &self.inner.scheduler);
420    }
421
422    #[cfg(feature = "dev-context-only-utils")]
423    pub fn fill_bank_with_ticks_for_tests(&self) {
424        self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
425    }
426
427    pub fn has_installed_scheduler(&self) -> bool {
428        !matches!(
429            &*self.inner.scheduler.read().unwrap(),
430            SchedulerStatus::Unavailable
431        )
432    }
433
434    /// Schedule the transaction as long as the scheduler hasn't been aborted.
435    ///
436    /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
437    /// return the error of prior scheduled transaction.
438    ///
439    /// Calling this will panic if the installed scheduler is Unavailable (the bank is
440    /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
441    pub fn schedule_transaction_executions(
442        &self,
443        transactions_with_indexes: impl ExactSizeIterator<Item = (SanitizedTransaction, usize)>,
444    ) -> Result<()> {
445        trace!(
446            "schedule_transaction_executions(): {} txs",
447            transactions_with_indexes.len()
448        );
449
450        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
451            for (sanitized_transaction, index) in transactions_with_indexes {
452                scheduler.schedule_execution(sanitized_transaction, index)?;
453            }
454            Ok(())
455        });
456
457        if schedule_result.is_err() {
458            // This write lock isn't atomic with the above the read lock. So, another thread
459            // could have called .recover_error_after_abort() while we're literally stuck at
460            // the gaps of these locks (i.e. this comment in source code wise) under extreme
461            // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that
462            // consideration in mind.
463            //
464            // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
465            return Err(self.inner.retrieve_error_after_schedule_failure());
466        }
467
468        Ok(())
469    }
470
471    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
472    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
473        self.inner.do_create_timeout_listener()
474    }
475
476    // take needless &mut only to communicate its semantic mutability to humans...
477    #[cfg(feature = "dev-context-only-utils")]
478    pub fn drop_scheduler(&mut self) {
479        self.inner.drop_scheduler();
480    }
481
482    pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
483        let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
484            bank,
485            scheduler,
486            WaitReason::PausedForRecentBlockhash,
487        );
488        assert!(
489            maybe_result_with_timings.is_none(),
490            "Premature result was returned from scheduler after paused (slot: {})",
491            bank.slot(),
492        );
493    }
494
495    #[must_use]
496    pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
497        BankWithSchedulerInner::wait_for_scheduler_termination(
498            &self.inner.bank,
499            &self.inner.scheduler,
500            WaitReason::TerminatedToFreeze,
501        )
502    }
503
504    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
505        RwLock::new(SchedulerStatus::Unavailable)
506    }
507}
508
509impl BankWithSchedulerInner {
510    fn with_active_scheduler(
511        self: &Arc<Self>,
512        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
513    ) -> ScheduleResult {
514        let scheduler = self.scheduler.read().unwrap();
515        match &*scheduler {
516            SchedulerStatus::Active(scheduler) => {
517                // This is the fast path, needing single read-lock most of time.
518                f(scheduler)
519            }
520            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
521                trace!(
522                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
523                    self.bank.slot(),
524                );
525                Err(SchedulerAborted)
526            }
527            SchedulerStatus::Stale(pool, _result_with_timings) => {
528                let pool = pool.clone();
529                drop(scheduler);
530
531                let context = SchedulingContext::new(self.bank.clone());
532                let mut scheduler = self.scheduler.write().unwrap();
533                trace!("with_active_scheduler: {:?}", scheduler);
534                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
535                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
536                    info!(
537                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
538                        self.bank.slot(),
539                        scheduler.id(),
540                    );
541                    scheduler
542                });
543                drop(scheduler);
544
545                let scheduler = self.scheduler.read().unwrap();
546                // Re-register a new timeout listener only after acquiring the read lock;
547                // Otherwise, the listener would again put scheduler into Stale before the read
548                // lock under an extremely-rare race condition, causing panic below in
549                // active_scheduler().
550                pool.register_timeout_listener(self.do_create_timeout_listener());
551                f(scheduler.active_scheduler())
552            }
553            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
554        }
555    }
556
557    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
558        let weak_bank = Arc::downgrade(self);
559        TimeoutListener::new(move |pool| {
560            let Some(bank) = weak_bank.upgrade() else {
561                return;
562            };
563
564            let Ok(mut scheduler) = bank.scheduler.write() else {
565                return;
566            };
567
568            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
569                // The scheduler hasn't still been wait_for_termination()-ed after awhile...
570                // Return the installed scheduler back to the scheduler pool as soon as the
571                // scheduler gets idle after executing all currently-scheduled transactions.
572
573                let id = scheduler.id();
574                let (result_with_timings, uninstalled_scheduler) =
575                    scheduler.wait_for_termination(false);
576                uninstalled_scheduler.return_to_pool();
577                info!(
578                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
579                    bank.bank.slot(),
580                    id,
581                );
582                (pool, result_with_timings)
583            });
584            trace!("timeout_listener: {:?}", scheduler);
585        })
586    }
587
588    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
589    /// `panic!()`.
590    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
591        let mut scheduler = self.scheduler.write().unwrap();
592        match &mut *scheduler {
593            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
594            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
595                result.clone().unwrap_err()
596            }
597            _ => unreachable!("no error in {:?}", self.scheduler),
598        }
599    }
600
601    #[must_use]
602    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
603        Self::wait_for_scheduler_termination(
604            &self.bank,
605            &self.scheduler,
606            WaitReason::DroppedFromBankForks,
607        )
608    }
609
610    #[must_use]
611    fn wait_for_scheduler_termination(
612        bank: &Bank,
613        scheduler: &InstalledSchedulerRwLock,
614        reason: WaitReason,
615    ) -> Option<ResultWithTimings> {
616        debug!(
617            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
618            bank.slot(),
619            reason,
620            thread::current(),
621        );
622
623        let mut scheduler = scheduler.write().unwrap();
624        let (was_noop, result_with_timings) = match &mut *scheduler {
625            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
626                scheduler.pause_for_recent_blockhash();
627                (false, None)
628            }
629            SchedulerStatus::Active(_scheduler) => {
630                let scheduler = scheduler.transition_from_active_to_unavailable();
631                let (result_with_timings, uninstalled_scheduler) =
632                    scheduler.wait_for_termination(reason.is_dropped());
633                uninstalled_scheduler.return_to_pool();
634                (false, Some(result_with_timings))
635            }
636            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
637                // Do nothing for pauses because the scheduler termination is guaranteed to be
638                // called later.
639                (true, None)
640            }
641            SchedulerStatus::Stale(_pool, _result_with_timings) => {
642                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
643                (true, Some(result_with_timings))
644            }
645            SchedulerStatus::Unavailable => (true, None),
646        };
647        debug!(
648            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
649            bank.slot(),
650            reason,
651            was_noop,
652            result_with_timings.as_ref().map(|(result, _)| result),
653            thread::current(),
654        );
655        trace!(
656            "wait_for_scheduler_termination(result_with_timings: {:?})",
657            result_with_timings,
658        );
659
660        result_with_timings
661    }
662
663    fn drop_scheduler(&self) {
664        if thread::panicking() {
665            error!(
666                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
667                self.bank.slot(),
668            );
669            return;
670        }
671
672        // There's no guarantee ResultWithTimings is available or not at all when being dropped.
673        if let Some(Err(err)) = self
674            .wait_for_completed_scheduler_from_drop()
675            .map(|(result, _timings)| result)
676        {
677            warn!(
678                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
679                self.bank.slot(),
680                err,
681            );
682        }
683    }
684}
685
686impl Drop for BankWithSchedulerInner {
687    fn drop(&mut self) {
688        self.drop_scheduler();
689    }
690}
691
692impl Deref for BankWithScheduler {
693    type Target = Arc<Bank>;
694
695    fn deref(&self) -> &Self::Target {
696        &self.inner.bank
697    }
698}
699
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            bank::test_utils::goto_end_of_slot_with_scheduler,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        assert_matches::assert_matches,
        mockall::Sequence,
        solana_sdk::system_transaction,
        std::sync::Mutex,
    };

    /// Builds a boxed [`MockInstalledScheduler`] bound to `bank`.
    ///
    /// The mock expects, in this order:
    /// 1. exactly one `context()` call, returning a [`SchedulingContext`] for `bank`;
    /// 2. for each flag in `is_dropped_flags` (in iteration order), exactly one
    ///    `wait_for_termination(flag)` call. Each such call returns
    ///    `initialized_result_with_timings()` together with a [`MockUninstalledScheduler`]
    ///    that expects exactly one `return_to_pool()` call.
    ///
    /// If `f` is given, it is invoked on the mock last, so a test can register additional
    /// expectations (these are not placed in the sequence above).
    fn setup_mocked_scheduler_with_extra(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
        f: Option<impl Fn(&mut MockInstalledScheduler)>,
    ) -> InstalledSchedulerBox {
        let mut mock = MockInstalledScheduler::new();
        // The sequence is shared with the `move` closures registered via `returning()` below,
        // which are stored inside the returned mock; hence `Arc<Mutex<_>>`.
        let seq = Arc::new(Mutex::new(Sequence::new()));

        mock.expect_context()
            .times(1)
            .in_sequence(&mut seq.lock().unwrap())
            .return_const(SchedulingContext::new(bank));

        for is_dropped in is_dropped_flags {
            let seq_cloned = seq.clone();
            mock.expect_wait_for_termination()
                .with(mockall::predicate::eq(is_dropped))
                .times(1)
                .in_sequence(&mut seq.lock().unwrap())
                .returning(move |_| {
                    // The uninstalled mock is created lazily, so its `return_to_pool()`
                    // expectation joins the shared sequence after this
                    // `wait_for_termination()` call.
                    let mut mock_uninstalled = MockUninstalledScheduler::new();
                    mock_uninstalled
                        .expect_return_to_pool()
                        .times(1)
                        .in_sequence(&mut seq_cloned.lock().unwrap())
                        .returning(|| ());
                    (initialized_result_with_timings(), Box::new(mock_uninstalled))
                });
        }

        if let Some(f) = f {
            f(&mut mock);
        }

        Box::new(mock)
    }

    /// Same as [`setup_mocked_scheduler_with_extra`], without any extra expectations.
    fn setup_mocked_scheduler(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
    ) -> InstalledSchedulerBox {
        setup_mocked_scheduler_with_extra(
            bank,
            is_dropped_flags,
            None::<fn(&mut MockInstalledScheduler)>,
        )
    }

    #[test]
    fn test_scheduler_normal_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [false].into_iter())),
        );
        assert!(bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

        // Calling wait_for_completed_scheduler() again is okay; it just returns no
        // ResultWithTimings.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    #[test]
    fn test_no_scheduler_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new_without_scheduler(bank);

        // Calling wait_for_completed_scheduler() is a no-op when no scheduler is installed.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    #[test]
    fn test_scheduler_termination_from_drop() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        // The mock expects a single `wait_for_termination(true)`, which must come from drop.
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [true].into_iter())),
        );
        drop(bank);
    }

    #[test]
    fn test_scheduler_pause() {
        solana_logger::setup();

        let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler_with_extra(
                bank,
                [false].into_iter(),
                Some(|mocked: &mut MockInstalledScheduler| {
                    mocked
                        .expect_pause_for_recent_blockhash()
                        .times(1)
                        .returning(|| ());
                }),
            )),
        );
        goto_end_of_slot_with_scheduler(&bank);
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
    }

    /// Schedules one transfer through a mocked scheduler.
    ///
    /// If `should_succeed` is true, scheduling succeeds. Otherwise, scheduling reports
    /// `SchedulerAborted` and `TransactionError::InsufficientFundsForFee` is expected to be
    /// surfaced to the caller.
    fn do_test_schedule_execution(should_succeed: bool) {
        solana_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let tx0 = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
            &mint_keypair,
            &solana_sdk::pubkey::new_rand(),
            2,
            genesis_config.hash(),
        ));
        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
        let mocked_scheduler = setup_mocked_scheduler_with_extra(
            bank.clone(),
            [true].into_iter(),
            Some(|mocked: &mut MockInstalledScheduler| {
                let schedule_execution = mocked.expect_schedule_execution().times(1);
                if should_succeed {
                    schedule_execution.returning(|_, _| Ok(()));
                } else {
                    // On abort, `recover_error_after_abort()` is expected to be called once,
                    // and the error it returns is what the caller should observe.
                    schedule_execution.returning(|_, _| Err(SchedulerAborted));
                    mocked
                        .expect_recover_error_after_abort()
                        .times(1)
                        .returning(|| TransactionError::InsufficientFundsForFee);
                }
            }),
        );

        let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
        let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
        if should_succeed {
            assert_matches!(result, Ok(()));
        } else {
            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
        }
    }

    #[test]
    fn test_schedule_execution_success() {
        do_test_schedule_execution(true);
    }

    #[test]
    fn test_schedule_execution_failure() {
        do_test_schedule_execution(false);
    }
}