diff --git a/proto/indexpb.proto b/proto/indexpb.proto index 20b86f1..1166b84 100644 --- a/proto/indexpb.proto +++ b/proto/indexpb.proto @@ -17,4 +17,6 @@ service Index { } rpc RaftConfChange (indexrpcpb.ConfChangeReq) returns (indexrpcpb.RaftDone) { } + rpc Status (indexrpcpb.IndexReq) returns (indexrpcpb.StatusResp) { + } } diff --git a/proto/indexrpcpb.proto b/proto/indexrpcpb.proto index 6255c9f..bf5f53e 100644 --- a/proto/indexrpcpb.proto +++ b/proto/indexrpcpb.proto @@ -14,7 +14,9 @@ enum ReqType { Put = 1; Delete = 2; Search = 3; - PeerAddr = 4; + Join = 4; + Leave = 5; + Status = 6; } message IndexReq { @@ -34,6 +36,11 @@ message ConfChangeReq { uint32 port = 3; } +message StatusResp { + string value = 1; + RespErr err = 2; +} + message GetResp { string value = 1; RespErr err = 2; diff --git a/src/client/client.rs b/src/client/client.rs index aae535d..6096317 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -4,10 +4,12 @@ use std::time::Duration; use grpcio::{ChannelBuilder, EnvBuilder}; use log::*; +use raft::eraftpb::{ConfChange, ConfChangeType}; use crate::proto::indexpb_grpc::IndexClient; use crate::proto::indexrpcpb::{ - DeleteResp, GetResp, IndexReq, PutResp, ReqType, RespErr, SearchResp, + ConfChangeReq, DeleteResp, GetResp, IndexReq, PutResp, RaftDone, ReqType, RespErr, SearchResp, + StatusResp, }; pub fn create_client(addr: &str) -> IndexClient { @@ -135,4 +137,85 @@ impl Clerk { thread::sleep(Duration::from_millis(100)); } } + + pub fn join(&mut self, id: u64, ip: &str, port: u16) { + let mut cc = ConfChange::new(); + cc.set_id(id); + cc.set_node_id(id); + cc.set_change_type(ConfChangeType::AddNode); + let mut req = ConfChangeReq::new(); + req.set_cc(cc); + req.set_ip(ip.to_string()); + req.set_port(port as u32); + + loop { + let reply = self.servers[self.leader_id] + .raft_conf_change(&req) + .unwrap_or_else(|e| { + error!("{:?}", e); + let mut resp = RaftDone::new(); + resp.set_err(RespErr::ErrWrongLeader); + resp + }); + match reply.err { + RespErr::OK => return, + RespErr::ErrWrongLeader => (), + RespErr::ErrNoKey => return, + } + self.leader_id = (self.leader_id + 1) % self.servers.len(); + thread::sleep(Duration::from_millis(100)); + } + } + + pub fn leave(&mut self, id: u64) { + let mut cc = ConfChange::new(); + cc.set_id(id); + cc.set_node_id(id); + cc.set_change_type(ConfChangeType::RemoveNode); + let mut req = ConfChangeReq::new(); + req.set_cc(cc); + + loop { + let reply = self.servers[self.leader_id] + .raft_conf_change(&req) + .unwrap_or_else(|e| { + error!("{:?}", e); + let mut resp = RaftDone::new(); + resp.set_err(RespErr::ErrWrongLeader); + resp + }); + match reply.err { + RespErr::OK => return, + RespErr::ErrWrongLeader => (), + RespErr::ErrNoKey => return, + } + self.leader_id = (self.leader_id + 1) % self.servers.len(); + thread::sleep(Duration::from_millis(100)); + } + } + + pub fn status(&mut self) -> String { + let mut req = IndexReq::new(); + req.set_client_id(self.client_id); + req.set_req_type(ReqType::Status); + req.set_seq(self.request_seq); + self.request_seq += 1; + + loop { + let reply = self.servers[self.leader_id] + .status(&req) + .unwrap_or_else(|_e| { + let mut resp = StatusResp::new(); + resp.set_err(RespErr::ErrWrongLeader); + resp + }); + match reply.err { + RespErr::OK => return reply.value, + RespErr::ErrWrongLeader => (), + RespErr::ErrNoKey => return String::from(""), + } + self.leader_id = (self.leader_id + 1) % self.servers.len(); + thread::sleep(Duration::from_millis(100)); + } + } } diff --git a/src/cmd.rs b/src/cmd.rs index dd864bd..6eb9487 100644 --- a/src/cmd.rs +++ b/src/cmd.rs @@ -4,3 +4,4 @@ pub mod leave; pub mod search; pub mod serve; pub mod set; +pub mod status; diff --git a/src/cmd/delete.rs b/src/cmd/delete.rs index 773fdea..eda5643 100644 --- a/src/cmd/delete.rs +++ b/src/cmd/delete.rs @@ -1,18 +1,16 @@ use clap::ArgMatches; use crate::client::client::{create_client, Clerk}; -use crate::util::log::set_log_level; +use crate::util::log::set_logger; pub fn run_delete_cli(matches: &ArgMatches) -> Result<(), String> { - set_log_level(); - env_logger::init(); + set_logger(); let servers: Vec<_> = matches .values_of("SERVERS") .unwrap() .map(|addr| create_client(addr)) .collect(); - let key = matches.value_of("KEY").unwrap(); let client_id = rand::random(); diff --git a/src/cmd/get.rs b/src/cmd/get.rs index 6ea98cf..a9eecfe 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -1,18 +1,16 @@ use clap::ArgMatches; use crate::client::client::{create_client, Clerk}; -use crate::util::log::set_log_level; +use crate::util::log::set_logger; pub fn run_get_cli(matches: &ArgMatches) -> Result<(), String> { - set_log_level(); - env_logger::init(); + set_logger(); let servers: Vec<_> = matches .values_of("SERVERS") .unwrap() .map(|addr| create_client(addr)) .collect(); - let key = matches.value_of("KEY").unwrap(); let client_id = rand::random(); diff --git a/src/cmd/leave.rs b/src/cmd/leave.rs index d99169a..f914e47 100644 --- a/src/cmd/leave.rs +++ b/src/cmd/leave.rs @@ -1,45 +1,22 @@ -use std::collections::HashMap; - use clap::ArgMatches; -use log::*; -use raft::eraftpb::{ConfChange, ConfChangeType}; -use crate::client::client::create_client; -use crate::proto::indexrpcpb::ConfChangeReq; -use crate::server::util::conf_change; -use crate::util::log::set_log_level; +use crate::client::client::{create_client, Clerk}; +use crate::util::log::set_logger; pub fn run_leave_cli(matches: &ArgMatches) -> Result<(), String> { - set_log_level(); - env_logger::init(); + set_logger(); - let host = matches.value_of("HOST").unwrap(); - let port = matches.value_of("PORT").unwrap().parse::().unwrap(); + let servers: Vec<_> = matches + .values_of("SERVERS") + .unwrap() + .map(|addr| create_client(addr)) + .collect(); let id = matches.value_of("ID").unwrap().parse::().unwrap(); - let mut peers = HashMap::new(); - if let Some(peers_vec) = matches.values_of("PEERS") { - peers_vec - .map(|s| { - let mut parts = s.split('='); - let id = parts.next().unwrap().parse::().unwrap(); - let addr = parts.next().unwrap(); - peers.insert(id, create_client(addr)); - }) - .count(); - } - if let Some(leader_id_str) = matches.value_of("LEADER_ID") { - let leader_id = leader_id_str.parse::().unwrap(); - let mut cc = ConfChange::new(); - cc.set_id(id); - cc.set_node_id(id); - cc.set_change_type(ConfChangeType::RemoveNode); - let mut req = ConfChangeReq::new(); - req.set_cc(cc); - req.set_ip(host.to_owned()); - req.set_port(port as u32); - conf_change(id, leader_id, &peers, req); - info!("the node was successfully removed from the cluster"); - } + + let client_id = rand::random(); + + let mut client = Clerk::new(&servers, client_id); + client.leave(id); Ok(()) } diff --git a/src/cmd/search.rs b/src/cmd/search.rs index bf59ce7..147c591 100644 --- a/src/cmd/search.rs +++ b/src/cmd/search.rs @@ -1,18 +1,16 @@ use clap::ArgMatches; use crate::client::client::{create_client, Clerk}; -use crate::util::log::set_log_level; +use crate::util::log::set_logger; pub fn run_search_cli(matches: &ArgMatches) -> Result<(), String> { - set_log_level(); - env_logger::init(); + set_logger(); let servers: Vec<_> = matches .values_of("SERVERS") .unwrap() .map(|addr| create_client(addr)) .collect(); - let query = matches.value_of("QUERY").unwrap(); let client_id = rand::random(); diff --git a/src/cmd/serve.rs b/src/cmd/serve.rs index 673d8e6..b9c231a 100644 --- a/src/cmd/serve.rs +++ b/src/cmd/serve.rs @@ -5,20 +5,17 @@ use std::sync::Arc; use clap::ArgMatches; use log::*; -use raft::eraftpb::{ConfChange, ConfChangeType}; use serde_json; use tantivy::schema::Schema; use tantivy::Index; -use crate::client::client::create_client; -use crate::proto::indexrpcpb::ConfChangeReq; +use crate::client::client::{create_client, Clerk}; +use crate::proto::indexpb_grpc::IndexClient; use crate::server::server::IndexServer; -use crate::server::util::conf_change; -use crate::util::log::set_log_level; +use crate::util::log::set_logger; pub fn run_serve_cli(matches: &ArgMatches) -> Result<(), String> { - set_log_level(); - env_logger::init(); + set_logger(); let host = matches.value_of("HOST").unwrap(); let port = matches.value_of("PORT").unwrap().parse::().unwrap(); @@ -34,19 +31,11 @@ pub fn run_serve_cli(matches: &ArgMatches) -> Result<(), String> { }) .count(); } - if let Some(leader_id_str) = matches.value_of("LEADER_ID") { - let leader_id = leader_id_str.parse::().unwrap(); - let mut cc = ConfChange::new(); - cc.set_id(id); - cc.set_node_id(id); - cc.set_change_type(ConfChangeType::AddNode); - let mut req = ConfChangeReq::new(); - req.set_cc(cc); - req.set_ip(host.to_owned()); - req.set_port(port as u32); - conf_change(id, leader_id, &peers, req); - info!("the node was successfully added to the cluster"); - } + let leader_id = matches + .value_of("LEADER_ID") + .unwrap_or("0") + .parse::() + .unwrap(); let data_directory = matches.value_of("DATA_DIRECTORY").unwrap(); let schema_file = matches.value_of("SCHEMA_FILE").unwrap(); let unique_key_field_name = matches.value_of("UNIQUE_KEY_FIELD_NAME").unwrap(); @@ -79,6 +68,7 @@ pub fn run_serve_cli(matches: &ArgMatches) -> Result<(), String> { id, host, port, + leader_id, peers, Arc::new(index), unique_key_field_name, diff --git a/src/cmd/set.rs b/src/cmd/set.rs index 20feb98..51f51d1 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -1,18 +1,16 @@ use clap::ArgMatches; use crate::client::client::{create_client, Clerk}; -use crate::util::log::set_log_level; +use crate::util::log::set_logger; pub fn run_set_cli(matches: &ArgMatches) -> Result<(), String> { - set_log_level(); - env_logger::init(); + set_logger(); let servers: Vec<_> = matches .values_of("SERVERS") .unwrap() .map(|addr| create_client(addr)) .collect(); - let key = matches.value_of("KEY").unwrap(); let value = matches.value_of("VALUE").unwrap(); diff --git a/src/cmd/status.rs b/src/cmd/status.rs new file mode 100644 index 0000000..5046dd7 --- /dev/null +++ b/src/cmd/status.rs @@ -0,0 +1,22 @@ +use clap::ArgMatches; + +use crate::client::client::{create_client, Clerk}; +use crate::util::log::set_logger; + +pub fn run_status_cli(matches: &ArgMatches) -> Result<(), String> { + set_logger(); + + let servers: Vec<_> = matches + .values_of("SERVERS") + .unwrap() + .map(|addr| create_client(addr)) + .collect(); + + let client_id = rand::random(); + + let mut client = Clerk::new(&servers, client_id); + let value = client.status(); + print!("{}", value); + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 1fbd9ff..56bebbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use bayard::cmd::leave::run_leave_cli; use bayard::cmd::search::run_search_cli; use bayard::cmd::serve::run_serve_cli; use bayard::cmd::set::run_set_cli; +use bayard::cmd::status::run_status_cli; fn main() { let app = App::new(crate_name!()) @@ -104,28 +105,44 @@ fn main() { ) ) .subcommand( - SubCommand::with_name("leave") - .name("leave") + SubCommand::with_name("status") + .name("status") .setting(AppSettings::DeriveDisplayOrder) .version(crate_version!()) .author(crate_authors!()) - .about("Remove a node from a cluster") + .about("Get cluster status") .arg( - Arg::with_name("HOST") - .help("The node address") - .short("H") - .long("host") - .value_name("HOST") - .default_value("0.0.0.0") + Arg::with_name("SERVERS") + .help("The server addresses. Use `,` to separate address. Example: `127.0.0.1:5000,127.0.0.1:5001`") + .short("s") + .long("servers") + .value_name("IP:PORT") + .default_value("127.0.0.1:5000") + .multiple(true) + .use_delimiter(true) + .require_delimiter(true) + .value_delimiter(",") .takes_value(true), ) + ) + .subcommand( + SubCommand::with_name("leave") + .name("leave") + .setting(AppSettings::DeriveDisplayOrder) + .version(crate_version!()) + .author(crate_authors!()) + .about("Remove a node from a cluster") .arg( - Arg::with_name("PORT") - .help("The gRPC listen port for client connection") - .short("P") - .long("port") - .value_name("PORT") - .default_value("5000") + Arg::with_name("SERVERS") + .help("The server addresses. Use `,` to separate address. Example: `127.0.0.1:5000,127.0.0.1:5001`") + .short("s") + .long("servers") + .value_name("IP:PORT") + .default_value("127.0.0.1:5000") + .multiple(true) + .use_delimiter(true) + .require_delimiter(true) + .value_delimiter(",") .takes_value(true), ) .arg( @@ -137,27 +154,6 @@ fn main() { .default_value("1") .takes_value(true), ) - .arg( - Arg::with_name("PEERS") - .help("Set raft peers address separated by `,`") - .short("p") - .long("peers") - .value_name("ID=IP:PORT") - .default_value("1=0.0.0.0:5000") - .multiple(true) - .takes_value(true) - .use_delimiter(true) - .require_delimiter(true) - .value_delimiter(","), - ) - .arg( - Arg::with_name("LEADER_ID") - .help("The leader node ID") - .short("l") - .long("leader-id") - .value_name("LEADER_ID") - .takes_value(true), - ) ) .subcommand( SubCommand::with_name("set") @@ -308,17 +304,13 @@ fn main() { let options = some_options.unwrap(); let run_cli = match subcommand { "serve" => run_serve_cli, + "status" => run_status_cli, "leave" => run_leave_cli, "set" => run_set_cli, "get" => run_get_cli, "delete" => run_delete_cli, "search" => run_search_cli, // "version" => run_version_cli, - // "index" => run_index_cli, - // "serve" => run_serve_cli, - // "search" => run_search_cli, - // "merge" => run_merge_cli, - // "bench" => run_bench_cli, _ => panic!("Subcommand {} is unknown", subcommand), }; diff --git a/src/proto/indexpb.rs b/src/proto/indexpb.rs index 806a38b..8a427d8 100644 --- a/src/proto/indexpb.rs +++ b/src/proto/indexpb.rs @@ -28,33 +28,37 @@ const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_8_0; static file_descriptor_proto_data: &'static [u8] = b"\ \n\rindexpb.proto\x12\x07indexpb\x1a\x10indexrpcpb.proto\x1a\reraftpb.pr\ - oto2\xda\x02\n\x05Index\x122\n\x03Get\x12\x14.indexrpcpb.IndexReq\x1a\ + oto2\x94\x03\n\x05Index\x122\n\x03Get\x12\x14.indexrpcpb.IndexReq\x1a\ \x13.indexrpcpb.GetResp\"\0\x122\n\x03Put\x12\x14.indexrpcpb.IndexReq\ \x1a\x13.indexrpcpb.PutResp\"\0\x128\n\x06Delete\x12\x14.indexrpcpb.Inde\ xReq\x1a\x16.indexrpcpb.DeleteResp\"\0\x128\n\x06Search\x12\x14.indexrpc\ pb.IndexReq\x1a\x16.indexrpcpb.SearchResp\"\0\x120\n\x04Raft\x12\x10.era\ ftpb.Message\x1a\x14.indexrpcpb.RaftDone\"\0\x12C\n\x0eRaftConfChange\ - \x12\x19.indexrpcpb.ConfChangeReq\x1a\x14.indexrpcpb.RaftDone\"\0J\x9a\ - \x03\n\x06\x12\x04\0\0\x13\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\ - \x01\x02\x12\x03\x01\0\x10\n\t\n\x02\x03\0\x12\x03\x03\0\x1a\n\t\n\x02\ - \x03\x01\x12\x03\x04\0\x17\n\n\n\x02\x06\0\x12\x04\x06\0\x13\x01\n\n\n\ - \x03\x06\0\x01\x12\x03\x06\x08\r\n\x0c\n\x04\x06\0\x02\0\x12\x04\x07\x04\ - \x08\x05\n\x0c\n\x05\x06\0\x02\0\x01\x12\x03\x07\x08\x0b\n\x0c\n\x05\x06\ - \0\x02\0\x02\x12\x03\x07\r\x20\n\x0c\n\x05\x06\0\x02\0\x03\x12\x03\x07+=\ - \n\x0c\n\x04\x06\0\x02\x01\x12\x04\t\x04\n\x05\n\x0c\n\x05\x06\0\x02\x01\ - \x01\x12\x03\t\x08\x0b\n\x0c\n\x05\x06\0\x02\x01\x02\x12\x03\t\r\x20\n\ - \x0c\n\x05\x06\0\x02\x01\x03\x12\x03\t+=\n\x0c\n\x04\x06\0\x02\x02\x12\ - \x04\x0b\x04\x0c\x05\n\x0c\n\x05\x06\0\x02\x02\x01\x12\x03\x0b\x08\x0e\n\ - \x0c\n\x05\x06\0\x02\x02\x02\x12\x03\x0b\x10#\n\x0c\n\x05\x06\0\x02\x02\ - \x03\x12\x03\x0b.C\n\x0c\n\x04\x06\0\x02\x03\x12\x04\r\x04\x0e\x05\n\x0c\ - \n\x05\x06\0\x02\x03\x01\x12\x03\r\x08\x0e\n\x0c\n\x05\x06\0\x02\x03\x02\ - \x12\x03\r\x10#\n\x0c\n\x05\x06\0\x02\x03\x03\x12\x03\r.C\n\x0c\n\x04\ - \x06\0\x02\x04\x12\x04\x0f\x04\x10\x05\n\x0c\n\x05\x06\0\x02\x04\x01\x12\ - \x03\x0f\x08\x0c\n\x0c\n\x05\x06\0\x02\x04\x02\x12\x03\x0f\x0e\x1d\n\x0c\ - \n\x05\x06\0\x02\x04\x03\x12\x03\x0f(;\n\x0c\n\x04\x06\0\x02\x05\x12\x04\ - \x11\x04\x12\x05\n\x0c\n\x05\x06\0\x02\x05\x01\x12\x03\x11\x08\x16\n\x0c\ - \n\x05\x06\0\x02\x05\x02\x12\x03\x11\x180\n\x0c\n\x05\x06\0\x02\x05\x03\ - \x12\x03\x11;Nb\x06proto3\ + \x12\x19.indexrpcpb.ConfChangeReq\x1a\x14.indexrpcpb.RaftDone\"\0\x128\n\ + \x06Status\x12\x14.indexrpcpb.IndexReq\x1a\x16.indexrpcpb.StatusResp\"\0\ + J\xd2\x03\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\ + \x08\n\x01\x02\x12\x03\x01\0\x10\n\t\n\x02\x03\0\x12\x03\x03\0\x1a\n\t\n\ + \x02\x03\x01\x12\x03\x04\0\x17\n\n\n\x02\x06\0\x12\x04\x06\0\x15\x01\n\n\ + \n\x03\x06\0\x01\x12\x03\x06\x08\r\n\x0c\n\x04\x06\0\x02\0\x12\x04\x07\ + \x04\x08\x05\n\x0c\n\x05\x06\0\x02\0\x01\x12\x03\x07\x08\x0b\n\x0c\n\x05\ + \x06\0\x02\0\x02\x12\x03\x07\r\x20\n\x0c\n\x05\x06\0\x02\0\x03\x12\x03\ + \x07+=\n\x0c\n\x04\x06\0\x02\x01\x12\x04\t\x04\n\x05\n\x0c\n\x05\x06\0\ + \x02\x01\x01\x12\x03\t\x08\x0b\n\x0c\n\x05\x06\0\x02\x01\x02\x12\x03\t\r\ + \x20\n\x0c\n\x05\x06\0\x02\x01\x03\x12\x03\t+=\n\x0c\n\x04\x06\0\x02\x02\ + \x12\x04\x0b\x04\x0c\x05\n\x0c\n\x05\x06\0\x02\x02\x01\x12\x03\x0b\x08\ + \x0e\n\x0c\n\x05\x06\0\x02\x02\x02\x12\x03\x0b\x10#\n\x0c\n\x05\x06\0\ + \x02\x02\x03\x12\x03\x0b.C\n\x0c\n\x04\x06\0\x02\x03\x12\x04\r\x04\x0e\ + \x05\n\x0c\n\x05\x06\0\x02\x03\x01\x12\x03\r\x08\x0e\n\x0c\n\x05\x06\0\ + \x02\x03\x02\x12\x03\r\x10#\n\x0c\n\x05\x06\0\x02\x03\x03\x12\x03\r.C\n\ + \x0c\n\x04\x06\0\x02\x04\x12\x04\x0f\x04\x10\x05\n\x0c\n\x05\x06\0\x02\ + \x04\x01\x12\x03\x0f\x08\x0c\n\x0c\n\x05\x06\0\x02\x04\x02\x12\x03\x0f\ + \x0e\x1d\n\x0c\n\x05\x06\0\x02\x04\x03\x12\x03\x0f(;\n\x0c\n\x04\x06\0\ + \x02\x05\x12\x04\x11\x04\x12\x05\n\x0c\n\x05\x06\0\x02\x05\x01\x12\x03\ + \x11\x08\x16\n\x0c\n\x05\x06\0\x02\x05\x02\x12\x03\x11\x180\n\x0c\n\x05\ + \x06\0\x02\x05\x03\x12\x03\x11;N\n\x0c\n\x04\x06\0\x02\x06\x12\x04\x13\ + \x04\x14\x05\n\x0c\n\x05\x06\0\x02\x06\x01\x12\x03\x13\x08\x0e\n\x0c\n\ + \x05\x06\0\x02\x06\x02\x12\x03\x13\x10#\n\x0c\n\x05\x06\0\x02\x06\x03\ + \x12\x03\x13.Cb\x06proto3\ "; static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { diff --git a/src/proto/indexpb_grpc.rs b/src/proto/indexpb_grpc.rs index c8504df..fb6f698 100644 --- a/src/proto/indexpb_grpc.rs +++ b/src/proto/indexpb_grpc.rs @@ -60,6 +60,13 @@ const METHOD_INDEX_RAFT_CONF_CHANGE: ::grpcio::Method = ::grpcio::Method { + ty: ::grpcio::MethodType::Unary, + name: "/indexpb.Index/Status", + req_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de }, + resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de }, +}; + #[derive(Clone)] pub struct IndexClient { client: ::grpcio::Client, @@ -167,6 +174,22 @@ impl IndexClient { pub fn raft_conf_change_async(&self, req: &super::indexrpcpb::ConfChangeReq) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver> { self.raft_conf_change_async_opt(req, ::grpcio::CallOption::default()) } + + pub fn status_opt(&self, req: &super::indexrpcpb::IndexReq, opt: ::grpcio::CallOption) -> ::grpcio::Result { + self.client.unary_call(&METHOD_INDEX_STATUS, req, opt) + } + + pub fn status(&self, req: &super::indexrpcpb::IndexReq) -> ::grpcio::Result { + self.status_opt(req, ::grpcio::CallOption::default()) + } + + pub fn status_async_opt(&self, req: &super::indexrpcpb::IndexReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver> { + self.client.unary_call_async(&METHOD_INDEX_STATUS, req, opt) + } + + pub fn status_async(&self, req: &super::indexrpcpb::IndexReq) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver> { + self.status_async_opt(req, ::grpcio::CallOption::default()) + } pub fn spawn(&self, f: F) where F: ::futures::Future + Send + 'static { self.client.spawn(f) } @@ -179,6 +202,7 @@ pub trait Index { fn search(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::IndexReq, sink: ::grpcio::UnarySink); fn raft(&mut self, ctx: ::grpcio::RpcContext, req: super::eraftpb::Message, sink: ::grpcio::UnarySink); fn raft_conf_change(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ConfChangeReq, sink: ::grpcio::UnarySink); + fn status(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::IndexReq, sink: ::grpcio::UnarySink); } pub fn create_index(s: S) -> ::grpcio::Service { @@ -207,5 +231,9 @@ pub fn create_index(s: S) -> ::grpcio::Servic builder = builder.add_unary_handler(&METHOD_INDEX_RAFT_CONF_CHANGE, move |ctx, req, resp| { instance.raft_conf_change(ctx, req, resp) }); + let mut instance = s.clone(); + builder = builder.add_unary_handler(&METHOD_INDEX_STATUS, move |ctx, req, resp| { + instance.status(ctx, req, resp) + }); builder.build() } diff --git a/src/proto/indexrpcpb.rs b/src/proto/indexrpcpb.rs index 1a0608e..e2b87b2 100644 --- a/src/proto/indexrpcpb.rs +++ b/src/proto/indexrpcpb.rs @@ -718,6 +718,206 @@ impl ::protobuf::reflect::ProtobufValue for ConfChangeReq { } } +#[derive(PartialEq,Clone,Default)] +pub struct StatusResp { + // message fields + pub value: ::std::string::String, + pub err: RespErr, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a StatusResp { + fn default() -> &'a StatusResp { + ::default_instance() + } +} + +impl StatusResp { + pub fn new() -> StatusResp { + ::std::default::Default::default() + } + + // string value = 1; + + + pub fn get_value(&self) -> &str { + &self.value + } + pub fn clear_value(&mut self) { + self.value.clear(); + } + + // Param is passed by value, moved + pub fn set_value(&mut self, v: ::std::string::String) { + self.value = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_value(&mut self) -> &mut ::std::string::String { + &mut self.value + } + + // Take field + pub fn take_value(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.value, ::std::string::String::new()) + } + + // .indexrpcpb.RespErr err = 2; + + + pub fn get_err(&self) -> RespErr { + self.err + } + pub fn clear_err(&mut self) { + self.err = RespErr::OK; + } + + // Param is passed by value, moved + pub fn set_err(&mut self, v: RespErr) { + self.err = v; + } +} + +impl ::protobuf::Message for StatusResp { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.value)?; + }, + 2 => { + ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.err, 2, &mut self.unknown_fields)? + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.value.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.value); + } + if self.err != RespErr::OK { + my_size += ::protobuf::rt::enum_size(2, self.err); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if !self.value.is_empty() { + os.write_string(1, &self.value)?; + } + if self.err != RespErr::OK { + os.write_enum(2, self.err.value())?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> StatusResp { + StatusResp::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "value", + |m: &StatusResp| { &m.value }, + |m: &mut StatusResp| { &mut m.value }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "err", + |m: &StatusResp| { &m.err }, + |m: &mut StatusResp| { &mut m.err }, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "StatusResp", + fields, + file_descriptor_proto() + ) + }) + } + } + + fn default_instance() -> &'static StatusResp { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const StatusResp, + }; + unsafe { + instance.get(StatusResp::new) + } + } +} + +impl ::protobuf::Clear for StatusResp { + fn clear(&mut self) { + self.value.clear(); + self.err = RespErr::OK; + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for StatusResp { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for StatusResp { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + #[derive(PartialEq,Clone,Default)] pub struct GetResp { // message fields @@ -1656,7 +1856,9 @@ pub enum ReqType { Put = 1, Delete = 2, Search = 3, - PeerAddr = 4, + Join = 4, + Leave = 5, + Status = 6, } impl ::protobuf::ProtobufEnum for ReqType { @@ -1670,7 +1872,9 @@ impl ::protobuf::ProtobufEnum for ReqType { 1 => ::std::option::Option::Some(ReqType::Put), 2 => ::std::option::Option::Some(ReqType::Delete), 3 => ::std::option::Option::Some(ReqType::Search), - 4 => ::std::option::Option::Some(ReqType::PeerAddr), + 4 => ::std::option::Option::Some(ReqType::Join), + 5 => ::std::option::Option::Some(ReqType::Leave), + 6 => ::std::option::Option::Some(ReqType::Status), _ => ::std::option::Option::None } } @@ -1681,7 +1885,9 @@ impl ::protobuf::ProtobufEnum for ReqType { ReqType::Put, ReqType::Delete, ReqType::Search, - ReqType::PeerAddr, + ReqType::Join, + ReqType::Leave, + ReqType::Status, ]; values } @@ -1724,107 +1930,121 @@ static file_descriptor_proto_data: &'static [u8] = b"\ alue\x18\x07\x20\x01(\tR\x05value\x12\x14\n\x05query\x18\x08\x20\x01(\tR\ \x05query\"X\n\rConfChangeReq\x12#\n\x02cc\x18\x01\x20\x01(\x0b2\x13.era\ ftpb.ConfChangeR\x02cc\x12\x0e\n\x02ip\x18\x02\x20\x01(\tR\x02ip\x12\x12\ - \n\x04port\x18\x03\x20\x01(\rR\x04port\"F\n\x07GetResp\x12\x14\n\x05valu\ - e\x18\x01\x20\x01(\tR\x05value\x12%\n\x03err\x18\x02\x20\x01(\x0e2\x13.i\ - ndexrpcpb.RespErrR\x03err\"0\n\x07PutResp\x12%\n\x03err\x18\x01\x20\x01(\ - \x0e2\x13.indexrpcpb.RespErrR\x03err\"3\n\nDeleteResp\x12%\n\x03err\x18\ - \x01\x20\x01(\x0e2\x13.indexrpcpb.RespErrR\x03err\"I\n\nSearchResp\x12\ - \x14\n\x05value\x18\x01\x20\x01(\tR\x05value\x12%\n\x03err\x18\x02\x20\ - \x01(\x0e2\x13.indexrpcpb.RespErrR\x03err\"1\n\x08RaftDone\x12%\n\x03err\ - \x18\x01\x20\x01(\x0e2\x13.indexrpcpb.RespErrR\x03err*3\n\x07RespErr\x12\ - \x06\n\x02OK\x10\0\x12\x12\n\x0eErrWrongLeader\x10\x01\x12\x0c\n\x08ErrN\ - oKey\x10\x02*A\n\x07ReqType\x12\x07\n\x03Get\x10\0\x12\x07\n\x03Put\x10\ - \x01\x12\n\n\x06Delete\x10\x02\x12\n\n\x06Search\x10\x03\x12\x0c\n\x08Pe\ - erAddr\x10\x04J\xb3\x0e\n\x06\x12\x04\0\08\x01\n\x08\n\x01\x0c\x12\x03\0\ - \0\x12\n\x08\n\x01\x02\x12\x03\x01\0\x13\n\t\n\x02\x03\0\x12\x03\x03\0\ - \x17\n\n\n\x02\x05\0\x12\x04\x05\0\t\x01\n\n\n\x03\x05\0\x01\x12\x03\x05\ - \x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x06\x04\x0b\n\x0c\n\x05\x05\0\ - \x02\0\x01\x12\x03\x06\x04\x06\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x06\t\ - \n\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x07\x04\x17\n\x0c\n\x05\x05\0\x02\ - \x01\x01\x12\x03\x07\x04\x12\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x07\ - \x15\x16\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x08\x04\x11\n\x0c\n\x05\x05\0\ - \x02\x02\x01\x12\x03\x08\x04\x0c\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\ - \x08\x0f\x10\n\n\n\x02\x05\x01\x12\x04\x0b\0\x11\x01\n\n\n\x03\x05\x01\ - \x01\x12\x03\x0b\x05\x0c\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x0c\x04\x0c\n\ - \x0c\n\x05\x05\x01\x02\0\x01\x12\x03\x0c\x04\x07\n\x0c\n\x05\x05\x01\x02\ - \0\x02\x12\x03\x0c\n\x0b\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\r\x04\x0c\n\ - \x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\r\x04\x07\n\x0c\n\x05\x05\x01\x02\ - \x01\x02\x12\x03\r\n\x0b\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x0e\x04\x0f\ - \n\x0c\n\x05\x05\x01\x02\x02\x01\x12\x03\x0e\x04\n\n\x0c\n\x05\x05\x01\ - \x02\x02\x02\x12\x03\x0e\r\x0e\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x0f\ - \x04\x0f\n\x0c\n\x05\x05\x01\x02\x03\x01\x12\x03\x0f\x04\n\n\x0c\n\x05\ - \x05\x01\x02\x03\x02\x12\x03\x0f\r\x0e\n\x0b\n\x04\x05\x01\x02\x04\x12\ - \x03\x10\x04\x11\n\x0c\n\x05\x05\x01\x02\x04\x01\x12\x03\x10\x04\x0c\n\ - \x0c\n\x05\x05\x01\x02\x04\x02\x12\x03\x10\x0f\x10\n\n\n\x02\x04\0\x12\ - \x04\x13\0\x1c\x01\n\n\n\x03\x04\0\x01\x12\x03\x13\x08\x10\n\x0b\n\x04\ - \x04\0\x02\0\x12\x03\x14\x04\x19\n\r\n\x05\x04\0\x02\0\x04\x12\x04\x14\ - \x04\x13\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x14\x04\n\n\x0c\n\x05\ - \x04\0\x02\0\x01\x12\x03\x14\x0b\x14\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\ - \x14\x17\x18\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x15\x04\x13\n\r\n\x05\x04\ - \0\x02\x01\x04\x12\x04\x15\x04\x14\x19\n\x0c\n\x05\x04\0\x02\x01\x05\x12\ - \x03\x15\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x15\x0b\x0e\n\x0c\n\ - \x05\x04\0\x02\x01\x03\x12\x03\x15\x11\x12\n\x0b\n\x04\x04\0\x02\x02\x12\ - \x03\x16\x04\x19\n\r\n\x05\x04\0\x02\x02\x04\x12\x04\x16\x04\x15\x13\n\ - \x0c\n\x05\x04\0\x02\x02\x06\x12\x03\x16\x04\x0b\n\x0c\n\x05\x04\0\x02\ - \x02\x01\x12\x03\x16\x0c\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x16\ - \x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x17\x04\x17\n\r\n\x05\x04\0\ - \x02\x03\x04\x12\x04\x17\x04\x16\x19\n\x0c\n\x05\x04\0\x02\x03\x05\x12\ - \x03\x17\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x17\x0b\x12\n\x0c\n\ - \x05\x04\0\x02\x03\x03\x12\x03\x17\x15\x16\n\x0b\n\x04\x04\0\x02\x04\x12\ - \x03\x18\x04\x19\n\r\n\x05\x04\0\x02\x04\x04\x12\x04\x18\x04\x17\x17\n\ - \x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x18\x04\n\n\x0c\n\x05\x04\0\x02\x04\ - \x01\x12\x03\x18\x0b\x14\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x18\x17\ - \x18\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x19\x04\x13\n\r\n\x05\x04\0\x02\ - \x05\x04\x12\x04\x19\x04\x18\x19\n\x0c\n\x05\x04\0\x02\x05\x05\x12\x03\ - \x19\x04\n\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x19\x0b\x0e\n\x0c\n\x05\ - \x04\0\x02\x05\x03\x12\x03\x19\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\ - \x1a\x04\x15\n\r\n\x05\x04\0\x02\x06\x04\x12\x04\x1a\x04\x19\x13\n\x0c\n\ - \x05\x04\0\x02\x06\x05\x12\x03\x1a\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\ - \x12\x03\x1a\x0b\x10\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\x1a\x13\x14\n\ - \x0b\n\x04\x04\0\x02\x07\x12\x03\x1b\x04\x15\n\r\n\x05\x04\0\x02\x07\x04\ - \x12\x04\x1b\x04\x1a\x15\n\x0c\n\x05\x04\0\x02\x07\x05\x12\x03\x1b\x04\n\ - \n\x0c\n\x05\x04\0\x02\x07\x01\x12\x03\x1b\x0b\x10\n\x0c\n\x05\x04\0\x02\ - \x07\x03\x12\x03\x1b\x13\x14\n\n\n\x02\x04\x01\x12\x04\x1e\0\"\x01\n\n\n\ - \x03\x04\x01\x01\x12\x03\x1e\x08\x15\n\x0b\n\x04\x04\x01\x02\0\x12\x03\ - \x1f\x04\x1e\n\r\n\x05\x04\x01\x02\0\x04\x12\x04\x1f\x04\x1e\x17\n\x0c\n\ - \x05\x04\x01\x02\0\x06\x12\x03\x1f\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x01\ - \x12\x03\x1f\x17\x19\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x1f\x1c\x1d\n\ - \x0b\n\x04\x04\x01\x02\x01\x12\x03\x20\x04\x12\n\r\n\x05\x04\x01\x02\x01\ - \x04\x12\x04\x20\x04\x1f\x1e\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x20\ - \x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x20\x0b\r\n\x0c\n\x05\x04\ - \x01\x02\x01\x03\x12\x03\x20\x10\x11\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\ - !\x04\x14\n\r\n\x05\x04\x01\x02\x02\x04\x12\x04!\x04\x20\x12\n\x0c\n\x05\ - \x04\x01\x02\x02\x05\x12\x03!\x04\n\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\ - \x03!\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03!\x12\x13\n\n\n\x02\ - \x04\x02\x12\x04$\0'\x01\n\n\n\x03\x04\x02\x01\x12\x03$\x08\x0f\n\x0b\n\ - \x04\x04\x02\x02\0\x12\x03%\x04\x15\n\r\n\x05\x04\x02\x02\0\x04\x12\x04%\ - \x04$\x11\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03%\x04\n\n\x0c\n\x05\x04\ - \x02\x02\0\x01\x12\x03%\x0b\x10\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03%\ - \x13\x14\n\x0b\n\x04\x04\x02\x02\x01\x12\x03&\x04\x14\n\r\n\x05\x04\x02\ - \x02\x01\x04\x12\x04&\x04%\x15\n\x0c\n\x05\x04\x02\x02\x01\x06\x12\x03&\ - \x04\x0b\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03&\x0c\x0f\n\x0c\n\x05\ - \x04\x02\x02\x01\x03\x12\x03&\x12\x13\n\n\n\x02\x04\x03\x12\x04)\0+\x01\ - \n\n\n\x03\x04\x03\x01\x12\x03)\x08\x0f\n\x0b\n\x04\x04\x03\x02\0\x12\ - \x03*\x04\x14\n\r\n\x05\x04\x03\x02\0\x04\x12\x04*\x04)\x11\n\x0c\n\x05\ - \x04\x03\x02\0\x06\x12\x03*\x04\x0b\n\x0c\n\x05\x04\x03\x02\0\x01\x12\ - \x03*\x0c\x0f\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03*\x12\x13\n\n\n\x02\ - \x04\x04\x12\x04-\0/\x01\n\n\n\x03\x04\x04\x01\x12\x03-\x08\x12\n\x0b\n\ - \x04\x04\x04\x02\0\x12\x03.\x04\x14\n\r\n\x05\x04\x04\x02\0\x04\x12\x04.\ - \x04-\x14\n\x0c\n\x05\x04\x04\x02\0\x06\x12\x03.\x04\x0b\n\x0c\n\x05\x04\ - \x04\x02\0\x01\x12\x03.\x0c\x0f\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03.\ - \x12\x13\n\n\n\x02\x04\x05\x12\x041\04\x01\n\n\n\x03\x04\x05\x01\x12\x03\ - 1\x08\x12\n\x0b\n\x04\x04\x05\x02\0\x12\x032\x04\x15\n\r\n\x05\x04\x05\ - \x02\0\x04\x12\x042\x041\x14\n\x0c\n\x05\x04\x05\x02\0\x05\x12\x032\x04\ - \n\n\x0c\n\x05\x04\x05\x02\0\x01\x12\x032\x0b\x10\n\x0c\n\x05\x04\x05\ - \x02\0\x03\x12\x032\x13\x14\n\x0b\n\x04\x04\x05\x02\x01\x12\x033\x04\x14\ - \n\r\n\x05\x04\x05\x02\x01\x04\x12\x043\x042\x15\n\x0c\n\x05\x04\x05\x02\ - \x01\x06\x12\x033\x04\x0b\n\x0c\n\x05\x04\x05\x02\x01\x01\x12\x033\x0c\ - \x0f\n\x0c\n\x05\x04\x05\x02\x01\x03\x12\x033\x12\x13\n\n\n\x02\x04\x06\ - \x12\x046\08\x01\n\n\n\x03\x04\x06\x01\x12\x036\x08\x10\n\x0b\n\x04\x04\ - \x06\x02\0\x12\x037\x04\x14\n\r\n\x05\x04\x06\x02\0\x04\x12\x047\x046\ - \x12\n\x0c\n\x05\x04\x06\x02\0\x06\x12\x037\x04\x0b\n\x0c\n\x05\x04\x06\ - \x02\0\x01\x12\x037\x0c\x0f\n\x0c\n\x05\x04\x06\x02\0\x03\x12\x037\x12\ - \x13b\x06proto3\ + \n\x04port\x18\x03\x20\x01(\rR\x04port\"I\n\nStatusResp\x12\x14\n\x05val\ + ue\x18\x01\x20\x01(\tR\x05value\x12%\n\x03err\x18\x02\x20\x01(\x0e2\x13.\ + indexrpcpb.RespErrR\x03err\"F\n\x07GetResp\x12\x14\n\x05value\x18\x01\ + \x20\x01(\tR\x05value\x12%\n\x03err\x18\x02\x20\x01(\x0e2\x13.indexrpcpb\ + .RespErrR\x03err\"0\n\x07PutResp\x12%\n\x03err\x18\x01\x20\x01(\x0e2\x13\ + .indexrpcpb.RespErrR\x03err\"3\n\nDeleteResp\x12%\n\x03err\x18\x01\x20\ + \x01(\x0e2\x13.indexrpcpb.RespErrR\x03err\"I\n\nSearchResp\x12\x14\n\x05\ + value\x18\x01\x20\x01(\tR\x05value\x12%\n\x03err\x18\x02\x20\x01(\x0e2\ + \x13.indexrpcpb.RespErrR\x03err\"1\n\x08RaftDone\x12%\n\x03err\x18\x01\ + \x20\x01(\x0e2\x13.indexrpcpb.RespErrR\x03err*3\n\x07RespErr\x12\x06\n\ + \x02OK\x10\0\x12\x12\n\x0eErrWrongLeader\x10\x01\x12\x0c\n\x08ErrNoKey\ + \x10\x02*T\n\x07ReqType\x12\x07\n\x03Get\x10\0\x12\x07\n\x03Put\x10\x01\ + \x12\n\n\x06Delete\x10\x02\x12\n\n\x06Search\x10\x03\x12\x08\n\x04Join\ + \x10\x04\x12\t\n\x05Leave\x10\x05\x12\n\n\x06Status\x10\x06J\xa9\x10\n\ + \x06\x12\x04\0\0?\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\ + \x12\x03\x01\0\x13\n\t\n\x02\x03\0\x12\x03\x03\0\x17\n\n\n\x02\x05\0\x12\ + \x04\x05\0\t\x01\n\n\n\x03\x05\0\x01\x12\x03\x05\x05\x0c\n\x0b\n\x04\x05\ + \0\x02\0\x12\x03\x06\x04\x0b\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x06\x04\ + \x06\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x06\t\n\n\x0b\n\x04\x05\0\x02\ + \x01\x12\x03\x07\x04\x17\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x07\x04\ + \x12\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x07\x15\x16\n\x0b\n\x04\x05\0\ + \x02\x02\x12\x03\x08\x04\x11\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x08\ + \x04\x0c\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x08\x0f\x10\n\n\n\x02\x05\ + \x01\x12\x04\x0b\0\x13\x01\n\n\n\x03\x05\x01\x01\x12\x03\x0b\x05\x0c\n\ + \x0b\n\x04\x05\x01\x02\0\x12\x03\x0c\x04\x0c\n\x0c\n\x05\x05\x01\x02\0\ + \x01\x12\x03\x0c\x04\x07\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x0c\n\x0b\ + \n\x0b\n\x04\x05\x01\x02\x01\x12\x03\r\x04\x0c\n\x0c\n\x05\x05\x01\x02\ + \x01\x01\x12\x03\r\x04\x07\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\r\n\ + \x0b\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x0e\x04\x0f\n\x0c\n\x05\x05\x01\ + \x02\x02\x01\x12\x03\x0e\x04\n\n\x0c\n\x05\x05\x01\x02\x02\x02\x12\x03\ + \x0e\r\x0e\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x0f\x04\x0f\n\x0c\n\x05\ + \x05\x01\x02\x03\x01\x12\x03\x0f\x04\n\n\x0c\n\x05\x05\x01\x02\x03\x02\ + \x12\x03\x0f\r\x0e\n\x0b\n\x04\x05\x01\x02\x04\x12\x03\x10\x04\r\n\x0c\n\ + \x05\x05\x01\x02\x04\x01\x12\x03\x10\x04\x08\n\x0c\n\x05\x05\x01\x02\x04\ + \x02\x12\x03\x10\x0b\x0c\n\x0b\n\x04\x05\x01\x02\x05\x12\x03\x11\x04\x0e\ + \n\x0c\n\x05\x05\x01\x02\x05\x01\x12\x03\x11\x04\t\n\x0c\n\x05\x05\x01\ + \x02\x05\x02\x12\x03\x11\x0c\r\n\x0b\n\x04\x05\x01\x02\x06\x12\x03\x12\ + \x04\x0f\n\x0c\n\x05\x05\x01\x02\x06\x01\x12\x03\x12\x04\n\n\x0c\n\x05\ + \x05\x01\x02\x06\x02\x12\x03\x12\r\x0e\n\n\n\x02\x04\0\x12\x04\x15\0\x1e\ + \x01\n\n\n\x03\x04\0\x01\x12\x03\x15\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\ + \x03\x16\x04\x19\n\r\n\x05\x04\0\x02\0\x04\x12\x04\x16\x04\x15\x12\n\x0c\ + \n\x05\x04\0\x02\0\x05\x12\x03\x16\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\ + \x03\x16\x0b\x14\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x16\x17\x18\n\x0b\n\ + \x04\x04\0\x02\x01\x12\x03\x17\x04\x13\n\r\n\x05\x04\0\x02\x01\x04\x12\ + \x04\x17\x04\x16\x19\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x17\x04\n\n\ + \x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x17\x0b\x0e\n\x0c\n\x05\x04\0\x02\ + \x01\x03\x12\x03\x17\x11\x12\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x18\x04\ + \x19\n\r\n\x05\x04\0\x02\x02\x04\x12\x04\x18\x04\x17\x13\n\x0c\n\x05\x04\ + \0\x02\x02\x06\x12\x03\x18\x04\x0b\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\ + \x18\x0c\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x18\x17\x18\n\x0b\n\ + \x04\x04\0\x02\x03\x12\x03\x19\x04\x17\n\r\n\x05\x04\0\x02\x03\x04\x12\ + \x04\x19\x04\x18\x19\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x19\x04\n\n\ + \x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x19\x0b\x12\n\x0c\n\x05\x04\0\x02\ + \x03\x03\x12\x03\x19\x15\x16\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x1a\x04\ + \x19\n\r\n\x05\x04\0\x02\x04\x04\x12\x04\x1a\x04\x19\x17\n\x0c\n\x05\x04\ + \0\x02\x04\x05\x12\x03\x1a\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\ + \x1a\x0b\x14\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x1a\x17\x18\n\x0b\n\ + \x04\x04\0\x02\x05\x12\x03\x1b\x04\x13\n\r\n\x05\x04\0\x02\x05\x04\x12\ + \x04\x1b\x04\x1a\x19\n\x0c\n\x05\x04\0\x02\x05\x05\x12\x03\x1b\x04\n\n\ + \x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x1b\x0b\x0e\n\x0c\n\x05\x04\0\x02\ + \x05\x03\x12\x03\x1b\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\x1c\x04\ + \x15\n\r\n\x05\x04\0\x02\x06\x04\x12\x04\x1c\x04\x1b\x13\n\x0c\n\x05\x04\ + \0\x02\x06\x05\x12\x03\x1c\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\ + \x1c\x0b\x10\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\x1c\x13\x14\n\x0b\n\ + \x04\x04\0\x02\x07\x12\x03\x1d\x04\x15\n\r\n\x05\x04\0\x02\x07\x04\x12\ + \x04\x1d\x04\x1c\x15\n\x0c\n\x05\x04\0\x02\x07\x05\x12\x03\x1d\x04\n\n\ + \x0c\n\x05\x04\0\x02\x07\x01\x12\x03\x1d\x0b\x10\n\x0c\n\x05\x04\0\x02\ + \x07\x03\x12\x03\x1d\x13\x14\n\n\n\x02\x04\x01\x12\x04\x20\0$\x01\n\n\n\ + \x03\x04\x01\x01\x12\x03\x20\x08\x15\n\x0b\n\x04\x04\x01\x02\0\x12\x03!\ + \x04\x1e\n\r\n\x05\x04\x01\x02\0\x04\x12\x04!\x04\x20\x17\n\x0c\n\x05\ + \x04\x01\x02\0\x06\x12\x03!\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x01\x12\ + \x03!\x17\x19\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03!\x1c\x1d\n\x0b\n\x04\ + \x04\x01\x02\x01\x12\x03\"\x04\x12\n\r\n\x05\x04\x01\x02\x01\x04\x12\x04\ + \"\x04!\x1e\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\"\x04\n\n\x0c\n\x05\ + \x04\x01\x02\x01\x01\x12\x03\"\x0b\r\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\ + \x03\"\x10\x11\n\x0b\n\x04\x04\x01\x02\x02\x12\x03#\x04\x14\n\r\n\x05\ + \x04\x01\x02\x02\x04\x12\x04#\x04\"\x12\n\x0c\n\x05\x04\x01\x02\x02\x05\ + \x12\x03#\x04\n\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03#\x0b\x0f\n\x0c\n\ + \x05\x04\x01\x02\x02\x03\x12\x03#\x12\x13\n\n\n\x02\x04\x02\x12\x04&\0)\ + \x01\n\n\n\x03\x04\x02\x01\x12\x03&\x08\x12\n\x0b\n\x04\x04\x02\x02\0\ + \x12\x03'\x04\x15\n\r\n\x05\x04\x02\x02\0\x04\x12\x04'\x04&\x14\n\x0c\n\ + \x05\x04\x02\x02\0\x05\x12\x03'\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\ + \x03'\x0b\x10\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03'\x13\x14\n\x0b\n\x04\ + \x04\x02\x02\x01\x12\x03(\x04\x14\n\r\n\x05\x04\x02\x02\x01\x04\x12\x04(\ + \x04'\x15\n\x0c\n\x05\x04\x02\x02\x01\x06\x12\x03(\x04\x0b\n\x0c\n\x05\ + \x04\x02\x02\x01\x01\x12\x03(\x0c\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\ + \x12\x03(\x12\x13\n\n\n\x02\x04\x03\x12\x04+\0.\x01\n\n\n\x03\x04\x03\ + \x01\x12\x03+\x08\x0f\n\x0b\n\x04\x04\x03\x02\0\x12\x03,\x04\x15\n\r\n\ + \x05\x04\x03\x02\0\x04\x12\x04,\x04+\x11\n\x0c\n\x05\x04\x03\x02\0\x05\ + \x12\x03,\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03,\x0b\x10\n\x0c\n\ + \x05\x04\x03\x02\0\x03\x12\x03,\x13\x14\n\x0b\n\x04\x04\x03\x02\x01\x12\ + \x03-\x04\x14\n\r\n\x05\x04\x03\x02\x01\x04\x12\x04-\x04,\x15\n\x0c\n\ + \x05\x04\x03\x02\x01\x06\x12\x03-\x04\x0b\n\x0c\n\x05\x04\x03\x02\x01\ + \x01\x12\x03-\x0c\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03-\x12\x13\n\ + \n\n\x02\x04\x04\x12\x040\02\x01\n\n\n\x03\x04\x04\x01\x12\x030\x08\x0f\ + \n\x0b\n\x04\x04\x04\x02\0\x12\x031\x04\x14\n\r\n\x05\x04\x04\x02\0\x04\ + \x12\x041\x040\x11\n\x0c\n\x05\x04\x04\x02\0\x06\x12\x031\x04\x0b\n\x0c\ + \n\x05\x04\x04\x02\0\x01\x12\x031\x0c\x0f\n\x0c\n\x05\x04\x04\x02\0\x03\ + \x12\x031\x12\x13\n\n\n\x02\x04\x05\x12\x044\06\x01\n\n\n\x03\x04\x05\ + \x01\x12\x034\x08\x12\n\x0b\n\x04\x04\x05\x02\0\x12\x035\x04\x14\n\r\n\ + \x05\x04\x05\x02\0\x04\x12\x045\x044\x14\n\x0c\n\x05\x04\x05\x02\0\x06\ + \x12\x035\x04\x0b\n\x0c\n\x05\x04\x05\x02\0\x01\x12\x035\x0c\x0f\n\x0c\n\ + \x05\x04\x05\x02\0\x03\x12\x035\x12\x13\n\n\n\x02\x04\x06\x12\x048\0;\ + \x01\n\n\n\x03\x04\x06\x01\x12\x038\x08\x12\n\x0b\n\x04\x04\x06\x02\0\ + \x12\x039\x04\x15\n\r\n\x05\x04\x06\x02\0\x04\x12\x049\x048\x14\n\x0c\n\ + \x05\x04\x06\x02\0\x05\x12\x039\x04\n\n\x0c\n\x05\x04\x06\x02\0\x01\x12\ + \x039\x0b\x10\n\x0c\n\x05\x04\x06\x02\0\x03\x12\x039\x13\x14\n\x0b\n\x04\ + \x04\x06\x02\x01\x12\x03:\x04\x14\n\r\n\x05\x04\x06\x02\x01\x04\x12\x04:\ + \x049\x15\n\x0c\n\x05\x04\x06\x02\x01\x06\x12\x03:\x04\x0b\n\x0c\n\x05\ + \x04\x06\x02\x01\x01\x12\x03:\x0c\x0f\n\x0c\n\x05\x04\x06\x02\x01\x03\ + \x12\x03:\x12\x13\n\n\n\x02\x04\x07\x12\x04=\0?\x01\n\n\n\x03\x04\x07\ + \x01\x12\x03=\x08\x10\n\x0b\n\x04\x04\x07\x02\0\x12\x03>\x04\x14\n\r\n\ + \x05\x04\x07\x02\0\x04\x12\x04>\x04=\x12\n\x0c\n\x05\x04\x07\x02\0\x06\ + \x12\x03>\x04\x0b\n\x0c\n\x05\x04\x07\x02\0\x01\x12\x03>\x0c\x0f\n\x0c\n\ + \x05\x04\x07\x02\0\x03\x12\x03>\x12\x13b\x06proto3\ "; static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { diff --git a/src/server/peer.rs b/src/server/peer.rs index 781967d..139cc77 100644 --- a/src/server/peer.rs +++ b/src/server/peer.rs @@ -18,23 +18,24 @@ pub enum PeerMessage { pub struct Peer { raft_group: RawNode, - // last_applying_idx: u64, - // last_compacted_idx: u64, + // last_applying_idx: u64, + // last_compacted_idx: u64, apply_ch: SyncSender, - // peers_addr: HashMap, + // peers_addr: HashMap, // id, (host, port) } impl Peer { pub fn new(id: u64, apply_ch: SyncSender, peers: Vec) -> Peer { let cfg = util::default_raft_config(id, peers); let storge = PeerStorage::new(); - Peer { + let peer = Peer { raft_group: RawNode::new(&cfg, storge, vec![]).unwrap(), - // last_applying_idx: 0, - // last_compacted_idx: 0, + // last_applying_idx: 0, + // last_compacted_idx: 0, apply_ch, - // peers_addr: HashMap::new(), - } + // peers_addr: HashMap::new(), + }; + peer } pub fn activate(mut peer: Peer, sender: SyncSender, receiver: Receiver) { @@ -55,7 +56,7 @@ impl Peer { Ok(PeerMessage::ConfChange(cc)) => { match self.raft_group.propose_conf_change(vec![], cc.clone()) { Ok(_) => (), - Err(_) => debug!("conf change failed: {:?}", cc), + Err(_) => error!("conf change failed: {:?}", cc), } } Ok(PeerMessage::Message(m)) => self.raft_group.step(m).unwrap(), @@ -84,7 +85,7 @@ impl Peer { let mut ready = self.raft_group.ready(); let is_leader = self.raft_group.raft.leader_id == self.raft_group.raft.id; if is_leader { - // debug!("I'm leader"); + // debug!("I'm leader"); let msgs = ready.messages.drain(..); for _msg in msgs { Self::send_message(sender.clone(), _msg.clone()); @@ -112,7 +113,7 @@ impl Peer { } if !is_leader { - // debug!("I'm follower"); + // debug!("I'm follower"); let msgs = ready.messages.drain(..); for mut _msg in msgs { for _entry in _msg.mut_entries().iter() { @@ -138,6 +139,7 @@ impl Peer { EntryType::EntryNormal => self.apply_message(entry.clone()), EntryType::EntryConfChange => { let cc = util::parse_data(&entry.data); + debug!("config: {:?}", cc); self.raft_group.apply_conf_change(&cc); debug!("apply conf change"); self.apply_message(entry.clone()); diff --git a/src/server/server.rs b/src/server/server.rs index 4128628..3d70ae9 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -10,15 +10,17 @@ use futures::Future; use grpcio::{ChannelBuilder, EnvBuilder, Environment, RpcContext, ServerBuilder, UnarySink}; use log::*; use protobuf::Message; -use raft::eraftpb::{ConfChange, Entry, EntryType, Message as RaftMessage}; +use raft::eraftpb::{ConfChange, ConfChangeType, Entry, EntryType, Message as RaftMessage}; use tantivy::collector::TopDocs; use tantivy::query::{QueryParser, TermQuery}; use tantivy::schema::{Field, FieldType, IndexRecordOption, NamedFieldDocument}; use tantivy::{Document, Index, Term}; +use crate::client::client::Clerk; use crate::proto::indexpb_grpc::{self, Index as IndexService, IndexClient}; use crate::proto::indexrpcpb::{ ConfChangeReq, DeleteResp, GetResp, IndexReq, PutResp, RaftDone, ReqType, RespErr, SearchResp, + StatusResp, }; use crate::server::peer::PeerMessage; use crate::server::{peer, util}; @@ -39,6 +41,7 @@ fn sigterm_channel() -> Result, ctrlc::Error> { pub struct IndexServer { id: u64, peers: Arc>>, + peers_addr: Arc>>, rf_message_ch: SyncSender, notify_ch_map: Arc>>>, index: Arc, @@ -50,6 +53,7 @@ impl IndexServer { id: u64, host: &str, port: u16, + leader_id: u64, peers: HashMap, index: Arc, unique_key_field: &str, @@ -59,11 +63,11 @@ impl IndexServer { let (apply_sender, apply_receiver) = mpsc::sync_channel(100); let peers_id = peers.keys().map(|id| *id).collect(); - let peer = peer::Peer::new(id, apply_sender, peers_id); let mut index_server = IndexServer { id, peers: Arc::new(Mutex::new(peers)), + peers_addr: Arc::new(Mutex::new(HashMap::new())), rf_message_ch: rf_sender, notify_ch_map: Arc::new(Mutex::new(HashMap::new())), index, @@ -74,7 +78,7 @@ impl IndexServer { index_server.async_applier(apply_receiver); let env = Arc::new(Environment::new(10)); - let service = indexpb_grpc::create_index(index_server); + let service = indexpb_grpc::create_index(index_server.clone()); let mut server = ServerBuilder::new(env) .register_service(service) .bind(host, port) @@ -83,12 +87,32 @@ impl IndexServer { panic!("build server error: {}", e); }); - peer::Peer::activate(peer, rpc_sender, rf_receiver); server.start(); for &(ref host, port) in server.bind_addrs() { info!("listening on {}:{}", host, port); } + let peer = peer::Peer::new(id, apply_sender, peers_id); + peer::Peer::activate(peer, rpc_sender, rf_receiver); + + debug!("leader_id: {}", leader_id); + if leader_id > 0 { + let mut leaders: Vec = Vec::new(); + leaders.push( + index_server + .peers + .clone() + .lock() + .unwrap() + .get(&leader_id) + .unwrap() + .clone(), + ); + let client_id = rand::random(); + let mut client = Clerk::new(&leaders, client_id); + client.join(id, host, port); + } + // Wait for signals for termination (SIGINT, SIGTERM). let sigterm_receiver = sigterm_channel().unwrap(); loop { @@ -157,6 +181,7 @@ impl IndexServer { fn async_applier(&mut self, apply_receiver: Receiver) { let notify_ch_map = self.notify_ch_map.clone(); let peers = self.peers.clone(); + let peers_addr = self.peers_addr.clone(); let index = self.index.clone(); let unique_key_field = self.unique_key_field.clone(); @@ -172,13 +197,14 @@ impl IndexServer { e.term, &req, peers.clone(), + peers_addr.clone(), index.clone(), unique_key_field.as_str(), ); - debug!("apply_entry: {:?}---{:?}", req, result.2); + debug!("{:?}: {:?}", result.2, req); } else { result = NotifyArgs(0, String::from(""), RespErr::ErrWrongLeader); - debug!("empty_entry: {:?}", req); + debug!("{:?}", req); } let mut map = notify_ch_map.lock().unwrap(); if let Some(s) = map.get(&client_id) { @@ -209,13 +235,12 @@ impl IndexServer { term: u64, req: &IndexReq, peers: Arc>>, + peers_addr: Arc>>, index: Arc, unique_key_field: &str, ) -> NotifyArgs { match req.req_type { ReqType::Get => { - // searcher - debug!("Get"); let t = Term::from_field_text( index.schema().get_field(unique_key_field).unwrap(), req.key.as_str(), @@ -235,8 +260,6 @@ impl IndexServer { ) } ReqType::Put => { - // indexer - debug!("Put"); let num_threads = 1; let buffer_size_per_thread = 50_000_000; let mut index_writer = if num_threads > 0 { @@ -263,8 +286,6 @@ impl IndexServer { } } ReqType::Delete => { - // indexer - debug!("Delete"); let num_threads = 1; let buffer_size_per_thread = 50_000_000; let mut index_writer = if num_threads > 0 { @@ -290,8 +311,6 @@ impl IndexServer { } } ReqType::Search => { - // searcher - debug!("Search"); let schema = index.schema(); let default_fields: Vec = schema .fields() @@ -326,13 +345,38 @@ impl IndexServer { RespErr::OK, ) } - ReqType::PeerAddr => { + ReqType::Join => { + debug!("request: {:?}", &req); let mut prs = peers.lock().unwrap(); let env = Arc::new(EnvBuilder::new().build()); let ch = ChannelBuilder::new(env).connect(&req.peer_addr); prs.insert(req.peer_id, IndexClient::new(ch)); + + let mut prs_addr = peers_addr.lock().unwrap(); + prs_addr.insert(req.peer_id, req.peer_addr.clone()); + + NotifyArgs(term, String::from(""), RespErr::OK) + } + ReqType::Leave => { + debug!("request: {:?}", &req); + let mut prs = peers.lock().unwrap(); + prs.remove(&req.peer_id); + + let mut prs_addr = peers_addr.lock().unwrap(); + prs_addr.remove(&req.peer_id); + NotifyArgs(term, String::from(""), RespErr::OK) } + ReqType::Status => { + let prs_addr = peers_addr.lock().unwrap(); + debug!("{:?}", prs_addr); + NotifyArgs( + term, + serde_json::to_string(&prs_addr.clone()).unwrap(), + // String::from(""), + RespErr::OK, + ) + } } } } @@ -394,10 +438,19 @@ impl IndexService for IndexServer { } fn raft_conf_change(&mut self, ctx: RpcContext, req: ConfChangeReq, sink: UnarySink) { + debug!("request: {:?}", req); let cc = req.cc.clone().unwrap(); let mut resp = RaftDone::new(); let mut peer_req = IndexReq::new(); - peer_req.set_req_type(ReqType::PeerAddr); + + match cc.change_type { + ConfChangeType::AddNode | ConfChangeType::AddLearnerNode => { + peer_req.set_req_type(ReqType::Join); + } + ConfChangeType::RemoveNode => { + peer_req.set_req_type(ReqType::Leave); + } + } peer_req.set_peer_addr(format!("{}:{}", req.ip, req.port)); peer_req.set_peer_id(cc.get_node_id()); peer_req.set_client_id(cc.get_node_id()); @@ -425,4 +478,15 @@ impl IndexService for IndexServer { .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), ) } + + fn status(&mut self, ctx: RpcContext, req: IndexReq, sink: UnarySink) { + let (err, value) = Self::start_op(self, &req); + let mut resp = StatusResp::new(); + resp.set_err(err); + resp.set_value(value); + ctx.spawn( + sink.success(resp) + .map_err(move |e| error!("failed to reply {:?}: {:?}", req, e)), + ) + } } diff --git a/src/server/util.rs b/src/server/util.rs index b9e1f15..ba21a9c 100644 --- a/src/server/util.rs +++ b/src/server/util.rs @@ -1,9 +1,6 @@ -use crate::proto::indexpb_grpc::IndexClient; -use crate::proto::indexrpcpb::{ConfChangeReq, RaftDone, RespErr}; use log::*; use protobuf::{self, Message}; use raft::Config; -use std::collections::HashMap; pub fn default_raft_config(id: u64, peers: Vec) -> Config { debug!("default_raft_config id:{} peers:{:?}", id, peers); @@ -24,38 +21,3 @@ pub fn parse_data(data: &[u8]) -> T { panic!("data is corrupted: {:?}", e); }) } - -pub fn conf_change( - self_id: u64, - leader_id: u64, - peers: &HashMap, - req: ConfChangeReq, -) { - let client = peers.get(&leader_id).unwrap(); - let reply = client.raft_conf_change(&req).unwrap_or_else(|_e| { - let mut resp = RaftDone::new(); - resp.set_err(RespErr::ErrWrongLeader); - resp - }); - match reply.err { - RespErr::OK => return, - RespErr::ErrWrongLeader => (), - RespErr::ErrNoKey => return, - } - loop { - for (id, client) in peers.iter() { - if *id != self_id { - let reply = client.raft_conf_change(&req).unwrap_or_else(|_e| { - let mut resp = RaftDone::new(); - resp.set_err(RespErr::ErrWrongLeader); - resp - }); - match reply.err { - RespErr::OK => return, - RespErr::ErrWrongLeader => (), - RespErr::ErrNoKey => return, - } - } - } - } -} diff --git a/src/util/log.rs b/src/util/log.rs index df11498..7334967 100644 --- a/src/util/log.rs +++ b/src/util/log.rs @@ -1,6 +1,9 @@ use std::env; +use std::io::Write; -pub fn set_log_level() { +use env_logger; + +pub fn set_logger() { match env::var("RUST_LOG") { Ok(val) => { let log_level: &str = &val; @@ -13,5 +16,20 @@ pub fn set_log_level() { } } Err(_e) => env::set_var("RUST_LOG", "info"), - } + }; + env_logger::Builder::from_default_env() + .format(|buf, record| { + let ts = buf.timestamp(); + writeln!( + buf, + "[{} {} {} {}:{}] {}", + ts, + record.level(), + record.target(), + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.args(), + ) + }) + .init(); }