diff --git a/src/cmd/serve.rs b/src/cmd/serve.rs index 3a0aa90..2d0fad8 100644 --- a/src/cmd/serve.rs +++ b/src/cmd/serve.rs @@ -24,10 +24,7 @@ pub fn run_serve_cli(matches: &ArgMatches) -> Result<(), String> { } let data_directory = matches.value_of("DATA_DIRECTORY").unwrap(); let schema_file = matches.value_of("SCHEMA_FILE").unwrap(); - let mut tokenizer_file = ""; - if let Some(f) = matches.value_of("TOKENIZER_FILE") { - tokenizer_file = f; - } + let tokenizer_file = matches.value_of("TOKENIZER_FILE").unwrap(); let indexer_threads = matches .value_of("INDEXER_THREADS") diff --git a/src/main.rs b/src/main.rs index 8d770f1..f3bdbd7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -114,6 +114,7 @@ fn main() { .short("T") .long("tokenizer-file") .value_name("TOKENIZER_FILE") + .default_value("./etc/tokenizer.json") .takes_value(true), ) .arg( diff --git a/src/server/peer.rs b/src/server/peer.rs index 071ac7a..18f23f0 100644 --- a/src/server/peer.rs +++ b/src/server/peer.rs @@ -1,12 +1,12 @@ -//use std::collections::HashMap; +// use std::collections::HashMap; use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender}; use std::thread; use std::time::{Duration, Instant}; use log::*; -use raft::eraftpb::{ConfChange, Entry, EntryType, Message}; -use raft::storage::MemStorage as PeerStorage; use raft::{self, RawNode}; +use raft::eraftpb::{ConfChange, ConfState, Entry, EntryType, Message, MessageType}; +use raft::storage::MemStorage; use crate::server::util; @@ -17,9 +17,10 @@ pub enum PeerMessage { } pub struct Peer { - pub raft_group: RawNode, - // last_applying_idx: u64, + pub raw_node: RawNode, + last_apply_index: u64, // last_compacted_idx: u64, + conf_state: Option, apply_ch: SyncSender, // peers_addr: HashMap, // id, (host, port) } @@ -27,11 +28,12 @@ pub struct Peer { impl Peer { pub fn new(id: u64, apply_ch: SyncSender, peers: Vec) -> Peer { let cfg = util::default_raft_config(id, peers); - let storge = PeerStorage::new(); + let storge = MemStorage::new(); let peer = Peer { - raft_group: RawNode::new(&cfg, storge, vec![]).unwrap(), - // last_applying_idx: 0, + raw_node: RawNode::new(&cfg, storge, vec![]).unwrap(), + last_apply_index: 0, // last_compacted_idx: 0, + conf_state: None, apply_ch, // peers_addr: HashMap::new(), }; @@ -45,21 +47,26 @@ impl Peer { } fn listen_message(&mut self, sender: SyncSender, receiver: Receiver) { + debug!("start listening message"); + let mut t = Instant::now(); let mut timeout = Duration::from_millis(100); loop { match receiver.recv_timeout(timeout) { - Ok(PeerMessage::Propose(p)) => match self.raft_group.propose(vec![], p) { - Ok(_) => (), - Err(_) => self.apply_message(Entry::new()), + Ok(PeerMessage::Propose(p)) => match self.raw_node.propose(vec![], p.clone()) { + Ok(_) => info!("proposal succeeded: {:?}", p), + Err(_) => { + warn!("proposal failed: {:?}", p); + self.apply_message(Entry::new()); + } }, Ok(PeerMessage::ConfChange(cc)) => { - match self.raft_group.propose_conf_change(vec![], cc.clone()) { - Ok(_) => (), - Err(_) => error!("conf change failed: {:?}", cc), + match self.raw_node.propose_conf_change(vec![], cc.clone()) { + Ok(_) => info!("proposed configuration change succeeded: {:?}", cc), + Err(_) => warn!("proposed configuration change failed: {:?}", cc), } } - Ok(PeerMessage::Message(m)) => self.raft_group.step(m).unwrap(), + Ok(PeerMessage::Message(m)) => self.raw_node.step(m).unwrap(), Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => return, } @@ -68,7 +75,7 @@ impl Peer { if d >= timeout { t = Instant::now(); timeout = Duration::from_millis(200); - self.raft_group.tick(); + self.raw_node.tick(); } else { timeout -= d; } @@ -77,23 +84,28 @@ impl Peer { } } + fn is_leader(&self) -> bool { + self.raw_node.raft.leader_id == self.raw_node.raft.id + } + fn on_ready(&mut self, sender: SyncSender) { - if !self.raft_group.has_ready() { + if !self.raw_node.has_ready() { return; } + let mut ready = self.raw_node.ready(); + + let is_leader = self.is_leader(); - let mut ready = self.raft_group.ready(); - let is_leader = self.raft_group.raft.leader_id == self.raft_group.raft.id; + // leader if is_leader { - // debug!("I'm leader"); let msgs = ready.messages.drain(..); - for _msg in msgs { - Self::send_message(sender.clone(), _msg.clone()); + for msg in msgs { + Self::send_message(sender.clone(), msg.clone()); } } if !raft::is_empty_snap(&ready.snapshot) { - self.raft_group + self.raw_node .mut_store() .wl() .apply_snapshot(ready.snapshot.clone()) @@ -101,7 +113,7 @@ impl Peer { } if !ready.entries.is_empty() { - self.raft_group + self.raw_node .mut_store() .wl() .append(&ready.entries) @@ -109,26 +121,22 @@ impl Peer { } if let Some(ref hs) = ready.hs { - self.raft_group.mut_store().wl().set_hardstate(hs.clone()); + self.raw_node.mut_store().wl().set_hardstate(hs.clone()); } + // not leader if !is_leader { - // debug!("I'm follower"); let msgs = ready.messages.drain(..); - for mut _msg in msgs { - for _entry in _msg.mut_entries().iter() { - if _entry.get_entry_type() == EntryType::EntryConfChange {} - } - Self::send_message(sender.clone(), _msg.clone()); + for msg in msgs { + Self::send_message(sender.clone(), msg.clone()); } } if let Some(committed_entries) = ready.committed_entries.take() { - let mut _last_apply_index = 0; for entry in committed_entries { // Mostly, you need to save the last apply index to resume applying // after restart. Here we just ignore this because we use a Memory storage. - _last_apply_index = entry.get_index(); + self.last_apply_index = entry.get_index(); if entry.get_data().is_empty() { // Emtpy entry, when the peer becomes Leader it will send an empty entry. @@ -139,23 +147,53 @@ impl Peer { EntryType::EntryNormal => self.apply_message(entry.clone()), EntryType::EntryConfChange => { let cc = util::parse_data(&entry.data); - debug!("config: {:?}", cc); - self.raft_group.apply_conf_change(&cc); - debug!("apply conf change"); + info!("apply config change: {:?}", cc); + self.raw_node.apply_conf_change(&cc); self.apply_message(entry.clone()); + + self.conf_state = Some(self.raw_node.apply_conf_change(&cc)); } } } + + // if last_apply_index > 0 { + // info!("last apply index: {}", last_apply_index); + // let data = b"data".to_vec(); + // match self.raw_node.mut_store().wl().create_snapshot( + // last_apply_index, + // conf_state, + // data, + // ) { + // Ok(_s) => (), + // Err(e) => error!("creating snapshot failed: {:?}", e), + // } + // } } // Advance the Raft - self.raft_group.advance(ready); + self.raw_node.advance(ready); + + // let raft_applied = self.raw_node.raft.raft_log.get_applied(); + // info!("raft_applied: {}", raft_applied); + // match self.raw_node.mut_store().wl().compact(raft_applied) { + // Ok(_) => (), + // Err(e) => error!("compaction failed: {:?}", e), + // } } fn send_message(sender: SyncSender, msg: Message) { thread::spawn(move || { + // for entry in msg.mut_entries().iter() { + // debug!("leader: {:?}", entry); + // } + match msg.msg_type { + MessageType::MsgHeartbeat => debug!("send message: {:?}", msg), + MessageType::MsgHeartbeatResponse => debug!("send message: {:?}", msg), + _ => info!("send message: {:?}", msg), + } + sender.send(msg).unwrap_or_else(|e| { - panic!("raft send message error: {}", e); + panic!("error sending message: {:?}", e); }); }); } @@ -163,8 +201,10 @@ impl Peer { fn apply_message(&self, entry: Entry) { let sender = self.apply_ch.clone(); thread::spawn(move || { + info!("apply entry: {:?}", entry); + sender.send(entry).unwrap_or_else(|e| { - panic!("raft send apply entry error: {}", e); + panic!("error sending apply entry: {:?}", e); }); }); } diff --git a/src/server/server.rs b/src/server/server.rs index a8612c3..38326e0 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -35,12 +35,13 @@ use crate::tokenizer::tokenizer_initializer::TokenizerInitializer; use crate::util::search_result::{ScoredNamedFieldDocument, SearchResult}; use crate::util::signal::sigterm_channel; +#[derive(Debug)] struct NotifyArgs(u64, String, RespErr); #[derive(Clone)] pub struct IndexServer { id: u64, - peers: Arc>>, + peers_client: Arc>>, peers_addr: Arc>>, rf_message_ch: SyncSender, notify_ch_map: Arc>>>, @@ -61,10 +62,10 @@ impl IndexServer { indexer_threads: usize, indexer_memory_size: usize, ) { - let mut peers = HashMap::new(); - peers.insert(id, create_client(&format!("{}:{}", host, port))); + let mut peers_client = HashMap::new(); + peers_client.insert(id, create_client(&format!("{}:{}", host, port))); for (peer_id, peer_addr) in peers_addr.iter() { - peers.insert(*peer_id, create_client(peer_addr)); + peers_client.insert(*peer_id, create_client(peer_addr)); } let raft_path = Path::new(data_directory).join(Path::new("raft")); @@ -103,11 +104,11 @@ impl IndexServer { let (rpc_sender, rpc_receiver) = mpsc::sync_channel(100); let (apply_sender, apply_receiver) = mpsc::sync_channel(100); - let peers_id = peers.keys().map(|id| *id).collect(); + let peers_id = peers_client.keys().map(|id| *id).collect(); let mut index_server = IndexServer { id, - peers: Arc::new(Mutex::new(peers)), + peers_client: Arc::new(Mutex::new(peers_client)), peers_addr: Arc::new(Mutex::new(peers_addr)), rf_message_ch: rf_sender, notify_ch_map: Arc::new(Mutex::new(HashMap::new())), @@ -138,7 +139,7 @@ impl IndexServer { peer::Peer::activate(peer, rpc_sender, rf_receiver); let mut servers: Vec = Vec::new(); - for (_, value) in index_server.peers.clone().lock().unwrap().iter() { + for (_, value) in index_server.peers_client.clone().lock().unwrap().iter() { servers.push(value.clone()); } @@ -160,7 +161,7 @@ impl IndexServer { } fn async_rpc_sender(&mut self, receiver: Receiver) { - let l = self.peers.clone(); + let l = self.peers_client.clone(); thread::spawn(move || loop { match receiver.recv() { Ok(m) => { @@ -187,6 +188,7 @@ impl IndexServer { let mut map = self.notify_ch_map.lock().unwrap(); map.insert(req.get_client_id(), sh); } + self.rf_message_ch .send(PeerMessage::Propose(req.write_to_bytes().unwrap_or_else( |e| { @@ -196,6 +198,7 @@ impl IndexServer { .unwrap_or_else(|e| { error!("send propose to raft error: {:?}", e); }); + // TODO: consider appropriate timeout value return match rh.recv_timeout(Duration::from_millis(60000)) { Ok(args) => (args.2, args.1), @@ -223,7 +226,7 @@ impl IndexServer { // TODO: check duplicate request. fn async_applier(&mut self, apply_receiver: Receiver) { let notify_ch_map = self.notify_ch_map.clone(); - let peers = self.peers.clone(); + let peers = self.peers_client.clone(); let peers_addr = self.peers_addr.clone(); let index = self.index.clone(); let index_writer = self.index_writer.clone(); @@ -231,14 +234,16 @@ impl IndexServer { thread::spawn(move || loop { match apply_receiver.recv() { - Ok(e) => match e.get_entry_type() { + Ok(entry) => match entry.get_entry_type() { EntryType::EntryNormal => { + let req: ApplyReq = util::parse_data(entry.get_data()); + debug!("request: {:?}", req); + + // let client_id = req.get_client_id(); let result: NotifyArgs; - let req: ApplyReq = util::parse_data(e.get_data()); - let client_id = req.get_client_id(); - if e.data.len() > 0 { + if entry.data.len() > 0 { result = Self::apply_entry( - e.term, + entry.term, &req, peers.clone(), peers_addr.clone(), @@ -246,26 +251,30 @@ impl IndexServer { index_writer.clone(), metrics.clone(), ); - debug!("{:?}: {:?}", result.2, req); } else { result = NotifyArgs(0, String::from(""), RespErr::ErrWrongLeader); - debug!("{:?}", req); } + debug!("request: {:?}", result); + let mut map = notify_ch_map.lock().unwrap(); - if let Some(s) = map.get(&client_id) { - s.send(result).unwrap_or_else(|e| { - error!("notify apply result error: {:?}", e); + if let Some(sync_sender) = map.get(&req.get_client_id()) { + sync_sender.send(result).unwrap_or_else(|e| { + error!("error sending result: {:?}", e); }); } - map.remove(&client_id); + map.remove(&req.get_client_id()); } EntryType::EntryConfChange => { + let cc: ConfChange = util::parse_data(entry.get_data()); + debug!("config change: {:?}", cc); + let result = NotifyArgs(0, String::from(""), RespErr::OK); - let cc: ConfChange = util::parse_data(e.get_data()); + debug!("request: {:?}", result); + let mut map = notify_ch_map.lock().unwrap(); - if let Some(s) = map.get(&cc.get_node_id()) { - s.send(result).unwrap_or_else(|e| { - error!("notify apply result error: {:?}", e); + if let Some(sync_sender) = map.get(&cc.get_node_id()) { + sync_sender.send(result).unwrap_or_else(|e| { + error!("error sending result: {:?}", e); }); } map.remove(&cc.get_node_id()); @@ -509,30 +518,34 @@ impl IndexService for IndexServer { fn raft(&mut self, ctx: RpcContext, req: RaftMessage, sink: UnarySink) { self.metrics.lock().unwrap().inc_request_count("raft"); + debug!("request: {:?}", req); + self.rf_message_ch .send(PeerMessage::Message(req.clone())) .unwrap_or_else(|e| { - error!("send message to raft error: {:?}", e); + error!("error sending message: {:?}", e); }); + let resp = RaftDone::new(); + debug!("response: {:?}", resp); ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn raft_conf_change(&mut self, ctx: RpcContext, req: ConfChangeReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics .lock() .unwrap() .inc_request_count("raft_conf_change"); + debug!("request: {:?}", req); + let cc = req.cc.clone().unwrap(); - let mut resp = RaftDone::new(); let mut apply_req = ApplyReq::new(); + let mut resp = RaftDone::new(); match cc.change_type { ConfChangeType::AddNode | ConfChangeType::AddLearnerNode => { @@ -572,18 +585,17 @@ impl IndexService for IndexServer { } debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn probe(&mut self, ctx: RpcContext, req: ProbeReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics.lock().unwrap().inc_request_count("probe"); + debug!("request: {:?}", req); + let mut ret = HashMap::new(); ret.insert("health", "OK"); @@ -592,52 +604,49 @@ impl IndexService for IndexServer { resp.set_value(serde_json::to_string(&ret).unwrap()); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn peers(&mut self, ctx: RpcContext, req: PeersReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics.lock().unwrap().inc_request_count("peers"); + debug!("request: {:?}", req); + let mut resp = PeersResp::new(); resp.set_err(RespErr::OK); resp.set_value(serde_json::to_string(&self.peers_addr.lock().unwrap().clone()).unwrap()); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn metrics(&mut self, ctx: RpcContext, req: MetricsReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics.lock().unwrap().inc_request_count("metrics"); + debug!("request: {:?}", req); + let mut resp = MetricsResp::new(); resp.set_err(RespErr::OK); resp.set_value(self.metrics.lock().unwrap().get_metrics()); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn get(&mut self, ctx: RpcContext, req: GetReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics.lock().unwrap().inc_request_count("get"); + debug!("request: {:?}", req); + let t = Term::from_field_text( self.index.schema().get_field("_id").unwrap(), req.get_doc_id(), @@ -656,10 +665,9 @@ impl IndexService for IndexServer { resp.set_value(serde_json::to_string(&named_doc).unwrap()); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } @@ -677,10 +685,9 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply message: {:?}", e)), ) } @@ -698,10 +705,9 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } @@ -719,10 +725,9 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } @@ -745,10 +750,9 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } @@ -766,10 +770,9 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } @@ -787,10 +790,9 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } @@ -808,18 +810,17 @@ impl IndexService for IndexServer { resp.set_value(ret); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", apply_req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn search(&mut self, ctx: RpcContext, req: SearchReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics.lock().unwrap().inc_request_count("search"); + debug!("request: {:?}", req); + let schema = self.index.schema(); let default_fields: Vec = schema .fields() @@ -882,7 +883,6 @@ impl IndexService for IndexServer { let mut facet_kv: HashMap = HashMap::new(); for facet_prefix in req.get_facet_prefixes() { for (facet_key, facet_value) in facet_counts.get(facet_prefix) { - debug!("{:?}={}", facet_key.to_string(), facet_value); facet_kv.insert(facet_key.to_string(), facet_value); } } @@ -895,12 +895,6 @@ impl IndexService for IndexServer { if doc_pos >= req.get_from() { let doc = searcher.doc(doc_address).unwrap(); let named_doc = schema.to_named_doc(&doc); - debug!( - "score: {:?} doc: {:?}", - score, - serde_json::to_string(&named_doc).unwrap() - ); - let scored_doc = ScoredNamedFieldDocument { fields: named_doc, score, @@ -917,30 +911,25 @@ impl IndexService for IndexServer { resp.set_value(serde_json::to_string(&sr).unwrap()); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ) } fn schema(&mut self, ctx: RpcContext, req: SchemaReq, sink: UnarySink) { - debug!("request: {:?}", req); - self.metrics.lock().unwrap().inc_request_count("schema"); + debug!("request: {:?}", req); + let mut resp = SchemaResp::new(); resp.set_err(RespErr::OK); - resp.set_value(format!( - "{}", - serde_json::to_string(&self.index.schema()).unwrap() - )); + resp.set_value(serde_json::to_string(&self.index.schema()).unwrap()); debug!("response: {:?}", resp); - ctx.spawn( sink.success(resp) - .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + .map_err(move |e| error!("failed to reply response: {:?}", e)), ); } } diff --git a/src/server/util.rs b/src/server/util.rs index ba21a9c..92d8c94 100644 --- a/src/server/util.rs +++ b/src/server/util.rs @@ -1,9 +1,7 @@ -use log::*; use protobuf::{self, Message}; use raft::Config; pub fn default_raft_config(id: u64, peers: Vec) -> Config { - debug!("default_raft_config id:{} peers:{:?}", id, peers); Config { id, peers,