diff --git a/Cargo.lock b/Cargo.lock index e5e63f9..bd7d054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,7 +497,6 @@ checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" name = "bayard" version = "0.8.0" dependencies = [ - "actix-rt", "bayard-client", "bayard-common", "bayard-proto", @@ -506,10 +505,12 @@ dependencies = [ "crossbeam-channel", "futures 0.1.29", "grpcio", + "hyper", "log", "num_cpus", "raft", "serde_json", + "tokio", ] [[package]] @@ -544,6 +545,7 @@ dependencies = [ "crossbeam-channel", "ctrlc", "env_logger", + "tokio", ] [[package]] @@ -584,14 +586,13 @@ dependencies = [ name = "bayard-server" version = "0.8.0" dependencies = [ - "actix-server", - "actix-web", "async-std", "bayard-proto", "bincode", "cang-jie", "futures 0.1.29", "grpcio", + "hyper", "jieba-rs", "lazy_static", "lindera-tantivy", @@ -1371,6 +1372,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" +dependencies = [ + "bytes", + "http", +] + [[package]] name = "httparse" version = "1.3.4" @@ -1386,6 +1397,30 @@ dependencies = [ "quick-error", ] +[[package]] +name = "hyper" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96816e1d921eca64d208a85aab4f7798455a8e34229ee5a88c935bdee1b78b14" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "itoa", + "log", + "net2", + "pin-project", + "time", + "tokio", + "tower-service", + "want", +] + [[package]] name = "idna" version = "0.2.0" @@ -2736,9 +2771,21 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", + "tokio-macros", "winapi 0.3.8", ] +[[package]] +name = "tokio-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-rustls" version = "0.12.2" @@ -2779,6 +2826,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower-service" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" + [[package]] name = "trust-dns-proto" version = "0.18.0-alpha.2" @@ -2818,6 +2871,12 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "try-lock" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -2916,6 +2975,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/Makefile b/Makefile index e60db8d..a42476b 100644 --- a/Makefile +++ b/Makefile @@ -56,29 +56,29 @@ ifeq ($(shell cargo show --json bayard | jq -r '.versions[].num' | grep $(BAYARD endif docker-build: -ifeq ($(shell curl 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_VERSION)),) +ifeq ($(shell curl -s 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_VERSION)),) docker build --tag=bayardsearch/bayard:latest --file=bayard.dockerfile --build-arg="BAYARD_VERSION=$(BAYARD_VERSION)" . docker tag bayardsearch/bayard:latest bayardsearch/bayard:$(BAYARD_VERSION) endif -ifeq ($(shell curl 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-rest/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_REST_VERSION)),) +ifeq ($(shell curl -s 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-rest/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_REST_VERSION)),) docker build --tag=bayardsearch/bayard-rest:latest --file=bayard-rest.dockerfile --build-arg="BAYARD_REST_VERSION=$(BAYARD_REST_VERSION)" . docker tag bayardsearch/bayard-rest:latest bayardsearch/bayard-rest:$(BAYARD_REST_VERSION) endif -ifeq ($(shell curl 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-cli/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_CLI_VERSION)),) +ifeq ($(shell curl -s 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-cli/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_CLI_VERSION)),) docker build --tag=bayardsearch/bayard-cli:latest --file=bayard-cli.dockerfile --build-arg="BAYARD_CLI_VERSION=$(BAYARD_CLI_VERSION)" . docker tag bayardsearch/bayard-cli:latest bayardsearch/bayard-cli:$(BAYARD_CLI_VERSION) endif docker-push: -ifeq ($(shell curl 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_VERSION)),) +ifeq ($(shell curl -s 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_VERSION)),) docker push bayardsearch/bayard:latest docker push bayardsearch/bayard:$(BAYARD_VERSION) endif -ifeq ($(shell curl 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-rest/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_REST_VERSION)),) +ifeq ($(shell curl -s 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-rest/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_REST_VERSION)),) docker push bayardsearch/bayard-rest:latest docker push bayardsearch/bayard-rest:$(BAYARD_REST_VERSION) endif -ifeq ($(shell curl 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-cli/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_CLI_VERSION)),) +ifeq ($(shell curl -s 'https://registry.hub.docker.com/v2/repositories/bayardsearch/bayard-cli/tags' | jq -r '."results"[]["name"]' | grep $(BAYARD_CLI_VERSION)),) docker push bayardsearch/bayard-cli:latest docker push bayardsearch/bayard-cli:$(BAYARD_CLI_VERSION) endif diff --git a/bayard-common/Cargo.toml b/bayard-common/Cargo.toml index a7cb3ca..e848033 100644 --- a/bayard-common/Cargo.toml +++ b/bayard-common/Cargo.toml @@ -16,3 +16,4 @@ license = "MIT" crossbeam-channel = "0.4.2" ctrlc = { version = "3.1.4", features = ["termination"] } env_logger = "0.7.1" +tokio = { version = "0.2.21", features = ["signal"] } diff --git a/bayard-common/src/signal/mod.rs b/bayard-common/src/signal/mod.rs index f922ffc..8f4d8a9 100644 --- a/bayard-common/src/signal/mod.rs +++ b/bayard-common/src/signal/mod.rs @@ -1,5 +1,6 @@ use crossbeam_channel::{bounded, Receiver}; use ctrlc; +use tokio; pub fn sigterm_channel() -> Result, ctrlc::Error> { let (sender, receiver) = bounded(100); @@ -10,3 +11,9 @@ pub fn sigterm_channel() -> Result, ctrlc::Error> { Ok(receiver) } + +pub async fn shutdown_signal() { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); +} diff --git a/bayard-server/Cargo.toml b/bayard-server/Cargo.toml index c3c674e..d6b05e5 100644 --- a/bayard-server/Cargo.toml +++ b/bayard-server/Cargo.toml @@ -13,13 +13,12 @@ categories = ["database"] license = "MIT" [dependencies] -actix-server = "1.0.2" -actix-web = "2.0.0" async-std = "1.5.0" bincode = "1.2.1" cang-jie = "0.8.0" futures = "0.1.29" grpcio = { version = "0.4.7", features = [ "secure" ] } +hyper = "0.13.5" jieba-rs = "0.5.0" lazy_static = "1.4.0" lindera-tantivy = "0.1.1" diff --git a/bayard-server/src/metric/handler.rs b/bayard-server/src/metric/handler.rs index e893162..b8a04bc 100644 --- a/bayard-server/src/metric/handler.rs +++ b/bayard-server/src/metric/handler.rs @@ -1,14 +1,22 @@ -use actix_web::{get, Error, HttpResponse}; +use hyper::{Body, Method, Request, Response, StatusCode}; use prometheus::{Encoder, TextEncoder}; -#[get("/metrics")] -pub async fn metrics() -> Result { - let metric_families = prometheus::gather(); - let mut buffer = Vec::::new(); - let encoder = TextEncoder::new(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - let metrics_text = String::from_utf8(buffer.clone()).unwrap(); +pub async fn handle(req: Request) -> Result, hyper::Error> { + let mut response = Response::new(Body::empty()); + match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + let metric_families = prometheus::gather(); + let mut buffer = Vec::::new(); + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + let metrics_text = String::from_utf8(buffer.clone()).unwrap(); - let res = HttpResponse::Ok().body(metrics_text); - Ok(res) + *response.status_mut() = StatusCode::OK; + *response.body_mut() = Body::from(metrics_text); + } + _ => { + *response.status_mut() = StatusCode::NOT_FOUND; + } + }; + Ok(response) } diff --git a/bayard-server/src/metric/mod.rs b/bayard-server/src/metric/mod.rs index 918a0f0..062ae9d 100644 --- a/bayard-server/src/metric/mod.rs +++ b/bayard-server/src/metric/mod.rs @@ -1,2 +1 @@ pub mod handler; -pub mod server; diff --git a/bayard-server/src/metric/server.rs b/bayard-server/src/metric/server.rs deleted file mode 100644 index c5ffb4b..0000000 --- a/bayard-server/src/metric/server.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::io; - -use actix_server::Server; -use actix_web::{middleware, App, HttpServer}; - -use crate::metric::handler::metrics; - -pub struct MetricsServer { - server: Server, -} - -impl MetricsServer { - pub fn new(address: &str, worker_num: usize) -> MetricsServer { - let server = HttpServer::new(move || { - App::new() - .wrap(middleware::DefaultHeaders::new().header("X-Version", "0.2")) - .wrap(middleware::Compress::default()) - .wrap(middleware::Logger::default()) - .service(metrics) - }) - .bind(address) - .unwrap() - .workers(worker_num) - .run(); - - MetricsServer { server } - } - - pub async fn shutdown(&mut self) -> io::Result<()> { - Ok(self.server.stop(true).await) - } -} diff --git a/bayard/Cargo.toml b/bayard/Cargo.toml index 45724e6..15582c7 100644 --- a/bayard/Cargo.toml +++ b/bayard/Cargo.toml @@ -17,15 +17,16 @@ name = "bayard" path = "src/main.rs" [dependencies] -actix-rt = "1.1.1" clap = "2.33.0" crossbeam-channel = "0.4.2" futures = "0.1.29" grpcio = { version = "0.4.7", features = ["secure"] } +hyper = "0.13.5" log = "0.4.8" num_cpus = "1.13.0" raft = "0.4.3" serde_json = "1.0.51" +tokio = { version = "0.2.21", features = ["macros"] } bayard-proto = "0.8.0" diff --git a/bayard/src/main.rs b/bayard/src/main.rs index 94f4827..19b3114 100644 --- a/bayard/src/main.rs +++ b/bayard/src/main.rs @@ -2,26 +2,30 @@ extern crate clap; use std::collections::HashMap; +use std::convert::TryFrom; +use std::io; +use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use bayard_proto::proto::{indexpb_grpc, raftpb_grpc}; use clap::{App, AppSettings, Arg}; -use crossbeam_channel::select; use futures::Future; use grpcio::{Environment, ServerBuilder}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::Server; use log::*; use raft::storage::MemStorage; use bayard_client::raft::client::RaftClient; use bayard_common::log::set_logger; -use bayard_common::signal::sigterm_channel; +use bayard_common::signal::shutdown_signal; use bayard_server::index::server::IndexServer; -use bayard_server::metric::server::MetricsServer; +use bayard_server::metric::handler::handle; use bayard_server::raft::config::NodeAddress; -#[actix_rt::main] -async fn main() -> std::io::Result<()> { +#[tokio::main] +async fn main() -> Result<(), Box> { set_logger(); let threads = format!("{}", num_cpus::get().to_owned()); @@ -128,15 +132,6 @@ async fn main() -> std::io::Result<()> { .value_name("INDEXER_MEMORY_SIZE") .default_value("1000000000") .takes_value(true), - ) - .arg( - Arg::with_name("HTTP_WORKER_THREADS") - .help("Number of HTTP worker threads. By default http server uses number of available logical cpu as threads count.") - .short("w") - .long("http-worker-threads") - .value_name("HTTP_WORKER_THREADS") - .default_value(&threads) - .takes_value(true), ); let matches = app.get_matches(); @@ -175,15 +170,9 @@ async fn main() -> std::io::Result<()> { .unwrap() .parse::() .unwrap(); - let http_worker_threads = matches - .value_of("HTTP_WORKER_THREADS") - .unwrap() - .parse::() - .unwrap(); let raft_address = format!("{}:{}", host, raft_port); let index_address = format!("{}:{}", host, index_port); - let metrics_address = format!("{}:{}", host, metrics_port); let node_address = NodeAddress { index_address, @@ -197,7 +186,7 @@ async fn main() -> std::io::Result<()> { let mut client = RaftClient::new(peer_address); match client.join(id, node_address.clone()) { Ok(_addresses) => addresses = _addresses, - Err(e) => return Err(e), + Err(e) => return Err(Box::try_from(e).unwrap()), }; } @@ -249,21 +238,13 @@ async fn main() -> std::io::Result<()> { } // metrics service - let mut metrics_server = MetricsServer::new(metrics_address.as_str(), http_worker_threads); - info!("start metrics service on {}", metrics_address.as_str()); - - // Wait for signals for termination (SIGINT, SIGTERM). - let sigterm_receiver = sigterm_channel().unwrap(); - loop { - select! { - recv(sigterm_receiver) -> _ => { - info!("receive signal"); - break; - } - } - } + let metrics_address: SocketAddr = format!("{}:{}", host, metrics_port).parse().unwrap(); + let metrics_service = make_service_fn(|_| async { Ok::<_, io::Error>(service_fn(handle)) }); + let metrics_server = Server::bind(&metrics_address).serve(metrics_service); + let metrics_server_graceful = metrics_server.with_graceful_shutdown(shutdown_signal()); + info!("start metrics service on {}:{}", host, metrics_port); - match metrics_server.shutdown().await { + match metrics_server_graceful.await { Ok(_) => { info!("stop metrics service on {}:{}", host, metrics_port); }