这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/cmd/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ pub fn run_serve_cli(matches: &ArgMatches) -> Result<(), String> {
}
let data_directory = matches.value_of("DATA_DIRECTORY").unwrap();
let schema_file = matches.value_of("SCHEMA_FILE").unwrap();
let mut tokenizer_file = "";
if let Some(f) = matches.value_of("TOKENIZER_FILE") {
tokenizer_file = f;
}
let tokenizer_file = matches.value_of("TOKENIZER_FILE").unwrap();

let indexer_threads = matches
.value_of("INDEXER_THREADS")
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ fn main() {
.short("T")
.long("tokenizer-file")
.value_name("TOKENIZER_FILE")
.default_value("./etc/tokenizer.json")
.takes_value(true),
)
.arg(
Expand Down
118 changes: 79 additions & 39 deletions src/server/peer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//use std::collections::HashMap;
// use std::collections::HashMap;
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
use std::thread;
use std::time::{Duration, Instant};

use log::*;
use raft::eraftpb::{ConfChange, Entry, EntryType, Message};
use raft::storage::MemStorage as PeerStorage;
use raft::{self, RawNode};
use raft::eraftpb::{ConfChange, ConfState, Entry, EntryType, Message, MessageType};
use raft::storage::MemStorage;

use crate::server::util;

Expand All @@ -17,21 +17,23 @@ pub enum PeerMessage {
}

pub struct Peer {
pub raft_group: RawNode<PeerStorage>,
// last_applying_idx: u64,
pub raw_node: RawNode<MemStorage>,
last_apply_index: u64,
// last_compacted_idx: u64,
conf_state: Option<ConfState>,
apply_ch: SyncSender<Entry>,
// peers_addr: HashMap<u64, (String, u32)>, // id, (host, port)
}

impl Peer {
pub fn new(id: u64, apply_ch: SyncSender<Entry>, peers: Vec<u64>) -> Peer {
let cfg = util::default_raft_config(id, peers);
let storge = PeerStorage::new();
let storge = MemStorage::new();
let peer = Peer {
raft_group: RawNode::new(&cfg, storge, vec![]).unwrap(),
// last_applying_idx: 0,
raw_node: RawNode::new(&cfg, storge, vec![]).unwrap(),
last_apply_index: 0,
// last_compacted_idx: 0,
conf_state: None,
apply_ch,
// peers_addr: HashMap::new(),
};
Expand All @@ -45,21 +47,26 @@ impl Peer {
}

fn listen_message(&mut self, sender: SyncSender<Message>, receiver: Receiver<PeerMessage>) {
debug!("start listening message");

let mut t = Instant::now();
let mut timeout = Duration::from_millis(100);
loop {
match receiver.recv_timeout(timeout) {
Ok(PeerMessage::Propose(p)) => match self.raft_group.propose(vec![], p) {
Ok(_) => (),
Err(_) => self.apply_message(Entry::new()),
Ok(PeerMessage::Propose(p)) => match self.raw_node.propose(vec![], p.clone()) {
Ok(_) => info!("proposal succeeded: {:?}", p),
Err(_) => {
warn!("proposal failed: {:?}", p);
self.apply_message(Entry::new());
}
},
Ok(PeerMessage::ConfChange(cc)) => {
match self.raft_group.propose_conf_change(vec![], cc.clone()) {
Ok(_) => (),
Err(_) => error!("conf change failed: {:?}", cc),
match self.raw_node.propose_conf_change(vec![], cc.clone()) {
Ok(_) => info!("proposed configuration change succeeded: {:?}", cc),
Err(_) => warn!("proposed configuration change failed: {:?}", cc),
}
}
Ok(PeerMessage::Message(m)) => self.raft_group.step(m).unwrap(),
Ok(PeerMessage::Message(m)) => self.raw_node.step(m).unwrap(),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return,
}
Expand All @@ -68,7 +75,7 @@ impl Peer {
if d >= timeout {
t = Instant::now();
timeout = Duration::from_millis(200);
self.raft_group.tick();
self.raw_node.tick();
} else {
timeout -= d;
}
Expand All @@ -77,58 +84,59 @@ impl Peer {
}
}

fn is_leader(&self) -> bool {
self.raw_node.raft.leader_id == self.raw_node.raft.id
}

fn on_ready(&mut self, sender: SyncSender<Message>) {
if !self.raft_group.has_ready() {
if !self.raw_node.has_ready() {
return;
}
let mut ready = self.raw_node.ready();

let is_leader = self.is_leader();

let mut ready = self.raft_group.ready();
let is_leader = self.raft_group.raft.leader_id == self.raft_group.raft.id;
// leader
if is_leader {
// debug!("I'm leader");
let msgs = ready.messages.drain(..);
for _msg in msgs {
Self::send_message(sender.clone(), _msg.clone());
for msg in msgs {
Self::send_message(sender.clone(), msg.clone());
}
}

if !raft::is_empty_snap(&ready.snapshot) {
self.raft_group
self.raw_node
.mut_store()
.wl()
.apply_snapshot(ready.snapshot.clone())
.unwrap()
}

if !ready.entries.is_empty() {
self.raft_group
self.raw_node
.mut_store()
.wl()
.append(&ready.entries)
.unwrap();
}

if let Some(ref hs) = ready.hs {
self.raft_group.mut_store().wl().set_hardstate(hs.clone());
self.raw_node.mut_store().wl().set_hardstate(hs.clone());
}

// not leader
if !is_leader {
// debug!("I'm follower");
let msgs = ready.messages.drain(..);
for mut _msg in msgs {
for _entry in _msg.mut_entries().iter() {
if _entry.get_entry_type() == EntryType::EntryConfChange {}
}
Self::send_message(sender.clone(), _msg.clone());
for msg in msgs {
Self::send_message(sender.clone(), msg.clone());
}
}

if let Some(committed_entries) = ready.committed_entries.take() {
let mut _last_apply_index = 0;
for entry in committed_entries {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.get_index();
self.last_apply_index = entry.get_index();

if entry.get_data().is_empty() {
// Empty entry: when the peer becomes Leader it will send an empty entry.
Expand All @@ -139,32 +147,64 @@ impl Peer {
EntryType::EntryNormal => self.apply_message(entry.clone()),
EntryType::EntryConfChange => {
let cc = util::parse_data(&entry.data);
debug!("config: {:?}", cc);
self.raft_group.apply_conf_change(&cc);
debug!("apply conf change");
info!("apply config change: {:?}", cc);
self.raw_node.apply_conf_change(&cc);
self.apply_message(entry.clone());

self.conf_state = Some(self.raw_node.apply_conf_change(&cc));
}
}
}

// if last_apply_index > 0 {
// info!("last apply index: {}", last_apply_index);
// let data = b"data".to_vec();
// match self.raw_node.mut_store().wl().create_snapshot(
// last_apply_index,
// conf_state,
// data,
// ) {
// Ok(_s) => (),
// Err(e) => error!("creating snapshot failed: {:?}", e),
// }
// }
}

// Advance the Raft
self.raft_group.advance(ready);
self.raw_node.advance(ready);

// let raft_applied = self.raw_node.raft.raft_log.get_applied();
// info!("raft_applied: {}", raft_applied);
// match self.raw_node.mut_store().wl().compact(raft_applied) {
// Ok(_) => (),
// Err(e) => error!("compaction failed: {:?}", e),
// }
}

fn send_message(sender: SyncSender<Message>, msg: Message) {
thread::spawn(move || {
// for entry in msg.mut_entries().iter() {
// debug!("leader: {:?}", entry);
// }
match msg.msg_type {
MessageType::MsgHeartbeat => debug!("send message: {:?}", msg),
MessageType::MsgHeartbeatResponse => debug!("send message: {:?}", msg),
_ => info!("send message: {:?}", msg),
}

sender.send(msg).unwrap_or_else(|e| {
panic!("raft send message error: {}", e);
panic!("error sending message: {:?}", e);
});
});
}

fn apply_message(&self, entry: Entry) {
let sender = self.apply_ch.clone();
thread::spawn(move || {
info!("apply entry: {:?}", entry);

sender.send(entry).unwrap_or_else(|e| {
panic!("raft send apply entry error: {}", e);
panic!("error sending apply entry: {:?}", e);
});
});
}
Expand Down
Loading