sc_network_sync/service/
network.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use futures::{channel::oneshot, StreamExt};
20use sc_network_types::PeerId;
21
22use sc_network::{
23	request_responses::{IfDisconnected, RequestFailure},
24	types::ProtocolName,
25	NetworkPeers, NetworkRequest, ReputationChange,
26};
27use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28
29use std::sync::Arc;
30
31/// Network-related services required by `sc-network-sync`
32pub trait Network: NetworkPeers + NetworkRequest {}
33
34impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
35
36/// Network service provider for `ChainSync`
37///
38/// It runs as an asynchronous task and listens to commands coming from `ChainSync` and
39/// calls the `NetworkService` on its behalf.
40pub struct NetworkServiceProvider {
41	rx: TracingUnboundedReceiver<ToServiceCommand>,
42	handle: NetworkServiceHandle,
43}
44
45/// Commands that `ChainSync` wishes to send to `NetworkService`
46#[derive(Debug)]
47pub enum ToServiceCommand {
48	/// Call `NetworkPeers::disconnect_peer()`
49	DisconnectPeer(PeerId, ProtocolName),
50
51	/// Call `NetworkPeers::report_peer()`
52	ReportPeer(PeerId, ReputationChange),
53
54	/// Call `NetworkRequest::start_request()`
55	StartRequest(
56		PeerId,
57		ProtocolName,
58		Vec<u8>,
59		oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
60		IfDisconnected,
61	),
62}
63
64/// Handle that is (temporarily) passed to `ChainSync` so it can
65/// communicate with `NetworkService` through `SyncingEngine`
66#[derive(Debug, Clone)]
67pub struct NetworkServiceHandle {
68	tx: TracingUnboundedSender<ToServiceCommand>,
69}
70
71impl NetworkServiceHandle {
72	/// Create new service handle
73	pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
74		Self { tx }
75	}
76
77	/// Report peer
78	pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
79		let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
80	}
81
82	/// Disconnect peer
83	pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
84		let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
85	}
86
87	/// Send request to peer
88	pub fn start_request(
89		&self,
90		who: PeerId,
91		protocol: ProtocolName,
92		request: Vec<u8>,
93		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
94		connect: IfDisconnected,
95	) {
96		let _ = self
97			.tx
98			.unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
99	}
100}
101
102impl NetworkServiceProvider {
103	/// Create new `NetworkServiceProvider`
104	pub fn new() -> Self {
105		let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
106
107		Self { rx, handle: NetworkServiceHandle::new(tx) }
108	}
109
110	/// Get handle to talk to the provider
111	pub fn handle(&self) -> NetworkServiceHandle {
112		self.handle.clone()
113	}
114
115	/// Run the `NetworkServiceProvider`
116	pub async fn run(self, service: Arc<dyn Network + Send + Sync>) {
117		let Self { mut rx, handle } = self;
118		drop(handle);
119
120		while let Some(inner) = rx.next().await {
121			match inner {
122				ToServiceCommand::DisconnectPeer(peer, protocol_name) =>
123					service.disconnect_peer(peer, protocol_name),
124				ToServiceCommand::ReportPeer(peer, reputation_change) =>
125					service.report_peer(peer, reputation_change),
126				ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) =>
127					service.start_request(peer, protocol, request, None, tx, connect),
128			}
129		}
130	}
131}
132
#[cfg(test)]
mod tests {
	use super::*;
	use crate::service::mock::MockNetwork;

	// typical pattern in `Protocol` code where peer is disconnected
	// and then reported
	#[tokio::test]
	async fn disconnect_and_report_peer() {
		let provider = NetworkServiceProvider::new();
		let handle = provider.handle();

		let peer = PeerId::random();
		let proto = ProtocolName::from("test-protocol");
		let proto_clone = proto.clone();
		let change = sc_network::ReputationChange::new_fatal("test-change");

		// Both calls must reach the network service exactly once, with the same
		// arguments that were passed to the handle.
		let mut mock_network = MockNetwork::new();
		mock_network
			.expect_disconnect_peer()
			.withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
			.once()
			.returning(|_, _| ());
		mock_network
			.expect_report_peer()
			.withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
			.once()
			.returning(|_, _| ());

		// Keep the join handle: without awaiting it the test body would return before
		// the provider task processed anything, and a failure inside the task could
		// not fail the test.
		let provider_task = tokio::spawn(async move {
			provider.run(Arc::new(mock_network)).await;
		});

		handle.disconnect_peer(peer, proto_clone);
		handle.report_peer(peer, change);

		// Dropping the last sender lets `run()` drain the queued commands and return;
		// the mock is dropped with it, which is when its expectations are checked.
		drop(handle);
		provider_task
			.await
			.expect("provider task panicked: mock expectations were not satisfied");
	}
}