diff --git a/consensus/consensus-types/src/block.rs b/consensus/consensus-types/src/block.rs index c830d4e0a9478..d45bfd6b9a15c 100644 --- a/consensus/consensus-types/src/block.rs +++ b/consensus/consensus-types/src/block.rs @@ -6,6 +6,7 @@ use crate::{ block_data::{BlockData, BlockType}, common::{Author, Payload, Round}, opt_block_data::OptBlockData, + payload::{OptQuorumStorePayload, TDataInfo}, quorum_cert::QuorumCert, }; use anyhow::{bail, ensure, format_err, Result}; @@ -143,11 +144,19 @@ impl Block { proof_with_data.num_txns(), proof_with_data.num_bytes(), ), - Payload::OptQuorumStore(opt_quorum_store_payload) => ( - opt_quorum_store_payload.proof_with_data().num_proofs(), - opt_quorum_store_payload.proof_with_data().num_txns(), - opt_quorum_store_payload.proof_with_data().num_bytes(), - ), + Payload::OptQuorumStore(opt_quorum_store_payload) => match opt_quorum_store_payload + { + OptQuorumStorePayload::V1(p) => ( + p.proof_with_data().num_proofs(), + p.proof_with_data().num_txns(), + p.proof_with_data().num_bytes(), + ), + OptQuorumStorePayload::V2(p) => ( + p.proof_with_data().num_proofs(), + p.proof_with_data().num_txns(), + p.proof_with_data().num_bytes(), + ), + }, }, } } @@ -169,11 +178,19 @@ impl Block { .map(|(b, _)| b.num_bytes() as usize) .sum(), ), - Payload::OptQuorumStore(opt_quorum_store_payload) => ( - opt_quorum_store_payload.inline_batches().num_batches(), - opt_quorum_store_payload.inline_batches().num_txns(), - opt_quorum_store_payload.inline_batches().num_bytes(), - ), + Payload::OptQuorumStore(opt_quorum_store_payload) => match opt_quorum_store_payload + { + OptQuorumStorePayload::V1(p) => ( + p.inline_batches().num_batches(), + p.inline_batches().num_txns(), + p.inline_batches().num_bytes(), + ), + OptQuorumStorePayload::V2(p) => ( + p.inline_batches().num_batches(), + p.inline_batches().num_txns(), + p.inline_batches().num_bytes(), + ), + }, _ => (0, 0, 0), }, } @@ -184,19 +201,19 @@ impl Block { match self.block_data.payload() { None => (0, 0, 0), Some(payload) => match payload { - Payload::OptQuorumStore(opt_quorum_store_payload) => ( - opt_quorum_store_payload.opt_batches().len(), - opt_quorum_store_payload - .opt_batches() - .iter() - .map(|b| b.num_txns() as usize) - .sum(), - opt_quorum_store_payload - .opt_batches() - .iter() - .map(|b| b.num_bytes() as usize) - .sum(), - ), + Payload::OptQuorumStore(opt_quorum_store_payload) => match opt_quorum_store_payload + { + OptQuorumStorePayload::V1(p) => ( + p.opt_batches().len(), + p.opt_batches().iter().map(|b| b.num_txns() as usize).sum(), + p.opt_batches().iter().map(|b| b.num_bytes() as usize).sum(), + ), + OptQuorumStorePayload::V2(p) => ( + p.opt_batches().len(), + p.opt_batches().iter().map(|b| b.num_txns() as usize).sum(), + p.opt_batches().iter().map(|b| b.num_bytes() as usize).sum(), + ), + }, _ => (0, 0, 0), }, } diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index f27597edca7f6..4fb442b0ac706 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -515,11 +515,15 @@ impl Payload { } } - fn verify_with_cache( - proofs: &[ProofOfStore], + fn verify_with_cache( + proofs: &[ProofOfStore], validator: &ValidatorVerifier, proof_cache: &ProofCache, - ) -> anyhow::Result<()> { + ) -> anyhow::Result<()> + where + T: TBatchInfo + Send + Sync + 'static, + BatchInfoExt: From, + { let unverified: Vec<_> = proofs .iter() .filter(|proof| { @@ -535,15 +539,15 @@ impl Payload { Ok(()) } - pub fn verify_inline_batches<'a>( - inline_batches: impl Iterator)>, + pub fn verify_inline_batches<'a, T: TBatchInfo + 'a>( + inline_batches: impl Iterator)>, ) -> anyhow::Result<()> { for (batch, payload) in inline_batches { // TODO: Can cloning be avoided here? let computed_digest = BatchPayload::new(batch.author(), payload.clone()).hash(); ensure!( computed_digest == *batch.digest(), - "Hash of the received inline batch doesn't match the digest value for batch {}: {} != {}", + "Hash of the received inline batch doesn't match the digest value for batch {:?}: {} != {}", batch, computed_digest, batch.digest() @@ -552,9 +556,9 @@ impl Payload { Ok(()) } - pub fn verify_opt_batches( + pub fn verify_opt_batches( verifier: &ValidatorVerifier, - opt_batches: &OptBatches, + opt_batches: &OptBatches, ) -> anyhow::Result<()> { let authors = verifier.address_to_validator_index(); for batch in &opt_batches.batch_summary { @@ -592,16 +596,26 @@ impl Payload { )?; Ok(()) }, - (true, Payload::OptQuorumStore(opt_quorum_store)) => { - let proof_with_data = opt_quorum_store.proof_with_data(); + (true, Payload::OptQuorumStore(OptQuorumStorePayload::V1(p))) => { + let proof_with_data = p.proof_with_data(); + Self::verify_with_cache(&proof_with_data.batch_summary, verifier, proof_cache)?; + Self::verify_inline_batches( + p.inline_batches() + .iter() + .map(|batch| (batch.info(), batch.transactions())), + )?; + Self::verify_opt_batches(verifier, p.opt_batches())?; + Ok(()) + }, + (true, Payload::OptQuorumStore(OptQuorumStorePayload::V2(p))) => { + let proof_with_data = p.proof_with_data(); Self::verify_with_cache(&proof_with_data.batch_summary, verifier, proof_cache)?; Self::verify_inline_batches( - opt_quorum_store - .inline_batches() + p.inline_batches() .iter() .map(|batch| (batch.info(), batch.transactions())), )?; - Self::verify_opt_batches(verifier, opt_quorum_store.opt_batches())?; + Self::verify_opt_batches(verifier, p.opt_batches())?; Ok(()) }, (_, _) => Err(anyhow::anyhow!( @@ -792,17 +806,28 @@ impl From<&Vec<&Payload>> for PayloadFilter { Payload::DirectMempool(_) => { error!("DirectMempool payload in InQuorumStore filter"); }, - Payload::OptQuorumStore(opt_qs_payload) => { - for batch in opt_qs_payload.inline_batches().iter() { + Payload::OptQuorumStore(OptQuorumStorePayload::V1(p)) => { + for batch in p.inline_batches().iter() { exclude_batches.insert(batch.info().clone().into()); } - for batch_info in &opt_qs_payload.opt_batches().batch_summary { + for batch_info in &p.opt_batches().batch_summary { exclude_batches.insert(batch_info.clone().into()); } - for proof in &opt_qs_payload.proof_with_data().batch_summary { + for proof in &p.proof_with_data().batch_summary { exclude_batches.insert(proof.info().clone().into()); } }, + Payload::OptQuorumStore(OptQuorumStorePayload::V2(p)) => { + for batch in p.inline_batches().iter() { + exclude_batches.insert(batch.info().clone()); + } + for batch_info in &p.opt_batches().batch_summary { + exclude_batches.insert(batch_info.clone()); + } + for proof in &p.proof_with_data().batch_summary { + exclude_batches.insert(proof.info().clone()); + } + }, } } PayloadFilter::InQuorumStore(exclude_batches) diff --git a/consensus/consensus-types/src/payload.rs b/consensus/consensus-types/src/payload.rs index 42bba94da20d3..cf2243cd8f6c7 100644 --- a/consensus/consensus-types/src/payload.rs +++ b/consensus/consensus-types/src/payload.rs @@ -1,7 +1,7 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::proof_of_store::{BatchInfo, BatchInfoExt, ProofOfStore}; +use crate::proof_of_store::{BatchInfo, BatchInfoExt, ProofOfStore, TBatchInfo}; use anyhow::ensure; use aptos_types::{transaction::SignedTransaction, PeerId}; use core::fmt; @@ -11,9 +11,9 @@ use std::{ ops::{Deref, DerefMut}, }; -pub type OptBatches = BatchPointer; +pub type OptBatches = BatchPointer; -pub type ProofBatches = BatchPointer>; +pub type ProofBatches = BatchPointer>; pub trait TDataInfo { fn num_txns(&self) -> u64; @@ -94,6 +94,12 @@ impl Deref for BatchPointer { } } +impl DerefMut for BatchPointer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.batch_summary + } +} + impl IntoIterator for BatchPointer { type IntoIter = std::vec::IntoIter; type Item = T; @@ -188,20 +194,20 @@ impl PayloadExecutionLimit { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct InlineBatch { - batch_info: BatchInfo, +pub struct InlineBatch { + batch_info: T, transactions: Vec, } -impl InlineBatch { - pub fn new(batch_info: BatchInfo, transactions: Vec) -> Self { +impl InlineBatch { + pub fn new(batch_info: T, transactions: Vec) -> Self { Self { batch_info, transactions, } } - pub fn info(&self) -> &BatchInfo { + pub fn info(&self) -> &T { &self.batch_info } @@ -211,9 +217,9 @@ impl InlineBatch { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct InlineBatches(Vec); +pub struct InlineBatches(Vec>); -impl InlineBatches { +impl InlineBatches { pub fn num_batches(&self) -> usize { self.0.len() } @@ -243,7 +249,7 @@ impl InlineBatches { .collect() } - pub fn batch_infos(&self) -> Vec { + pub fn batch_infos(&self) -> Vec { self.0 .iter() .map(|inline_batch| inline_batch.batch_info.clone()) @@ -251,14 +257,14 @@ impl InlineBatches { } } -impl From> for InlineBatches { - fn from(value: Vec) -> Self { +impl From>> for InlineBatches { + fn from(value: Vec>) -> Self { Self(value) } } -impl From)>> for InlineBatches { - fn from(value: Vec<(BatchInfo, Vec)>) -> Self { +impl From)>> for InlineBatches { + fn from(value: Vec<(T, Vec)>) -> Self { value .into_iter() .map(|(batch_info, transactions)| InlineBatch::new(batch_info, transactions)) @@ -267,29 +273,32 @@ impl From)>> for InlineBatches { } } -impl Deref for InlineBatches { - type Target = Vec; +impl Deref for InlineBatches { + type Target = Vec>; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for InlineBatches { +impl DerefMut for InlineBatches { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct OptQuorumStorePayloadV1 { - inline_batches: InlineBatches, - opt_batches: OptBatches, - proofs: ProofBatches, +pub struct OptQuorumStorePayloadV1 { + inline_batches: InlineBatches, + opt_batches: OptBatches, + proofs: ProofBatches, execution_limits: PayloadExecutionLimit, } -impl OptQuorumStorePayloadV1 { +impl OptQuorumStorePayloadV1 +where + T: TBatchInfo + Send + Sync + 'static + TDataInfo, +{ pub fn get_all_batch_infos(self) -> Vec { let Self { inline_batches, @@ -323,29 +332,93 @@ impl OptQuorumStorePayloadV1 { "OptQS InlineBatch epoch doesn't match given epoch" ); ensure!( - self.opt_batches.iter().all(|b| b.info().epoch() == epoch), + self.opt_batches.iter().all(|b| b.epoch() == epoch), "OptQS OptBatch epoch doesn't match given epoch" ); ensure!( - self.proofs.iter().all(|b| b.info().epoch() == epoch), + self.proofs.iter().all(|b| b.epoch() == epoch), "OptQS Proof epoch doesn't match given epoch" ); Ok(()) } + + fn extend(mut self, other: Self) -> Self { + self.inline_batches.extend(other.inline_batches.0); + self.opt_batches.extend(other.opt_batches); + self.proofs.extend(other.proofs); + self.execution_limits.extend(other.execution_limits); + self + } + + pub fn inline_batches(&self) -> &InlineBatches { + &self.inline_batches + } + + pub fn proof_with_data(&self) -> &BatchPointer> { + &self.proofs + } + + pub fn opt_batches(&self) -> &BatchPointer { + &self.opt_batches + } + + pub fn set_execution_limit(&mut self, execution_limits: PayloadExecutionLimit) { + self.execution_limits = execution_limits; + } + + pub(crate) fn num_txns(&self) -> usize { + self.opt_batches.num_txns() + self.proofs.num_txns() + self.inline_batches.num_txns() + } + + pub(crate) fn is_empty(&self) -> bool { + self.opt_batches.is_empty() && self.proofs.is_empty() && self.inline_batches.is_empty() + } + + pub(crate) fn num_bytes(&self) -> usize { + self.opt_batches.num_bytes() + self.proofs.num_bytes() + self.inline_batches.num_bytes() + } +} + +impl From> for OptQuorumStorePayloadV1 { + fn from(p: OptQuorumStorePayloadV1) -> Self { + OptQuorumStorePayloadV1 { + inline_batches: p + .inline_batches + .0 + .into_iter() + .map(|batch| InlineBatch::new(batch.batch_info.into(), batch.transactions)) + .collect::>() + .into(), + opt_batches: p + .opt_batches + .into_iter() + .map(|batch| batch.into()) + .collect::>() + .into(), + proofs: p + .proofs + .into_iter() + .map(|proof| proof.into()) + .collect::>() + .into(), + execution_limits: p.execution_limits, + } + } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub enum OptQuorumStorePayload { - V1(OptQuorumStorePayloadV1), + V1(OptQuorumStorePayloadV1), + V2(OptQuorumStorePayloadV1), } impl OptQuorumStorePayload { pub fn new( - inline_batches: InlineBatches, - opt_batches: OptBatches, - proofs: ProofBatches, + inline_batches: InlineBatches, + opt_batches: OptBatches, + proofs: ProofBatches, execution_limits: PayloadExecutionLimit, ) -> Self { Self::V1(OptQuorumStorePayloadV1 { @@ -356,64 +429,56 @@ impl OptQuorumStorePayload { }) } - pub(crate) fn num_txns(&self) -> usize { - self.opt_batches.num_txns() + self.proofs.num_txns() + self.inline_batches.num_txns() - } - - pub(crate) fn is_empty(&self) -> bool { - self.opt_batches.is_empty() && self.proofs.is_empty() && self.inline_batches.is_empty() - } - - pub(crate) fn extend(mut self, other: Self) -> Self { - let other: OptQuorumStorePayloadV1 = other.into_inner(); - self.inline_batches.extend(other.inline_batches.0); - self.opt_batches.extend(other.opt_batches); - self.proofs.extend(other.proofs); - self.execution_limits.extend(other.execution_limits); - self - } - - pub(crate) fn num_bytes(&self) -> usize { - self.opt_batches.num_bytes() + self.proofs.num_bytes() + self.inline_batches.num_bytes() + pub(crate) fn extend(self, other: Self) -> Self { + match (self, other) { + (Self::V1(p1), Self::V1(p2)) => Self::V1(p1.extend(p2)), + (Self::V2(p1), Self::V2(p2)) => Self::V2(p1.extend(p2)), + (Self::V1(p1), Self::V2(p2)) => { + Self::V2(OptQuorumStorePayloadV1::::from(p1).extend(p2)) + }, + (Self::V2(p1), Self::V1(p2)) => Self::V2(p1.extend(p2.into())), + } } - pub fn into_inner(self) -> OptQuorumStorePayloadV1 { + pub fn set_execution_limit(&mut self, execution_limits: PayloadExecutionLimit) { match self { - OptQuorumStorePayload::V1(opt_qs_payload) => opt_qs_payload, + OptQuorumStorePayload::V1(p) => p.set_execution_limit(execution_limits), + OptQuorumStorePayload::V2(p) => p.set_execution_limit(execution_limits), } } - pub fn inline_batches(&self) -> &InlineBatches { - &self.inline_batches - } - - pub fn proof_with_data(&self) -> &BatchPointer> { - &self.proofs + pub(crate) fn num_txns(&self) -> usize { + match self { + OptQuorumStorePayload::V1(p) => p.num_txns(), + OptQuorumStorePayload::V2(p) => p.num_txns(), + } } - pub fn opt_batches(&self) -> &BatchPointer { - &self.opt_batches + pub(crate) fn is_empty(&self) -> bool { + match self { + OptQuorumStorePayload::V1(p) => p.is_empty(), + OptQuorumStorePayload::V2(p) => p.is_empty(), + } } - pub fn set_execution_limit(&mut self, execution_limits: PayloadExecutionLimit) { - self.execution_limits = execution_limits; + pub(crate) fn num_bytes(&self) -> usize { + match self { + OptQuorumStorePayload::V1(p) => p.num_bytes(), + OptQuorumStorePayload::V2(p) => p.num_bytes(), + } } -} - -impl Deref for OptQuorumStorePayload { - type Target = OptQuorumStorePayloadV1; - fn deref(&self) -> &Self::Target { + pub(crate) fn max_txns_to_execute(&self) -> Option { match self { - OptQuorumStorePayload::V1(opt_qs_payload) => opt_qs_payload, + OptQuorumStorePayload::V1(p) => p.max_txns_to_execute(), + OptQuorumStorePayload::V2(p) => p.max_txns_to_execute(), } } -} -impl DerefMut for OptQuorumStorePayload { - fn deref_mut(&mut self) -> &mut Self::Target { + pub(crate) fn check_epoch(&self, epoch: u64) -> anyhow::Result<()> { match self { - OptQuorumStorePayload::V1(opt_qs_payload) => opt_qs_payload, + OptQuorumStorePayload::V1(p) => p.check_epoch(epoch), + OptQuorumStorePayload::V2(p) => p.check_epoch(epoch), } } } @@ -423,9 +488,9 @@ impl fmt::Display for OptQuorumStorePayload { write!( f, "OptQuorumStorePayload(opt_batches: {}, proofs: {}, limits: {:?})", - self.opt_batches.num_txns(), - self.proofs.num_txns(), - self.execution_limits, + self.num_txns(), + self.num_txns(), + self.is_empty(), ) } } diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index a2a97b8a7df8f..36a8af0b10c4b 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -249,7 +249,25 @@ impl TBatchInfo for BatchInfoExt { } fn size(&self) -> PayloadTxnsSize { - PayloadTxnsSize::new(self.num_txns(), self.num_bytes()) + PayloadTxnsSize::new(self.info().num_txns(), self.info().num_bytes()) + } +} + +impl TDataInfo for BatchInfoExt { + fn num_txns(&self) -> u64 { + self.info().num_txns() + } + + fn num_bytes(&self) -> u64 { + self.info().num_bytes() + } + + fn info(&self) -> &BatchInfo { + self.info() + } + + fn signers(&self, _ordered_authors: &[PeerId]) -> Vec { + vec![self.author()] } } diff --git a/consensus/src/consensus_observer/network/observer_message.rs b/consensus/src/consensus_observer/network/observer_message.rs index 2b8415d34a892..37e5485346aab 100644 --- a/consensus/src/consensus_observer/network/observer_message.rs +++ b/consensus/src/consensus_observer/network/observer_message.rs @@ -2,9 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::consensus_observer::common::error::Error; +use anyhow::bail; use aptos_consensus_types::{ common::{BatchPayload, Payload}, - payload::InlineBatches, + payload::{InlineBatches, OptQuorumStorePayload}, pipelined_block::PipelinedBlock, proof_of_store::{BatchInfo, ProofCache, ProofOfStore}, }; @@ -696,18 +697,20 @@ impl BlockTransactionPayload { // TODO: verify the block gas limit? }, - Payload::OptQuorumStore(opt_qs_payload) => { + Payload::OptQuorumStore(OptQuorumStorePayload::V1(p)) => { // Verify the batches in the requested block - self.verify_batches(opt_qs_payload.proof_with_data())?; + self.verify_batches(p.proof_with_data())?; // Verify optQS and inline batches - self.verify_optqs_and_inline_batches( - opt_qs_payload.opt_batches(), - opt_qs_payload.inline_batches(), - )?; + self.verify_optqs_and_inline_batches(p.opt_batches(), p.inline_batches())?; // Verify the transaction limit - self.verify_transaction_limit(opt_qs_payload.max_txns_to_execute())?; + self.verify_transaction_limit(p.max_txns_to_execute())?; + }, + Payload::OptQuorumStore(OptQuorumStorePayload::V2(p)) => { + return Err(Error::InvalidMessageError( + "OptQuorumStorePayload V2 is not supproted".into(), + )); }, } @@ -772,7 +775,7 @@ impl BlockTransactionPayload { fn verify_optqs_and_inline_batches( &self, expected_opt_batches: &Vec, - expected_inline_batches: &InlineBatches, + expected_inline_batches: &InlineBatches, ) -> Result<(), Error> { let optqs_and_inline_batches: &Vec = match self { BlockTransactionPayload::OptQuorumStore(_, optqs_and_inline_batches) => { @@ -1247,10 +1250,10 @@ mod test { ); // Create a quorum store payload with a single proof - let inline_batches = InlineBatches::from(Vec::::new()); + let inline_batches = InlineBatches::from(Vec::>::new()); let opt_batches: BatchPointer = Vec::new().into(); let batch_info = create_batch_info(); - let proof_with_data: ProofBatches = + let proof_with_data: ProofBatches = vec![ProofOfStore::new(batch_info, AggregateSignature::empty())].into(); let ordered_payload = Payload::OptQuorumStore(OptQuorumStorePayload::new( inline_batches.clone(), @@ -1266,7 +1269,7 @@ mod test { assert_matches!(error, Error::InvalidMessageError(_)); // Create a quorum store payload with no transaction limit - let proof_with_data: ProofBatches = Vec::new().into(); + let proof_with_data: ProofBatches = Vec::new().into(); let ordered_payload = Payload::OptQuorumStore(OptQuorumStorePayload::new( inline_batches, opt_batches, @@ -1281,7 +1284,7 @@ mod test { assert_matches!(error, Error::InvalidMessageError(_)); // Create a quorum store payload with a single inline batch - let proof_with_data: ProofBatches = Vec::new().into(); + let proof_with_data: ProofBatches = Vec::new().into(); let ordered_payload = Payload::OptQuorumStore(OptQuorumStorePayload::new( vec![(create_batch_info(), vec![])].into(), Vec::new().into(), @@ -1296,9 +1299,9 @@ mod test { assert_matches!(error, Error::InvalidMessageError(_)); // Create a quorum store payload with a single opt batch - let proof_with_data: ProofBatches = Vec::new().into(); + let proof_with_data: ProofBatches = Vec::new().into(); let ordered_payload = Payload::OptQuorumStore(OptQuorumStorePayload::new( - Vec::::new().into(), + Vec::>::new().into(), vec![create_batch_info()].into(), proof_with_data, PayloadExecutionLimit::None, @@ -1313,7 +1316,7 @@ mod test { // Create an empty opt quorum store payload let proof_with_data = Vec::new().into(); let ordered_payload = Payload::OptQuorumStore(OptQuorumStorePayload::new( - Vec::::new().into(), + Vec::>::new().into(), Vec::new().into(), proof_with_data, PayloadExecutionLimit::MaxTransactionsToExecute(100), @@ -1329,8 +1332,8 @@ mod test { create_batch_info(), AggregateSignature::empty(), )]; - let inline_batches: InlineBatches = vec![(create_batch_info(), vec![])].into(); - let opt_batches: OptBatches = vec![create_batch_info()].into(); + let inline_batches: InlineBatches = vec![(create_batch_info(), vec![])].into(); + let opt_batches: OptBatches = vec![create_batch_info()].into(); let opt_and_inline_batches = [opt_batches.deref().clone(), inline_batches.batch_infos()].concat(); diff --git a/consensus/src/payload_manager/quorum_store_payload_manager.rs b/consensus/src/payload_manager/quorum_store_payload_manager.rs index 57dbe71a37fbf..24869f33f06d6 100644 --- a/consensus/src/payload_manager/quorum_store_payload_manager.rs +++ b/consensus/src/payload_manager/quorum_store_payload_manager.rs @@ -15,8 +15,8 @@ use aptos_config::config::BlockTransactionFilterConfig; use aptos_consensus_types::{ block::Block, common::{Author, Payload, ProofWithData}, - payload::{BatchPointer, TDataInfo}, - proof_of_store::{BatchInfo, BatchInfoExt}, + payload::{BatchPointer, OptQuorumStorePayload, OptQuorumStorePayloadV1, TDataInfo}, + proof_of_store::{BatchInfo, BatchInfoExt, TBatchInfo}, }; use aptos_crypto::HashValue; use aptos_executor_types::*; @@ -199,9 +199,8 @@ impl TPayloadManager for QuorumStorePayloadManager { ) .collect::>() }, - Payload::OptQuorumStore(opt_quorum_store_payload) => { - opt_quorum_store_payload.into_inner().get_all_batch_infos() - }, + Payload::OptQuorumStore(OptQuorumStorePayload::V1(p)) => p.get_all_batch_infos(), + Payload::OptQuorumStore(OptQuorumStorePayload::V2(p)) => p.get_all_batch_infos(), }) .collect(); @@ -271,16 +270,32 @@ impl TPayloadManager for QuorumStorePayloadManager { Payload::DirectMempool(_) => { unreachable!() }, - Payload::OptQuorumStore(opt_qs_payload) => { + Payload::OptQuorumStore(OptQuorumStorePayload::V1(p)) => { prefetch_helper( - opt_qs_payload.opt_batches(), + p.opt_batches(), self.batch_reader.clone(), Some(author), timestamp, &self.ordered_authors, ); prefetch_helper( - opt_qs_payload.proof_with_data(), + p.proof_with_data(), + self.batch_reader.clone(), + None, + timestamp, + &self.ordered_authors, + ) + }, + Payload::OptQuorumStore(OptQuorumStorePayload::V2(p)) => { + prefetch_helper( + p.opt_batches(), + self.batch_reader.clone(), + Some(author), + timestamp, + &self.ordered_authors, + ); + prefetch_helper( + p.proof_with_data(), self.batch_reader.clone(), None, timestamp, @@ -391,9 +406,26 @@ impl TPayloadManager for QuorumStorePayloadManager { // or inlined transactions. Ok(()) }, - Payload::OptQuorumStore(opt_qs_payload) => { + Payload::OptQuorumStore(OptQuorumStorePayload::V1(p)) => { let mut missing_authors = BitVec::with_num_bits(self.ordered_authors.len() as u16); - for batch in opt_qs_payload.opt_batches().deref() { + for batch in p.opt_batches().deref() { + if self.batch_reader.exists(batch.digest()).is_none() { + let index = *self + .address_to_validator_index + .get(&batch.author()) + .expect("Payload author should have been verified"); + missing_authors.set(index as u16); + } + } + if missing_authors.all_zeros() { + Ok(()) + } else { + Err(missing_authors) + } + }, + Payload::OptQuorumStore(OptQuorumStorePayload::V2(p)) => { + let mut missing_authors = BitVec::with_num_bits(self.ordered_authors.len() as u16); + for batch in p.opt_batches().deref() { if self.batch_reader.exists(batch.digest()).is_none() { let index = *self .address_to_validator_index @@ -476,7 +508,7 @@ impl TPayloadManager for QuorumStorePayloadManager { ) .await? }, - Payload::OptQuorumStore(opt_qs_payload) => { + Payload::OptQuorumStore(OptQuorumStorePayload::V1(opt_qs_payload)) => { let opt_batch_txns = process_optqs_payload( opt_qs_payload.opt_batches(), self.batch_reader.clone(), @@ -555,7 +587,9 @@ fn get_inline_transactions(block: &Block) -> Vec { .flat_map(|(_batch_info, txns)| txns.clone()) .collect() }, - Payload::OptQuorumStore(opt_qs_payload) => opt_qs_payload.inline_batches().transactions(), + Payload::OptQuorumStore(OptQuorumStorePayload::V1(opt_qs_payload)) => { + opt_qs_payload.inline_batches().transactions() + }, _ => { vec![] // Other payload types do not have inline transactions }, diff --git a/consensus/src/quorum_store/batch_proof_queue.rs b/consensus/src/quorum_store/batch_proof_queue.rs index 0563212ba69b9..d49d9b0dceab5 100644 --- a/consensus/src/quorum_store/batch_proof_queue.rs +++ b/consensus/src/quorum_store/batch_proof_queue.rs @@ -8,7 +8,6 @@ use super::{ use crate::quorum_store::counters; use aptos_consensus_types::{ common::{Author, TxnSummaryWithExpiration}, - payload::TDataInfo, proof_of_store::{BatchInfoExt, ProofOfStore, TBatchInfo}, utils::PayloadTxnsSize, }; diff --git a/consensus/src/util/db_tool.rs b/consensus/src/util/db_tool.rs index 66b7a50bf24e0..229ae5cdcec0a 100644 --- a/consensus/src/util/db_tool.rs +++ b/consensus/src/util/db_tool.rs @@ -11,7 +11,12 @@ use crate::{ }, }; use anyhow::{bail, Result}; -use aptos_consensus_types::{block::Block, common::Payload, proof_of_store::BatchInfo}; +use aptos_consensus_types::{ + block::Block, + common::Payload, + payload::OptQuorumStorePayload, + proof_of_store::{BatchInfo, TBatchInfo}, +}; use aptos_crypto::HashValue; use aptos_types::transaction::{SignedTransaction, Transaction}; use clap::Parser; @@ -113,21 +118,30 @@ pub fn extract_txns_from_block<'a>( } Ok(all_txns) }, - Payload::OptQuorumStore(opt_qs_payload) => { + Payload::OptQuorumStore(OptQuorumStorePayload::V1(p)) => { + let mut all_txns = extract_txns_from_quorum_store( + p.proof_with_data().iter().map(|proof| *proof.digest()), + all_batches, + ) + .unwrap(); + all_txns.extend( + extract_txns_from_quorum_store( + p.opt_batches().iter().map(|info| *info.digest()), + all_batches, + ) + .unwrap(), + ); + Ok(all_txns) + }, + Payload::OptQuorumStore(OptQuorumStorePayload::V2(p)) => { let mut all_txns = extract_txns_from_quorum_store( - opt_qs_payload - .proof_with_data() - .iter() - .map(|proof| *proof.digest()), + p.proof_with_data().iter().map(|proof| *proof.digest()), all_batches, ) .unwrap(); all_txns.extend( extract_txns_from_quorum_store( - opt_qs_payload - .opt_batches() - .iter() - .map(|info| *info.digest()), + p.opt_batches().iter().map(|info| *info.digest()), all_batches, ) .unwrap(),