这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/indexpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ service Index {
}
rpc Commit (indexrpcpb.ApplyReq) returns (indexrpcpb.CommitResp) {
}
// Rollback RPC: takes the shared ApplyReq envelope (the rollback payload is
// carried in its rollback_req field) and answers with a RollbackResp.
rpc Rollback (indexrpcpb.ApplyReq) returns (indexrpcpb.RollbackResp) {
}
rpc Search (indexrpcpb.SearchReq) returns (indexrpcpb.SearchResp) {
}
rpc Schema (indexrpcpb.SchemaReq) returns (indexrpcpb.SchemaResp) {
Expand Down
11 changes: 11 additions & 0 deletions proto/indexrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ enum ReqType {
Put = 2;
Delete = 3;
Commit = 4;
Rollback = 5;
}

message ApplyReq {
Expand All @@ -25,6 +26,7 @@ message ApplyReq {
PutReq put_req = 5;
DeleteReq delete_req = 6;
CommitReq commit_req = 7;
RollbackReq rollback_req = 8;
}

message ConfChangeReq {
Expand Down Expand Up @@ -110,6 +112,15 @@ message CommitResp {
RespErr err = 1;
}

// Payload of a rollback request, embedded in ApplyReq.rollback_req.
message RollbackReq {
// Identifier of the client issuing the request.
uint64 client_id = 1;
// Request sequence number supplied by the client.
uint64 seq = 2;
}

// Reply to a Rollback RPC.
message RollbackResp {
// Outcome of the request, expressed as a RespErr value.
RespErr err = 1;
}

message SearchReq {
uint64 client_id = 1;
uint64 seq = 2;
Expand Down
32 changes: 31 additions & 1 deletion src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::proto::indexpb_grpc::IndexClient;
use crate::proto::indexrpcpb::{
ApplyReq, CommitReq, CommitResp, ConfChangeReq, DeleteReq, DeleteResp, GetReq, GetResp,
MetricsReq, MetricsResp, PeersReq, PeersResp, PutReq, PutResp, RaftDone, ReqType, RespErr,
SchemaReq, SchemaResp, SearchReq, SearchResp,
RollbackReq, RollbackResp, SchemaReq, SchemaResp, SearchReq, SearchResp,
};

pub fn create_client(addr: &str) -> IndexClient {
Expand Down Expand Up @@ -257,6 +257,36 @@ impl Clerk {
}
}

/// Sends a rollback request to the cluster and retries until a server gives a
/// final answer.
///
/// The request carries this clerk's `client_id` and current `request_seq`;
/// `request_seq` is incremented once, before the first attempt, so every retry
/// re-sends the same request.
///
/// A failed RPC is handled exactly like an `ErrWrongLeader` reply: the clerk
/// moves on to the next server (wrapping around), sleeps 100 ms and tries
/// again. `OK` and `ErrNoKey` both end the call.
pub fn rollback(&mut self) {
    let mut payload = RollbackReq::new();
    payload.set_client_id(self.client_id);
    payload.set_seq(self.request_seq);

    let mut req = ApplyReq::new();
    req.set_client_id(self.client_id);
    req.set_req_type(ReqType::Rollback);
    req.set_rollback_req(payload);

    self.request_seq += 1;

    loop {
        // Collapse "RPC failed" and "server replied" into a single status.
        let status = match self.servers[self.leader_id].rollback(&req) {
            Ok(resp) => resp.err,
            Err(_) => RespErr::ErrWrongLeader,
        };

        match status {
            RespErr::OK | RespErr::ErrNoKey => return,
            RespErr::ErrWrongLeader => {}
        }

        // Try the next server after a short pause.
        self.leader_id = (self.leader_id + 1) % self.servers.len();
        thread::sleep(Duration::from_millis(100));
    }
}

pub fn search(&mut self, query: &str) -> String {
let mut req = SearchReq::new();
req.set_client_id(self.client_id);
Expand Down
1 change: 1 addition & 0 deletions src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod get;
pub mod leave;
pub mod metrics;
pub mod peers;
pub mod rollback;
pub mod schema;
pub mod search;
pub mod serve;
Expand Down
21 changes: 21 additions & 0 deletions src/cmd/rollback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use clap::ArgMatches;

use crate::client::client::{create_client, Clerk};
use crate::util::log::set_logger;

/// Handler for the `rollback` subcommand.
///
/// Initialises logging, creates one client per address given in the `SERVERS`
/// argument, and issues a rollback through a `Clerk` with a random client id.
/// Always returns `Ok(())`.
///
/// # Panics
///
/// Panics if the `SERVERS` argument has no value.
pub fn run_rollback_cli(matches: &ArgMatches) -> Result<(), String> {
    set_logger();

    let addrs = matches.values_of("SERVERS").unwrap();
    let mut servers = Vec::new();
    for addr in addrs {
        servers.push(create_client(addr));
    }

    let client_id = rand::random();

    let mut clerk = Clerk::new(&servers, client_id);
    clerk.rollback();

    Ok(())
}
23 changes: 23 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use bayard::cmd::get::run_get_cli;
use bayard::cmd::leave::run_leave_cli;
use bayard::cmd::metrics::run_metrics_cli;
use bayard::cmd::peers::run_peers_cli;
use bayard::cmd::rollback::run_rollback_cli;
use bayard::cmd::schema::run_schema_cli;
use bayard::cmd::search::run_search_cli;
use bayard::cmd::serve::run_serve_cli;
Expand Down Expand Up @@ -282,6 +283,27 @@ fn main() {
.takes_value(true),
)
)
.subcommand(
SubCommand::with_name("rollback")
.name("rollback")
.setting(AppSettings::DeriveDisplayOrder)
.version(crate_version!())
.author(crate_authors!())
.about("Rollback index")
.arg(
Arg::with_name("SERVERS")
.help("The server addresses. Use `,` to separate address. Example: `127.0.0.1:5000,127.0.0.1:5001`")
.short("s")
.long("servers")
.value_name("IP:PORT")
.default_value("127.0.0.1:5000")
.multiple(true)
.use_delimiter(true)
.require_delimiter(true)
.value_delimiter(",")
.takes_value(true),
)
)
.subcommand(
SubCommand::with_name("search")
.name("search")
Expand Down Expand Up @@ -368,6 +390,7 @@ fn main() {
"get" => run_get_cli,
"delete" => run_delete_cli,
"commit" => run_commit_cli,
"rollback" => run_rollback_cli,
"search" => run_search_cli,
"schema" => run_schema_cli,
// "version" => run_version_cli,
Expand Down
72 changes: 38 additions & 34 deletions src/proto/indexpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_8_0;

static file_descriptor_proto_data: &'static [u8] = b"\
\n\rindexpb.proto\x12\x07indexpb\x1a\x10indexrpcpb.proto\x1a\reraftpb.pr\
oto2\xc4\x04\n\x05Index\x120\n\x04Raft\x12\x10.eraftpb.Message\x1a\x14.i\
oto2\x82\x05\n\x05Index\x120\n\x04Raft\x12\x10.eraftpb.Message\x1a\x14.i\
ndexrpcpb.RaftDone\"\0\x12C\n\x0eRaftConfChange\x12\x19.indexrpcpb.ConfC\
hangeReq\x1a\x14.indexrpcpb.RaftDone\"\0\x126\n\x05Peers\x12\x14.indexrp\
cpb.PeersReq\x1a\x15.indexrpcpb.PeersResp\"\0\x12<\n\x07Metrics\x12\x16.\
Expand All @@ -37,39 +37,43 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x12\x14.indexrpcpb.ApplyReq\x1a\x13.indexrpcpb.PutResp\"\0\x128\n\x06De\
lete\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.DeleteResp\"\0\x128\
\n\x06Commit\x12\x14.indexrpcpb.ApplyReq\x1a\x16.indexrpcpb.CommitResp\"\
\0\x129\n\x06Search\x12\x15.indexrpcpb.SearchReq\x1a\x16.indexrpcpb.Sear\
chResp\"\0\x129\n\x06Schema\x12\x15.indexrpcpb.SchemaReq\x1a\x16.indexrp\
cpb.SchemaResp\"\0J\xfa\x04\n\x06\x12\x04\0\0\x1b\x01\n\x08\n\x01\x0c\
\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\0\x10\n\t\n\x02\x03\0\x12\
\x03\x03\0\x1a\n\t\n\x02\x03\x01\x12\x03\x04\0\x17\n\n\n\x02\x06\0\x12\
\x04\x06\0\x1b\x01\n\n\n\x03\x06\0\x01\x12\x03\x06\x08\r\n\x0c\n\x04\x06\
\0\x02\0\x12\x04\x07\x04\x08\x05\n\x0c\n\x05\x06\0\x02\0\x01\x12\x03\x07\
\x08\x0c\n\x0c\n\x05\x06\0\x02\0\x02\x12\x03\x07\x0e\x1d\n\x0c\n\x05\x06\
\0\x02\0\x03\x12\x03\x07(;\n\x0c\n\x04\x06\0\x02\x01\x12\x04\t\x04\n\x05\
\n\x0c\n\x05\x06\0\x02\x01\x01\x12\x03\t\x08\x16\n\x0c\n\x05\x06\0\x02\
\x01\x02\x12\x03\t\x180\n\x0c\n\x05\x06\0\x02\x01\x03\x12\x03\t;N\n\x0c\
\n\x04\x06\0\x02\x02\x12\x04\x0b\x04\x0c\x05\n\x0c\n\x05\x06\0\x02\x02\
\x01\x12\x03\x0b\x08\r\n\x0c\n\x05\x06\0\x02\x02\x02\x12\x03\x0b\x0f\"\n\
\x0c\n\x05\x06\0\x02\x02\x03\x12\x03\x0b-A\n\x0c\n\x04\x06\0\x02\x03\x12\
\x04\r\x04\x0e\x05\n\x0c\n\x05\x06\0\x02\x03\x01\x12\x03\r\x08\x0f\n\x0c\
\n\x05\x06\0\x02\x03\x02\x12\x03\r\x11&\n\x0c\n\x05\x06\0\x02\x03\x03\
\x12\x03\r1G\n\x0c\n\x04\x06\0\x02\x04\x12\x04\x0f\x04\x10\x05\n\x0c\n\
\x05\x06\0\x02\x04\x01\x12\x03\x0f\x08\x0b\n\x0c\n\x05\x06\0\x02\x04\x02\
\x12\x03\x0f\r\x1e\n\x0c\n\x05\x06\0\x02\x04\x03\x12\x03\x0f);\n\x0c\n\
\x04\x06\0\x02\x05\x12\x04\x11\x04\x12\x05\n\x0c\n\x05\x06\0\x02\x05\x01\
\x12\x03\x11\x08\x0b\n\x0c\n\x05\x06\0\x02\x05\x02\x12\x03\x11\r\x20\n\
\x0c\n\x05\x06\0\x02\x05\x03\x12\x03\x11+=\n\x0c\n\x04\x06\0\x02\x06\x12\
\x04\x13\x04\x14\x05\n\x0c\n\x05\x06\0\x02\x06\x01\x12\x03\x13\x08\x0e\n\
\x0c\n\x05\x06\0\x02\x06\x02\x12\x03\x13\x10#\n\x0c\n\x05\x06\0\x02\x06\
\x03\x12\x03\x13.C\n\x0c\n\x04\x06\0\x02\x07\x12\x04\x15\x04\x16\x05\n\
\x0c\n\x05\x06\0\x02\x07\x01\x12\x03\x15\x08\x0e\n\x0c\n\x05\x06\0\x02\
\x07\x02\x12\x03\x15\x10#\n\x0c\n\x05\x06\0\x02\x07\x03\x12\x03\x15.C\n\
\x0c\n\x04\x06\0\x02\x08\x12\x04\x17\x04\x18\x05\n\x0c\n\x05\x06\0\x02\
\x08\x01\x12\x03\x17\x08\x0e\n\x0c\n\x05\x06\0\x02\x08\x02\x12\x03\x17\
\x10$\n\x0c\n\x05\x06\0\x02\x08\x03\x12\x03\x17/D\n\x0c\n\x04\x06\0\x02\
\t\x12\x04\x19\x04\x1a\x05\n\x0c\n\x05\x06\0\x02\t\x01\x12\x03\x19\x08\
\x0e\n\x0c\n\x05\x06\0\x02\t\x02\x12\x03\x19\x10$\n\x0c\n\x05\x06\0\x02\
\t\x03\x12\x03\x19/Db\x06proto3\
\0\x12<\n\x08Rollback\x12\x14.indexrpcpb.ApplyReq\x1a\x18.indexrpcpb.Rol\
lbackResp\"\0\x129\n\x06Search\x12\x15.indexrpcpb.SearchReq\x1a\x16.inde\
xrpcpb.SearchResp\"\0\x129\n\x06Schema\x12\x15.indexrpcpb.SchemaReq\x1a\
\x16.indexrpcpb.SchemaResp\"\0J\xb2\x05\n\x06\x12\x04\0\0\x1d\x01\n\x08\
\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\0\x10\n\t\n\x02\
\x03\0\x12\x03\x03\0\x1a\n\t\n\x02\x03\x01\x12\x03\x04\0\x17\n\n\n\x02\
\x06\0\x12\x04\x06\0\x1d\x01\n\n\n\x03\x06\0\x01\x12\x03\x06\x08\r\n\x0c\
\n\x04\x06\0\x02\0\x12\x04\x07\x04\x08\x05\n\x0c\n\x05\x06\0\x02\0\x01\
\x12\x03\x07\x08\x0c\n\x0c\n\x05\x06\0\x02\0\x02\x12\x03\x07\x0e\x1d\n\
\x0c\n\x05\x06\0\x02\0\x03\x12\x03\x07(;\n\x0c\n\x04\x06\0\x02\x01\x12\
\x04\t\x04\n\x05\n\x0c\n\x05\x06\0\x02\x01\x01\x12\x03\t\x08\x16\n\x0c\n\
\x05\x06\0\x02\x01\x02\x12\x03\t\x180\n\x0c\n\x05\x06\0\x02\x01\x03\x12\
\x03\t;N\n\x0c\n\x04\x06\0\x02\x02\x12\x04\x0b\x04\x0c\x05\n\x0c\n\x05\
\x06\0\x02\x02\x01\x12\x03\x0b\x08\r\n\x0c\n\x05\x06\0\x02\x02\x02\x12\
\x03\x0b\x0f\"\n\x0c\n\x05\x06\0\x02\x02\x03\x12\x03\x0b-A\n\x0c\n\x04\
\x06\0\x02\x03\x12\x04\r\x04\x0e\x05\n\x0c\n\x05\x06\0\x02\x03\x01\x12\
\x03\r\x08\x0f\n\x0c\n\x05\x06\0\x02\x03\x02\x12\x03\r\x11&\n\x0c\n\x05\
\x06\0\x02\x03\x03\x12\x03\r1G\n\x0c\n\x04\x06\0\x02\x04\x12\x04\x0f\x04\
\x10\x05\n\x0c\n\x05\x06\0\x02\x04\x01\x12\x03\x0f\x08\x0b\n\x0c\n\x05\
\x06\0\x02\x04\x02\x12\x03\x0f\r\x1e\n\x0c\n\x05\x06\0\x02\x04\x03\x12\
\x03\x0f);\n\x0c\n\x04\x06\0\x02\x05\x12\x04\x11\x04\x12\x05\n\x0c\n\x05\
\x06\0\x02\x05\x01\x12\x03\x11\x08\x0b\n\x0c\n\x05\x06\0\x02\x05\x02\x12\
\x03\x11\r\x20\n\x0c\n\x05\x06\0\x02\x05\x03\x12\x03\x11+=\n\x0c\n\x04\
\x06\0\x02\x06\x12\x04\x13\x04\x14\x05\n\x0c\n\x05\x06\0\x02\x06\x01\x12\
\x03\x13\x08\x0e\n\x0c\n\x05\x06\0\x02\x06\x02\x12\x03\x13\x10#\n\x0c\n\
\x05\x06\0\x02\x06\x03\x12\x03\x13.C\n\x0c\n\x04\x06\0\x02\x07\x12\x04\
\x15\x04\x16\x05\n\x0c\n\x05\x06\0\x02\x07\x01\x12\x03\x15\x08\x0e\n\x0c\
\n\x05\x06\0\x02\x07\x02\x12\x03\x15\x10#\n\x0c\n\x05\x06\0\x02\x07\x03\
\x12\x03\x15.C\n\x0c\n\x04\x06\0\x02\x08\x12\x04\x17\x04\x18\x05\n\x0c\n\
\x05\x06\0\x02\x08\x01\x12\x03\x17\x08\x10\n\x0c\n\x05\x06\0\x02\x08\x02\
\x12\x03\x17\x12%\n\x0c\n\x05\x06\0\x02\x08\x03\x12\x03\x170G\n\x0c\n\
\x04\x06\0\x02\t\x12\x04\x19\x04\x1a\x05\n\x0c\n\x05\x06\0\x02\t\x01\x12\
\x03\x19\x08\x0e\n\x0c\n\x05\x06\0\x02\t\x02\x12\x03\x19\x10$\n\x0c\n\
\x05\x06\0\x02\t\x03\x12\x03\x19/D\n\x0c\n\x04\x06\0\x02\n\x12\x04\x1b\
\x04\x1c\x05\n\x0c\n\x05\x06\0\x02\n\x01\x12\x03\x1b\x08\x0e\n\x0c\n\x05\
\x06\0\x02\n\x02\x12\x03\x1b\x10$\n\x0c\n\x05\x06\0\x02\n\x03\x12\x03\
\x1b/Db\x06proto3\
";

static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
Expand Down
28 changes: 28 additions & 0 deletions src/proto/indexpb_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ const METHOD_INDEX_COMMIT: ::grpcio::Method<super::indexrpcpb::ApplyReq, super::
resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
};

// gRPC method descriptor for the unary `/indexpb.Index/Rollback` call:
// request type `ApplyReq`, response type `RollbackResp`, both marshalled with
// grpcio's protobuf serializer/deserializer.
const METHOD_INDEX_ROLLBACK: ::grpcio::Method<super::indexrpcpb::ApplyReq, super::indexrpcpb::RollbackResp> = ::grpcio::Method {
ty: ::grpcio::MethodType::Unary,
name: "/indexpb.Index/Rollback",
req_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pb_ser, de: ::grpcio::pb_de },
};

const METHOD_INDEX_SEARCH: ::grpcio::Method<super::indexrpcpb::SearchReq, super::indexrpcpb::SearchResp> = ::grpcio::Method {
ty: ::grpcio::MethodType::Unary,
name: "/indexpb.Index/Search",
Expand Down Expand Up @@ -228,6 +235,22 @@ impl IndexClient {
self.commit_async_opt(req, ::grpcio::CallOption::default())
}

// Performs the Rollback unary call with the caller-supplied `CallOption` and
// returns the `RollbackResp` (or the grpcio error) directly.
pub fn rollback_opt(&self, req: &super::indexrpcpb::ApplyReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<super::indexrpcpb::RollbackResp> {
self.client.unary_call(&METHOD_INDEX_ROLLBACK, req, opt)
}

// Same as `rollback_opt`, using `CallOption::default()`.
pub fn rollback(&self, req: &super::indexrpcpb::ApplyReq) -> ::grpcio::Result<super::indexrpcpb::RollbackResp> {
self.rollback_opt(req, ::grpcio::CallOption::default())
}

// Starts the Rollback unary call with the caller-supplied `CallOption` and
// returns a `ClientUnaryReceiver` for the `RollbackResp` instead of the
// response itself.
pub fn rollback_async_opt(&self, req: &super::indexrpcpb::ApplyReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<super::indexrpcpb::RollbackResp>> {
self.client.unary_call_async(&METHOD_INDEX_ROLLBACK, req, opt)
}

// Same as `rollback_async_opt`, using `CallOption::default()`.
pub fn rollback_async(&self, req: &super::indexrpcpb::ApplyReq) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<super::indexrpcpb::RollbackResp>> {
self.rollback_async_opt(req, ::grpcio::CallOption::default())
}

pub fn search_opt(&self, req: &super::indexrpcpb::SearchReq, opt: ::grpcio::CallOption) -> ::grpcio::Result<super::indexrpcpb::SearchResp> {
self.client.unary_call(&METHOD_INDEX_SEARCH, req, opt)
}
Expand Down Expand Up @@ -273,6 +296,7 @@ pub trait Index {
fn put(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::PutResp>);
fn delete(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::DeleteResp>);
fn commit(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::CommitResp>);
// Server-side handler for the Rollback RPC: receives the `ApplyReq` and must
// answer through `sink` with a `RollbackResp`.
fn rollback(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::ApplyReq, sink: ::grpcio::UnarySink<super::indexrpcpb::RollbackResp>);
fn search(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::SearchReq, sink: ::grpcio::UnarySink<super::indexrpcpb::SearchResp>);
fn schema(&mut self, ctx: ::grpcio::RpcContext, req: super::indexrpcpb::SchemaReq, sink: ::grpcio::UnarySink<super::indexrpcpb::SchemaResp>);
}
Expand Down Expand Up @@ -312,6 +336,10 @@ pub fn create_index<S: Index + Send + Clone + 'static>(s: S) -> ::grpcio::Servic
instance.commit(ctx, req, resp)
});
let mut instance = s.clone();
builder = builder.add_unary_handler(&METHOD_INDEX_ROLLBACK, move |ctx, req, resp| {
instance.rollback(ctx, req, resp)
});
let mut instance = s.clone();
builder = builder.add_unary_handler(&METHOD_INDEX_SEARCH, move |ctx, req, resp| {
instance.search(ctx, req, resp)
});
Expand Down
Loading