From b911ef2afafa865a253011d3d9217077c9e3b9bb Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Tue, 30 Jul 2019 16:36:10 +0900 Subject: [PATCH] Change protobuf --- cmd/blast/main.go | 4 +- cmd/blast/manager_cluster_watch.go | 14 +- cmd/blast/manager_start.go | 23 +- dispatcher/grpc_service.go | 94 +- go.mod | 4 +- go.sum | 8 +- hashutils/hashutils.go | 32 + indexer/grpc_service.go | 98 +- manager/grpc_client.go | 65 +- manager/grpc_service.go | 204 ++-- manager/raft_fsm.go | 69 +- manager/raft_fsm_test.go | 213 ++-- manager/raft_server.go | 115 +- manager/server.go | 47 +- manager/server_test.go | 1636 +++++++++++++++----------- protobuf/management/management.pb.go | 294 +++-- protobuf/management/management.proto | 22 +- 17 files changed, 1654 insertions(+), 1288 deletions(-) create mode 100644 hashutils/hashutils.go diff --git a/cmd/blast/main.go b/cmd/blast/main.go index 42adee0..1a04817 100644 --- a/cmd/blast/main.go +++ b/cmd/blast/main.go @@ -289,7 +289,7 @@ func main() { //}, cli.StringFlag{ Name: "grpc-address", - Value: "", + Value: ":5100", Usage: "The gRPC address of the node for which to retrieve the node information", }, }, @@ -321,7 +321,7 @@ func main() { //}, cli.StringFlag{ Name: "grpc-address", - Value: "", + Value: ":5100", Usage: "The gRPC address of the node for which to retrieve the node information", }, }, diff --git a/cmd/blast/manager_cluster_watch.go b/cmd/blast/manager_cluster_watch.go index 8bef44b..0e74722 100644 --- a/cmd/blast/manager_cluster_watch.go +++ b/cmd/blast/manager_cluster_watch.go @@ -16,14 +16,12 @@ package main import ( "encoding/json" - "errors" "fmt" "io" "log" "os" "github.com/mosuka/blast/manager" - "github.com/mosuka/blast/protobuf" "github.com/urfave/cli" ) @@ -61,17 +59,7 @@ func managerClusterWatch(c *cli.Context) error { break } - cluster, err := protobuf.MarshalAny(resp.Cluster) - if err != nil { - return err - } - if cluster == nil { - return errors.New("nil") - } - - var clusterBytes []byte - clusterMap := *cluster.(*map[string]interface{}) - clusterBytes, err = json.MarshalIndent(clusterMap, "", " ") + clusterBytes, err := json.MarshalIndent(resp.Cluster, "", " ") if err != nil { return err } diff --git a/cmd/blast/manager_start.go b/cmd/blast/manager_start.go index e53a707..af0be52 100644 --- a/cmd/blast/manager_start.go +++ b/cmd/blast/manager_start.go @@ -24,6 +24,7 @@ import ( "github.com/mosuka/blast/indexutils" "github.com/mosuka/blast/logutils" "github.com/mosuka/blast/manager" + "github.com/mosuka/blast/protobuf/management" "github.com/urfave/cli" ) @@ -91,20 +92,12 @@ func managerStart(c *cli.Context) error { httpLogCompress, ) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - if peerGrpcAddr != "" { - clusterConfig.PeerAddr = peerGrpcAddr - } - - // create node config - nodeConfig := &config.NodeConfig{ - NodeId: nodeId, - BindAddr: nodeAddr, - GRPCAddr: grpcAddr, - HTTPAddr: httpAddr, - DataDir: dataDir, - RaftStorageType: raftStorageType, + node := &management.Node{ + BindAddress: nodeAddr, + Metadata: &management.Metadata{ + GrpcAddress: grpcAddr, + HttpAddress: httpAddr, + }, } var err error @@ -127,7 +120,7 @@ func managerStart(c *cli.Context) error { IndexStorageType: indexStorageType, } - svr, err := manager.NewServer(clusterConfig, nodeConfig, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger) + svr, err := manager.NewServer(peerGrpcAddr, nodeId, node, dataDir, raftStorageType, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger) if err != nil { 
return err } diff --git a/dispatcher/grpc_service.go b/dispatcher/grpc_service.go index df7d1ad..39152ff 100644 --- a/dispatcher/grpc_service.go +++ b/dispatcher/grpc_service.go @@ -35,6 +35,7 @@ import ( "github.com/mosuka/blast/manager" "github.com/mosuka/blast/protobuf" "github.com/mosuka/blast/protobuf/distribute" + "github.com/mosuka/blast/protobuf/management" "github.com/mosuka/blast/sortutils" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -45,7 +46,7 @@ type GRPCService struct { managerAddr string logger *zap.Logger - managers map[string]interface{} + managers *management.Cluster managerClients map[string]*manager.GRPCClient updateManagersStopCh chan struct{} updateManagersDoneCh chan struct{} @@ -61,7 +62,7 @@ func NewGRPCService(managerAddr string, logger *zap.Logger) (*GRPCService, error managerAddr: managerAddr, logger: logger, - managers: make(map[string]interface{}, 0), + managers: &management.Cluster{Nodes: make(map[string]*management.Node, 0)}, managerClients: make(map[string]*manager.GRPCClient, 0), indexers: make(map[string]interface{}, 0), @@ -92,20 +93,14 @@ func (s *GRPCService) Stop() error { func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) { var client *manager.GRPCClient - for id, node := range s.managers { - nm, ok := node.(map[string]interface{}) - if !ok { - s.logger.Warn("assertion failed", zap.String("id", id)) - continue - } - - state, ok := nm["state"].(string) - if !ok { - s.logger.Warn("missing state", zap.String("id", id), zap.String("state", state)) + for id, node := range s.managers.Nodes { + if node.Metadata == nil { + s.logger.Warn("missing metadata", zap.String("id", id)) continue } - if state == raft.Leader.String() || state == raft.Follower.String() { + if node.Status == raft.Leader.String() || node.Status == raft.Follower.String() { + var ok bool client, ok = s.managerClients[id] if ok { return client, nil @@ -113,7 +108,7 @@ func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) { s.logger.Error("node does not exist", zap.String("id", id)) } } else { - s.logger.Debug("node has not available", zap.String("id", id), zap.String("state", state)) + s.logger.Debug("node has not available", zap.String("id", id), zap.String("state", node.Status)) } } @@ -123,7 +118,7 @@ func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) { return nil, err } -func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interface{}, error) { +func (s *GRPCService) getInitialManagers(managerAddr string) (*management.Cluster, error) { client, err := manager.NewGRPCClient(s.managerAddr) defer func() { err := client.Close() @@ -165,29 +160,21 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { s.logger.Debug("initialize manager list", zap.Any("managers", s.managers)) // create clients for managers - for nodeId, node := range s.managers { - nm, ok := node.(map[string]interface{}) - if !ok { - s.logger.Warn("assertion failed", zap.String("node_id", nodeId)) + for nodeId, node := range s.managers.Nodes { + if node.Metadata == nil { + s.logger.Warn("missing metadata", zap.String("node_id", nodeId)) continue } - nodeConfig, ok := nm["node_config"].(map[string]interface{}) - if !ok { - s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig)) - continue - } - - grpcAddr, ok := nodeConfig["grpc_addr"].(string) - if !ok { - s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + if 
node.Metadata.GrpcAddress == "" { + s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) continue } - s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) - client, err := manager.NewGRPCClient(grpcAddr) + s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) + client, err := manager.NewGRPCClient(node.Metadata.GrpcAddress) if err != nil { - s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) } if client != nil { s.managerClients[nodeId] = client @@ -223,31 +210,18 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { s.logger.Error(err.Error()) continue } - - // get current manager cluster - managersIntr, err := protobuf.MarshalAny(resp.Cluster) - if err != nil { - s.logger.Error(err.Error()) - continue - } - if managersIntr == nil { - s.logger.Error(err.Error()) - continue - } - managers := *managersIntr.(*map[string]interface{}) + managers := resp.Cluster if !reflect.DeepEqual(s.managers, managers) { // open clients - for nodeId, metadata := range managers { - mm, ok := metadata.(map[string]interface{}) - if !ok { - s.logger.Warn("assertion failed", zap.String("node_id", nodeId)) + for nodeId, node := range managers.Nodes { + if node.Metadata == nil { + s.logger.Warn("missing metadata", zap.String("node_id", nodeId)) continue } - grpcAddr, ok := mm["grpc_addr"].(string) - if !ok { - s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + if node.Metadata.GrpcAddress == "" { + s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) continue } @@ -255,9 +229,9 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { if exist { s.logger.Debug("client has already exist in manager list", zap.String("node_id", nodeId)) - if client.GetAddress() != grpcAddr { - s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr)) - s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + if client.GetAddress() != node.Metadata.GrpcAddress { + s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", node.Metadata.GrpcAddress)) + s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) delete(s.managerClients, nodeId) @@ -266,24 +240,24 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { s.logger.Error(err.Error(), zap.String("node_id", nodeId)) } - newClient, err := manager.NewGRPCClient(grpcAddr) + newClient, err := manager.NewGRPCClient(node.Metadata.GrpcAddress) if err != nil { - s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) } if newClient != nil { s.managerClients[nodeId] = newClient } } else { - s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", 
client.GetAddress()), zap.String("grpc_addr", grpcAddr)) + s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", node.Metadata.GrpcAddress)) } } else { s.logger.Debug("client does not exist in peer list", zap.String("node_id", nodeId)) - s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) - newClient, err := manager.NewGRPCClient(grpcAddr) + s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) + newClient, err := manager.NewGRPCClient(node.Metadata.GrpcAddress) if err != nil { - s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) } if newClient != nil { s.managerClients[nodeId] = newClient @@ -293,7 +267,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { // close nonexistent clients for nodeId, client := range s.managerClients { - if nodeConfig, exist := managers[nodeId]; !exist { + if nodeConfig, exist := managers.Nodes[nodeId]; !exist { s.logger.Info("this client is no longer in use", zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig)) s.logger.Debug("close client", zap.String("node_id", nodeId), zap.String("grpc_addr", client.GetAddress())) diff --git a/go.mod b/go.mod index e9874b1..c987c7c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/mosuka/blast go 1.12 require ( - github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89 // indirect github.com/blevesearch/bleve v0.7.0 github.com/blevesearch/blevex v0.0.0-20180227211930-4b158bb555a3 // indirect github.com/blevesearch/cld2 v0.0.0-20150916130542-10f17c049ec9 // indirect @@ -18,13 +17,13 @@ require ( github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect github.com/golang/protobuf v1.3.1 + github.com/google/go-cmp v0.3.0 github.com/gorilla/mux v1.7.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.1 // indirect github.com/hashicorp/raft v1.1.0 github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 - github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa github.com/ikawaha/kagome.ipadic v1.0.1 // indirect github.com/imdario/mergo v0.3.7 github.com/jmhodges/levigo v1.0.0 // indirect @@ -38,7 +37,6 @@ require ( github.com/prometheus/procfs v0.0.0-20190322151404-55ae3d9d5573 // indirect github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect github.com/stretchr/objx v0.1.1 - github.com/stretchr/testify v1.3.0 github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tebeka/snowball v0.0.0-20130405174319-16e884df4e19 // indirect github.com/tecbot/gorocksdb v0.0.0-20181010114359-8752a9433481 // indirect diff --git a/go.sum b/go.sum index a38a8a7..91ed1f7 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,6 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod 
h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= -github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89 h1:A1SPjPcl2LdF2Skv9Zt41jWu4XYQAyvBDzrveQjlkhQ= -github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89/go.mod h1:wSblbytRgcqD+U+gGCKz5145DyjUYPh5fqh2uyXxfZw= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/blevesearch/bleve v0.7.0 h1:znyZ3zjsh2Scr60vszs7rbF29TU6i1q9bfnZf1vh0Ac= @@ -84,6 +82,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.0 h1:tOSd0UKHQd6urX6ApfOn4XdBMY6Sh1MfxV3kmaazO+U= @@ -113,8 +113,6 @@ github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0= github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 h1:bLsrEmB2NUwkHH18FOJBIa04wOV2RQalJrcafTYu6Lg= github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk= -github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa h1:ccwcWyXHTaonH6yzx+t/3p9aNm/ogSTfd6YobZOtHmE= -github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa/go.mod h1:ooP3NrrH0GG/sVjF9pbRvhF6nVHRR4mkkwscLqReN1o= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ikawaha/kagome.ipadic v1.0.1 h1:4c/tx3Rga6LvtTouEdvodcfeWWTttATZg8XIH8lRHG4= @@ -231,8 +229,6 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65 h1:hOY+O8MxdkPV10pNf7/XEHaySCiPKxixMKUshfHsGn0= -golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= diff --git a/hashutils/hashutils.go b/hashutils/hashutils.go new file mode 100644 index 0000000..2ac1911 --- /dev/null +++ b/hashutils/hashutils.go @@ -0,0 +1,32 @@ +// Copyright (c) 2019 Minoru Osuka +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use 
this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hashutils + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" +) + +func Hash(v interface{}) (string, error) { + b, err := json.Marshal(v) + if err != nil { + return "", err + } + + hb := sha256.Sum256(b) + + return hex.EncodeToString(hb[:]), nil +} diff --git a/indexer/grpc_service.go b/indexer/grpc_service.go index 20971a7..967572d 100644 --- a/indexer/grpc_service.go +++ b/indexer/grpc_service.go @@ -33,14 +33,13 @@ import ( "github.com/mosuka/blast/manager" "github.com/mosuka/blast/protobuf" "github.com/mosuka/blast/protobuf/index" + "github.com/mosuka/blast/protobuf/management" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type GRPCService struct { - //*grpc.Service - clusterConfig *config.ClusterConfig raftServer *RaftServer logger *zap.Logger @@ -53,7 +52,7 @@ type GRPCService struct { clusterChans map[chan index.GetClusterResponse]struct{} clusterMutex sync.RWMutex - managers map[string]interface{} + managers *management.Cluster managerClients map[string]*manager.GRPCClient updateManagersStopCh chan struct{} updateManagersDoneCh chan struct{} @@ -70,7 +69,7 @@ func NewGRPCService(clusterConfig *config.ClusterConfig, raftServer *RaftServer, cluster: make(map[string]interface{}, 0), clusterChans: make(map[chan index.GetClusterResponse]struct{}), - managers: make(map[string]interface{}, 0), + managers: &management.Cluster{Nodes: make(map[string]*management.Node, 0)}, managerClients: make(map[string]*manager.GRPCClient, 0), }, nil } @@ -102,20 +101,14 @@ func (s *GRPCService) Stop() error { func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) { var client *manager.GRPCClient - for id, node := range s.managers { - nm, ok := node.(map[string]interface{}) - if !ok { + for id, node := range s.managers.Nodes { + if node.Metadata == nil { s.logger.Warn("assertion failed", zap.String("id", id)) continue } - state, ok := nm["state"].(string) - if !ok { - s.logger.Warn("missing state", zap.String("id", id), zap.String("state", state)) - continue - } - - if state == raft.Leader.String() || state == raft.Follower.String() { + if node.Status == raft.Leader.String() || node.Status == raft.Follower.String() { + var ok bool client, ok = s.managerClients[id] if ok { return client, nil @@ -123,7 +116,7 @@ func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) { s.logger.Error("node does not exist", zap.String("id", id)) } } else { - s.logger.Debug("node has not available", zap.String("id", id), zap.String("state", state)) + s.logger.Debug("node has not available", zap.String("id", id), zap.String("state", node.Status)) } } @@ -133,7 +126,7 @@ func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) { return nil, err } -func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interface{}, error) { +func (s *GRPCService) getInitialManagers(managerAddr string) (*management.Cluster, error) { client, err := manager.NewGRPCClient(managerAddr) defer func() { err := client.Close() @@ -175,29 +168,21 @@ func (s *GRPCService) 
startUpdateManagers(checkInterval time.Duration) { s.logger.Debug("initialize manager list", zap.Any("managers", s.managers)) // create clients for managers - for nodeId, node := range s.managers { - nm, ok := node.(map[string]interface{}) - if !ok { - s.logger.Warn("assertion failed", zap.String("id", nodeId)) + for nodeId, node := range s.managers.Nodes { + if node.Metadata == nil { + s.logger.Warn("missing metadata", zap.String("id", nodeId)) continue } - nodeConfig, ok := nm["node_config"].(map[string]interface{}) - if !ok { - s.logger.Warn("missing metadata", zap.String("id", nodeId), zap.Any("node_config", nodeConfig)) - continue - } - - grpcAddr, ok := nodeConfig["grpc_addr"].(string) - if !ok { - s.logger.Warn("missing gRPC address", zap.String("id", nodeId), zap.String("grpc_addr", grpcAddr)) + if node.Metadata.GrpcAddress == "" { + s.logger.Warn("missing gRPC address", zap.String("id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) continue } - s.logger.Debug("create gRPC client", zap.String("id", nodeId), zap.String("grpc_addr", grpcAddr)) - client, err := manager.NewGRPCClient(grpcAddr) + s.logger.Debug("create gRPC client", zap.String("id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) + client, err := manager.NewGRPCClient(node.Metadata.GrpcAddress) if err != nil { - s.logger.Error(err.Error(), zap.String("id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Error(err.Error(), zap.String("id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress)) } if client != nil { s.managerClients[nodeId] = client @@ -232,31 +217,18 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { s.logger.Error(err.Error()) continue } - - // get current manager cluster - managersIntr, err := protobuf.MarshalAny(resp.Cluster) - if err != nil { - s.logger.Error(err.Error()) - continue - } - if managersIntr == nil { - s.logger.Error(err.Error()) - continue - } - managers := *managersIntr.(*map[string]interface{}) + managers := resp.Cluster if !reflect.DeepEqual(s.managers, managers) { // open clients - for nodeId, nodeConfig := range managers { - mm, ok := nodeConfig.(map[string]interface{}) - if !ok { - s.logger.Warn("assertion failed", zap.String("node_id", nodeId)) + for nodeId, nodeConfig := range managers.Nodes { + if nodeConfig.Metadata == nil { + s.logger.Warn("missing metadata", zap.String("node_id", nodeId)) continue } - grpcAddr, ok := mm["grpc_addr"].(string) - if !ok { - s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + if nodeConfig.Metadata.GrpcAddress == "" { + s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) continue } @@ -264,9 +236,9 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { if exist { s.logger.Debug("client has already exist in manager list", zap.String("id", nodeId)) - if client.GetAddress() != grpcAddr { - s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr)) - s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + if client.GetAddress() != nodeConfig.Metadata.GrpcAddress { + s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) + s.logger.Debug("recreate gRPC 
client", zap.String("node_id", nodeId), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) delete(s.managerClients, nodeId) @@ -275,24 +247,24 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { s.logger.Error(err.Error(), zap.String("node_id", nodeId)) } - newClient, err := manager.NewGRPCClient(grpcAddr) + newClient, err := manager.NewGRPCClient(nodeConfig.Metadata.GrpcAddress) if err != nil { - s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) } if newClient != nil { s.managerClients[nodeId] = newClient } } else { - s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr)) + s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) } } else { s.logger.Debug("client does not exist in peer list", zap.String("node_id", nodeId)) - s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) - newClient, err := manager.NewGRPCClient(grpcAddr) + s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) + newClient, err := manager.NewGRPCClient(nodeConfig.Metadata.GrpcAddress) if err != nil { - s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", nodeConfig.Metadata.GrpcAddress)) } if newClient != nil { s.managerClients[nodeId] = newClient @@ -302,7 +274,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) { // close nonexistent clients for nodeId, client := range s.managerClients { - if nodeConfig, exist := managers[nodeId]; !exist { + if nodeConfig, exist := managers.Nodes[nodeId]; !exist { s.logger.Info("this client is no longer in use", zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig)) s.logger.Debug("close client", zap.String("node_id", nodeId), zap.String("address", client.GetAddress())) @@ -480,6 +452,8 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { // keep current peer nodes s.logger.Debug("current peers", zap.Any("peers", peers)) s.peers = peers + } else { + s.logger.Debug("there is no change in peers", zap.Any("peers", peers)) } // notify current cluster @@ -513,6 +487,8 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { // keep current cluster s.logger.Debug("current cluster", zap.Any("cluster", cluster)) s.cluster = cluster + } else { + s.logger.Debug("there is no change in cluster", zap.Any("cluster", cluster)) } default: time.Sleep(100 * time.Millisecond) diff --git a/manager/grpc_client.go b/manager/grpc_client.go index cbb10c8..7fedf16 100644 --- a/manager/grpc_client.go +++ b/manager/grpc_client.go @@ -45,16 +45,6 @@ func NewGRPCContext() (context.Context, context.CancelFunc) { func NewGRPCClient(address string) (*GRPCClient, error) { ctx, cancel := NewGRPCContext() - //streamRetryOpts := []grpc_retry.CallOption{ - // grpc_retry.Disable(), - //} - - //unaryRetryOpts := []grpc_retry.CallOption{ - // grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)), - // grpc_retry.WithCodes(codes.Unavailable), - // 
grpc_retry.WithMax(100), - //} - dialOpts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithDefaultCallOptions( @@ -95,28 +85,6 @@ func (c *GRPCClient) GetAddress() string { return c.conn.Target() } -//func (c *GRPCClient) LivenessProbe(opts ...grpc.CallOption) (string, error) { -// resp, err := c.client.LivenessProbe(c.ctx, &empty.Empty{}) -// if err != nil { -// st, _ := status.FromError(err) -// -// return management.LivenessProbeResponse_UNKNOWN.String(), errors.New(st.Message()) -// } -// -// return resp.State.String(), nil -//} - -//func (c *GRPCClient) ReadinessProbe(opts ...grpc.CallOption) (string, error) { -// resp, err := c.client.ReadinessProbe(c.ctx, &empty.Empty{}) -// if err != nil { -// st, _ := status.FromError(err) -// -// return management.ReadinessProbeResponse_UNKNOWN.String(), errors.New(st.Message()) -// } -// -// return resp.State.String(), nil -//} - func (c *GRPCClient) NodeHealthCheck(probe string, opts ...grpc.CallOption) (string, error) { req := &management.NodeHealthCheckRequest{} @@ -141,7 +109,7 @@ func (c *GRPCClient) NodeHealthCheck(probe string, opts ...grpc.CallOption) (str return resp.State.String(), nil } -func (c *GRPCClient) NodeInfo(id string, opts ...grpc.CallOption) (map[string]interface{}, error) { +func (c *GRPCClient) NodeInfo(id string, opts ...grpc.CallOption) (*management.Node, error) { req := &management.NodeInfoRequest{ Id: id, } @@ -153,30 +121,16 @@ func (c *GRPCClient) NodeInfo(id string, opts ...grpc.CallOption) (map[string]in return nil, errors.New(st.Message()) } - ins, err := protobuf.MarshalAny(resp.NodeConfig) - nodeConfig := *ins.(*map[string]interface{}) - - node := map[string]interface{}{ - "node_config": nodeConfig, - "state": resp.State, - } - - return node, nil + return resp.Node, nil } -func (c *GRPCClient) ClusterJoin(id string, nodeConfig map[string]interface{}, opts ...grpc.CallOption) error { - nodeConfigAny := &any.Any{} - err := protobuf.UnmarshalAny(nodeConfig, nodeConfigAny) - if err != nil { - return err - } - +func (c *GRPCClient) ClusterJoin(id string, node *management.Node, opts ...grpc.CallOption) error { req := &management.ClusterJoinRequest{ - Id: id, - NodeConfig: nodeConfigAny, + Id: id, + Node: node, } - _, err = c.client.ClusterJoin(c.ctx, req, opts...) + _, err := c.client.ClusterJoin(c.ctx, req, opts...) if err != nil { return err } @@ -197,7 +151,7 @@ func (c *GRPCClient) ClusterLeave(id string, opts ...grpc.CallOption) error { return nil } -func (c *GRPCClient) ClusterInfo(opts ...grpc.CallOption) (map[string]interface{}, error) { +func (c *GRPCClient) ClusterInfo(opts ...grpc.CallOption) (*management.Cluster, error) { resp, err := c.client.ClusterInfo(c.ctx, &empty.Empty{}, opts...) 
if err != nil { st, _ := status.FromError(err) @@ -205,10 +159,7 @@ func (c *GRPCClient) ClusterInfo(opts ...grpc.CallOption) (map[string]interface{ return nil, errors.New(st.Message()) } - ins, err := protobuf.MarshalAny(resp.Cluster) - cluster := *ins.(*map[string]interface{}) - - return cluster, nil + return resp.Cluster, nil } func (c *GRPCClient) ClusterWatch(opts ...grpc.CallOption) (management.Management_ClusterWatchClient, error) { diff --git a/manager/grpc_service.go b/manager/grpc_service.go index f9254df..67e5d66 100644 --- a/manager/grpc_service.go +++ b/manager/grpc_service.go @@ -17,15 +17,16 @@ package manager import ( "context" "errors" - "reflect" "strings" "sync" "time" "github.com/golang/protobuf/ptypes/any" "github.com/golang/protobuf/ptypes/empty" + "github.com/google/go-cmp/cmp" "github.com/hashicorp/raft" blasterrors "github.com/mosuka/blast/errors" + "github.com/mosuka/blast/hashutils" "github.com/mosuka/blast/protobuf" "github.com/mosuka/blast/protobuf/management" "go.uber.org/zap" @@ -34,16 +35,14 @@ import ( ) type GRPCService struct { - //*grpc.Service - raftServer *RaftServer logger *zap.Logger updateClusterStopCh chan struct{} updateClusterDoneCh chan struct{} - peers map[string]interface{} + peers *management.Cluster peerClients map[string]*GRPCClient - cluster map[string]interface{} + cluster *management.Cluster clusterChans map[chan management.ClusterInfoResponse]struct{} clusterMutex sync.RWMutex @@ -56,9 +55,9 @@ func NewGRPCService(raftServer *RaftServer, logger *zap.Logger) (*GRPCService, e raftServer: raftServer, logger: logger, - peers: make(map[string]interface{}, 0), + peers: &management.Cluster{Nodes: make(map[string]*management.Node, 0)}, peerClients: make(map[string]*GRPCClient, 0), - cluster: make(map[string]interface{}, 0), + cluster: &management.Cluster{Nodes: make(map[string]*management.Node, 0)}, clusterChans: make(map[chan management.ClusterInfoResponse]struct{}), stateChans: make(map[chan management.WatchResponse]struct{}), @@ -82,14 +81,15 @@ func (s *GRPCService) Stop() error { func (s *GRPCService) getLeaderClient() (*GRPCClient, error) { var client *GRPCClient - for id, node := range s.cluster { - state, ok := node.(map[string]interface{})["state"].(string) - if !ok { + for id, node := range s.cluster.Nodes { + state := node.Status + if node.Status == "" { s.logger.Warn("missing state", zap.String("id", id), zap.String("state", state)) continue } if state == raft.Leader.String() { + var ok bool client, ok = s.peerClients[id] if ok { break @@ -121,6 +121,19 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { ticker := time.NewTicker(checkInterval) defer ticker.Stop() + // create initial cluster hash + clusterHash, err := hashutils.Hash(s.cluster) + if err != nil { + s.logger.Error(err.Error()) + return + } + + peersHash, err := hashutils.Hash(s.peers) + if err != nil { + s.logger.Error(err.Error()) + return + } + for { select { case <-s.updateClusterStopCh: @@ -133,25 +146,39 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { return } + // create latest cluster hash + newClusterHash, err := hashutils.Hash(cluster) + if err != nil { + s.logger.Error(err.Error()) + return + } + // create peer node list with out self node - peers := make(map[string]interface{}, 0) - for nodeId, node := range cluster { + peers := &management.Cluster{Nodes: make(map[string]*management.Node, 0)} + for nodeId, node := range cluster.Nodes { if nodeId != s.NodeID() { - peers[nodeId] = node + peers.Nodes[nodeId] = 
node } } - if !reflect.DeepEqual(s.peers, peers) { + // create latest peers hash + newPeersHash, err := hashutils.Hash(peers) + if err != nil { + s.logger.Error(err.Error()) + return + } + + // compare peers hash + //if !reflect.DeepEqual(s.peers, peers) { + if !cmp.Equal(peersHash, newPeersHash) { // open clients - for nodeId, nodeInfo := range peers { - nodeConfig, ok := nodeInfo.(map[string]interface{})["node_config"].(map[string]interface{}) - if !ok { - s.logger.Warn("assertion failed", zap.String("node_id", nodeId), zap.Any("node_info", nodeInfo)) + for nodeId, nodeInfo := range peers.Nodes { + if nodeInfo.Metadata == nil { + s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.Any("metadata", nodeInfo.Metadata)) continue } - grpcAddr, ok := nodeConfig["grpc_addr"].(string) - if !ok { - s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + if nodeInfo.Metadata.GrpcAddress == "" { + s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) continue } @@ -159,35 +186,30 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { if exist { s.logger.Debug("client has already exist in peer list", zap.String("node_id", nodeId)) - if client.GetAddress() != grpcAddr { - s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr)) - s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) - + if client.GetAddress() != nodeInfo.Metadata.GrpcAddress { + s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) + s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) delete(s.peerClients, nodeId) - err = client.Close() if err != nil { s.logger.Warn(err.Error(), zap.String("node_id", nodeId)) } - - newClient, err := NewGRPCClient(grpcAddr) + newClient, err := NewGRPCClient(nodeInfo.Metadata.GrpcAddress) if err != nil { - s.logger.Warn(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Warn(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) } - if newClient != nil { s.peerClients[nodeId] = newClient } } else { - s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr)) + s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) } } else { s.logger.Debug("client does not exist in peer list", zap.String("node_id", nodeId)) - - s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) - peerClient, err := NewGRPCClient(grpcAddr) + s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) + peerClient, err := NewGRPCClient(nodeInfo.Metadata.GrpcAddress) if err != nil { - s.logger.Warn(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr)) + s.logger.Warn(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", nodeInfo.Metadata.GrpcAddress)) } if 
peerClient != nil { s.logger.Debug("append peer client to peer client list", zap.String("grpc_addr", peerClient.GetAddress())) @@ -198,7 +220,7 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { // close nonexistent clients for nodeId, client := range s.peerClients { - if nodeConfig, exist := peers[nodeId]; !exist { + if nodeConfig, exist := peers.Nodes[nodeId]; !exist { s.logger.Info("this client is no longer in use", zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig)) s.logger.Debug("close client", zap.String("node_id", nodeId), zap.String("grpc_addr", client.GetAddress())) @@ -215,18 +237,15 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { // keep current peer nodes s.logger.Debug("current peers", zap.Any("peers", peers)) s.peers = peers + } else { + s.logger.Debug("there is no change in peers", zap.Any("peers", peers)) } - // notify current cluster - if !reflect.DeepEqual(s.cluster, cluster) { - // convert to GetClusterResponse for channel output - clusterResp := &management.ClusterInfoResponse{} - clusterAny := &any.Any{} - err = protobuf.UnmarshalAny(cluster, clusterAny) - if err != nil { - s.logger.Warn(err.Error()) + // compare cluster hash + if !cmp.Equal(clusterHash, newClusterHash) { + clusterResp := &management.ClusterInfoResponse{ + Cluster: cluster, } - clusterResp.Cluster = clusterAny // output to channel for c := range s.clusterChans { @@ -235,7 +254,10 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) { // keep current cluster s.logger.Debug("current cluster", zap.Any("cluster", cluster)) - s.cluster = cluster + // TODO: overwrite cluster hash + clusterHash = newClusterHash + } else { + s.logger.Debug("there is no change in cluster", zap.Any("cluster", cluster)) } default: time.Sleep(100 * time.Millisecond) @@ -282,39 +304,41 @@ func (s *GRPCService) NodeID() string { return s.raftServer.NodeID() } -func (s *GRPCService) getSelfNode() (map[string]interface{}, error) { - return map[string]interface{}{ - "node_config": s.raftServer.nodeConfig.ToMap(), - "state": s.raftServer.State().String(), - }, nil +func (s *GRPCService) getSelfNode() (*management.Node, error) { + node := s.raftServer.node + node.Status = s.raftServer.State().String() + + return node, nil } -func (s *GRPCService) getPeerNode(id string) (map[string]interface{}, error) { - var nodeInfo map[string]interface{} +func (s *GRPCService) getPeerNode(id string) (*management.Node, error) { + var nodeInfo *management.Node var err error if peerClient, exist := s.peerClients[id]; exist { nodeInfo, err = peerClient.NodeInfo(id) if err != nil { s.logger.Warn(err.Error()) - nodeInfo = map[string]interface{}{ - "node_config": map[string]interface{}{}, - "state": raft.Shutdown.String(), + nodeInfo = &management.Node{ + BindAddress: "", + Status: raft.Shutdown.String(), + Metadata: &management.Metadata{}, } } } else { s.logger.Warn("node does not exist in peer list", zap.String("id", id)) - nodeInfo = map[string]interface{}{ - "node_config": map[string]interface{}{}, - "state": raft.Shutdown.String(), + nodeInfo = &management.Node{ + BindAddress: "", + Status: raft.Shutdown.String(), + Metadata: &management.Metadata{}, } } return nodeInfo, nil } -func (s *GRPCService) getNode(id string) (map[string]interface{}, error) { - var nodeInfo map[string]interface{} +func (s *GRPCService) getNode(id string) (*management.Node, error) { + var nodeInfo *management.Node var err error if id == "" || id == s.NodeID() { @@ -340,30 +364,12 @@ func (s 
*GRPCService) NodeInfo(ctx context.Context, req *management.NodeInfoRequ return resp, status.Error(codes.Internal, err.Error()) } - nodeConfigAny := &any.Any{} - if nodeConfig, exist := nodeInfo["node_config"]; exist { - err = protobuf.UnmarshalAny(nodeConfig.(map[string]interface{}), nodeConfigAny) - if err != nil { - s.logger.Error(err.Error()) - return resp, status.Error(codes.Internal, err.Error()) - } - } else { - s.logger.Error("missing node_config", zap.Any("node_config", nodeConfig)) - } - - state, exist := nodeInfo["state"].(string) - if !exist { - s.logger.Error("missing node state", zap.String("state", state)) - state = raft.Shutdown.String() - } - - resp.NodeConfig = nodeConfigAny - resp.State = state + resp.Node = nodeInfo return resp, nil } -func (s *GRPCService) setNode(id string, nodeConfig map[string]interface{}) error { +func (s *GRPCService) setNode(id string, nodeConfig *management.Node) error { if s.raftServer.IsLeader() { err := s.raftServer.SetNode(id, nodeConfig) if err != nil { @@ -390,15 +396,7 @@ func (s *GRPCService) setNode(id string, nodeConfig map[string]interface{}) erro func (s *GRPCService) ClusterJoin(ctx context.Context, req *management.ClusterJoinRequest) (*empty.Empty, error) { resp := &empty.Empty{} - ins, err := protobuf.MarshalAny(req.NodeConfig) - if err != nil { - s.logger.Error(err.Error()) - return resp, status.Error(codes.Internal, err.Error()) - } - - nodeConfig := *ins.(*map[string]interface{}) - - err = s.setNode(req.Id, nodeConfig) + err := s.setNode(req.Id, req.Node) if err != nil { s.logger.Error(err.Error()) return resp, status.Error(codes.Internal, err.Error()) @@ -443,26 +441,21 @@ func (s *GRPCService) ClusterLeave(ctx context.Context, req *management.ClusterL return resp, nil } -func (s *GRPCService) getCluster() (map[string]interface{}, error) { +func (s *GRPCService) getCluster() (*management.Cluster, error) { cluster, err := s.raftServer.GetCluster() if err != nil { s.logger.Error(err.Error()) return nil, err } - // update node state - for nodeId := range cluster { + // update latest node state + for nodeId := range cluster.Nodes { node, err := s.getNode(nodeId) if err != nil { - s.logger.Error(err.Error()) - } - state := node["state"].(string) - - if _, ok := cluster[nodeId]; !ok { - cluster[nodeId] = map[string]interface{}{} + s.logger.Warn(err.Error()) + continue } - nodeInfo := cluster[nodeId].(map[string]interface{}) - nodeInfo["state"] = state + cluster.Nodes[nodeId].Status = node.Status } return cluster, nil @@ -477,14 +470,7 @@ func (s *GRPCService) ClusterInfo(ctx context.Context, req *empty.Empty) (*manag return resp, status.Error(codes.Internal, err.Error()) } - clusterAny := &any.Any{} - err = protobuf.UnmarshalAny(cluster, clusterAny) - if err != nil { - s.logger.Error(err.Error()) - return resp, status.Error(codes.Internal, err.Error()) - } - - resp.Cluster = clusterAny + resp.Cluster = cluster return resp, nil } diff --git a/manager/raft_fsm.go b/manager/raft_fsm.go index d918e62..75ceb1a 100644 --- a/manager/raft_fsm.go +++ b/manager/raft_fsm.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/raft" blasterrors "github.com/mosuka/blast/errors" "github.com/mosuka/blast/maputils" + "github.com/mosuka/blast/protobuf/management" "go.uber.org/zap" ) @@ -31,10 +32,11 @@ type RaftFSM struct { path string logger *zap.Logger - metadata maputils.Map - metadataMutex sync.RWMutex + cluster *management.Cluster + clusterMutex sync.RWMutex - data maputils.Map + data maputils.Map + dataMutex sync.RWMutex } func NewRaftFSM(path 
string, logger *zap.Logger) (*RaftFSM, error) { @@ -46,7 +48,7 @@ func NewRaftFSM(path string, logger *zap.Logger) (*RaftFSM, error) { func (f *RaftFSM) Start() error { f.logger.Info("initialize metadata") - f.metadata = maputils.Map{} + f.cluster = &management.Cluster{Nodes: make(map[string]*management.Node, 0)} f.logger.Info("initialize store data") f.data = maputils.Map{} @@ -58,45 +60,37 @@ func (f *RaftFSM) Stop() error { return nil } -func (f *RaftFSM) GetNodeConfig(nodeId string) (map[string]interface{}, error) { - f.metadataMutex.RLock() - defer f.metadataMutex.RUnlock() +func (f *RaftFSM) GetNodeConfig(nodeId string) (*management.Node, error) { + f.clusterMutex.RLock() + defer f.clusterMutex.RUnlock() - nodeConfig, err := f.metadata.Get(nodeId) - if err != nil { - f.logger.Error(err.Error(), zap.String("node_id", nodeId)) - if err == maputils.ErrNotFound { - return nil, blasterrors.ErrNotFound - } - return nil, err + node, ok := f.cluster.Nodes[nodeId] + if !ok { + return nil, blasterrors.ErrNotFound } - return nodeConfig.(maputils.Map).ToMap(), nil + return node, nil } -func (f *RaftFSM) SetNodeConfig(nodeId string, nodeConfig map[string]interface{}) error { - f.metadataMutex.RLock() - defer f.metadataMutex.RUnlock() +func (f *RaftFSM) SetNodeConfig(nodeId string, node *management.Node) error { + f.clusterMutex.RLock() + defer f.clusterMutex.RUnlock() - err := f.metadata.Merge(nodeId, nodeConfig) - if err != nil { - f.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig)) - return err - } + f.cluster.Nodes[nodeId] = node return nil } func (f *RaftFSM) DeleteNodeConfig(nodeId string) error { - f.metadataMutex.RLock() - defer f.metadataMutex.RUnlock() + f.clusterMutex.RLock() + defer f.clusterMutex.RUnlock() - err := f.metadata.Delete(nodeId) - if err != nil { - f.logger.Error(err.Error(), zap.String("node_id", nodeId)) - return err + if _, ok := f.cluster.Nodes[nodeId]; !ok { + return blasterrors.ErrNotFound } + delete(f.cluster.Nodes, nodeId) + return nil } @@ -178,7 +172,22 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { f.logger.Error(err.Error()) return &fsmResponse{error: err} } - err = f.SetNodeConfig(data["node_id"].(string), data["node_config"].(map[string]interface{})) + b, err := json.Marshal(data["node"]) + if err != nil { + f.logger.Error(err.Error()) + return &fsmResponse{error: err} + } + var node *management.Node + err = json.Unmarshal(b, &node) + if err != nil { + f.logger.Error(err.Error()) + return &fsmResponse{error: err} + } + err = f.SetNodeConfig(data["node_id"].(string), node) + if err != nil { + f.logger.Error(err.Error()) + return &fsmResponse{error: err} + } return &fsmResponse{error: err} case deleteNode: var data map[string]interface{} diff --git a/manager/raft_fsm_test.go b/manager/raft_fsm_test.go index 8bb2c97..1b6a243 100644 --- a/manager/raft_fsm_test.go +++ b/manager/raft_fsm_test.go @@ -20,7 +20,9 @@ import ( "reflect" "testing" + "github.com/hashicorp/raft" "github.com/mosuka/blast/logutils" + "github.com/mosuka/blast/protobuf/management" ) func TestRaftFSM_GetNode(t *testing.T) { @@ -52,32 +54,54 @@ func TestRaftFSM_GetNode(t *testing.T) { t.Fatalf("%v", err) } - _ = fsm.SetNodeConfig("node1", map[string]interface{}{ - "bind_addr": ":16060", - "grpc_addr": ":17070", - "http_addr": ":18080", - }) - _ = fsm.SetNodeConfig("node2", map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", - }) - _ = fsm.SetNodeConfig("node3", map[string]interface{}{ - "bind_addr": 
":16062", - "grpc_addr": ":17072", - "http_addr": ":18082", - }) + _ = fsm.SetNodeConfig( + "node1", + &management.Node{ + BindAddress: "2100", + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5100", + HttpAddress: "8100", + }, + }, + ) + _ = fsm.SetNodeConfig( + "node2", + &management.Node{ + BindAddress: "2110", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, + }, + ) + _ = fsm.SetNodeConfig( + "node3", + &management.Node{ + BindAddress: "2120", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5120", + HttpAddress: "8120", + }, + }, + ) val1, err := fsm.GetNodeConfig("node2") if err != nil { t.Fatalf("%v", err) } - exp1 := map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", + exp1 := &management.Node{ + BindAddress: "2110", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, } + act1 := val1 if !reflect.DeepEqual(exp1, act1) { t.Fatalf("expected content to see %v, saw %v", exp1, act1) @@ -114,53 +138,82 @@ func TestRaftFSM_SetNode(t *testing.T) { t.Fatalf("%v", err) } - _ = fsm.SetNodeConfig("node1", map[string]interface{}{ - "bind_addr": ":16060", - "grpc_addr": ":17070", - "http_addr": ":18080", - }) - _ = fsm.SetNodeConfig("node2", map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", - }) - _ = fsm.SetNodeConfig("node3", map[string]interface{}{ - "bind_addr": ":16062", - "grpc_addr": ":17072", - "http_addr": ":18082", - }) + _ = fsm.SetNodeConfig( + "node1", + &management.Node{ + BindAddress: "2100", + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5100", + HttpAddress: "8100", + }, + }, + ) + _ = fsm.SetNodeConfig( + "node2", + &management.Node{ + BindAddress: "2110", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, + }, + ) + _ = fsm.SetNodeConfig( + "node3", + &management.Node{ + BindAddress: "2120", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5120", + HttpAddress: "8120", + }, + }, + ) val1, err := fsm.GetNodeConfig("node2") if err != nil { t.Fatalf("%v", err) } - exp1 := map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", + exp1 := &management.Node{ + BindAddress: "2110", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, } act1 := val1 if !reflect.DeepEqual(exp1, act1) { t.Fatalf("expected content to see %v, saw %v", exp1, act1) } - _ = fsm.SetNodeConfig("node2", map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", - "leader": true, - }) + _ = fsm.SetNodeConfig( + "node2", + &management.Node{ + BindAddress: "2110", + Status: raft.Shutdown.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, + }, + ) val2, err := fsm.GetNodeConfig("node2") if err != nil { t.Fatalf("%v", err) } - exp2 := map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", - "leader": true, + exp2 := &management.Node{ + BindAddress: "2110", + Status: raft.Shutdown.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, } + act2 := val2 if !reflect.DeepEqual(exp2, act2) { t.Fatalf("expected content 
to see %v, saw %v", exp2, act2) @@ -196,30 +249,51 @@ func TestRaftFSM_DeleteNode(t *testing.T) { t.Fatalf("%v", err) } - _ = fsm.SetNodeConfig("node1", map[string]interface{}{ - "bind_addr": ":16060", - "grpc_addr": ":17070", - "http_addr": ":18080", - }) - _ = fsm.SetNodeConfig("node2", map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", - }) - _ = fsm.SetNodeConfig("node3", map[string]interface{}{ - "bind_addr": ":16062", - "grpc_addr": ":17072", - "http_addr": ":18082", - }) + _ = fsm.SetNodeConfig( + "node1", + &management.Node{ + BindAddress: "2100", + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5100", + HttpAddress: "8100", + }, + }, + ) + _ = fsm.SetNodeConfig( + "node2", + &management.Node{ + BindAddress: "2110", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, + }, + ) + _ = fsm.SetNodeConfig( + "node3", + &management.Node{ + BindAddress: "2120", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5120", + HttpAddress: "8120", + }, + }, + ) val1, err := fsm.GetNodeConfig("node2") if err != nil { t.Fatalf("%v", err) } - exp1 := map[string]interface{}{ - "bind_addr": ":16061", - "grpc_addr": ":17071", - "http_addr": ":18081", + exp1 := &management.Node{ + BindAddress: "2110", + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: "5110", + HttpAddress: "8110", + }, } act1 := val1 if !reflect.DeepEqual(exp1, act1) { @@ -340,9 +414,6 @@ func TestRaftFSM_Set(t *testing.T) { _ = fsm.SetValue("/", map[string]interface{}{ "a": "A", }, true) - if err != nil { - t.Fatalf("%v", err) - } val2, err := fsm.GetValue("/") if err != nil { t.Fatalf("%v", err) diff --git a/manager/raft_server.go b/manager/raft_server.go index fe29955..b998de7 100644 --- a/manager/raft_server.go +++ b/manager/raft_server.go @@ -30,34 +30,41 @@ import ( _ "github.com/mosuka/blast/builtins" "github.com/mosuka/blast/config" blasterrors "github.com/mosuka/blast/errors" + "github.com/mosuka/blast/protobuf/management" "go.uber.org/zap" //raftmdb "github.com/hashicorp/raft-mdb" ) type RaftServer struct { - nodeConfig *config.NodeConfig - indexConfig *config.IndexConfig - bootstrap bool - logger *zap.Logger + nodeId string + node *management.Node + dataDir string + raftStorageType string + indexConfig *config.IndexConfig + bootstrap bool + logger *zap.Logger raft *raft.Raft fsm *RaftFSM mu sync.RWMutex } -func NewRaftServer(nodeConfig *config.NodeConfig, indexConfig *config.IndexConfig, bootstrap bool, logger *zap.Logger) (*RaftServer, error) { +func NewRaftServer(nodeId string, node *management.Node, dataDir string, raftStorageType string, indexConfig *config.IndexConfig, bootstrap bool, logger *zap.Logger) (*RaftServer, error) { return &RaftServer{ - nodeConfig: nodeConfig, - indexConfig: indexConfig, - bootstrap: bootstrap, - logger: logger, + nodeId: nodeId, + node: node, + dataDir: dataDir, + raftStorageType: raftStorageType, + indexConfig: indexConfig, + bootstrap: bootstrap, + logger: logger, }, nil } func (s *RaftServer) Start() error { var err error - fsmPath := filepath.Join(s.nodeConfig.DataDir, "store") + fsmPath := filepath.Join(s.dataDir, "store") s.logger.Info("create finite state machine", zap.String("path", fsmPath)) s.fsm, err = NewRaftFSM(fsmPath, s.logger) if err != nil { @@ -72,27 +79,27 @@ func (s *RaftServer) Start() error { return err } - s.logger.Info("create Raft config", 
zap.String("node_id", s.nodeConfig.NodeId)) + s.logger.Info("create Raft config", zap.String("node_id", s.nodeId)) raftConfig := raft.DefaultConfig() - raftConfig.LocalID = raft.ServerID(s.nodeConfig.NodeId) + raftConfig.LocalID = raft.ServerID(s.nodeId) raftConfig.SnapshotThreshold = 1024 raftConfig.LogOutput = ioutil.Discard - s.logger.Info("resolve TCP address", zap.String("bind_addr", s.nodeConfig.BindAddr)) - addr, err := net.ResolveTCPAddr("tcp", s.nodeConfig.BindAddr) + s.logger.Info("resolve TCP address", zap.String("bind_addr", s.node.BindAddress)) + addr, err := net.ResolveTCPAddr("tcp", s.node.BindAddress) if err != nil { s.logger.Fatal(err.Error()) return err } - s.logger.Info("create TCP transport", zap.String("bind_addr", s.nodeConfig.BindAddr)) - transport, err := raft.NewTCPTransport(s.nodeConfig.BindAddr, addr, 3, 10*time.Second, ioutil.Discard) + s.logger.Info("create TCP transport", zap.String("bind_addr", s.node.BindAddress)) + transport, err := raft.NewTCPTransport(s.node.BindAddress, addr, 3, 10*time.Second, ioutil.Discard) if err != nil { s.logger.Fatal(err.Error()) return err } - snapshotPath := s.nodeConfig.DataDir + snapshotPath := s.dataDir s.logger.Info("create snapshot store", zap.String("path", snapshotPath)) snapshotStore, err := raft.NewFileSnapshotStore(snapshotPath, 2, ioutil.Discard) if err != nil { @@ -103,10 +110,10 @@ func (s *RaftServer) Start() error { s.logger.Info("create Raft machine") var logStore raft.LogStore var stableStore raft.StableStore - switch s.nodeConfig.RaftStorageType { + switch s.raftStorageType { case "boltdb": - logStorePath := filepath.Join(s.nodeConfig.DataDir, "raft", "log", "boltdb.db") - s.logger.Info("create raft log store", zap.String("path", logStorePath), zap.String("raft_storage_type", s.nodeConfig.RaftStorageType)) + logStorePath := filepath.Join(s.dataDir, "raft", "log", "boltdb.db") + s.logger.Info("create raft log store", zap.String("path", logStorePath), zap.String("raft_storage_type", s.raftStorageType)) err = os.MkdirAll(filepath.Dir(logStorePath), 0755) if err != nil { s.logger.Fatal(err.Error()) @@ -117,8 +124,8 @@ func (s *RaftServer) Start() error { s.logger.Fatal(err.Error()) return err } - stableStorePath := filepath.Join(s.nodeConfig.DataDir, "raft", "stable", "boltdb.db") - s.logger.Info("create raft stable store", zap.String("path", stableStorePath), zap.String("raft_storage_type", s.nodeConfig.RaftStorageType)) + stableStorePath := filepath.Join(s.dataDir, "raft", "stable", "boltdb.db") + s.logger.Info("create raft stable store", zap.String("path", stableStorePath), zap.String("raft_storage_type", s.raftStorageType)) err = os.MkdirAll(filepath.Dir(stableStorePath), 0755) stableStore, err = raftboltdb.NewBoltStore(stableStorePath) if err != nil { @@ -126,8 +133,8 @@ func (s *RaftServer) Start() error { return err } case "badger": - logStorePath := filepath.Join(s.nodeConfig.DataDir, "raft", "log") - s.logger.Info("create raft log store", zap.String("path", logStorePath), zap.String("raft_storage_type", s.nodeConfig.RaftStorageType)) + logStorePath := filepath.Join(s.dataDir, "raft", "log") + s.logger.Info("create raft log store", zap.String("path", logStorePath), zap.String("raft_storage_type", s.raftStorageType)) err = os.MkdirAll(filepath.Join(logStorePath, "badger"), 0755) if err != nil { s.logger.Fatal(err.Error()) @@ -138,8 +145,8 @@ func (s *RaftServer) Start() error { s.logger.Fatal(err.Error()) return err } - stableStorePath := filepath.Join(s.nodeConfig.DataDir, "raft", "stable") - 
s.logger.Info("create raft stable store", zap.String("path", stableStorePath), zap.String("raft_storage_type", s.nodeConfig.RaftStorageType)) + stableStorePath := filepath.Join(s.dataDir, "raft", "stable") + s.logger.Info("create raft stable store", zap.String("path", stableStorePath), zap.String("raft_storage_type", s.raftStorageType)) err = os.MkdirAll(filepath.Join(stableStorePath, "badger"), 0755) if err != nil { s.logger.Fatal(err.Error()) @@ -151,8 +158,8 @@ func (s *RaftServer) Start() error { return err } default: - logStorePath := filepath.Join(s.nodeConfig.DataDir, "raft", "log", "boltdb.db") - s.logger.Info("create raft log store", zap.String("path", logStorePath), zap.String("raft_storage_type", s.nodeConfig.RaftStorageType)) + logStorePath := filepath.Join(s.dataDir, "raft", "log", "boltdb.db") + s.logger.Info("create raft log store", zap.String("path", logStorePath), zap.String("raft_storage_type", s.raftStorageType)) err = os.MkdirAll(filepath.Dir(logStorePath), 0755) if err != nil { s.logger.Fatal(err.Error()) @@ -163,8 +170,8 @@ func (s *RaftServer) Start() error { s.logger.Fatal(err.Error()) return err } - stableStorePath := filepath.Join(s.nodeConfig.DataDir, "raft", "stable", "boltdb.db") - s.logger.Info("create raft stable store", zap.String("path", stableStorePath), zap.String("raft_storage_type", s.nodeConfig.RaftStorageType)) + stableStorePath := filepath.Join(s.dataDir, "raft", "stable", "boltdb.db") + s.logger.Info("create raft stable store", zap.String("path", stableStorePath), zap.String("raft_storage_type", s.raftStorageType)) err = os.MkdirAll(filepath.Dir(stableStorePath), 0755) stableStore, err = raftboltdb.NewBoltStore(stableStorePath) if err != nil { @@ -200,8 +207,8 @@ func (s *RaftServer) Start() error { } // set node config - s.logger.Info("register its own node config", zap.String("node_id", s.nodeConfig.NodeId), zap.Any("node_config", s.nodeConfig)) - err = s.setNodeConfig(s.nodeConfig.NodeId, s.nodeConfig.ToMap()) + s.logger.Info("register its own node config", zap.String("node_id", s.nodeId), zap.Any("node", s.node)) + err = s.setNodeConfig(s.nodeId, s.node) if err != nil { s.logger.Fatal(err.Error()) return err @@ -285,7 +292,7 @@ func (s *RaftServer) LeaderID(timeout time.Duration) (raft.ServerID, error) { } func (s *RaftServer) NodeID() string { - return s.nodeConfig.NodeId + return s.nodeId } func (s *RaftServer) Stats() map[string]string { @@ -310,7 +317,7 @@ func (s *RaftServer) WaitForDetectLeader(timeout time.Duration) error { return nil } -func (s *RaftServer) getNodeConfig(nodeId string) (map[string]interface{}, error) { +func (s *RaftServer) getNodeConfig(nodeId string) (*management.Node, error) { nodeConfig, err := s.fsm.GetNodeConfig(nodeId) if err != nil { s.logger.Error(err.Error()) @@ -320,12 +327,12 @@ func (s *RaftServer) getNodeConfig(nodeId string) (map[string]interface{}, error return nodeConfig, nil } -func (s *RaftServer) setNodeConfig(nodeId string, nodeConfig map[string]interface{}) error { +func (s *RaftServer) setNodeConfig(nodeId string, node *management.Node) error { msg, err := newMessage( setNode, map[string]interface{}{ - "node_id": nodeId, - "node_config": nodeConfig, + "node_id": nodeId, + "node": node, }, ) if err != nil { @@ -387,7 +394,7 @@ func (s *RaftServer) deleteNodeConfig(nodeId string) error { return nil } -func (s *RaftServer) GetNode(id string) (map[string]interface{}, error) { +func (s *RaftServer) GetNode(id string) (*management.Node, error) { cf := s.raft.GetConfiguration() err := cf.Error() if err != 
nil { @@ -395,15 +402,14 @@ func (s *RaftServer) GetNode(id string) (map[string]interface{}, error) { return nil, err } - node := make(map[string]interface{}, 0) + var node *management.Node for _, server := range cf.Configuration().Servers { if server.ID == raft.ServerID(id) { - nodeConfig, err := s.getNodeConfig(id) + node, err = s.getNodeConfig(id) if err != nil { s.logger.Error(err.Error()) return nil, err } - node["node_config"] = nodeConfig break } } @@ -411,7 +417,7 @@ func (s *RaftServer) GetNode(id string) (map[string]interface{}, error) { return node, nil } -func (s *RaftServer) SetNode(nodeId string, nodeConfig map[string]interface{}) error { +func (s *RaftServer) SetNode(nodeId string, node *management.Node) error { if !s.IsLeader() { s.logger.Warn(raft.ErrNotLeader.Error(), zap.String("state", s.raft.State().String())) return raft.ErrNotLeader @@ -431,15 +437,15 @@ func (s *RaftServer) SetNode(nodeId string, nodeConfig map[string]interface{}) e } } - bindAddr, ok := nodeConfig["bind_addr"].(string) - if !ok { - s.logger.Error("missing metadata", zap.String("bind_addr", bindAddr)) - return errors.New("missing metadata") + if node.BindAddress == "" { + err = errors.New("missing bind address") + s.logger.Error(err.Error(), zap.String("bind_addr", node.BindAddress)) + return err } // add node to Raft cluster - s.logger.Info("add voter", zap.String("nodeId", nodeId), zap.String("address", bindAddr)) - f := s.raft.AddVoter(raft.ServerID(nodeId), raft.ServerAddress(bindAddr), 0, 0) + s.logger.Info("add voter", zap.String("nodeId", nodeId), zap.Any("node", node)) + f := s.raft.AddVoter(raft.ServerID(nodeId), raft.ServerAddress(node.BindAddress), 0, 0) err = f.Error() if err != nil { s.logger.Error(err.Error()) @@ -447,7 +453,7 @@ func (s *RaftServer) SetNode(nodeId string, nodeConfig map[string]interface{}) e } // set node config - err = s.setNodeConfig(nodeId, nodeConfig) + err = s.setNodeConfig(nodeId, node) if err != nil { s.logger.Error(err.Error()) return err @@ -458,7 +464,7 @@ func (s *RaftServer) SetNode(nodeId string, nodeConfig map[string]interface{}) e func (s *RaftServer) DeleteNode(nodeId string) error { if !s.IsLeader() { - s.logger.Warn(raft.ErrNotLeader.Error(), zap.String("state", s.raft.State().String())) + s.logger.Error(raft.ErrNotLeader.Error(), zap.String("state", s.raft.State().String())) return raft.ErrNotLeader } @@ -492,7 +498,7 @@ func (s *RaftServer) DeleteNode(nodeId string) error { return nil } -func (s *RaftServer) GetCluster() (map[string]interface{}, error) { +func (s *RaftServer) GetCluster() (*management.Cluster, error) { cf := s.raft.GetConfiguration() err := cf.Error() if err != nil { @@ -500,14 +506,15 @@ func (s *RaftServer) GetCluster() (map[string]interface{}, error) { return nil, err } - cluster := map[string]interface{}{} + cluster := &management.Cluster{Nodes: make(map[string]*management.Node, 0)} for _, server := range cf.Configuration().Servers { node, err := s.GetNode(string(server.ID)) if err != nil { s.logger.Warn(err.Error()) - node = map[string]interface{}{} + continue } - cluster[string(server.ID)] = node + + cluster.Nodes[string(server.ID)] = node } return cluster, nil diff --git a/manager/server.go b/manager/server.go index b7ebeb1..bb36b92 100644 --- a/manager/server.go +++ b/manager/server.go @@ -17,16 +17,20 @@ package manager import ( accesslog "github.com/mash/go-accesslog" "github.com/mosuka/blast/config" + "github.com/mosuka/blast/protobuf/management" "go.uber.org/zap" ) type Server struct { - clusterConfig 
*config.ClusterConfig - nodeConfig *config.NodeConfig - indexConfig *config.IndexConfig - logger *zap.Logger - grpcLogger *zap.Logger - httpLogger accesslog.Logger + peerGrpcAddr string + nodeId string + node *management.Node + dataDir string + raftStorageType string + indexConfig *config.IndexConfig + logger *zap.Logger + grpcLogger *zap.Logger + httpLogger accesslog.Logger raftServer *RaftServer grpcService *GRPCService @@ -35,14 +39,17 @@ type Server struct { httpServer *HTTPServer } -func NewServer(clusterConfig *config.ClusterConfig, nodeConfig *config.NodeConfig, indexConfig *config.IndexConfig, logger *zap.Logger, grpcLogger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { +func NewServer(peerGrpcAddr string, nodeId string, node *management.Node, dataDir string, raftStorageType string, indexConfig *config.IndexConfig, logger *zap.Logger, grpcLogger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { return &Server{ - clusterConfig: clusterConfig, - nodeConfig: nodeConfig, - indexConfig: indexConfig, - logger: logger, - grpcLogger: grpcLogger, - httpLogger: httpLogger, + peerGrpcAddr: peerGrpcAddr, + nodeId: nodeId, + node: node, + dataDir: dataDir, + raftStorageType: raftStorageType, + indexConfig: indexConfig, + logger: logger, + grpcLogger: grpcLogger, + httpLogger: httpLogger, }, nil } @@ -50,11 +57,11 @@ func (s *Server) Start() { var err error // bootstrap node? - bootstrap := s.clusterConfig.PeerAddr == "" + bootstrap := s.peerGrpcAddr == "" s.logger.Info("bootstrap", zap.Bool("bootstrap", bootstrap)) // create raft server - s.raftServer, err = NewRaftServer(s.nodeConfig, s.indexConfig, bootstrap, s.logger) + s.raftServer, err = NewRaftServer(s.nodeId, s.node, s.dataDir, s.raftStorageType, s.indexConfig, bootstrap, s.logger) if err != nil { s.logger.Fatal(err.Error()) return @@ -68,21 +75,21 @@ func (s *Server) Start() { } // create gRPC server - s.grpcServer, err = NewGRPCServer(s.nodeConfig.GRPCAddr, s.grpcService, s.grpcLogger) + s.grpcServer, err = NewGRPCServer(s.node.Metadata.GrpcAddress, s.grpcService, s.grpcLogger) if err != nil { s.logger.Fatal(err.Error()) return } // create HTTP router - s.httpRouter, err = NewRouter(s.nodeConfig.GRPCAddr, s.logger) + s.httpRouter, err = NewRouter(s.node.Metadata.GrpcAddress, s.logger) if err != nil { s.logger.Fatal(err.Error()) return } // create HTTP server - s.httpServer, err = NewHTTPServer(s.nodeConfig.HTTPAddr, s.httpRouter, s.logger, s.httpLogger) + s.httpServer, err = NewHTTPServer(s.node.Metadata.HttpAddress, s.httpRouter, s.logger, s.httpLogger) if err != nil { s.logger.Error(err.Error()) return @@ -124,7 +131,7 @@ func (s *Server) Start() { // join to the existing cluster if !bootstrap { - client, err := NewGRPCClient(s.clusterConfig.PeerAddr) + client, err := NewGRPCClient(s.peerGrpcAddr) defer func() { err := client.Close() if err != nil { @@ -136,7 +143,7 @@ func (s *Server) Start() { return } - err = client.ClusterJoin(s.nodeConfig.NodeId, s.nodeConfig.ToMap()) + err = client.ClusterJoin(s.nodeId, s.node) if err != nil { s.logger.Fatal(err.Error()) return diff --git a/manager/server_test.go b/manager/server_test.go index b4bb963..e5dae37 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -15,6 +15,7 @@ package manager import ( + "fmt" "os" "path/filepath" "reflect" @@ -22,43 +23,44 @@ import ( "time" "github.com/hashicorp/raft" - "github.com/mosuka/blast/config" blasterrors "github.com/mosuka/blast/errors" - "github.com/mosuka/blast/indexutils" "github.com/mosuka/blast/logutils" 
"github.com/mosuka/blast/protobuf/management" + "github.com/mosuka/blast/strutils" "github.com/mosuka/blast/testutils" ) func TestServer_Start(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -78,32 +80,34 @@ func TestServer_Start(t *testing.T) { func TestServer_HealthCheck(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -120,7 +124,7 @@ func TestServer_HealthCheck(t *testing.T) { time.Sleep(5 * time.Second) // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) + client, err := NewGRPCClient(node.Metadata.GrpcAddress) defer func() { if client != nil { err = client.Close() @@ -170,32 +174,34 @@ func 
TestServer_HealthCheck(t *testing.T) { func TestServer_GetNode(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -212,7 +218,7 @@ func TestServer_GetNode(t *testing.T) { time.Sleep(5 * time.Second) // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) + client, err := NewGRPCClient(node.Metadata.GrpcAddress) defer func() { if client != nil { err = client.Close() @@ -226,13 +232,17 @@ func TestServer_GetNode(t *testing.T) { } // get node - nodeInfo, err := client.NodeInfo(nodeConfig.NodeId) + nodeInfo, err := client.NodeInfo(nodeId) if err != nil { t.Fatalf("%v", err) } - expNodeInfo := map[string]interface{}{ - "node_config": nodeConfig.ToMap(), - "state": "Leader", + expNodeInfo := &management.Node{ + BindAddress: bindAddress, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, } actNodeInfo := nodeInfo if !reflect.DeepEqual(expNodeInfo, actNodeInfo) { @@ -243,32 +253,34 @@ func TestServer_GetNode(t *testing.T) { func TestServer_GetCluster(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config 
indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -285,7 +297,7 @@ func TestServer_GetCluster(t *testing.T) { time.Sleep(5 * time.Second) // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) + client, err := NewGRPCClient(node.Metadata.GrpcAddress) defer func() { if client != nil { err = client.Close() @@ -303,10 +315,16 @@ func TestServer_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - expCluster := map[string]interface{}{ - nodeConfig.NodeId: map[string]interface{}{ - "node_config": nodeConfig.ToMap(), - "state": "Leader", + expCluster := &management.Cluster{ + Nodes: map[string]*management.Node{ + nodeId: { + BindAddress: bindAddress, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + }, }, } actCluster := cluster @@ -315,259 +333,37 @@ func TestServer_GetCluster(t *testing.T) { } } -func TestServer_GetIndexMapping(t *testing.T) { - curDir, _ := os.Getwd() - - // create logger - logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger - grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger - httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() - - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") - if err != nil { - t.Fatalf("%v", err) - } - - // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) - defer func() { - if server != nil { - server.Stop() - } - }() - if err != nil { - t.Fatalf("%v", err) - } - - // start server - server.Start() - - // sleep - time.Sleep(5 * time.Second) - - // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) - defer func() { - if client != nil { - err = client.Close() - if err != nil { - t.Fatalf("%v", err) - } - } - }() - if err != nil { - t.Fatalf("%v", err) - } - - expIndexMapping := indexConfig.IndexMapping - if err != nil { - t.Fatalf("%v", err) - } - - actIntr, err := client.Get("index_config/index_mapping") - if err != nil { - t.Fatalf("%v", err) - } - - actIndexMapping, err := indexutils.NewIndexMappingFromMap(*actIntr.(*map[string]interface{})) - if err != nil { - t.Fatalf("%v", err) - } - - if !reflect.DeepEqual(expIndexMapping, actIndexMapping) { - t.Fatalf("expected content to see %v, saw %v", expIndexMapping, actIndexMapping) - } -} - -func TestServer_GetIndexType(t *testing.T) { - curDir, _ := os.Getwd() - - // create logger - logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger - grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger - httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - - // create cluster config - 
clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() - - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") - if err != nil { - t.Fatalf("%v", err) - } - - // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) - defer func() { - if server != nil { - server.Stop() - } - }() - if err != nil { - t.Fatalf("%v", err) - } - - // start server - server.Start() - - // sleep - time.Sleep(5 * time.Second) - - // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) - defer func() { - if client != nil { - err = client.Close() - if err != nil { - t.Fatalf("%v", err) - } - } - }() - if err != nil { - t.Fatalf("%v", err) - } - - expIndexType := indexConfig.IndexType - if err != nil { - t.Fatalf("%v", err) - } - - actIndexType, err := client.Get("index_config/index_type") - if err != nil { - t.Fatalf("%v", err) - } - - if expIndexType != *actIndexType.(*string) { - t.Fatalf("expected content to see %v, saw %v", expIndexType, *actIndexType.(*string)) - } -} - -func TestServer_GetIndexStorageType(t *testing.T) { - curDir, _ := os.Getwd() - - // create logger - logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger - grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger - httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() - - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") - if err != nil { - t.Fatalf("%v", err) - } - - // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) - defer func() { - if server != nil { - server.Stop() - } - }() - if err != nil { - t.Fatalf("%v", err) - } - - // start server - server.Start() - - // sleep - time.Sleep(5 * time.Second) - - // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) - defer func() { - if client != nil { - err = client.Close() - if err != nil { - t.Fatalf("%v", err) - } - } - }() - if err != nil { - t.Fatalf("%v", err) - } - - expIndexStorageType := indexConfig.IndexStorageType - if err != nil { - t.Fatalf("%v", err) - } - - actIndexStorageType, err := client.Get("index_config/index_storage_type") - if err != nil { - t.Fatalf("%v", err) - } - - if expIndexStorageType != *actIndexStorageType.(*string) { - t.Fatalf("expected content to see %v, saw %v", expIndexStorageType, *actIndexStorageType.(*string)) - } -} - func TestServer_SetState(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + 
peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -584,7 +380,7 @@ func TestServer_SetState(t *testing.T) { time.Sleep(5 * time.Second) // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) + client, err := NewGRPCClient(node.Metadata.GrpcAddress) defer func() { if client != nil { err = client.Close() @@ -621,32 +417,34 @@ func TestServer_SetState(t *testing.T) { func TestServer_GetState(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -663,7 +461,7 @@ func TestServer_GetState(t *testing.T) { time.Sleep(5 * time.Second) // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) + client, err := NewGRPCClient(node.Metadata.GrpcAddress) defer func() { if client != nil { err = client.Close() @@ -700,32 +498,34 @@ func TestServer_GetState(t *testing.T) { func TestServer_DeleteState(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create 
cluster config - clusterConfig := config.DefaultClusterConfig() - - // create node config - nodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig.DataDir) - }() + peerGrpcAddress := "" + grpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir := testutils.TmpDir() + raftStorageType := "boltdb" + + node := &management.Node{ + BindAddress: bindAddress, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, + }, + } - // create index config indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } // create server - server, err := NewServer(clusterConfig, nodeConfig, indexConfig, logger, grpcLogger, httpAccessLogger) + server, err := NewServer(peerGrpcAddress, nodeId, node, dataDir, raftStorageType, indexConfig, logger, grpcLogger, httpAccessLogger) defer func() { if server != nil { server.Stop() @@ -742,7 +542,7 @@ func TestServer_DeleteState(t *testing.T) { time.Sleep(5 * time.Second) // create gRPC client - client, err := NewGRPCClient(nodeConfig.GRPCAddr) + client, err := NewGRPCClient(node.Metadata.GrpcAddress) defer func() { if client != nil { err = client.Close() @@ -800,29 +600,34 @@ func TestServer_DeleteState(t *testing.T) { func TestCluster_Start(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -831,18 +636,34 @@ func TestCluster_Start(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := 
testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -851,18 +672,34 @@ func TestCluster_Start(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -871,7 +708,8 @@ func TestCluster_Start(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep @@ -881,29 +719,34 @@ func TestCluster_Start(t *testing.T) { func TestCluster_HealthCheck(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := 
fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -912,18 +755,34 @@ func TestCluster_HealthCheck(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -932,18 +791,34 @@ func TestCluster_HealthCheck(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := 
testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -952,28 +827,29 @@ func TestCluster_HealthCheck(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep time.Sleep(5 * time.Second) // gRPC client for all servers - client1, err := NewGRPCClient(nodeConfig1.GRPCAddr) + client1, err := NewGRPCClient(node1.Metadata.GrpcAddress) defer func() { _ = client1.Close() }() if err != nil { t.Fatalf("%v", err) } - client2, err := NewGRPCClient(nodeConfig2.GRPCAddr) + client2, err := NewGRPCClient(node2.Metadata.GrpcAddress) defer func() { _ = client2.Close() }() if err != nil { t.Fatalf("%v", err) } - client3, err := NewGRPCClient(nodeConfig3.GRPCAddr) + client3, err := NewGRPCClient(node3.Metadata.GrpcAddress) defer func() { _ = client3.Close() }() @@ -1085,29 +961,34 @@ func TestCluster_HealthCheck(t *testing.T) { func TestCluster_GetNode(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -1116,18 +997,34 @@ func TestCluster_GetNode(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", 
testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -1136,18 +1033,34 @@ func TestCluster_GetNode(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -1156,28 +1069,29 @@ func TestCluster_GetNode(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep time.Sleep(5 * time.Second) // gRPC client for all servers - client1, err := NewGRPCClient(nodeConfig1.GRPCAddr) + client1, err := NewGRPCClient(node1.Metadata.GrpcAddress) defer func() { _ = client1.Close() }() if err != nil { t.Fatalf("%v", err) } - client2, err := NewGRPCClient(nodeConfig2.GRPCAddr) + client2, err := NewGRPCClient(node2.Metadata.GrpcAddress) defer func() { _ = client2.Close() }() if err != nil { t.Fatalf("%v", err) } - client3, err := NewGRPCClient(nodeConfig3.GRPCAddr) + client3, err := NewGRPCClient(node3.Metadata.GrpcAddress) defer func() { _ = client3.Close() }() @@ -1186,117 +1100,153 @@ func TestCluster_GetNode(t *testing.T) { } // get all node info from all nodes - node11, err := client1.NodeInfo(nodeConfig1.NodeId) + node11, err := client1.NodeInfo(nodeId1) if err != nil { t.Fatalf("%v", err) } - expNode11 := map[string]interface{}{ - "node_config": server1.nodeConfig.ToMap(), - "state": raft.Leader.String(), + expNode11 := &management.Node{ + BindAddress: bindAddress1, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: 
grpcAddress1, + HttpAddress: httpAddress1, + }, } actNode11 := node11 if !reflect.DeepEqual(expNode11, actNode11) { t.Fatalf("expected content to see %v, saw %v", expNode11, actNode11) } - node12, err := client1.NodeInfo(nodeConfig2.NodeId) + node12, err := client1.NodeInfo(nodeId2) if err != nil { t.Fatalf("%v", err) } - expNode12 := map[string]interface{}{ - "node_config": server2.nodeConfig.ToMap(), - "state": raft.Follower.String(), + expNode12 := &management.Node{ + BindAddress: bindAddress2, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, } actNode12 := node12 if !reflect.DeepEqual(expNode12, actNode12) { t.Fatalf("expected content to see %v, saw %v", expNode12, actNode12) } - node13, err := client1.NodeInfo(nodeConfig3.NodeId) + node13, err := client1.NodeInfo(nodeId3) if err != nil { t.Fatalf("%v", err) } - expNode13 := map[string]interface{}{ - "node_config": server3.nodeConfig.ToMap(), - "state": raft.Follower.String(), + expNode13 := &management.Node{ + BindAddress: bindAddress3, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, } actNode13 := node13 if !reflect.DeepEqual(expNode13, actNode13) { t.Fatalf("expected content to see %v, saw %v", expNode13, actNode13) } - node21, err := client2.NodeInfo(nodeConfig1.NodeId) + node21, err := client2.NodeInfo(nodeId1) if err != nil { t.Fatalf("%v", err) } - expNode21 := map[string]interface{}{ - "node_config": server1.nodeConfig.ToMap(), - "state": raft.Leader.String(), + expNode21 := &management.Node{ + BindAddress: bindAddress1, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, } actNode21 := node21 if !reflect.DeepEqual(expNode21, actNode21) { t.Fatalf("expected content to see %v, saw %v", expNode21, actNode21) } - node22, err := client2.NodeInfo(nodeConfig2.NodeId) + node22, err := client2.NodeInfo(nodeId2) if err != nil { t.Fatalf("%v", err) } - expNode22 := map[string]interface{}{ - "node_config": server2.nodeConfig.ToMap(), - "state": raft.Follower.String(), + expNode22 := &management.Node{ + BindAddress: bindAddress2, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, } actNode22 := node22 if !reflect.DeepEqual(expNode22, actNode22) { t.Fatalf("expected content to see %v, saw %v", expNode22, actNode22) } - node23, err := client2.NodeInfo(nodeConfig3.NodeId) + node23, err := client2.NodeInfo(nodeId3) if err != nil { t.Fatalf("%v", err) } - expNode23 := map[string]interface{}{ - "node_config": server3.nodeConfig.ToMap(), - "state": raft.Follower.String(), + expNode23 := &management.Node{ + BindAddress: bindAddress3, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, } actNode23 := node23 if !reflect.DeepEqual(expNode23, actNode23) { t.Fatalf("expected content to see %v, saw %v", expNode23, actNode23) } - node31, err := client3.NodeInfo(nodeConfig1.NodeId) + node31, err := client3.NodeInfo(nodeId1) if err != nil { t.Fatalf("%v", err) } - expNode31 := map[string]interface{}{ - "node_config": server1.nodeConfig.ToMap(), - "state": raft.Leader.String(), + expNode31 := &management.Node{ + BindAddress: bindAddress1, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + 
}, } actNode31 := node31 if !reflect.DeepEqual(expNode31, actNode31) { t.Fatalf("expected content to see %v, saw %v", expNode31, actNode31) } - node32, err := client3.NodeInfo(nodeConfig2.NodeId) + node32, err := client3.NodeInfo(nodeId2) if err != nil { t.Fatalf("%v", err) } - expNode32 := map[string]interface{}{ - "node_config": server2.nodeConfig.ToMap(), - "state": raft.Follower.String(), + expNode32 := &management.Node{ + BindAddress: bindAddress2, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, } actNode32 := node32 if !reflect.DeepEqual(expNode32, actNode32) { t.Fatalf("expected content to see %v, saw %v", expNode32, actNode32) } - node33, err := client3.NodeInfo(nodeConfig3.NodeId) + node33, err := client3.NodeInfo(nodeId3) if err != nil { t.Fatalf("%v", err) } - expNode33 := map[string]interface{}{ - "node_config": server3.nodeConfig.ToMap(), - "state": raft.Follower.String(), + expNode33 := &management.Node{ + BindAddress: bindAddress3, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, } actNode33 := node33 if !reflect.DeepEqual(expNode33, actNode33) { @@ -1307,29 +1257,34 @@ func TestCluster_GetNode(t *testing.T) { func TestCluster_GetCluster(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -1338,18 +1293,34 @@ func TestCluster_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), 
logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -1358,18 +1329,34 @@ func TestCluster_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -1378,28 +1365,29 @@ func TestCluster_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep time.Sleep(5 * time.Second) - // gRPC client for manager1 - client1, err := NewGRPCClient(nodeConfig1.GRPCAddr) + // gRPC client for all servers + client1, err := NewGRPCClient(node1.Metadata.GrpcAddress) defer func() { _ = client1.Close() }() if err != nil { t.Fatalf("%v", err) } - client2, err := NewGRPCClient(nodeConfig2.GRPCAddr) + client2, err := NewGRPCClient(node2.Metadata.GrpcAddress) defer func() { _ = client2.Close() }() if err != nil { t.Fatalf("%v", err) } - client3, err := NewGRPCClient(nodeConfig3.GRPCAddr) + client3, err := NewGRPCClient(node3.Metadata.GrpcAddress) defer func() { _ = client3.Close() }() @@ -1412,18 +1400,32 @@ func TestCluster_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - expCluster1 := map[string]interface{}{ - nodeConfig1.NodeId: map[string]interface{}{ - "node_config": nodeConfig1.ToMap(), - "state": raft.Leader.String(), - }, - nodeConfig2.NodeId: map[string]interface{}{ - "node_config": 
nodeConfig2.ToMap(), - "state": raft.Follower.String(), - }, - nodeConfig3.NodeId: map[string]interface{}{ - "node_config": nodeConfig3.ToMap(), - "state": raft.Follower.String(), + expCluster1 := &management.Cluster{ + Nodes: map[string]*management.Node{ + nodeId1: { + BindAddress: bindAddress1, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + }, + nodeId2: { + BindAddress: bindAddress2, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + }, + nodeId3: { + BindAddress: bindAddress3, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + }, }, } actCluster1 := cluster1 @@ -1435,18 +1437,32 @@ func TestCluster_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - expCluster2 := map[string]interface{}{ - nodeConfig1.NodeId: map[string]interface{}{ - "node_config": nodeConfig1.ToMap(), - "state": raft.Leader.String(), - }, - nodeConfig2.NodeId: map[string]interface{}{ - "node_config": nodeConfig2.ToMap(), - "state": raft.Follower.String(), - }, - nodeConfig3.NodeId: map[string]interface{}{ - "node_config": nodeConfig3.ToMap(), - "state": raft.Follower.String(), + expCluster2 := &management.Cluster{ + Nodes: map[string]*management.Node{ + nodeId1: { + BindAddress: bindAddress1, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + }, + nodeId2: { + BindAddress: bindAddress2, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + }, + nodeId3: { + BindAddress: bindAddress3, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + }, }, } actCluster2 := cluster2 @@ -1458,18 +1474,32 @@ func TestCluster_GetCluster(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - expCluster3 := map[string]interface{}{ - nodeConfig1.NodeId: map[string]interface{}{ - "node_config": nodeConfig1.ToMap(), - "state": raft.Leader.String(), - }, - nodeConfig2.NodeId: map[string]interface{}{ - "node_config": nodeConfig2.ToMap(), - "state": raft.Follower.String(), - }, - nodeConfig3.NodeId: map[string]interface{}{ - "node_config": nodeConfig3.ToMap(), - "state": raft.Follower.String(), + expCluster3 := &management.Cluster{ + Nodes: map[string]*management.Node{ + nodeId1: { + BindAddress: bindAddress1, + Status: raft.Leader.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + }, + nodeId2: { + BindAddress: bindAddress2, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + }, + nodeId3: { + BindAddress: bindAddress3, + Status: raft.Follower.String(), + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + }, }, } actCluster3 := cluster3 @@ -1478,32 +1508,37 @@ func TestCluster_GetCluster(t *testing.T) { } } -func TestCluster_GetState(t *testing.T) { +func TestCluster_SetState(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := 
logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -1512,18 +1547,34 @@ func TestCluster_GetState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -1532,18 +1583,34 @@ func TestCluster_GetState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := 
fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -1552,28 +1619,29 @@ func TestCluster_GetState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep time.Sleep(5 * time.Second) - // gRPC client for manager1 - client1, err := NewGRPCClient(nodeConfig1.GRPCAddr) + // gRPC client for all servers + client1, err := NewGRPCClient(node1.Metadata.GrpcAddress) defer func() { _ = client1.Close() }() if err != nil { t.Fatalf("%v", err) } - client2, err := NewGRPCClient(nodeConfig2.GRPCAddr) + client2, err := NewGRPCClient(node2.Metadata.GrpcAddress) defer func() { _ = client2.Close() }() if err != nil { t.Fatalf("%v", err) } - client3, err := NewGRPCClient(nodeConfig3.GRPCAddr) + client3, err := NewGRPCClient(node3.Metadata.GrpcAddress) defer func() { _ = client3.Close() }() @@ -1581,64 +1649,143 @@ func TestCluster_GetState(t *testing.T) { t.Fatalf("%v", err) } - // get index mapping from all nodes - indexConfig1, err := client1.Get("index_config") + err = client1.Set("test/key1", "val1") + if err != nil { + t.Fatalf("%v", err) + } + time.Sleep(2 * time.Second) // wait for data to propagate + + // get value from all nodes + val11, err := client1.Get("test/key1") + if err != nil { + t.Fatalf("%v", err) + } + expVal11 := "val1" + actVal11 := *val11.(*string) + if expVal11 != actVal11 { + t.Fatalf("expected content to see %v, saw %v", expVal11, actVal11) + } + val21, err := client2.Get("test/key1") + if err != nil { + t.Fatalf("%v", err) + } + expVal21 := "val1" + actVal21 := *val21.(*string) + if expVal21 != actVal21 { + t.Fatalf("expected content to see %v, saw %v", expVal21, actVal21) + } + val31, err := client3.Get("test/key1") if err != nil { t.Fatalf("%v", err) } - expIndexConfig1 := indexConfig.ToMap() - actIndexConfig1 := *indexConfig1.(*map[string]interface{}) - if !reflect.DeepEqual(expIndexConfig1, actIndexConfig1) { - t.Fatalf("expected content to see %v, saw %v", expIndexConfig1, actIndexConfig1) + expVal31 := "val1" + actVal31 := *val31.(*string) + if expVal31 != actVal31 { + t.Fatalf("expected content to see %v, saw %v", expVal31, actVal31) + } + + err = client2.Set("test/key2", "val2") + if err != nil { + t.Fatalf("%v", err) } + time.Sleep(2 * time.Second) // wait for data to propagate - indexConfig2, err := client2.Get("index_config") + // get value from all nodes + val12, err := client1.Get("test/key2") + if err != nil { + t.Fatalf("%v", err) + } + expVal12 := "val2" + actVal12 := *val12.(*string) + if expVal12 != actVal12 { + t.Fatalf("expected content to see %v, saw %v", expVal12, actVal12) + } + val22, err := client2.Get("test/key2") + if err != nil { + t.Fatalf("%v", err) + } + expVal22 := "val2" + actVal22 := *val22.(*string) + if expVal22 != actVal22 { + t.Fatalf("expected content to see 
%v, saw %v", expVal22, actVal22) + } + val32, err := client3.Get("test/key2") if err != nil { t.Fatalf("%v", err) } - expIndexConfig2 := indexConfig.ToMap() - actIndexConfig2 := *indexConfig2.(*map[string]interface{}) - if !reflect.DeepEqual(expIndexConfig2, actIndexConfig2) { - t.Fatalf("expected content to see %v, saw %v", expIndexConfig2, actIndexConfig2) + expVal32 := "val2" + actVal32 := *val32.(*string) + if expVal32 != actVal32 { + t.Fatalf("expected content to see %v, saw %v", expVal32, actVal32) + } + + err = client3.Set("test/key3", "val3") + if err != nil { + t.Fatalf("%v", err) } + time.Sleep(2 * time.Second) // wait for data to propagate - indexConfig3, err := client3.Get("index_config") + // get value from all nodes + val13, err := client1.Get("test/key3") if err != nil { t.Fatalf("%v", err) } - expIndexConfig3 := indexConfig.ToMap() - actIndexConfig3 := *indexConfig3.(*map[string]interface{}) - if !reflect.DeepEqual(expIndexConfig3, actIndexConfig3) { - t.Fatalf("expected content to see %v, saw %v", expIndexConfig3, actIndexConfig3) + expVal13 := "val3" + actVal13 := *val13.(*string) + if expVal13 != actVal13 { + t.Fatalf("expected content to see %v, saw %v", expVal13, actVal13) + } + val23, err := client2.Get("test/key3") + if err != nil { + t.Fatalf("%v", err) + } + expVal23 := "val3" + actVal23 := *val23.(*string) + if expVal23 != actVal23 { + t.Fatalf("expected content to see %v, saw %v", expVal23, actVal23) + } + val33, err := client3.Get("test/key3") + if err != nil { + t.Fatalf("%v", err) + } + expVal33 := "val3" + actVal33 := *val33.(*string) + if expVal33 != actVal33 { + t.Fatalf("expected content to see %v, saw %v", expVal33, actVal33) } } -func TestCluster_SetState(t *testing.T) { +func TestCluster_GetState(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -1647,18 +1794,34 @@ func TestCluster_SetState(t *testing.T) { if err != nil { t.Fatalf("%v", err) 
} - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -1667,18 +1830,34 @@ func TestCluster_SetState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -1687,28 +1866,29 @@ func TestCluster_SetState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep time.Sleep(5 * time.Second) - // gRPC client for manager1 - client1, err := NewGRPCClient(nodeConfig1.GRPCAddr) + // gRPC client for all servers + client1, err := NewGRPCClient(node1.Metadata.GrpcAddress) defer func() { _ = client1.Close() }() if err != nil { t.Fatalf("%v", err) } - client2, err := NewGRPCClient(nodeConfig2.GRPCAddr) + client2, err := NewGRPCClient(node2.Metadata.GrpcAddress) defer func() { _ = client2.Close() }() if err != nil { t.Fatalf("%v", err) } - client3, err := NewGRPCClient(nodeConfig3.GRPCAddr) + client3, err := 
NewGRPCClient(node3.Metadata.GrpcAddress) defer func() { _ = client3.Close() }() @@ -1825,29 +2005,34 @@ func TestCluster_SetState(t *testing.T) { func TestCluster_DeleteState(t *testing.T) { curDir, _ := os.Getwd() - // create logger logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create gRPC logger grpcLogger := logutils.NewLogger("WARN", "", 500, 3, 30, false) - - // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) - // create index config - indexConfig, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + peerGrpcAddress1 := "" + grpcAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId1 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress1 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir1 := testutils.TmpDir() + raftStorageType1 := "boltdb" + + node1 := &management.Node{ + BindAddress: bindAddress1, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress1, + HttpAddress: httpAddress1, + }, + } + + indexConfig1, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") if err != nil { t.Fatalf("%v", err) } - // create configs for server1 - clusterConfig1 := config.DefaultClusterConfig() - nodeConfig1 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig1.DataDir) - }() - // create server1 - server1, err := NewServer(clusterConfig1, nodeConfig1, indexConfig, logger.Named("manager1"), grpcLogger, httpAccessLogger) + // create server + server1, err := NewServer(peerGrpcAddress1, nodeId1, node1, dataDir1, raftStorageType1, indexConfig1, logger, grpcLogger, httpAccessLogger) defer func() { if server1 != nil { server1.Stop() @@ -1856,18 +2041,34 @@ func TestCluster_DeleteState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server1 + + // start server server1.Start() - // create configs for server2 - clusterConfig2 := config.DefaultClusterConfig() - clusterConfig2.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig2 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig2.DataDir) - }() - // create server2 - server2, err := NewServer(clusterConfig2, nodeConfig2, config.DefaultIndexConfig(), logger.Named("manager2"), grpcLogger, httpAccessLogger) + peerGrpcAddress2 := grpcAddress1 + grpcAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId2 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress2 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir2 := testutils.TmpDir() + raftStorageType2 := "boltdb" + + node2 := &management.Node{ + BindAddress: bindAddress2, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress2, + HttpAddress: httpAddress2, + }, + } + + indexConfig2, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server2, err := NewServer(peerGrpcAddress2, nodeId2, node2, dataDir2, raftStorageType2, indexConfig2, logger, grpcLogger, httpAccessLogger) defer func() { if server2 != nil { server2.Stop() @@ -1876,18 +2077,34 @@ func TestCluster_DeleteState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server2 + + // start server server2.Start() - // create configs for server3 - clusterConfig3 := 
config.DefaultClusterConfig() - clusterConfig3.PeerAddr = nodeConfig1.GRPCAddr - nodeConfig3 := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(nodeConfig3.DataDir) - }() - // create server3 - server3, err := NewServer(clusterConfig3, nodeConfig3, config.DefaultIndexConfig(), logger.Named("manager3"), grpcLogger, httpAccessLogger) + peerGrpcAddress3 := grpcAddress1 + grpcAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + httpAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + nodeId3 := fmt.Sprintf("node-%s", strutils.RandStr(5)) + bindAddress3 := fmt.Sprintf(":%d", testutils.TmpPort()) + dataDir3 := testutils.TmpDir() + raftStorageType3 := "boltdb" + + node3 := &management.Node{ + BindAddress: bindAddress3, + Status: "", + Metadata: &management.Metadata{ + GrpcAddress: grpcAddress3, + HttpAddress: httpAddress3, + }, + } + + indexConfig3, err := testutils.TmpIndexConfig(filepath.Join(curDir, "../example/wiki_index_mapping.json"), "upside_down", "boltdb") + if err != nil { + t.Fatalf("%v", err) + } + + // create server + server3, err := NewServer(peerGrpcAddress3, nodeId3, node3, dataDir3, raftStorageType3, indexConfig3, logger, grpcLogger, httpAccessLogger) defer func() { if server3 != nil { server3.Stop() @@ -1896,28 +2113,29 @@ func TestCluster_DeleteState(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - // start server3 + + // start server server3.Start() // sleep time.Sleep(5 * time.Second) - // gRPC client for manager1 - client1, err := NewGRPCClient(nodeConfig1.GRPCAddr) + // gRPC client for all servers + client1, err := NewGRPCClient(node1.Metadata.GrpcAddress) defer func() { _ = client1.Close() }() if err != nil { t.Fatalf("%v", err) } - client2, err := NewGRPCClient(nodeConfig2.GRPCAddr) + client2, err := NewGRPCClient(node2.Metadata.GrpcAddress) defer func() { _ = client2.Close() }() if err != nil { t.Fatalf("%v", err) } - client3, err := NewGRPCClient(nodeConfig3.GRPCAddr) + client3, err := NewGRPCClient(node3.Metadata.GrpcAddress) defer func() { _ = client3.Close() }() diff --git a/protobuf/management/management.pb.go b/protobuf/management/management.pb.go index 5c7a100..8125b30 100644 --- a/protobuf/management/management.pb.go +++ b/protobuf/management/management.pb.go @@ -114,7 +114,7 @@ func (x WatchResponse_Command) String() string { } func (WatchResponse_Command) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{12, 0} + return fileDescriptor_5e030ad796566078, []int{15, 0} } type NodeHealthCheckRequest struct { @@ -196,6 +196,147 @@ func (m *NodeHealthCheckResponse) GetState() NodeHealthCheckResponse_State { } // use for raft +type Metadata struct { + GrpcAddress string `protobuf:"bytes,1,opt,name=grpc_address,json=grpcAddress,proto3" json:"grpc_address,omitempty"` + HttpAddress string `protobuf:"bytes,2,opt,name=http_address,json=httpAddress,proto3" json:"http_address,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Metadata) Reset() { *m = Metadata{} } +func (m *Metadata) String() string { return proto.CompactTextString(m) } +func (*Metadata) ProtoMessage() {} +func (*Metadata) Descriptor() ([]byte, []int) { + return fileDescriptor_5e030ad796566078, []int{2} +} + +func (m *Metadata) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Metadata.Unmarshal(m, b) +} +func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Metadata.Marshal(b, m, deterministic) +} +func (m 
*Metadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_Metadata.Merge(m, src) +} +func (m *Metadata) XXX_Size() int { + return xxx_messageInfo_Metadata.Size(m) +} +func (m *Metadata) XXX_DiscardUnknown() { + xxx_messageInfo_Metadata.DiscardUnknown(m) +} + +var xxx_messageInfo_Metadata proto.InternalMessageInfo + +func (m *Metadata) GetGrpcAddress() string { + if m != nil { + return m.GrpcAddress + } + return "" +} + +func (m *Metadata) GetHttpAddress() string { + if m != nil { + return m.HttpAddress + } + return "" +} + +type Node struct { + BindAddress string `protobuf:"bytes,1,opt,name=bind_address,json=bindAddress,proto3" json:"bind_address,omitempty"` + Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + Metadata *Metadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Node) Reset() { *m = Node{} } +func (m *Node) String() string { return proto.CompactTextString(m) } +func (*Node) ProtoMessage() {} +func (*Node) Descriptor() ([]byte, []int) { + return fileDescriptor_5e030ad796566078, []int{3} +} + +func (m *Node) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Node.Unmarshal(m, b) +} +func (m *Node) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Node.Marshal(b, m, deterministic) +} +func (m *Node) XXX_Merge(src proto.Message) { + xxx_messageInfo_Node.Merge(m, src) +} +func (m *Node) XXX_Size() int { + return xxx_messageInfo_Node.Size(m) +} +func (m *Node) XXX_DiscardUnknown() { + xxx_messageInfo_Node.DiscardUnknown(m) +} + +var xxx_messageInfo_Node proto.InternalMessageInfo + +func (m *Node) GetBindAddress() string { + if m != nil { + return m.BindAddress + } + return "" +} + +func (m *Node) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +func (m *Node) GetMetadata() *Metadata { + if m != nil { + return m.Metadata + } + return nil +} + +type Cluster struct { + Nodes map[string]*Node `protobuf:"bytes,1,rep,name=nodes,proto3" json:"nodes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Cluster) Reset() { *m = Cluster{} } +func (m *Cluster) String() string { return proto.CompactTextString(m) } +func (*Cluster) ProtoMessage() {} +func (*Cluster) Descriptor() ([]byte, []int) { + return fileDescriptor_5e030ad796566078, []int{4} +} + +func (m *Cluster) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Cluster.Unmarshal(m, b) +} +func (m *Cluster) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Cluster.Marshal(b, m, deterministic) +} +func (m *Cluster) XXX_Merge(src proto.Message) { + xxx_messageInfo_Cluster.Merge(m, src) +} +func (m *Cluster) XXX_Size() int { + return xxx_messageInfo_Cluster.Size(m) +} +func (m *Cluster) XXX_DiscardUnknown() { + xxx_messageInfo_Cluster.DiscardUnknown(m) +} + +var xxx_messageInfo_Cluster proto.InternalMessageInfo + +func (m *Cluster) GetNodes() map[string]*Node { + if m != nil { + return m.Nodes + } + return nil +} + type NodeInfoRequest struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -207,7 +348,7 @@ func (m *NodeInfoRequest) Reset() { *m = NodeInfoRequest{} } func (m *NodeInfoRequest) String() string { 
return proto.CompactTextString(m) } func (*NodeInfoRequest) ProtoMessage() {} func (*NodeInfoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{2} + return fileDescriptor_5e030ad796566078, []int{5} } func (m *NodeInfoRequest) XXX_Unmarshal(b []byte) error { @@ -236,8 +377,7 @@ func (m *NodeInfoRequest) GetId() string { } type NodeInfoResponse struct { - NodeConfig *any.Any `protobuf:"bytes,1,opt,name=nodeConfig,proto3" json:"nodeConfig,omitempty"` - State string `protobuf:"bytes,2,opt,name=state,proto3" json:"state,omitempty"` + Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -247,7 +387,7 @@ func (m *NodeInfoResponse) Reset() { *m = NodeInfoResponse{} } func (m *NodeInfoResponse) String() string { return proto.CompactTextString(m) } func (*NodeInfoResponse) ProtoMessage() {} func (*NodeInfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{3} + return fileDescriptor_5e030ad796566078, []int{6} } func (m *NodeInfoResponse) XXX_Unmarshal(b []byte) error { @@ -268,23 +408,16 @@ func (m *NodeInfoResponse) XXX_DiscardUnknown() { var xxx_messageInfo_NodeInfoResponse proto.InternalMessageInfo -func (m *NodeInfoResponse) GetNodeConfig() *any.Any { +func (m *NodeInfoResponse) GetNode() *Node { if m != nil { - return m.NodeConfig + return m.Node } return nil } -func (m *NodeInfoResponse) GetState() string { - if m != nil { - return m.State - } - return "" -} - type ClusterJoinRequest struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - NodeConfig *any.Any `protobuf:"bytes,2,opt,name=nodeConfig,proto3" json:"nodeConfig,omitempty"` + Node *Node `protobuf:"bytes,2,opt,name=node,proto3" json:"node,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -294,7 +427,7 @@ func (m *ClusterJoinRequest) Reset() { *m = ClusterJoinRequest{} } func (m *ClusterJoinRequest) String() string { return proto.CompactTextString(m) } func (*ClusterJoinRequest) ProtoMessage() {} func (*ClusterJoinRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{4} + return fileDescriptor_5e030ad796566078, []int{7} } func (m *ClusterJoinRequest) XXX_Unmarshal(b []byte) error { @@ -322,9 +455,9 @@ func (m *ClusterJoinRequest) GetId() string { return "" } -func (m *ClusterJoinRequest) GetNodeConfig() *any.Any { +func (m *ClusterJoinRequest) GetNode() *Node { if m != nil { - return m.NodeConfig + return m.Node } return nil } @@ -340,7 +473,7 @@ func (m *ClusterLeaveRequest) Reset() { *m = ClusterLeaveRequest{} } func (m *ClusterLeaveRequest) String() string { return proto.CompactTextString(m) } func (*ClusterLeaveRequest) ProtoMessage() {} func (*ClusterLeaveRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{5} + return fileDescriptor_5e030ad796566078, []int{8} } func (m *ClusterLeaveRequest) XXX_Unmarshal(b []byte) error { @@ -369,7 +502,7 @@ func (m *ClusterLeaveRequest) GetId() string { } type ClusterInfoResponse struct { - Cluster *any.Any `protobuf:"bytes,1,opt,name=cluster,proto3" json:"cluster,omitempty"` + Cluster *Cluster `protobuf:"bytes,1,opt,name=cluster,proto3" json:"cluster,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -379,7 +512,7 @@ func (m *ClusterInfoResponse) Reset() { *m = 
ClusterInfoResponse{} } func (m *ClusterInfoResponse) String() string { return proto.CompactTextString(m) } func (*ClusterInfoResponse) ProtoMessage() {} func (*ClusterInfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{6} + return fileDescriptor_5e030ad796566078, []int{9} } func (m *ClusterInfoResponse) XXX_Unmarshal(b []byte) error { @@ -400,7 +533,7 @@ func (m *ClusterInfoResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ClusterInfoResponse proto.InternalMessageInfo -func (m *ClusterInfoResponse) GetCluster() *any.Any { +func (m *ClusterInfoResponse) GetCluster() *Cluster { if m != nil { return m.Cluster } @@ -418,7 +551,7 @@ func (m *GetRequest) Reset() { *m = GetRequest{} } func (m *GetRequest) String() string { return proto.CompactTextString(m) } func (*GetRequest) ProtoMessage() {} func (*GetRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{7} + return fileDescriptor_5e030ad796566078, []int{10} } func (m *GetRequest) XXX_Unmarshal(b []byte) error { @@ -457,7 +590,7 @@ func (m *GetResponse) Reset() { *m = GetResponse{} } func (m *GetResponse) String() string { return proto.CompactTextString(m) } func (*GetResponse) ProtoMessage() {} func (*GetResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{8} + return fileDescriptor_5e030ad796566078, []int{11} } func (m *GetResponse) XXX_Unmarshal(b []byte) error { @@ -497,7 +630,7 @@ func (m *SetRequest) Reset() { *m = SetRequest{} } func (m *SetRequest) String() string { return proto.CompactTextString(m) } func (*SetRequest) ProtoMessage() {} func (*SetRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{9} + return fileDescriptor_5e030ad796566078, []int{12} } func (m *SetRequest) XXX_Unmarshal(b []byte) error { @@ -543,7 +676,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{10} + return fileDescriptor_5e030ad796566078, []int{13} } func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { @@ -582,7 +715,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} } func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{11} + return fileDescriptor_5e030ad796566078, []int{14} } func (m *WatchRequest) XXX_Unmarshal(b []byte) error { @@ -623,7 +756,7 @@ func (m *WatchResponse) Reset() { *m = WatchResponse{} } func (m *WatchResponse) String() string { return proto.CompactTextString(m) } func (*WatchResponse) ProtoMessage() {} func (*WatchResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_5e030ad796566078, []int{12} + return fileDescriptor_5e030ad796566078, []int{15} } func (m *WatchResponse) XXX_Unmarshal(b []byte) error { @@ -671,6 +804,10 @@ func init() { proto.RegisterEnum("management.WatchResponse_Command", WatchResponse_Command_name, WatchResponse_Command_value) proto.RegisterType((*NodeHealthCheckRequest)(nil), "management.NodeHealthCheckRequest") proto.RegisterType((*NodeHealthCheckResponse)(nil), "management.NodeHealthCheckResponse") + proto.RegisterType((*Metadata)(nil), "management.Metadata") + proto.RegisterType((*Node)(nil), "management.Node") + proto.RegisterType((*Cluster)(nil), 
"management.Cluster") + proto.RegisterMapType((map[string]*Node)(nil), "management.Cluster.NodesEntry") proto.RegisterType((*NodeInfoRequest)(nil), "management.NodeInfoRequest") proto.RegisterType((*NodeInfoResponse)(nil), "management.NodeInfoResponse") proto.RegisterType((*ClusterJoinRequest)(nil), "management.ClusterJoinRequest") @@ -689,52 +826,61 @@ func init() { } var fileDescriptor_5e030ad796566078 = []byte{ - // 719 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xff, 0x6f, 0xd2, 0x40, - 0x1c, 0xa5, 0x65, 0x05, 0xf6, 0x61, 0x5f, 0x9a, 0x73, 0xd9, 0x17, 0x66, 0xe6, 0x56, 0x35, 0x99, - 0x2e, 0x16, 0x33, 0x35, 0x6a, 0xfc, 0x8a, 0xd0, 0x0c, 0x36, 0xec, 0x4c, 0xcb, 0x5c, 0xb6, 0x18, - 0x4d, 0x81, 0x1b, 0x90, 0xd1, 0x1e, 0xae, 0xc7, 0x92, 0xfd, 0x15, 0xfa, 0x97, 0xf8, 0xab, 0xff, - 0x9e, 0x69, 0xaf, 0x85, 0x1b, 0xb4, 0xdd, 0x12, 0x7f, 0xe3, 0xee, 0xde, 0x7b, 0x9f, 0xf7, 0xae, - 0x7d, 0x05, 0x1e, 0x0c, 0x2e, 0x08, 0x25, 0xcd, 0xe1, 0x59, 0xd1, 0xb6, 0x1c, 0xab, 0x83, 0x6d, - 0xec, 0x50, 0xee, 0xa7, 0xea, 0x1f, 0x23, 0x18, 0xef, 0x14, 0xd6, 0x3a, 0x84, 0x74, 0xfa, 0xb8, - 0x38, 0x22, 0x5a, 0xce, 0x15, 0x83, 0x15, 0xd6, 0x27, 0x8f, 0xb0, 0x3d, 0xa0, 0xc1, 0xa1, 0xf2, - 0x4b, 0x80, 0x65, 0x9d, 0xb4, 0x71, 0x15, 0x5b, 0x7d, 0xda, 0x2d, 0x77, 0x71, 0xeb, 0xdc, 0xc0, - 0x3f, 0x87, 0xd8, 0xa5, 0xe8, 0x3d, 0x48, 0x83, 0x0b, 0xd2, 0xc4, 0xab, 0xc2, 0xa6, 0xb0, 0xbd, - 0xb0, 0xbb, 0xad, 0x72, 0x06, 0xa2, 0x29, 0xea, 0x17, 0x0f, 0x6f, 0x30, 0x9a, 0xf2, 0x02, 0x24, - 0x7f, 0x8d, 0x16, 0x21, 0x5f, 0xd5, 0x4a, 0xf5, 0x46, 0xb5, 0xa6, 0x6b, 0xa6, 0x29, 0xa7, 0xd0, - 0x1c, 0xe4, 0xea, 0xb5, 0xaf, 0x9a, 0xbf, 0x12, 0xd0, 0x3c, 0xcc, 0x1a, 0x5a, 0xa9, 0xc2, 0x0e, - 0x45, 0xe5, 0x8f, 0x00, 0x2b, 0x53, 0xf2, 0xee, 0x80, 0x38, 0x2e, 0x46, 0x1f, 0x40, 0x72, 0xa9, - 0x45, 0x43, 0x4b, 0x8f, 0x12, 0x2d, 0x31, 0x8e, 0x6a, 0x7a, 0x04, 0x83, 0xf1, 0x14, 0x03, 0x24, - 0x7f, 0x8d, 0xf2, 0x90, 0x65, 0x9e, 0x4e, 0xe4, 0x94, 0xe7, 0xe0, 0x48, 0x0f, 0x97, 0x02, 0x9a, - 0x05, 0xa9, 0xe4, 0xf9, 0x93, 0x45, 0x94, 0x83, 0x99, 0x8a, 0x56, 0xaa, 0xc8, 0x69, 0x6f, 0xd3, - 0x73, 0x79, 0x22, 0xcf, 0x78, 0x70, 0xfd, 0xb0, 0xf1, 0x83, 0x2d, 0x25, 0x65, 0x0b, 0x16, 0xbd, - 0xd9, 0x35, 0xe7, 0x8c, 0x84, 0x57, 0xb7, 0x00, 0x62, 0xaf, 0xed, 0x9b, 0x9c, 0x35, 0xc4, 0x5e, - 0x5b, 0xf9, 0x0e, 0xf2, 0x18, 0x12, 0x64, 0x79, 0x0e, 0xe0, 0x90, 0x36, 0x2e, 0x13, 0xe7, 0xac, - 0xd7, 0xf1, 0xb1, 0xf9, 0xdd, 0x25, 0x95, 0x3d, 0x2b, 0x35, 0x7c, 0x56, 0x6a, 0xc9, 0xb9, 0x32, - 0x38, 0x1c, 0x5a, 0x0a, 0x6f, 0x40, 0xf4, 0xc5, 0x83, 0x58, 0xa7, 0x80, 0xca, 0xfd, 0xa1, 0x4b, - 0xf1, 0xc5, 0x3e, 0xe9, 0x39, 0x31, 0x2e, 0x26, 0x26, 0x8a, 0xb7, 0x9b, 0xa8, 0x3c, 0x84, 0x3b, - 0x81, 0x76, 0x1d, 0x5b, 0x97, 0x38, 0x2e, 0xa2, 0x36, 0x82, 0x5d, 0x4b, 0xa9, 0x42, 0xb6, 0xc5, - 0xb6, 0x13, 0x23, 0x86, 0x20, 0x65, 0x03, 0x60, 0x0f, 0xd3, 0x70, 0x88, 0x0c, 0xe9, 0x73, 0x7c, - 0x15, 0x4c, 0xf1, 0x7e, 0x2a, 0xaf, 0x21, 0xef, 0x9f, 0x07, 0xf2, 0x8f, 0x41, 0xba, 0xb4, 0xfa, - 0x43, 0x9c, 0x28, 0xce, 0x20, 0xca, 0x3e, 0x80, 0x99, 0x20, 0x3d, 0xd6, 0x12, 0x6f, 0xd6, 0xda, - 0x82, 0xf9, 0x0a, 0xee, 0x63, 0x8a, 0xe3, 0x9d, 0x6e, 0xc2, 0xdc, 0xb1, 0x45, 0x5b, 0xdd, 0x78, - 0xc4, 0x5f, 0x01, 0xe6, 0x03, 0x48, 0x10, 0xe7, 0x0d, 0x64, 0x5b, 0xc4, 0xb6, 0x2d, 0xa7, 0x1d, - 0xbc, 0xe1, 0x5b, 0xfc, 0x1b, 0x7e, 0x0d, 0xab, 0x96, 0x19, 0xd0, 0x08, 0x19, 0xe1, 0x00, 0x31, - 0x22, 0x51, 0xfa, 0xe6, 0x44, 0x3b, 0x90, 0x0d, 0x14, 0xbd, 0x6e, 0x1c, 0xe9, 0x07, 0xfa, 0xe1, - 0xb1, 0x2e, 0xa7, 0x50, 0x16, 0xd2, 0xa6, 0xd6, 
0x90, 0x05, 0x04, 0x90, 0xa9, 0x68, 0x75, 0xad, - 0xa1, 0xc9, 0xe2, 0xee, 0xef, 0x0c, 0xc0, 0xe7, 0x91, 0x31, 0xf4, 0x8d, 0x35, 0x80, 0x6b, 0x1f, - 0x52, 0x6e, 0xfe, 0x5a, 0x14, 0xee, 0xdf, 0xa2, 0xbe, 0x4a, 0x0a, 0xed, 0x41, 0x2e, 0x2c, 0x0f, - 0x5a, 0x9f, 0xa4, 0x70, 0xad, 0x2b, 0xdc, 0x8d, 0x3e, 0xe4, 0x84, 0xf2, 0x5c, 0x4b, 0xd0, 0x06, - 0x0f, 0x9f, 0xae, 0x4f, 0x61, 0x79, 0xea, 0xba, 0x34, 0xef, 0xc3, 0xa9, 0xa4, 0x50, 0x0d, 0xe6, - 0xf8, 0x4a, 0xa0, 0x7b, 0x11, 0x4a, 0x7c, 0x59, 0x12, 0xa4, 0xaa, 0x23, 0x4f, 0x7e, 0xbe, 0x18, - 0x60, 0x21, 0x6a, 0xc2, 0x44, 0xba, 0x83, 0x91, 0x29, 0xff, 0x3d, 0xf9, 0x0f, 0xa9, 0xa7, 0x02, - 0x7a, 0x05, 0xe9, 0x3d, 0x4c, 0xd1, 0x32, 0x8f, 0x1d, 0xf7, 0xb2, 0xb0, 0x32, 0xb5, 0x3f, 0xb2, - 0xf1, 0x12, 0xd2, 0xe6, 0x24, 0x73, 0x5c, 0xbb, 0x84, 0x9b, 0x78, 0x07, 0x19, 0x56, 0x29, 0xb4, - 0xc6, 0x73, 0xaf, 0xd5, 0x2c, 0x81, 0xfe, 0x11, 0x24, 0x96, 0x7b, 0x35, 0xa2, 0x32, 0x8c, 0xbc, - 0x16, 0x5b, 0x26, 0x3f, 0xf3, 0x5b, 0xc8, 0x99, 0x8e, 0x35, 0x70, 0xbb, 0x84, 0xc6, 0x5e, 0x5e, - 0xec, 0xfc, 0x4f, 0x4f, 0x4e, 0x77, 0x3a, 0x3d, 0xda, 0x1d, 0x36, 0xd5, 0x16, 0xb1, 0x8b, 0x36, - 0x71, 0x87, 0xe7, 0x56, 0xb1, 0xd9, 0xb7, 0x5c, 0x5a, 0x8c, 0xf8, 0x33, 0x6f, 0x66, 0xfc, 0xcd, - 0x67, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x53, 0x09, 0x0b, 0x91, 0xea, 0x07, 0x00, 0x00, + // 855 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xff, 0x6f, 0xda, 0x46, + 0x14, 0xc7, 0x76, 0x0c, 0xe4, 0x39, 0x69, 0xad, 0xeb, 0x94, 0x26, 0xee, 0xd4, 0x25, 0xb7, 0x6e, + 0xca, 0x56, 0xd5, 0x54, 0x6c, 0xd3, 0xb2, 0xef, 0x63, 0xc1, 0x4a, 0xa0, 0x94, 0x56, 0x86, 0xae, + 0xea, 0x34, 0xa9, 0x3a, 0xf0, 0x15, 0x50, 0xb0, 0xcd, 0xf0, 0x11, 0x89, 0xbf, 0x61, 0x3f, 0x6c, + 0x7f, 0xc9, 0x7e, 0xdd, 0xbf, 0x37, 0x9d, 0xef, 0x6c, 0x1c, 0xb0, 0xc9, 0xa4, 0xfe, 0xe6, 0x7b, + 0xef, 0xf3, 0x3e, 0xf7, 0x79, 0x2f, 0xf7, 0x79, 0x01, 0x1e, 0xcd, 0xe6, 0x21, 0x0b, 0x07, 0x8b, + 0x77, 0x35, 0x9f, 0x04, 0x64, 0x44, 0x7d, 0x1a, 0xb0, 0xcc, 0xa7, 0x1d, 0xa7, 0x11, 0xac, 0x22, + 0xd6, 0xd1, 0x28, 0x0c, 0x47, 0x53, 0x5a, 0x4b, 0x0b, 0x49, 0xb0, 0x14, 0x30, 0xeb, 0xc1, 0x7a, + 0x8a, 0xfa, 0x33, 0x26, 0x93, 0xf8, 0x2f, 0x05, 0x0e, 0xba, 0xa1, 0x47, 0x2f, 0x29, 0x99, 0xb2, + 0xf1, 0xf9, 0x98, 0x0e, 0xaf, 0x5c, 0xfa, 0xc7, 0x82, 0x46, 0x0c, 0xfd, 0x08, 0xfa, 0x6c, 0x1e, + 0x0e, 0xe8, 0xa1, 0x72, 0xac, 0x9c, 0xde, 0xa9, 0x9f, 0xda, 0x19, 0x01, 0xf9, 0x25, 0xf6, 0x4b, + 0x8e, 0x77, 0x45, 0x19, 0xfe, 0x0a, 0xf4, 0xf8, 0x8c, 0xee, 0x82, 0x71, 0xe9, 0x34, 0x3a, 0xfd, + 0xcb, 0x56, 0xd7, 0xe9, 0xf5, 0xcc, 0x12, 0xda, 0x83, 0x6a, 0xa7, 0xf5, 0xab, 0x13, 0x9f, 0x14, + 0xb4, 0x0f, 0xbb, 0xae, 0xd3, 0x68, 0x8a, 0xa4, 0x8a, 0xff, 0x51, 0xe0, 0xfe, 0x06, 0x7d, 0x34, + 0x0b, 0x83, 0x88, 0xa2, 0x9f, 0x40, 0x8f, 0x18, 0x61, 0x89, 0xa4, 0xcf, 0xb6, 0x4a, 0x12, 0x35, + 0x76, 0x8f, 0x17, 0xb8, 0xa2, 0x0e, 0xbb, 0xa0, 0xc7, 0x67, 0x64, 0x40, 0x45, 0x68, 0x7a, 0x63, + 0x96, 0xb8, 0x82, 0x57, 0xdd, 0xe4, 0xa8, 0xa0, 0x5d, 0xd0, 0x1b, 0x5c, 0x9f, 0xa9, 0xa2, 0x2a, + 0xec, 0x34, 0x9d, 0x46, 0xd3, 0xd4, 0x78, 0x90, 0xab, 0x7c, 0x63, 0xee, 0x70, 0x78, 0xf7, 0x45, + 0xff, 0xad, 0x38, 0xea, 0xf8, 0x25, 0x54, 0x9f, 0x53, 0x46, 0x3c, 0xc2, 0x08, 0x3a, 0x81, 0xbd, + 0xd1, 0x7c, 0x36, 0x7c, 0x4b, 0x3c, 0x6f, 0x4e, 0xa3, 0x28, 0xd6, 0xb9, 0xeb, 0x1a, 0x3c, 0xd6, + 0x10, 0x21, 0x0e, 0x19, 0x33, 0x36, 0x4b, 0x21, 0xaa, 0x80, 0xf0, 0x98, 0x84, 0xe0, 0x08, 0x76, + 0x78, 0x37, 0x1c, 0x3a, 0x98, 0x04, 0xde, 0x3a, 0x1b, 0x8f, 0x25, 0x6c, 0x07, 0x50, 0xe6, 0x9d, + 0x2d, 0x12, 0x1e, 0x79, 0x42, 
0x4f, 0xa1, 0xea, 0x4b, 0x51, 0x87, 0xda, 0xb1, 0x72, 0x6a, 0xd4, + 0x3f, 0xc8, 0x0e, 0x2b, 0x11, 0xec, 0xa6, 0x28, 0xfc, 0xa7, 0x02, 0x95, 0xf3, 0xe9, 0x22, 0x62, + 0x74, 0x8e, 0xbe, 0x04, 0x3d, 0x08, 0x3d, 0xca, 0x6f, 0xd4, 0x4e, 0x8d, 0xfa, 0xc3, 0x6c, 0xa9, + 0xc4, 0xc4, 0xf3, 0x8e, 0x9c, 0x80, 0xcd, 0x97, 0xae, 0x00, 0x5b, 0x6d, 0x80, 0x55, 0x10, 0x99, + 0xa0, 0x5d, 0xd1, 0xa5, 0xd4, 0xcc, 0x3f, 0xd1, 0xa7, 0xa0, 0x5f, 0x93, 0xe9, 0x82, 0xc6, 0x52, + 0x8d, 0xba, 0xb9, 0xfe, 0xd7, 0x73, 0x45, 0xfa, 0x5b, 0xf5, 0x4c, 0xc1, 0x27, 0x70, 0x97, 0x87, + 0x5a, 0xc1, 0xbb, 0x30, 0x79, 0x8f, 0x77, 0x40, 0x9d, 0x78, 0x92, 0x4f, 0x9d, 0x78, 0xf8, 0x0c, + 0xcc, 0x15, 0x44, 0x3e, 0x90, 0x47, 0xb0, 0xc3, 0xb5, 0xc4, 0xa8, 0xbc, 0x1b, 0xe2, 0x2c, 0x6e, + 0x03, 0x92, 0x5d, 0xb4, 0xc3, 0x49, 0x50, 0xc0, 0x9f, 0x72, 0xa9, 0x5b, 0xb9, 0x3e, 0x81, 0x7b, + 0x92, 0xab, 0x43, 0xc9, 0x35, 0x2d, 0x12, 0xdb, 0x4c, 0x61, 0x37, 0xf4, 0x3e, 0x81, 0xca, 0x50, + 0x84, 0xa5, 0xe4, 0x7b, 0x39, 0xa3, 0x76, 0x13, 0x0c, 0x7e, 0x08, 0x70, 0x41, 0x59, 0x72, 0xc7, + 0xc6, 0x84, 0xf1, 0x37, 0x60, 0xc4, 0x79, 0xc9, 0xfe, 0x79, 0x32, 0x70, 0x45, 0xbe, 0x00, 0xb1, + 0x09, 0xec, 0x64, 0x13, 0xd8, 0x8d, 0x60, 0x29, 0x87, 0x8e, 0xdb, 0x00, 0xbd, 0x2d, 0xd4, 0x2b, + 0x2e, 0xf5, 0x76, 0xae, 0x13, 0xd8, 0x6f, 0xd2, 0x29, 0x65, 0xb4, 0x58, 0xe9, 0x31, 0xec, 0xbd, + 0x26, 0x6c, 0x38, 0x2e, 0x46, 0xfc, 0xab, 0xc0, 0xbe, 0x84, 0xc8, 0x76, 0xbe, 0x83, 0xca, 0x30, + 0xf4, 0x7d, 0x12, 0x78, 0xd2, 0xff, 0x27, 0xd9, 0x61, 0xdd, 0xc0, 0xda, 0xe7, 0x02, 0xe8, 0x26, + 0x15, 0xc9, 0x05, 0x6a, 0x4e, 0x47, 0xda, 0xed, 0x1d, 0x3d, 0x86, 0x8a, 0x64, 0xe4, 0x9b, 0xe3, + 0x55, 0xf7, 0x59, 0xf7, 0xc5, 0xeb, 0xae, 0x59, 0x42, 0x15, 0xd0, 0x7a, 0x4e, 0xdf, 0x54, 0x10, + 0x40, 0xb9, 0xe9, 0x74, 0x9c, 0xbe, 0x63, 0xaa, 0xf5, 0xbf, 0xcb, 0x00, 0xcf, 0x53, 0x61, 0xe8, + 0x77, 0xf1, 0x94, 0x33, 0xbb, 0x09, 0xe1, 0xdb, 0x77, 0xa9, 0xf5, 0xf1, 0xff, 0x58, 0x6e, 0xb8, + 0x84, 0x2e, 0xa0, 0x9a, 0xb8, 0x00, 0x3d, 0x58, 0x2f, 0xc9, 0xd8, 0xc7, 0xfa, 0x30, 0x3f, 0x99, + 0x21, 0x32, 0x32, 0xa6, 0x40, 0x79, 0x9e, 0xcf, 0xb8, 0xc5, 0x3a, 0xd8, 0x18, 0x97, 0xc3, 0xff, + 0xad, 0xe0, 0x12, 0x6a, 0xc1, 0x5e, 0xd6, 0x11, 0xe8, 0xa3, 0x1c, 0xa6, 0xac, 0x57, 0xb6, 0x50, + 0x5d, 0xa6, 0x9a, 0xe2, 0xfe, 0x0a, 0x80, 0x56, 0xde, 0x0d, 0x6b, 0xdd, 0x3d, 0x4b, 0x45, 0xc5, + 0xef, 0xe4, 0x3d, 0xa8, 0x9e, 0x2a, 0xe8, 0x0c, 0xb4, 0x0b, 0xca, 0xd0, 0x41, 0x16, 0xbb, 0xf2, + 0xa5, 0x75, 0x7f, 0x23, 0x9e, 0xca, 0xf8, 0x1a, 0xb4, 0xde, 0x7a, 0xe5, 0xca, 0x76, 0x5b, 0x26, + 0xf1, 0x03, 0x94, 0x85, 0xa5, 0xd0, 0x51, 0xb6, 0xf6, 0x86, 0xcd, 0xb6, 0x94, 0xff, 0x0c, 0xba, + 0xe8, 0xfb, 0x30, 0xc7, 0x32, 0xa2, 0xf8, 0xa8, 0xd0, 0x4c, 0x71, 0xcf, 0xdf, 0x43, 0xb5, 0x17, + 0x90, 0x59, 0x34, 0x0e, 0x59, 0xe1, 0xf0, 0x0a, 0xef, 0xff, 0xe5, 0xc9, 0x6f, 0x8f, 0x47, 0x13, + 0x36, 0x5e, 0x0c, 0xec, 0x61, 0xe8, 0xd7, 0xfc, 0x30, 0x5a, 0x5c, 0x91, 0xda, 0x60, 0x4a, 0x22, + 0x56, 0xcb, 0xf9, 0xa9, 0x33, 0x28, 0xc7, 0xc1, 0x2f, 0xfe, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x09, + 0x67, 0x31, 0xd3, 0x08, 0x09, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. 
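For reference, a minimal sketch (not part of the patch) of how the generated types above might be used from Go: it builds a Cluster out of the new typed Node and Metadata messages and reads the values back through the generated getters. The import path and the example addresses are assumptions.

package main

import (
	"fmt"

	"github.com/mosuka/blast/protobuf/management"
)

func main() {
	// Construct a node with its typed metadata instead of an untyped map.
	node := &management.Node{
		BindAddress: ":2000",
		Metadata: &management.Metadata{
			GrpcAddress: ":5100",
			HttpAddress: ":8100",
		},
	}

	// Cluster.Nodes is a plain map keyed by node ID.
	cluster := &management.Cluster{
		Nodes: map[string]*management.Node{
			"node-1": node,
		},
	}

	// The generated getters check for nil receivers, so nested fields
	// can be read without explicit nil checks.
	for id, n := range cluster.GetNodes() {
		fmt.Println(id, n.GetBindAddress(), n.GetStatus(), n.GetMetadata().GetGrpcAddress())
	}
}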
diff --git a/protobuf/management/management.proto b/protobuf/management/management.proto index e3fc2cd..0e33f27 100644 --- a/protobuf/management/management.proto +++ b/protobuf/management/management.proto @@ -59,18 +59,32 @@ message NodeHealthCheckResponse { } // use for raft +message Metadata { + string grpc_address = 1; + string http_address = 2; +} + +message Node { + string bind_address = 1; + string status = 2; + Metadata metadata = 3; +} + +message Cluster { + map<string, Node> nodes = 1; +} + message NodeInfoRequest { string id = 1; } message NodeInfoResponse { - google.protobuf.Any nodeConfig = 1; - string state = 2; + Node node = 1; } message ClusterJoinRequest { string id = 1; - google.protobuf.Any nodeConfig = 2; + Node node = 2; } message ClusterLeaveRequest { @@ -78,7 +92,7 @@ message ClusterLeaveRequest { } message ClusterInfoResponse { - google.protobuf.Any cluster = 1; + Cluster cluster = 1; } message GetRequest {
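A consumer-side sketch under the same assumptions: because ClusterInfoResponse now carries a typed Cluster rather than a google.protobuf.Any, callers can walk the nodes directly through the generated getters. The helper name and the idea that resp comes from some RPC call are hypothetical.

package example

import "github.com/mosuka/blast/protobuf/management"

// grpcAddresses collects each node's gRPC address from a cluster info
// response, keyed by node ID. The getters used here tolerate nil receivers,
// so a nil response or a missing Metadata yields an empty map or an empty
// address rather than a panic.
func grpcAddresses(resp *management.ClusterInfoResponse) map[string]string {
	addrs := make(map[string]string)
	for id, node := range resp.GetCluster().GetNodes() {
		addrs[id] = node.GetMetadata().GetGrpcAddress()
	}
	return addrs
}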