sc_network_sync/service/
network.rs1use futures::{channel::oneshot, StreamExt};
20use sc_network_types::PeerId;
21
22use sc_network::{
23 request_responses::{IfDisconnected, RequestFailure},
24 types::ProtocolName,
25 NetworkPeers, NetworkRequest, ReputationChange,
26};
27use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28
29use std::sync::Arc;
30
/// Combined network abstraction required by the sync service.
///
/// This is a marker trait with no methods of its own: it only bundles the
/// [`NetworkPeers`] and [`NetworkRequest`] supertraits under a single name so
/// that they can be used together, e.g. behind one trait object.
pub trait Network: NetworkPeers + NetworkRequest {}
33
34impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
35
/// Receiving side of the command channel between [`NetworkServiceHandle`]s
/// and the network service.
///
/// Commands sent through a handle are read from `rx` by
/// [`NetworkServiceProvider::run`] and executed on the network service passed
/// to it.
pub struct NetworkServiceProvider {
	/// Receiver for the commands sent through [`NetworkServiceHandle`]s.
	rx: TracingUnboundedReceiver<ToServiceCommand>,
	/// Handle wrapping the sender paired with `rx`; cloned out by
	/// [`NetworkServiceProvider::handle`].
	handle: NetworkServiceHandle,
}
44
/// Commands that a [`NetworkServiceHandle`] sends to the
/// [`NetworkServiceProvider`].
///
/// Each variant is forwarded by [`NetworkServiceProvider::run`] to the method
/// of the same purpose on the network service.
#[derive(Debug)]
pub enum ToServiceCommand {
	/// Forwarded to `disconnect_peer` with the given peer and protocol name.
	DisconnectPeer(PeerId, ProtocolName),

	/// Forwarded to `report_peer` with the given peer and reputation change.
	ReportPeer(PeerId, ReputationChange),

	/// Forwarded to `start_request` on the network service.
	StartRequest(
		// Peer the request is addressed to.
		PeerId,
		// Name of the protocol the request is sent on.
		ProtocolName,
		// Raw request payload.
		Vec<u8>,
		// Channel on which the outcome is delivered: the response bytes with a
		// protocol name on success, or a `RequestFailure`.
		oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		// Passed through unchanged to `start_request`; presumably selects what to do
		// when the peer is not connected -- confirm against `IfDisconnected`.
		IfDisconnected,
	),
}
63
/// Cloneable handle used to send [`ToServiceCommand`]s to the
/// [`NetworkServiceProvider`].
#[derive(Debug, Clone)]
pub struct NetworkServiceHandle {
	/// Sending side of the command channel.
	tx: TracingUnboundedSender<ToServiceCommand>,
}
70
71impl NetworkServiceHandle {
72 pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
74 Self { tx }
75 }
76
77 pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
79 let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
80 }
81
82 pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
84 let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
85 }
86
87 pub fn start_request(
89 &self,
90 who: PeerId,
91 protocol: ProtocolName,
92 request: Vec<u8>,
93 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
94 connect: IfDisconnected,
95 ) {
96 let _ = self
97 .tx
98 .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
99 }
100}
101
102impl NetworkServiceProvider {
103 pub fn new() -> Self {
105 let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
106
107 Self { rx, handle: NetworkServiceHandle::new(tx) }
108 }
109
110 pub fn handle(&self) -> NetworkServiceHandle {
112 self.handle.clone()
113 }
114
115 pub async fn run(self, service: Arc<dyn Network + Send + Sync>) {
117 let Self { mut rx, handle } = self;
118 drop(handle);
119
120 while let Some(inner) = rx.next().await {
121 match inner {
122 ToServiceCommand::DisconnectPeer(peer, protocol_name) =>
123 service.disconnect_peer(peer, protocol_name),
124 ToServiceCommand::ReportPeer(peer, reputation_change) =>
125 service.report_peer(peer, reputation_change),
126 ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) =>
127 service.start_request(peer, protocol, request, None, tx, connect),
128 }
129 }
130 }
131}
132
#[cfg(test)]
mod tests {
	use super::*;
	use crate::service::mock::MockNetwork;

	/// A peer is disconnected and then reported through the handle; both
	/// commands must reach the network service exactly once.
	#[tokio::test]
	async fn disconnect_and_report_peer() {
		let provider = NetworkServiceProvider::new();
		let handle = provider.handle();

		let peer = PeerId::random();
		let proto = ProtocolName::from("test-protocol");
		let proto_clone = proto.clone();
		let change = sc_network::ReputationChange::new_fatal("test-change");

		let mut mock_network = MockNetwork::new();
		mock_network
			.expect_disconnect_peer()
			.withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
			.once()
			.returning(|_, _| ());
		mock_network
			.expect_report_peer()
			.withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
			.once()
			.returning(|_, _| ());

		// Keep the join handle: without awaiting the task the test could return
		// before the provider processed anything, and the expectations above
		// would never be checked.
		let provider_task = tokio::spawn(async move {
			provider.run(Arc::new(mock_network)).await;
		});

		handle.disconnect_peer(peer, proto_clone);
		handle.report_peer(peer, change);

		// `run` drops the provider's own sender, so this is the last one. Dropping
		// it lets `run` drain the two queued commands and return.
		drop(handle);

		// A panic inside the task (unexpected call, or an unmet expectation when
		// the mock is dropped at the end of `run`) surfaces here as a join error.
		provider_task
			.await
			.expect("provider task processes all commands without panicking");
	}
}