diff --git a/mempool/src/counters.rs b/mempool/src/counters.rs
index 838506efd7fc2..221d8b4a93748 100644
--- a/mempool/src/counters.rs
+++ b/mempool/src/counters.rs
@@ -100,6 +100,10 @@ pub const SUBMITTED_BY_CLIENT_LABEL: &str = "client";
 pub const SUBMITTED_BY_DOWNSTREAM_LABEL: &str = "downstream";
 pub const SUBMITTED_BY_PEER_VALIDATOR_LABEL: &str = "peer_validator";
 
+// Broadcast event labels
+pub const DROP_BROADCAST_LABEL: &str = "drop_broadcast";
+pub const RUNNING_LABEL: &str = "running";
+
 // Histogram buckets with a large range of 0-500s and some constant sized buckets between:
 // 0-1.5s (every 25ms), 1.5-2s (every 100ms), 2-5s (250ms), 5-10s (1s), and 10-25s (2.5s).
 const MEMPOOL_LATENCY_BUCKETS: &[f64] = &[
@@ -556,6 +560,21 @@ pub fn shared_mempool_broadcast_size(network_id: NetworkId, num_txns: usize) {
         .observe(num_txns as f64);
 }
 
+static SHARED_MEMPOOL_BROADCAST_RUNNING_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "aptos_shared_mempool_broadcast_running_events",
+        "Broadcast events (at runtime) for shared mempool",
+        &["event"]
+    )
+    .unwrap()
+});
+
+pub fn shared_mempool_broadcast_running_event_inc(event_label: &str) {
+    SHARED_MEMPOOL_BROADCAST_RUNNING_EVENTS
+        .with_label_values(&[event_label])
+        .inc();
+}
+
 static SHARED_MEMPOOL_BROADCAST_TYPE_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
         "aptos_shared_mempool_rebroadcast_count",
diff --git a/mempool/src/shared_mempool/network.rs b/mempool/src/shared_mempool/network.rs
index eb57f9bad097e..be6cdd147d62c 100644
--- a/mempool/src/shared_mempool/network.rs
+++ b/mempool/src/shared_mempool/network.rs
@@ -90,6 +90,20 @@ pub enum BroadcastError {
     TooManyPendingBroadcasts(PeerNetworkId),
 }
 
+impl BroadcastError {
+    /// Returns a summary label for the error
+    pub fn get_label(&self) -> &'static str {
+        match self {
+            Self::NetworkError(_, _) => "network_error",
+            Self::NoTransactions(_) => "no_transactions",
+            Self::PeerNotFound(_) => "peer_not_found",
+            Self::PeerNotPrioritized(_, _) => "peer_not_prioritized",
+            Self::PeerNotScheduled(_) => "peer_not_scheduled",
+            Self::TooManyPendingBroadcasts(_) => "too_many_pending_broadcasts",
+        }
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub enum BroadcastPeerPriority {
     Primary,
@@ -639,7 +653,7 @@ impl> MempoolNetworkInterf
 
         // Log all the metrics
         let latency = start_time.elapsed();
-        trace!(
+        info!(
             LogSchema::event_log(LogEntry::BroadcastTransaction, LogEvent::Success)
                 .peer(&peer)
                 .message_id(&message_id)
diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs
index 60476c70e7420..ae52c0438f1d2 100644
--- a/mempool/src/shared_mempool/tasks.rs
+++ b/mempool/src/shared_mempool/tasks.rs
@@ -61,35 +61,39 @@ pub(crate) async fn execute_broadcast(
     TransactionValidator: TransactionValidation,
 {
     let network_interface = &smp.network_interface.clone();
+    counters::shared_mempool_broadcast_running_event_inc(counters::RUNNING_LABEL);
+
     // If there's no connection, don't bother to broadcast
     if network_interface.sync_states_exists(&peer) {
         if let Err(err) = network_interface
             .execute_broadcast(peer, backoff, smp)
             .await
         {
+            counters::shared_mempool_broadcast_running_event_inc(err.get_label());
             match err {
-                BroadcastError::NetworkError(peer, error) => warn!(LogSchema::event_log(
-                    LogEntry::BroadcastTransaction,
-                    LogEvent::NetworkSendFail
-                )
-                .peer(&peer)
-                .error(&error)),
-                BroadcastError::NoTransactions(_) | BroadcastError::PeerNotPrioritized(_, _) => {
+                BroadcastError::NoTransactions(_) => {
+                    sample!(
+                        SampleRate::Duration(Duration::from_secs(1)),
+                        info!("No transactions to broadcast: {:?}", err)
+                    );
+                },
+                BroadcastError::PeerNotPrioritized(_, _) => {
                     sample!(
-                        SampleRate::Duration(Duration::from_secs(60)),
-                        trace!("{:?}", err)
+                        SampleRate::Duration(Duration::from_secs(1)),
+                        info!("Peer not prioritized. Skipping broadcast: {:?}", err)
                     );
                 },
                 _ => {
                     sample!(
-                        SampleRate::Duration(Duration::from_secs(60)),
-                        debug!("{:?}", err)
+                        SampleRate::Duration(Duration::from_secs(1)),
+                        warn!("Execute broadcast failed: {:?}", err)
                     );
                 },
             }
         }
     } else {
         // Drop the scheduled broadcast, we're not connected anymore
+        counters::shared_mempool_broadcast_running_event_inc(counters::DROP_BROADCAST_LABEL);
         return;
     }
     let schedule_backoff = network_interface.is_backoff_mode(&peer);