这是indexloc提供的服务,不要输入任何密码
Skip to content

[Mempool] Add metrics & logs for broadcast behaviour. #17124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: aptos-release-v1.32
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ pub const SUBMITTED_BY_CLIENT_LABEL: &str = "client";
pub const SUBMITTED_BY_DOWNSTREAM_LABEL: &str = "downstream";
pub const SUBMITTED_BY_PEER_VALIDATOR_LABEL: &str = "peer_validator";

// Broadcast event labels
/// Event label recorded when a scheduled broadcast is dropped because the
/// peer no longer has a sync state (i.e. the broadcast is not attempted).
pub const DROP_BROADCAST_LABEL: &str = "drop_broadcast";
/// Event label recorded each time a broadcast execution is started.
pub const RUNNING_LABEL: &str = "running";

// Histogram buckets with a large range of 0-500s and some constant sized buckets between:
// 0-1.5s (every 25ms), 1.5-2s (every 100ms), 2-5s (250ms), 5-10s (1s), and 10-25s (2.5s).
const MEMPOOL_LATENCY_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -556,6 +560,21 @@ pub fn shared_mempool_broadcast_size(network_id: NetworkId, num_txns: usize) {
.observe(num_txns as f64);
}

/// Lazily-registered counter vector for shared mempool broadcast events
/// observed at runtime, partitioned by a single `event` label.
static SHARED_MEMPOOL_BROADCAST_RUNNING_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
    let metric_name = "aptos_shared_mempool_broadcast_running_events";
    let metric_help = "Broadcast events (at runtime) for shared mempool";
    let label_names = &["event"];

    // `unwrap` makes a failed registration panic on first access of the
    // static rather than being silently ignored.
    register_int_counter_vec!(metric_name, metric_help, label_names).unwrap()
});

/// Increments the broadcast running-events counter by one for the given
/// `event_label`.
///
/// The label is used as the value of the counter's single `event` dimension.
pub fn shared_mempool_broadcast_running_event_inc(event_label: &str) {
    let label_values = [event_label];
    let event_counter = SHARED_MEMPOOL_BROADCAST_RUNNING_EVENTS.with_label_values(&label_values);
    event_counter.inc();
}

static SHARED_MEMPOOL_BROADCAST_TYPE_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_shared_mempool_rebroadcast_count",
Expand Down
16 changes: 15 additions & 1 deletion mempool/src/shared_mempool/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ pub enum BroadcastError {
TooManyPendingBroadcasts(PeerNetworkId),
}

impl BroadcastError {
/// Returns a summary label for the error
pub fn get_label(&self) -> &'static str {
match self {
Self::NetworkError(_, _) => "network_error",
Self::NoTransactions(_) => "no_transactions",
Self::PeerNotFound(_) => "peer_not_found",
Self::PeerNotPrioritized(_, _) => "peer_not_prioritized",
Self::PeerNotScheduled(_) => "peer_not_scheduled",
Self::TooManyPendingBroadcasts(_) => "too_many_pending_broadcasts",
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum BroadcastPeerPriority {
Primary,
Expand Down Expand Up @@ -639,7 +653,7 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf

// Log all the metrics
let latency = start_time.elapsed();
trace!(
info!(
LogSchema::event_log(LogEntry::BroadcastTransaction, LogEvent::Success)
.peer(&peer)
.message_id(&message_id)
Expand Down
26 changes: 15 additions & 11 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,35 +61,39 @@ pub(crate) async fn execute_broadcast<NetworkClient, TransactionValidator>(
TransactionValidator: TransactionValidation,
{
let network_interface = &smp.network_interface.clone();
counters::shared_mempool_broadcast_running_event_inc(counters::RUNNING_LABEL);

// If there's no connection, don't bother to broadcast
if network_interface.sync_states_exists(&peer) {
if let Err(err) = network_interface
.execute_broadcast(peer, backoff, smp)
.await
{
counters::shared_mempool_broadcast_running_event_inc(err.get_label());
match err {
BroadcastError::NetworkError(peer, error) => warn!(LogSchema::event_log(
LogEntry::BroadcastTransaction,
LogEvent::NetworkSendFail
)
.peer(&peer)
.error(&error)),
BroadcastError::NoTransactions(_) | BroadcastError::PeerNotPrioritized(_, _) => {
BroadcastError::NoTransactions(_) => {
sample!(
SampleRate::Duration(Duration::from_secs(1)),
info!("No transactions to broadcast: {:?}", err)
);
},
BroadcastError::PeerNotPrioritized(_, _) => {
sample!(
SampleRate::Duration(Duration::from_secs(60)),
trace!("{:?}", err)
SampleRate::Duration(Duration::from_secs(1)),
info!("Peer not prioritized. Skipping broadcast: {:?}", err)
);
},
_ => {
sample!(
SampleRate::Duration(Duration::from_secs(60)),
debug!("{:?}", err)
SampleRate::Duration(Duration::from_secs(1)),
warn!("Execute broadcast failed: {:?}", err)
);
},
}
}
} else {
// Drop the scheduled broadcast, we're not connected anymore
counters::shared_mempool_broadcast_running_event_inc(counters::DROP_BROADCAST_LABEL);
return;
}
let schedule_backoff = network_interface.is_backoff_mode(&peer);
Expand Down
Loading