sp_runtime/offchain/
storage_lock.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! # Off-chain Storage Lock
19//!
20//! A storage-based lock with a defined expiry time.
21//!
//! The lock uses Local Storage and allows synchronizing access to a critical
//! section of your code between concurrently running Off-chain Workers. Using the
//! `PERSISTENT` variant of the storage persists the lock value across a full node
//! restart or re-orgs.
26//!
//! A use case for the lock is to make sure that a particular section of the
//! code is only run by one Off-chain Worker at a time. This may include
//! performing a side-effect (e.g. an HTTP call) or altering a single or
//! multiple Local Storage entries.
31//!
//! Another use case would be collective updates of multiple data items, or appending to /
//! removing from e.g. sets or vectors which are stored in the off-chain storage DB.
34//!
35//! ## Example:
36//!
37//! ```rust
38//! # use codec::{Decode, Encode, Codec};
39//! // in your off-chain worker code
40//! use sp_runtime::offchain::{
41//! 		storage::StorageValueRef,
42//! 		storage_lock::{StorageLock, Time},
43//! };
44//!
45//! fn append_to_in_storage_vec<'a, T>(key: &'a [u8], _: T) where T: Codec {
46//!    // `access::lock` defines the storage entry which is used for
47//!    // persisting the lock in the underlying database.
48//!    // The entry name _must_ be unique and can be interpreted as a
49//!    // unique mutex instance reference tag.
50//!    let mut lock = StorageLock::<Time>::new(b"access::lock");
51//!    {
52//!         let _guard = lock.lock();
53//!         let acc = StorageValueRef::persistent(key);
54//!         let v: Vec<T> = acc.get::<Vec<T>>().unwrap().unwrap();
55//!         // modify `v` as desired
56//!         // i.e. perform some heavy computation with
57//!         // side effects that should only be done once.
58//!         acc.set(&v);
59//!         // drop `_guard` implicitly at end of scope
60//!    }
61//! }
62//! ```
63
64use crate::{
65	offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
66	traits::BlockNumberProvider,
67};
68use codec::{Codec, Decode, Encode};
69use core::fmt;
70use sp_core::offchain::{Duration, Timestamp};
71use sp_io::offchain;
72
/// Default expiry duration for time based locks in milliseconds.
const STORAGE_LOCK_DEFAULT_EXPIRY_DURATION: Duration = Duration::from_millis(20_000);

/// Default expiry duration for block based locks in blocks.
const STORAGE_LOCK_DEFAULT_EXPIRY_BLOCKS: u32 = 4;

/// Lower bound, in milliseconds, for the pause between two checks whether the
/// lock is still being held.
const STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MIN: Duration = Duration::from_millis(10);
/// Upper bound, in milliseconds, for the pause between two checks whether the
/// lock is still being held.
const STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX: Duration = Duration::from_millis(100);
82
83/// Lockable item for use with a persisted storage lock.
84///
85/// Bound for an item that has a stateful ordered meaning
86/// without explicitly requiring `Ord` trait in general.
87pub trait Lockable: Sized {
88	/// An instant type specifying i.e. a point in time.
89	type Deadline: Sized + Codec + Clone;
90
91	/// Calculate the deadline based on a current state
92	/// such as time or block number and derives the deadline.
93	fn deadline(&self) -> Self::Deadline;
94
95	/// Verify the deadline has not expired compared to the
96	/// current state, i.e. time or block number.
97	fn has_expired(deadline: &Self::Deadline) -> bool;
98
99	/// Snooze at least until `deadline` is reached.
100	///
101	/// Note that `deadline` is only passed to allow optimizations
102	/// for `Lockables` which have a time based component.
103	fn snooze(_deadline: &Self::Deadline) {
104		sp_io::offchain::sleep_until(
105			offchain::timestamp().add(STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX),
106		);
107	}
108}
109
110/// Lockable based on the current timestamp with a configurable expiration time.
111#[derive(Encode, Decode)]
112pub struct Time {
113	/// How long the lock will stay valid once `fn lock(..)` or
114	/// `fn try_lock(..)` successfully acquire a lock.
115	expiration_duration: Duration,
116}
117
118impl Default for Time {
119	fn default() -> Self {
120		Self { expiration_duration: STORAGE_LOCK_DEFAULT_EXPIRY_DURATION }
121	}
122}
123
124impl Lockable for Time {
125	type Deadline = Timestamp;
126
127	fn deadline(&self) -> Self::Deadline {
128		offchain::timestamp().add(self.expiration_duration)
129	}
130
131	fn has_expired(deadline: &Self::Deadline) -> bool {
132		offchain::timestamp() > *deadline
133	}
134
135	fn snooze(deadline: &Self::Deadline) {
136		let now = offchain::timestamp();
137		let remainder: Duration = now.diff(deadline);
138		// do not snooze the full duration, but instead snooze max 100ms
139		// it might get unlocked in another thread
140		let snooze = remainder.clamp(
141			STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MIN,
142			STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX,
143		);
144		sp_io::offchain::sleep_until(now.add(snooze));
145	}
146}
147
148/// A deadline based on block number and time.
149#[derive(Encode, Decode, Eq, PartialEq)]
150pub struct BlockAndTimeDeadline<B: BlockNumberProvider> {
151	/// The block number until which the lock is still valid _at least_.
152	pub block_number: <B as BlockNumberProvider>::BlockNumber,
153	/// The timestamp until which the lock is still valid _at least_.
154	pub timestamp: Timestamp,
155}
156
157impl<B: BlockNumberProvider> Clone for BlockAndTimeDeadline<B> {
158	fn clone(&self) -> Self {
159		Self { block_number: self.block_number, timestamp: self.timestamp }
160	}
161}
162
163impl<B: BlockNumberProvider> Default for BlockAndTimeDeadline<B> {
164	/// Provide the current state of block number and time.
165	fn default() -> Self {
166		Self {
167			block_number: B::current_block_number() + STORAGE_LOCK_DEFAULT_EXPIRY_BLOCKS.into(),
168			timestamp: offchain::timestamp().add(STORAGE_LOCK_DEFAULT_EXPIRY_DURATION),
169		}
170	}
171}
172
173impl<B: BlockNumberProvider> fmt::Debug for BlockAndTimeDeadline<B>
174where
175	<B as BlockNumberProvider>::BlockNumber: fmt::Debug,
176{
177	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178		f.debug_struct("BlockAndTimeDeadline")
179			.field("block_number", &self.block_number)
180			.field("timestamp", &self.timestamp)
181			.finish()
182	}
183}
184
185/// Lockable based on block number and timestamp.
186///
187/// Expiration is defined if both, block number _and_ timestamp
188/// expire.
189pub struct BlockAndTime<B: BlockNumberProvider> {
190	/// Relative block number offset, which is used to determine
191	/// the block number part of the deadline.
192	expiration_block_number_offset: u32,
193	/// Relative duration, used to derive the time based part of
194	/// the deadline.
195	expiration_duration: Duration,
196
197	_phantom: core::marker::PhantomData<B>,
198}
199
200impl<B: BlockNumberProvider> Default for BlockAndTime<B> {
201	fn default() -> Self {
202		Self {
203			expiration_block_number_offset: STORAGE_LOCK_DEFAULT_EXPIRY_BLOCKS,
204			expiration_duration: STORAGE_LOCK_DEFAULT_EXPIRY_DURATION,
205			_phantom: core::marker::PhantomData::<B>,
206		}
207	}
208}
209
210// derive not possible, since `B` does not necessarily implement `trait Clone`
211impl<B: BlockNumberProvider> Clone for BlockAndTime<B> {
212	fn clone(&self) -> Self {
213		Self {
214			expiration_block_number_offset: self.expiration_block_number_offset,
215			expiration_duration: self.expiration_duration,
216			_phantom: core::marker::PhantomData::<B>,
217		}
218	}
219}
220
221impl<B: BlockNumberProvider> Lockable for BlockAndTime<B> {
222	type Deadline = BlockAndTimeDeadline<B>;
223
224	fn deadline(&self) -> Self::Deadline {
225		let block_number = <B as BlockNumberProvider>::current_block_number() +
226			self.expiration_block_number_offset.into();
227		BlockAndTimeDeadline {
228			timestamp: offchain::timestamp().add(self.expiration_duration),
229			block_number,
230		}
231	}
232
233	fn has_expired(deadline: &Self::Deadline) -> bool {
234		offchain::timestamp() > deadline.timestamp &&
235			<B as BlockNumberProvider>::current_block_number() > deadline.block_number
236	}
237
238	fn snooze(deadline: &Self::Deadline) {
239		let now = offchain::timestamp();
240		let remainder: Duration = now.diff(&(deadline.timestamp));
241		let snooze = remainder.clamp(
242			STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MIN,
243			STORAGE_LOCK_PER_CHECK_ITERATION_SNOOZE_MAX,
244		);
245		sp_io::offchain::sleep_until(now.add(snooze));
246	}
247}
248
249/// Storage based lock.
250///
251/// A lock that is persisted in the DB and provides the ability to guard against
252/// concurrent access in an off-chain worker, with a defined expiry deadline
253/// based on the concrete [`Lockable`] implementation.
254pub struct StorageLock<'a, L = Time> {
255	// A storage value ref which defines the DB entry representing the lock.
256	value_ref: StorageValueRef<'a>,
257	lockable: L,
258}
259
260impl<'a, L: Lockable + Default> StorageLock<'a, L> {
261	/// Create a new storage lock with a `default()` instance of type `L`.
262	pub fn new(key: &'a [u8]) -> Self {
263		Self::with_lockable(key, Default::default())
264	}
265}
266
267impl<'a, L: Lockable> StorageLock<'a, L> {
268	/// Create a new storage lock with an explicit instance of a lockable `L`.
269	pub fn with_lockable(key: &'a [u8], lockable: L) -> Self {
270		Self { value_ref: StorageValueRef::<'a>::persistent(key), lockable }
271	}
272
273	/// Extend active lock's deadline
274	fn extend_active_lock(&mut self) -> Result<<L as Lockable>::Deadline, ()> {
275		let res = self.value_ref.mutate(
276			|s: Result<Option<L::Deadline>, StorageRetrievalError>| -> Result<<L as Lockable>::Deadline, ()> {
277			match s {
278				// lock is present and is still active, extend the lock.
279				Ok(Some(deadline)) if !<L as Lockable>::has_expired(&deadline) =>
280					Ok(self.lockable.deadline()),
281				// other cases
282				_ => Err(()),
283			}
284		});
285		match res {
286			Ok(deadline) => Ok(deadline),
287			Err(MutateStorageError::ConcurrentModification(_)) => Err(()),
288			Err(MutateStorageError::ValueFunctionFailed(e)) => Err(e),
289		}
290	}
291
292	/// Internal lock helper to avoid lifetime conflicts.
293	fn try_lock_inner(
294		&mut self,
295		new_deadline: L::Deadline,
296	) -> Result<(), <L as Lockable>::Deadline> {
297		let res = self.value_ref.mutate(
298			|s: Result<Option<L::Deadline>, StorageRetrievalError>|
299			-> Result<<L as Lockable>::Deadline, <L as Lockable>::Deadline> {
300				match s {
301					// no lock set, we can safely acquire it
302					Ok(None) => Ok(new_deadline),
303					// write was good, but read failed
304					Err(_) => Ok(new_deadline),
305					// lock is set, but it is expired. We can re-acquire it.
306					Ok(Some(deadline)) if <L as Lockable>::has_expired(&deadline) =>
307						Ok(new_deadline),
308					// lock is present and is still active
309					Ok(Some(deadline)) => Err(deadline),
310				}
311			},
312		);
313		match res {
314			Ok(_) => Ok(()),
315			Err(MutateStorageError::ConcurrentModification(deadline)) => Err(deadline),
316			Err(MutateStorageError::ValueFunctionFailed(e)) => Err(e),
317		}
318	}
319
320	/// A single attempt to lock using the storage entry.
321	///
322	/// Returns a lock guard on success, otherwise an error containing the
323	/// `<Self::Lockable>::Deadline` in for the currently active lock
324	/// by another task `Err(<L as Lockable>::Deadline)`.
325	pub fn try_lock(&mut self) -> Result<StorageLockGuard<'a, '_, L>, <L as Lockable>::Deadline> {
326		self.try_lock_inner(self.lockable.deadline())?;
327		Ok(StorageLockGuard::<'a, '_> { lock: Some(self) })
328	}
329
330	/// Repeated lock attempts until the lock is successfully acquired.
331	///
332	/// If one uses `fn forget(..)`, it is highly likely `fn try_lock(..)`
333	/// is the correct API to use instead of `fn lock(..)`, since that might
334	/// never unlock in the anticipated span i.e. when used with `BlockAndTime`
335	/// during a certain block number span.
336	pub fn lock(&mut self) -> StorageLockGuard<'a, '_, L> {
337		while let Err(deadline) = self.try_lock_inner(self.lockable.deadline()) {
338			L::snooze(&deadline);
339		}
340		StorageLockGuard::<'a, '_, L> { lock: Some(self) }
341	}
342
343	/// Explicitly unlock the lock.
344	fn unlock(&mut self) {
345		self.value_ref.clear();
346	}
347}
348
349/// RAII style guard for a lock.
350pub struct StorageLockGuard<'a, 'b, L: Lockable> {
351	lock: Option<&'b mut StorageLock<'a, L>>,
352}
353
354impl<'a, 'b, L: Lockable> StorageLockGuard<'a, 'b, L> {
355	/// Consume the guard but **do not** unlock the underlying lock.
356	///
357	/// Can be used to implement a grace period after doing some
358	/// heavy computations and sending a transaction to be included
359	/// on-chain. By forgetting the lock, it will stay locked until
360	/// its expiration deadline is reached while the off-chain worker
361	/// can already exit.
362	pub fn forget(mut self) {
363		let _ = self.lock.take();
364	}
365
366	/// Extend the lock by guard deadline if it already exists.
367	///
368	/// i.e. large sets of items for which it is hard to calculate a
369	/// meaning full conservative deadline which does not block for a
370	/// very long time on node termination.
371	pub fn extend_lock(&mut self) -> Result<<L as Lockable>::Deadline, ()> {
372		if let Some(ref mut lock) = self.lock {
373			lock.extend_active_lock()
374		} else {
375			Err(())
376		}
377	}
378}
379
380impl<'a, 'b, L: Lockable> Drop for StorageLockGuard<'a, 'b, L> {
381	fn drop(&mut self) {
382		if let Some(lock) = self.lock.take() {
383			lock.unlock();
384		}
385	}
386}
387
388impl<'a> StorageLock<'a, Time> {
389	/// Explicitly create a time based storage lock with a non-default
390	/// expiration timeout.
391	pub fn with_deadline(key: &'a [u8], expiration_duration: Duration) -> Self {
392		Self {
393			value_ref: StorageValueRef::<'a>::persistent(key),
394			lockable: Time { expiration_duration },
395		}
396	}
397}
398
399impl<'a, B> StorageLock<'a, BlockAndTime<B>>
400where
401	B: BlockNumberProvider,
402{
403	/// Explicitly create a time and block number based storage lock with
404	/// a non-default expiration duration and block number offset.
405	pub fn with_block_and_time_deadline(
406		key: &'a [u8],
407		expiration_block_number_offset: u32,
408		expiration_duration: Duration,
409	) -> Self {
410		Self {
411			value_ref: StorageValueRef::<'a>::persistent(key),
412			lockable: BlockAndTime::<B> {
413				expiration_block_number_offset,
414				expiration_duration,
415				_phantom: core::marker::PhantomData,
416			},
417		}
418	}
419
420	/// Explicitly create a time and block number based storage lock with
421	/// the default expiration duration and a non-default block number offset.
422	pub fn with_block_deadline(key: &'a [u8], expiration_block_number_offset: u32) -> Self {
423		Self {
424			value_ref: StorageValueRef::<'a>::persistent(key),
425			lockable: BlockAndTime::<B> {
426				expiration_block_number_offset,
427				expiration_duration: STORAGE_LOCK_DEFAULT_EXPIRY_DURATION,
428				_phantom: core::marker::PhantomData,
429			},
430		}
431	}
432}
433
#[cfg(test)]
mod tests {
	use super::*;
	use sp_core::offchain::{testing, OffchainDbExt, OffchainWorkerExt};
	use sp_io::TestExternalities;

	// Two distinct values written to the protected storage entry.
	const VAL_1: u32 = 0u32;
	const VAL_2: u32 = 0xFFFF_FFFFu32;

	/// Lock twice in a row, letting each guard drop at the end of its scope,
	/// and check that no lock entry is left behind.
	#[test]
	fn storage_lock_write_unlock_lock_read_unlock() {
		let (offchain, state) = testing::TestOffchainExt::new();
		let mut t = TestExternalities::default();
		t.register_extension(OffchainDbExt::new(offchain.clone()));
		t.register_extension(OffchainWorkerExt::new(offchain));

		t.execute_with(|| {
			let mut lock = StorageLock::<'_, Time>::new(b"lock_1");

			let val = StorageValueRef::persistent(b"protected_value");

			{
				let _guard = lock.lock();

				val.set(&VAL_1);

				assert_eq!(val.get::<u32>(), Ok(Some(VAL_1)));
			}

			{
				let _guard = lock.lock();
				val.set(&VAL_2);

				assert_eq!(val.get::<u32>(), Ok(Some(VAL_2)));
			}
		});
		// lock must have been cleared at this point
		assert_eq!(state.read().persistent_storage.get(b"lock_1"), None);
	}

	/// Forgetting the guard must leave the lock entry in storage.
	#[test]
	fn storage_lock_and_forget() {
		let (offchain, state) = testing::TestOffchainExt::new();
		let mut t = TestExternalities::default();
		t.register_extension(OffchainDbExt::new(offchain.clone()));
		t.register_extension(OffchainWorkerExt::new(offchain));

		t.execute_with(|| {
			let mut lock = StorageLock::<'_, Time>::new(b"lock_2");

			let val = StorageValueRef::persistent(b"protected_value");

			let guard = lock.lock();

			val.set(&VAL_1);

			assert_eq!(val.get::<u32>(), Ok(Some(VAL_1)));

			guard.forget();
		});
		// the guard was forgotten, so the lock entry must still be present
		let opt = state.read().persistent_storage.get(b"lock_2");
		assert!(opt.is_some());
	}

	/// A forgotten lock can be acquired again once its deadline has passed.
	#[test]
	fn storage_lock_and_let_expire_and_lock_again() {
		let (offchain, state) = testing::TestOffchainExt::new();
		let mut t = TestExternalities::default();
		t.register_extension(OffchainDbExt::new(offchain.clone()));
		t.register_extension(OffchainWorkerExt::new(offchain));

		t.execute_with(|| {
			let sleep_until = offchain::timestamp().add(Duration::from_millis(500));
			let lock_expiration = Duration::from_millis(200);

			let mut lock = StorageLock::<'_, Time>::with_deadline(b"lock_3", lock_expiration);

			{
				let guard = lock.lock();
				guard.forget();
			}

			// assure the lock expires
			offchain::sleep_until(sleep_until);

			let mut lock = StorageLock::<'_, Time>::new(b"lock_3");
			let res = lock.try_lock();
			assert!(res.is_ok());
			let guard = res.unwrap();
			guard.forget();
		});

		// the re-acquired lock was forgotten as well, so the entry must still be present
		let opt = state.read().persistent_storage.get(b"lock_3");
		assert!(opt.is_some());
	}

	/// Extending works while the lock is active and fails after it expired.
	#[test]
	fn extend_active_lock() {
		let (offchain, state) = testing::TestOffchainExt::new();
		let mut t = TestExternalities::default();
		t.register_extension(OffchainDbExt::new(offchain.clone()));
		t.register_extension(OffchainWorkerExt::new(offchain));

		t.execute_with(|| {
			let lock_expiration = Duration::from_millis(300);

			let mut lock = StorageLock::<'_, Time>::with_deadline(b"lock_4", lock_expiration);
			let mut guard = lock.lock();

			// sleep_until < lock_expiration
			offchain::sleep_until(offchain::timestamp().add(Duration::from_millis(200)));

			// the lock is still active, extend it successfully
			assert_eq!(guard.extend_lock().is_ok(), true);

			// sleep_until < deadline
			offchain::sleep_until(offchain::timestamp().add(Duration::from_millis(200)));

			// the lock is still active, try_lock will fail
			let mut lock = StorageLock::<'_, Time>::with_deadline(b"lock_4", lock_expiration);
			let res = lock.try_lock();
			assert_eq!(res.is_ok(), false);

			// sleep again until sleep_until > deadline
			offchain::sleep_until(offchain::timestamp().add(Duration::from_millis(200)));

			// the lock has expired, failed to extend it
			assert_eq!(guard.extend_lock().is_ok(), false);
			guard.forget();

			// try_lock will succeed
			let mut lock = StorageLock::<'_, Time>::with_deadline(b"lock_4", lock_expiration);
			let res = lock.try_lock();
			assert!(res.is_ok());
			let guard = res.unwrap();

			guard.forget();
		});

		// the last guard was forgotten, so the entry must still hold the deadline
		// written by the final `try_lock`
		// NOTE(review): 900 presumably is 3 x 200ms of sleeping plus the 300ms
		// expiration, assuming the test clock starts at 0 and only advances via
		// `sleep_until` -- confirm against `TestOffchainExt`.
		let opt = state.read().persistent_storage.get(b"lock_4");
		assert_eq!(opt.unwrap(), vec![132_u8, 3u8, 0, 0, 0, 0, 0, 0]); // 132 + 256 * 3 = 900
	}
}