// penumbra_sdk_app/server/consensus.rs
use anyhow::Result;
use cnidarium::Storage;
use tendermint::abci::Event;
use tendermint::v0_37::abci::{
request, response, ConsensusRequest as Request, ConsensusResponse as Response,
};
use tokio::sync::mpsc;
use tower::BoxError;
use tower_actor::Message;
use tracing::Instrument;
use crate::app::App;
pub struct Consensus {
queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
storage: Storage,
app: App,
}
/// Service handle returned by `Consensus::new`: a `tower_actor::Actor` that
/// accepts consensus requests and yields consensus responses.
pub type ConsensusService = tower_actor::Actor<Request, Response, BoxError>;
/// Writes every attribute of every event to the trace log at `debug` level.
///
/// Each event is logged inside its own `event` span tagged with the event
/// kind; each attribute becomes one record with lossily UTF-8-decoded `k`
/// (key) and `v` (value) fields.
fn trace_events(events: &[Event]) {
    for ev in events {
        let event_span = tracing::debug_span!("event", kind = ?ev.kind);
        // Stay inside the span for the rest of this iteration.
        let _entered = event_span.enter();
        for attribute in &ev.attributes {
            tracing::debug!(
                k = %String::from_utf8_lossy(attribute.key_bytes()),
                v = %String::from_utf8_lossy(attribute.value_bytes()),
            );
        }
    }
}
impl Consensus {
/// Bound passed to `tower_actor::Actor::new` when the worker is created
/// (presumably the capacity of the request queue; confirm against
/// `tower_actor`).
const QUEUE_SIZE: usize = 10;
/// Creates the consensus worker and returns the service handle for it.
///
/// The worker is built from `storage` plus the queue supplied by
/// `tower_actor::Actor::new`, and its `run` future is what the actor drives.
pub fn new(storage: Storage) -> ConsensusService {
    tower_actor::Actor::new(Self::QUEUE_SIZE, move |queue| {
        Self::new_inner(storage, queue).run()
    })
}
/// Assembles the worker state, creating the application instance on top of
/// the latest storage snapshot.
fn new_inner(
    storage: Storage,
    queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
) -> Self {
    Self {
        // Evaluated first: the snapshot is taken before `storage` is moved.
        app: App::new(storage.latest_snapshot()),
        queue,
        storage,
    }
}
/// Worker loop: pulls consensus requests off the queue one at a time,
/// dispatches each to the matching handler inside the request's tracing
/// span, and sends the response back to the requester.
///
/// Returns `Ok(())` once the queue yields `None`.
///
/// # Panics
///
/// Panics if `init_chain`, `prepare_proposal`, `process_proposal`,
/// `begin_block`, or `commit` returns an error. `deliver_tx` and `end_block`
/// are infallible at this level and report failures inside their responses.
async fn run(mut self) -> Result<(), tower::BoxError> {
    while let Some(Message {
        req,
        rsp_sender,
        span,
    }) = self.queue.recv().await
    {
        let rsp = match req {
            Request::InitChain(init_chain) => Response::InitChain(
                self.init_chain(init_chain)
                    .instrument(span)
                    .await
                    .expect("init_chain must succeed"),
            ),
            Request::PrepareProposal(proposal) => Response::PrepareProposal(
                self.prepare_proposal(proposal)
                    .instrument(span)
                    .await
                    .expect("prepare proposal must succeed"),
            ),
            Request::ProcessProposal(proposal) => Response::ProcessProposal(
                self.process_proposal(proposal)
                    .instrument(span)
                    .await
                    .expect("process proposal must succeed"),
            ),
            Request::BeginBlock(begin_block) => Response::BeginBlock(
                self.begin_block(begin_block)
                    .instrument(span)
                    .await
                    .expect("begin_block must succeed"),
            ),
            // The span is not used after this point, so it is moved like in
            // every other arm instead of being cloned.
            Request::DeliverTx(deliver_tx) => {
                Response::DeliverTx(self.deliver_tx(deliver_tx).instrument(span).await)
            }
            Request::EndBlock(end_block) => {
                Response::EndBlock(self.end_block(end_block).instrument(span).await)
            }
            Request::Commit => Response::Commit(
                self.commit()
                    .instrument(span)
                    .await
                    .expect("commit must succeed"),
            ),
        };

        // Best effort: a failed send is deliberately ignored (presumably it
        // only fails when the requester is no longer waiting for the reply).
        let _ = rsp_sender.send(Ok(rsp));
    }
    Ok(())
}
async fn init_chain(&mut self, init_chain: request::InitChain) -> Result<response::InitChain> {
let app_state: crate::genesis::AppState =
serde_json::from_slice(&init_chain.app_state_bytes)
.expect("can parse app_state in genesis file");
self.app.init_chain(&app_state).await;
let validators = self.app.cometbft_validator_updates();
let app_hash = match &app_state {
crate::genesis::AppState::Checkpoint(h) => {
tracing::info!(?h, "genesis state is a checkpoint");
self.storage.latest_snapshot().root_hash().await?
}
crate::genesis::AppState::Content(_) => {
tracing::info!("genesis state is a full configuration");
if self.storage.latest_version() != u64::MAX {
anyhow::bail!("database already initialized");
}
self.app.commit(self.storage.clone()).await
}
};
tracing::info!(
consensus_params = ?init_chain.consensus_params,
?validators,
app_hash = ?app_hash,
"finished init_chain"
);
Ok(response::InitChain {
consensus_params: Some(init_chain.consensus_params),
validators,
app_hash: app_hash.0.to_vec().try_into()?,
})
}
/// Handles `PrepareProposal` using a throwaway `App` built on the latest
/// storage snapshot, so `self.app` is not involved in building the proposal.
async fn prepare_proposal(
    &mut self,
    proposal: request::PrepareProposal,
) -> Result<response::PrepareProposal> {
    tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal");
    let snapshot = self.storage.latest_snapshot();
    let mut scratch_app = App::new(snapshot);
    let rsp = scratch_app.prepare_proposal(proposal).await;
    Ok(rsp)
}
/// Handles `ProcessProposal` using a throwaway `App` built on the latest
/// storage snapshot, so `self.app` is not involved in checking the proposal.
async fn process_proposal(
    &mut self,
    proposal: request::ProcessProposal,
) -> Result<response::ProcessProposal> {
    tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, proposal_hash = %proposal.hash, "processing proposal");
    let snapshot = self.storage.latest_snapshot();
    let mut scratch_app = App::new(snapshot);
    let rsp = scratch_app.process_proposal(proposal).await;
    Ok(rsp)
}
/// Handles `BeginBlock` by forwarding it to the application and returning
/// the events it produced.
async fn begin_block(
    &mut self,
    begin_block: request::BeginBlock,
) -> Result<response::BeginBlock> {
    tracing::info!(time = ?begin_block.header.time, "beginning block");
    Ok(response::BeginBlock {
        events: self.app.begin_block(&begin_block).await,
    })
}
/// Handles `DeliverTx` by executing the raw transaction bytes.
///
/// On success the response carries the emitted events (which are also
/// traced) with all other fields left at their defaults. On failure the
/// response has code `1` and the alternate-formatted error as its log.
async fn deliver_tx(&mut self, deliver_tx: request::DeliverTx) -> response::DeliverTx {
    match self.app.deliver_tx_bytes(deliver_tx.tx.as_ref()).await {
        Err(e) => {
            tracing::info!(?e, "deliver_tx failed");
            response::DeliverTx {
                code: 1.into(),
                log: format!("{e:#}"),
                ..Default::default()
            }
        }
        Ok(events) => {
            trace_events(&events);
            response::DeliverTx {
                events,
                ..Default::default()
            }
        }
    }
}
/// Handles `EndBlock`: forwards it to the application, traces the resulting
/// events, and returns them together with the current validator updates.
///
/// Before running the application hook, the latest storage version is
/// compared against the block height and a warning is logged when the
/// version is greater than or equal to the height. Consensus parameter
/// updates are never sent (`None`).
async fn end_block(&mut self, end_block: request::EndBlock) -> response::EndBlock {
    let latest_state_version = self.storage.latest_version();
    tracing::info!(height = ?end_block.height, ?latest_state_version, "ending block");

    // Checked conversion instead of a wrapping `as` cast: a height that does
    // not fit in `u64` (i.e. a negative one) is not silently reinterpreted
    // as a huge block number before being compared.
    let state_is_ahead = matches!(
        u64::try_from(end_block.height),
        Ok(height) if latest_state_version >= height
    );
    if state_is_ahead {
        tracing::warn!(
            %latest_state_version,
            %end_block.height,
            "chain state version is ahead of the block height, this is an unexpected corruption of chain state"
        );
    }

    let events = self.app.end_block(&end_block).await;
    trace_events(&events);

    let validator_updates = self.app.cometbft_validator_updates();
    tracing::debug!(
        ?validator_updates,
        "sending validator updates to tendermint"
    );

    response::EndBlock {
        validator_updates,
        consensus_param_updates: None,
        events,
    }
}
/// Handles `Commit`: commits the application state to storage and returns
/// the resulting app hash as the response data, with a retain height of 0.
async fn commit(&mut self) -> Result<response::Commit> {
    let root_hash = self.app.commit(self.storage.clone()).await;
    tracing::info!(app_hash = ?root_hash, "committed block");

    let data = root_hash.0.to_vec().into();
    Ok(response::Commit {
        data,
        retain_height: 0u32.into(),
    })
}
}