1#![allow(rustdoc::private_intra_doc_links)]
2use {
99 crate::utils::{ShortCounter, Token, TokenCell},
100 assert_matches::assert_matches,
101 solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction},
102 static_assertions::const_assert_eq,
103 std::{collections::VecDeque, mem, sync::Arc},
104};
105
106mod utils {
108 use std::{
109 any::{self, TypeId},
110 cell::{RefCell, UnsafeCell},
111 collections::BTreeSet,
112 marker::PhantomData,
113 thread,
114 };
115
116 #[derive(Debug, Clone, Copy)]
120 pub(super) struct ShortCounter(u32);
121
122 impl ShortCounter {
123 pub(super) fn zero() -> Self {
124 Self(0)
125 }
126
127 pub(super) fn one() -> Self {
128 Self(1)
129 }
130
131 pub(super) fn is_one(&self) -> bool {
132 self.0 == 1
133 }
134
135 pub(super) fn is_zero(&self) -> bool {
136 self.0 == 0
137 }
138
139 pub(super) fn current(&self) -> u32 {
140 self.0
141 }
142
143 #[must_use]
144 pub(super) fn increment(self) -> Self {
145 Self(self.0.checked_add(1).unwrap())
146 }
147
148 #[must_use]
149 pub(super) fn decrement(self) -> Self {
150 Self(self.0.checked_sub(1).unwrap())
151 }
152
153 pub(super) fn increment_self(&mut self) -> &mut Self {
154 *self = self.increment();
155 self
156 }
157
158 pub(super) fn decrement_self(&mut self) -> &mut Self {
159 *self = self.decrement();
160 self
161 }
162
163 pub(super) fn reset_to_zero(&mut self) -> &mut Self {
164 self.0 = 0;
165 self
166 }
167 }
168
    /// An interior-mutable cell whose access is gated not by a runtime borrow
    /// check on the cell itself, but by possession of a matching [`Token`].
    ///
    /// NOTE(review): soundness rests entirely on callers upholding the token
    /// contract (at most one `Token<V>` legally minted per thread via
    /// [`Token::assume_exclusive_mutating_thread`]); the cell performs no
    /// checking of its own — see the UB-demonstrating tests below.
    #[derive(Debug, Default)]
    pub(super) struct TokenCell<V>(UnsafeCell<V>);

    impl<V> TokenCell<V> {
        /// Wraps `value` in a new cell.
        pub(super) fn new(value: V) -> Self {
            Self(UnsafeCell::new(value))
        }

        /// Runs `f` with exclusive mutable access to the contained value.
        ///
        /// Requiring `&mut Token<V>` serializes mutation through the single
        /// legally-created per-thread token; the dereference of the
        /// `UnsafeCell` itself is unchecked.
        pub(super) fn with_borrow_mut<R>(
            &self,
            _token: &mut Token<V>,
            f: impl FnOnce(&mut V) -> R,
        ) -> R {
            f(unsafe { &mut *self.0.get() })
        }
    }

    // SAFETY: claimed safe to share across threads because every mutation must
    // pass through a `Token<V>`, which is bound to the thread that created it
    // (it is `!Send`/`!Sync`). NOTE(review): only sound while tokens are
    // created exclusively via `assume_exclusive_mutating_thread`'s contract.
    unsafe impl<V> Sync for TokenCell<V> {}
245
    /// A zero-sized capability gating mutation of every `TokenCell<V>`.
    ///
    /// `PhantomData<*mut V>` makes it `!Send` and `!Sync`, pinning each token
    /// to the thread that created it.
    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);

    impl<V> Token<V> {
        /// Mints the unique token of this type for the current thread.
        ///
        /// # Safety
        /// The caller must ensure this thread is the only one that will ever
        /// mutate the associated `TokenCell<V>`s. The runtime assert below
        /// only catches double-creation on the *same* thread; cross-thread
        /// misuse is not detected (see `test_ub_illegally_shared_token_cell`).
        #[must_use]
        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
            thread_local! {
                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
            }
            // Panic if a token of this exact type was already handed out on
            // this thread. The `{:?}` (Debug) formatting of the type-name str
            // adds surrounding quotes; the unit test's expected panic message
            // depends on exactly that.
            assert!(
                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
                "{:?} is wrongly initialized twice on {:?}",
                any::type_name::<Self>(),
                thread::current()
            );

            Self(PhantomData)
        }
    }
286
    // Tests for the token/cell machinery, including two tests that
    // DELIBERATELY exhibit undefined behavior to document what the Token
    // contract protects against; those are ignored under miri, which would
    // (correctly) reject them.
    #[cfg(test)]
    mod tests {
        use {
            super::{Token, TokenCell},
            std::{mem, sync::Arc, thread},
        };

        // Creating the same token type twice on one thread must panic with
        // the exact message asserted here (Debug-quoted type name).
        #[test]
        #[should_panic(
            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
                        initialized twice on Thread"
        )]
        fn test_second_creation_of_tokens_in_a_thread() {
            unsafe {
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
            }
        }

        #[derive(Debug)]
        struct FakeQueue {
            v: Vec<u8>,
        }

        // UB demo: two tokens of the same type conjured via transmute allow
        // two simultaneous `&mut` borrows of one cell (aliasing UB).
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_created_multiple_tokens() {
            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };

            let queue = TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            });
            queue.with_borrow_mut(&mut token1, |queue_mut1| {
                queue_mut1.v.push(1);
                queue.with_borrow_mut(&mut token2, |queue_mut2| {
                    queue_mut2.v.push(2);
                    queue_mut1.v.push(3);
                });
                queue_mut1.v.push(4);
            });

            #[cfg(not(miri))]
            dbg!(queue.0.into_inner());

        }

        // UB demo: one TokenCell shared across threads, each thread minting
        // its own (legal) token — a data race on the inner Vec.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_shared_token_cell() {
            let queue1 = Arc::new(TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            }));
            let queue2 = queue1.clone();
            #[cfg(not(miri))]
            let queue3 = queue1.clone();

            for _ in 0..10 {
                let (queue1, queue2) = (queue1.clone(), queue2.clone());
                let thread1 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue1.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(3);
                    });
                });
                let thread2 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue2.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(4);
                    });
                });

                thread1.join().unwrap();
                thread2.join().unwrap();
            }

            #[cfg(not(miri))]
            {
                drop((queue1, queue2));
                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
            }

        }
    }
394}
395
/// Outcome of a non-blocking usage-queue lock attempt; `Err(())` means blocked.
type LockResult = Result<(), ()>;
const_assert_eq!(mem::size_of::<LockResult>(), 1);

/// Tasks are reference-counted so one task can be parked on several queues.
pub type Task = Arc<TaskInner>;
const_assert_eq!(mem::size_of::<Task>(), 8);

/// Zero-sized token authorizing mutation of `UsageQueueInner` cells.
type UsageQueueToken = Token<UsageQueueInner>;
const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);

/// Zero-sized token authorizing mutation of tasks' blocked-usage counters.
type BlockedUsageCountToken = Token<ShortCounter>;
const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
412
/// Internal representation of one schedulable transaction; shared as
/// [`Task`] (= `Arc<TaskInner>`).
#[derive(Debug)]
pub struct TaskInner {
    transaction: SanitizedTransaction,
    // Caller-supplied identifier, exposed via `task_index()`.
    index: usize,
    // One entry per account address the transaction touches.
    lock_contexts: Vec<LockContext>,
    // Number of usage queues currently blocking this task; mutation is gated
    // by the scheduler thread's `BlockedUsageCountToken`.
    blocked_usage_count: TokenCell<ShortCounter>,
}
424
425impl TaskInner {
426 pub fn task_index(&self) -> usize {
427 self.index
428 }
429
430 pub fn transaction(&self) -> &SanitizedTransaction {
431 &self.transaction
432 }
433
434 fn lock_contexts(&self) -> &[LockContext] {
435 &self.lock_contexts
436 }
437
438 fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
439 self.blocked_usage_count
440 .with_borrow_mut(token, |usage_count| {
441 *usage_count = count;
442 })
443 }
444
445 #[must_use]
446 fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
447 let did_unblock = self
448 .blocked_usage_count
449 .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
450 did_unblock.then_some(self)
451 }
452}
453
/// Pairs a shared per-address usage queue with the access level this task
/// requests on that address.
#[derive(Debug)]
struct LockContext {
    usage_queue: UsageQueue,
    requested_usage: RequestedUsage,
}
const_assert_eq!(mem::size_of::<LockContext>(), 16);
462
463impl LockContext {
464 fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
465 Self {
466 usage_queue,
467 requested_usage,
468 }
469 }
470
471 fn with_usage_queue_mut<R>(
472 &self,
473 usage_queue_token: &mut UsageQueueToken,
474 f: impl FnOnce(&mut UsageQueueInner) -> R,
475 ) -> R {
476 self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
477 }
478}
479
/// Current occupancy of a usage queue: a single writer, or a counted set of
/// concurrent readers.
#[derive(Copy, Clone, Debug)]
enum Usage {
    Readonly(ShortCounter),
    Writable,
}
const_assert_eq!(mem::size_of::<Usage>(), 8);
487
488impl From<RequestedUsage> for Usage {
489 fn from(requested_usage: RequestedUsage) -> Self {
490 match requested_usage {
491 RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
492 RequestedUsage::Writable => Usage::Writable,
493 }
494 }
495}
496
/// Access level a task requests on a single account address.
#[derive(Clone, Copy, Debug)]
enum RequestedUsage {
    Readonly,
    Writable,
}
503
/// Interior state of a per-address usage queue: the current occupancy (if
/// any) plus a FIFO of blocked requests preserving arrival order.
#[derive(Debug)]
struct UsageQueueInner {
    current_usage: Option<Usage>,
    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
}

/// A parked request: the access level wanted, and the task wanting it.
type UsageFromTask = (RequestedUsage, Task);
516
517impl Default for UsageQueueInner {
518 fn default() -> Self {
519 Self {
520 current_usage: None,
521 blocked_usages_from_tasks: VecDeque::with_capacity(128),
531 }
532 }
533}
534
535impl UsageQueueInner {
536 fn try_lock(&mut self, requested_usage: RequestedUsage) -> LockResult {
537 match self.current_usage {
538 None => Some(Usage::from(requested_usage)),
539 Some(Usage::Readonly(count)) => match requested_usage {
540 RequestedUsage::Readonly => Some(Usage::Readonly(count.increment())),
541 RequestedUsage::Writable => None,
542 },
543 Some(Usage::Writable) => None,
544 }
545 .inspect(|&new_usage| {
546 self.current_usage = Some(new_usage);
547 })
548 .map(|_| ())
549 .ok_or(())
550 }
551
552 #[must_use]
553 fn unlock(&mut self, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
554 let mut is_unused_now = false;
555 match &mut self.current_usage {
556 Some(Usage::Readonly(ref mut count)) => match requested_usage {
557 RequestedUsage::Readonly => {
558 if count.is_one() {
559 is_unused_now = true;
560 } else {
561 count.decrement_self();
562 }
563 }
564 RequestedUsage::Writable => unreachable!(),
565 },
566 Some(Usage::Writable) => match requested_usage {
567 RequestedUsage::Writable => {
568 is_unused_now = true;
569 }
570 RequestedUsage::Readonly => unreachable!(),
571 },
572 None => unreachable!(),
573 }
574
575 if is_unused_now {
576 self.current_usage = None;
577 self.blocked_usages_from_tasks.pop_front()
578 } else {
579 None
580 }
581 }
582
583 fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
584 assert_matches!(self.current_usage, Some(_));
585 self.blocked_usages_from_tasks.push_back(usage_from_task);
586 }
587
588 #[must_use]
589 fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
590 if matches!(
591 self.blocked_usages_from_tasks.front(),
592 Some((RequestedUsage::Readonly, _))
593 ) {
594 assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
595 self.blocked_usages_from_tasks.pop_front()
596 } else {
597 None
598 }
599 }
600
601 fn has_no_blocked_usage(&self) -> bool {
602 self.blocked_usages_from_tasks.is_empty()
603 }
604}
605
const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);

/// Shared, token-guarded per-address usage queue; cheap to clone (`Arc`).
#[derive(Debug, Clone, Default)]
pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
614
/// Single-threaded state machine that schedules transactions by taking
/// per-address readonly/writable locks and queueing conflicting tasks.
///
/// Construction is `unsafe` (see
/// `exclusively_initialize_current_thread_for_scheduling`) because the two
/// embedded tokens must be unique on their thread.
pub struct SchedulingStateMachine {
    // Tasks whose last blocking usage cleared, awaiting hand-out.
    unblocked_task_queue: VecDeque<Task>,
    active_task_count: ShortCounter,
    handled_task_count: ShortCounter,
    unblocked_task_count: ShortCounter,
    total_task_count: ShortCounter,
    // Zero-sized capabilities gating all TokenCell access.
    count_token: BlockedUsageCountToken,
    usage_queue_token: UsageQueueToken,
}
const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 48);
627
impl SchedulingStateMachine {
    /// `true` when every scheduled task has since been descheduled.
    pub fn has_no_active_task(&self) -> bool {
        self.active_task_count.is_zero()
    }

    /// `true` when at least one formerly-blocked task is ready to hand out.
    pub fn has_unblocked_task(&self) -> bool {
        !self.unblocked_task_queue.is_empty()
    }

    /// Number of tasks currently waiting in the unblocked queue.
    pub fn unblocked_task_queue_count(&self) -> usize {
        self.unblocked_task_queue.len()
    }

    /// Number of scheduled-but-not-yet-descheduled tasks.
    pub fn active_task_count(&self) -> u32 {
        self.active_task_count.current()
    }

    /// Number of tasks descheduled so far.
    pub fn handled_task_count(&self) -> u32 {
        self.handled_task_count.current()
    }

    /// Number of tasks handed out via `schedule_next_unblocked_task()`.
    pub fn unblocked_task_count(&self) -> u32 {
        self.unblocked_task_count.current()
    }

    /// Total number of tasks ever passed to `schedule_task()`.
    pub fn total_task_count(&self) -> u32 {
        self.total_task_count.current()
    }

    /// Schedules `task`, returning it if it can run immediately.
    ///
    /// `None` means at least one account usage conflicted; the task is parked
    /// on each conflicting queue and will surface later through
    /// [`Self::schedule_next_unblocked_task`].
    #[must_use]
    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
        self.total_task_count.increment_self();
        self.active_task_count.increment_self();
        self.try_lock_usage_queues(task)
    }

    /// Pops the next formerly-blocked task that became runnable, if any.
    #[must_use]
    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
        self.unblocked_task_queue.pop_front().inspect(|_| {
            self.unblocked_task_count.increment_self();
        })
    }

    /// Marks `task` as executed: releases all of its account locks and
    /// promotes any tasks those locks were blocking.
    ///
    /// Must be called exactly once for each task previously returned by
    /// `schedule_task()` or `schedule_next_unblocked_task()`.
    pub fn deschedule_task(&mut self, task: &Task) {
        self.active_task_count.decrement_self();
        self.handled_task_count.increment_self();
        self.unlock_usage_queues(task);
    }

    /// Tries to take every account lock of `task`.
    ///
    /// Each conflicting queue parks the task on its FIFO and bumps a local
    /// counter; if any conflicts occurred, the counter is stored on the task
    /// (counted down later by `try_unblock`) and `None` is returned.
    #[must_use]
    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
        let mut blocked_usage_count = ShortCounter::zero();

        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let lock_result = if usage_queue.has_no_blocked_usage() {
                    usage_queue.try_lock(context.requested_usage)
                } else {
                    // Earlier-parked requests must win: never jump the FIFO.
                    LockResult::Err(())
                };
                if let Err(()) = lock_result {
                    blocked_usage_count.increment_self();
                    let usage_from_task = (context.requested_usage, task.clone());
                    usage_queue.push_blocked_usage_from_task(usage_from_task);
                }
            });
        }

        if blocked_usage_count.is_zero() {
            Some(task)
        } else {
            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
            None
        }
    }

    /// Releases `task`'s lock on each of its queues and drains newly
    /// grantable parked requests.
    ///
    /// For each queue that becomes idle: the front parked request is granted
    /// the lock, its task's blocked count is decremented (pushing the task to
    /// the unblocked queue when it hits zero), and — for readonly grants —
    /// consecutive readonly requests behind it are granted in the same pass.
    fn unlock_usage_queues(&mut self, task: &Task) {
        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);

                while let Some((requested_usage, task_with_unblocked_queue)) =
                    unblocked_task_from_queue
                {
                    if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
                    {
                        self.unblocked_task_queue.push_back(task);
                    }

                    match usage_queue.try_lock(requested_usage) {
                        LockResult::Ok(()) => {
                            // A readonly grant may be followed by more
                            // readonly requests; grant them all now.
                            unblocked_task_from_queue =
                                if matches!(requested_usage, RequestedUsage::Readonly) {
                                    usage_queue.pop_unblocked_readonly_usage_from_task()
                                } else {
                                    None
                                };
                        }
                        // The queue was just unlocked (or readonly-granted),
                        // so locking the front request must succeed.
                        LockResult::Err(()) => panic!("should never fail in this context"),
                    }
                }
            });
        }
    }

    /// Builds a `Task` from `transaction`, resolving each account address to
    /// its shared `UsageQueue` through `usage_queue_loader`.
    ///
    /// The requested access level per address follows the message's
    /// writability flags.
    pub fn create_task(
        transaction: SanitizedTransaction,
        index: usize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        let lock_contexts = transaction
            .message()
            .account_keys()
            .iter()
            .enumerate()
            .map(|(index, address)| {
                LockContext::new(
                    usage_queue_loader(*address),
                    if transaction.message().is_writable(index) {
                        RequestedUsage::Writable
                    } else {
                        RequestedUsage::Readonly
                    },
                )
            })
            .collect();

        Task::new(TaskInner {
            transaction,
            index,
            lock_contexts,
            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
        })
    }

    /// Resets the statistics counters so the machine can be reused.
    ///
    /// Panics if any task is still active or unblocked tasks are pending.
    /// The exhaustive destructuring forces this function to be revisited
    /// whenever a field is added to the struct.
    pub fn reinitialize(&mut self) {
        assert!(self.has_no_active_task());
        assert_eq!(self.unblocked_task_queue.len(), 0);
        let Self {
            unblocked_task_queue: _,
            active_task_count,
            handled_task_count,
            unblocked_task_count,
            total_task_count,
            count_token: _,
            usage_queue_token: _,
        } = self;
        active_task_count.reset_to_zero();
        handled_task_count.reset_to_zero();
        unblocked_task_count.reset_to_zero();
        total_task_count.reset_to_zero();
    }

    /// Creates the scheduler bound to the current thread.
    ///
    /// # Safety
    /// This mints the thread-bound tokens authorizing all `TokenCell` access;
    /// the caller must uphold the exclusivity contract of
    /// [`Token::assume_exclusive_mutating_thread`] (double creation on one
    /// thread is caught by a runtime assert there).
    #[must_use]
    pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self {
        Self {
            unblocked_task_queue: VecDeque::with_capacity(1024),
            active_task_count: ShortCounter::zero(),
            handled_task_count: ShortCounter::zero(),
            unblocked_task_count: ShortCounter::zero(),
            total_task_count: ShortCounter::zero(),
            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
        }
    }
}
881
// Scheduler tests: these build real `SanitizedTransaction`s and step the
// state machine through non-conflicting, read/write-conflicting, and
// unblocking sequences, asserting the exact hand-out order and counters.
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_sdk::{
            instruction::{AccountMeta, Instruction},
            message::Message,
            pubkey::Pubkey,
            signature::Signer,
            signer::keypair::Keypair,
            transaction::{SanitizedTransaction, Transaction},
        },
        std::{cell::RefCell, collections::HashMap, rc::Rc},
    };

    // A transaction with no instructions; only the (writable) fee payer.
    fn simplest_transaction() -> SanitizedTransaction {
        let payer = Keypair::new();
        let message = Message::new(&[], Some(&payer.pubkey()));
        let unsigned = Transaction::new_unsigned(message);
        SanitizedTransaction::from_transaction_for_tests(unsigned)
    }

    // A transaction taking `address` readonly; fee payer is unique per call,
    // so only `address` can conflict between transactions.
    fn transaction_with_readonly_address(address: Pubkey) -> SanitizedTransaction {
        let instruction = Instruction {
            program_id: Pubkey::default(),
            accounts: vec![AccountMeta::new_readonly(address, false)],
            data: vec![],
        };
        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
        let unsigned = Transaction::new_unsigned(message);
        SanitizedTransaction::from_transaction_for_tests(unsigned)
    }

    // A transaction taking `address` writable; fee payer unique per call.
    fn transaction_with_writable_address(address: Pubkey) -> SanitizedTransaction {
        let instruction = Instruction {
            program_id: Pubkey::default(),
            accounts: vec![AccountMeta::new(address, false)],
            data: vec![],
        };
        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
        let unsigned = Transaction::new_unsigned(message);
        SanitizedTransaction::from_transaction_for_tests(unsigned)
    }

    // An address loader backed by a (optionally shared) map, so tests can
    // inspect the per-address usage queues afterwards.
    fn create_address_loader(
        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
    ) -> impl FnMut(Pubkey) -> UsageQueue {
        let usage_queues = usage_queues.unwrap_or_default();
        move |address| {
            usage_queues
                .borrow_mut()
                .entry(address)
                .or_default()
                .clone()
        }
    }

    // Freshly created machine has all-zero counters and no active task.
    #[test]
    fn test_scheduling_state_machine_creation() {
        let state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 0);
        assert!(state_machine.has_no_active_task());
    }

    // reinitialize() resets counters when no task is active.
    #[test]
    fn test_scheduling_state_machine_good_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        state_machine.total_task_count.increment_self();
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.reinitialize();
        assert_eq!(state_machine.total_task_count(), 0);
    }

    // reinitialize() must panic while a task is still active.
    #[test]
    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
    fn test_scheduling_state_machine_bad_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let address_loader = &mut create_address_loader(None);
        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
        state_machine.schedule_task(task).unwrap();
        state_machine.reinitialize();
    }

    // create_task() preserves the index and the transaction.
    #[test]
    fn test_create_task() {
        let sanitized = simplest_transaction();
        let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, &mut |_| {
            UsageQueue::default()
        });
        assert_eq!(task.task_index(), 3);
        assert_eq!(task.transaction(), &sanitized);
    }

    // A single task schedules immediately; counters track its lifecycle.
    #[test]
    fn test_non_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task = SchedulingStateMachine::create_task(sanitized.clone(), 3, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let task = state_machine.schedule_task(task).unwrap();
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.deschedule_task(&task);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 1);
        assert!(state_machine.has_no_active_task());
    }

    // Same fee payer → tasks conflict; they unblock one at a time in order.
    #[test]
    fn test_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert!(state_machine.has_unblocked_task());
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_eq!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert!(!state_machine.has_unblocked_task());
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_task(task3.clone())
                .map(|task| task.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    // A task scheduled while others are parked must queue behind them.
    #[test]
    fn test_existing_blocking_task_then_newly_scheduled_task() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_eq!(state_machine.unblocked_task_count(), 2);

        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    // Two readonly takers of one address run concurrently.
    #[test]
    fn test_multiple_readonly_task_and_counts() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );

        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert!(state_machine.has_no_active_task());
    }

    // A writer waits until every concurrent reader has descheduled.
    #[test]
    fn test_all_blocking_readable_tasks_block_writable_task() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.active_task_count(), 3);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    // reader → writer → reader over one address executes strictly in order.
    #[test]
    fn test_readonly_then_writable_then_readonly_linearized() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    // A writer parked behind a reader runs once the reader finishes.
    #[test]
    fn test_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

    // writer → two readers (granted together) → writer (waits for both).
    #[test]
    fn test_blocked_tasks_writable_2_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let sanitized4 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);
        assert_matches!(state_machine.schedule_task(task4.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task2);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task3);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(104)
        );
        state_machine.deschedule_task(&task4);
        assert!(state_machine.has_no_active_task());
    }

    // Inspects queue interiors directly: a blocked schedule still leaves the
    // earlier task's locks (conflicting address and its fee payer) in place.
    #[test]
    fn test_gradual_locking() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
        let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        let usage_queues = usage_queues.borrow_mut();
        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        let fee_payer = task2.transaction().message().fee_payer();
        let usage_queue = usage_queues.get(fee_payer).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

    // unlock() of an idle queue is bookkeeping corruption → unreachable!().
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions1() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }

    // Readonly unlock of a writable-held queue → unreachable!().
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions2() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Writable);
                let _ = usage_queue.unlock(RequestedUsage::Readonly);
            });
    }

    // Writable unlock of a readonly-held queue → unreachable!().
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions3() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Readonly(ShortCounter::one()));
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }
}