Struct sc_network_sync::ChainSync
source · pub struct ChainSync<B: BlockT, Client> { /* private fields */ }
Expand description
The main data structure which contains all the state for a chain's active syncing strategy.
Implementations§
source§impl<B, Client> ChainSync<B, Client>where
Self: ChainSyncT<B>,
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + HeaderMetadata<B, Error = Error> + ProofProvider<B> + Send + Sync + 'static,
impl<B, Client> ChainSync<B, Client>where
Self: ChainSyncT<B>,
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + HeaderMetadata<B, Error = Error> + ProofProvider<B> + Send + Sync + 'static,
sourcepub fn new(
mode: SyncMode,
client: Arc<Client>,
protocol_id: ProtocolId,
fork_id: &Option<String>,
roles: Roles,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
metrics_registry: Option<&Registry>,
network_service: NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_request_protocol_name: ProtocolName,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>
) -> Result<(Self, ChainSyncInterfaceHandle<B>, NonDefaultSetConfig), ClientError>
pub fn new(
mode: SyncMode,
client: Arc<Client>,
protocol_id: ProtocolId,
fork_id: &Option<String>,
roles: Roles,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
metrics_registry: Option<&Registry>,
network_service: NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_request_protocol_name: ProtocolName,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>
) -> Result<(Self, ChainSyncInterfaceHandle<B>, NonDefaultSetConfig), ClientError>
Create a new instance.
sourcepub fn get_block_announce_proto_config(
protocol_id: ProtocolId,
fork_id: &Option<String>,
roles: Roles,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash
) -> NonDefaultSetConfig
pub fn get_block_announce_proto_config(
protocol_id: ProtocolId,
fork_id: &Option<String>,
roles: Roles,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash
) -> NonDefaultSetConfig
Get config for the block announcement protocol
Examples found in repository?
src/lib.rs (lines 1440-1451)
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
/// Create a new instance.
///
/// Builds the block-announce protocol configuration from the client's current
/// best block and the genesis hash, assembles the initial sync state, and then
/// calls `reset_sync_start_point` on it.
///
/// On success returns the sync state machine, a `ChainSyncInterfaceHandle`
/// wrapping the sender half of the `"mpsc_chain_sync"` channel, and the
/// block-announce protocol configuration.
///
/// If `metrics_registry` is provided but registering the metrics fails, the
/// failure is logged and the instance is created without metrics.
///
/// # Errors
///
/// Returns the error produced by `reset_sync_start_point`.
///
/// # Panics
///
/// Panics if the client cannot provide the hash of block zero (genesis).
pub fn new(
    mode: SyncMode,
    client: Arc<Client>,
    protocol_id: ProtocolId,
    fork_id: &Option<String>,
    roles: Roles,
    block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
    max_parallel_downloads: u32,
    warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
    metrics_registry: Option<&Registry>,
    network_service: service::network::NetworkServiceHandle,
    import_queue: Box<dyn ImportQueueService<B>>,
    block_request_protocol_name: ProtocolName,
    state_request_protocol_name: ProtocolName,
    warp_sync_protocol_name: Option<ProtocolName>,
) -> Result<(Self, ChainSyncInterfaceHandle<B>, NonDefaultSetConfig), ClientError> {
    let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync");

    // Take a single snapshot of the chain info so that the best number and the
    // best hash are guaranteed to come from the same `info()` call rather than
    // from two calls that could observe different chain heads.
    let chain_info = client.info();
    let genesis_hash = client
        .block_hash(Zero::zero())
        .ok()
        .flatten()
        .expect("Genesis block exists; qed");

    let block_announce_config = Self::get_block_announce_proto_config(
        protocol_id,
        fork_id,
        roles,
        chain_info.best_number,
        chain_info.best_hash,
        genesis_hash,
    );

    let mut sync = Self {
        client,
        peers: HashMap::new(),
        blocks: BlockCollection::new(),
        best_queued_hash: Default::default(),
        best_queued_number: Zero::zero(),
        extra_justifications: ExtraRequests::new("justification"),
        mode,
        queue_blocks: Default::default(),
        fork_targets: Default::default(),
        allowed_requests: Default::default(),
        block_announce_validator,
        max_parallel_downloads,
        downloaded_blocks: 0,
        block_announce_validation: Default::default(),
        block_announce_validation_per_peer_stats: Default::default(),
        state_sync: None,
        warp_sync: None,
        warp_sync_provider,
        import_existing: false,
        gap_sync: None,
        service_rx,
        network_service,
        block_request_protocol_name,
        state_request_protocol_name,
        warp_sync_protocol_name,
        block_announce_protocol_name: block_announce_config
            .notifications_protocol
            .clone()
            .into(),
        pending_responses: Default::default(),
        import_queue,
        // Metrics are optional: a registration failure is logged, not fatal.
        metrics: if let Some(r) = &metrics_registry {
            match SyncingMetrics::register(r) {
                Ok(metrics) => Some(metrics),
                Err(err) => {
                    error!(target: "sync", "Failed to register metrics for ChainSync: {err:?}");
                    None
                },
            }
        } else {
            None
        },
    };

    sync.reset_sync_start_point()?;
    Ok((sync, ChainSyncInterfaceHandle::new(tx), block_announce_config))
}
sourcepub fn on_state_response(
&mut self,
peer_id: PeerId,
response: OpaqueStateResponse
) -> Option<ImportResult<B>>
pub fn on_state_response(
&mut self,
peer_id: PeerId,
response: OpaqueStateResponse
) -> Option<ImportResult<B>>
Examples found in repository?
src/lib.rs (line 2245)
2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306
/// Drain every response that is currently ready in `self.pending_responses`.
///
/// Returns `Poll::Ready` as soon as handling a block or state response yields
/// an import result. Returns `Poll::Pending` once no further response is ready.
///
/// Responses that fail to decode get the peer reported with `rep::BAD_MESSAGE`
/// and disconnected from the block-announce protocol. Failed requests are
/// handled according to the kind of failure (see the classification below).
fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll<ImportResult<B>> {
    loop {
        let (peer, request, outcome) = match self.pending_responses.poll_next_unpin(cx) {
            Poll::Ready(Some(entry)) => entry,
            Poll::Ready(None) | Poll::Pending => break,
        };

        // Deal with every failure mode up front; only a successfully received
        // payload falls through to the per-request handling further down.
        let payload = match outcome {
            Ok(Ok(payload)) => payload,
            Ok(Err(failure)) => {
                debug!(target: "sync", "Request to peer {:?} failed: {:?}.", peer, failure);

                // Classify the failure: which reputation change to report (if
                // any) and whether the peer has to be disconnected.
                let (penalty, disconnect) = match failure {
                    RequestFailure::Network(OutboundFailure::Timeout) =>
                        (Some(rep::TIMEOUT), true),
                    RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
                        (Some(rep::BAD_PROTOCOL), true),
                    RequestFailure::Refused => (Some(rep::REFUSED), true),
                    RequestFailure::Network(OutboundFailure::DialFailure) |
                    RequestFailure::Network(OutboundFailure::ConnectionClosed) |
                    RequestFailure::NotConnected => (None, true),
                    RequestFailure::UnknownProtocol => {
                        debug_assert!(false, "Block request protocol should always be known.");
                        (None, false)
                    },
                    RequestFailure::Obsolete => {
                        debug_assert!(
                            false,
                            "Can not receive `RequestFailure::Obsolete` after dropping the \
                             response receiver.",
                        );
                        (None, false)
                    },
                };

                if let Some(change) = penalty {
                    self.network_service.report_peer(peer, change);
                }
                if disconnect {
                    self.network_service
                        .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                }
                continue
            },
            Err(oneshot::Canceled) => {
                trace!(
                    target: "sync",
                    "Request to peer {:?} failed due to oneshot being canceled.",
                    peer,
                );
                self.network_service
                    .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                continue
            },
        };

        match request {
            PeerRequest::Block(req) => match Self::decode_block_response(&payload[..]) {
                Ok(decoded) =>
                    if let Some(import) = self.on_block_response(peer, req, decoded) {
                        return Poll::Ready(import)
                    },
                Err(err) => {
                    debug!(
                        target: "sync",
                        "Failed to decode block response from peer {:?}: {:?}.",
                        peer,
                        err
                    );
                    self.network_service.report_peer(peer, rep::BAD_MESSAGE);
                    self.network_service
                        .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                },
            },
            PeerRequest::State => match Self::decode_state_response(&payload[..]) {
                Ok(decoded) =>
                    if let Some(import) = self.on_state_response(peer, decoded) {
                        return Poll::Ready(import)
                    },
                Err(err) => {
                    debug!(
                        target: "sync",
                        "Failed to decode state response from peer {:?}: {:?}.",
                        peer,
                        err
                    );
                    self.network_service.report_peer(peer, rep::BAD_MESSAGE);
                    self.network_service
                        .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                },
            },
            PeerRequest::WarpProof => {
                self.on_warp_sync_response(peer, EncodedProof(payload));
            },
        }
    }

    Poll::Pending
}
sourcepub fn on_warp_sync_response(&mut self, peer_id: PeerId, response: EncodedProof)
pub fn on_warp_sync_response(&mut self, peer_id: PeerId, response: EncodedProof)
Examples found in repository?
src/lib.rs (line 2250)
2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306
/// Drain every response that is currently ready in `self.pending_responses`.
///
/// Returns `Poll::Ready` as soon as handling a block or state response yields
/// an import result. Returns `Poll::Pending` once no further response is ready.
///
/// Responses that fail to decode get the peer reported with `rep::BAD_MESSAGE`
/// and disconnected from the block-announce protocol. Failed requests are
/// handled according to the kind of failure (see the classification below).
fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll<ImportResult<B>> {
    loop {
        let (peer, request, outcome) = match self.pending_responses.poll_next_unpin(cx) {
            Poll::Ready(Some(entry)) => entry,
            Poll::Ready(None) | Poll::Pending => break,
        };

        // Deal with every failure mode up front; only a successfully received
        // payload falls through to the per-request handling further down.
        let payload = match outcome {
            Ok(Ok(payload)) => payload,
            Ok(Err(failure)) => {
                debug!(target: "sync", "Request to peer {:?} failed: {:?}.", peer, failure);

                // Classify the failure: which reputation change to report (if
                // any) and whether the peer has to be disconnected.
                let (penalty, disconnect) = match failure {
                    RequestFailure::Network(OutboundFailure::Timeout) =>
                        (Some(rep::TIMEOUT), true),
                    RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
                        (Some(rep::BAD_PROTOCOL), true),
                    RequestFailure::Refused => (Some(rep::REFUSED), true),
                    RequestFailure::Network(OutboundFailure::DialFailure) |
                    RequestFailure::Network(OutboundFailure::ConnectionClosed) |
                    RequestFailure::NotConnected => (None, true),
                    RequestFailure::UnknownProtocol => {
                        debug_assert!(false, "Block request protocol should always be known.");
                        (None, false)
                    },
                    RequestFailure::Obsolete => {
                        debug_assert!(
                            false,
                            "Can not receive `RequestFailure::Obsolete` after dropping the \
                             response receiver.",
                        );
                        (None, false)
                    },
                };

                if let Some(change) = penalty {
                    self.network_service.report_peer(peer, change);
                }
                if disconnect {
                    self.network_service
                        .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                }
                continue
            },
            Err(oneshot::Canceled) => {
                trace!(
                    target: "sync",
                    "Request to peer {:?} failed due to oneshot being canceled.",
                    peer,
                );
                self.network_service
                    .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                continue
            },
        };

        match request {
            PeerRequest::Block(req) => match Self::decode_block_response(&payload[..]) {
                Ok(decoded) =>
                    if let Some(import) = self.on_block_response(peer, req, decoded) {
                        return Poll::Ready(import)
                    },
                Err(err) => {
                    debug!(
                        target: "sync",
                        "Failed to decode block response from peer {:?}: {:?}.",
                        peer,
                        err
                    );
                    self.network_service.report_peer(peer, rep::BAD_MESSAGE);
                    self.network_service
                        .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                },
            },
            PeerRequest::State => match Self::decode_state_response(&payload[..]) {
                Ok(decoded) =>
                    if let Some(import) = self.on_state_response(peer, decoded) {
                        return Poll::Ready(import)
                    },
                Err(err) => {
                    debug!(
                        target: "sync",
                        "Failed to decode state response from peer {:?}: {:?}.",
                        peer,
                        err
                    );
                    self.network_service.report_peer(peer, rep::BAD_MESSAGE);
                    self.network_service
                        .disconnect_peer(peer, self.block_announce_protocol_name.clone());
                },
            },
            PeerRequest::WarpProof => {
                self.on_warp_sync_response(peer, EncodedProof(payload));
            },
        }
    }

    Poll::Pending
}
Trait Implementations§
source§impl<B, Client> ChainSync<B> for ChainSync<B, Client>where
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + HeaderMetadata<B, Error = Error> + ProofProvider<B> + Send + Sync + 'static,
impl<B, Client> ChainSync<B> for ChainSync<B, Client>where
B: BlockT,
Client: HeaderBackend<B> + BlockBackend<B> + HeaderMetadata<B, Error = Error> + ProofProvider<B> + Send + Sync + 'static,
source§fn status(&self) -> SyncStatus<B>
fn status(&self) -> SyncStatus<B>
Returns the current sync status.
source§fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>>
fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>>
Returns the state of the sync of the given peer. Read more
source§fn num_sync_requests(&self) -> usize
fn num_sync_requests(&self) -> usize
Number of active fork requests. This includes
requests that are pending or could be issued right away.
source§fn num_downloaded_blocks(&self) -> usize
fn num_downloaded_blocks(&self) -> usize
Number of downloaded blocks.
source§fn num_peers(&self) -> usize
fn num_peers(&self) -> usize
Returns the current number of peers stored within this state machine.
source§fn num_active_peers(&self) -> usize
fn num_active_peers(&self) -> usize
Returns the number of peers we’re connected to and that are being queried.
source§fn new_peer(
&mut self,
who: PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>
) -> Result<Option<BlockRequest<B>>, BadPeer>
fn new_peer(
&mut self,
who: PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>
) -> Result<Option<BlockRequest<B>>, BadPeer>
Handle a new connected peer. Read more
source§fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>)
fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>)
Signal that a new best block has been imported.
source§fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>)
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>)
Schedule a justification request for the given block.
source§fn clear_justification_requests(&mut self)
fn clear_justification_requests(&mut self)
Clear all pending justification requests.
source§fn set_sync_fork_request(
&mut self,
peers: Vec<PeerId>,
hash: &B::Hash,
number: NumberFor<B>
)
fn set_sync_fork_request(
&mut self,
peers: Vec<PeerId>,
hash: &B::Hash,
number: NumberFor<B>
)
Request syncing for the given block from given set of peers.
source§fn on_block_data(
&mut self,
who: &PeerId,
request: Option<BlockRequest<B>>,
response: BlockResponse<B>
) -> Result<OnBlockData<B>, BadPeer>
fn on_block_data(
&mut self,
who: &PeerId,
request: Option<BlockRequest<B>>,
response: BlockResponse<B>
) -> Result<OnBlockData<B>, BadPeer>
Handle a response from the remote to a block request that we made. Read more
source§fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<B>, BadPeer>
)
fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<B>, BadPeer>
)
Process received block data.
source§fn on_block_justification(
&mut self,
who: PeerId,
response: BlockResponse<B>
) -> Result<OnBlockJustification<B>, BadPeer>
fn on_block_justification(
&mut self,
who: PeerId,
response: BlockResponse<B>
) -> Result<OnBlockJustification<B>, BadPeer>
Handle a response from the remote to a justification request that we made. Read more
source§fn on_justification_import(
&mut self,
hash: B::Hash,
number: NumberFor<B>,
success: bool
)
fn on_justification_import(
&mut self,
hash: B::Hash,
number: NumberFor<B>,
success: bool
)
Call this when a justification has been processed by the import queue,
with or without errors.
source§fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>)
fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>)
Notify about finalization of the given block.
source§fn push_block_announce_validation(
&mut self,
who: PeerId,
hash: B::Hash,
announce: BlockAnnounce<B::Header>,
is_best: bool
)
fn push_block_announce_validation(
&mut self,
who: PeerId,
hash: B::Hash,
announce: BlockAnnounce<B::Header>,
is_best: bool
)
Push a block announce validation. Read more
source§fn poll_block_announce_validation(
&mut self,
cx: &mut Context<'_>
) -> Poll<PollBlockAnnounceValidation<B::Header>>
fn poll_block_announce_validation(
&mut self,
cx: &mut Context<'_>
) -> Poll<PollBlockAnnounceValidation<B::Header>>
Poll block announce validation. Read more
source§fn peer_disconnected(&mut self, who: &PeerId)
fn peer_disconnected(&mut self, who: &PeerId)
Call when a peer has disconnected.
A canceled obsolete block request may result in some blocks being ready for
import, so this function checks for such blocks and returns them.
source§fn block_response_into_blocks(
&self,
request: &BlockRequest<B>,
response: OpaqueBlockResponse
) -> Result<Vec<BlockData<B>>, String>
fn block_response_into_blocks(
&self,
request: &BlockRequest<B>,
response: OpaqueBlockResponse
) -> Result<Vec<BlockData<B>>, String>
Access blocks from implementation-specific block response.
source§fn poll(
&mut self,
cx: &mut Context<'_>
) -> Poll<PollBlockAnnounceValidation<B::Header>>
fn poll(
&mut self,
cx: &mut Context<'_>
) -> Poll<PollBlockAnnounceValidation<B::Header>>
Advance the state of
ChainSync
Read moresource§fn send_block_request(&mut self, who: PeerId, request: BlockRequest<B>)
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<B>)
Send block request to peer
Auto Trait Implementations§
impl<B, Client> !RefUnwindSafe for ChainSync<B, Client>
impl<B, Client> Send for ChainSync<B, Client>where
Client: Send + Sync,
impl<B, Client> !Sync for ChainSync<B, Client>
impl<B, Client> Unpin for ChainSync<B, Client>where
<B as Block>::Extrinsic: Unpin,
<B as Block>::Hash: Unpin,
<B as Block>::Header: Unpin,
<<B as Block>::Header as Header>::Number: Unpin,
impl<B, Client> !UnwindSafe for ChainSync<B, Client>
Blanket Implementations§
source§impl<T> CheckedConversion for T
impl<T> CheckedConversion for T
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T, Global>) -> Box<dyn Any + 'static, Global>
fn into_any(self: Box<T, Global>) -> Box<dyn Any + 'static, Global>
Convert
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
. Box<dyn Any>
can
then be further downcast
into Box<ConcreteType>
where ConcreteType
implements Trait
.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any + 'static>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any + 'static>
Convert
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
. Rc<Any>
can then be
further downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
Convert
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert
&mut Trait
(where Trait: Downcast
) to &mut Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T, Outer> IsWrappedBy<Outer> for Twhere
Outer: AsRef<T> + AsMut<T> + From<T>,
T: From<Outer>,
impl<T, Outer> IsWrappedBy<Outer> for Twhere
Outer: AsRef<T> + AsMut<T> + From<T>,
T: From<Outer>,
§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<T> SaturatedConversion for T
impl<T> SaturatedConversion for T
source§fn saturated_from<T>(t: T) -> Selfwhere
Self: UniqueSaturatedFrom<T>,
fn saturated_from<T>(t: T) -> Selfwhere
Self: UniqueSaturatedFrom<T>,
source§fn saturated_into<T>(self) -> Twhere
Self: UniqueSaturatedInto<T>,
fn saturated_into<T>(self) -> Twhere
Self: UniqueSaturatedInto<T>,
Consume self to return an equivalent value of
T
. Read moresource§impl<S, T> UncheckedInto<T> for Swhere
T: UncheckedFrom<S>,
impl<S, T> UncheckedInto<T> for Swhere
T: UncheckedFrom<S>,
source§fn unchecked_into(self) -> T
fn unchecked_into(self) -> T
The counterpart to
unchecked_from
.source§impl<T, S> UniqueSaturatedInto<T> for Swhere
T: Bounded,
S: TryInto<T>,
impl<T, S> UniqueSaturatedInto<T> for Swhere
T: Bounded,
S: TryInto<T>,
source§fn unique_saturated_into(self) -> T
fn unique_saturated_into(self) -> T
Consume self to return an equivalent value of
T
.