这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions src/broadcast_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl BroadcastService {
/// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
/// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
/// completing the cycle.
#[allow(clippy::too_many_arguments, clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub fn new(
db_ledger: Arc<DbLedger>,
bank: Arc<Bank>,
Expand All @@ -323,9 +323,8 @@ impl BroadcastService {
receiver: Receiver<Vec<Entry>>,
max_tick_height: Option<u64>,
exit_sender: Arc<AtomicBool>,
) -> (Self, Arc<AtomicBool>) {
) -> Self {
let exit_signal = Arc::new(AtomicBool::new(false));
let exit_signal_ = exit_signal.clone();
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
Expand All @@ -340,12 +339,12 @@ impl BroadcastService {
&leader_scheduler,
&receiver,
max_tick_height,
&exit_signal_,
&exit_signal,
)
})
.unwrap();

(Self { thread_hdl }, exit_signal)
Self { thread_hdl }
}
}

Expand All @@ -370,25 +369,23 @@ mod test {
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;

struct DummyBroadcastService {
struct MockBroadcastService {
db_ledger: Arc<DbLedger>,
broadcast_service: BroadcastService,
entry_sender: Sender<Vec<Entry>>,
exit_signal: Arc<AtomicBool>,
}

fn setup_dummy_broadcast_service(
leader_pubkey: Pubkey,
ledger_path: &str,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
entry_receiver: Receiver<Vec<Entry>>,
entry_height: u64,
max_tick_height: u64,
) -> DummyBroadcastService {
) -> MockBroadcastService {
// Make the database ledger
let db_ledger = Arc::new(DbLedger::open(ledger_path).unwrap());

Expand All @@ -406,12 +403,11 @@ mod test {

let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window));
let (entry_sender, entry_receiver) = channel();
let exit_sender = Arc::new(AtomicBool::new(false));
let bank = Arc::new(Bank::default());

// Start up the broadcast stage
let (broadcast_service, exit_signal) = BroadcastService::new(
let broadcast_service = BroadcastService::new(
db_ledger.clone(),
bank.clone(),
leader_info.sockets.broadcast,
Expand All @@ -424,11 +420,9 @@ mod test {
exit_sender,
);

DummyBroadcastService {
MockBroadcastService {
db_ledger,
broadcast_service,
entry_sender,
exit_signal,
}
}

Expand All @@ -450,10 +444,12 @@ mod test {
let entry_height = 2 * start_tick_height;

let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let (entry_sender, entry_receiver) = channel();
let broadcast_service = setup_dummy_broadcast_service(
leader_keypair.pubkey(),
&ledger_path,
leader_scheduler.clone(),
entry_receiver,
entry_height,
max_tick_height,
);
Expand All @@ -465,8 +461,7 @@ mod test {
for (i, mut tick) in ticks.into_iter().enumerate() {
// Simulate the tick heights generated in poh.rs
tick.tick_height = start_tick_height + i as u64 + 1;
broadcast_service
.entry_sender
entry_sender
.send(vec![tick])
.expect("Expect successful send to broadcast service");
}
Expand All @@ -484,7 +479,7 @@ mod test {
assert!(result.is_some());
}

broadcast_service.exit_signal.store(true, Ordering::Relaxed);
drop(entry_sender);
broadcast_service
.broadcast_service
.join()
Expand Down
4 changes: 2 additions & 2 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Fullnode {
scheduled_leader,
);

let (broadcast_service, _) = BroadcastService::new(
let broadcast_service = BroadcastService::new(
db_ledger.clone(),
bank.clone(),
node.sockets
Expand Down Expand Up @@ -487,7 +487,7 @@ impl Fullnode {
self.keypair.pubkey(),
);

let (broadcast_service, _) = BroadcastService::new(
let broadcast_service = BroadcastService::new(
self.db_ledger.clone(),
self.bank.clone(),
self.broadcast_socket
Expand Down