这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/indexpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ service Index {
}
rpc RaftConfChange (indexrpcpb.ConfChangeReq) returns (indexrpcpb.RaftDone) {
}
rpc Probe (indexrpcpb.ProbeReq) returns (indexrpcpb.ProbeResp) {
}
rpc Peers (indexrpcpb.PeersReq) returns (indexrpcpb.PeersResp) {
}
rpc Metrics (indexrpcpb.MetricsReq) returns (indexrpcpb.MetricsResp) {
Expand Down
10 changes: 10 additions & 0 deletions proto/indexrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ message LeaveReq {
string peer_addr = 3;
}

// Request message for the Index.Probe RPC.
message ProbeReq {
// Identifier of the calling client; the Rust client fills this from its own client id.
uint64 client_id = 1;
// Per-client request sequence number; the Rust client increments its counter after each probe request is built.
// NOTE(review): presumably used by the server to recognise duplicate requests -- confirm against the server implementation.
uint64 seq = 2;
}

// Response message for the Index.Probe RPC.
message ProbeResp {
// Probe result payload; the Rust client returns this string to its caller when err is OK.
string value = 1;
// Outcome of the request. The Rust client retries on another server for ErrWrongLeader
// and returns an empty string for ErrNoKey.
RespErr err = 2;
}

message PeersReq {
uint64 client_id = 1;
uint64 seq = 2;
Expand Down
33 changes: 30 additions & 3 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ use raft::eraftpb::{ConfChange, ConfChangeType};
use crate::proto::indexpb_grpc::IndexClient;
use crate::proto::indexrpcpb::{
ApplyReq, CommitReq, CommitResp, ConfChangeReq, DeleteReq, DeleteResp, GetReq, GetResp,
MergeReq, MergeResp, MetricsReq, MetricsResp, PeersReq, PeersResp, PutReq, PutResp, RaftDone,
ReqType, RespErr, RollbackReq, RollbackResp, SchemaReq, SchemaResp, SearchReq, SearchResp,
MergeReq, MergeResp, MetricsReq, MetricsResp, PeersReq, PeersResp, ProbeReq, ProbeResp, PutReq,
PutResp, RaftDone, ReqType, RespErr, RollbackReq, RollbackResp, SchemaReq, SchemaResp,
SearchReq, SearchResp,
};

/// Builds an [`IndexClient`] connected to the server at `addr`.
///
/// A new gRPC environment and channel are created on every call, and a debug
/// log line is emitted after the channel and after the client are created.
///
/// * `addr` - server address to connect to (for example `127.0.0.1:5000`).
pub fn create_client(addr: &str) -> IndexClient {
    let env = Arc::new(EnvBuilder::new().build());
    let ch = ChannelBuilder::new(env).connect(&addr);
    debug!("create channel for {}", addr);
    // The client is bound to a local so the second log line is emitted only
    // after construction; `ch` is consumed exactly once here.
    let index_client = IndexClient::new(ch);
    debug!("create index client for {}", addr);
    index_client
}

pub struct Clerk {
Expand Down Expand Up @@ -94,6 +97,30 @@ impl Clerk {
}
}

/// Sends a probe request to the cluster and returns the value reported by
/// the server that answers it.
///
/// The request carries this clerk's `client_id` and the current
/// `request_seq`, which is incremented once per call (not per retry).
/// The server at `leader_id` is tried first. A transport error is handled
/// the same way as an `ErrWrongLeader` reply: the clerk moves on to the next
/// server in `servers`, sleeps for 100 ms and retries. The loop ends only
/// when a server answers with `OK` (its value is returned) or `ErrNoKey`
/// (an empty string is returned).
pub fn probe(&mut self) -> String {
    let seq = self.request_seq;
    self.request_seq += 1;

    let mut request = ProbeReq::new();
    request.set_client_id(self.client_id);
    request.set_seq(seq);

    loop {
        // A failed call is folded into a synthetic "wrong leader" response
        // so that both cases share the retry path below.
        let response = match self.servers[self.leader_id].probe(&request) {
            Ok(resp) => resp,
            Err(_) => {
                let mut fallback = ProbeResp::new();
                fallback.set_err(RespErr::ErrWrongLeader);
                fallback
            }
        };

        match response.err {
            RespErr::OK => return response.value,
            RespErr::ErrNoKey => return String::new(),
            RespErr::ErrWrongLeader => {
                // Rotate to the next known server and back off briefly.
                self.leader_id = (self.leader_id + 1) % self.servers.len();
                thread::sleep(Duration::from_millis(100));
            }
        }
    }
}

pub fn peers(&mut self) -> String {
let mut req = PeersReq::new();
req.set_client_id(self.client_id);
Expand Down
1 change: 1 addition & 0 deletions src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod leave;
pub mod merge;
pub mod metrics;
pub mod peers;
pub mod probe;
pub mod rollback;
pub mod schema;
pub mod search;
Expand Down
22 changes: 22 additions & 0 deletions src/cmd/probe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use clap::ArgMatches;

use crate::client::client::{create_client, Clerk};
use crate::util::log::set_logger;

/// Entry point of the `probe` subcommand.
///
/// Initialises logging, creates one client per address given in the
/// `SERVERS` argument, issues a single probe through a [`Clerk`] with a
/// random client id, and prints the returned value to stdout without a
/// trailing newline.
///
/// # Panics
///
/// Panics if the `SERVERS` argument has no values.
pub fn run_probe_cli(matches: &ArgMatches) -> Result<(), String> {
    set_logger();

    let mut servers = Vec::new();
    for addr in matches.values_of("SERVERS").unwrap() {
        servers.push(create_client(addr));
    }

    let mut clerk = Clerk::new(&servers, rand::random());
    print!("{}", clerk.probe());

    Ok(())
}
23 changes: 23 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use bayard::cmd::leave::run_leave_cli;
use bayard::cmd::merge::run_merge_cli;
use bayard::cmd::metrics::run_metrics_cli;
use bayard::cmd::peers::run_peers_cli;
use bayard::cmd::probe::run_probe_cli;
use bayard::cmd::rollback::run_rollback_cli;
use bayard::cmd::schema::run_schema_cli;
use bayard::cmd::search::run_search_cli;
Expand Down Expand Up @@ -100,6 +101,27 @@ fn main() {
.takes_value(true),
)
)
.subcommand(
SubCommand::with_name("probe")
.name("probe")
.setting(AppSettings::DeriveDisplayOrder)
.version(crate_version!())
.author(crate_authors!())
.about("Probe a server")
.arg(
Arg::with_name("SERVERS")
.help("The server addresses. Use `,` to separate address. Example: `127.0.0.1:5000,127.0.0.1:5001`")
.short("s")
.long("servers")
.value_name("IP:PORT")
.default_value("127.0.0.1:5000")
.multiple(true)
.use_delimiter(true)
.require_delimiter(true)
.value_delimiter(",")
.takes_value(true),
)
)
.subcommand(
SubCommand::with_name("peers")
.name("peers")
Expand Down Expand Up @@ -381,6 +403,7 @@ fn main() {
let options = some_options.unwrap();
let run_cli = match subcommand {
"serve" => run_serve_cli,
"probe" => run_probe_cli,
"peers" => run_peers_cli,
"metrics" => run_metrics_cli,
"leave" => run_leave_cli,
Expand Down
66 changes: 35 additions & 31 deletions src/proto/indexpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,24 @@ const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_8_0;

static file_descriptor_proto_data: &'static [u8] = b"\
\n\rindexpb.proto\x12\x07indexpb\x1a\x10indexrpcpb.proto\x1a\reraftpb.pr\
oto2\xba\x05\n\x05Index\x120\n\x04Raft\x12\x10.eraftpb.Message\x1a\x14.i\
oto2\xf2\x05\n\x05Index\x120\n\x04Raft\x12\x10.eraftpb.Message\x1a\x14.i\
ndexrpcpb.RaftDone\"\0\x12C\n\x0eRaftConfChange\x12\x19.indexrpcpb.ConfC\
hangeReq\x1a\x14.indexrpcpb.RaftDone\"\0\x126\n\x05Peers\x12\x14.indexrp\
cpb.PeersReq\x1a\x15.indexrpcpb.PeersResp\"\0\x12<\n\x07Metrics\x12\x16.\
indexrpcpb.MetricsReq\x1a\x17.indexrpcpb.MetricsResp\"\0\x120\n\x03Get\
\x12\x12.indexrpcpb.GetReq\x1a\x13.indexrpcpb.GetResp\"\0\x122\n\x03Put\
\x12\x14.indexrpcpb.ApplyReq\x1a\x13.indexrpcpb.PutResp\"\0\x128\n\x06De\
lete\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.DeleteResp\"\0\x128\
\n\x06Commit\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.CommitResp\"\
\0\x12<\n\x08Rollback\x12\x14.indexrpcpb.ApplyReq\x1a\x18.indexrpcpb.Rol\
lbackResp\"\0\x126\n\x05Merge\x12\x14.indexrpcpb.ApplyReq\x1a\x15.indexr\
pcpb.MergeResp\"\0\x129\n\x06Search\x12\x15.indexrpcpb.SearchReq\x1a\x16\
.indexrpcpb.SearchResp\"\0\x129\n\x06Schema\x12\x15.indexrpcpb.SchemaReq\
\x1a\x16.indexrpcpb.SchemaResp\"\0J\xea\x05\n\x06\x12\x04\0\0\x1f\x01\n\
hangeReq\x1a\x14.indexrpcpb.RaftDone\"\0\x126\n\x05Probe\x12\x14.indexrp\
cpb.ProbeReq\x1a\x15.indexrpcpb.ProbeResp\"\0\x126\n\x05Peers\x12\x14.in\
dexrpcpb.PeersReq\x1a\x15.indexrpcpb.PeersResp\"\0\x12<\n\x07Metrics\x12\
\x16.indexrpcpb.MetricsReq\x1a\x17.indexrpcpb.MetricsResp\"\0\x120\n\x03\
Get\x12\x12.indexrpcpb.GetReq\x1a\x13.indexrpcpb.GetResp\"\0\x122\n\x03P\
ut\x12\x14.indexrpcpb.ApplyReq\x1a\x13.indexrpcpb.PutResp\"\0\x128\n\x06\
Delete\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.DeleteResp\"\0\x12\
8\n\x06Commit\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.CommitResp\
\"\0\x12<\n\x08Rollback\x12\x14.indexrpcpb.ApplyReq\x1a\x18.indexrpcpb.R\
ollbackResp\"\0\x126\n\x05Merge\x12\x14.indexrpcpb.ApplyReq\x1a\x15.inde\
xrpcpb.MergeResp\"\0\x129\n\x06Search\x12\x15.indexrpcpb.SearchReq\x1a\
\x16.indexrpcpb.SearchResp\"\0\x129\n\x06Schema\x12\x15.indexrpcpb.Schem\
aReq\x1a\x16.indexrpcpb.SchemaResp\"\0J\xa2\x06\n\x06\x12\x04\0\0!\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\0\x10\n\t\n\
\x02\x03\0\x12\x03\x03\0\x1a\n\t\n\x02\x03\x01\x12\x03\x04\0\x17\n\n\n\
\x02\x06\0\x12\x04\x06\0\x1f\x01\n\n\n\x03\x06\0\x01\x12\x03\x06\x08\r\n\
\x02\x06\0\x12\x04\x06\0!\x01\n\n\n\x03\x06\0\x01\x12\x03\x06\x08\r\n\
\x0c\n\x04\x06\0\x02\0\x12\x04\x07\x04\x08\x05\n\x0c\n\x05\x06\0\x02\0\
\x01\x12\x03\x07\x08\x0c\n\x0c\n\x05\x06\0\x02\0\x02\x12\x03\x07\x0e\x1d\
\n\x0c\n\x05\x06\0\x02\0\x03\x12\x03\x07(;\n\x0c\n\x04\x06\0\x02\x01\x12\
Expand All @@ -54,29 +55,32 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x06\0\x02\x02\x01\x12\x03\x0b\x08\r\n\x0c\n\x05\x06\0\x02\x02\x02\x12\
\x03\x0b\x0f\"\n\x0c\n\x05\x06\0\x02\x02\x03\x12\x03\x0b-A\n\x0c\n\x04\
\x06\0\x02\x03\x12\x04\r\x04\x0e\x05\n\x0c\n\x05\x06\0\x02\x03\x01\x12\
\x03\r\x08\x0f\n\x0c\n\x05\x06\0\x02\x03\x02\x12\x03\r\x11&\n\x0c\n\x05\
\x06\0\x02\x03\x03\x12\x03\r1G\n\x0c\n\x04\x06\0\x02\x04\x12\x04\x0f\x04\
\x10\x05\n\x0c\n\x05\x06\0\x02\x04\x01\x12\x03\x0f\x08\x0b\n\x0c\n\x05\
\x06\0\x02\x04\x02\x12\x03\x0f\r\x1e\n\x0c\n\x05\x06\0\x02\x04\x03\x12\
\x03\x0f);\n\x0c\n\x04\x06\0\x02\x05\x12\x04\x11\x04\x12\x05\n\x0c\n\x05\
\x03\r\x08\r\n\x0c\n\x05\x06\0\x02\x03\x02\x12\x03\r\x0f\"\n\x0c\n\x05\
\x06\0\x02\x03\x03\x12\x03\r-A\n\x0c\n\x04\x06\0\x02\x04\x12\x04\x0f\x04\
\x10\x05\n\x0c\n\x05\x06\0\x02\x04\x01\x12\x03\x0f\x08\x0f\n\x0c\n\x05\
\x06\0\x02\x04\x02\x12\x03\x0f\x11&\n\x0c\n\x05\x06\0\x02\x04\x03\x12\
\x03\x0f1G\n\x0c\n\x04\x06\0\x02\x05\x12\x04\x11\x04\x12\x05\n\x0c\n\x05\
\x06\0\x02\x05\x01\x12\x03\x11\x08\x0b\n\x0c\n\x05\x06\0\x02\x05\x02\x12\
\x03\x11\r\x20\n\x0c\n\x05\x06\0\x02\x05\x03\x12\x03\x11+=\n\x0c\n\x04\
\x03\x11\r\x1e\n\x0c\n\x05\x06\0\x02\x05\x03\x12\x03\x11);\n\x0c\n\x04\
\x06\0\x02\x06\x12\x04\x13\x04\x14\x05\n\x0c\n\x05\x06\0\x02\x06\x01\x12\
\x03\x13\x08\x0e\n\x0c\n\x05\x06\0\x02\x06\x02\x12\x03\x13\x10#\n\x0c\n\
\x05\x06\0\x02\x06\x03\x12\x03\x13.C\n\x0c\n\x04\x06\0\x02\x07\x12\x04\
\x03\x13\x08\x0b\n\x0c\n\x05\x06\0\x02\x06\x02\x12\x03\x13\r\x20\n\x0c\n\
\x05\x06\0\x02\x06\x03\x12\x03\x13+=\n\x0c\n\x04\x06\0\x02\x07\x12\x04\
\x15\x04\x16\x05\n\x0c\n\x05\x06\0\x02\x07\x01\x12\x03\x15\x08\x0e\n\x0c\
\n\x05\x06\0\x02\x07\x02\x12\x03\x15\x10#\n\x0c\n\x05\x06\0\x02\x07\x03\
\x12\x03\x15.C\n\x0c\n\x04\x06\0\x02\x08\x12\x04\x17\x04\x18\x05\n\x0c\n\
\x05\x06\0\x02\x08\x01\x12\x03\x17\x08\x10\n\x0c\n\x05\x06\0\x02\x08\x02\
\x12\x03\x17\x12%\n\x0c\n\x05\x06\0\x02\x08\x03\x12\x03\x170G\n\x0c\n\
\x05\x06\0\x02\x08\x01\x12\x03\x17\x08\x0e\n\x0c\n\x05\x06\0\x02\x08\x02\
\x12\x03\x17\x10#\n\x0c\n\x05\x06\0\x02\x08\x03\x12\x03\x17.C\n\x0c\n\
\x04\x06\0\x02\t\x12\x04\x19\x04\x1a\x05\n\x0c\n\x05\x06\0\x02\t\x01\x12\
\x03\x19\x08\r\n\x0c\n\x05\x06\0\x02\t\x02\x12\x03\x19\x0f\"\n\x0c\n\x05\
\x06\0\x02\t\x03\x12\x03\x19-A\n\x0c\n\x04\x06\0\x02\n\x12\x04\x1b\x04\
\x1c\x05\n\x0c\n\x05\x06\0\x02\n\x01\x12\x03\x1b\x08\x0e\n\x0c\n\x05\x06\
\0\x02\n\x02\x12\x03\x1b\x10$\n\x0c\n\x05\x06\0\x02\n\x03\x12\x03\x1b/D\
\n\x0c\n\x04\x06\0\x02\x0b\x12\x04\x1d\x04\x1e\x05\n\x0c\n\x05\x06\0\x02\
\x0b\x01\x12\x03\x1d\x08\x0e\n\x0c\n\x05\x06\0\x02\x0b\x02\x12\x03\x1d\
\x10$\n\x0c\n\x05\x06\0\x02\x0b\x03\x12\x03\x1d/Db\x06proto3\
\x03\x19\x08\x10\n\x0c\n\x05\x06\0\x02\t\x02\x12\x03\x19\x12%\n\x0c\n\
\x05\x06\0\x02\t\x03\x12\x03\x190G\n\x0c\n\x04\x06\0\x02\n\x12\x04\x1b\
\x04\x1c\x05\n\x0c\n\x05\x06\0\x02\n\x01\x12\x03\x1b\x08\r\n\x0c\n\x05\
\x06\0\x02\n\x02\x12\x03\x1b\x0f\"\n\x0c\n\x05\x06\0\x02\n\x03\x12\x03\
\x1b-A\n\x0c\n\x04\x06\0\x02\x0b\x12\x04\x1d\x04\x1e\x05\n\x0c\n\x05\x06\
\0\x02\x0b\x01\x12\x03\x1d\x08\x0e\n\x0c\n\x05\x06\0\x02\x0b\x02\x12\x03\
\x1d\x10$\n\x0c\n\x05\x06\0\x02\x0b\x03\x12\x03\x1d/D\n\x0c\n\x04\x06\0\
\x02\x0c\x12\x04\x1f\x04\x20\x05\n\x0c\n\x05\x06\0\x02\x0c\x01\x12\x03\
\x1f\x08\x0e\n\x0c\n\x05\x06\0\x02\x0c\x02\x12\x03\x1f\x10$\n\x0c\n\x05\
\x06\0\x02\x0c\x03\x12\x03\x1f/Db\x06proto3\
";

static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
Expand Down
28 changes: 28 additions & 0 deletions src/proto/indexpb_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ const METHOD_INDEX_RAFT_CONF_CHANGE: ::grpcio::Method<super::indexrpcpb::ConfCha
resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
};

/// gRPC method descriptor for `/indexpb.Index/Probe`: a unary call taking a
/// `ProbeReq` and returning a `ProbeResp`, both (de)serialized with the
/// grpcio protobuf marshallers.
const METHOD_INDEX_PROBE: ::grpcio::Method<super::indexrpcpb::ProbeReq, super::indexrpcpb::ProbeResp> = ::grpcio::Method {
ty: ::grpcio::MethodType::Unary,
name: "/indexpb.Index/Probe",
req_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
};

const METHOD_INDEX_PEERS: ::grpcio::Method<super::indexrpcpb::PeersReq, super::indexrpcpb::PeersResp> = ::grpcio::Method {
ty: ::grpcio::MethodType::Unary,
name: "/indexpb.Index/Peers",
Expand Down Expand Up @@ -146,6 +153,22 @@ impl IndexClient {
self.raft_conf_change_async_opt(req, ::grpcio::CallOption::default())
}

/// Issues the `Probe` unary call with the caller-supplied call options and
/// returns the `ProbeResp` (or the gRPC error) directly.
pub fn probe_opt(&self, req: &super::indexrpcpb::ProbeReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<super::indexrpcpb::ProbeResp> {
self.client.unary_call(&METHOD_INDEX_PROBE, req, opt)
}

/// Issues the `Probe` unary call with default call options; see `probe_opt`.
pub fn probe(&self, req: &super::indexrpcpb::ProbeReq) -> ::grpcio::Result<super::indexrpcpb::ProbeResp> {
self.probe_opt(req, ::grpcio::CallOption::default())
}

/// Starts the `Probe` unary call with the caller-supplied call options and
/// returns a `ClientUnaryReceiver` for the eventual `ProbeResp` instead of
/// the response itself.
pub fn probe_async_opt(&self, req: &super::indexrpcpb::ProbeReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<super::indexrpcpb::ProbeResp>> {
self.client.unary_call_async(&METHOD_INDEX_PROBE, req, opt)
}

/// Starts the `Probe` unary call with default call options; see
/// `probe_async_opt`.
pub fn probe_async(&self, req: &super::indexrpcpb::ProbeReq) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<super::indexrpcpb::ProbeResp>> {
self.probe_async_opt(req, ::grpcio::CallOption::default())
}

pub fn peers_opt(&self, req: &super::indexrpcpb::PeersReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<super::indexrpcpb::PeersResp> {
self.client.unary_call(&METHOD_INDEX_PEERS, req, opt)
}
Expand Down Expand Up @@ -313,6 +336,7 @@ impl IndexClient {
pub trait Index {
fn raft(&mut self, ctx: ::grpcio::RpcContext, req: super::eraftpb::Message, sink: ::grpcio::UnarySink<super::indexrpcpb::RaftDone>);
fn raft_conf_change(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ConfChangeReq, sink: ::grpcio::UnarySink<super::indexrpcpb::RaftDone>);
fn probe(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ProbeReq, sink: ::grpcio::UnarySink<super::indexrpcpb::ProbeResp>);
fn peers(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::PeersReq, sink: ::grpcio::UnarySink<super::indexrpcpb::PeersResp>);
fn metrics(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::MetricsReq, sink: ::grpcio::UnarySink<super::indexrpcpb::MetricsResp>);
fn get(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::GetReq, sink: ::grpcio::UnarySink<super::indexrpcpb::GetResp>);
Expand All @@ -336,6 +360,10 @@ pub fn create_index<S: Index + Send + Clone + 'static>(s: S) -> ::grpcio::Servic
instance.raft_conf_change(ctx, req, resp)
});
let mut instance = s.clone();
builder = builder.add_unary_handler(&METHOD_INDEX_PROBE, move |ctx, req, resp| {
instance.probe(ctx, req, resp)
});
let mut instance = s.clone();
builder = builder.add_unary_handler(&METHOD_INDEX_PEERS, move |ctx, req, resp| {
instance.peers(ctx, req, resp)
});
Expand Down
Loading