这是indexloc提供的服务,不要输入任何密码
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub trait QuorumStoreSender: Send + Clone {
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<BatchResponse>;
) -> anyhow::Result<BatchResponse<BatchInfo>>;

async fn send_signed_batch_info_msg(
&self,
Expand All @@ -214,7 +214,7 @@ pub trait QuorumStoreSender: Send + Clone {
recipients: Vec<Author>,
);

async fn broadcast_batch_msg(&mut self, batches: Vec<Batch>);
async fn broadcast_batch_msg(&mut self, batches: Vec<Batch<BatchInfo>>);

async fn broadcast_proof_of_store_msg(&mut self, proof_of_stores: Vec<ProofOfStore<BatchInfo>>);

Expand Down Expand Up @@ -548,7 +548,7 @@ impl QuorumStoreSender for NetworkSender {
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<BatchResponse> {
) -> anyhow::Result<BatchResponse<BatchInfo>> {
fail_point!("consensus::send::request_batch", |_| Err(anyhow!("failed")));
let request_digest = request.digest();
let msg = ConsensusMsg::BatchRequestMsg(Box::new(request));
Expand Down Expand Up @@ -593,7 +593,7 @@ impl QuorumStoreSender for NetworkSender {
self.send(msg, recipients).await
}

async fn broadcast_batch_msg(&mut self, batches: Vec<Batch>) {
async fn broadcast_batch_msg(&mut self, batches: Vec<Batch<BatchInfo>>) {
fail_point!("consensus::send::broadcast_batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(BatchMsg::new(batches)));
self.broadcast(msg).await
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ pub enum ConsensusMsg {
/// it can save slow machines to quickly confirm the execution result.
CommitDecisionMsg(Box<CommitDecision>),
/// Quorum Store: Send a Batch of transactions.
BatchMsg(Box<BatchMsg>),
BatchMsg(Box<BatchMsg<BatchInfo>>),
/// Quorum Store: Request the payloads of a completed batch.
BatchRequestMsg(Box<BatchRequest>),
/// Quorum Store: Response to the batch request.
BatchResponse(Box<Batch>),
BatchResponse(Box<Batch<BatchInfo>>),
/// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that
/// the batch of transactions was received and will be persisted until batch expiration.
SignedBatchInfo(Box<SignedBatchInfoMsg<BatchInfo>>),
Expand All @@ -81,7 +81,7 @@ pub enum ConsensusMsg {
/// Randomness generation message
RandGenMessage(RandGenMessage),
/// Quorum Store: Response to the batch request.
BatchResponseV2(Box<BatchResponse>),
BatchResponseV2(Box<BatchResponse<BatchInfo>>),
/// OrderVoteMsg is the struct that is broadcasted by a validator on receiving quorum certificate
/// on a block.
OrderVoteMsg(Box<OrderVoteMsg>),
Expand Down
14 changes: 9 additions & 5 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
};
use anyhow::ensure;
use aptos_config::config::BatchTransactionFilterConfig;
use aptos_consensus_types::payload::TDataInfo;
use aptos_consensus_types::{payload::TDataInfo, proof_of_store::BatchInfo};
use aptos_logger::prelude::*;
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::PeerId;
Expand All @@ -28,7 +28,7 @@ use tokio::sync::{
#[derive(Debug)]
pub enum BatchCoordinatorCommand {
Shutdown(oneshot::Sender<()>),
NewBatches(PeerId, Vec<Batch>),
NewBatches(PeerId, Vec<Batch<BatchInfo>>),
}

/// The `BatchCoordinator` is responsible for coordinating the receipt and persistence of batches.
Expand Down Expand Up @@ -80,7 +80,7 @@ impl BatchCoordinator {

fn persist_and_send_digests(
&self,
persist_requests: Vec<PersistedValue>,
persist_requests: Vec<PersistedValue<BatchInfo>>,
approx_created_ts_usecs: u64,
) {
if persist_requests.is_empty() {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl BatchCoordinator {
});
}

fn ensure_max_limits(&self, batches: &[Batch]) -> anyhow::Result<()> {
fn ensure_max_limits(&self, batches: &[Batch<BatchInfo>]) -> anyhow::Result<()> {
let mut total_txns = 0;
let mut total_bytes = 0;
for batch in batches.iter() {
Expand Down Expand Up @@ -166,7 +166,11 @@ impl BatchCoordinator {
Ok(())
}

pub(crate) async fn handle_batches_msg(&mut self, author: PeerId, batches: Vec<Batch>) {
pub(crate) async fn handle_batches_msg(
&mut self,
author: PeerId,
batches: Vec<Batch<BatchInfo>>,
) {
if let Err(e) = self.ensure_max_limits(&batches) {
error!("Batch from {}: {}", author, e);
counters::RECEIVED_BATCH_MAX_LIMIT_FAILED.inc();
Expand Down
12 changes: 6 additions & 6 deletions consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
use aptos_config::config::QuorumStoreConfig;
use aptos_consensus_types::{
common::{TransactionInProgress, TransactionSummary},
proof_of_store::{BatchInfoExt, TBatchInfo},
proof_of_store::{BatchInfo, BatchInfoExt, TBatchInfo},
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::prelude::*;
Expand All @@ -33,7 +33,7 @@ use tokio::time::Interval;
pub enum BatchGeneratorCommand {
CommitNotification(u64, Vec<BatchInfoExt>),
ProofExpiration(Vec<BatchId>),
RemoteBatch(Batch),
RemoteBatch(Batch<BatchInfo>),
Shutdown(tokio::sync::oneshot::Sender<()>),
}

Expand Down Expand Up @@ -175,7 +175,7 @@ impl BatchGenerator {
txns: Vec<SignedTransaction>,
expiry_time: u64,
bucket_start: u64,
) -> Batch {
) -> Batch<BatchInfo> {
let batch_id = self.batch_id;
self.batch_id.increment();
self.db
Expand All @@ -201,7 +201,7 @@ impl BatchGenerator {
/// batches are pushed.
fn push_bucket_to_batches(
&mut self,
batches: &mut Vec<Batch>,
batches: &mut Vec<Batch<BatchInfo>>,
txns: &mut Vec<SignedTransaction>,
num_txns_in_bucket: usize,
expiry_time: u64,
Expand Down Expand Up @@ -242,7 +242,7 @@ impl BatchGenerator {
&mut self,
pulled_txns: &mut Vec<SignedTransaction>,
expiry_time: u64,
) -> Vec<Batch> {
) -> Vec<Batch<BatchInfo>> {
// Sort by gas, in descending order. This is a stable sort on existing mempool ordering,
// so will not reorder accounts or their sequence numbers as long as they have the same gas.
pulled_txns.sort_by_key(|txn| u64::MAX - txn.gas_unit_price());
Expand Down Expand Up @@ -325,7 +325,7 @@ impl BatchGenerator {
self.txns_in_progress_sorted.len()
}

pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Vec<Batch> {
pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Vec<Batch<BatchInfo>> {
counters::BATCH_PULL_EXCLUDED_TXNS.observe(self.txns_in_progress_sorted.len() as f64);
trace!(
"QS: excluding txs len: {:?}",
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/quorum_store/batch_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
types::{BatchRequest, BatchResponse, PersistedValue},
},
};
use aptos_consensus_types::proof_of_store::BatchInfo;
use aptos_crypto::HashValue;
use aptos_executor_types::*;
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
digest: HashValue,
expiration: u64,
responders: Arc<Mutex<BTreeSet<PeerId>>>,
mut subscriber_rx: oneshot::Receiver<PersistedValue>,
mut subscriber_rx: oneshot::Receiver<PersistedValue<BatchInfo>>,
) -> ExecutorResult<Vec<SignedTransaction>> {
let validator_verifier = self.validator_verifier.clone();
let mut request_state = BatchRequesterState::new(responders, self.retry_limit);
Expand Down
37 changes: 23 additions & 14 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ impl QuotaManager {
pub struct BatchStore {
epoch: OnceCell<u64>,
last_certified_time: AtomicU64,
db_cache: DashMap<HashValue, PersistedValue>,
db_cache: DashMap<HashValue, PersistedValue<BatchInfo>>,
peer_quota: DashMap<PeerId, QuotaManager>,
expirations: Mutex<TimeExpirations<HashValue>>,
db: Arc<dyn QuorumStoreStorage>,
memory_quota: usize,
db_quota: usize,
batch_quota: usize,
validator_signer: ValidatorSigner,
persist_subscribers: DashMap<HashValue, Vec<oneshot::Sender<PersistedValue>>>,
persist_subscribers: DashMap<HashValue, Vec<oneshot::Sender<PersistedValue<BatchInfo>>>>,
expiration_buffer_usecs: u64,
}

Expand Down Expand Up @@ -252,7 +252,7 @@ impl BatchStore {
*self.epoch.get().expect("Epoch should always be set")
}

fn free_quota(&self, value: PersistedValue) {
fn free_quota(&self, value: PersistedValue<BatchInfo>) {
let mut quota_manager = self
.peer_quota
.get_mut(&value.author())
Expand All @@ -268,7 +268,10 @@ impl BatchStore {
// Note: holds db_cache entry lock (due to DashMap), while accessing peer_quota
// DashMap. Hence, peer_quota reference should never be held while accessing the
// db_cache to avoid the deadlock (if needed, order is db_cache, then peer_quota).
pub(crate) fn insert_to_cache(&self, value: &PersistedValue) -> anyhow::Result<bool> {
pub(crate) fn insert_to_cache(
&self,
value: &PersistedValue<BatchInfo>,
) -> anyhow::Result<bool> {
let digest = *value.digest();
let author = value.author();
let expiration_time = value.expiration();
Expand Down Expand Up @@ -326,7 +329,7 @@ impl BatchStore {
Ok(true)
}

pub(crate) fn save(&self, value: &PersistedValue) -> anyhow::Result<bool> {
pub(crate) fn save(&self, value: &PersistedValue<BatchInfo>) -> anyhow::Result<bool> {
let last_certified_time = self.last_certified_time();
if value.expiration() > last_certified_time {
fail_point!("quorum_store::save", |_| {
Expand Down Expand Up @@ -398,7 +401,7 @@ impl BatchStore {
fn persist_inner<T: TBatchInfo>(
&self,
batch_info: T,
persist_request: PersistedValue,
persist_request: PersistedValue<BatchInfo>,
) -> Option<SignedBatchInfo<T>> {
assert!(
batch_info.as_batch_info() == persist_request.batch_info(),
Expand Down Expand Up @@ -437,7 +440,7 @@ impl BatchStore {
self.last_certified_time.load(Ordering::Relaxed)
}

fn get_batch_from_db(&self, digest: &HashValue) -> ExecutorResult<PersistedValue> {
fn get_batch_from_db(&self, digest: &HashValue) -> ExecutorResult<PersistedValue<BatchInfo>> {
counters::GET_BATCH_FROM_DB_COUNT.inc();

match self.db.get_batch(digest) {
Expand All @@ -452,7 +455,7 @@ impl BatchStore {
pub(crate) fn get_batch_from_local(
&self,
digest: &HashValue,
) -> ExecutorResult<PersistedValue> {
) -> ExecutorResult<PersistedValue<BatchInfo>> {
if let Some(value) = self.db_cache.get(digest) {
if value.payload_storage_mode() == StorageMode::PersistedOnly {
self.get_batch_from_db(digest)
Expand All @@ -469,7 +472,7 @@ impl BatchStore {
/// This can be useful in cases where there are multiple flows to add a batch (like
/// direct from author batch / batch requester fetch) to the batch store and either
/// flow needs to subscribe to the other.
fn subscribe(&self, digest: HashValue) -> oneshot::Receiver<PersistedValue> {
fn subscribe(&self, digest: HashValue) -> oneshot::Receiver<PersistedValue<BatchInfo>> {
let (tx, rx) = oneshot::channel();
self.persist_subscribers.entry(digest).or_default().push(tx);

Expand All @@ -482,7 +485,7 @@ impl BatchStore {
rx
}

fn notify_subscribers(&self, value: PersistedValue) {
fn notify_subscribers(&self, value: PersistedValue<BatchInfo>) {
if let Some((_, subscribers)) = self.persist_subscribers.remove(value.digest()) {
for subscriber in subscribers {
subscriber.send(value.clone()).ok();
Expand All @@ -492,7 +495,10 @@ impl BatchStore {
}

impl BatchWriter for BatchStore {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>> {
fn persist(
&self,
persist_requests: Vec<PersistedValue<BatchInfo>>,
) -> Vec<SignedBatchInfo<BatchInfo>> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
let batch_info = persist_request.batch_info().clone();
Expand All @@ -506,7 +512,7 @@ impl BatchWriter for BatchStore {

fn persist_v2(
&self,
persist_requests: Vec<PersistedValue>,
persist_requests: Vec<PersistedValue<BatchInfo>>,
) -> Vec<SignedBatchInfo<BatchInfoExt>> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
Expand Down Expand Up @@ -649,10 +655,13 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
}

pub trait BatchWriter: Send + Sync {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo<BatchInfo>>;
fn persist(
&self,
persist_requests: Vec<PersistedValue<BatchInfo>>,
) -> Vec<SignedBatchInfo<BatchInfo>>;

fn persist_v2(
&self,
persist_requests: Vec<PersistedValue>,
persist_requests: Vec<PersistedValue<BatchInfo>>,
) -> Vec<SignedBatchInfo<BatchInfoExt>>;
}
2 changes: 1 addition & 1 deletion consensus/src/quorum_store/quorum_store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ impl InnerBuilder {
let response = if let Ok(value) =
batch_store.get_batch_from_local(&rpc_request.req.digest())
{
let batch: Batch = value.try_into().unwrap();
let batch: Batch<BatchInfo> = value.try_into().unwrap();
BatchResponse::Batch(batch)
} else {
match aptos_db_clone.get_latest_ledger_info() {
Expand Down
Loading
Loading