diff --git a/crates/turbopack-trace-server/Cargo.toml b/crates/turbopack-trace-server/Cargo.toml index d2cba5d30aa7a..7c1f1c8b254a0 100644 --- a/crates/turbopack-trace-server/Cargo.toml +++ b/crates/turbopack-trace-server/Cargo.toml @@ -21,6 +21,7 @@ postcard = { workspace = true } rustc-demangle = "0.1" serde = { workspace = true } serde_json = { workspace = true } +tungstenite = { version = "0.21.0" } turbopack-trace-utils = { workspace = true } websocket = { version = "0.27.0", features = ["sync"] } zstd = { version = "0.13.0" } diff --git a/crates/turbopack-trace-server/src/lib.rs b/crates/turbopack-trace-server/src/lib.rs index 6365b16a15c87..c6ad3db06ee51 100644 --- a/crates/turbopack-trace-server/src/lib.rs +++ b/crates/turbopack-trace-server/src/lib.rs @@ -24,7 +24,7 @@ pub fn start_turbopack_trace_server(path: PathBuf) { let store = Arc::new(StoreContainer::new()); let reader = TraceReader::spawn(store.clone(), path); - serve(store).unwrap(); + serve(store); reader.join().unwrap(); } diff --git a/crates/turbopack-trace-server/src/main.rs b/crates/turbopack-trace-server/src/main.rs index d1c1f8fbadba6..56f4dcdcb5888 100644 --- a/crates/turbopack-trace-server/src/main.rs +++ b/crates/turbopack-trace-server/src/main.rs @@ -31,7 +31,7 @@ fn main() { let store = Arc::new(StoreContainer::new()); let reader = TraceReader::spawn(store.clone(), arg.into()); - serve(store).unwrap(); + serve(store); reader.join().unwrap(); } diff --git a/crates/turbopack-trace-server/src/server.rs b/crates/turbopack-trace-server/src/server.rs index 6d6386d53d059..97fbf2e0cdcbf 100644 --- a/crates/turbopack-trace-server/src/server.rs +++ b/crates/turbopack-trace-server/src/server.rs @@ -1,20 +1,12 @@ use std::{ - net::{Shutdown, TcpStream}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, - thread, - time::Duration, + net::{TcpListener, TcpStream}, + sync::{Arc, Mutex}, + thread::spawn, }; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; -use websocket::{ - server::upgrade::WsUpgrade, - sync::{server::upgrade::Buffer, Server, Writer}, - OwnedMessage, -}; +use tungstenite::{accept, Message}; use crate::{ store::SpanId, @@ -76,6 +68,7 @@ pub enum ClientToServerMessage { id: SpanId, }, Ack, + CheckForMoreData, } #[derive(Serialize, Deserialize, Debug)] @@ -101,287 +94,255 @@ pub struct ViewRect { } struct ConnectionState { - writer: Writer, store: Arc, viewer: Viewer, view_rect: ViewRect, last_update_generation: usize, } -pub fn serve(store: Arc) -> Result<()> { - let mut server: websocket::server::WsServer< - websocket::server::NoTlsAcceptor, - std::net::TcpListener, - > = Server::bind("127.0.0.1:5747")?; - loop { - let Ok(connection) = server.accept() else { - continue; - }; +pub fn serve(store: Arc) { + let server = TcpListener::bind("127.0.0.1:5747").unwrap(); + for stream in server.incoming() { let store = store.clone(); - thread::spawn(move || { - fn handle_connection( - connection: WsUpgrade>, - store: Arc, - ) -> Result<()> { - let connection = match connection.accept() { - Ok(connection) => connection, - Err((connection, error)) => { - connection.shutdown(Shutdown::Both)?; - return Err(error.into()); + + spawn(move || { + let websocket = accept(stream.unwrap()).unwrap(); + if let Err(err) = handle_connection(websocket, store) { + eprintln!("Error: {:?}", err); + } + }); + } +} + +fn handle_connection( + mut websocket: tungstenite::WebSocket, + store: Arc, +) -> Result<()> { + let state = Arc::new(Mutex::new(ConnectionState { + store, + viewer: Viewer::new(), + view_rect: ViewRect { + x: 0, + y: 0, + width: 1, + height: 1, + horizontal_pixels: 1, + query: String::new(), + view_mode: "aggregated".to_string(), + value_mode: "duration".to_string(), + }, + last_update_generation: 0, + })); + let mut update_skipped = false; + let mut ready_for_update = true; + + fn send_update( + websocket: &mut tungstenite::WebSocket, + state: &mut ConnectionState, + force_send: bool, + ready_for_update: &mut bool, + update_skipped: &mut bool, + ) -> Result<()> { + if !*ready_for_update { + if force_send { + *update_skipped = true; + } + return Ok(()); + } + let store = state.store.read(); + if !force_send && state.last_update_generation == store.generation() { + return Ok(()); + } + state.last_update_generation = store.generation(); + let Update { + lines: updates, + max, + } = state.viewer.compute_update(&store, &state.view_rect); + let count = updates.len(); + for update in updates { + let message = ServerToClientMessage::ViewLine { update }; + let message = serde_json::to_string(&message).unwrap(); + websocket.send(Message::Text(message))?; + } + let message = ServerToClientMessage::ViewLinesCount { count, max }; + let message = serde_json::to_string(&message).unwrap(); + websocket.send(Message::Text(message))?; + *ready_for_update = false; + Ok(()) + } + loop { + match websocket.read().unwrap() { + Message::Frame(_frame) => {} + Message::Text(text) => { + let message: ClientToServerMessage = serde_json::from_str(&text)?; + let mut state = state.lock().unwrap(); + match message { + ClientToServerMessage::CheckForMoreData => { + send_update( + &mut websocket, + &mut state, + false, + &mut ready_for_update, + &mut update_skipped, + )?; } - }; - println!("client connected"); - let (mut reader, writer) = connection.split()?; - let state = Arc::new(Mutex::new(ConnectionState { - writer, - store, - viewer: Viewer::new(), - view_rect: ViewRect { - x: 0, - y: 0, - width: 1, - height: 1, - horizontal_pixels: 1, - query: String::new(), - view_mode: "aggregated".to_string(), - value_mode: "duration".to_string(), - }, - last_update_generation: 0, - })); - let should_shutdown = Arc::new(AtomicBool::new(false)); - let update_skipped = Arc::new(AtomicBool::new(false)); - let ready_for_update = Arc::new(AtomicBool::new(true)); - fn send_update( - state: &mut ConnectionState, - force_send: bool, - ready_for_update: &AtomicBool, - update_skipped: &AtomicBool, - ) -> Result<()> { - if !ready_for_update.load(Ordering::SeqCst) { - if force_send { - update_skipped.store(true, Ordering::SeqCst); - } - return Ok(()); + ClientToServerMessage::ViewRect { view_rect } => { + state.view_rect = view_rect; + send_update( + &mut websocket, + &mut state, + true, + &mut ready_for_update, + &mut update_skipped, + )?; } - let store = state.store.read(); - if !force_send && state.last_update_generation == store.generation() { - return Ok(()); + ClientToServerMessage::ViewMode { id, mode, inherit } => { + let (mode, sorted) = if let Some(mode) = mode.strip_suffix("-sorted") { + (mode, true) + } else { + (mode.as_str(), false) + }; + match mode { + "raw-spans" => { + state.viewer.set_view_mode( + id, + Some((ViewMode::RawSpans { sorted }, inherit)), + ); + } + "aggregated" => { + state.viewer.set_view_mode( + id, + Some((ViewMode::Aggregated { sorted }, inherit)), + ); + } + "bottom-up" => { + state.viewer.set_view_mode( + id, + Some((ViewMode::BottomUp { sorted }, inherit)), + ); + } + "aggregated-bottom-up" => { + state.viewer.set_view_mode( + id, + Some((ViewMode::AggregatedBottomUp { sorted }, inherit)), + ); + } + _ => { + bail!("unknown view mode: {}", mode) + } + } + send_update( + &mut websocket, + &mut state, + true, + &mut ready_for_update, + &mut update_skipped, + )?; } - state.last_update_generation = store.generation(); - let Update { - lines: updates, - max, - } = state.viewer.compute_update(&store, &state.view_rect); - let count = updates.len(); - for update in updates { - let message = ServerToClientMessage::ViewLine { update }; - let message = serde_json::to_string(&message).unwrap(); - state.writer.send_message(&OwnedMessage::Text(message))?; + ClientToServerMessage::ResetViewMode { id } => { + state.viewer.set_view_mode(id, None); + send_update( + &mut websocket, + &mut state, + true, + &mut ready_for_update, + &mut update_skipped, + )?; } - let message = ServerToClientMessage::ViewLinesCount { count, max }; - let message = serde_json::to_string(&message).unwrap(); - state.writer.send_message(&OwnedMessage::Text(message))?; - ready_for_update.store(false, Ordering::SeqCst); - Ok(()) - } - let inner_thread = { - let should_shutdown = should_shutdown.clone(); - let ready_for_update = ready_for_update.clone(); - let update_skipped = update_skipped.clone(); - let state = state.clone(); - thread::spawn(move || loop { - if should_shutdown.load(Ordering::SeqCst) { - return; - } - if send_update( - &mut state.lock().unwrap(), - false, - &ready_for_update, - &update_skipped, - ) - .is_err() - { - break; - } - thread::sleep(Duration::from_millis(500)); - }) - }; - loop { - match reader.recv_message()? { - OwnedMessage::Text(text) => { - let message: ClientToServerMessage = serde_json::from_str(&text)?; - let mut state = state.lock().unwrap(); - match message { - ClientToServerMessage::ViewRect { view_rect } => { - state.view_rect = view_rect; - send_update( - &mut state, - true, - &ready_for_update, - &update_skipped, - )?; + ClientToServerMessage::Query { id } => { + let message = { + let store = state.store.read(); + if let Some((span, is_graph)) = store.span(id) { + let root_start = store.root_span().start(); + let span_start = span.start() - root_start; + let span_end = span.end() - root_start; + let duration = span.corrected_total_time(); + let cpu = span.total_time(); + let allocations = span.total_allocations(); + let deallocations = span.total_deallocations(); + let allocation_count = span.total_allocation_count(); + let persistent_allocations = span.total_persistent_allocations(); + let args = span + .args() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + let mut path = Vec::new(); + let mut current = span; + while let Some(parent) = current.parent() { + path.push(parent.nice_name().1.to_string()); + current = parent; } - ClientToServerMessage::ViewMode { id, mode, inherit } => { - let (mode, sorted) = - if let Some(mode) = mode.strip_suffix("-sorted") { - (mode, true) - } else { - (mode.as_str(), false) - }; - match mode { - "raw-spans" => { - state.viewer.set_view_mode( - id, - Some((ViewMode::RawSpans { sorted }, inherit)), - ); - } - "aggregated" => { - state.viewer.set_view_mode( - id, - Some((ViewMode::Aggregated { sorted }, inherit)), - ); - } - "bottom-up" => { - state.viewer.set_view_mode( - id, - Some((ViewMode::BottomUp { sorted }, inherit)), - ); - } - "aggregated-bottom-up" => { - state.viewer.set_view_mode( - id, - Some(( - ViewMode::AggregatedBottomUp { sorted }, - inherit, - )), - ); - } - _ => { - bail!("unknown view mode: {}", mode) - } - } - send_update( - &mut state, - true, - &ready_for_update, - &update_skipped, - )?; + path.reverse(); + ServerToClientMessage::QueryResult { + id, + is_graph, + start: span_start, + end: span_end, + duration, + cpu, + allocations, + deallocations, + allocation_count, + persistent_allocations, + args, + path, } - ClientToServerMessage::ResetViewMode { id } => { - state.viewer.set_view_mode(id, None); - send_update( - &mut state, - true, - &ready_for_update, - &update_skipped, - )?; - } - ClientToServerMessage::Query { id } => { - let message = { - let store = state.store.read(); - if let Some((span, is_graph)) = store.span(id) { - let root_start = store.root_span().start(); - let span_start = span.start() - root_start; - let span_end = span.end() - root_start; - let duration = span.corrected_total_time(); - let cpu = span.total_time(); - let allocations = span.total_allocations(); - let deallocations = span.total_deallocations(); - let allocation_count = span.total_allocation_count(); - let persistent_allocations = - span.total_persistent_allocations(); - let args = span - .args() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(); - let mut path = Vec::new(); - let mut current = span; - while let Some(parent) = current.parent() { - path.push(parent.nice_name().1.to_string()); - current = parent; - } - path.reverse(); - ServerToClientMessage::QueryResult { - id, - is_graph, - start: span_start, - end: span_end, - duration, - cpu, - allocations, - deallocations, - allocation_count, - persistent_allocations, - args, - path, - } - } else { - ServerToClientMessage::QueryResult { - id, - is_graph: false, - start: 0, - end: 0, - duration: 0, - cpu: 0, - allocations: 0, - deallocations: 0, - allocation_count: 0, - persistent_allocations: 0, - args: Vec::new(), - path: Vec::new(), - } - } - }; - let message = serde_json::to_string(&message).unwrap(); - state.writer.send_message(&OwnedMessage::Text(message))?; - send_update( - &mut state, - true, - &ready_for_update, - &update_skipped, - )?; - - continue; - } - ClientToServerMessage::Ack => { - ready_for_update.store(true, Ordering::SeqCst); - if update_skipped.load(Ordering::SeqCst) { - update_skipped.store(false, Ordering::SeqCst); - send_update( - &mut state, - true, - &ready_for_update, - &update_skipped, - )?; - } + } else { + ServerToClientMessage::QueryResult { + id, + is_graph: false, + start: 0, + end: 0, + duration: 0, + cpu: 0, + allocations: 0, + deallocations: 0, + allocation_count: 0, + persistent_allocations: 0, + args: Vec::new(), + path: Vec::new(), } } - } - OwnedMessage::Binary(_) => { - // This doesn't happen - } - OwnedMessage::Close(_) => { - reader.shutdown_all()?; - should_shutdown.store(true, Ordering::SeqCst); - inner_thread.join().unwrap(); - return Ok(()); - } - OwnedMessage::Ping(d) => { - state - .lock() - .unwrap() - .writer - .send_message(&OwnedMessage::Pong(d))?; - } - OwnedMessage::Pong(_) => { - // thanks for the fish + }; + let message = serde_json::to_string(&message).unwrap(); + websocket.send(Message::Text(message))?; + send_update( + &mut websocket, + &mut state, + true, + &mut ready_for_update, + &mut update_skipped, + )?; + + continue; + } + ClientToServerMessage::Ack => { + ready_for_update = true; + if update_skipped { + update_skipped = false; + send_update( + &mut websocket, + &mut state, + true, + &mut ready_for_update, + &mut update_skipped, + )?; } } } } - if let Err(err) = handle_connection(connection, store) { - eprintln!("Error: {:?}", err); + Message::Binary(_) => { + // This doesn't happen } - }); + Message::Close(_) => { + return Ok(()); + } + Message::Ping(d) => { + websocket.send(Message::Pong(d))?; + } + Message::Pong(_) => { + // thanks for the fish + } + } } }