1#![allow(rustdoc::private_intra_doc_links)]
2use {
99 crate::utils::{ShortCounter, Token, TokenCell},
100 assert_matches::assert_matches,
101 solana_pubkey::Pubkey,
102 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
103 solana_transaction::sanitized::SanitizedTransaction,
104 static_assertions::const_assert_eq,
105 std::{collections::VecDeque, mem, sync::Arc},
106};
107
/// High-level mode the scheduler operates in.
///
/// Derives `PartialEq`/`Eq` so callers can compare modes directly instead of
/// pattern-matching; `Clone + Copy` keeps it freely passable by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SchedulingMode {
    /// Mode for verifying existing blocks.
    BlockVerification,
    /// Mode for producing new blocks.
    BlockProduction,
}
113
mod utils {
    use std::{
        any::{self, TypeId},
        cell::{RefCell, UnsafeCell},
        collections::BTreeSet,
        marker::PhantomData,
        thread,
    };

    /// A small checked `u32` counter: both overflow and underflow panic via
    /// the `checked_add`/`checked_sub` unwraps below, surfacing accounting
    /// bugs instead of silently wrapping.
    #[derive(Debug, Clone, Copy)]
    pub(super) struct ShortCounter(u32);

    impl ShortCounter {
        pub(super) fn zero() -> Self {
            Self(0)
        }

        pub(super) fn one() -> Self {
            Self(1)
        }

        pub(super) fn is_one(&self) -> bool {
            self.0 == 1
        }

        pub(super) fn is_zero(&self) -> bool {
            self.0 == 0
        }

        /// Returns the raw counter value.
        pub(super) fn current(&self) -> u32 {
            self.0
        }

        /// Returns an incremented copy; panics on `u32` overflow.
        #[must_use]
        pub(super) fn increment(self) -> Self {
            Self(self.0.checked_add(1).unwrap())
        }

        /// Returns a decremented copy; panics if already zero.
        #[must_use]
        pub(super) fn decrement(self) -> Self {
            Self(self.0.checked_sub(1).unwrap())
        }

        /// In-place increment; returns `&mut self` for chaining.
        pub(super) fn increment_self(&mut self) -> &mut Self {
            *self = self.increment();
            self
        }

        /// In-place decrement; returns `&mut self` for chaining.
        pub(super) fn decrement_self(&mut self) -> &mut Self {
            *self = self.decrement();
            self
        }

        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
            self.0 = 0;
            self
        }
    }

    /// An `UnsafeCell` whose mutable access is gated by a matching [`Token`].
    ///
    /// There is no runtime borrow checking: soundness relies entirely on the
    /// token discipline — at most one `Token<V>` per type per thread, created
    /// only via [`Token::assume_exclusive_mutating_thread`]. The UB tests
    /// below demonstrate what goes wrong when that contract is forged.
    #[derive(Debug, Default)]
    pub(super) struct TokenCell<V>(UnsafeCell<V>);

    impl<V> TokenCell<V> {
        pub(super) fn new(value: V) -> Self {
            Self(UnsafeCell::new(value))
        }

        /// Runs `f` with a mutable reference to the contained value.
        ///
        /// Taking `&mut Token<V>` is what (by the token-creation contract)
        /// justifies the unchecked mutable dereference below.
        pub(super) fn with_borrow_mut<R>(
            &self,
            _token: &mut Token<V>,
            f: impl FnOnce(&mut V) -> R,
        ) -> R {
            // SAFETY: exclusivity is delegated to the `&mut Token<V>`
            // argument; see the type-level comment for the contract.
            f(unsafe { &mut *self.0.get() })
        }
    }

    // SAFETY: NOTE(review): sound only while tokens are created exclusively
    // through `assume_exclusive_mutating_thread`, which serializes all
    // mutation of a given `TokenCell<V>` to a single thread at a time.
    unsafe impl<V> Sync for TokenCell<V> {}

    /// Zero-sized proof of exclusive mutation rights for `TokenCell<V>`s on
    /// the current thread; `PhantomData<*mut V>` makes it `!Send`/`!Sync`.
    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);

    impl<V> Token<V> {
        /// Creates the (intended-to-be-unique) token for `V` on this thread.
        ///
        /// # Safety
        ///
        /// The caller must ensure every `TokenCell<V>` used with this token
        /// is mutated only from this thread. A second creation for the same
        /// `V` on the same thread panics (tracked via a thread-local set of
        /// `TypeId`s), but misuse across different threads cannot be
        /// detected here.
        #[must_use]
        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
            thread_local! {
                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
            }
            // `insert` returns false on a duplicate, tripping the assert.
            assert!(
                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
                "{:?} is wrongly initialized twice on {:?}",
                any::type_name::<Self>(),
                thread::current()
            );

            Self(PhantomData)
        }
    }

    #[cfg(test)]
    mod tests {
        use {
            super::{Token, TokenCell},
            std::{mem, sync::Arc, thread},
        };

        #[test]
        #[should_panic(
            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
                        initialized twice on Thread"
        )]
        fn test_second_creation_of_tokens_in_a_thread() {
            unsafe {
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
            }
        }

        #[derive(Debug)]
        struct FakeQueue {
            v: Vec<u8>,
        }

        // Demonstrates the aliased-&mut UB that forged tokens permit;
        // ignored under miri because miri (correctly) rejects it.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_created_multiple_tokens() {
            // UB: conjuring tokens out of thin air, bypassing the
            // thread-local uniqueness check.
            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };

            let queue = TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            });
            queue.with_borrow_mut(&mut token1, |queue_mut1| {
                queue_mut1.v.push(1);
                queue.with_borrow_mut(&mut token2, |queue_mut2| {
                    queue_mut2.v.push(2);
                    queue_mut1.v.push(3);
                });
                queue_mut1.v.push(4);
            });

            #[cfg(not(miri))]
            dbg!(queue.0.into_inner());

        }

        // Demonstrates data-race UB from sharing one TokenCell across threads
        // that each created their own (per-thread-legal) token.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_shared_token_cell() {
            let queue1 = Arc::new(TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            }));
            let queue2 = queue1.clone();
            #[cfg(not(miri))]
            let queue3 = queue1.clone();

            for _ in 0..10 {
                let (queue1, queue2) = (queue1.clone(), queue2.clone());
                let thread1 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue1.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(3);
                    });
                });
                let thread2 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue2.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(4);
                    });
                });

                thread1.join().unwrap();
                thread2.join().unwrap();
            }

            #[cfg(not(miri))]
            {
                drop((queue1, queue2));
                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
            }

        }
    }
}
403
/// Result of a lock attempt; `Err(())` means the lock is contended.
type LockResult = Result<(), ()>;
const_assert_eq!(mem::size_of::<LockResult>(), 1);

/// A scheduling unit: an atomically shared [`TaskInner`].
pub type Task = Arc<TaskInner>;
// Pointer-sized; the constant assumes a 64-bit target.
const_assert_eq!(mem::size_of::<Task>(), 8);

/// Zero-sized token gating mutation of every [`UsageQueueInner`].
type UsageQueueToken = Token<UsageQueueInner>;
const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);

/// Zero-sized token gating mutation of every blocked-usage counter.
type BlockedUsageCountToken = Token<ShortCounter>;
const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
420
/// Internal scheduling state wrapped around a single transaction.
#[derive(Debug)]
pub struct TaskInner {
    transaction: RuntimeTransaction<SanitizedTransaction>,
    /// Caller-assigned index, returned verbatim by [`TaskInner::task_index`].
    index: usize,
    /// One [`LockContext`] per account key of the transaction's message.
    lock_contexts: Vec<LockContext>,
    /// Number of usage queues currently blocking this task; mutated only
    /// under the [`BlockedUsageCountToken`].
    blocked_usage_count: TokenCell<ShortCounter>,
}
432
433impl TaskInner {
434 pub fn task_index(&self) -> usize {
435 self.index
436 }
437
438 pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
439 &self.transaction
440 }
441
442 fn lock_contexts(&self) -> &[LockContext] {
443 &self.lock_contexts
444 }
445
446 fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
447 self.blocked_usage_count
448 .with_borrow_mut(token, |usage_count| {
449 *usage_count = count;
450 })
451 }
452
453 #[must_use]
454 fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
455 let did_unblock = self
456 .blocked_usage_count
457 .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
458 did_unblock.then_some(self)
459 }
460}
461
/// Pairing of a per-address usage queue with the access level this task
/// requested for that address.
#[derive(Debug)]
struct LockContext {
    usage_queue: UsageQueue,
    requested_usage: RequestedUsage,
}
const_assert_eq!(mem::size_of::<LockContext>(), 16);
470
471impl LockContext {
472 fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
473 Self {
474 usage_queue,
475 requested_usage,
476 }
477 }
478
479 fn with_usage_queue_mut<R>(
480 &self,
481 usage_queue_token: &mut UsageQueueToken,
482 f: impl FnOnce(&mut UsageQueueInner) -> R,
483 ) -> R {
484 self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
485 }
486}
487
/// Current occupancy of a usage queue: either a count of concurrent readers
/// or a single exclusive writer.
#[derive(Copy, Clone, Debug)]
enum Usage {
    Readonly(ShortCounter),
    Writable,
}
const_assert_eq!(mem::size_of::<Usage>(), 8);
495
496impl From<RequestedUsage> for Usage {
497 fn from(requested_usage: RequestedUsage) -> Self {
498 match requested_usage {
499 RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
500 RequestedUsage::Writable => Usage::Writable,
501 }
502 }
503}
504
/// Access level a task requests for a particular account address.
#[derive(Clone, Copy, Debug)]
enum RequestedUsage {
    Readonly,
    Writable,
}
511
/// Per-address lock state: the current occupancy (if any) plus the FIFO of
/// usages from tasks blocked waiting on this address.
#[derive(Debug)]
struct UsageQueueInner {
    current_usage: Option<Usage>,
    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
}

/// A queued lock request: the requested access level and the waiting task.
type UsageFromTask = (RequestedUsage, Task);
524
525impl Default for UsageQueueInner {
526 fn default() -> Self {
527 Self {
528 current_usage: None,
529 blocked_usages_from_tasks: VecDeque::with_capacity(128),
539 }
540 }
541}
542
impl UsageQueueInner {
    /// Attempts to lock this queue at the requested access level.
    ///
    /// A readonly request stacks onto an existing readonly usage (the reader
    /// count is incremented); every other combination with an existing usage
    /// fails with `Err(())`. Note this does NOT consult the blocked queue;
    /// callers check [`Self::has_no_blocked_usage`] first so newly arriving
    /// tasks cannot jump ahead of already-blocked ones.
    fn try_lock(&mut self, requested_usage: RequestedUsage) -> LockResult {
        match self.current_usage {
            None => Some(Usage::from(requested_usage)),
            Some(Usage::Readonly(count)) => match requested_usage {
                RequestedUsage::Readonly => Some(Usage::Readonly(count.increment())),
                RequestedUsage::Writable => None,
            },
            Some(Usage::Writable) => None,
        }
        .inspect(|&new_usage| {
            self.current_usage = Some(new_usage);
        })
        .map(|_| ())
        .ok_or(())
    }

    /// Releases one usage of the given level.
    ///
    /// When the release leaves the queue unused, the front blocked usage (if
    /// any) is popped and returned so the caller can re-lock for it. Panics
    /// (`unreachable!`) if `requested_usage` is inconsistent with the current
    /// occupancy — unlock must mirror a prior successful lock.
    #[must_use]
    fn unlock(&mut self, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
        let mut is_unused_now = false;
        match &mut self.current_usage {
            Some(Usage::Readonly(ref mut count)) => match requested_usage {
                RequestedUsage::Readonly => {
                    // Last reader leaving frees the queue; otherwise just
                    // drop the reader count by one.
                    if count.is_one() {
                        is_unused_now = true;
                    } else {
                        count.decrement_self();
                    }
                }
                RequestedUsage::Writable => unreachable!(),
            },
            Some(Usage::Writable) => match requested_usage {
                RequestedUsage::Writable => {
                    is_unused_now = true;
                }
                RequestedUsage::Readonly => unreachable!(),
            },
            None => unreachable!(),
        }

        if is_unused_now {
            self.current_usage = None;
            self.blocked_usages_from_tasks.pop_front()
        } else {
            None
        }
    }

    /// Appends a blocked usage; only legal while the queue is currently in
    /// use (asserted), since an unused queue would have granted the lock.
    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
        assert_matches!(self.current_usage, Some(_));
        self.blocked_usages_from_tasks.push_back(usage_from_task);
    }

    /// Pops the next blocked usage only if it is a readonly request, letting
    /// consecutive readers be unblocked as a batch while the current usage is
    /// readonly (asserted).
    #[must_use]
    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
        if matches!(
            self.blocked_usages_from_tasks.front(),
            Some((RequestedUsage::Readonly, _))
        ) {
            assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
            self.blocked_usages_from_tasks.pop_front()
        } else {
            None
        }
    }

    /// `true` when no task is queued up behind the current usage.
    fn has_no_blocked_usage(&self) -> bool {
        self.blocked_usages_from_tasks.is_empty()
    }
}
613
// Layout pinned to catch accidental growth of the per-address state.
const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);

/// Shareable handle to a per-address [`UsageQueueInner`]; cloning is a cheap
/// `Arc` bump and all clones refer to the same queue.
#[derive(Debug, Clone, Default)]
pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
// Pointer-sized; the constant assumes a 64-bit target.
const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
622
/// Single-threaded state machine that tracks per-address lock conflicts and
/// hands tasks out in a runnable order.
pub struct SchedulingStateMachine {
    /// Tasks whose last blocking usage was released, awaiting rescheduling.
    unblocked_task_queue: VecDeque<Task>,
    /// Tasks scheduled but not yet descheduled.
    active_task_count: ShortCounter,
    /// Tasks descheduled (handled) so far.
    handled_task_count: ShortCounter,
    /// Tasks popped via `schedule_next_unblocked_task` so far.
    unblocked_task_count: ShortCounter,
    /// All tasks ever passed to `schedule_task`.
    total_task_count: ShortCounter,
    /// Thread-exclusive token for blocked-usage counters.
    count_token: BlockedUsageCountToken,
    /// Thread-exclusive token for usage queues.
    usage_queue_token: UsageQueueToken,
}
// Layout pinned to catch accidental growth (64-bit target assumed).
const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 48);
635
impl SchedulingStateMachine {
    /// `true` when every scheduled task has been descheduled.
    pub fn has_no_active_task(&self) -> bool {
        self.active_task_count.is_zero()
    }

    /// `true` when at least one formerly blocked task is ready to run.
    pub fn has_unblocked_task(&self) -> bool {
        !self.unblocked_task_queue.is_empty()
    }

    pub fn unblocked_task_queue_count(&self) -> usize {
        self.unblocked_task_queue.len()
    }

    pub fn active_task_count(&self) -> u32 {
        self.active_task_count.current()
    }

    pub fn handled_task_count(&self) -> u32 {
        self.handled_task_count.current()
    }

    pub fn unblocked_task_count(&self) -> u32 {
        self.unblocked_task_count.current()
    }

    pub fn total_task_count(&self) -> u32 {
        self.total_task_count.current()
    }

    /// Schedules `task`, returning it right back if every account lock was
    /// taken, or `None` if it is now blocked behind other tasks.
    #[must_use]
    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
        self.total_task_count.increment_self();
        self.active_task_count.increment_self();
        self.try_lock_usage_queues(task)
    }

    /// Pops the next task made runnable by earlier `deschedule_task` calls,
    /// counting it as unblocked.
    #[must_use]
    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
        self.unblocked_task_queue.pop_front().inspect(|_| {
            self.unblocked_task_count.increment_self();
        })
    }

    /// Records `task` as handled and releases all of its account locks,
    /// possibly moving blocked tasks into the unblocked queue.
    ///
    /// NOTE(review): `task` is assumed to have been previously returned by
    /// `schedule_task`/`schedule_next_unblocked_task`; the counter and
    /// unlock bookkeeping rely on that — confirm against call sites.
    pub fn deschedule_task(&mut self, task: &Task) {
        self.active_task_count.decrement_self();
        self.handled_task_count.increment_self();
        self.unlock_usage_queues(task);
    }

    /// Attempts to take every per-address lock for `task`.
    ///
    /// A queue that already has blocked waiters is not even attempted (FIFO
    /// fairness); the task queues up behind them instead. Returns the task
    /// when nothing blocked it, otherwise records the number of blocking
    /// queues in the task and returns `None`.
    #[must_use]
    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
        let mut blocked_usage_count = ShortCounter::zero();

        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let lock_result = if usage_queue.has_no_blocked_usage() {
                    usage_queue.try_lock(context.requested_usage)
                } else {
                    LockResult::Err(())
                };
                if let Err(()) = lock_result {
                    blocked_usage_count.increment_self();
                    let usage_from_task = (context.requested_usage, task.clone());
                    usage_queue.push_blocked_usage_from_task(usage_from_task);
                }
            });
        }

        if blocked_usage_count.is_zero() {
            Some(task)
        } else {
            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
            None
        }
    }

    /// Releases each lock held by `task`. Every queue that becomes free is
    /// re-locked for its longest-waiting blocked usage(s), and any task whose
    /// last blocker disappears is pushed onto the unblocked queue.
    fn unlock_usage_queues(&mut self, task: &Task) {
        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);

                while let Some((requested_usage, task_with_unblocked_queue)) =
                    unblocked_task_from_queue
                {
                    // If this was the task's last blocking queue, it is now
                    // runnable.
                    if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
                    {
                        self.unblocked_task_queue.push_back(task);
                    }

                    match usage_queue.try_lock(requested_usage) {
                        LockResult::Ok(()) => {
                            // Readonly locks can be granted in batches: keep
                            // draining consecutive readonly waiters.
                            unblocked_task_from_queue =
                                if matches!(requested_usage, RequestedUsage::Readonly) {
                                    usage_queue.pop_unblocked_readonly_usage_from_task()
                                } else {
                                    None
                                };
                        }
                        // The queue was just vacated, so this lock must win.
                        LockResult::Err(()) => panic!("should never fail in this context"),
                    }
                }
            });
        }
    }

    /// Builds a [`Task`] for `transaction` with the caller-chosen `index`.
    ///
    /// `usage_queue_loader` maps each account address to its shared
    /// [`UsageQueue`]; writability per address comes from
    /// `message().is_writable(index)`.
    pub fn create_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        let lock_contexts = transaction
            .message()
            .account_keys()
            .iter()
            .enumerate()
            .map(|(index, address)| {
                LockContext::new(
                    usage_queue_loader(*address),
                    if transaction.message().is_writable(index) {
                        RequestedUsage::Writable
                    } else {
                        RequestedUsage::Readonly
                    },
                )
            })
            .collect();

        Task::new(TaskInner {
            transaction,
            index,
            lock_contexts,
            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
        })
    }

    /// Resets all counters so the machine can be reused for a new batch.
    ///
    /// Panics if any task is still active or an unblocked task was never
    /// consumed. The exhaustive destructuring forces any future field to be
    /// explicitly considered here.
    pub fn reinitialize(&mut self) {
        assert!(self.has_no_active_task());
        assert_eq!(self.unblocked_task_queue.len(), 0);
        let Self {
            unblocked_task_queue: _,
            active_task_count,
            handled_task_count,
            unblocked_task_count,
            total_task_count,
            count_token: _,
            usage_queue_token: _,
        } = self;
        active_task_count.reset_to_zero();
        handled_task_count.reset_to_zero();
        unblocked_task_count.reset_to_zero();
        total_task_count.reset_to_zero();
    }

    /// Creates the machine together with its thread-exclusive tokens.
    ///
    /// # Safety
    ///
    /// Must be called at most once per thread — the token creation panics on
    /// a second call on the same thread — and the returned machine must only
    /// be used from that thread (the tokens gate all interior mutation).
    #[must_use]
    pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self {
        Self {
            unblocked_task_queue: VecDeque::with_capacity(1024),
            active_task_count: ShortCounter::zero(),
            handled_task_count: ShortCounter::zero(),
            unblocked_task_count: ShortCounter::zero(),
            total_task_count: ShortCounter::zero(),
            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
        }
    }
}
889
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_instruction::{AccountMeta, Instruction},
        solana_message::Message,
        solana_pubkey::Pubkey,
        solana_transaction::{sanitized::SanitizedTransaction, Transaction},
        std::{cell::RefCell, collections::HashMap, rc::Rc},
    };

    /// Transaction with no instructions; only the fee payer account is used.
    fn simplest_transaction() -> RuntimeTransaction<SanitizedTransaction> {
        let message = Message::new(&[], Some(&Pubkey::new_unique()));
        let unsigned = Transaction::new_unsigned(message);
        RuntimeTransaction::from_transaction_for_tests(unsigned)
    }

    /// Transaction taking `address` as a readonly account.
    fn transaction_with_readonly_address(
        address: Pubkey,
    ) -> RuntimeTransaction<SanitizedTransaction> {
        let instruction = Instruction {
            program_id: Pubkey::default(),
            accounts: vec![AccountMeta::new_readonly(address, false)],
            data: vec![],
        };
        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
        let unsigned = Transaction::new_unsigned(message);
        RuntimeTransaction::from_transaction_for_tests(unsigned)
    }

    /// Transaction taking `address` as a writable account.
    fn transaction_with_writable_address(
        address: Pubkey,
    ) -> RuntimeTransaction<SanitizedTransaction> {
        let instruction = Instruction {
            program_id: Pubkey::default(),
            accounts: vec![AccountMeta::new(address, false)],
            data: vec![],
        };
        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
        let unsigned = Transaction::new_unsigned(message);
        RuntimeTransaction::from_transaction_for_tests(unsigned)
    }

    /// Address loader memoizing one `UsageQueue` per address in a shared map,
    /// so conflicting transactions see the same queue.
    fn create_address_loader(
        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
    ) -> impl FnMut(Pubkey) -> UsageQueue {
        let usage_queues = usage_queues.unwrap_or_default();
        move |address| {
            usage_queues
                .borrow_mut()
                .entry(address)
                .or_default()
                .clone()
        }
    }

    #[test]
    fn test_scheduling_state_machine_creation() {
        let state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 0);
        assert!(state_machine.has_no_active_task());
    }

    #[test]
    fn test_scheduling_state_machine_good_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        state_machine.total_task_count.increment_self();
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.reinitialize();
        assert_eq!(state_machine.total_task_count(), 0);
    }

    #[test]
    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
    fn test_scheduling_state_machine_bad_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let address_loader = &mut create_address_loader(None);
        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
        state_machine.schedule_task(task).unwrap();
        // Reinitializing with an active (not yet descheduled) task must panic.
        state_machine.reinitialize();
    }

    #[test]
    fn test_create_task() {
        let sanitized = simplest_transaction();
        let signature = *sanitized.signature();
        let task =
            SchedulingStateMachine::create_task(sanitized, 3, &mut |_| UsageQueue::default());
        assert_eq!(task.task_index(), 3);
        assert_eq!(task.transaction().signature(), &signature);
    }

    #[test]
    fn test_non_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let task = state_machine.schedule_task(task).unwrap();
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.deschedule_task(&task);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 1);
        assert!(state_machine.has_no_active_task());
    }

    // Tasks sharing the (writable) fee payer conflict and serialize.
    #[test]
    fn test_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert!(state_machine.has_unblocked_task());
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_eq!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert!(!state_machine.has_unblocked_task());
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_task(task3.clone())
                .map(|task| task.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    #[test]
    fn test_existing_blocking_task_then_newly_scheduled_task() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        // task3 must queue behind the already-blocked task2 (FIFO fairness).
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_eq!(state_machine.unblocked_task_count(), 2);

        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    // Readonly usages of the same address may run concurrently.
    #[test]
    fn test_multiple_readonly_task_and_counts() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );

        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert!(state_machine.has_no_active_task());
    }

    // A writer stays blocked until ALL current readers deschedule.
    #[test]
    fn test_all_blocking_readable_tasks_block_writable_task() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.active_task_count(), 3);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    // reader -> writer -> reader on one address must run strictly in order.
    #[test]
    fn test_readonly_then_writable_then_readonly_linearized() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    #[test]
    fn test_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

    // writer -> two readers (unblocked as a batch) -> writer.
    #[test]
    fn test_blocked_tasks_writable_2_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let sanitized4 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);
        assert_matches!(state_machine.schedule_task(task4.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task2);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task3);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(104)
        );
        state_machine.deschedule_task(&task4);
        assert!(state_machine.has_no_active_task());
    }

    // Inspects internal queue state via the shared usage-queue map.
    #[test]
    fn test_gradual_locking() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
        let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        let usage_queues = usage_queues.borrow_mut();
        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        // The fee payer is locked writable as well.
        let fee_payer = task2.transaction().message().fee_payer();
        let usage_queue = usage_queues.get(fee_payer).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

    // unlock() on an unused queue hits the `None => unreachable!()` arm.
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions1() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }

    // Readonly unlock against a writable usage is unreachable.
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions2() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Writable);
                let _ = usage_queue.unlock(RequestedUsage::Readonly);
            });
    }

    // Writable unlock against a readonly usage is unreachable.
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions3() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Readonly(ShortCounter::one()));
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }
}