diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 123cd6d550fca..278ba5b4f927b 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -200,7 +200,7 @@ pub trait QuorumStoreSender: Send + Clone { request: BatchRequest, recipient: Author, timeout: Duration, - ) -> anyhow::Result; + ) -> anyhow::Result>; async fn send_signed_batch_info_msg( &self, @@ -214,7 +214,7 @@ pub trait QuorumStoreSender: Send + Clone { recipients: Vec, ); - async fn broadcast_batch_msg(&mut self, batches: Vec); + async fn broadcast_batch_msg(&mut self, batches: Vec>); async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec>); @@ -548,7 +548,7 @@ impl QuorumStoreSender for NetworkSender { request: BatchRequest, recipient: Author, timeout: Duration, - ) -> anyhow::Result { + ) -> anyhow::Result> { fail_point!("consensus::send::request_batch", |_| Err(anyhow!("failed"))); let request_digest = request.digest(); let msg = ConsensusMsg::BatchRequestMsg(Box::new(request)); @@ -593,7 +593,7 @@ impl QuorumStoreSender for NetworkSender { self.send(msg, recipients).await } - async fn broadcast_batch_msg(&mut self, batches: Vec) { + async fn broadcast_batch_msg(&mut self, batches: Vec>) { fail_point!("consensus::send::broadcast_batch", |_| ()); let msg = ConsensusMsg::BatchMsg(Box::new(BatchMsg::new(batches))); self.broadcast(msg).await diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index b0fa3a3df3481..b92704c1baff1 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -64,11 +64,11 @@ pub enum ConsensusMsg { /// it can save slow machines to quickly confirm the execution result. CommitDecisionMsg(Box), /// Quorum Store: Send a Batch of transactions. - BatchMsg(Box), + BatchMsg(Box>), /// Quorum Store: Request the payloads of a completed batch. BatchRequestMsg(Box), /// Quorum Store: Response to the batch request. - BatchResponse(Box), + BatchResponse(Box>), /// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that /// the batch of transactions was received and will be persisted until batch expiration. SignedBatchInfo(Box>), @@ -81,7 +81,7 @@ pub enum ConsensusMsg { /// Randomness generation message RandGenMessage(RandGenMessage), /// Quorum Store: Response to the batch request. - BatchResponseV2(Box), + BatchResponseV2(Box>), /// OrderVoteMsg is the struct that is broadcasted by a validator on receiving quorum certificate /// on a block. OrderVoteMsg(Box), diff --git a/consensus/src/quorum_store/batch_coordinator.rs b/consensus/src/quorum_store/batch_coordinator.rs index 50cc928ffcb3a..52eb2bdba65b2 100644 --- a/consensus/src/quorum_store/batch_coordinator.rs +++ b/consensus/src/quorum_store/batch_coordinator.rs @@ -15,7 +15,7 @@ use crate::{ }; use anyhow::ensure; use aptos_config::config::BatchTransactionFilterConfig; -use aptos_consensus_types::payload::TDataInfo; +use aptos_consensus_types::{payload::TDataInfo, proof_of_store::BatchInfo}; use aptos_logger::prelude::*; use aptos_short_hex_str::AsShortHexStr; use aptos_types::PeerId; @@ -28,7 +28,7 @@ use tokio::sync::{ #[derive(Debug)] pub enum BatchCoordinatorCommand { Shutdown(oneshot::Sender<()>), - NewBatches(PeerId, Vec), + NewBatches(PeerId, Vec>), } /// The `BatchCoordinator` is responsible for coordinating the receipt and persistence of batches. @@ -80,7 +80,7 @@ impl BatchCoordinator { fn persist_and_send_digests( &self, - persist_requests: Vec, + persist_requests: Vec>, approx_created_ts_usecs: u64, ) { if persist_requests.is_empty() { @@ -130,7 +130,7 @@ impl BatchCoordinator { }); } - fn ensure_max_limits(&self, batches: &[Batch]) -> anyhow::Result<()> { + fn ensure_max_limits(&self, batches: &[Batch]) -> anyhow::Result<()> { let mut total_txns = 0; let mut total_bytes = 0; for batch in batches.iter() { @@ -166,7 +166,11 @@ impl BatchCoordinator { Ok(()) } - pub(crate) async fn handle_batches_msg(&mut self, author: PeerId, batches: Vec) { + pub(crate) async fn handle_batches_msg( + &mut self, + author: PeerId, + batches: Vec>, + ) { if let Err(e) = self.ensure_max_limits(&batches) { error!("Batch from {}: {}", author, e); counters::RECEIVED_BATCH_MAX_LIMIT_FAILED.inc(); diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index 4d3765f755049..2eae2a6b15cb7 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -14,7 +14,7 @@ use crate::{ use aptos_config::config::QuorumStoreConfig; use aptos_consensus_types::{ common::{TransactionInProgress, TransactionSummary}, - proof_of_store::{BatchInfoExt, TBatchInfo}, + proof_of_store::{BatchInfo, BatchInfoExt, TBatchInfo}, }; use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_logger::prelude::*; @@ -33,7 +33,7 @@ use tokio::time::Interval; pub enum BatchGeneratorCommand { CommitNotification(u64, Vec), ProofExpiration(Vec), - RemoteBatch(Batch), + RemoteBatch(Batch), Shutdown(tokio::sync::oneshot::Sender<()>), } @@ -175,7 +175,7 @@ impl BatchGenerator { txns: Vec, expiry_time: u64, bucket_start: u64, - ) -> Batch { + ) -> Batch { let batch_id = self.batch_id; self.batch_id.increment(); self.db @@ -201,7 +201,7 @@ impl BatchGenerator { /// batches are pushed. fn push_bucket_to_batches( &mut self, - batches: &mut Vec, + batches: &mut Vec>, txns: &mut Vec, num_txns_in_bucket: usize, expiry_time: u64, @@ -242,7 +242,7 @@ impl BatchGenerator { &mut self, pulled_txns: &mut Vec, expiry_time: u64, - ) -> Vec { + ) -> Vec> { // Sort by gas, in descending order. This is a stable sort on existing mempool ordering, // so will not reorder accounts or their sequence numbers as long as they have the same gas. pulled_txns.sort_by_key(|txn| u64::MAX - txn.gas_unit_price()); @@ -325,7 +325,7 @@ impl BatchGenerator { self.txns_in_progress_sorted.len() } - pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Vec { + pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Vec> { counters::BATCH_PULL_EXCLUDED_TXNS.observe(self.txns_in_progress_sorted.len() as f64); trace!( "QS: excluding txs len: {:?}", diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index f3391cb541a68..c943fd292b89c 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -9,6 +9,7 @@ use crate::{ types::{BatchRequest, BatchResponse, PersistedValue}, }, }; +use aptos_consensus_types::proof_of_store::BatchInfo; use aptos_crypto::HashValue; use aptos_executor_types::*; use aptos_infallible::Mutex; @@ -102,7 +103,7 @@ impl BatchRequester { digest: HashValue, expiration: u64, responders: Arc>>, - mut subscriber_rx: oneshot::Receiver, + mut subscriber_rx: oneshot::Receiver>, ) -> ExecutorResult> { let validator_verifier = self.validator_verifier.clone(); let mut request_state = BatchRequesterState::new(responders, self.retry_limit); diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index b90a6c03a6f69..2c978a2b192e6 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -116,7 +116,7 @@ impl QuotaManager { pub struct BatchStore { epoch: OnceCell, last_certified_time: AtomicU64, - db_cache: DashMap, + db_cache: DashMap>, peer_quota: DashMap, expirations: Mutex>, db: Arc, @@ -124,7 +124,7 @@ pub struct BatchStore { db_quota: usize, batch_quota: usize, validator_signer: ValidatorSigner, - persist_subscribers: DashMap>>, + persist_subscribers: DashMap>>>, expiration_buffer_usecs: u64, } @@ -252,7 +252,7 @@ impl BatchStore { *self.epoch.get().expect("Epoch should always be set") } - fn free_quota(&self, value: PersistedValue) { + fn free_quota(&self, value: PersistedValue) { let mut quota_manager = self .peer_quota .get_mut(&value.author()) @@ -268,7 +268,10 @@ impl BatchStore { // Note: holds db_cache entry lock (due to DashMap), while accessing peer_quota // DashMap. Hence, peer_quota reference should never be held while accessing the // db_cache to avoid the deadlock (if needed, order is db_cache, then peer_quota). - pub(crate) fn insert_to_cache(&self, value: &PersistedValue) -> anyhow::Result { + pub(crate) fn insert_to_cache( + &self, + value: &PersistedValue, + ) -> anyhow::Result { let digest = *value.digest(); let author = value.author(); let expiration_time = value.expiration(); @@ -326,7 +329,7 @@ impl BatchStore { Ok(true) } - pub(crate) fn save(&self, value: &PersistedValue) -> anyhow::Result { + pub(crate) fn save(&self, value: &PersistedValue) -> anyhow::Result { let last_certified_time = self.last_certified_time(); if value.expiration() > last_certified_time { fail_point!("quorum_store::save", |_| { @@ -398,7 +401,7 @@ impl BatchStore { fn persist_inner( &self, batch_info: T, - persist_request: PersistedValue, + persist_request: PersistedValue, ) -> Option> { assert!( batch_info.as_batch_info() == persist_request.batch_info(), @@ -437,7 +440,7 @@ impl BatchStore { self.last_certified_time.load(Ordering::Relaxed) } - fn get_batch_from_db(&self, digest: &HashValue) -> ExecutorResult { + fn get_batch_from_db(&self, digest: &HashValue) -> ExecutorResult> { counters::GET_BATCH_FROM_DB_COUNT.inc(); match self.db.get_batch(digest) { @@ -452,7 +455,7 @@ impl BatchStore { pub(crate) fn get_batch_from_local( &self, digest: &HashValue, - ) -> ExecutorResult { + ) -> ExecutorResult> { if let Some(value) = self.db_cache.get(digest) { if value.payload_storage_mode() == StorageMode::PersistedOnly { self.get_batch_from_db(digest) @@ -469,7 +472,7 @@ impl BatchStore { /// This can be useful in cases where there are multiple flows to add a batch (like /// direct from author batch / batch requester fetch) to the batch store and either /// flow needs to subscribe to the other. - fn subscribe(&self, digest: HashValue) -> oneshot::Receiver { + fn subscribe(&self, digest: HashValue) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); self.persist_subscribers.entry(digest).or_default().push(tx); @@ -482,7 +485,7 @@ impl BatchStore { rx } - fn notify_subscribers(&self, value: PersistedValue) { + fn notify_subscribers(&self, value: PersistedValue) { if let Some((_, subscribers)) = self.persist_subscribers.remove(value.digest()) { for subscriber in subscribers { subscriber.send(value.clone()).ok(); @@ -492,7 +495,10 @@ impl BatchStore { } impl BatchWriter for BatchStore { - fn persist(&self, persist_requests: Vec) -> Vec> { + fn persist( + &self, + persist_requests: Vec>, + ) -> Vec> { let mut signed_infos = vec![]; for persist_request in persist_requests.into_iter() { let batch_info = persist_request.batch_info().clone(); @@ -506,7 +512,7 @@ impl BatchWriter for BatchStore { fn persist_v2( &self, - persist_requests: Vec, + persist_requests: Vec>, ) -> Vec> { let mut signed_infos = vec![]; for persist_request in persist_requests.into_iter() { @@ -649,10 +655,13 @@ impl BatchReader for Batch } pub trait BatchWriter: Send + Sync { - fn persist(&self, persist_requests: Vec) -> Vec>; + fn persist( + &self, + persist_requests: Vec>, + ) -> Vec>; fn persist_v2( &self, - persist_requests: Vec, + persist_requests: Vec>, ) -> Vec>; } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index dcb654fd4620f..b0b895388a133 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -410,7 +410,7 @@ impl InnerBuilder { let response = if let Ok(value) = batch_store.get_batch_from_local(&rpc_request.req.digest()) { - let batch: Batch = value.try_into().unwrap(); + let batch: Batch = value.try_into().unwrap(); BatchResponse::Batch(batch) } else { match aptos_db_clone.get_latest_ledger_info() { diff --git a/consensus/src/quorum_store/quorum_store_db.rs b/consensus/src/quorum_store/quorum_store_db.rs index 38c81a5c3dee5..8175a9edaf6e1 100644 --- a/consensus/src/quorum_store/quorum_store_db.rs +++ b/consensus/src/quorum_store/quorum_store_db.rs @@ -4,11 +4,12 @@ use crate::{ error::DbError, quorum_store::{ - schema::{BatchIdSchema, BatchSchema, BATCH_CF_NAME, BATCH_ID_CF_NAME}, + schema::{BatchIdSchema, BatchSchema, BatchV2Schema, BATCH_CF_NAME, BATCH_ID_CF_NAME}, types::PersistedValue, }, }; use anyhow::Result; +use aptos_consensus_types::proof_of_store::{BatchInfo, BatchInfoExt, TBatchInfo}; use aptos_crypto::HashValue; use aptos_logger::prelude::*; use aptos_schemadb::{ @@ -22,11 +23,20 @@ use std::{collections::HashMap, path::Path, time::Instant}; pub trait QuorumStoreStorage: Sync + Send { fn delete_batches(&self, digests: Vec) -> Result<(), DbError>; - fn get_all_batches(&self) -> Result>; + fn get_all_batches(&self) -> Result>>; - fn save_batch(&self, batch: PersistedValue) -> Result<(), DbError>; + fn save_batch(&self, batch: PersistedValue) -> Result<(), DbError>; - fn get_batch(&self, digest: &HashValue) -> Result, DbError>; + fn get_batch(&self, digest: &HashValue) -> Result>, DbError>; + + fn get_all_batches_v2(&self) -> Result>>; + + fn save_batch_v2(&self, batch: PersistedValue) -> Result<(), DbError>; + + fn get_batch_v2( + &self, + digest: &HashValue, + ) -> Result>, DbError>; fn delete_batch_id(&self, epoch: u64) -> Result<(), DbError>; @@ -85,14 +95,14 @@ impl QuorumStoreStorage for QuorumStoreDB { Ok(()) } - fn get_all_batches(&self) -> Result> { + fn get_all_batches(&self) -> Result>> { let mut iter = self.db.iter::()?; iter.seek_to_first(); iter.map(|res| res.map_err(Into::into)) - .collect::>>() + .collect::>>>() } - fn save_batch(&self, batch: PersistedValue) -> Result<(), DbError> { + fn save_batch(&self, batch: PersistedValue) -> Result<(), DbError> { trace!( "QS: db persists digest {} expiration {:?}", batch.digest(), @@ -101,10 +111,33 @@ impl QuorumStoreStorage for QuorumStoreDB { self.put::(batch.digest(), &batch) } - fn get_batch(&self, digest: &HashValue) -> Result, DbError> { + fn get_batch(&self, digest: &HashValue) -> Result>, DbError> { Ok(self.db.get::(digest)?) } + fn get_all_batches_v2(&self) -> Result>> { + let mut iter = self.db.iter::()?; + iter.seek_to_first(); + iter.map(|res| res.map_err(Into::into)) + .collect::>>>() + } + + fn save_batch_v2(&self, batch: PersistedValue) -> Result<(), DbError> { + trace!( + "QS: db persists digest {} expiration {:?}", + batch.digest(), + batch.expiration() + ); + self.put::(batch.digest(), &batch) + } + + fn get_batch_v2( + &self, + digest: &HashValue, + ) -> Result>, DbError> { + Ok(self.db.get::(digest)?) + } + fn delete_batch_id(&self, epoch: u64) -> Result<(), DbError> { let mut batch = SchemaBatch::new(); batch.delete::(&epoch)?; @@ -160,15 +193,15 @@ pub mod mock { Ok(()) } - fn get_all_batches(&self) -> Result> { + fn get_all_batches(&self) -> Result>> { Ok(HashMap::new()) } - fn save_batch(&self, _: PersistedValue) -> Result<(), DbError> { + fn save_batch(&self, _: PersistedValue) -> Result<(), DbError> { Ok(()) } - fn get_batch(&self, _: &HashValue) -> Result, DbError> { + fn get_batch(&self, _: &HashValue) -> Result>, DbError> { Ok(None) } @@ -183,5 +216,20 @@ pub mod mock { fn save_batch_id(&self, _: u64, _: BatchId) -> Result<(), DbError> { Ok(()) } + + fn get_all_batches_v2(&self) -> Result>> { + Ok(HashMap::new()) + } + + fn save_batch_v2(&self, batch: PersistedValue) -> Result<(), DbError> { + Ok(()) + } + + fn get_batch_v2( + &self, + digest: &HashValue, + ) -> Result>, DbError> { + Ok(None) + } } } diff --git a/consensus/src/quorum_store/schema.rs b/consensus/src/quorum_store/schema.rs index 4de503c9cc3c5..583400c8c445c 100644 --- a/consensus/src/quorum_store/schema.rs +++ b/consensus/src/quorum_store/schema.rs @@ -3,6 +3,7 @@ use crate::quorum_store::types::PersistedValue; use anyhow::Result; +use aptos_consensus_types::proof_of_store::{BatchInfo, BatchInfoExt}; use aptos_crypto::HashValue; use aptos_schemadb::{ schema::{KeyCodec, Schema, ValueCodec}, @@ -12,13 +13,14 @@ use aptos_types::quorum_store::BatchId; pub(crate) const BATCH_CF_NAME: ColumnFamilyName = "batch"; pub(crate) const BATCH_ID_CF_NAME: ColumnFamilyName = "batch_ID"; +pub(crate) const BATCH_V2_CF_NAME: ColumnFamilyName = "batch_v2"; #[derive(Debug)] pub(crate) struct BatchSchema; impl Schema for BatchSchema { type Key = HashValue; - type Value = PersistedValue; + type Value = PersistedValue; const COLUMN_FAMILY_NAME: aptos_schemadb::ColumnFamilyName = BATCH_CF_NAME; } @@ -33,7 +35,37 @@ impl KeyCodec for HashValue { } } -impl ValueCodec for PersistedValue { +impl ValueCodec for PersistedValue { + fn encode_value(&self) -> Result> { + Ok(bcs::to_bytes(&self)?) + } + + fn decode_value(data: &[u8]) -> Result { + Ok(bcs::from_bytes(data)?) + } +} + +#[derive(Debug)] +pub(crate) struct BatchV2Schema; + +impl Schema for BatchV2Schema { + type Key = HashValue; + type Value = PersistedValue; + + const COLUMN_FAMILY_NAME: aptos_schemadb::ColumnFamilyName = BATCH_V2_CF_NAME; +} + +impl KeyCodec for HashValue { + fn encode_key(&self) -> Result> { + Ok(self.to_vec()) + } + + fn decode_key(data: &[u8]) -> Result { + Ok(HashValue::from_slice(data)?) + } +} + +impl ValueCodec for PersistedValue { fn encode_value(&self) -> Result> { Ok(bcs::to_bytes(&self)?) } diff --git a/consensus/src/quorum_store/tests/batch_generator_test.rs b/consensus/src/quorum_store/tests/batch_generator_test.rs index 758eba20368b3..7232c2f624f60 100644 --- a/consensus/src/quorum_store/tests/batch_generator_test.rs +++ b/consensus/src/quorum_store/tests/batch_generator_test.rs @@ -35,13 +35,16 @@ impl MockBatchWriter { } impl BatchWriter for MockBatchWriter { - fn persist(&self, _persist_requests: Vec) -> Vec> { + fn persist( + &self, + _persist_requests: Vec>, + ) -> Vec> { vec![] } fn persist_v2( &self, - persist_requests: Vec, + persist_requests: Vec>, ) -> Vec> { vec![] } diff --git a/consensus/src/quorum_store/tests/batch_requester_test.rs b/consensus/src/quorum_store/tests/batch_requester_test.rs index 4441e5eeadb1e..54055539f6700 100644 --- a/consensus/src/quorum_store/tests/batch_requester_test.rs +++ b/consensus/src/quorum_store/tests/batch_requester_test.rs @@ -34,11 +34,11 @@ use tokio::sync::oneshot; #[derive(Clone)] struct MockBatchRequester { - return_value: BatchResponse, + return_value: BatchResponse, } impl MockBatchRequester { - fn new(return_value: BatchResponse) -> Self { + fn new(return_value: BatchResponse) -> Self { Self { return_value } } } @@ -50,7 +50,7 @@ impl QuorumStoreSender for MockBatchRequester { _request: BatchRequest, _recipient: Author, _timeout: Duration, - ) -> anyhow::Result { + ) -> anyhow::Result> { Ok(self.return_value.clone()) } @@ -70,7 +70,7 @@ impl QuorumStoreSender for MockBatchRequester { unimplemented!() } - async fn broadcast_batch_msg(&mut self, _batches: Vec) { + async fn broadcast_batch_msg(&mut self, _batches: Vec>) { unimplemented!() } diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index a2704c2bf80c1..fb213841927f9 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -46,7 +46,7 @@ fn request_for_test( round: u64, num_bytes: u64, maybe_payload: Option>, -) -> PersistedValue { +) -> PersistedValue { PersistedValue::new( BatchInfo::new( *TEST_REQUEST_ACCOUNT, // make sure all request come from the same account @@ -96,7 +96,7 @@ async fn test_extend_expiration_vs_save() { let batch_store_clone2 = batch_store.clone(); let digests: Vec = (0..num_experiments).map(|_| HashValue::random()).collect(); - let later_exp_values: Vec = (0..num_experiments) + let later_exp_values: Vec> = (0..num_experiments) .map(|i| { // Pre-insert some of them. if i % 2 == 0 { diff --git a/consensus/src/quorum_store/tests/quorum_store_db_test.rs b/consensus/src/quorum_store/tests/quorum_store_db_test.rs index 9d526499e976b..d23e301ef91bb 100644 --- a/consensus/src/quorum_store/tests/quorum_store_db_test.rs +++ b/consensus/src/quorum_store/tests/quorum_store_db_test.rs @@ -8,6 +8,7 @@ use crate::{ }, test_utils::create_vec_signed_transactions, }; +use aptos_consensus_types::proof_of_store::BatchInfo; use aptos_temppath::TempPath; use aptos_types::{account_address::AccountAddress, quorum_store::BatchId}; use claims::assert_ok; @@ -19,7 +20,7 @@ fn test_db_for_data() { let source = AccountAddress::random(); let signed_txns = create_vec_signed_transactions(100); - let persist_request_1: PersistedValue = + let persist_request_1: PersistedValue = Batch::new(BatchId::new_for_test(1), signed_txns, 1, 20, source, 0).into(); let clone_1 = persist_request_1.clone(); assert!(db.save_batch(clone_1).is_ok()); @@ -32,13 +33,13 @@ fn test_db_for_data() { ); let signed_txns = create_vec_signed_transactions(200); - let persist_request_2: PersistedValue = + let persist_request_2: PersistedValue = Batch::new(BatchId::new_for_test(1), signed_txns, 1, 20, source, 0).into(); let clone_2 = persist_request_2.clone(); assert_ok!(db.save_batch(clone_2)); let signed_txns = create_vec_signed_transactions(300); - let persist_request_3: PersistedValue = + let persist_request_3: PersistedValue = Batch::new(BatchId::new_for_test(1), signed_txns, 1, 20, source, 0).into(); let clone_3 = persist_request_3.clone(); assert_ok!(db.save_batch(clone_3)); diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 3254305b6d8f5..579975373b842 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -4,7 +4,7 @@ use anyhow::ensure; use aptos_consensus_types::{ common::{BatchPayload, TxnSummaryWithExpiration}, - proof_of_store::BatchInfo, + proof_of_store::{BatchInfo, TBatchInfo}, }; use aptos_crypto::{hash::CryptoHash, HashValue}; use aptos_types::{ @@ -18,8 +18,8 @@ use std::{ }; #[derive(Clone, Eq, Deserialize, Serialize, PartialEq, Debug)] -pub struct PersistedValue { - info: BatchInfo, +pub struct PersistedValue { + info: T, maybe_payload: Option>, } @@ -29,8 +29,8 @@ pub(crate) enum StorageMode { MemoryAndPersisted, } -impl PersistedValue { - pub(crate) fn new(info: BatchInfo, maybe_payload: Option>) -> Self { +impl PersistedValue { + pub(crate) fn new(info: T, maybe_payload: Option>) -> Self { Self { info, maybe_payload, @@ -53,7 +53,7 @@ impl PersistedValue { self.maybe_payload = None; } - pub fn batch_info(&self) -> &BatchInfo { + pub fn batch_info(&self) -> &T { &self.info } @@ -78,23 +78,23 @@ impl PersistedValue { vec![] } - pub fn unpack(self) -> (BatchInfo, Option>) { + pub fn unpack(self) -> (T, Option>) { (self.info, self.maybe_payload) } } -impl Deref for PersistedValue { - type Target = BatchInfo; +impl Deref for PersistedValue { + type Target = T; fn deref(&self) -> &Self::Target { &self.info } } -impl TryFrom for Batch { +impl TryFrom> for Batch { type Error = anyhow::Error; - fn try_from(value: PersistedValue) -> Result { + fn try_from(value: PersistedValue) -> Result { let author = value.author(); Ok(Batch { batch_info: value.info, @@ -125,12 +125,12 @@ mod tests { } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Batch { - batch_info: BatchInfo, +pub struct Batch { + batch_info: T, payload: BatchPayload, } -impl Batch { +impl Batch { pub fn new( batch_id: BatchId, payload: Vec, @@ -150,6 +150,12 @@ impl Batch { payload.num_bytes() as u64, gas_bucket_start, ); + Self::new_generic(batch_info, payload) + } +} + +impl Batch { + pub fn new_generic(batch_info: T, payload: BatchPayload) -> Self { Self { batch_info, payload, @@ -204,13 +210,13 @@ impl Batch { self.payload.txns() } - pub fn batch_info(&self) -> &BatchInfo { + pub fn batch_info(&self) -> &T { &self.batch_info } } -impl Deref for Batch { - type Target = BatchInfo; +impl Deref for Batch { + type Target = T; fn deref(&self) -> &Self::Target { &self.batch_info @@ -268,8 +274,8 @@ impl BatchRequest { } } -impl From for PersistedValue { - fn from(value: Batch) -> Self { +impl From> for PersistedValue { + fn from(value: Batch) -> Self { let Batch { batch_info, payload, @@ -279,18 +285,18 @@ impl From for PersistedValue { } #[derive(Clone, Debug, Deserialize, Serialize)] -pub enum BatchResponse { - Batch(Batch), +pub enum BatchResponse { + Batch(Batch), NotFound(LedgerInfoWithSignatures), } #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct BatchMsg { - batches: Vec, +pub struct BatchMsg { + batches: Vec>, } -impl BatchMsg { - pub fn new(batches: Vec) -> Self { +impl BatchMsg { + pub fn new(batches: Vec>) -> Self { Self { batches } } @@ -342,7 +348,7 @@ impl BatchMsg { self.batches.first().map(|batch| batch.author()) } - pub fn take(self) -> Vec { + pub fn take(self) -> Vec> { self.batches } } diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 4e3647142db96..4181324f7114a 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -45,7 +45,7 @@ use aptos_consensus_types::{ order_vote::OrderVote, order_vote_msg::OrderVoteMsg, pipelined_block::PipelinedBlock, - proof_of_store::{BatchInfoExt, ProofCache, ProofOfStoreMsg, SignedBatchInfoMsg}, + proof_of_store::{BatchInfo, BatchInfoExt, ProofCache, ProofOfStoreMsg, SignedBatchInfoMsg}, proposal_msg::ProposalMsg, quorum_cert::QuorumCert, round_timeout::{RoundTimeout, RoundTimeoutMsg, RoundTimeoutReason}, @@ -94,7 +94,7 @@ pub enum UnverifiedEvent { RoundTimeoutMsg(Box), OrderVoteMsg(Box), SyncInfo(Box), - BatchMsg(Box), + BatchMsg(Box>), SignedBatchInfo(Box>), ProofOfStoreMsg(Box>), OptProposalMsg(Box), @@ -243,7 +243,7 @@ pub enum VerifiedEvent { RoundTimeoutMsg(Box), OrderVoteMsg(Box), UnverifiedSyncInfo(Box), - BatchMsg(Box), + BatchMsg(Box>), SignedBatchInfo(Box>), ProofOfStoreMsg(Box>), // local messages diff --git a/consensus/src/test_utils/mock_quorum_store_sender.rs b/consensus/src/test_utils/mock_quorum_store_sender.rs index f33ef98ed688a..99c92134e824f 100644 --- a/consensus/src/test_utils/mock_quorum_store_sender.rs +++ b/consensus/src/test_utils/mock_quorum_store_sender.rs @@ -33,7 +33,7 @@ impl QuorumStoreSender for MockQuorumStoreSender { _request: BatchRequest, _recipient: Author, _timeout: Duration, - ) -> anyhow::Result { + ) -> anyhow::Result> { unimplemented!(); } @@ -69,7 +69,7 @@ impl QuorumStoreSender for MockQuorumStoreSender { .expect("could not send"); } - async fn broadcast_batch_msg(&mut self, _batches: Vec) { + async fn broadcast_batch_msg(&mut self, _batches: Vec>) { unimplemented!() } diff --git a/consensus/src/util/db_tool.rs b/consensus/src/util/db_tool.rs index 36c834e209194..66b7a50bf24e0 100644 --- a/consensus/src/util/db_tool.rs +++ b/consensus/src/util/db_tool.rs @@ -11,7 +11,7 @@ use crate::{ }, }; use anyhow::{bail, Result}; -use aptos_consensus_types::{block::Block, common::Payload}; +use aptos_consensus_types::{block::Block, common::Payload, proof_of_store::BatchInfo}; use aptos_crypto::HashValue; use aptos_types::transaction::{SignedTransaction, Transaction}; use clap::Parser; @@ -63,7 +63,7 @@ impl Command { fn extract_txns_from_quorum_store( digests: impl Iterator, - all_batches: &HashMap, + all_batches: &HashMap>, ) -> anyhow::Result> { let mut block_txns = Vec::new(); for digest in digests { @@ -82,7 +82,7 @@ fn extract_txns_from_quorum_store( pub fn extract_txns_from_block<'a>( block: &'a Block, - all_batches: &'a HashMap, + all_batches: &'a HashMap>, ) -> anyhow::Result> { match block.payload().as_ref() { Some(payload) => match payload {