这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/indexpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ service Index {
}
rpc Rollback (indexrpcpb.ApplyReq) returns (indexrpcpb.RollbackResp) {
}
rpc Merge (indexrpcpb.ApplyReq) returns (indexrpcpb.MergeResp) {
}
rpc Search (indexrpcpb.SearchReq) returns (indexrpcpb.SearchResp) {
}
rpc Schema (indexrpcpb.SchemaReq) returns (indexrpcpb.SchemaResp) {
Expand Down
11 changes: 11 additions & 0 deletions proto/indexrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enum ReqType {
Delete = 3;
Commit = 4;
Rollback = 5;
Merge = 6;
}

message ApplyReq {
Expand All @@ -27,6 +28,7 @@ message ApplyReq {
DeleteReq delete_req = 6;
CommitReq commit_req = 7;
RollbackReq rollback_req = 8;
MergeReq merge_req = 9;
}

message ConfChangeReq {
Expand Down Expand Up @@ -121,6 +123,15 @@ message RollbackResp {
RespErr err = 1;
}

// Payload of a merge request; carried inside ApplyReq as `merge_req`.
message MergeReq {
// Identifier of the client issuing the request.
uint64 client_id = 1;
// Sequence number the client assigned to this request.
uint64 seq = 2;
}

// Reply of the Index.Merge RPC.
message MergeResp {
// Outcome of the request, expressed as a RespErr value.
RespErr err = 1;
}

message SearchReq {
uint64 client_id = 1;
uint64 seq = 2;
Expand Down
34 changes: 32 additions & 2 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use raft::eraftpb::{ConfChange, ConfChangeType};
use crate::proto::indexpb_grpc::IndexClient;
use crate::proto::indexrpcpb::{
ApplyReq, CommitReq, CommitResp, ConfChangeReq, DeleteReq, DeleteResp, GetReq, GetResp,
MetricsReq, MetricsResp, PeersReq, PeersResp, PutReq, PutResp, RaftDone, ReqType, RespErr,
RollbackReq, RollbackResp, SchemaReq, SchemaResp, SearchReq, SearchResp,
MergeReq, MergeResp, MetricsReq, MetricsResp, PeersReq, PeersResp, PutReq, PutResp, RaftDone,
ReqType, RespErr, RollbackReq, RollbackResp, SchemaReq, SchemaResp, SearchReq, SearchResp,
};

pub fn create_client(addr: &str) -> IndexClient {
Expand Down Expand Up @@ -287,6 +287,36 @@ impl Clerk {
}
}

/// Sends a merge request to the cluster and returns once a server has answered it.
///
/// An `ApplyReq` of type `ReqType::Merge` is built from this clerk's client id and
/// current request sequence number, after which the sequence number is incremented.
/// The request is sent to the server at `leader_id`. A failed RPC is treated
/// exactly like an `ErrWrongLeader` reply: the clerk advances to the next server
/// (wrapping around), sleeps for 100 ms and tries again. The call returns on `OK`
/// or `ErrNoKey`.
pub fn merge(&mut self) {
    let mut req = ApplyReq::new();
    req.set_client_id(self.client_id);
    req.set_req_type(ReqType::Merge);
    req.set_merge_req({
        let mut payload = MergeReq::new();
        payload.set_client_id(self.client_id);
        payload.set_seq(self.request_seq);
        payload
    });

    self.request_seq += 1;

    loop {
        // A transport-level failure is handled the same way as a wrong-leader reply.
        let status = match self.servers[self.leader_id].merge(&req) {
            Ok(resp) => resp.err,
            Err(_) => RespErr::ErrWrongLeader,
        };

        match status {
            RespErr::OK | RespErr::ErrNoKey => return,
            RespErr::ErrWrongLeader => {
                // Try the next server in round-robin order after a short pause.
                self.leader_id = (self.leader_id + 1) % self.servers.len();
                thread::sleep(Duration::from_millis(100));
            }
        }
    }
}

pub fn search(&mut self, query: &str) -> String {
let mut req = SearchReq::new();
req.set_client_id(self.client_id);
Expand Down
1 change: 1 addition & 0 deletions src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod commit;
pub mod delete;
pub mod get;
pub mod leave;
pub mod merge;
pub mod metrics;
pub mod peers;
pub mod rollback;
Expand Down
21 changes: 21 additions & 0 deletions src/cmd/merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use clap::ArgMatches;

use crate::client::client::{create_client, Clerk};
use crate::util::log::set_logger;

pub fn run_merge_cli(matches: &ArgMatches) -> Result<(), String> {
set_logger();

let servers: Vec<_> = matches
.values_of("SERVERS")
.unwrap()
.map(|addr| create_client(addr))
.collect();

let client_id = rand::random();

let mut client = Clerk::new(&servers, client_id);
client.merge();

Ok(())
}
48 changes: 23 additions & 25 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bayard::cmd::commit::run_commit_cli;
use bayard::cmd::delete::run_delete_cli;
use bayard::cmd::get::run_get_cli;
use bayard::cmd::leave::run_leave_cli;
use bayard::cmd::merge::run_merge_cli;
use bayard::cmd::metrics::run_metrics_cli;
use bayard::cmd::peers::run_peers_cli;
use bayard::cmd::rollback::run_rollback_cli;
Expand Down Expand Up @@ -304,6 +305,27 @@ fn main() {
.takes_value(true),
)
)
.subcommand(
SubCommand::with_name("merge")
.name("merge")
.setting(AppSettings::DeriveDisplayOrder)
.version(crate_version!())
.author(crate_authors!())
.about("Merge index")
.arg(
Arg::with_name("SERVERS")
.help("The server addresses. Use `,` to separate address. Example: `127.0.0.1:5000,127.0.0.1:5001`")
.short("s")
.long("servers")
.value_name("IP:PORT")
.default_value("127.0.0.1:5000")
.multiple(true)
.use_delimiter(true)
.require_delimiter(true)
.value_delimiter(",")
.takes_value(true),
)
)
.subcommand(
SubCommand::with_name("search")
.name("search")
Expand Down Expand Up @@ -353,30 +375,6 @@ fn main() {
.takes_value(true),
)
)
// .subcommand(
// SubCommand::with_name("version")
// .name("version")
// .setting(AppSettings::DeriveDisplayOrder)
// .version(crate_version!())
// .author(crate_authors!())
// .about("Show remote server version")
// .arg(
// Arg::with_name("HOST")
// .help("The node address")
// .short("H")
// .long("host")
// .default_value("0.0.0.0")
// .takes_value(true),
// )
// .arg(
// Arg::with_name("CLIENT_PORT")
// .help("The gRPC listen port for client connection")
// .short("c")
// .long("client-port")
// .default_value("5000")
// .takes_value(true),
// )
// )
.get_matches();

let (subcommand, some_options) = app.subcommand();
Expand All @@ -391,9 +389,9 @@ fn main() {
"delete" => run_delete_cli,
"commit" => run_commit_cli,
"rollback" => run_rollback_cli,
"merge" => run_merge_cli,
"search" => run_search_cli,
"schema" => run_schema_cli,
// "version" => run_version_cli,
_ => panic!("Subcommand {} is unknown", subcommand),
};

Expand Down
33 changes: 18 additions & 15 deletions src/proto/indexpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_8_0;

static file_descriptor_proto_data: &'static [u8] = b"\
\n\rindexpb.proto\x12\x07indexpb\x1a\x10indexrpcpb.proto\x1a\reraftpb.pr\
oto2\x82\x05\n\x05Index\x120\n\x04Raft\x12\x10.eraftpb.Message\x1a\x14.i\
oto2\xba\x05\n\x05Index\x120\n\x04Raft\x12\x10.eraftpb.Message\x1a\x14.i\
ndexrpcpb.RaftDone\"\0\x12C\n\x0eRaftConfChange\x12\x19.indexrpcpb.ConfC\
hangeReq\x1a\x14.indexrpcpb.RaftDone\"\0\x126\n\x05Peers\x12\x14.indexrp\
cpb.PeersReq\x1a\x15.indexrpcpb.PeersResp\"\0\x12<\n\x07Metrics\x12\x16.\
Expand All @@ -38,15 +38,16 @@ static file_descriptor_proto_data: &'static [u8] = b"\
lete\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.DeleteResp\"\0\x128\
\n\x06Commit\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.CommitResp\"\
\0\x12<\n\x08Rollback\x12\x14.indexrpcpb.ApplyReq\x1a\x18.indexrpcpb.Rol\
lbackResp\"\0\x129\n\x06Search\x12\x15.indexrpcpb.SearchReq\x1a\x16.inde\
xrpcpb.SearchResp\"\0\x129\n\x06Schema\x12\x15.indexrpcpb.SchemaReq\x1a\
\x16.indexrpcpb.SchemaResp\"\0J\xb2\x05\n\x06\x12\x04\0\0\x1d\x01\n\x08\
\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\0\x10\n\t\n\x02\
\x03\0\x12\x03\x03\0\x1a\n\t\n\x02\x03\x01\x12\x03\x04\0\x17\n\n\n\x02\
\x06\0\x12\x04\x06\0\x1d\x01\n\n\n\x03\x06\0\x01\x12\x03\x06\x08\r\n\x0c\
\n\x04\x06\0\x02\0\x12\x04\x07\x04\x08\x05\n\x0c\n\x05\x06\0\x02\0\x01\
\x12\x03\x07\x08\x0c\n\x0c\n\x05\x06\0\x02\0\x02\x12\x03\x07\x0e\x1d\n\
\x0c\n\x05\x06\0\x02\0\x03\x12\x03\x07(;\n\x0c\n\x04\x06\0\x02\x01\x12\
lbackResp\"\0\x126\n\x05Merge\x12\x14.indexrpcpb.ApplyReq\x1a\x15.indexr\
pcpb.MergeResp\"\0\x129\n\x06Search\x12\x15.indexrpcpb.SearchReq\x1a\x16\
.indexrpcpb.SearchResp\"\0\x129\n\x06Schema\x12\x15.indexrpcpb.SchemaReq\
\x1a\x16.indexrpcpb.SchemaResp\"\0J\xea\x05\n\x06\x12\x04\0\0\x1f\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\0\x10\n\t\n\
\x02\x03\0\x12\x03\x03\0\x1a\n\t\n\x02\x03\x01\x12\x03\x04\0\x17\n\n\n\
\x02\x06\0\x12\x04\x06\0\x1f\x01\n\n\n\x03\x06\0\x01\x12\x03\x06\x08\r\n\
\x0c\n\x04\x06\0\x02\0\x12\x04\x07\x04\x08\x05\n\x0c\n\x05\x06\0\x02\0\
\x01\x12\x03\x07\x08\x0c\n\x0c\n\x05\x06\0\x02\0\x02\x12\x03\x07\x0e\x1d\
\n\x0c\n\x05\x06\0\x02\0\x03\x12\x03\x07(;\n\x0c\n\x04\x06\0\x02\x01\x12\
\x04\t\x04\n\x05\n\x0c\n\x05\x06\0\x02\x01\x01\x12\x03\t\x08\x16\n\x0c\n\
\x05\x06\0\x02\x01\x02\x12\x03\t\x180\n\x0c\n\x05\x06\0\x02\x01\x03\x12\
\x03\t;N\n\x0c\n\x04\x06\0\x02\x02\x12\x04\x0b\x04\x0c\x05\n\x0c\n\x05\
Expand All @@ -69,11 +70,13 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x05\x06\0\x02\x08\x01\x12\x03\x17\x08\x10\n\x0c\n\x05\x06\0\x02\x08\x02\
\x12\x03\x17\x12%\n\x0c\n\x05\x06\0\x02\x08\x03\x12\x03\x170G\n\x0c\n\
\x04\x06\0\x02\t\x12\x04\x19\x04\x1a\x05\n\x0c\n\x05\x06\0\x02\t\x01\x12\
\x03\x19\x08\x0e\n\x0c\n\x05\x06\0\x02\t\x02\x12\x03\x19\x10$\n\x0c\n\
\x05\x06\0\x02\t\x03\x12\x03\x19/D\n\x0c\n\x04\x06\0\x02\n\x12\x04\x1b\
\x04\x1c\x05\n\x0c\n\x05\x06\0\x02\n\x01\x12\x03\x1b\x08\x0e\n\x0c\n\x05\
\x06\0\x02\n\x02\x12\x03\x1b\x10$\n\x0c\n\x05\x06\0\x02\n\x03\x12\x03\
\x1b/Db\x06proto3\
\x03\x19\x08\r\n\x0c\n\x05\x06\0\x02\t\x02\x12\x03\x19\x0f\"\n\x0c\n\x05\
\x06\0\x02\t\x03\x12\x03\x19-A\n\x0c\n\x04\x06\0\x02\n\x12\x04\x1b\x04\
\x1c\x05\n\x0c\n\x05\x06\0\x02\n\x01\x12\x03\x1b\x08\x0e\n\x0c\n\x05\x06\
\0\x02\n\x02\x12\x03\x1b\x10$\n\x0c\n\x05\x06\0\x02\n\x03\x12\x03\x1b/D\
\n\x0c\n\x04\x06\0\x02\x0b\x12\x04\x1d\x04\x1e\x05\n\x0c\n\x05\x06\0\x02\
\x0b\x01\x12\x03\x1d\x08\x0e\n\x0c\n\x05\x06\0\x02\x0b\x02\x12\x03\x1d\
\x10$\n\x0c\n\x05\x06\0\x02\x0b\x03\x12\x03\x1d/Db\x06proto3\
";

static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
Expand Down
28 changes: 28 additions & 0 deletions src/proto/indexpb_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ const METHOD_INDEX_ROLLBACK: ::grpcio::Method<super::indexrpcpb::ApplyReq, super
resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
};

/// Method descriptor for the unary `/indexpb.Index/Merge` RPC, which takes an
/// `ApplyReq` and returns a `MergeResp`. Requests and responses are both
/// (de)serialised with `pb_ser` / `pb_de`.
const METHOD_INDEX_MERGE: ::grpcio::Method<super::indexrpcpb::ApplyReq, super::indexrpcpb::MergeResp> = ::grpcio::Method {
ty: ::grpcio::MethodType::Unary,
name: "/indexpb.Index/Merge",
req_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
};

const METHOD_INDEX_SEARCH: ::grpcio::Method<super::indexrpcpb::SearchReq, super::indexrpcpb::SearchResp> = ::grpcio::Method {
ty: ::grpcio::MethodType::Unary,
name: "/indexpb.Index/Search",
Expand Down Expand Up @@ -251,6 +258,22 @@ impl IndexClient {
self.rollback_async_opt(req, ::grpcio::CallOption::default())
}

/// Performs the Merge RPC through `unary_call` using the supplied call options
/// and returns the resulting `MergeResp` or the gRPC error.
pub fn merge_opt(&self, req: &super::indexrpcpb::ApplyReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<super::indexrpcpb::MergeResp> {
self.client.unary_call(&METHOD_INDEX_MERGE, req, opt)
}

/// Performs the Merge RPC with default call options; see `merge_opt`.
pub fn merge(&self, req: &super::indexrpcpb::ApplyReq) -> ::grpcio::Result<super::indexrpcpb::MergeResp> {
self.merge_opt(req, ::grpcio::CallOption::default())
}

/// Starts the Merge RPC through `unary_call_async` using the supplied call
/// options and returns a `ClientUnaryReceiver` for the eventual `MergeResp`.
pub fn merge_async_opt(&self, req: &super::indexrpcpb::ApplyReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<super::indexrpcpb::MergeResp>> {
self.client.unary_call_async(&METHOD_INDEX_MERGE, req, opt)
}

/// Starts the Merge RPC asynchronously with default call options; see
/// `merge_async_opt`.
pub fn merge_async(&self, req: &super::indexrpcpb::ApplyReq) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<super::indexrpcpb::MergeResp>> {
self.merge_async_opt(req, ::grpcio::CallOption::default())
}

pub fn search_opt(&self, req: &super::indexrpcpb::SearchReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<super::indexrpcpb::SearchResp> {
self.client.unary_call(&METHOD_INDEX_SEARCH, req, opt)
}
Expand Down Expand Up @@ -297,6 +320,7 @@ pub trait Index {
fn delete(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::DeleteResp>);
fn commit(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::CommitResp>);
fn rollback(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::RollbackResp>);
fn merge(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::MergeResp>);
fn search(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::SearchReq, sink: ::grpcio::UnarySink<super::indexrpcpb::SearchResp>);
fn schema(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::SchemaReq, sink: ::grpcio::UnarySink<super::indexrpcpb::SchemaResp>);
}
Expand Down Expand Up @@ -340,6 +364,10 @@ pub fn create_index<S: Index + Send + Clone + 'static>(s: S) -> ::grpcio::Servic
instance.rollback(ctx, req, resp)
});
let mut instance = s.clone();
builder = builder.add_unary_handler(&METHOD_INDEX_MERGE, move |ctx, req, resp| {
instance.merge(ctx, req, resp)
});
let mut instance = s.clone();
builder = builder.add_unary_handler(&METHOD_INDEX_SEARCH, move |ctx, req, resp| {
instance.search(ctx, req, resp)
});
Expand Down
Loading