这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Dec 10, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 39 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,13 @@ You can see the result in JSON format. The result of the above command is:

```json
{
"node_config": {
"bind_addr": ":2000",
"data_dir": "/tmp/blast/indexer1",
"grpc_addr": ":5000",
"http_addr": ":8000",
"node_id": "indexer1",
"raft_storage_type": "boltdb"
},
"state": "Leader"
"id": "indexer1",
"bind_address": ":2000",
"state": 3,
"metadata": {
"grpc_address": ":5000",
"http_address": ":8000"
}
}
```

Expand Down Expand Up @@ -684,38 +682,34 @@ You can see the result in JSON format. The result of the above command is:

```json
{
"indexer1": {
"node_config": {
"bind_addr": ":2000",
"data_dir": "/tmp/blast/indexer1",
"grpc_addr": ":5000",
"http_addr": ":8000",
"node_id": "indexer1",
"raft_storage_type": "boltdb"
},
"state": "Leader"
},
"indexer2": {
"node_config": {
"bind_addr": ":2010",
"data_dir": "/tmp/blast/indexer2",
"grpc_addr": ":5010",
"http_addr": ":8010",
"node_id": "indexer2",
"raft_storage_type": "boltdb"
"nodes": {
"indexer1": {
"id": "indexer1",
"bind_address": ":2000",
"state": 3,
"metadata": {
"grpc_address": ":5000",
"http_address": ":8000"
}
},
"state": "Follower"
},
"indexer3": {
"node_config": {
"bind_addr": ":2020",
"data_dir": "/tmp/blast/indexer3",
"grpc_addr": ":5020",
"http_addr": ":8020",
"node_id": "indexer3",
"raft_storage_type": "boltdb"
"indexer2": {
"id": "indexer2",
"bind_address": ":2010",
"state": 1,
"metadata": {
"grpc_address": ":5010",
"http_address": ":8010"
}
},
"state": "Follower"
"indexer3": {
"id": "indexer3",
"bind_address": ":2020",
"state": 1,
"metadata": {
"grpc_address": ":5020",
"http_address": ":8020"
}
}
}
}
```
Expand Down Expand Up @@ -786,9 +780,9 @@ Manager can also bring up a cluster like an indexer. Specify a common index mapp
$ ./bin/blast manager start \
--grpc-address=:5100 \
--http-address=:8100 \
--node-id=cluster1 \
--node-id=manager1 \
--node-address=:2100 \
--data-dir=/tmp/blast/cluster1 \
--data-dir=/tmp/blast/manager1 \
--raft-storage-type=boltdb \
--index-mapping-file=./example/wiki_index_mapping.json \
--index-type=upside_down \
Expand All @@ -798,18 +792,18 @@ $ ./bin/blast manager start \
--peer-grpc-address=:5100 \
--grpc-address=:5110 \
--http-address=:8110 \
--node-id=cluster2 \
--node-id=manager2 \
--node-address=:2110 \
--data-dir=/tmp/blast/cluster2 \
--data-dir=/tmp/blast/manager2 \
--raft-storage-type=boltdb

$ ./bin/blast manager start \
--peer-grpc-address=:5100 \
--grpc-address=:5120 \
--http-address=:8120 \
--node-id=cluster3 \
--node-id=manager3 \
--node-address=:2120 \
--data-dir=/tmp/blast/cluster3 \
--data-dir=/tmp/blast/manager3 \
--raft-storage-type=boltdb
```

Expand Down
41 changes: 20 additions & 21 deletions cmd/blast/dispatcher_node_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import (
"fmt"
"os"

"github.com/mosuka/blast/protobuf/distribute"

"github.com/mosuka/blast/dispatcher"
"github.com/urfave/cli"
)

func dispatcherNodeHealth(c *cli.Context) error {
grpcAddr := c.String("grpc-address")
healthiness := c.Bool("healthiness")
liveness := c.Bool("liveness")
readiness := c.Bool("readiness")

Expand All @@ -38,34 +41,30 @@ func dispatcherNodeHealth(c *cli.Context) error {
}
}()

if !liveness && !readiness {
LivenessState, err := client.LivenessProbe()
var state string
if healthiness {
state, err = client.NodeHealthCheck(distribute.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
return err
state = distribute.NodeHealthCheckResponse_UNHEALTHY.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", LivenessState))

readinessState, err := client.ReadinessProbe()
} else if liveness {
state, err = client.NodeHealthCheck(distribute.NodeHealthCheckRequest_LIVENESS.String())
if err != nil {
return err
state = distribute.NodeHealthCheckResponse_DEAD.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", readinessState))
} else {
if liveness {
state, err := client.LivenessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else if readiness {
state, err = client.NodeHealthCheck(distribute.NodeHealthCheckRequest_READINESS.String())
if err != nil {
state = distribute.NodeHealthCheckResponse_NOT_READY.String()
}
if readiness {
state, err := client.ReadinessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else {
state, err = client.NodeHealthCheck(distribute.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
state = distribute.NodeHealthCheckResponse_UNHEALTHY.String()
}
}

_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))

return nil
}
2 changes: 1 addition & 1 deletion cmd/blast/indexer_cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func indexerClusterInfo(c *cli.Context) error {
}
}()

cluster, err := client.GetCluster()
cluster, err := client.ClusterInfo()
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/blast/indexer_cluster_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ func indexerClusterLeave(c *cli.Context) error {
// get grpc address of leader node
}

grpcAddr := c.String("grpc-address")
nodeId := c.String("node-id")

client, err := indexer.NewGRPCClient(grpcAddr)
client, err := indexer.NewGRPCClient(peerGrpcAddr)
if err != nil {
return err
}
Expand All @@ -47,7 +46,7 @@ func indexerClusterLeave(c *cli.Context) error {
}
}()

err = client.DeleteNode(nodeId)
err = client.ClusterLeave(nodeId)
if err != nil {
return err
}
Expand Down
29 changes: 14 additions & 15 deletions cmd/blast/indexer_cluster_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ package main

import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"

"github.com/mosuka/blast/indexer"
"github.com/mosuka/blast/protobuf"
"github.com/mosuka/blast/protobuf/index"
"github.com/urfave/cli"
)

Expand All @@ -41,12 +40,22 @@ func indexerClusterWatch(c *cli.Context) error {
}
}()

err = indexerClusterInfo(c)
cluster, err := client.ClusterInfo()
if err != nil {
return err
}
resp := &index.ClusterWatchResponse{
Event: 0,
Node: nil,
Cluster: cluster,
}
clusterBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", string(clusterBytes)))

watchClient, err := client.WatchCluster()
watchClient, err := client.ClusterWatch()
if err != nil {
return err
}
Expand All @@ -61,17 +70,7 @@ func indexerClusterWatch(c *cli.Context) error {
break
}

cluster, err := protobuf.MarshalAny(resp.Cluster)
if err != nil {
return err
}
if cluster == nil {
return errors.New("nil")
}

var clusterBytes []byte
clusterMap := *cluster.(*map[string]interface{})
clusterBytes, err = json.MarshalIndent(clusterMap, "", " ")
clusterBytes, err = json.MarshalIndent(resp, "", " ")
if err != nil {
return err
}
Expand Down
40 changes: 19 additions & 21 deletions cmd/blast/indexer_node_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"os"

"github.com/mosuka/blast/indexer"
"github.com/mosuka/blast/protobuf/index"
"github.com/urfave/cli"
)

func indexerNodeHealth(c *cli.Context) error {
grpcAddr := c.String("grpc-address")
healthiness := c.Bool("healthiness")
liveness := c.Bool("liveness")
readiness := c.Bool("readiness")

Expand All @@ -38,34 +40,30 @@ func indexerNodeHealth(c *cli.Context) error {
}
}()

if !liveness && !readiness {
LivenessState, err := client.LivenessProbe()
var state string
if healthiness {
state, err = client.NodeHealthCheck(index.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
return err
state = index.NodeHealthCheckResponse_UNHEALTHY.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", LivenessState))

readinessState, err := client.ReadinessProbe()
} else if liveness {
state, err = client.NodeHealthCheck(index.NodeHealthCheckRequest_LIVENESS.String())
if err != nil {
return err
state = index.NodeHealthCheckResponse_DEAD.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", readinessState))
} else {
if liveness {
state, err := client.LivenessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else if readiness {
state, err = client.NodeHealthCheck(index.NodeHealthCheckRequest_READINESS.String())
if err != nil {
state = index.NodeHealthCheckResponse_NOT_READY.String()
}
if readiness {
state, err := client.ReadinessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else {
state, err = client.NodeHealthCheck(index.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
state = index.NodeHealthCheckResponse_UNHEALTHY.String()
}
}

_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))

return nil
}
18 changes: 3 additions & 15 deletions cmd/blast/indexer_node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,8 @@ import (
)

func indexerNodeInfo(c *cli.Context) error {
clusterGrpcAddr := c.String("cluster-grpc-address")
shardId := c.String("shard-id")
peerGrpcAddr := c.String("peer-grpc-address")

if clusterGrpcAddr != "" && shardId != "" {

} else if peerGrpcAddr != "" {

}

grpcAddr := c.String("grpc-address")

nodeId := c.Args().Get(0)

client, err := indexer.NewGRPCClient(grpcAddr)
if err != nil {
return err
Expand All @@ -49,17 +37,17 @@ func indexerNodeInfo(c *cli.Context) error {
}
}()

metadata, err := client.GetNode(nodeId)
node, err := client.NodeInfo()
if err != nil {
return err
}

metadataBytes, err := json.MarshalIndent(metadata, "", " ")
nodeBytes, err := json.MarshalIndent(node, "", " ")
if err != nil {
return err
}

_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", string(metadataBytes)))
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", string(nodeBytes)))

return nil
}
Loading