solana_runtime/installed_scheduler_pool.rs
//! Transaction processing glue code, mainly consisting of object-safe traits
//!
//! [InstalledSchedulerPool] lends one of its pooled [InstalledScheduler]s, wrapped in a
//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
//! execution. After use, the scheduler is returned to the pool.
//!
//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. It then schedules their
//! execution and commits the results into the associated _bank_.
//!
//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
//! parallel transaction processing, and that there are multiple independent schedulers inside a
//! single instance of [InstalledSchedulerPool].
//!
//! Dynamic dispatch was inevitable due to the desire to piggyback on
//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
//! common place for both `ReplayStage` and `BankingStage`, and the resultant need to invoke
//! actual implementations provided by a dependent crate (`solana-unified-scheduler-pool`, which
//! in turn depends on `solana-ledger`, which in turn depends on `solana-runtime`), avoiding a
//! cyclic dependency.
//!
//! See [InstalledScheduler] for a visualized interaction.
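//!
//! At a glance, the lending flow looks like this (a minimal sketch, not compiled as a doctest;
//! the `pool` and `bank` bindings and the `tx` transaction are assumed to be provided by the
//! caller, and [BankWithScheduler]'s constructor is crate-private in reality):
//!
//! ```ignore
//! // Lend one of the pooled schedulers for a brand-new bank...
//! let context = SchedulingContext::new(bank.clone());
//! let scheduler = pool.take_scheduler(context);
//! let bank = BankWithScheduler::new(bank, Some(scheduler));
//!
//! // ...feed it transactions to schedule and execute...
//! bank.schedule_transaction_executions([(tx, 0)].into_iter())?;
//!
//! // ...and finally block until everything is committed, returning the scheduler to the pool.
//! let (result, timings) = bank.wait_for_completed_scheduler().unwrap();
//! ```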

use {
    crate::bank::Bank,
    log::*,
    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
    solana_sdk::{
        clock::Slot,
        hash::Hash,
        transaction::{Result, SanitizedTransaction, TransactionError},
    },
    solana_timings::ExecuteTimings,
    solana_unified_scheduler_logic::SchedulingMode,
    std::{
        fmt::{self, Debug},
        mem,
        ops::Deref,
        sync::{Arc, RwLock},
        thread,
    },
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};

pub fn initialized_result_with_timings() -> ResultWithTimings {
    (Ok(()), ExecuteTimings::default())
}

pub trait InstalledSchedulerPool: Send + Sync + Debug {
    /// A very thin wrapper around [`Self::take_resumed_scheduler`] to take a scheduler from this
    /// pool for a brand-new bank.
    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
        self.take_resumed_scheduler(context, initialized_result_with_timings())
    }

    fn take_resumed_scheduler(
        &self,
        context: SchedulingContext,
        result_with_timings: ResultWithTimings,
    ) -> InstalledSchedulerBox;

    /// Registers an opaque timeout listener.
    ///
    /// This method and the passed `struct` called [`TimeoutListener`] are opaque by design.
    /// Specifically, they don't provide any way to tell which listener is semantically associated
    /// with which particular scheduler. That's because proper _unregistration_ is omitted when a
    /// scheduler is returned, to reduce the latency of the normal block-verification code-path,
    /// relying instead on eventual clean-up of stale listeners by `solScCleaner`.
    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
}

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

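/// An opaque, one-shot callback fired by the pool when a lent scheduler is deemed timed out
/// (see [`InstalledSchedulerPool::register_timeout_listener`]).
///
/// A minimal sketch of the intended shape (illustrative only; `new()` is crate-private and the
/// `pool` binding is assumed):
///
/// ```ignore
/// let listener = TimeoutListener::new(|pool| {
///     // Return the now-stale scheduler back to `pool` here, if its bank is still alive.
/// });
/// // trigger() consumes self, so a listener fires at most once:
/// listener.trigger(pool);
/// ```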
pub struct TimeoutListener {
    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}

impl TimeoutListener {
    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
        Self {
            callback: Box::new(f),
        }
    }

    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
        (self.callback)(pool);
    }
}

impl Debug for TimeoutListener {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "TimeoutListener({self:p})")
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under an encapsulated implementation.
///
/// The following chart illustrates the ownership/reference interaction between inter-dependent
/// objects across crates:
///
/// ```mermaid
/// graph TD
///     Bank["Arc#lt;Bank#gt;"]
///
///     subgraph solana-runtime[<span style="font-size: 70%">solana-runtime</span>]
///         BankForks;
///         BankWithScheduler;
///         Bank;
///         LoadExecuteAndCommitTransactions([<span style="font-size: 67%">load_execute_and_commit_transactions#lpar;#rpar;</span>]);
///         SchedulingContext;
///         InstalledSchedulerPool{{InstalledSchedulerPool}};
///         InstalledScheduler{{InstalledScheduler}};
///     end
///
///     subgraph solana-unified-scheduler-pool[<span style="font-size: 70%">solana-unified-scheduler-pool</span>]
///         SchedulerPool;
///         PooledScheduler;
///         ScheduleExecution(["schedule_execution()"]);
///     end
///
///     subgraph solana-ledger[<span style="font-size: 60%">solana-ledger</span>]
///         ExecuteBatch(["execute_batch()"]);
///     end
///
///     ScheduleExecution -. calls .-> ExecuteBatch;
///     BankWithScheduler -. dyn-calls .-> ScheduleExecution;
///     ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
///     linkStyle 0,1,2 stroke:gray,color:gray;
///
///     BankForks -- owns --> BankWithScheduler;
///     BankForks -- owns --> InstalledSchedulerPool;
///     BankWithScheduler -- refs --> Bank;
///     BankWithScheduler -- owns --> InstalledScheduler;
///     SchedulingContext -- refs --> Bank;
///     InstalledScheduler -- owns --> SchedulingContext;
///
///     SchedulerPool -- owns --> PooledScheduler;
///     SchedulerPool -. impls .-> InstalledSchedulerPool;
///     PooledScheduler -. impls .-> InstalledScheduler;
///     PooledScheduler -- refs --> SchedulerPool;
/// ```
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
//   warning: `#[must_use]` has no effect when applied to a struct field
#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    fn id(&self) -> SchedulerId;
    fn context(&self) -> &SchedulingContext;

    /// Schedule a transaction for execution.
    ///
    /// This non-blocking function returns immediately, without waiting for actual execution.
    ///
    /// Calling this is illegal as soon as `wait_for_termination()` is called; doing so results in
    /// a fatal logic error.
    ///
    /// Note that the returned result indicates whether the scheduler has been aborted due to a
    /// previously-scheduled bad transaction, which terminates further block verification. So,
    /// almost always, the returned error isn't due to the scheduling of the current transaction
    /// itself. At that point, calling this does nothing anymore, while it's still safe to do so.
    /// As soon as they are notified, callers are expected to stop processing upcoming
    /// transactions of the same `SchedulingContext` (i.e. same block). Internally, an aborted
    /// scheduler will be disposed of cleanly, not repooled, after `wait_for_termination()` is
    /// called, just like not-aborted schedulers.
    ///
    /// The caller can acquire the error by calling a separate function called
    /// `recover_error_after_abort()`, which requires `&mut self` instead of `&self`. This
    /// separation and the convoluted return-value semantics explained above are intentional, to
    /// optimize the fast code-path of normal multi-threaded transaction scheduling at the cost of
    /// a far slower error code-path, while giving implementors increased flexibility by having
    /// `&mut`.
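    ///
    /// A sketch of the expected caller-side handling (the `scheduler`, `transaction`, and
    /// `index` bindings are assumed):
    ///
    /// ```ignore
    /// if scheduler.schedule_execution(transaction, index).is_err() {
    ///     // Aborted by an earlier bad transaction; stop feeding this block and fetch the
    ///     // actual error via the separate `&mut` method.
    ///     let error: TransactionError = scheduler.recover_error_after_abort();
    /// }
    /// ```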
    fn schedule_execution(
        &self,
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
    ) -> ScheduleResult;

    /// Return the error which caused the scheduler to abort.
    ///
    /// Note that this must not be called until it's observed that `schedule_execution()` has
    /// returned `Err(SchedulerAborted)`. Violating this should `panic!()`.
    ///
    /// That said, calling this multiple times is completely acceptable after the error has been
    /// observed from `schedule_execution()`. While it's not guaranteed, `.clone()`-ed copies of
    /// the same error from the first bad transaction are usually returned across invocations.
    fn recover_error_after_abort(&mut self) -> TransactionError;

    /// Wait for a scheduler to terminate after processing.
    ///
    /// This function blocks the current thread while waiting for the scheduler to complete all of
    /// the executions for the scheduled transactions and to return the finalized
    /// `ResultWithTimings`. This function still blocks for a short period of time even in the
    /// case of an aborted scheduler, to gracefully shut it down (like thread joining).
    ///
    /// Along with the result being returned, this function also uninstalls the scheduler from the
    /// bank by transforming the consumed self.
    ///
    /// If no transaction is scheduled, the result and timings will be `Ok(())` and
    /// `ExecuteTimings::default()` respectively.
    fn wait_for_termination(
        self: Box<Self>,
        is_dropped: bool,
    ) -> (ResultWithTimings, UninstalledSchedulerBox);

    /// Pause a scheduler after processing to update the bank's recent blockhash.
    ///
    /// This function blocks the current thread like `wait_for_termination()`. However, the
    /// scheduler won't be consumed. This means the scheduler is responsible for retaining the
    /// finalized `ResultWithTimings` internally until it's `wait_for_termination()`-ed, to
    /// collect the result later.
    fn pause_for_recent_blockhash(&mut self);
}

#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    fn return_to_pool(self: Box<Self>);
}

pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;

pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;

pub type SchedulerId = u64;

/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
///
/// Note that this isn't called `SchedulerContext` because contexts aren't associated with
/// schedulers one-to-one. A scheduler will use many `SchedulingContext`s during its lifetime.
/// The "scheduling" part of the name refers to an abstract slice of time in which to schedule
/// and execute all transactions for a given bank, for block verification or production. A
/// context is expected to be used by a particular scheduler only for that duration of time, and
/// to be disposed of by the scheduler. The scheduler may then work on different banks with new
/// `SchedulingContext`s.
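///
/// For example (a sketch; the `bank` binding is an assumed `Arc<Bank>`):
///
/// ```ignore
/// // new() currently hard-codes SchedulingMode::BlockVerification.
/// let context = SchedulingContext::new(bank.clone());
/// assert!(matches!(context.mode(), SchedulingMode::BlockVerification));
/// assert_eq!(context.slot(), bank.slot());
/// ```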
#[derive(Clone, Debug)]
pub struct SchedulingContext {
    mode: SchedulingMode,
    bank: Arc<Bank>,
}

impl SchedulingContext {
    pub fn new(bank: Arc<Bank>) -> Self {
        // mode will be configurable later
        Self {
            mode: SchedulingMode::BlockVerification,
            bank,
        }
    }

    pub fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
        Self { mode, bank }
    }

    #[cfg(feature = "dev-context-only-utils")]
    pub fn for_production(bank: Arc<Bank>) -> Self {
        Self {
            mode: SchedulingMode::BlockProduction,
            bank,
        }
    }

    pub fn mode(&self) -> SchedulingMode {
        self.mode
    }

    pub fn bank(&self) -> &Arc<Bank> {
        &self.bank
    }

    pub fn slot(&self) -> Slot {
        self.bank().slot()
    }
}

pub type ResultWithTimings = (Result<()>, ExecuteTimings);

/// A hint from the bank about the reason the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    // The bank wants its scheduler to terminate after the completion of transaction execution,
    // in order to freeze itself immediately thereafter. This is by far the most common wait
    // reason.
    //
    // Note that `wait_for_termination(TerminatedToFreeze)` must explicitly be done prior to
    // Bank::freeze(). This can't be done inside Bank::freeze() implicitly, to keep it
    // infallible.
    TerminatedToFreeze,
    // The bank wants its scheduler to terminate just like `TerminatedToFreeze`, and to indicate
    // that Drop::drop() is the caller.
    DroppedFromBankForks,
    // The bank wants its scheduler to pause after the completion, without being returned to the
    // pool. This is to update the bank's recent blockhash and to collect the scheduler's
    // internally-held `ResultWithTimings` later.
    PausedForRecentBlockhash,
}

impl WaitReason {
    pub fn is_paused(&self) -> bool {
        // An exhaustive `match` is preferred here over `matches!()` to force an explicit
        // decision to be made, should we add new variants like `PausedForFooBar`...
        match self {
            WaitReason::PausedForRecentBlockhash => true,
            WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false,
        }
    }

    pub fn is_dropped(&self) -> bool {
        // An exhaustive `match` is preferred here over `matches!()` to force an explicit
        // decision to be made, should we add new variants like `PausedForFooBar`...
        match self {
            WaitReason::DroppedFromBankForks => true,
            WaitReason::TerminatedToFreeze | WaitReason::PausedForRecentBlockhash => false,
        }
    }
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
    /// The unified scheduler is disabled, or the installed scheduler has been consumed by
    /// [`InstalledScheduler::wait_for_termination`]. Note that the transition to
    /// [`Self::Unavailable`] from {[`Self::Active`], [`Self::Stale`]} is one-way (i.e. one-time),
    /// unlike [`Self::Active`] <=> [`Self::Stale`] below. Also, this variant is transiently used
    /// as a placeholder internally when transitioning scheduler statuses, which isn't observable
    /// unless a panic is happening.
    Unavailable,
    /// A scheduler is installed into a bank; it could be running or just waiting for additional
    /// transactions. This will be transitioned to [`Self::Stale`] after a certain time (i.e.
    /// `solana_unified_scheduler_pool::DEFAULT_TIMEOUT_DURATION`) has passed, if its bank hasn't
    /// been frozen since the scheduler was installed.
    Active(InstalledSchedulerBox),
    /// The scheduler has yet to freeze its associated bank even though it's been too long since
    /// it was installed, resulting in the scheduler being returned to the pool. Later, this can
    /// immediately (i.e. transparently) be transitioned to [`Self::Active`] as soon as there's a
    /// new transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`]
    /// is called, which internally calls [`BankWithSchedulerInner::with_active_scheduler`] to
    /// make the transition happen).
    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}

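// A summary of the transitions implemented by the private helpers below (Unavailable is
// terminal, aside from its transient internal use as a placeholder during a transition):
//
//   Active --maybe_transition_from_active_to_stale--> Stale
//   Stale  --transition_from_stale_to_active--------> Active
//   Active --transition_from_active_to_unavailable--> Unavailable
//   Stale  --transition_from_stale_to_unavailable---> Unavailable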
impl SchedulerStatus {
    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
        match scheduler {
            Some(scheduler) => SchedulerStatus::Active(scheduler),
            None => SchedulerStatus::Unavailable,
        }
    }

    fn transition_from_stale_to_active(
        &mut self,
        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
    ) {
        let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Active failed: {self:?}");
        };
        *self = Self::Active(f(pool, result_with_timings));
    }

    fn maybe_transition_from_active_to_stale(
        &mut self,
        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
    ) {
        if !matches!(self, Self::Active(_scheduler)) {
            return;
        }
        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
            unreachable!("not active: {self:?}");
        };
        let (pool, result_with_timings) = f(scheduler);
        *self = Self::Stale(pool, result_with_timings);
    }

    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Unavailable failed: {self:?}");
        };
        scheduler
    }

    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
        let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Unavailable failed: {self:?}");
        };
        result_with_timings
    }

    fn active_scheduler(&self) -> &InstalledSchedulerBox {
        let SchedulerStatus::Active(active_scheduler) = self else {
            panic!("not active: {self:?}");
        };
        active_scheduler
    }
}

/// Very thin wrapper around `Arc<Bank>`.
///
/// It brings type-safety against accidental mixing of a bank and a scheduler with different
/// slots, which is a pretty dangerous condition. It also guarantees that wait_for_termination()
/// is called via ::drop() by DropBankService, which receives a Vec<BankWithScheduler> from
/// BankForks::set_root()'s pruning, mostly matching Arc<Bank>'s lifetime by piggybacking on the
/// pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But the scheduler wasn't
/// put into Bank's fields to avoid circular references (a scheduler needs to refer to its
/// accompanying Arc<Bank>). BankWithScheduler behaves almost like Arc<Bank>. It only adds a few
/// transaction-scheduling and scheduler-management functions. For this reason, the `bank`
/// variable name should be used for `BankWithScheduler` across the codebase.
///
/// BankWithScheduler even implements Deref for convenience. Clone is intentionally not
/// implemented, to avoid ambiguity as to which to clone: BankWithScheduler or Arc<Bank>. Use
/// clone_without_scheduler() for Arc<Bank>. Otherwise, use clone_with_scheduler() (this should
/// be unusual outside the scheduler code-path).
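///
/// Which clone to use, as a sketch (the `bank` binding is an assumed `BankWithScheduler`):
///
/// ```ignore
/// let plain: Arc<Bank> = bank.clone_without_scheduler(); // the common case
/// let with_scheduler: BankWithScheduler = bank.clone_with_scheduler(); // scheduler code-path
/// ```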
#[derive(Debug)]
pub struct BankWithScheduler {
    inner: Arc<BankWithSchedulerInner>,
}

#[derive(Debug)]
pub struct BankWithSchedulerInner {
    bank: Arc<Bank>,
    scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;

impl BankWithScheduler {
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
        if let Some(bank_in_context) = scheduler
            .as_ref()
            .map(|scheduler| scheduler.context().bank())
        {
            assert!(Arc::ptr_eq(&bank, bank_in_context));
        }

        Self {
            inner: Arc::new(BankWithSchedulerInner {
                bank,
                scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
            }),
        }
    }

    pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
        Self::new(bank, None)
    }

    pub fn clone_with_scheduler(&self) -> BankWithScheduler {
        BankWithScheduler {
            inner: self.inner.clone(),
        }
    }

    pub fn clone_without_scheduler(&self) -> Arc<Bank> {
        self.inner.bank.clone()
    }

    pub fn register_tick(&self, hash: &Hash) {
        self.inner.bank.register_tick(hash, &self.inner.scheduler);
    }

    #[cfg(feature = "dev-context-only-utils")]
    pub fn fill_bank_with_ticks_for_tests(&self) {
        self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
    }

    pub fn has_installed_scheduler(&self) -> bool {
        !matches!(
            &*self.inner.scheduler.read().unwrap(),
            SchedulerStatus::Unavailable
        )
    }

    /// Schedule the transactions as long as the scheduler hasn't been aborted.
    ///
    /// If the scheduler has been aborted, this doesn't schedule the transactions; instead, it
    /// just returns the error from a previously scheduled transaction.
    ///
    /// Calling this will panic if the installed scheduler is Unavailable (i.e. the bank has been
    /// wait_for_termination()-ed, or the unified scheduler is disabled in the first place).
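    ///
    /// Typical use, as a sketch (the `txs` binding is an assumed
    /// `Vec<RuntimeTransaction<SanitizedTransaction>>`):
    ///
    /// ```ignore
    /// bank.schedule_transaction_executions(
    ///     txs.into_iter().enumerate().map(|(index, tx)| (tx, index)),
    /// )?;
    /// ```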
    pub fn schedule_transaction_executions(
        &self,
        transactions_with_indexes: impl ExactSizeIterator<
            Item = (RuntimeTransaction<SanitizedTransaction>, usize),
        >,
    ) -> Result<()> {
        trace!(
            "schedule_transaction_executions(): {} txs",
            transactions_with_indexes.len()
        );

        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
            for (sanitized_transaction, index) in transactions_with_indexes {
                scheduler.schedule_execution(sanitized_transaction, index)?;
            }
            Ok(())
        });

        if schedule_result.is_err() {
            // This write lock isn't atomic with the above read lock. So, another thread could
            // have called .recover_error_after_abort() while we're literally stuck in the gap
            // between these locks (i.e. at this very comment, source-code-wise) under extreme
            // race conditions. Thus, .recover_error_after_abort() is made idempotent with that
            // consideration in mind.
            //
            // Lastly, this non-atomic nature is intentional, to optimize the fast code-path.
            return Err(self.inner.retrieve_error_after_schedule_failure());
        }

        Ok(())
    }

    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
        self.inner.do_create_timeout_listener()
    }

    // take needless &mut only to communicate its semantic mutability to humans...
    #[cfg(feature = "dev-context-only-utils")]
    pub fn drop_scheduler(&mut self) {
        self.inner.drop_scheduler();
    }

    pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
        let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
            bank,
            scheduler,
            WaitReason::PausedForRecentBlockhash,
        );
        assert!(
            maybe_result_with_timings.is_none(),
            "Premature result was returned from scheduler after pause (slot: {})",
            bank.slot(),
        );
    }

    #[must_use]
    pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
        BankWithSchedulerInner::wait_for_scheduler_termination(
            &self.inner.bank,
            &self.inner.scheduler,
            WaitReason::TerminatedToFreeze,
        )
    }

    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
        RwLock::new(SchedulerStatus::Unavailable)
    }
}

impl BankWithSchedulerInner {
    fn with_active_scheduler(
        self: &Arc<Self>,
        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
    ) -> ScheduleResult {
        let scheduler = self.scheduler.read().unwrap();
        match &*scheduler {
            SchedulerStatus::Active(scheduler) => {
                // This is the fast path, needing only a single read lock most of the time.
                f(scheduler)
            }
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                trace!(
                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
                    self.bank.slot(),
                );
                Err(SchedulerAborted)
            }
            SchedulerStatus::Stale(pool, _result_with_timings) => {
                let pool = pool.clone();
                drop(scheduler);

                let context = SchedulingContext::new(self.bank.clone());
                let mut scheduler = self.scheduler.write().unwrap();
                trace!("with_active_scheduler: {:?}", scheduler);
                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
                    info!(
                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
                        self.bank.slot(),
                        scheduler.id(),
                    );
                    scheduler
                });
                drop(scheduler);

                let scheduler = self.scheduler.read().unwrap();
                // Re-register a new timeout listener only after acquiring the read lock.
                // Otherwise, the listener could put the scheduler into Stale again before the
                // read lock is taken, under an extremely rare race condition, causing the panic
                // below in active_scheduler().
                pool.register_timeout_listener(self.do_create_timeout_listener());
                f(scheduler.active_scheduler())
            }
            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
        }
    }

    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
        let weak_bank = Arc::downgrade(self);
        TimeoutListener::new(move |pool| {
            let Some(bank) = weak_bank.upgrade() else {
                // BankWithSchedulerInner is already dropped, indicating a successful and timely
                // `wait_for_termination()` on the bank prior to this triggering of the timeout,
                // rendering this callback invocation a no-op.
                return;
            };

            let Ok(mut scheduler) = bank.scheduler.write() else {
                // BankWithScheduler's lock is poisoned...
                return;
            };

            // Reaching here means that it's been a while since this active scheduler was taken
            // from the pool and it has yet to be `wait_for_termination()`-ed. To avoid unbounded
            // thread creation under forky conditions, return the scheduler for now, even if the
            // bank could process more transactions later.
            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
                // Return the installed scheduler back to the scheduler pool as soon as the
                // scheduler indicates the completion of all currently-scheduled transaction
                // executions by `solana_unified_scheduler_pool::ThreadManager::end_session()`
                // internally.

                let id = scheduler.id();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(false);
                uninstalled_scheduler.return_to_pool();
                info!(
                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
                    bank.bank.slot(),
                    id,
                );
                (pool, result_with_timings)
            });
            trace!("timeout_listener: {:?}", scheduler);
        })
    }

    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
    /// `panic!()`.
    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
        let mut scheduler = self.scheduler.write().unwrap();
        match &mut *scheduler {
            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                result.clone().unwrap_err()
            }
            _ => unreachable!("no error in {:?}", self.scheduler),
        }
    }

    #[must_use]
    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
        Self::wait_for_scheduler_termination(
            &self.bank,
            &self.scheduler,
            WaitReason::DroppedFromBankForks,
        )
    }

    #[must_use]
    fn wait_for_scheduler_termination(
        bank: &Bank,
        scheduler: &InstalledSchedulerRwLock,
        reason: WaitReason,
    ) -> Option<ResultWithTimings> {
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
            bank.slot(),
            reason,
            thread::current(),
        );

        let mut scheduler = scheduler.write().unwrap();
        let (was_noop, result_with_timings) = match &mut *scheduler {
            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
                scheduler.pause_for_recent_blockhash();
                (false, None)
            }
            SchedulerStatus::Active(_scheduler) => {
                let scheduler = scheduler.transition_from_active_to_unavailable();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(reason.is_dropped());
                uninstalled_scheduler.return_to_pool();
                (false, Some(result_with_timings))
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
                // Do nothing for pauses because the scheduler termination is guaranteed to be
                // called later.
                (true, None)
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) => {
                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
                (true, Some(result_with_timings))
            }
            SchedulerStatus::Unavailable => (true, None),
        };
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
            bank.slot(),
            reason,
            was_noop,
            result_with_timings.as_ref().map(|(result, _)| result),
            thread::current(),
        );
        trace!(
            "wait_for_scheduler_termination(result_with_timings: {:?})",
            result_with_timings,
        );

        result_with_timings
    }

    fn drop_scheduler(&self) {
        if thread::panicking() {
            error!(
                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
                self.bank.slot(),
            );
            return;
        }

        // There's no guarantee that a ResultWithTimings is available at all when being dropped;
        // if one is returned, any contained error is only logged and then discarded.
        if let Some(Err(err)) = self
            .wait_for_completed_scheduler_from_drop()
            .map(|(result, _timings)| result)
        {
            warn!(
                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
                self.bank.slot(),
                err,
            );
        }
    }
}

impl Drop for BankWithSchedulerInner {
    fn drop(&mut self) {
        self.drop_scheduler();
    }
}

impl Deref for BankWithScheduler {
    type Target = Arc<Bank>;

    fn deref(&self) -> &Self::Target {
        &self.inner.bank
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            bank::test_utils::goto_end_of_slot_with_scheduler,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        assert_matches::assert_matches,
        mockall::Sequence,
        solana_sdk::system_transaction,
        std::sync::Mutex,
    };

    fn setup_mocked_scheduler_with_extra(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
        f: Option<impl Fn(&mut MockInstalledScheduler)>,
    ) -> InstalledSchedulerBox {
        let mut mock = MockInstalledScheduler::new();
        let seq = Arc::new(Mutex::new(Sequence::new()));

        mock.expect_context()
            .times(1)
            .in_sequence(&mut seq.lock().unwrap())
            .return_const(SchedulingContext::new(bank));

        for wait_reason in is_dropped_flags {
            let seq_cloned = seq.clone();
            mock.expect_wait_for_termination()
                .with(mockall::predicate::eq(wait_reason))
                .times(1)
                .in_sequence(&mut seq.lock().unwrap())
                .returning(move |_| {
                    let mut mock_uninstalled = MockUninstalledScheduler::new();
                    mock_uninstalled
                        .expect_return_to_pool()
                        .times(1)
                        .in_sequence(&mut seq_cloned.lock().unwrap())
                        .returning(|| ());
                    (
                        (Ok(()), ExecuteTimings::default()),
                        Box::new(mock_uninstalled),
                    )
                });
        }

        if let Some(f) = f {
            f(&mut mock);
        }

        Box::new(mock)
    }

    fn setup_mocked_scheduler(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
    ) -> InstalledSchedulerBox {
        setup_mocked_scheduler_with_extra(
            bank,
            is_dropped_flags,
            None::<fn(&mut MockInstalledScheduler) -> ()>,
        )
    }

    #[test]
    fn test_scheduler_normal_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [false].into_iter())),
        );
        assert!(bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

        // Calling wait_for_completed_scheduler() repeatedly is okay; no ResultWithTimings will
        // be returned after the first call.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    #[test]
    fn test_no_scheduler_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new_without_scheduler(bank);

        // Calling wait_for_completed_scheduler() is a no-op when no scheduler is installed.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    #[test]
    fn test_scheduler_termination_from_drop() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [true].into_iter())),
        );
        drop(bank);
    }

    #[test]
    fn test_scheduler_pause() {
        solana_logger::setup();

        let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler_with_extra(
                bank,
                [false].into_iter(),
                Some(|mocked: &mut MockInstalledScheduler| {
                    mocked
                        .expect_pause_for_recent_blockhash()
                        .times(1)
                        .returning(|| ());
                }),
            )),
        );
        goto_end_of_slot_with_scheduler(&bank);
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
    }

    fn do_test_schedule_execution(should_succeed: bool) {
        solana_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
            &mint_keypair,
            &solana_pubkey::new_rand(),
            2,
            genesis_config.hash(),
        ));
        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
        let mocked_scheduler = setup_mocked_scheduler_with_extra(
            bank.clone(),
            [true].into_iter(),
            Some(|mocked: &mut MockInstalledScheduler| {
                if should_succeed {
                    mocked
                        .expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Ok(()));
                } else {
                    mocked
                        .expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Err(SchedulerAborted));
                    mocked
                        .expect_recover_error_after_abort()
                        .times(1)
                        .returning(|| TransactionError::InsufficientFundsForFee);
                }
            }),
        );

        let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
        let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
        if should_succeed {
            assert_matches!(result, Ok(()));
        } else {
            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
        }
    }

    #[test]
    fn test_schedule_execution_success() {
        do_test_schedule_execution(true);
    }

    #[test]
    fn test_schedule_execution_failure() {
        do_test_schedule_execution(false);
    }
}
935}