这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions proto/indexpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ service Index {
}
rpc RaftConfChange (indexrpcpb.ConfChangeReq) returns (indexrpcpb.RaftDone) {
}
rpc Peers (indexrpcpb.IndexReq) returns (indexrpcpb.PeersResp) {
rpc Peers (indexrpcpb.PeersReq) returns (indexrpcpb.PeersResp) {
}
rpc Metrics (indexrpcpb.IndexReq) returns (indexrpcpb.MetricsResp) {
rpc Metrics (indexrpcpb.MetricsReq) returns (indexrpcpb.MetricsResp) {
}
rpc Get (indexrpcpb.IndexReq) returns (indexrpcpb.GetResp) {
rpc Get (indexrpcpb.GetReq) returns (indexrpcpb.GetResp) {
}
rpc Put (indexrpcpb.IndexReq) returns (indexrpcpb.PutResp) {
rpc Put (indexrpcpb.ApplyReq) returns (indexrpcpb.PutResp) {
}
rpc Delete (indexrpcpb.IndexReq) returns (indexrpcpb.DeleteResp) {
rpc Delete (indexrpcpb.ApplyReq) returns (indexrpcpb.DeleteResp) {
}
rpc Commit (indexrpcpb.IndexReq) returns (indexrpcpb.CommitResp) {
rpc Commit (indexrpcpb.ApplyReq) returns (indexrpcpb.CommitResp) {
}
rpc Search (indexrpcpb.IndexReq) returns (indexrpcpb.SearchResp) {
rpc Search (indexrpcpb.SearchReq) returns (indexrpcpb.SearchResp) {
}
rpc Schema (indexrpcpb.IndexReq) returns (indexrpcpb.SchemaResp) {
rpc Schema (indexrpcpb.SchemaReq) returns (indexrpcpb.SchemaResp) {
}
}
125 changes: 88 additions & 37 deletions proto/indexrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,72 +10,123 @@ enum RespErr {
}

enum ReqType {
Get = 0;
Put = 1;
Delete = 2;
Commit = 3;
Search = 4;
Schema = 5;
Join = 6;
Leave = 7;
Peers = 8;
Metrics = 9;
}

message IndexReq {
Join = 0;
Leave = 1;
Put = 2;
Delete = 3;
Commit = 4;
}

message ApplyReq {
uint64 client_id = 1;
ReqType req_type = 2;
JoinReq join_req = 3;
LeaveReq leave_req = 4;
PutReq put_req = 5;
DeleteReq delete_req = 6;
CommitReq commit_req = 7;
}

message ConfChangeReq {
eraftpb.ConfChange cc = 1;
string ip = 2;
uint32 port = 3;
}

message RaftDone {
RespErr err = 1;
}

message JoinReq {
uint64 client_id = 1;
uint64 peer_id = 2;
string peer_addr = 3;
}

message LeaveReq {
uint64 client_id = 1;
uint64 peer_id = 2;
string peer_addr = 3;
}

message PeersReq {
uint64 client_id = 1;
uint64 seq = 2;
}

message PeersResp {
string value = 1;
RespErr err = 2;
}

message MetricsReq {
uint64 client_id = 1;
uint64 seq = 2;
}

message MetricsResp {
string value = 1;
RespErr err = 2;
}

message GetReq {
uint64 client_id = 1;
uint64 seq = 2;
ReqType req_type = 3;
uint64 peer_id = 4;
string peer_addr = 5;
string key = 6;
string value = 7;
string query = 8;
string doc_id = 3;
}

message GetResp {
string value = 1;
RespErr err = 2;
}

message PutReq {
uint64 client_id = 1;
uint64 seq = 2;
string doc_id = 3;
string fields = 4;
}

message PutResp {
RespErr err = 1;
}

message DeleteReq {
uint64 client_id = 1;
uint64 seq = 2;
string doc_id = 3;
}

message DeleteResp {
RespErr err = 1;
}

message CommitReq {
uint64 client_id = 1;
uint64 seq = 2;
}

message CommitResp {
RespErr err = 1;
}

message SearchResp {
string value = 1;
RespErr err = 2;
message SearchReq {
uint64 client_id = 1;
uint64 seq = 2;
string query = 3;
}

message SchemaResp {
message SearchResp {
string value = 1;
RespErr err = 2;
}

message ConfChangeReq {
eraftpb.ConfChange cc = 1;
string ip = 2;
uint32 port = 3;
}

message PeersResp {
string value = 1;
RespErr err = 2;
message SchemaReq {
uint64 client_id = 1;
uint64 seq = 2;
}

message MetricsResp {
message SchemaResp {
string value = 1;
RespErr err = 2;
}

message RaftDone {
RespErr err = 1;
}
74 changes: 43 additions & 31 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use raft::eraftpb::{ConfChange, ConfChangeType};

use crate::proto::indexpb_grpc::IndexClient;
use crate::proto::indexrpcpb::{
CommitResp, ConfChangeReq, DeleteResp, GetResp, IndexReq, MetricsResp, PeersResp, PutResp,
RaftDone, ReqType, RespErr, SchemaResp, SearchResp,
ApplyReq, CommitReq, CommitResp, ConfChangeReq, DeleteReq, DeleteResp, GetReq, GetResp,
MetricsReq, MetricsResp, PeersReq, PeersResp, PutReq, PutResp, RaftDone, ReqType, RespErr,
SchemaReq, SchemaResp, SearchReq, SearchResp,
};

pub fn create_client(addr: &str) -> IndexClient {
Expand Down Expand Up @@ -41,14 +42,15 @@ impl Clerk {
cc.set_id(id);
cc.set_node_id(id);
cc.set_change_type(ConfChangeType::AddNode);
let mut req = ConfChangeReq::new();
req.set_cc(cc);
req.set_ip(ip.to_string());
req.set_port(port as u32);

let mut cc_req = ConfChangeReq::new();
cc_req.set_cc(cc);
cc_req.set_ip(ip.to_string());
cc_req.set_port(port as u32);

loop {
let reply = self.servers[self.leader_id]
.raft_conf_change(&req)
.raft_conf_change(&cc_req)
.unwrap_or_else(|e| {
error!("{:?}", e);
let mut resp = RaftDone::new();
Expand All @@ -70,12 +72,12 @@ impl Clerk {
cc.set_id(id);
cc.set_node_id(id);
cc.set_change_type(ConfChangeType::RemoveNode);
let mut req = ConfChangeReq::new();
req.set_cc(cc);
let mut cc_req = ConfChangeReq::new();
cc_req.set_cc(cc);

loop {
let reply = self.servers[self.leader_id]
.raft_conf_change(&req)
.raft_conf_change(&cc_req)
.unwrap_or_else(|e| {
error!("{:?}", e);
let mut resp = RaftDone::new();
Expand All @@ -93,9 +95,8 @@ impl Clerk {
}

pub fn peers(&mut self) -> String {
let mut req = IndexReq::new();
let mut req = PeersReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Peers);
req.set_seq(self.request_seq);
self.request_seq += 1;

Expand All @@ -118,9 +119,8 @@ impl Clerk {
}

pub fn metrics(&mut self) -> String {
let mut req = IndexReq::new();
let mut req = MetricsReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Metrics);
req.set_seq(self.request_seq);
self.request_seq += 1;

Expand All @@ -142,12 +142,11 @@ impl Clerk {
}
}

pub fn get(&mut self, key: &str) -> String {
let mut req = IndexReq::new();
pub fn get(&mut self, doc_id: &str) -> String {
let mut req = GetReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Get);
req.set_seq(self.request_seq);
req.set_key(key.to_owned());
req.set_doc_id(doc_id.to_owned());
self.request_seq += 1;

loop {
Expand All @@ -167,12 +166,17 @@ impl Clerk {
}

pub fn put(&mut self, key: &str, value: &str) {
let mut req = IndexReq::new();
let mut put_req = PutReq::new();
put_req.set_client_id(self.client_id);
put_req.set_seq(self.request_seq);
put_req.set_doc_id(key.to_owned());
put_req.set_fields(value.to_owned());

let mut req = ApplyReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Put);
req.set_seq(self.request_seq);
req.set_key(key.to_owned());
req.set_value(value.to_owned());
req.set_put_req(put_req);

self.request_seq += 1;

loop {
Expand All @@ -193,11 +197,16 @@ impl Clerk {
}

pub fn delete(&mut self, key: &str) {
let mut req = IndexReq::new();
let mut delete_req = DeleteReq::new();
delete_req.set_client_id(self.client_id);
delete_req.set_seq(self.request_seq);
delete_req.set_doc_id(key.to_owned());

let mut req = ApplyReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Delete);
req.set_seq(self.request_seq);
req.set_key(key.to_owned());
req.set_delete_req(delete_req);

self.request_seq += 1;

loop {
Expand All @@ -219,10 +228,15 @@ impl Clerk {
}

pub fn commit(&mut self) {
let mut req = IndexReq::new();
let mut commit_req = CommitReq::new();
commit_req.set_client_id(self.client_id);
commit_req.set_seq(self.request_seq);

let mut req = ApplyReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Commit);
req.set_seq(self.request_seq);
req.set_commit_req(commit_req);

self.request_seq += 1;

loop {
Expand All @@ -244,9 +258,8 @@ impl Clerk {
}

pub fn search(&mut self, query: &str) -> String {
let mut req = IndexReq::new();
let mut req = SearchReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Search);
req.set_seq(self.request_seq);
req.set_query(query.to_owned());
self.request_seq += 1;
Expand All @@ -270,9 +283,8 @@ impl Clerk {
}

pub fn schema(&mut self) -> String {
let mut req = IndexReq::new();
let mut req = SchemaReq::new();
req.set_client_id(self.client_id);
req.set_req_type(ReqType::Schema);
req.set_seq(self.request_seq);
self.request_seq += 1;

Expand Down
Loading