1use {
24 crate::bank::Bank,
25 log::*,
26 solana_sdk::{
27 clock::Slot,
28 hash::Hash,
29 transaction::{Result, SanitizedTransaction, TransactionError},
30 },
31 solana_timings::ExecuteTimings,
32 std::{
33 fmt::{self, Debug},
34 mem,
35 ops::Deref,
36 sync::{Arc, RwLock},
37 thread,
38 },
39};
40#[cfg(feature = "dev-context-only-utils")]
41use {mockall::automock, qualifier_attr::qualifiers};
42
43pub fn initialized_result_with_timings() -> ResultWithTimings {
44 (Ok(()), ExecuteTimings::default())
45}
46
47pub trait InstalledSchedulerPool: Send + Sync + Debug {
48 fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
49 self.take_resumed_scheduler(context, initialized_result_with_timings())
50 }
51
52 fn take_resumed_scheduler(
53 &self,
54 context: SchedulingContext,
55 result_with_timings: ResultWithTimings,
56 ) -> InstalledSchedulerBox;
57
58 fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
59}
60
/// Marker error signalling that a scheduler refused further work because it has aborted.
///
/// It carries no detail; within this file the actual `TransactionError` is fetched separately
/// after this marker is observed.
#[derive(Debug)]
pub struct SchedulerAborted;

/// Outcome of handing work to a scheduler: `Ok(())`, or [`SchedulerAborted`].
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
64
65pub struct TimeoutListener {
66 callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
67}
68
69impl TimeoutListener {
70 pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
71 Self {
72 callback: Box::new(f),
73 }
74 }
75
76 pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
77 (self.callback)(pool);
78 }
79}
80
81impl Debug for TimeoutListener {
82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83 write!(f, "TimeoutListener({self:p})")
84 }
85}
86
87#[cfg_attr(doc, aquamarine::aquamarine)]
88#[cfg_attr(feature = "dev-context-only-utils", automock)]
135#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
138pub trait InstalledScheduler: Send + Sync + Debug + 'static {
139 fn id(&self) -> SchedulerId;
140 fn context(&self) -> &SchedulingContext;
141
142 fn schedule_execution(&self, transaction: SanitizedTransaction, index: usize)
165 -> ScheduleResult;
166
167 fn recover_error_after_abort(&mut self) -> TransactionError;
176
177 fn wait_for_termination(
190 self: Box<Self>,
191 is_dropped: bool,
192 ) -> (ResultWithTimings, UninstalledSchedulerBox);
193
194 fn pause_for_recent_blockhash(&mut self);
201}
202
/// A scheduler that has been terminated and is ready to be handed back to its pool.
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Consumes the scheduler and returns it to the pool.
    fn return_to_pool(self: Box<Self>);
}
207
208pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
209pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;
210
211pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;
212
213pub type SchedulerId = u64;
214
215#[derive(Clone, Debug)]
225pub struct SchedulingContext {
226 bank: Arc<Bank>,
228}
229
230impl SchedulingContext {
231 pub fn new(bank: Arc<Bank>) -> Self {
232 Self { bank }
233 }
234
235 pub fn bank(&self) -> &Arc<Bank> {
236 &self.bank
237 }
238
239 pub fn slot(&self) -> Slot {
240 self.bank().slot()
241 }
242}
243
244pub type ResultWithTimings = (Result<()>, ExecuteTimings);
245
/// Why the caller is waiting on a bank's scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    /// Used by `BankWithScheduler::wait_for_completed_scheduler`.
    TerminatedToFreeze,
    /// Used when the bank wrapper is being dropped.
    DroppedFromBankForks,
    /// Used by `BankWithScheduler::wait_for_paused_scheduler`.
    PausedForRecentBlockhash,
}

impl WaitReason {
    /// Returns `true` only for [`WaitReason::PausedForRecentBlockhash`].
    pub fn is_paused(&self) -> bool {
        // Kept as an exhaustive match so that adding a variant forces an explicit decision here.
        match *self {
            Self::TerminatedToFreeze | Self::DroppedFromBankForks => false,
            Self::PausedForRecentBlockhash => true,
        }
    }

    /// Returns `true` only for [`WaitReason::DroppedFromBankForks`].
    pub fn is_dropped(&self) -> bool {
        // Exhaustive for the same reason as `is_paused()`.
        match *self {
            Self::TerminatedToFreeze | Self::PausedForRecentBlockhash => false,
            Self::DroppedFromBankForks => true,
        }
    }
}
284
285#[allow(clippy::large_enum_variant)]
286#[derive(Debug)]
287pub enum SchedulerStatus {
288 Unavailable,
293 Active(InstalledSchedulerBox),
297 Stale(InstalledSchedulerPoolArc, ResultWithTimings),
301}
302
303impl SchedulerStatus {
304 fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
305 match scheduler {
306 Some(scheduler) => SchedulerStatus::Active(scheduler),
307 None => SchedulerStatus::Unavailable,
308 }
309 }
310
311 fn transition_from_stale_to_active(
312 &mut self,
313 f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
314 ) {
315 let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
316 panic!("transition to Active failed: {self:?}");
317 };
318 *self = Self::Active(f(pool, result_with_timings));
319 }
320
321 fn maybe_transition_from_active_to_stale(
322 &mut self,
323 f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
324 ) {
325 if !matches!(self, Self::Active(_scheduler)) {
326 return;
327 }
328 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
329 unreachable!("not active: {self:?}");
330 };
331 let (pool, result_with_timings) = f(scheduler);
332 *self = Self::Stale(pool, result_with_timings);
333 }
334
335 fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
336 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
337 panic!("transition to Unavailable failed: {self:?}");
338 };
339 scheduler
340 }
341
342 fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
343 let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
344 panic!("transition to Unavailable failed: {self:?}");
345 };
346 result_with_timings
347 }
348
349 fn active_scheduler(&self) -> &InstalledSchedulerBox {
350 let SchedulerStatus::Active(active_scheduler) = self else {
351 panic!("not active: {self:?}");
352 };
353 active_scheduler
354 }
355}
356
357#[derive(Debug)]
375pub struct BankWithScheduler {
376 inner: Arc<BankWithSchedulerInner>,
377}
378
379#[derive(Debug)]
380pub struct BankWithSchedulerInner {
381 bank: Arc<Bank>,
382 scheduler: InstalledSchedulerRwLock,
383}
384pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
385
386impl BankWithScheduler {
387 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
388 pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
389 if let Some(bank_in_context) = scheduler
390 .as_ref()
391 .map(|scheduler| scheduler.context().bank())
392 {
393 assert!(Arc::ptr_eq(&bank, bank_in_context));
394 }
395
396 Self {
397 inner: Arc::new(BankWithSchedulerInner {
398 bank,
399 scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
400 }),
401 }
402 }
403
404 pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
405 Self::new(bank, None)
406 }
407
408 pub fn clone_with_scheduler(&self) -> BankWithScheduler {
409 BankWithScheduler {
410 inner: self.inner.clone(),
411 }
412 }
413
414 pub fn clone_without_scheduler(&self) -> Arc<Bank> {
415 self.inner.bank.clone()
416 }
417
418 pub fn register_tick(&self, hash: &Hash) {
419 self.inner.bank.register_tick(hash, &self.inner.scheduler);
420 }
421
422 #[cfg(feature = "dev-context-only-utils")]
423 pub fn fill_bank_with_ticks_for_tests(&self) {
424 self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
425 }
426
427 pub fn has_installed_scheduler(&self) -> bool {
428 !matches!(
429 &*self.inner.scheduler.read().unwrap(),
430 SchedulerStatus::Unavailable
431 )
432 }
433
434 pub fn schedule_transaction_executions(
442 &self,
443 transactions_with_indexes: impl ExactSizeIterator<Item = (SanitizedTransaction, usize)>,
444 ) -> Result<()> {
445 trace!(
446 "schedule_transaction_executions(): {} txs",
447 transactions_with_indexes.len()
448 );
449
450 let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
451 for (sanitized_transaction, index) in transactions_with_indexes {
452 scheduler.schedule_execution(sanitized_transaction, index)?;
453 }
454 Ok(())
455 });
456
457 if schedule_result.is_err() {
458 return Err(self.inner.retrieve_error_after_schedule_failure());
466 }
467
468 Ok(())
469 }
470
471 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
472 pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
473 self.inner.do_create_timeout_listener()
474 }
475
476 #[cfg(feature = "dev-context-only-utils")]
478 pub fn drop_scheduler(&mut self) {
479 self.inner.drop_scheduler();
480 }
481
482 pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
483 let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
484 bank,
485 scheduler,
486 WaitReason::PausedForRecentBlockhash,
487 );
488 assert!(
489 maybe_result_with_timings.is_none(),
490 "Premature result was returned from scheduler after paused (slot: {})",
491 bank.slot(),
492 );
493 }
494
495 #[must_use]
496 pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
497 BankWithSchedulerInner::wait_for_scheduler_termination(
498 &self.inner.bank,
499 &self.inner.scheduler,
500 WaitReason::TerminatedToFreeze,
501 )
502 }
503
504 pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
505 RwLock::new(SchedulerStatus::Unavailable)
506 }
507}
508
509impl BankWithSchedulerInner {
510 fn with_active_scheduler(
511 self: &Arc<Self>,
512 f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
513 ) -> ScheduleResult {
514 let scheduler = self.scheduler.read().unwrap();
515 match &*scheduler {
516 SchedulerStatus::Active(scheduler) => {
517 f(scheduler)
519 }
520 SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
521 trace!(
522 "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
523 self.bank.slot(),
524 );
525 Err(SchedulerAborted)
526 }
527 SchedulerStatus::Stale(pool, _result_with_timings) => {
528 let pool = pool.clone();
529 drop(scheduler);
530
531 let context = SchedulingContext::new(self.bank.clone());
532 let mut scheduler = self.scheduler.write().unwrap();
533 trace!("with_active_scheduler: {:?}", scheduler);
534 scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
535 let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
536 info!(
537 "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
538 self.bank.slot(),
539 scheduler.id(),
540 );
541 scheduler
542 });
543 drop(scheduler);
544
545 let scheduler = self.scheduler.read().unwrap();
546 pool.register_timeout_listener(self.do_create_timeout_listener());
551 f(scheduler.active_scheduler())
552 }
553 SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
554 }
555 }
556
557 fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
558 let weak_bank = Arc::downgrade(self);
559 TimeoutListener::new(move |pool| {
560 let Some(bank) = weak_bank.upgrade() else {
561 return;
562 };
563
564 let Ok(mut scheduler) = bank.scheduler.write() else {
565 return;
566 };
567
568 scheduler.maybe_transition_from_active_to_stale(|scheduler| {
569 let id = scheduler.id();
574 let (result_with_timings, uninstalled_scheduler) =
575 scheduler.wait_for_termination(false);
576 uninstalled_scheduler.return_to_pool();
577 info!(
578 "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
579 bank.bank.slot(),
580 id,
581 );
582 (pool, result_with_timings)
583 });
584 trace!("timeout_listener: {:?}", scheduler);
585 })
586 }
587
588 fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
591 let mut scheduler = self.scheduler.write().unwrap();
592 match &mut *scheduler {
593 SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
594 SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
595 result.clone().unwrap_err()
596 }
597 _ => unreachable!("no error in {:?}", self.scheduler),
598 }
599 }
600
601 #[must_use]
602 fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
603 Self::wait_for_scheduler_termination(
604 &self.bank,
605 &self.scheduler,
606 WaitReason::DroppedFromBankForks,
607 )
608 }
609
610 #[must_use]
611 fn wait_for_scheduler_termination(
612 bank: &Bank,
613 scheduler: &InstalledSchedulerRwLock,
614 reason: WaitReason,
615 ) -> Option<ResultWithTimings> {
616 debug!(
617 "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
618 bank.slot(),
619 reason,
620 thread::current(),
621 );
622
623 let mut scheduler = scheduler.write().unwrap();
624 let (was_noop, result_with_timings) = match &mut *scheduler {
625 SchedulerStatus::Active(scheduler) if reason.is_paused() => {
626 scheduler.pause_for_recent_blockhash();
627 (false, None)
628 }
629 SchedulerStatus::Active(_scheduler) => {
630 let scheduler = scheduler.transition_from_active_to_unavailable();
631 let (result_with_timings, uninstalled_scheduler) =
632 scheduler.wait_for_termination(reason.is_dropped());
633 uninstalled_scheduler.return_to_pool();
634 (false, Some(result_with_timings))
635 }
636 SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
637 (true, None)
640 }
641 SchedulerStatus::Stale(_pool, _result_with_timings) => {
642 let result_with_timings = scheduler.transition_from_stale_to_unavailable();
643 (true, Some(result_with_timings))
644 }
645 SchedulerStatus::Unavailable => (true, None),
646 };
647 debug!(
648 "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
649 bank.slot(),
650 reason,
651 was_noop,
652 result_with_timings.as_ref().map(|(result, _)| result),
653 thread::current(),
654 );
655 trace!(
656 "wait_for_scheduler_termination(result_with_timings: {:?})",
657 result_with_timings,
658 );
659
660 result_with_timings
661 }
662
663 fn drop_scheduler(&self) {
664 if thread::panicking() {
665 error!(
666 "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
667 self.bank.slot(),
668 );
669 return;
670 }
671
672 if let Some(Err(err)) = self
674 .wait_for_completed_scheduler_from_drop()
675 .map(|(result, _timings)| result)
676 {
677 warn!(
678 "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
679 self.bank.slot(),
680 err,
681 );
682 }
683 }
684}
685
686impl Drop for BankWithSchedulerInner {
687 fn drop(&mut self) {
688 self.drop_scheduler();
689 }
690}
691
692impl Deref for BankWithScheduler {
693 type Target = Arc<Bank>;
694
695 fn deref(&self) -> &Self::Target {
696 &self.inner.bank
697 }
698}
699
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            bank::test_utils::goto_end_of_slot_with_scheduler,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        assert_matches::assert_matches,
        mockall::Sequence,
        solana_sdk::system_transaction,
        std::sync::Mutex,
    };

    /// Builds a mocked scheduler for `bank`.
    ///
    /// The mock expects, in sequence: one `context()` call, then for each flag in
    /// `is_dropped_flags` one `wait_for_termination(flag)` call whose returned uninstalled
    /// scheduler must be returned to the pool exactly once. `f` can add further expectations.
    fn setup_mocked_scheduler_with_extra(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
        f: Option<impl Fn(&mut MockInstalledScheduler)>,
    ) -> InstalledSchedulerBox {
        let mut mock = MockInstalledScheduler::new();
        let sequence = Arc::new(Mutex::new(Sequence::new()));

        mock.expect_context()
            .times(1)
            .in_sequence(&mut sequence.lock().unwrap())
            .return_const(SchedulingContext::new(bank));

        for expected_is_dropped in is_dropped_flags {
            let shared_sequence = Arc::clone(&sequence);
            mock.expect_wait_for_termination()
                .with(mockall::predicate::eq(expected_is_dropped))
                .times(1)
                .in_sequence(&mut sequence.lock().unwrap())
                .returning(move |_| {
                    let mut uninstalled = MockUninstalledScheduler::new();
                    uninstalled
                        .expect_return_to_pool()
                        .times(1)
                        .in_sequence(&mut shared_sequence.lock().unwrap())
                        .returning(|| ());
                    (initialized_result_with_timings(), Box::new(uninstalled))
                });
        }

        if let Some(add_expectations) = f {
            add_expectations(&mut mock);
        }

        Box::new(mock)
    }

    /// Same as `setup_mocked_scheduler_with_extra()` without any extra expectations.
    fn setup_mocked_scheduler(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
    ) -> InstalledSchedulerBox {
        let no_extra = None::<fn(&mut MockInstalledScheduler)>;
        setup_mocked_scheduler_with_extra(bank, is_dropped_flags, no_extra)
    }

    #[test]
    fn test_scheduler_normal_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let scheduler = setup_mocked_scheduler(bank.clone(), [false].into_iter());
        let bank = BankWithScheduler::new(bank, Some(scheduler));
        assert!(bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

        // Once waited for, the scheduler is gone; a repeated wait yields nothing.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    #[test]
    fn test_no_scheduler_termination() {
        solana_logger::setup();

        let bank = BankWithScheduler::new_without_scheduler(Arc::new(Bank::default_for_tests()));

        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    #[test]
    fn test_scheduler_termination_from_drop() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let scheduler = setup_mocked_scheduler(bank.clone(), [true].into_iter());
        let bank = BankWithScheduler::new(bank, Some(scheduler));
        // Dropping must wait for termination with `is_dropped == true`.
        drop(bank);
    }

    #[test]
    fn test_scheduler_pause() {
        solana_logger::setup();

        let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
        let scheduler = setup_mocked_scheduler_with_extra(
            bank.clone(),
            [false].into_iter(),
            Some(|mock: &mut MockInstalledScheduler| {
                mock.expect_pause_for_recent_blockhash()
                    .times(1)
                    .returning(|| ());
            }),
        );
        let bank = BankWithScheduler::new(bank, Some(scheduler));
        goto_end_of_slot_with_scheduler(&bank);
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
    }

    /// Schedules one transfer against a mocked scheduler that accepts it or aborts.
    fn do_test_schedule_execution(should_succeed: bool) {
        solana_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let transfer = system_transaction::transfer(
            &mint_keypair,
            &solana_sdk::pubkey::new_rand(),
            2,
            genesis_config.hash(),
        );
        let tx0 = SanitizedTransaction::from_transaction_for_tests(transfer);
        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
        let mocked_scheduler = setup_mocked_scheduler_with_extra(
            bank.clone(),
            [true].into_iter(),
            Some(|mock: &mut MockInstalledScheduler| {
                if !should_succeed {
                    mock.expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Err(SchedulerAborted));
                    mock.expect_recover_error_after_abort()
                        .times(1)
                        .returning(|| TransactionError::InsufficientFundsForFee);
                } else {
                    mock.expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Ok(()));
                }
            }),
        );

        let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
        let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
        if should_succeed {
            assert_matches!(result, Ok(()));
        } else {
            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
        }
    }

    #[test]
    fn test_schedule_execution_success() {
        do_test_schedule_execution(true);
    }

    #[test]
    fn test_schedule_execution_failure() {
        do_test_schedule_execution(false);
    }
}
884}