solana_runtime/
installed_scheduler_pool.rs

//! Transaction processing glue code, mainly consisting of object-safe traits.
//!
//! [InstalledSchedulerPool] lends out one of its pooled [InstalledScheduler]s, wrapped in a
//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
//! execution. After use, the scheduler is returned to the pool.
6//!
7//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
8//! executions and commits those results into the associated _bank_.
9//!
10//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
11//! parallel transaction processing and there are multiple independent schedulers inside a single
12//! instance of [InstalledSchedulerPool].
13//!
14//! Dynamic dispatch was inevitable due to the desire to piggyback on
15//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
16//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking
17//! actual implementations provided by the dependent crate (`solana-unified-scheduler-pool`, which
18//! in turn depends on `solana-ledger`, which in turn depends on `solana-runtime`), avoiding a
19//! cyclic dependency.
20//!
21//! See [InstalledScheduler] for visualized interaction.
22
23use {
24    crate::bank::Bank,
25    log::*,
26    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
27    solana_sdk::{
28        clock::Slot,
29        hash::Hash,
30        transaction::{Result, SanitizedTransaction, TransactionError},
31    },
32    solana_timings::ExecuteTimings,
33    solana_unified_scheduler_logic::SchedulingMode,
34    std::{
35        fmt::{self, Debug},
36        mem,
37        ops::Deref,
38        sync::{Arc, RwLock},
39        thread,
40    },
41};
42#[cfg(feature = "dev-context-only-utils")]
43use {mockall::automock, qualifier_attr::qualifiers};
44
45pub fn initialized_result_with_timings() -> ResultWithTimings {
46    (Ok(()), ExecuteTimings::default())
47}
48
49pub trait InstalledSchedulerPool: Send + Sync + Debug {
50    /// A very thin wrapper of [`Self::take_resumed_scheduler`] to take a scheduler from this pool
51    /// for a brand-new bank.
52    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
53        self.take_resumed_scheduler(context, initialized_result_with_timings())
54    }
55
56    fn take_resumed_scheduler(
57        &self,
58        context: SchedulingContext,
59        result_with_timings: ResultWithTimings,
60    ) -> InstalledSchedulerBox;
61
62    /// Registers an opaque timeout listener.
63    ///
64    /// This method and the passed `struct` called [`TimeoutListener`] are very opaque by purpose.
65    /// Specifically, it doesn't provide any way to tell which listener is semantically associated
66    /// to which particular scheduler. That's because proper _unregistration_ is omitted at the
67    /// timing of scheduler returning to reduce latency of the normal block-verification code-path,
68    /// relying on eventual stale listener clean-up by `solScCleaner`.
69    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
70}
71
/// Marker error telling that the scheduler has been aborted due to a previously-scheduled bad
/// transaction. The actual [`TransactionError`] is obtained separately via
/// [`InstalledScheduler::recover_error_after_abort`].
#[derive(Debug)]
pub struct SchedulerAborted;
/// Result of [`InstalledScheduler::schedule_execution`]; `Err` means the scheduler is aborted.
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
75
/// An opaque, one-shot callback registered via
/// [`InstalledSchedulerPool::register_timeout_listener`]. It is consumed and invoked with the
/// pool by [`TimeoutListener::trigger`].
pub struct TimeoutListener {
    // Boxed to type-erase the closure; `FnOnce` because triggering consumes the listener.
    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}
79
80impl TimeoutListener {
81    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
82        Self {
83            callback: Box::new(f),
84        }
85    }
86
87    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
88        (self.callback)(pool);
89    }
90}
91
92impl Debug for TimeoutListener {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        write!(f, "TimeoutListener({self:p})")
95    }
96}
97
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under an encapsulated implementation.
///
/// The following chart illustrates the ownership/reference interaction between inter-dependent
/// objects across crates:
///
/// ```mermaid
/// graph TD
///     Bank["Arc#lt;Bank#gt;"]
///
///     subgraph solana-runtime[<span style="font-size: 70%">solana-runtime</span>]
///         BankForks;
///         BankWithScheduler;
///         Bank;
///         LoadExecuteAndCommitTransactions([<span style="font-size: 67%">load_execute_and_commit_transactions#lpar;#rpar;</span>]);
///         SchedulingContext;
///         InstalledSchedulerPool{{InstalledSchedulerPool}};
///         InstalledScheduler{{InstalledScheduler}};
///     end
///
///     subgraph solana-unified-scheduler-pool[<span style="font-size: 70%">solana-unified-scheduler-pool</span>]
///         SchedulerPool;
///         PooledScheduler;
///         ScheduleExecution(["schedule_execution()"]);
///     end
///
///     subgraph solana-ledger[<span style="font-size: 60%">solana-ledger</span>]
///         ExecuteBatch(["execute_batch()"]);
///     end
///
///     ScheduleExecution -. calls .-> ExecuteBatch;
///     BankWithScheduler -. dyn-calls .-> ScheduleExecution;
///     ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
///     linkStyle 0,1,2 stroke:gray,color:gray;
///
///     BankForks -- owns --> BankWithScheduler;
///     BankForks -- owns --> InstalledSchedulerPool;
///     BankWithScheduler -- refs --> Bank;
///     BankWithScheduler -- owns --> InstalledScheduler;
///     SchedulingContext -- refs --> Bank;
///     InstalledScheduler -- owns --> SchedulingContext;
///
///     SchedulerPool -- owns --> PooledScheduler;
///     SchedulerPool -. impls .-> InstalledSchedulerPool;
///     PooledScheduler -. impls .-> InstalledScheduler;
///     PooledScheduler -- refs --> SchedulerPool;
/// ```
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
//   warning: `#[must_use]` has no effect when applied to a struct field
#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns the identifier of this scheduler.
    fn id(&self) -> SchedulerId;
    /// Returns the [`SchedulingContext`] (bank and scheduling mode) this scheduler is currently
    /// working on.
    fn context(&self) -> &SchedulingContext;

    /// Schedule transaction for execution.
    ///
    /// This non-blocking function will return immediately without waiting for actual execution.
    ///
    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
    /// fatal logic error.
    ///
    /// Note that the returned result indicates whether the scheduler has been aborted due to a
    /// previously-scheduled bad transaction, which terminates further block verification. So,
    /// almost always, the returned error isn't due to the mere scheduling of the current
    /// transaction itself. Once aborted, calling this does nothing anymore, while it's still safe
    /// to do so. As soon as notified, callers are expected to stop processing upcoming
    /// transactions of the same `SchedulingContext` (i.e. same block). Internally, the aborted
    /// scheduler will be disposed cleanly, not repooled, after `wait_for_termination()` is called
    /// like not-aborted schedulers.
    ///
    /// Caller can acquire the error by calling a separate function called
    /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. This
    /// separation and the convoluted returned value semantics explained above are intentional to
    /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
    /// cost of far slower error code-path while giving implementors increased flexibility by
    /// taking `&mut self`.
    fn schedule_execution(
        &self,
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
    ) -> ScheduleResult;

    /// Return the error which caused the scheduler to abort.
    ///
    /// Note that this must not be called until it's observed that `schedule_execution()` has
    /// returned `Err(SchedulerAborted)`. Violating this should `panic!()`.
    ///
    /// That said, calling this multiple times is completely acceptable after the error observation
    /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed errors of
    /// the first bad transaction are usually returned across invocations.
    fn recover_error_after_abort(&mut self) -> TransactionError;

    /// Wait for a scheduler to terminate after processing.
    ///
    /// This function blocks the current thread while waiting for the scheduler to complete all of
    /// the executions for the scheduled transactions and to return the finalized
    /// `ResultWithTimings`. This function still blocks for short period of time even in the case
    /// of aborted schedulers to gracefully shutdown the scheduler (like thread joining).
    ///
    /// Along with the result being returned, this function also makes the scheduler itself
    /// uninstalled from the bank by transforming the consumed self.
    ///
    /// `is_dropped` tells whether this is being called while the owning bank is dropped (as
    /// opposed to terminating in order to freeze it, or returning a stale scheduler).
    ///
    /// If no transaction is scheduled, the result and timing will be `Ok(())` and
    /// `ExecuteTimings::default()` respectively.
    fn wait_for_termination(
        self: Box<Self>,
        is_dropped: bool,
    ) -> (ResultWithTimings, UninstalledSchedulerBox);

    /// Pause a scheduler after processing to update bank's recent blockhash.
    ///
    /// This function blocks the current thread like `wait_for_termination()`. However, the
    /// scheduler won't be consumed. This means the scheduler is responsible to retain the
    /// finalized `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect
    /// the result later.
    fn pause_for_recent_blockhash(&mut self);
}
216
/// A scheduler which has been detached from its bank by
/// [`InstalledScheduler::wait_for_termination`].
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns this scheduler to the pool, consuming it.
    fn return_to_pool(self: Box<Self>);
}
221
/// Type-erased installed scheduler, as held by [`SchedulerStatus::Active`].
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Type-erased uninstalled scheduler, as returned by [`InstalledScheduler::wait_for_termination`].
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;

/// Shared, type-erased scheduler pool.
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;

/// Identifier of a scheduler, as returned by [`InstalledScheduler::id`].
pub type SchedulerId = u64;
228
/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
///
/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
/// schedulers one by one. A scheduler will use many `SchedulingContext`s during its lifetime.
/// The "Scheduling" part of the context name refers to an abstract slice of time to schedule and
/// execute all transactions for a given bank for block verification or production. A context is
/// expected to be used by a particular scheduler only for that duration of time and to be
/// disposed of by the scheduler. Then, the scheduler may work on different banks with new
/// `SchedulingContext`s.
#[derive(Clone, Debug)]
pub struct SchedulingContext {
    // What the transactions are scheduled for (e.g. block verification or block production).
    mode: SchedulingMode,
    // The bank whose transactions are scheduled under this context.
    bank: Arc<Bank>,
}
243
244impl SchedulingContext {
245    pub fn new(bank: Arc<Bank>) -> Self {
246        // mode will be configurable later
247        Self {
248            mode: SchedulingMode::BlockVerification,
249            bank,
250        }
251    }
252
253    pub fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
254        Self { mode, bank }
255    }
256
257    #[cfg(feature = "dev-context-only-utils")]
258    pub fn for_production(bank: Arc<Bank>) -> Self {
259        Self {
260            mode: SchedulingMode::BlockProduction,
261            bank,
262        }
263    }
264
265    pub fn mode(&self) -> SchedulingMode {
266        self.mode
267    }
268
269    pub fn bank(&self) -> &Arc<Bank> {
270        &self.bank
271    }
272
273    pub fn slot(&self) -> Slot {
274        self.bank().slot()
275    }
276}
277
/// The result of a bank's scheduled transaction executions, paired with their execution timings.
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
279
/// A hint from the bank about the reason the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    /// The bank wants its scheduler to terminate after the completion of transaction execution,
    /// in order to freeze itself immediately thereafter. This is by far the most common wait
    /// reason.
    ///
    /// Note that `wait_for_termination(TerminatedToFreeze)` must explicitly be done prior to
    /// `Bank::freeze()`. This can't be done inside `Bank::freeze()` implicitly, so that the latter
    /// can stay infallible.
    TerminatedToFreeze,
    /// The bank wants its scheduler to terminate just like `TerminatedToFreeze`, and indicates
    /// that `Drop::drop()` is the caller.
    DroppedFromBankForks,
    /// The bank wants its scheduler to pause after the completion without being returned to the
    /// pool. This is to update the bank's recent blockhash and to collect the scheduler's
    /// internally-held `ResultWithTimings` later.
    PausedForRecentBlockhash,
}
298
299impl WaitReason {
300    pub fn is_paused(&self) -> bool {
301        // Exhaustive `match` is preferred here than `matches!()` to trigger an explicit
302        // decision to be made, should we add new variants like `PausedForFooBar`...
303        match self {
304            WaitReason::PausedForRecentBlockhash => true,
305            WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false,
306        }
307    }
308
309    pub fn is_dropped(&self) -> bool {
310        // Exhaustive `match` is preferred here than `matches!()` to trigger an explicit
311        // decision to be made, should we add new variants like `PausedForFooBar`...
312        match self {
313            WaitReason::DroppedFromBankForks => true,
314            WaitReason::TerminatedToFreeze | WaitReason::PausedForRecentBlockhash => false,
315        }
316    }
317}
318
// `Active` holds only a `Box`, while `Stale` holds a `ResultWithTimings` inline, so the variant
// sizes differ considerably (which is what this lint reports). Presumably this is accepted to
// avoid an extra allocation on the stale path; TODO confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
    /// Unified scheduler is disabled or installed scheduler is consumed by
    /// [`InstalledScheduler::wait_for_termination`]. Note that transition to [`Self::Unavailable`]
    /// from {[`Self::Active`], [`Self::Stale`]} is one-way (i.e. one-time) unlike [`Self::Active`]
    /// <=> [`Self::Stale`] below. Also, this variant is transiently used as a placeholder
    /// internally when transitioning scheduler statuses, which isn't observable unless panic is
    /// happening.
    Unavailable,
    /// Scheduler is installed into a bank; could be running or just be waiting for additional
    /// transactions. This will be transitioned to [`Self::Stale`] after certain time (i.e.
    /// `solana_unified_scheduler_pool::DEFAULT_TIMEOUT_DURATION`) has passed if its bank hasn't
    /// been frozen since installed.
    Active(InstalledSchedulerBox),
    /// Scheduler has yet to freeze its associated bank even after it's taken too long since
    /// installed, resulting in returning the scheduler back to the pool. Later, this can
    /// immediately (i.e. transparently) be transitioned to [`Self::Active`] as soon as there's new
    /// transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`] is
    /// called, which internally calls `BankWithSchedulerInner::with_active_scheduler()` to make
    /// the transition happen).
    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}
342
343impl SchedulerStatus {
344    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
345        match scheduler {
346            Some(scheduler) => SchedulerStatus::Active(scheduler),
347            None => SchedulerStatus::Unavailable,
348        }
349    }
350
351    fn transition_from_stale_to_active(
352        &mut self,
353        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
354    ) {
355        let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
356            panic!("transition to Active failed: {self:?}");
357        };
358        *self = Self::Active(f(pool, result_with_timings));
359    }
360
361    fn maybe_transition_from_active_to_stale(
362        &mut self,
363        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
364    ) {
365        if !matches!(self, Self::Active(_scheduler)) {
366            return;
367        }
368        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
369            unreachable!("not active: {self:?}");
370        };
371        let (pool, result_with_timings) = f(scheduler);
372        *self = Self::Stale(pool, result_with_timings);
373    }
374
375    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
376        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
377            panic!("transition to Unavailable failed: {self:?}");
378        };
379        scheduler
380    }
381
382    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
383        let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
384            panic!("transition to Unavailable failed: {self:?}");
385        };
386        result_with_timings
387    }
388
389    fn active_scheduler(&self) -> &InstalledSchedulerBox {
390        let SchedulerStatus::Active(active_scheduler) = self else {
391            panic!("not active: {self:?}");
392        };
393        active_scheduler
394    }
395}
396
/// Very thin wrapper around `Arc<Bank>`.
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
/// which is a pretty dangerous condition. Also, it guarantees to call `wait_for_termination()`
/// via `::drop()` by `DropBankService`, which receives `Vec<BankWithScheduler>` from
/// `BankForks::set_root()`'s pruning, mostly matching `Arc<Bank>`'s lifetime by piggybacking on
/// the pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But the scheduler wasn't
/// put into `Bank` fields to avoid circular references (a scheduler needs to refer to its
/// accompanying `Arc<Bank>`). `BankWithScheduler` behaves almost like `Arc<Bank>`. It only adds
/// a few transaction scheduling and scheduler management functions. For this reason, `bank`
/// variable names should be used for `BankWithScheduler` across the codebase.
///
/// `BankWithScheduler` even implements `Deref` for convenience. `Clone` is deliberately not
/// implemented, to avoid ambiguity as to which to clone: `BankWithScheduler` or `Arc<Bank>`. Use
/// `clone_without_scheduler()` for `Arc<Bank>`. Otherwise, use `clone_with_scheduler()` (this
/// should be unusual outside the scheduler code-path).
#[derive(Debug)]
pub struct BankWithScheduler {
    // Shared with every handle created by `clone_with_scheduler()`.
    inner: Arc<BankWithSchedulerInner>,
}
418
/// The state shared by all handles of a [`BankWithScheduler`]: the bank and its scheduler status.
#[derive(Debug)]
pub struct BankWithSchedulerInner {
    bank: Arc<Bank>,
    // Lock-protected status of the scheduler (Active / Stale / Unavailable).
    scheduler: InstalledSchedulerRwLock,
}
/// Lock-protected [`SchedulerStatus`]; also handed to `Bank` methods such as `register_tick()`.
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
425
426impl BankWithScheduler {
427    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
428    pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
429        if let Some(bank_in_context) = scheduler
430            .as_ref()
431            .map(|scheduler| scheduler.context().bank())
432        {
433            assert!(Arc::ptr_eq(&bank, bank_in_context));
434        }
435
436        Self {
437            inner: Arc::new(BankWithSchedulerInner {
438                bank,
439                scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
440            }),
441        }
442    }
443
444    pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
445        Self::new(bank, None)
446    }
447
448    pub fn clone_with_scheduler(&self) -> BankWithScheduler {
449        BankWithScheduler {
450            inner: self.inner.clone(),
451        }
452    }
453
454    pub fn clone_without_scheduler(&self) -> Arc<Bank> {
455        self.inner.bank.clone()
456    }
457
458    pub fn register_tick(&self, hash: &Hash) {
459        self.inner.bank.register_tick(hash, &self.inner.scheduler);
460    }
461
462    #[cfg(feature = "dev-context-only-utils")]
463    pub fn fill_bank_with_ticks_for_tests(&self) {
464        self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
465    }
466
467    pub fn has_installed_scheduler(&self) -> bool {
468        !matches!(
469            &*self.inner.scheduler.read().unwrap(),
470            SchedulerStatus::Unavailable
471        )
472    }
473
474    /// Schedule the transaction as long as the scheduler hasn't been aborted.
475    ///
476    /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
477    /// return the error of prior scheduled transaction.
478    ///
479    /// Calling this will panic if the installed scheduler is Unavailable (the bank is
480    /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
481    pub fn schedule_transaction_executions(
482        &self,
483        transactions_with_indexes: impl ExactSizeIterator<
484            Item = (RuntimeTransaction<SanitizedTransaction>, usize),
485        >,
486    ) -> Result<()> {
487        trace!(
488            "schedule_transaction_executions(): {} txs",
489            transactions_with_indexes.len()
490        );
491
492        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
493            for (sanitized_transaction, index) in transactions_with_indexes {
494                scheduler.schedule_execution(sanitized_transaction, index)?;
495            }
496            Ok(())
497        });
498
499        if schedule_result.is_err() {
500            // This write lock isn't atomic with the above the read lock. So, another thread
501            // could have called .recover_error_after_abort() while we're literally stuck at
502            // the gaps of these locks (i.e. this comment in source code wise) under extreme
503            // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that
504            // consideration in mind.
505            //
506            // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
507            return Err(self.inner.retrieve_error_after_schedule_failure());
508        }
509
510        Ok(())
511    }
512
513    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
514    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
515        self.inner.do_create_timeout_listener()
516    }
517
518    // take needless &mut only to communicate its semantic mutability to humans...
519    #[cfg(feature = "dev-context-only-utils")]
520    pub fn drop_scheduler(&mut self) {
521        self.inner.drop_scheduler();
522    }
523
524    pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
525        let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
526            bank,
527            scheduler,
528            WaitReason::PausedForRecentBlockhash,
529        );
530        assert!(
531            maybe_result_with_timings.is_none(),
532            "Premature result was returned from scheduler after paused (slot: {})",
533            bank.slot(),
534        );
535    }
536
537    #[must_use]
538    pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
539        BankWithSchedulerInner::wait_for_scheduler_termination(
540            &self.inner.bank,
541            &self.inner.scheduler,
542            WaitReason::TerminatedToFreeze,
543        )
544    }
545
546    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
547        RwLock::new(SchedulerStatus::Unavailable)
548    }
549}
550
551impl BankWithSchedulerInner {
552    fn with_active_scheduler(
553        self: &Arc<Self>,
554        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
555    ) -> ScheduleResult {
556        let scheduler = self.scheduler.read().unwrap();
557        match &*scheduler {
558            SchedulerStatus::Active(scheduler) => {
559                // This is the fast path, needing single read-lock most of time.
560                f(scheduler)
561            }
562            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
563                trace!(
564                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
565                    self.bank.slot(),
566                );
567                Err(SchedulerAborted)
568            }
569            SchedulerStatus::Stale(pool, _result_with_timings) => {
570                let pool = pool.clone();
571                drop(scheduler);
572
573                let context = SchedulingContext::new(self.bank.clone());
574                let mut scheduler = self.scheduler.write().unwrap();
575                trace!("with_active_scheduler: {:?}", scheduler);
576                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
577                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
578                    info!(
579                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
580                        self.bank.slot(),
581                        scheduler.id(),
582                    );
583                    scheduler
584                });
585                drop(scheduler);
586
587                let scheduler = self.scheduler.read().unwrap();
588                // Re-register a new timeout listener only after acquiring the read lock;
589                // Otherwise, the listener would again put scheduler into Stale before the read
590                // lock under an extremely-rare race condition, causing panic below in
591                // active_scheduler().
592                pool.register_timeout_listener(self.do_create_timeout_listener());
593                f(scheduler.active_scheduler())
594            }
595            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
596        }
597    }
598
599    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
600        let weak_bank = Arc::downgrade(self);
601        TimeoutListener::new(move |pool| {
602            let Some(bank) = weak_bank.upgrade() else {
603                // BankWithSchedulerInner is already dropped, indicating successful and timely
604                // `wait_for_termination()` on the bank prior to this triggering of the timeout,
605                // rendering this callback invocation no-op.
606                return;
607            };
608
609            let Ok(mut scheduler) = bank.scheduler.write() else {
610                // BankWithScheduler's lock is poisoned...
611                return;
612            };
613
614            // Reaching here means that it's been awhile since this active scheduler is taken from
615            // the pool and yet it has yet to be `wait_for_termination()`-ed. To avoid unbounded
616            // thread creation under forky condition, return the scheduler for now, even if the
617            // bank could process more transactions later.
618            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
619                // Return the installed scheduler back to the scheduler pool as soon as the
620                // scheduler indicates the completion of all currently-scheduled transaction
621                // executions by `solana_unified_scheduler_pool::ThreadManager::end_session()`
622                // internally.
623
624                let id = scheduler.id();
625                let (result_with_timings, uninstalled_scheduler) =
626                    scheduler.wait_for_termination(false);
627                uninstalled_scheduler.return_to_pool();
628                info!(
629                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
630                    bank.bank.slot(),
631                    id,
632                );
633                (pool, result_with_timings)
634            });
635            trace!("timeout_listener: {:?}", scheduler);
636        })
637    }
638
639    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
640    /// `panic!()`.
641    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
642        let mut scheduler = self.scheduler.write().unwrap();
643        match &mut *scheduler {
644            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
645            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
646                result.clone().unwrap_err()
647            }
648            _ => unreachable!("no error in {:?}", self.scheduler),
649        }
650    }
651
652    #[must_use]
653    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
654        Self::wait_for_scheduler_termination(
655            &self.bank,
656            &self.scheduler,
657            WaitReason::DroppedFromBankForks,
658        )
659    }
660
661    #[must_use]
662    fn wait_for_scheduler_termination(
663        bank: &Bank,
664        scheduler: &InstalledSchedulerRwLock,
665        reason: WaitReason,
666    ) -> Option<ResultWithTimings> {
667        debug!(
668            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
669            bank.slot(),
670            reason,
671            thread::current(),
672        );
673
674        let mut scheduler = scheduler.write().unwrap();
675        let (was_noop, result_with_timings) = match &mut *scheduler {
676            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
677                scheduler.pause_for_recent_blockhash();
678                (false, None)
679            }
680            SchedulerStatus::Active(_scheduler) => {
681                let scheduler = scheduler.transition_from_active_to_unavailable();
682                let (result_with_timings, uninstalled_scheduler) =
683                    scheduler.wait_for_termination(reason.is_dropped());
684                uninstalled_scheduler.return_to_pool();
685                (false, Some(result_with_timings))
686            }
687            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
688                // Do nothing for pauses because the scheduler termination is guaranteed to be
689                // called later.
690                (true, None)
691            }
692            SchedulerStatus::Stale(_pool, _result_with_timings) => {
693                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
694                (true, Some(result_with_timings))
695            }
696            SchedulerStatus::Unavailable => (true, None),
697        };
698        debug!(
699            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
700            bank.slot(),
701            reason,
702            was_noop,
703            result_with_timings.as_ref().map(|(result, _)| result),
704            thread::current(),
705        );
706        trace!(
707            "wait_for_scheduler_termination(result_with_timings: {:?})",
708            result_with_timings,
709        );
710
711        result_with_timings
712    }
713
714    fn drop_scheduler(&self) {
715        if thread::panicking() {
716            error!(
717                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
718                self.bank.slot(),
719            );
720            return;
721        }
722
723        // There's no guarantee ResultWithTimings is available or not at all when being dropped.
724        if let Some(Err(err)) = self
725            .wait_for_completed_scheduler_from_drop()
726            .map(|(result, _timings)| result)
727        {
728            warn!(
729                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
730                self.bank.slot(),
731                err,
732            );
733        }
734    }
735}
736
737impl Drop for BankWithSchedulerInner {
738    fn drop(&mut self) {
739        self.drop_scheduler();
740    }
741}
742
743impl Deref for BankWithScheduler {
744    type Target = Arc<Bank>;
745
746    fn deref(&self) -> &Self::Target {
747        &self.inner.bank
748    }
749}
750
/// Unit tests for the `BankWithScheduler` scheduler lifecycle (termination, drop, pause and
/// transaction scheduling), driven by `mockall` mocks of `InstalledScheduler` and
/// `UninstalledScheduler`.
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            bank::test_utils::goto_end_of_slot_with_scheduler,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        assert_matches::assert_matches,
        mockall::Sequence,
        solana_sdk::system_transaction,
        std::sync::Mutex,
    };

    /// Builds a mocked installed scheduler bound to `bank`.
    ///
    /// Expectations registered, in this order and all on one shared `Sequence`:
    /// 1. exactly one `context()` call, returning `SchedulingContext::new(bank)`;
    /// 2. for each element of `is_dropped_flags`, exactly one `wait_for_termination(flag)` call.
    ///    Each returns `(Ok(()), ExecuteTimings::default())` together with a mocked uninstalled
    ///    scheduler that expects exactly one `return_to_pool()`. That `return_to_pool()`
    ///    expectation is only appended to the shared sequence when `wait_for_termination` actually
    ///    runs (inside the `returning` closure).
    ///
    /// `f`, when given, is invoked last so a test can register extra (non-sequenced) expectations.
    fn setup_mocked_scheduler_with_extra(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
        f: Option<impl Fn(&mut MockInstalledScheduler)>,
    ) -> InstalledSchedulerBox {
        let mut mock = MockInstalledScheduler::new();
        // Shared through Arc<Mutex<..>> so the `move` closure below can still append to the same
        // sequence after this function has returned.
        let seq = Arc::new(Mutex::new(Sequence::new()));

        mock.expect_context()
            .times(1)
            .in_sequence(&mut seq.lock().unwrap())
            .return_const(SchedulingContext::new(bank));

        // NOTE(review): despite its name, `wait_reason` is the `is_dropped` flag that
        // `wait_for_termination` is expected to be called with.
        for wait_reason in is_dropped_flags {
            let seq_cloned = seq.clone();
            mock.expect_wait_for_termination()
                .with(mockall::predicate::eq(wait_reason))
                .times(1)
                .in_sequence(&mut seq.lock().unwrap())
                .returning(move |_| {
                    let mut mock_uninstalled = MockUninstalledScheduler::new();
                    mock_uninstalled
                        .expect_return_to_pool()
                        .times(1)
                        .in_sequence(&mut seq_cloned.lock().unwrap())
                        .returning(|| ());
                    (
                        (Ok(()), ExecuteTimings::default()),
                        Box::new(mock_uninstalled),
                    )
                });
        }

        if let Some(f) = f {
            f(&mut mock);
        }

        Box::new(mock)
    }

    /// Same as `setup_mocked_scheduler_with_extra` without extra expectations; the
    /// `None::<fn ..>` turbofish only pins down the otherwise-uninferable closure type.
    fn setup_mocked_scheduler(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
    ) -> InstalledSchedulerBox {
        setup_mocked_scheduler_with_extra(
            bank,
            is_dropped_flags,
            None::<fn(&mut MockInstalledScheduler) -> ()>,
        )
    }

    /// An explicit `wait_for_completed_scheduler()` terminates the scheduler with
    /// `is_dropped == false` and yields a result exactly once.
    #[test]
    fn test_scheduler_normal_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [false].into_iter())),
        );
        assert!(bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

        // Calling wait_for_completed_scheduler() again is okay; it just returns no
        // ResultWithTimings.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    /// A bank created without a scheduler never reports one and never yields a result.
    #[test]
    fn test_no_scheduler_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new_without_scheduler(bank);

        // Calling wait_for_completed_scheduler() is a no-op when no scheduler is installed.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    /// Dropping a bank that still has a scheduler installed must terminate it with
    /// `is_dropped == true`. The `times(1)` expectations are checked by mockall when the mocks
    /// are dropped.
    #[test]
    fn test_scheduler_termination_from_drop() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [true].into_iter())),
        );
        drop(bank);
    }

    /// The test expects `goto_end_of_slot_with_scheduler` to call `pause_for_recent_blockhash()`
    /// exactly once without terminating the scheduler. The later explicit wait then terminates
    /// it with `is_dropped == false`.
    #[test]
    fn test_scheduler_pause() {
        solana_logger::setup();

        let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler_with_extra(
                bank,
                [false].into_iter(),
                Some(|mocked: &mut MockInstalledScheduler| {
                    mocked
                        .expect_pause_for_recent_blockhash()
                        .times(1)
                        .returning(|| ());
                }),
            )),
        );
        goto_end_of_slot_with_scheduler(&bank);
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
    }

    /// Drives `schedule_transaction_executions` with a single transfer.
    ///
    /// With `should_succeed`, the mocked `schedule_execution` returns `Ok(())` and so must the
    /// bank. Otherwise `schedule_execution` returns `Err(SchedulerAborted)`, and the bank is
    /// expected to surface the error produced by `recover_error_after_abort()` instead. In both
    /// cases the scheduler is terminated on drop (`is_dropped == true`).
    fn do_test_schedule_execution(should_succeed: bool) {
        solana_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
            &mint_keypair,
            &solana_pubkey::new_rand(),
            2,
            genesis_config.hash(),
        ));
        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
        let mocked_scheduler = setup_mocked_scheduler_with_extra(
            bank.clone(),
            [true].into_iter(),
            Some(|mocked: &mut MockInstalledScheduler| {
                if should_succeed {
                    mocked
                        .expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Ok(()));
                } else {
                    mocked
                        .expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Err(SchedulerAborted));
                    mocked
                        .expect_recover_error_after_abort()
                        .times(1)
                        .returning(|| TransactionError::InsufficientFundsForFee);
                }
            }),
        );

        let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
        let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
        if should_succeed {
            assert_matches!(result, Ok(()));
        } else {
            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
        }
    }

    #[test]
    fn test_schedule_execution_success() {
        do_test_schedule_execution(true);
    }

    #[test]
    fn test_schedule_execution_failure() {
        do_test_schedule_execution(false);
    }
}