这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub struct QuorumStoreConfig {
pub enable_opt_quorum_store: bool,
pub opt_qs_minimum_batch_age_usecs: u64,
pub enable_payload_v2: bool,
/// Boolean flag that controls the usage of `BatchInfoExt::V2`
pub enable_proof_v2: bool,
}

impl Default for QuorumStoreConfig {
Expand Down Expand Up @@ -140,6 +142,7 @@ impl Default for QuorumStoreConfig {
enable_opt_quorum_store: true,
opt_qs_minimum_batch_age_usecs: Duration::from_millis(50).as_micros() as u64,
enable_payload_v2: false,
enable_proof_v2: false,
}
}
}
Expand Down
37 changes: 33 additions & 4 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use aptos_consensus_types::{
order_vote_msg::OrderVoteMsg,
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{
BatchInfo, ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg,
BatchInfo, BatchInfoExt, ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg,
},
proposal_msg::ProposalMsg,
round_timeout::RoundTimeoutMsg,
Expand Down Expand Up @@ -208,13 +208,24 @@ pub trait QuorumStoreSender: Send + Clone {
recipients: Vec<Author>,
);

async fn send_signed_batch_info_msg_v2(
&self,
signed_batch_infos: Vec<SignedBatchInfo<BatchInfoExt>>,
recipients: Vec<Author>,
);

async fn broadcast_batch_msg(&mut self, batches: Vec<Batch>);

async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec<ProofOfStore<BatchInfo>>);

async fn broadcast_proof_of_store_msg_v2(
&mut self,
proof_of_stores: Vec<ProofOfStore<BatchInfoExt>>,
);

async fn send_proof_of_store_msg_to_self(
&mut self,
proof_of_stores: Vec<ProofOfStore<BatchInfo>>,
proof_of_stores: Vec<ProofOfStore<BatchInfoExt>>,
);
}

Expand Down Expand Up @@ -570,6 +581,18 @@ impl QuorumStoreSender for NetworkSender {
self.send(msg, recipients).await
}

async fn send_signed_batch_info_msg_v2(
&self,
signed_batch_infos: Vec<SignedBatchInfo<BatchInfoExt>>,
recipients: Vec<Author>,
) {
fail_point!("consensus::send::signed_batch_info", |_| ());
let msg = ConsensusMsg::SignedBatchInfoMsgV2(Box::new(SignedBatchInfoMsg::new(
signed_batch_infos,
)));
self.send(msg, recipients).await
}

async fn broadcast_batch_msg(&mut self, batches: Vec<Batch>) {
fail_point!("consensus::send::broadcast_batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(BatchMsg::new(batches)));
Expand All @@ -582,9 +605,15 @@ impl QuorumStoreSender for NetworkSender {
self.broadcast(msg).await
}

async fn send_proof_of_store_msg_to_self(&mut self, proofs: Vec<ProofOfStore<BatchInfo>>) {
async fn broadcast_proof_of_store_msg_v2(&mut self, proofs: Vec<ProofOfStore<BatchInfoExt>>) {
fail_point!("consensus::send::proof_of_store", |_| ());
let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(ProofOfStoreMsg::new(proofs)));
let msg = ConsensusMsg::ProofOfStoreMsgV2(Box::new(ProofOfStoreMsg::new(proofs)));
self.broadcast(msg).await
}

async fn send_proof_of_store_msg_to_self(&mut self, proofs: Vec<ProofOfStore<BatchInfoExt>>) {
fail_point!("consensus::send::proof_of_store", |_| ());
let msg = ConsensusMsg::ProofOfStoreMsgV2(Box::new(ProofOfStoreMsg::new(proofs)));
self.send(msg, vec![self.author]).await
}
}
Expand Down
9 changes: 8 additions & 1 deletion consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use aptos_consensus_types::{
opt_proposal_msg::OptProposalMsg,
order_vote_msg::OrderVoteMsg,
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{BatchInfo, ProofOfStoreMsg, SignedBatchInfoMsg},
proof_of_store::{BatchInfo, BatchInfoExt, ProofOfStoreMsg, SignedBatchInfoMsg},
proposal_msg::ProposalMsg,
round_timeout::RoundTimeoutMsg,
sync_info::SyncInfo,
Expand Down Expand Up @@ -91,6 +91,11 @@ pub enum ConsensusMsg {
BlockRetrievalRequest(Box<BlockRetrievalRequest>),
/// OptProposalMsg contains the optimistic proposal and sync info.
OptProposalMsg(Box<OptProposalMsg>),
/// Quorum Store: Send a signed batch digest with BatchInfoExt. This is a vote for the batch and a promise that
/// the batch of transactions was received and will be persisted until batch expiration.
SignedBatchInfoMsgV2(Box<SignedBatchInfoMsg<BatchInfoExt>>),
/// Quorum Store: Broadcast a certified proof of store (a digest that received 2f+1 votes) with BatchInfoExt.
ProofOfStoreMsgV2(Box<ProofOfStoreMsg<BatchInfoExt>>),
}

/// Network type for consensus
Expand Down Expand Up @@ -121,6 +126,8 @@ impl ConsensusMsg {
ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2",
ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2",
ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest",
ConsensusMsg::SignedBatchInfoMsgV2(_) => "SignedBatchInfoMsgV2",
ConsensusMsg::ProofOfStoreMsgV2(_) => "ProofOfStoreMsgV2",
}
}
}
Expand Down
31 changes: 24 additions & 7 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct BatchCoordinator {
max_total_bytes: u64,
batch_expiry_gap_when_init_usecs: u64,
transaction_filter_config: BatchTransactionFilterConfig,
enable_proof_v2: bool,
}

impl BatchCoordinator {
Expand All @@ -59,6 +60,7 @@ impl BatchCoordinator {
max_total_bytes: u64,
batch_expiry_gap_when_init_usecs: u64,
transaction_filter_config: BatchTransactionFilterConfig,
enable_proof_v2: bool,
) -> Self {
Self {
my_peer_id,
Expand All @@ -72,6 +74,7 @@ impl BatchCoordinator {
max_total_bytes,
batch_expiry_gap_when_init_usecs,
transaction_filter_config,
enable_proof_v2,
}
}

Expand All @@ -87,6 +90,7 @@ impl BatchCoordinator {
let batch_store = self.batch_store.clone();
let network_sender = self.network_sender.clone();
let sender_to_proof_manager = self.sender_to_proof_manager.clone();
let enable_proof_v2 = self.enable_proof_v2;
tokio::spawn(async move {
let peer_id = persist_requests[0].author();
let batches = persist_requests
Expand All @@ -98,14 +102,27 @@ impl BatchCoordinator {
)
})
.collect();
let signed_batch_infos = batch_store.persist(persist_requests);
if !signed_batch_infos.is_empty() {
if approx_created_ts_usecs > 0 {
observe_batch(approx_created_ts_usecs, peer_id, BatchStage::SIGNED);

if enable_proof_v2 {
let signed_batch_infos = batch_store.persist_v2(persist_requests);
if !signed_batch_infos.is_empty() {
if approx_created_ts_usecs > 0 {
observe_batch(approx_created_ts_usecs, peer_id, BatchStage::SIGNED);
}
network_sender
.send_signed_batch_info_msg_v2(signed_batch_infos, vec![peer_id])
.await;
}
} else {
let signed_batch_infos = batch_store.persist(persist_requests);
if !signed_batch_infos.is_empty() {
if approx_created_ts_usecs > 0 {
observe_batch(approx_created_ts_usecs, peer_id, BatchStage::SIGNED);
}
network_sender
.send_signed_batch_info_msg(signed_batch_infos, vec![peer_id])
.await;
}
network_sender
.send_signed_batch_info_msg(signed_batch_infos, vec![peer_id])
.await;
}
let _ = sender_to_proof_manager
.send(ProofManagerCommand::ReceiveBatches(batches))
Expand Down
59 changes: 51 additions & 8 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::{
},
};
use anyhow::bail;
use aptos_consensus_types::proof_of_store::{BatchInfo, SignedBatchInfo};
use aptos_consensus_types::proof_of_store::{
BatchInfo, BatchInfoExt, BatchKind, ExtraBatchInfo, SignedBatchInfo, TBatchInfo,
};
use aptos_crypto::{CryptoMaterialError, HashValue};
use aptos_executor_types::{ExecutorError, ExecutorResult};
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -378,10 +380,10 @@ impl BatchStore {
ret
}

fn generate_signed_batch_info(
fn generate_signed_batch_info<T: TBatchInfo>(
&self,
batch_info: BatchInfo,
) -> Result<SignedBatchInfo<BatchInfo>, CryptoMaterialError> {
batch_info: T,
) -> Result<SignedBatchInfo<T>, CryptoMaterialError> {
fail_point!("quorum_store::create_invalid_signed_batch_info", |_| {
Ok(SignedBatchInfo::new_with_signature(
batch_info.clone(),
Expand All @@ -392,10 +394,17 @@ impl BatchStore {
SignedBatchInfo::new(batch_info, &self.validator_signer)
}

fn persist_inner(&self, persist_request: PersistedValue) -> Option<SignedBatchInfo<BatchInfo>> {
fn persist_inner<T: TBatchInfo>(
&self,
batch_info: T,
persist_request: PersistedValue,
) -> Option<SignedBatchInfo<T>> {
assert!(
batch_info.as_batch_info() == persist_request.batch_info(),
"Provided batch info doesn't match persist request batch info"
);
match self.save(&persist_request) {
Ok(needs_db) => {
let batch_info = persist_request.batch_info().clone();
trace!("QS: sign digest {}", persist_request.digest());
if needs_db {
#[allow(clippy::unwrap_in_result)]
Expand All @@ -405,7 +414,6 @@ impl BatchStore {
}
self.generate_signed_batch_info(batch_info).ok()
},

Err(e) => {
debug!("QS: failed to store to cache {:?}", e);
None
Expand Down Expand Up @@ -486,7 +494,37 @@ impl BatchWriter for BatchStore {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
if let Some(signed_info) = self.persist_inner(persist_request.clone()) {
let batch_info = persist_request.batch_info().clone();
if let Some(signed_info) = self.persist_inner(batch_info, persist_request.clone()) {
self.notify_subscribers(persist_request);
signed_infos.push(signed_info);
}
}
signed_infos
}

fn persist_v2(
&self,
persist_requests: Vec<PersistedValue>,
) -> Vec<SignedBatchInfo<BatchInfoExt>> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
let is_encrypted_batch = persist_request
.payload()
.as_ref()
.expect("Payload must be available for persistence")
.iter()
.any(|txn| txn.is_encrypted_txn());
let batch_kind = if is_encrypted_batch {
BatchKind::Encrypted
} else {
BatchKind::Normal
};
let batch_info = BatchInfoExt::V2 {
info: persist_request.batch_info().clone(),
extra: ExtraBatchInfo { batch_kind },
};
if let Some(signed_info) = self.persist_inner(batch_info, persist_request.clone()) {
self.notify_subscribers(persist_request);
signed_infos.push(signed_info);
}
Expand Down Expand Up @@ -611,4 +649,9 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch

pub trait BatchWriter: Send + Sync {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>>;

fn persist_v2(
&self,
persist_requests: Vec<PersistedValue>,
) -> Vec<SignedBatchInfo<BatchInfoExt>>;
}
54 changes: 34 additions & 20 deletions consensus/src/quorum_store/proof_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pub(crate) struct ProofCoordinator {
proof_cache: ProofCache,
broadcast_proofs: bool,
batch_expiry_gap_when_init_usecs: u64,
enable_proof_v2_msg: bool,
}

//PoQS builder object - gather signed digest to form PoQS
Expand All @@ -180,6 +181,7 @@ impl ProofCoordinator {
proof_cache: ProofCache,
broadcast_proofs: bool,
batch_expiry_gap_when_init_usecs: u64,
enable_proof_v2_msg: bool,
) -> Self {
Self {
peer_id,
Expand All @@ -192,6 +194,7 @@ impl ProofCoordinator {
proof_cache,
broadcast_proofs,
batch_expiry_gap_when_init_usecs,
enable_proof_v2_msg,
}
}

Expand Down Expand Up @@ -371,45 +374,56 @@ impl ProofCoordinator {
let approx_created_ts_usecs = signed_batch_info
.expiration()
.saturating_sub(self.batch_expiry_gap_when_init_usecs);
let self_peer_id = self.peer_id;
let enable_broadcast_proofs = self.broadcast_proofs;
let enable_proof_v2_msg = self.enable_proof_v2_msg;

let mut proofs = vec![];
for signed_batch_info in signed_batch_infos.into_iter() {
let mut proofs_iter = signed_batch_infos.into_iter().filter_map(|signed_batch_info| {
let peer_id = signed_batch_info.signer();
let digest = *signed_batch_info.digest();
let batch_id = signed_batch_info.batch_id();
match self.add_signature(signed_batch_info, &validator_verifier) {
Ok(result) => {
if let Some(proof) = result {
debug!(
LogSchema::new(LogEvent::ProofOfStoreReady),
digest = digest,
batch_id = batch_id.id,
);
let (info, sig) = proof.unpack();
proofs.push(ProofOfStore::new(info.info().clone(), sig));
}
Ok(Some(proof)) => {
debug!(
LogSchema::new(LogEvent::ProofOfStoreReady),
digest = digest,
batch_id = batch_id.id,
);
Some(proof)
},
Ok(None) => None,
Err(e) => {
// Can happen if we already garbage collected, the commit notification is late, or the peer is misbehaving.
if peer_id == self.peer_id {
info!("QS: could not add signature from self, digest = {}, batch_id = {}, err = {:?}", digest, batch_id, e);
} else {
debug!("QS: could not add signature from peer {}, digest = {}, batch_id = {}, err = {:?}", peer_id, digest, batch_id, e);
}
None
},
}
}
if let Some(value) = self.batch_info_to_proof.get_mut(&info) {
value.observe_voting_pct(approx_created_ts_usecs, &validator_verifier);
}
if !proofs.is_empty() {
observe_batch(approx_created_ts_usecs, self.peer_id, BatchStage::POS_FORMED);
if self.broadcast_proofs {
network_sender.broadcast_proof_of_store_msg(proofs).await;
}).peekable();
if proofs_iter.peek().is_some() {
observe_batch(approx_created_ts_usecs, self_peer_id, BatchStage::POS_FORMED);
if enable_broadcast_proofs {
if enable_proof_v2_msg {
let proofs: Vec<_> = proofs_iter.collect();
network_sender.broadcast_proof_of_store_msg_v2(proofs).await;
} else {
let proofs: Vec<_> = proofs_iter.map(|proof| {
let (info, sig) = proof.unpack();
ProofOfStore::new(info.info().clone(), sig)
}).collect();
network_sender.broadcast_proof_of_store_msg(proofs).await;
}
} else {
let proofs: Vec<_> = proofs_iter.collect();
network_sender.send_proof_of_store_msg_to_self(proofs).await;
}
}
if let Some(value) = self.batch_info_to_proof.get_mut(&info) {
value.observe_voting_pct(approx_created_ts_usecs, &validator_verifier);
}
},
}
}),
Expand Down
Loading
Loading