这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Dec 10, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added

- Add index stats #37
- Add Wikipedia example #35
- Support cznicb and leveldb #34
- Add logging #33
Expand All @@ -23,7 +24,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- update Makefile #31


## [0.4.0] - 2019-03-13
## [0.4.0] - 2019-03-14

### Changed

Expand Down
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,18 @@ $ docker exec -it blast-index1 blast-index node --grpc-addr=:5050
This section explains how to index a Wikipedia dump into Blast.


### Download wikipedia dump
### Install wikiextractor

```bash
$ curl -o ~/tmp/enwiki-20190101-pages-articles.xml.bz2 https://dumps.wikimedia.org/enwiki/20190101/enwiki-20190101-pages-articles.xml.bz2
$ cd ${HOME}
$ git clone git@github.com:attardi/wikiextractor.git
```


### Install wikiextractor
### Download wikipedia dump

```bash
$ cd ${HOME}
$ git clone git@github.com:attardi/wikiextractor.git
$ curl -o ~/tmp/enwiki-20190101-pages-articles.xml.bz2 https://dumps.wikimedia.org/enwiki/20190101/enwiki-20190101-pages-articles.xml.bz2
```


Expand All @@ -613,10 +613,12 @@ $ ./WikiExtractor.py -o ~/tmp/enwiki --json ~/tmp/enwiki-20190101-pages-articles
```bash
$ for FILE in $(find ~/tmp/enwiki -type f -name '*' | sort)
do
echo "${FILE}"
cat ${FILE} | while read -r LINE; do
TIMESTAMP=$(date -u "+%Y-%m-%dT%H:%M:%SZ")
ID=$(echo ${LINE} | jq -r .id)
FIELDS=$(echo ${LINE} | jq -c -r '{url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'"}')
FIELDS=$(echo "${LINE}" | jq -c -r '{url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'"}')
echo "- ${ID} ${FIELDS}"
curl -X PUT "http://127.0.0.1:8080/documents/${ID}" -d "${FIELDS}"
done
done
Expand Down
15 changes: 13 additions & 2 deletions cmd/blast-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func main() {
Usage: "gRPC address to connect to",
},
},
//ArgsUsage: "[id]",
Action: execCluster,
},
{
Expand Down Expand Up @@ -244,13 +243,25 @@ func main() {
Flags: []cli.Flag{
cli.StringFlag{
Name: "grpc-addr, g",
Value: "gRPC :5050",
Value: ":5050",
Usage: "address to connect to",
},
},
ArgsUsage: "[id]",
Action: execDelete,
},
{
Name: "stats",
Usage: "Get a index stats",
Flags: []cli.Flag{
cli.StringFlag{
Name: "grpc-addr, g",
Value: ":5050",
Usage: "address to connect to",
},
},
Action: execStats,
},
}

cli.HelpFlag = cli.BoolFlag{
Expand Down
67 changes: 67 additions & 0 deletions cmd/blast-index/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2019 Minoru Osuka
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"errors"
"fmt"
"os"

"github.com/mosuka/blast/index"
"github.com/mosuka/blast/protobuf"
"github.com/urfave/cli"
)

// execStats implements the "stats" sub-command. It connects to the index node
// at the address given by the "grpc-addr" flag, fetches the index statistics,
// and prints them to stdout as indented JSON.
//
// It returns an error if the client cannot be created, the request fails, the
// statistics are missing or of an unexpected type, or they cannot be encoded
// as JSON.
func execStats(c *cli.Context) error {
	grpcAddr := c.String("grpc-addr")

	client, err := index.NewGRPCClient(grpcAddr)
	if err != nil {
		return err
	}
	defer func() {
		// A failed close must not replace the command's own result, so it is
		// only reported on stderr.
		if err := client.Close(); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}()

	resp, err := client.GetIndexStats()
	if err != nil {
		return err
	}
	if resp == nil {
		return errors.New("index stats response is empty")
	}

	// Any -> map[string]interface{}
	statsInstance, err := protobuf.MarshalAny(resp.Stats)
	if err != nil {
		return err
	}
	if statsInstance == nil {
		return errors.New("index stats are empty")
	}

	// Use the comma-ok form so an unexpected payload type is reported as an
	// error instead of panicking.
	statsMap, ok := statsInstance.(*map[string]interface{})
	if !ok {
		return fmt.Errorf("unexpected index stats type %T", statsInstance)
	}

	// map[string]interface{} -> []byte
	statsBytes, err := json.MarshalIndent(statsMap, "", " ")
	if err != nil {
		return err
	}

	// The JSON document is followed by one blank line, matching the previous
	// output byte for byte.
	fmt.Fprintf(os.Stdout, "%s\n\n", statsBytes)

	return nil
}
11 changes: 11 additions & 0 deletions index/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,14 @@ func (c *GRPCClient) Delete(doc *index.Document, opts ...grpc.CallOption) error

return nil
}

// GetIndexStats calls the GetStats RPC with an empty request and returns the
// Stats field of the response. When the call fails, only the message of the
// gRPC status derived from the error is returned, as a plain error.
func (c *GRPCClient) GetIndexStats(opts ...grpc.CallOption) (*index.Stats, error) {
	resp, err := c.client.GetStats(c.ctx, &empty.Empty{}, opts...)
	if err == nil {
		return resp.Stats, nil
	}

	// The "ok" result is intentionally unused: the status message is used
	// whether or not the error originated from a gRPC status.
	st, _ := status.FromError(err)

	return nil, errors.New(st.Message())
}
18 changes: 18 additions & 0 deletions index/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,21 @@ func (s *GRPCService) Delete(ctx context.Context, req *index.DeleteRequest) (*em

return resp, nil
}

// GetStats handles the GetStats RPC. It logs the request, asks the Raft server
// for the index statistics and returns them in a GetStatsResponse. A failure
// to obtain the statistics is reported with the gRPC Internal status code.
func (s *GRPCService) GetStats(ctx context.Context, req *empty.Empty) (*index.GetStatsResponse, error) {
	// time.Now() is evaluated here, when the defer statement runs, so the
	// recorded interval covers the whole handler.
	defer RecordMetrics(time.Now(), "stats")

	s.logger.Printf("[INFO] stats %v", req)

	resp := &index.GetStatsResponse{}

	raftStats, err := s.raftServer.Stats()
	if err != nil {
		return resp, status.Error(codes.Internal, err.Error())
	}
	resp.Stats = raftStats

	return resp, nil
}
11 changes: 11 additions & 0 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ func (b *Index) Delete(id string) error {
return nil
}

// Stats returns the statistics map reported by the underlying index. The
// returned error is always nil. The elapsed time, in seconds, is logged at
// DEBUG level when the method returns.
func (b *Index) Stats() (map[string]interface{}, error) {
	begin := time.Now()
	defer func() {
		elapsed := time.Since(begin)
		b.logger.Printf("[DEBUG] stats %f", float64(elapsed)/float64(time.Second))
	}()

	return b.index.StatsMap(), nil
}

func (b *Index) SnapshotItems() <-chan *pbindex.Document {
ch := make(chan *pbindex.Document, 1024)

Expand Down
9 changes: 9 additions & 0 deletions index/raft_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} {
}
}

// Stats returns the statistics of the index held by this FSM. If the index
// reports an error, a nil map is returned together with that error.
func (f *RaftFSM) Stats() (map[string]interface{}, error) {
	indexStats, statsErr := f.index.Stats()
	if statsErr != nil {
		return nil, statsErr
	}

	return indexStats, nil
}

func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) {
return &IndexFSMSnapshot{
index: f.index,
Expand Down
20 changes: 20 additions & 0 deletions index/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,23 @@ func (s *RaftServer) Delete(doc *index.Document) error {

return nil
}

// Stats fetches the statistics map from the FSM, packs it into a protobuf Any
// message and returns it wrapped in an index.Stats message. Errors from the
// FSM or from the conversion are returned unchanged.
func (s *RaftServer) Stats() (*index.Stats, error) {
	fsmStats, err := s.fsm.Stats()
	if err != nil {
		return nil, err
	}

	// map[string]interface{} -> Any
	packed := &any.Any{}
	if err := protobuf.UnmarshalAny(fsmStats, packed); err != nil {
		return nil, err
	}

	return &index.Stats{Stats: packed}, nil
}
Loading