From 1dd870bbd91bd8e6fac144ca34def8f7b6a00902 Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Thu, 21 Mar 2019 10:20:52 +0900 Subject: [PATCH 1/3] Support buil update --- README.md | 128 +++++++++++++++-- cmd/blast-index/delete.go | 47 ++++++- cmd/blast-index/get.go | 8 +- cmd/blast-index/index.go | 89 ++++++++---- cmd/blast-index/main.go | 46 +++--- config/config_bleve.go | 19 +++ example/docs_wiki.json | 38 +++++ index/grpc_client.go | 48 +++++++ index/grpc_service.go | 49 +++++++ index/http_handler.go | 200 ++++++++++++++++++++++---- index/http_server.go | 2 + index/raft_server.go | 1 - protobuf/index/index.pb.go | 280 ++++++++++++++++++++++++++++++------- protobuf/index/index.proto | 6 + protobuf/util_test.go | 248 ++++++++++++++++++++++++++++++++ 15 files changed, 1061 insertions(+), 148 deletions(-) create mode 100644 config/config_bleve.go create mode 100644 example/docs_wiki.json create mode 100644 protobuf/util_test.go diff --git a/README.md b/README.md index f6194e7..ef8b3c9 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,7 @@ $ make \ #### macOS ```bash -$ make GOOS=darwin \ +$ make \ GOOS=darwin \ BUILD_TAGS="kagome icu libstemmer cld2 cznicb leveldb badger" \ CGO_ENABLED=1 \ @@ -166,6 +166,41 @@ blast-index ``` +## Testing Blast + +If you want to test your changes, run command like following: + +```bash +$ make \ + test +``` + +You can test with all the Bleve extensions supported by Blast as follows: + + +### Linux + +```bash +$ make \ + BUILD_TAGS="kagome icu libstemmer cld2 cznicb leveldb badger" \ + CGO_ENABLED=1 \ + test +``` + + +#### macOS + +```bash +$ make \ + BUILD_TAGS="kagome icu libstemmer cld2 cznicb leveldb badger" \ + CGO_ENABLED=1 \ + CGO_LDFLAGS="-L/usr/local/opt/icu4c/lib" \ + CGO_CFLAGS="-I/usr/local/opt/icu4c/include" \ + build +``` + + + ## Starting Blast index node Running a Blast index node is easy. Start Blast data node like so: @@ -188,7 +223,15 @@ You can now put, get, search and delete the documents via CLI. For document indexing, execute the following command: ```bash -$ cat ./example/doc_enwiki_1.json | xargs -0 ./bin/blast-index index --grpc-addr=:5050 enwiki_1 +$ cat ./example/doc_enwiki_1.json | xargs -0 ./bin/blast-index index --grpc-addr=:5050 --id=enwiki_1 +``` + +You can see the result in JSON format. The result of the above command is: + +```bash +{ + "count": 1 +} ``` @@ -197,7 +240,7 @@ $ cat ./example/doc_enwiki_1.json | xargs -0 ./bin/blast-index index --grpc-addr Getting a document is as following: ```bash -$ ./bin/blast-index get --grpc-addr=:5050 enwiki_1 +$ ./bin/blast-index get --grpc-addr=:5050 --id=enwiki_1 ``` You can see the result in JSON format. The result of the above command is: @@ -390,7 +433,49 @@ Please refer to following document for details of search request and result: Deleting a document is as following: ```bash -$ ./bin/blast-index delete --grpc-addr=:5050 enwiki_1 +$ ./bin/blast-index delete --grpc-addr=:5050 --id=enwiki_1 +``` + +You can see the result in JSON format. The result of the above command is: + +```bash +{ + "count": 1 +} +``` + + +### Indexing documents in bulk via CLI + +Indexing documents in bulk, run the following command: + +```bash +$ cat ./example/docs_wiki.json | xargs -0 ./bin/blast-index index --grpc-addr=:5050 +``` + +You can see the result in JSON format. The result of the above command is: + +```bash +{ + "count": 4 +} +``` + + +### Deleting documents in bulk via CLI + +Deleting documents in bulk, run the following command: + +```bash +$ cat ./example/docs_wiki.json | xargs -0 ./bin/blast-index delete --grpc-addr=:5050 +``` + +You can see the result in JSON format. The result of the above command is: + +```bash +{ + "count": 4 +} ``` @@ -401,10 +486,10 @@ Also you can do above commands via HTTP REST API that listened port 8080. ### Indexing a document via HTTP REST API -Putting a document via HTTP is as following: +Indexing a document via HTTP is as following: ```bash -$ curl -X PUT 'http://127.0.0.1:8080/documents/enwiki_1' -d @./example/doc_enwiki_1.json +$ curl -s -X PUT 'http://127.0.0.1:8080/documents/enwiki_1' -d @./example/doc_enwiki_1.json ``` @@ -413,7 +498,7 @@ $ curl -X PUT 'http://127.0.0.1:8080/documents/enwiki_1' -d @./example/doc_enwik Getting a document via HTTP is as following: ```bash -$ curl -X GET 'http://127.0.0.1:8080/documents/enwiki_1' +$ curl -s -X GET 'http://127.0.0.1:8080/documents/enwiki_1' ``` @@ -435,6 +520,24 @@ $ curl -X DELETE 'http://127.0.0.1:8080/documents/enwiki_1' ``` +### Indexing documents in bulk via HTTP REST API + +Indexing documents in bulk via HTTP is as following: + +```bash +$ curl -s -X PUT 'http://127.0.0.1:8080/documents' -d @./example/docs_wiki.json +``` + + +### Deleting documents in bulk via HTTP REST API + +Deleting documents in bulk via HTTP is as following: + +```bash +$ curl -X DELETE 'http://127.0.0.1:8080/documents' -d @./example/docs_wiki.json +``` + + ## Bringing up a cluster Blast is easy to bring up the cluster. Blast data node is already running, but that is not fault tolerant. If you need to increase the fault tolerance, bring up 2 more data nodes like so: @@ -621,12 +724,9 @@ $ ./WikiExtractor.py -o ~/tmp/enwiki --json ~/tmp/enwiki-20190101-pages-articles $ for FILE in $(find ~/tmp/enwiki -type f -name '*' | sort) do echo "${FILE}" - cat ${FILE} | while read -r LINE; do - TIMESTAMP=$(date -u "+%Y-%m-%dT%H:%M:%SZ") - ID=$(echo ${LINE} | jq -r .id) - FIELDS=$(echo "${LINE}" | jq -c -r '{url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'", _type: "enwiki"}') - echo "- ${ID} ${FIELDS}" - curl -X PUT "http://127.0.0.1:8080/documents/${ID}" -d "${FIELDS}" - done + TIMESTAMP=$(date -u "+%Y-%m-%dT%H:%M:%SZ") + DOCS=$(cat ${FILE} | jq -r '. + {fields: {url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'", _type: "enwiki"}} | del(.url) | del(.title) | del(.text) | del(.fields.id)' | jq -s) + echo "${DOCS}" + curl -s -X PUT -H 'Content-Type: application/json' "http://127.0.0.1:8080/documents" -d "${DOCS}" done ``` diff --git a/cmd/blast-index/delete.go b/cmd/blast-index/delete.go index 4328b9d..ebf11ae 100644 --- a/cmd/blast-index/delete.go +++ b/cmd/blast-index/delete.go @@ -15,6 +15,7 @@ package main import ( + "encoding/json" "errors" "fmt" "os" @@ -26,18 +27,43 @@ import ( func execDelete(c *cli.Context) error { grpcAddr := c.String("grpc-addr") + id := c.String("id") - id := c.Args().Get(0) + // create documents + docs := make([]*pbindex.Document, 0) if id == "" { - err := errors.New("key argument must be set") - return err - } + if c.NArg() == 0 { + err := errors.New("arguments are not correct") + return err + } + + // documents + docsStr := c.Args().Get(0) + + var docMaps []map[string]interface{} + err := json.Unmarshal([]byte(docsStr), &docMaps) + if err != nil { + return err + } + + for _, docMap := range docMaps { + // create document + doc := &pbindex.Document{ + Id: docMap["id"].(string), + } - doc := &pbindex.Document{ - Id: id, + docs = append(docs, doc) + } + } else { + doc := &pbindex.Document{ + Id: id, + } + + docs = append(docs, doc) } + // create client client, err := index.NewGRPCClient(grpcAddr) if err != nil { return err @@ -49,10 +75,17 @@ func execDelete(c *cli.Context) error { } }() - err = client.Delete(doc) + result, err := client.BulkDelete(docs) + if err != nil { + return err + } + + resultBytes, err := json.MarshalIndent(result, "", " ") if err != nil { return err } + fmt.Fprintln(os.Stdout, fmt.Sprintf("%v\n", string(resultBytes))) + return nil } diff --git a/cmd/blast-index/get.go b/cmd/blast-index/get.go index 30c505e..7776595 100644 --- a/cmd/blast-index/get.go +++ b/cmd/blast-index/get.go @@ -20,19 +20,17 @@ import ( "fmt" "os" - "github.com/mosuka/blast/protobuf" - "github.com/mosuka/blast/index" + "github.com/mosuka/blast/protobuf" pbindex "github.com/mosuka/blast/protobuf/index" "github.com/urfave/cli" ) func execGet(c *cli.Context) error { grpcAddr := c.String("grpc-addr") - - id := c.Args().Get(0) + id := c.String("id") if id == "" { - err := errors.New("key argument must be set") + err := errors.New("arguments are not correct") return err } diff --git a/cmd/blast-index/index.go b/cmd/blast-index/index.go index f1f3aae..b3158d4 100644 --- a/cmd/blast-index/index.go +++ b/cmd/blast-index/index.go @@ -29,39 +29,70 @@ import ( func execIndex(c *cli.Context) error { grpcAddr := c.String("grpc-addr") + id := c.String("id") - id := c.Args().Get(0) - if id == "" { - err := errors.New("key argument must be set") + if c.NArg() == 0 { + err := errors.New("arguments are not correct") return err } - fields := c.Args().Get(1) - if fields == "" { - err := errors.New("value argument must be set") - return err - } + // create documents + docs := make([]*pbindex.Document, 0) - // string -> map[string]interface{} - var fieldsMap map[string]interface{} - err := json.Unmarshal([]byte(fields), &fieldsMap) - if err != nil { - return err - } + if id == "" { + // documents + docsStr := c.Args().Get(0) - // map[string]interface{} -> Any - fieldsAny := &any.Any{} - err = protobuf.UnmarshalAny(fieldsMap, fieldsAny) - if err != nil { - return err - } + var docMaps []map[string]interface{} + err := json.Unmarshal([]byte(docsStr), &docMaps) + if err != nil { + return err + } - // create document - doc := &pbindex.Document{ - Id: id, - Fields: fieldsAny, + for _, docMap := range docMaps { + // map[string]interface{} -> Any + fieldsAny := &any.Any{} + err = protobuf.UnmarshalAny(docMap["fields"], fieldsAny) + if err != nil { + return err + } + + // create document + doc := &pbindex.Document{ + Id: docMap["id"].(string), + Fields: fieldsAny, + } + + docs = append(docs, doc) + } + } else { + // document + fields := c.Args().Get(0) + + // string -> map[string]interface{} + var fieldsMap map[string]interface{} + err := json.Unmarshal([]byte(fields), &fieldsMap) + if err != nil { + return err + } + + // map[string]interface{} -> Any + fieldsAny := &any.Any{} + err = protobuf.UnmarshalAny(fieldsMap, fieldsAny) + if err != nil { + return err + } + + // create document + doc := &pbindex.Document{ + Id: id, + Fields: fieldsAny, + } + + docs = append(docs, doc) } + // create gRPC client client, err := index.NewGRPCClient(grpcAddr) if err != nil { return err @@ -73,10 +104,18 @@ func execIndex(c *cli.Context) error { } }() - err = client.Index(doc) + // index documents in bulk + result, err := client.BulkIndex(docs) if err != nil { return err } + resultBytes, err := json.MarshalIndent(result, "", " ") + if err != nil { + return err + } + + fmt.Fprintln(os.Stdout, fmt.Sprintf("%v\n", string(resultBytes))) + return nil } diff --git a/cmd/blast-index/main.go b/cmd/blast-index/main.go index 398c8b8..221f038 100644 --- a/cmd/blast-index/main.go +++ b/cmd/blast-index/main.go @@ -207,48 +207,62 @@ func main() { Value: ":5050", Usage: "gRPC address to connect to", }, + cli.StringFlag{ + Name: "id, i", + Value: "", + Usage: "document id", + }, }, - ArgsUsage: "[id]", - Action: execGet, + Action: execGet, }, { - Name: "search", - Usage: "Search documents", + Name: "index", + Usage: "Index documents in bulk", Flags: []cli.Flag{ cli.StringFlag{ Name: "grpc-addr, g", Value: ":5050", Usage: "gRPC address to connect to", }, + cli.StringFlag{ + Name: "id, i", + Value: "", + Usage: "document id", + }, }, - ArgsUsage: "[search request]", - Action: execSearch, + ArgsUsage: "[documents | fields]", + Action: execIndex, }, { - Name: "index", - Usage: "Index a document", + Name: "delete", + Usage: "Delete documents in bulk", Flags: []cli.Flag{ cli.StringFlag{ Name: "grpc-addr, g", Value: ":5050", - Usage: "gRPC address to connect to", + Usage: "address to connect to", + }, + cli.StringFlag{ + Name: "id, i", + Value: "", + Usage: "document id", }, }, - ArgsUsage: "[id] [fields]", - Action: execIndex, + ArgsUsage: "[documents]", + Action: execDelete, }, { - Name: "delete", - Usage: "Delete a document", + Name: "search", + Usage: "Search documents", Flags: []cli.Flag{ cli.StringFlag{ Name: "grpc-addr, g", Value: ":5050", - Usage: "address to connect to", + Usage: "gRPC address to connect to", }, }, - ArgsUsage: "[id]", - Action: execDelete, + ArgsUsage: "[search request]", + Action: execSearch, }, { Name: "stats", diff --git a/config/config_bleve.go b/config/config_bleve.go new file mode 100644 index 0000000..879ee5e --- /dev/null +++ b/config/config_bleve.go @@ -0,0 +1,19 @@ +// Copyright (c) 2019 Minoru Osuka +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + _ "github.com/blevesearch/bleve/config" +) diff --git a/example/docs_wiki.json b/example/docs_wiki.json new file mode 100644 index 0000000..42af51b --- /dev/null +++ b/example/docs_wiki.json @@ -0,0 +1,38 @@ +[ + { + "id": "arwiki_1", + "fields": { + "title_ar": "محرك بحث", + "text_ar": "محرك البحث (بالإنجليزية: Search engine) هو نظام لإسترجاع المعلومات صمم للمساعدة على البحث عن المعلومات المخزنة على أي نظام حاسوبي. تعرض نتائج البحث عادة على شكل قائمة لأماكن تواجد المعلومات ومرتبة وفق معايير معينة. تسمح محركات البحث باختصار مدة البحث والتغلب على مشكلة أحجام البيانات المتصاعدة (إغراق معلوماتي).", + "timestamp": "2018-03-25T18:04:00Z", + "_type": "arwiki" + } + }, + { + "id": "bgwiki_1", + "fields": { + "title_bg": "Търсачка", + "text_bg": "Търсачка или търсеща машина (на английски: Web search engine) е специализиран софтуер за извличане на информация, съхранена в компютърна система или мрежа. Това може да е персонален компютър, Интернет, корпоративна мрежа и т.н. Без допълнителни уточнения, най-често под търсачка се разбира уеб(-)търсачка, която търси в Интернет. Други видове търсачки са корпоративните търсачки, които търсят в интранет мрежите, личните търсачки – за индивидуалните компютри и мобилните търсачки. В търсачката потребителят (търсещият) прави запитване за съдържание, отговарящо на определен критерий (обикновено такъв, който съдържа определени думи и фрази). В резултат се получават списък от точки, които отговарят, пълно или частично, на този критерий. Търсачките обикновено използват редовно подновявани индекси, за да оперират бързо и ефикасно. Някои търсачки също търсят в информацията, която е на разположение в нюзгрупите и други големи бази данни. За разлика от Уеб директориите, които се поддържат от хора редактори, търсачките оперират алгоритмично. Повечето Интернет търсачки са притежавани от различни корпорации.", + "timestamp": "2018-07-11T11:03:00Z", + "_type": "bgwiki" + } + }, + { + "id": "cawiki_1", + "fields": { + "title_ca": "Motor de cerca", + "text_ca": "Un motor de cerca o de recerca o bé cercador és un programa informàtic dissenyat per ajudar a trobar informació emmagatzemada en un sistema informàtic com ara una xarxa, Internet, un servidor o un ordinador personal. L'objectiu principal és el de trobar altres programes informàtics, pàgines web i documents, entre d'altres. A partir d'una determinada paraula o paraules o una determinada frase l'usuari demana un contingut sota un criteri determinat i retorna una llista de referències que compleixin aquest criteri. El procés es realitza a través de les metadades, vies per comunicar informació que utilitzen els motors per cada cerca. Els índex que utilitzen els cercadors sempre estan actualitzats a través d'un robot web per generar rapidesa i eficàcia en la recerca. Els directoris, en canvi, són gestionats per editors humans.", + "timestamp": "2018-07-09T18:07:00Z", + "_type": "cawiki" + } + }, + { + "id": "zhwiki_1", + "fields": { + "title_zh": "搜索引擎", + "text_zh": "搜索引擎(英语:search engine)是一种信息检索系统,旨在协助搜索存储在计算机系统中的信息。搜索结果一般被称为“hits”,通常会以表单的形式列出。网络搜索引擎是最常见、公开的一种搜索引擎,其功能为搜索万维网上储存的信息.", + "timestamp": "2018-08-27T05:47:00Z", + "_type": "zhwiki" + } + } +] diff --git a/index/grpc_client.go b/index/grpc_client.go index 87361b0..b045309 100644 --- a/index/grpc_client.go +++ b/index/grpc_client.go @@ -208,6 +208,30 @@ func (c *GRPCClient) Index(doc *index.Document, opts ...grpc.CallOption) error { return nil } +func (c *GRPCClient) BulkIndex(docs []*index.Document, opts ...grpc.CallOption) (*index.BulkResult, error) { + stream, err := c.client.BulkIndex(c.ctx, opts...) + if err != nil { + st, _ := status.FromError(err) + + return nil, errors.New(st.Message()) + } + + for _, doc := range docs { + err := stream.Send(doc) + if err != nil { + c.logger.Printf("[WARN]: %v", err) + break + } + } + + rep, err := stream.CloseAndRecv() + if err != nil { + return nil, err + } + + return rep, nil +} + func (c *GRPCClient) Delete(doc *index.Document, opts ...grpc.CallOption) error { req := &index.DeleteRequest{ Document: doc, @@ -223,6 +247,30 @@ func (c *GRPCClient) Delete(doc *index.Document, opts ...grpc.CallOption) error return nil } +func (c *GRPCClient) BulkDelete(docs []*index.Document, opts ...grpc.CallOption) (*index.BulkResult, error) { + stream, err := c.client.BulkDelete(c.ctx, opts...) + if err != nil { + st, _ := status.FromError(err) + + return nil, errors.New(st.Message()) + } + + for _, doc := range docs { + err := stream.Send(doc) + if err != nil { + c.logger.Printf("[WARN]: %v", err) + break + } + } + + rep, err := stream.CloseAndRecv() + if err != nil { + return nil, err + } + + return rep, nil +} + func (c *GRPCClient) GetIndexStats(opts ...grpc.CallOption) (*index.Stats, error) { resp, err := c.client.GetStats(c.ctx, &empty.Empty{}, opts...) if err != nil { diff --git a/index/grpc_service.go b/index/grpc_service.go index 381621c..5501569 100644 --- a/index/grpc_service.go +++ b/index/grpc_service.go @@ -16,6 +16,7 @@ package index import ( "context" + "io" "log" "time" @@ -183,6 +184,30 @@ func (s *GRPCService) Index(ctx context.Context, req *index.IndexRequest) (*empt return resp, nil } +func (s *GRPCService) BulkIndex(stream index.Index_BulkIndexServer) error { + count := int32(0) + + for { + doc, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&index.BulkResult{ + Count: count, + }) + } + if err != nil { + return err + } + + // index + err = s.raftServer.Index(doc) + if err != nil { + return err + } + + count++ + } +} + func (s *GRPCService) Delete(ctx context.Context, req *index.DeleteRequest) (*empty.Empty, error) { start := time.Now() defer RecordMetrics(start, "delete") @@ -199,6 +224,30 @@ func (s *GRPCService) Delete(ctx context.Context, req *index.DeleteRequest) (*em return resp, nil } +func (s *GRPCService) BulkDelete(stream index.Index_BulkDeleteServer) error { + count := int32(0) + + for { + doc, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&index.BulkResult{ + Count: count, + }) + } + if err != nil { + return err + } + + // index + err = s.raftServer.Delete(doc) + if err != nil { + return err + } + + count++ + } +} + func (s *GRPCService) GetStats(ctx context.Context, req *empty.Empty) (*index.GetStatsResponse, error) { start := time.Now() defer RecordMetrics(start, "stats") diff --git a/index/http_handler.go b/index/http_handler.go index b4f9186..2eb91fd 100644 --- a/index/http_handler.go +++ b/index/http_handler.go @@ -21,14 +21,14 @@ import ( "net/http" "time" - "github.com/golang/protobuf/ptypes/any" - "github.com/mosuka/blast/protobuf" - "github.com/mosuka/blast/protobuf/index" - "github.com/blevesearch/bleve" + "github.com/golang/protobuf/ptypes/any" "github.com/gorilla/mux" "github.com/mosuka/blast/errors" blasthttp "github.com/mosuka/blast/http" + "github.com/mosuka/blast/protobuf" + "github.com/mosuka/blast/protobuf/index" + pbindex "github.com/mosuka/blast/protobuf/index" "github.com/mosuka/blast/version" ) @@ -186,9 +186,13 @@ func (h *IndexHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { blasthttp.RecordMetrics(start, httpStatus, w, r, h.logger) }() + // create documents + docs := make([]*index.Document, 0) + vars := mux.Vars(r) + id := vars["id"] - fieldsBytes, err := ioutil.ReadAll(r.Body) + bodyBytes, err := ioutil.ReadAll(r.Body) if err != nil { httpStatus = http.StatusInternalServerError @@ -205,30 +209,102 @@ func (h *IndexHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // bytes -> map[sgtring]inerface{} - var fieldsMap map[string]interface{} - err = json.Unmarshal(fieldsBytes, &fieldsMap) - if err != nil { - httpStatus = http.StatusBadRequest + if id == "" { + // Indexing documents in bulk + var docMaps []map[string]interface{} + err := json.Unmarshal(bodyBytes, &docMaps) + if err != nil { + httpStatus = http.StatusBadRequest - msgMap := map[string]interface{}{ - "message": err.Error(), - "status": httpStatus, + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return } - content, err = blasthttp.NewJSONMessage(msgMap) + for _, docMap := range docMaps { + fieldsAny := &any.Any{} + err = protobuf.UnmarshalAny(docMap["fields"], fieldsAny) + if err != nil { + httpStatus = http.StatusBadRequest + + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return + } + + doc := &pbindex.Document{ + Id: docMap["id"].(string), + Fields: fieldsAny, + } + + docs = append(docs, doc) + } + } else { + // Indexing a document + var fieldsMap map[string]interface{} + err := json.Unmarshal(bodyBytes, &fieldsMap) if err != nil { - h.logger.Printf("[ERR] %v", err) + httpStatus = http.StatusBadRequest + + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return } - return + fieldsAny := &any.Any{} + err = protobuf.UnmarshalAny(fieldsMap, fieldsAny) + if err != nil { + httpStatus = http.StatusBadRequest + + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return + } + + doc := &pbindex.Document{ + Id: id, + Fields: fieldsAny, + } + + docs = append(docs, doc) } - // map[string]interface{} -> Any - fieldsAny := &any.Any{} - err = protobuf.UnmarshalAny(fieldsMap, fieldsAny) + // index documents in bulk + result, err := h.client.BulkIndex(docs) if err != nil { - httpStatus = http.StatusBadRequest + httpStatus = http.StatusInternalServerError msgMap := map[string]interface{}{ "message": err.Error(), @@ -243,12 +319,7 @@ func (h *IndexHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - doc := &index.Document{ - Id: vars["id"], - Fields: fieldsAny, - } - - err = h.client.Index(doc) + content, err = json.MarshalIndent(result, "", " ") if err != nil { httpStatus = http.StatusInternalServerError @@ -287,13 +358,84 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { blasthttp.RecordMetrics(start, httpStatus, w, r, h.logger) }() + // create documents + docs := make([]*index.Document, 0) + vars := mux.Vars(r) + id := vars["id"] - doc := &index.Document{ - Id: vars["id"], + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + httpStatus = http.StatusInternalServerError + + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return + } + + if id == "" { + // Deleting documents in bulk + var docMaps []map[string]interface{} + err := json.Unmarshal(bodyBytes, &docMaps) + if err != nil { + httpStatus = http.StatusBadRequest + + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return + } + + for _, docMap := range docMaps { + doc := &pbindex.Document{ + Id: docMap["id"].(string), + } + + docs = append(docs, doc) + } + } else { + // Deleting a document + doc := &pbindex.Document{ + Id: id, + } + + docs = append(docs, doc) + } + + // delete documents in bulk + result, err := h.client.BulkDelete(docs) + if err != nil { + httpStatus = http.StatusInternalServerError + + msgMap := map[string]interface{}{ + "message": err.Error(), + "status": httpStatus, + } + + content, err = blasthttp.NewJSONMessage(msgMap) + if err != nil { + h.logger.Printf("[ERR] %v", err) + } + + return } - err := h.client.Delete(doc) + content, err = json.MarshalIndent(result, "", " ") if err != nil { httpStatus = http.StatusInternalServerError diff --git a/index/http_server.go b/index/http_server.go index 57f40a1..2bd3fdd 100644 --- a/index/http_server.go +++ b/index/http_server.go @@ -45,6 +45,8 @@ func NewHTTPServer(httpAddr string, grpcClient *GRPCClient, logger *log.Logger, router.StrictSlash(true) router.Handle("/", NewRootHandler(logger)).Methods("GET") + router.Handle("/documents", NewIndexHandler(grpcClient, logger)).Methods("PUT") + router.Handle("/documents", NewDeleteHandler(grpcClient, logger)).Methods("DELETE") router.Handle("/documents/{id}", NewGetHandler(grpcClient, logger)).Methods("GET") router.Handle("/documents/{id}", NewIndexHandler(grpcClient, logger)).Methods("PUT") router.Handle("/documents/{id}", NewDeleteHandler(grpcClient, logger)).Methods("DELETE") diff --git a/index/raft_server.go b/index/raft_server.go index 0b980fa..39e9dd5 100644 --- a/index/raft_server.go +++ b/index/raft_server.go @@ -21,7 +21,6 @@ import ( "time" "github.com/blevesearch/bleve" - _ "github.com/blevesearch/bleve/config" "github.com/blevesearch/bleve/mapping" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/any" diff --git a/protobuf/index/index.pb.go b/protobuf/index/index.pb.go index a281f70..193a558 100644 --- a/protobuf/index/index.pb.go +++ b/protobuf/index/index.pb.go @@ -56,7 +56,7 @@ func (x IndexCommand_Type) String() string { } func (IndexCommand_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{9, 0} + return fileDescriptor_7b2daf652facb3ae, []int{10, 0} } type Document struct { @@ -106,6 +106,45 @@ func (m *Document) GetFields() *any.Any { return nil } +type BulkResult struct { + Count int32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *BulkResult) Reset() { *m = BulkResult{} } +func (m *BulkResult) String() string { return proto.CompactTextString(m) } +func (*BulkResult) ProtoMessage() {} +func (*BulkResult) Descriptor() ([]byte, []int) { + return fileDescriptor_7b2daf652facb3ae, []int{1} +} + +func (m *BulkResult) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BulkResult.Unmarshal(m, b) +} +func (m *BulkResult) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BulkResult.Marshal(b, m, deterministic) +} +func (m *BulkResult) XXX_Merge(src proto.Message) { + xxx_messageInfo_BulkResult.Merge(m, src) +} +func (m *BulkResult) XXX_Size() int { + return xxx_messageInfo_BulkResult.Size(m) +} +func (m *BulkResult) XXX_DiscardUnknown() { + xxx_messageInfo_BulkResult.DiscardUnknown(m) +} + +var xxx_messageInfo_BulkResult proto.InternalMessageInfo + +func (m *BulkResult) GetCount() int32 { + if m != nil { + return m.Count + } + return 0 +} + type Stats struct { Stats *any.Any `protobuf:"bytes,1,opt,name=stats,proto3" json:"stats,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -117,7 +156,7 @@ func (m *Stats) Reset() { *m = Stats{} } func (m *Stats) String() string { return proto.CompactTextString(m) } func (*Stats) ProtoMessage() {} func (*Stats) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{1} + return fileDescriptor_7b2daf652facb3ae, []int{2} } func (m *Stats) XXX_Unmarshal(b []byte) error { @@ -156,7 +195,7 @@ func (m *GetRequest) Reset() { *m = GetRequest{} } func (m *GetRequest) String() string { return proto.CompactTextString(m) } func (*GetRequest) ProtoMessage() {} func (*GetRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{2} + return fileDescriptor_7b2daf652facb3ae, []int{3} } func (m *GetRequest) XXX_Unmarshal(b []byte) error { @@ -195,7 +234,7 @@ func (m *GetResponse) Reset() { *m = GetResponse{} } func (m *GetResponse) String() string { return proto.CompactTextString(m) } func (*GetResponse) ProtoMessage() {} func (*GetResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{3} + return fileDescriptor_7b2daf652facb3ae, []int{4} } func (m *GetResponse) XXX_Unmarshal(b []byte) error { @@ -234,7 +273,7 @@ func (m *SearchRequest) Reset() { *m = SearchRequest{} } func (m *SearchRequest) String() string { return proto.CompactTextString(m) } func (*SearchRequest) ProtoMessage() {} func (*SearchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{4} + return fileDescriptor_7b2daf652facb3ae, []int{5} } func (m *SearchRequest) XXX_Unmarshal(b []byte) error { @@ -273,7 +312,7 @@ func (m *SearchResponse) Reset() { *m = SearchResponse{} } func (m *SearchResponse) String() string { return proto.CompactTextString(m) } func (*SearchResponse) ProtoMessage() {} func (*SearchResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{5} + return fileDescriptor_7b2daf652facb3ae, []int{6} } func (m *SearchResponse) XXX_Unmarshal(b []byte) error { @@ -312,7 +351,7 @@ func (m *IndexRequest) Reset() { *m = IndexRequest{} } func (m *IndexRequest) String() string { return proto.CompactTextString(m) } func (*IndexRequest) ProtoMessage() {} func (*IndexRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{6} + return fileDescriptor_7b2daf652facb3ae, []int{7} } func (m *IndexRequest) XXX_Unmarshal(b []byte) error { @@ -351,7 +390,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{7} + return fileDescriptor_7b2daf652facb3ae, []int{8} } func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { @@ -390,7 +429,7 @@ func (m *GetStatsResponse) Reset() { *m = GetStatsResponse{} } func (m *GetStatsResponse) String() string { return proto.CompactTextString(m) } func (*GetStatsResponse) ProtoMessage() {} func (*GetStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{8} + return fileDescriptor_7b2daf652facb3ae, []int{9} } func (m *GetStatsResponse) XXX_Unmarshal(b []byte) error { @@ -430,7 +469,7 @@ func (m *IndexCommand) Reset() { *m = IndexCommand{} } func (m *IndexCommand) String() string { return proto.CompactTextString(m) } func (*IndexCommand) ProtoMessage() {} func (*IndexCommand) Descriptor() ([]byte, []int) { - return fileDescriptor_7b2daf652facb3ae, []int{9} + return fileDescriptor_7b2daf652facb3ae, []int{10} } func (m *IndexCommand) XXX_Unmarshal(b []byte) error { @@ -468,6 +507,7 @@ func (m *IndexCommand) GetData() *any.Any { func init() { proto.RegisterEnum("index.IndexCommand_Type", IndexCommand_Type_name, IndexCommand_Type_value) proto.RegisterType((*Document)(nil), "index.Document") + proto.RegisterType((*BulkResult)(nil), "index.BulkResult") proto.RegisterType((*Stats)(nil), "index.Stats") proto.RegisterType((*GetRequest)(nil), "index.GetRequest") proto.RegisterType((*GetResponse)(nil), "index.GetResponse") @@ -482,46 +522,49 @@ func init() { func init() { proto.RegisterFile("protobuf/index/index.proto", fileDescriptor_7b2daf652facb3ae) } var fileDescriptor_7b2daf652facb3ae = []byte{ - // 617 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x5f, 0x6f, 0xd3, 0x3c, - 0x14, 0xc6, 0xdb, 0xae, 0xed, 0xdb, 0xf7, 0xac, 0xed, 0x3a, 0x6f, 0x83, 0x12, 0x6e, 0xa6, 0x5c, - 0xa0, 0x0a, 0x50, 0x2a, 0x6d, 0x62, 0xb0, 0x3f, 0x48, 0x94, 0x26, 0x2a, 0x63, 0x6b, 0x26, 0xb5, - 0x99, 0x40, 0xdc, 0x54, 0xe9, 0xe2, 0x6d, 0xd1, 0x92, 0x38, 0xd4, 0x0e, 0xa2, 0x1f, 0x91, 0x6b, - 0xbe, 0x10, 0x8a, 0xed, 0xa4, 0xe9, 0x50, 0x3a, 0xb1, 0x9b, 0x28, 0x3e, 0x7e, 0x7e, 0xc7, 0x27, - 0xe7, 0xf8, 0x09, 0x28, 0xe1, 0x8c, 0x30, 0x32, 0x8d, 0xae, 0xbb, 0x6e, 0xe0, 0xe0, 0x9f, 0xe2, - 0xa9, 0xf1, 0x20, 0xaa, 0xf0, 0x85, 0xf2, 0xec, 0x86, 0x90, 0x1b, 0x0f, 0x77, 0x53, 0xa5, 0x1d, - 0xcc, 0x85, 0x42, 0x79, 0x7e, 0x7f, 0x0b, 0xfb, 0x21, 0x4b, 0x36, 0xdb, 0x69, 0x74, 0x66, 0x5f, - 0x33, 0xfe, 0x10, 0x3b, 0xea, 0x27, 0xa8, 0xe9, 0xe4, 0x2a, 0xf2, 0x71, 0xc0, 0x50, 0x13, 0x4a, - 0xae, 0xd3, 0x2e, 0xee, 0x16, 0x3b, 0xff, 0x8f, 0x4a, 0xae, 0x83, 0x5e, 0x43, 0xf5, 0xda, 0xc5, - 0x9e, 0x43, 0xdb, 0xa5, 0xdd, 0x62, 0x67, 0x7d, 0x6f, 0x5b, 0x13, 0x67, 0x68, 0x49, 0x36, 0xad, - 0x17, 0xcc, 0x47, 0x52, 0xa3, 0xee, 0x43, 0x65, 0xcc, 0x6c, 0x46, 0xd1, 0x4b, 0xa8, 0xd0, 0xf8, - 0x85, 0x67, 0xca, 0xa3, 0x84, 0x44, 0x3d, 0x04, 0x18, 0x60, 0x36, 0xc2, 0xdf, 0x23, 0x4c, 0x19, - 0x7a, 0x05, 0x35, 0x47, 0x16, 0x23, 0xe1, 0x0d, 0x4d, 0x74, 0x21, 0xa9, 0x71, 0x94, 0x0a, 0xd4, - 0x23, 0x58, 0xe7, 0x28, 0x0d, 0x49, 0x40, 0xf1, 0xbf, 0xb1, 0xe7, 0xd0, 0x18, 0x63, 0x7b, 0x76, - 0x75, 0x9b, 0x9c, 0x7c, 0x0c, 0x4d, 0xca, 0x03, 0x93, 0x99, 0x88, 0xac, 0x2c, 0xbe, 0x41, 0xb3, - 0xb0, 0x7a, 0x06, 0xcd, 0x24, 0x9b, 0x2c, 0xe6, 0x10, 0x1a, 0x69, 0x3a, 0x1a, 0x79, 0xab, 0xb3, - 0xd5, 0x93, 0x6c, 0xb1, 0x52, 0x3d, 0x86, 0xfa, 0x69, 0x5c, 0xf6, 0xa3, 0x7a, 0x72, 0x02, 0x0d, - 0x1d, 0x7b, 0x98, 0xe1, 0x47, 0xd1, 0x07, 0xd0, 0x1a, 0x60, 0xc6, 0x87, 0x98, 0x7e, 0x89, 0xba, - 0x3c, 0xcc, 0xba, 0xa4, 0x85, 0x48, 0x0e, 0xf1, 0x77, 0x51, 0xd6, 0xdc, 0x27, 0xbe, 0x6f, 0x07, - 0xf1, 0xc5, 0x29, 0xb3, 0x79, 0x88, 0x39, 0xd3, 0xdc, 0x6b, 0x4b, 0x26, 0x2b, 0xd1, 0xac, 0x79, - 0x88, 0x47, 0x5c, 0x85, 0x3a, 0x50, 0x76, 0x6c, 0x66, 0xaf, 0xbc, 0x64, 0x5c, 0xa1, 0xde, 0x41, - 0x39, 0xe6, 0xd0, 0x16, 0x6c, 0x5c, 0x9a, 0x67, 0xe6, 0xc5, 0x17, 0x73, 0xd2, 0xbf, 0x18, 0x0e, - 0x7b, 0xa6, 0xde, 0x2a, 0xa0, 0x16, 0xd4, 0xc7, 0x86, 0x35, 0x19, 0x1a, 0x56, 0x4f, 0xef, 0x59, - 0xbd, 0x56, 0x31, 0x96, 0xe9, 0xc6, 0xb9, 0x61, 0x19, 0x8b, 0x60, 0x09, 0x21, 0x68, 0x9e, 0x9a, - 0xba, 0xf1, 0x75, 0xa2, 0x5f, 0xf4, 0x2f, 0x87, 0x86, 0x69, 0xb5, 0xd6, 0x32, 0xc2, 0x34, 0x58, - 0xde, 0xfb, 0x55, 0x86, 0x0a, 0x2f, 0x19, 0xed, 0x43, 0xf9, 0x33, 0x71, 0x03, 0xb4, 0xa9, 0x71, - 0xe3, 0xc4, 0xef, 0xb2, 0xbf, 0xca, 0x93, 0xbf, 0xaa, 0x35, 0x62, 0xdb, 0xa9, 0x05, 0xf4, 0x06, - 0x2a, 0xe7, 0xd8, 0xfe, 0x81, 0x11, 0x12, 0x14, 0x5f, 0x3c, 0x8c, 0x1d, 0xc1, 0x7f, 0x03, 0xcc, - 0x4c, 0xe2, 0x60, 0x94, 0x23, 0x52, 0x76, 0x44, 0x42, 0x29, 0x4b, 0x26, 0xa5, 0x16, 0xd0, 0x07, - 0x6e, 0xa6, 0xbe, 0x17, 0x51, 0x86, 0x67, 0xb9, 0x78, 0x3b, 0xc5, 0xa5, 0x32, 0x93, 0xe1, 0x04, - 0x6a, 0xe3, 0xc0, 0x0e, 0xe9, 0x2d, 0x61, 0xb9, 0x7c, 0x7e, 0xed, 0x1a, 0xac, 0x0d, 0x30, 0x43, - 0x9b, 0x72, 0xde, 0x0b, 0x63, 0x2b, 0x28, 0x1b, 0x4a, 0x4f, 0x7b, 0x0b, 0x55, 0xe1, 0x1b, 0xb4, - 0x9d, 0x5c, 0xab, 0xac, 0xaf, 0x94, 0x9d, 0x7b, 0xd1, 0x14, 0x3c, 0x48, 0x26, 0xb3, 0x95, 0xbd, - 0x5a, 0x0f, 0x37, 0xf7, 0x1d, 0x54, 0x85, 0x3d, 0xd2, 0x03, 0x97, 0xdc, 0xb2, 0x82, 0x7c, 0x0f, - 0xb5, 0xc4, 0x1a, 0xb9, 0x8d, 0x79, 0xba, 0xf8, 0xc8, 0x25, 0x0f, 0xa9, 0x85, 0x8f, 0x9d, 0x6f, - 0x2f, 0x6e, 0x5c, 0x76, 0x1b, 0x4d, 0xb5, 0x2b, 0xe2, 0x77, 0x7d, 0x42, 0xa3, 0x3b, 0xbb, 0x3b, - 0xf5, 0x6c, 0xca, 0xba, 0xcb, 0x3f, 0xfd, 0x69, 0x95, 0xaf, 0xf7, 0xff, 0x04, 0x00, 0x00, 0xff, - 0xff, 0x31, 0x3b, 0xb6, 0x38, 0x0d, 0x06, 0x00, 0x00, + // 665 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x5f, 0x6f, 0xda, 0x3e, + 0x14, 0x05, 0x4a, 0xf8, 0xd1, 0x5b, 0xa0, 0xd4, 0x6d, 0x7f, 0x63, 0xd9, 0x4b, 0xe5, 0x87, 0x09, + 0x6d, 0x53, 0x90, 0xca, 0xd6, 0xad, 0x7f, 0x26, 0x8d, 0x42, 0xc4, 0xba, 0x96, 0x54, 0x0a, 0x54, + 0x9b, 0xf6, 0x82, 0x02, 0x71, 0x5b, 0xd4, 0x10, 0x33, 0xec, 0x4c, 0xe3, 0x7b, 0xed, 0x9b, 0xec, + 0x0b, 0x4d, 0xb1, 0x9d, 0x10, 0x3a, 0x41, 0xb7, 0xbe, 0xa0, 0xf8, 0xfa, 0x9c, 0x73, 0xaf, 0xef, + 0xf5, 0x31, 0xa0, 0x4f, 0xa6, 0x94, 0xd3, 0x41, 0x70, 0x5d, 0x1b, 0xf9, 0x2e, 0xf9, 0x21, 0x7f, + 0x0d, 0x11, 0x44, 0x9a, 0x58, 0xe8, 0x4f, 0x6f, 0x28, 0xbd, 0xf1, 0x48, 0x2d, 0x46, 0x3a, 0xfe, + 0x4c, 0x22, 0xf4, 0x67, 0xf7, 0xb7, 0xc8, 0x78, 0xc2, 0xa3, 0xcd, 0x4a, 0x1c, 0x9d, 0x3a, 0xd7, + 0x5c, 0xfc, 0xc8, 0x1d, 0xfc, 0x11, 0xf2, 0x2d, 0x3a, 0x0c, 0xc6, 0xc4, 0xe7, 0xa8, 0x04, 0x99, + 0x91, 0x5b, 0x49, 0xef, 0xa5, 0xab, 0xeb, 0x76, 0x66, 0xe4, 0xa2, 0x57, 0x90, 0xbb, 0x1e, 0x11, + 0xcf, 0x65, 0x95, 0xcc, 0x5e, 0xba, 0xba, 0xb1, 0xbf, 0x63, 0xc8, 0x1c, 0x46, 0xa4, 0x66, 0x34, + 0xfc, 0x99, 0xad, 0x30, 0x18, 0x03, 0x9c, 0x06, 0xde, 0x9d, 0x4d, 0x58, 0xe0, 0x71, 0xb4, 0x03, + 0xda, 0x90, 0x06, 0x3e, 0x17, 0x72, 0x9a, 0x2d, 0x17, 0xb8, 0x0e, 0x5a, 0x97, 0x3b, 0x9c, 0xa1, + 0x17, 0xa0, 0xb1, 0xf0, 0x43, 0x6c, 0x2f, 0x53, 0x96, 0x10, 0x7c, 0x08, 0xd0, 0x26, 0xdc, 0x26, + 0xdf, 0x02, 0xc2, 0x38, 0x7a, 0x09, 0x79, 0x57, 0x15, 0xac, 0xc8, 0x9b, 0x86, 0xec, 0x54, 0x74, + 0x0e, 0x3b, 0x06, 0xe0, 0x23, 0xd8, 0x10, 0x54, 0x36, 0xa1, 0x3e, 0x23, 0xff, 0xc6, 0xbd, 0x80, + 0x62, 0x97, 0x38, 0xd3, 0xe1, 0x6d, 0x94, 0xf9, 0x18, 0x4a, 0x4c, 0x04, 0xfa, 0x53, 0x19, 0x59, + 0x59, 0x7c, 0x91, 0x25, 0xc9, 0xf8, 0x1c, 0x4a, 0x91, 0x9a, 0x2a, 0xe6, 0x10, 0x8a, 0xb1, 0x5c, + 0xd8, 0xb2, 0x95, 0x6a, 0x85, 0x48, 0x2d, 0x44, 0xe2, 0x63, 0x28, 0x9c, 0x85, 0x65, 0x3f, 0xaa, + 0x27, 0x27, 0x50, 0x6c, 0x11, 0x8f, 0x70, 0xf2, 0x28, 0xf6, 0x01, 0x94, 0xdb, 0x84, 0x8b, 0x21, + 0xc6, 0x27, 0xc1, 0x8b, 0xc3, 0x2c, 0x28, 0xb6, 0x04, 0xa9, 0x21, 0xfe, 0x4a, 0xab, 0x9a, 0x9b, + 0x74, 0x3c, 0x76, 0xfc, 0xf0, 0x72, 0x65, 0xf9, 0x6c, 0x42, 0x04, 0xa7, 0xb4, 0x5f, 0x51, 0x9c, + 0x24, 0xc4, 0xe8, 0xcd, 0x26, 0xc4, 0x16, 0x28, 0x54, 0x85, 0xac, 0xeb, 0x70, 0x67, 0xe5, 0x45, + 0x14, 0x08, 0x7c, 0x07, 0xd9, 0x90, 0x87, 0xb6, 0x61, 0xf3, 0xca, 0x3a, 0xb7, 0x2e, 0x3f, 0x5b, + 0xfd, 0xe6, 0x65, 0xa7, 0xd3, 0xb0, 0x5a, 0xe5, 0x14, 0x2a, 0x43, 0xa1, 0x6b, 0xf6, 0xfa, 0x1d, + 0xb3, 0xd7, 0x68, 0x35, 0x7a, 0x8d, 0x72, 0x3a, 0x84, 0xb5, 0xcc, 0x0b, 0xb3, 0x67, 0xce, 0x83, + 0x19, 0x84, 0xa0, 0x74, 0x66, 0xb5, 0xcc, 0x2f, 0xfd, 0xd6, 0x65, 0xf3, 0xaa, 0x63, 0x5a, 0xbd, + 0xf2, 0x5a, 0x02, 0x18, 0x07, 0xb3, 0xfb, 0x3f, 0x35, 0xd0, 0x44, 0xc9, 0xa8, 0x0e, 0xd9, 0x4f, + 0x74, 0xe4, 0xa3, 0x2d, 0x43, 0x98, 0x2b, 0xfc, 0x56, 0xfd, 0xd5, 0xff, 0xff, 0xa3, 0x5a, 0x33, + 0xb4, 0x26, 0x4e, 0xa1, 0x37, 0xa0, 0x5d, 0x10, 0xe7, 0x3b, 0x41, 0x48, 0xb2, 0xc4, 0xe2, 0x61, + 0xda, 0x11, 0xfc, 0xd7, 0x26, 0xdc, 0xa2, 0x2e, 0x41, 0x4b, 0x40, 0xfa, 0xae, 0x14, 0x54, 0xb0, + 0x68, 0x52, 0x38, 0x85, 0x3e, 0x08, 0x33, 0x35, 0xbd, 0x80, 0x71, 0x32, 0x5d, 0x4a, 0xaf, 0xc4, + 0x74, 0x85, 0x4c, 0x28, 0x9c, 0x40, 0xbe, 0xeb, 0x3b, 0x13, 0x76, 0x4b, 0xf9, 0x52, 0xfe, 0xf2, + 0xda, 0x0d, 0x58, 0x6b, 0x13, 0x8e, 0xb6, 0xd4, 0xbc, 0xe7, 0xc6, 0xd6, 0x51, 0x32, 0x14, 0x67, + 0x7b, 0x0b, 0x39, 0xe9, 0x1b, 0xb4, 0x13, 0x5d, 0xab, 0xa4, 0xaf, 0xf4, 0xdd, 0x7b, 0xd1, 0x98, + 0x78, 0x10, 0x4d, 0x66, 0x3b, 0x79, 0xb5, 0x1e, 0x6e, 0xee, 0x3b, 0xc8, 0x49, 0x7b, 0xc4, 0x09, + 0x17, 0xdc, 0xb2, 0x82, 0x59, 0x87, 0xf5, 0xf0, 0x01, 0x94, 0x59, 0xef, 0x5b, 0x48, 0x8f, 0x4e, + 0x3c, 0x7f, 0x23, 0x71, 0xaa, 0x9a, 0x46, 0xaf, 0xe5, 0xab, 0xa9, 0x52, 0xfe, 0x2d, 0xeb, 0x3d, + 0xe4, 0x23, 0x17, 0x2e, 0x9d, 0xc1, 0x93, 0x79, 0x3f, 0x17, 0xec, 0x8a, 0x53, 0xa7, 0xd5, 0xaf, + 0xcf, 0x6f, 0x46, 0xfc, 0x36, 0x18, 0x18, 0x43, 0x3a, 0xae, 0x8d, 0x29, 0x0b, 0xee, 0x9c, 0xda, + 0xc0, 0x73, 0x18, 0xaf, 0x2d, 0xfe, 0x07, 0x0d, 0x72, 0x62, 0x5d, 0xff, 0x1d, 0x00, 0x00, 0xff, + 0xff, 0xb0, 0xc0, 0xfa, 0x4c, 0x9c, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -545,6 +588,8 @@ type IndexClient interface { Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) Index(ctx context.Context, in *IndexRequest, opts ...grpc.CallOption) (*empty.Empty, error) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*empty.Empty, error) + BulkIndex(ctx context.Context, opts ...grpc.CallOption) (Index_BulkIndexClient, error) + BulkDelete(ctx context.Context, opts ...grpc.CallOption) (Index_BulkDeleteClient, error) GetStats(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*GetStatsResponse, error) } @@ -637,6 +682,74 @@ func (c *indexClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grp return out, nil } +func (c *indexClient) BulkIndex(ctx context.Context, opts ...grpc.CallOption) (Index_BulkIndexClient, error) { + stream, err := c.cc.NewStream(ctx, &_Index_serviceDesc.Streams[0], "/index.Index/BulkIndex", opts...) + if err != nil { + return nil, err + } + x := &indexBulkIndexClient{stream} + return x, nil +} + +type Index_BulkIndexClient interface { + Send(*Document) error + CloseAndRecv() (*BulkResult, error) + grpc.ClientStream +} + +type indexBulkIndexClient struct { + grpc.ClientStream +} + +func (x *indexBulkIndexClient) Send(m *Document) error { + return x.ClientStream.SendMsg(m) +} + +func (x *indexBulkIndexClient) CloseAndRecv() (*BulkResult, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(BulkResult) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *indexClient) BulkDelete(ctx context.Context, opts ...grpc.CallOption) (Index_BulkDeleteClient, error) { + stream, err := c.cc.NewStream(ctx, &_Index_serviceDesc.Streams[1], "/index.Index/BulkDelete", opts...) + if err != nil { + return nil, err + } + x := &indexBulkDeleteClient{stream} + return x, nil +} + +type Index_BulkDeleteClient interface { + Send(*Document) error + CloseAndRecv() (*BulkResult, error) + grpc.ClientStream +} + +type indexBulkDeleteClient struct { + grpc.ClientStream +} + +func (x *indexBulkDeleteClient) Send(m *Document) error { + return x.ClientStream.SendMsg(m) +} + +func (x *indexBulkDeleteClient) CloseAndRecv() (*BulkResult, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(BulkResult) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *indexClient) GetStats(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*GetStatsResponse, error) { out := new(GetStatsResponse) err := c.cc.Invoke(ctx, "/index.Index/GetStats", in, out, opts...) @@ -657,6 +770,8 @@ type IndexServer interface { Search(context.Context, *SearchRequest) (*SearchResponse, error) Index(context.Context, *IndexRequest) (*empty.Empty, error) Delete(context.Context, *DeleteRequest) (*empty.Empty, error) + BulkIndex(Index_BulkIndexServer) error + BulkDelete(Index_BulkDeleteServer) error GetStats(context.Context, *empty.Empty) (*GetStatsResponse, error) } @@ -826,6 +941,58 @@ func _Index_Delete_Handler(srv interface{}, ctx context.Context, dec func(interf return interceptor(ctx, in, info, handler) } +func _Index_BulkIndex_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(IndexServer).BulkIndex(&indexBulkIndexServer{stream}) +} + +type Index_BulkIndexServer interface { + SendAndClose(*BulkResult) error + Recv() (*Document, error) + grpc.ServerStream +} + +type indexBulkIndexServer struct { + grpc.ServerStream +} + +func (x *indexBulkIndexServer) SendAndClose(m *BulkResult) error { + return x.ServerStream.SendMsg(m) +} + +func (x *indexBulkIndexServer) Recv() (*Document, error) { + m := new(Document) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Index_BulkDelete_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(IndexServer).BulkDelete(&indexBulkDeleteServer{stream}) +} + +type Index_BulkDeleteServer interface { + SendAndClose(*BulkResult) error + Recv() (*Document, error) + grpc.ServerStream +} + +type indexBulkDeleteServer struct { + grpc.ServerStream +} + +func (x *indexBulkDeleteServer) SendAndClose(m *BulkResult) error { + return x.ServerStream.SendMsg(m) +} + +func (x *indexBulkDeleteServer) Recv() (*Document, error) { + m := new(Document) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func _Index_GetStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(empty.Empty) if err := dec(in); err != nil { @@ -889,6 +1056,17 @@ var _Index_serviceDesc = grpc.ServiceDesc{ Handler: _Index_GetStats_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "BulkIndex", + Handler: _Index_BulkIndex_Handler, + ClientStreams: true, + }, + { + StreamName: "BulkDelete", + Handler: _Index_BulkDelete_Handler, + ClientStreams: true, + }, + }, Metadata: "protobuf/index/index.proto", } diff --git a/protobuf/index/index.proto b/protobuf/index/index.proto index 7db37ca..eb6b467 100644 --- a/protobuf/index/index.proto +++ b/protobuf/index/index.proto @@ -33,6 +33,8 @@ service Index { rpc Search (SearchRequest) returns (SearchResponse) {} rpc Index (IndexRequest) returns (google.protobuf.Empty) {} rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {} + rpc BulkIndex(stream Document) returns (BulkResult) {} + rpc BulkDelete(stream Document) returns (BulkResult) {} rpc GetStats (google.protobuf.Empty) returns (GetStatsResponse) {} } @@ -42,6 +44,10 @@ message Document { google.protobuf.Any fields = 2; } +message BulkResult { + int32 count = 1; +} + message Stats { google.protobuf.Any stats = 1; } diff --git a/protobuf/util_test.go b/protobuf/util_test.go new file mode 100644 index 0000000..a9ca4f8 --- /dev/null +++ b/protobuf/util_test.go @@ -0,0 +1,248 @@ +package protobuf + +import ( + "bytes" + "testing" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/search/query" + "github.com/golang/protobuf/ptypes/any" + "github.com/mosuka/blast/protobuf/index" + "github.com/mosuka/blast/protobuf/raft" +) + +func TestMarshalAny(t *testing.T) { + // test map[string]interface{} + data := map[string]interface{}{"a": 1, "b": 2, "c": 3} + + mapAny := &any.Any{} + err := UnmarshalAny(data, mapAny) + if err != nil { + t.Errorf("%v", err) + } + + expectedType := "map[string]interface {}" + actualType := mapAny.TypeUrl + if expectedType != actualType { + t.Errorf("expected content to see %s, saw %s", expectedType, actualType) + } + + expectedValue := []byte(`{"a":1,"b":2,"c":3}`) + actualValue := mapAny.Value + if !bytes.Equal(expectedValue, actualValue) { + t.Errorf("expected content to see %v, saw %v", expectedValue, actualValue) + } + + // test index.Document + fieldsMap := map[string]interface{}{"f1": "aaa", "f2": 222, "f3": "ccc"} + fieldsAny := &any.Any{} + err = UnmarshalAny(fieldsMap, fieldsAny) + if err != nil { + t.Errorf("%v", err) + } + + doc := &index.Document{ + Id: "1", + Fields: fieldsAny, + } + + docAny := &any.Any{} + err = UnmarshalAny(doc, docAny) + if err != nil { + t.Errorf("%v", err) + } + + expectedType = "index.Document" + actualType = docAny.TypeUrl + if expectedType != actualType { + t.Errorf("expected content to see %s, saw %s", expectedType, actualType) + } + + expectedValue = []byte(`{"id":"1","fields":{"type_url":"map[string]interface {}","value":"eyJmMSI6ImFhYSIsImYyIjoyMjIsImYzIjoiY2NjIn0="}}`) + actualValue = docAny.Value + if !bytes.Equal(expectedValue, actualValue) { + t.Errorf("expected content to see %v, saw %v", expectedValue, actualValue) + } + + // test raft.Node + node := &raft.Node{ + Id: "node1", + GrpcAddr: ":5050", + DataDir: "/tmp/blast/index1", + BindAddr: ":6060", + HttpAddr: ":8080", + Leader: true, + } + + nodeAny := &any.Any{} + err = UnmarshalAny(node, nodeAny) + if err != nil { + t.Errorf("%v", err) + } + + expectedType = "raft.Node" + actualType = nodeAny.TypeUrl + if expectedType != actualType { + t.Errorf("expected content to see %s, saw %s", expectedType, actualType) + } + + expectedValue = []byte(`{"id":"node1","bind_addr":":6060","grpc_addr":":5050","http_addr":":8080","leader":true,"data_dir":"/tmp/blast/index1"}`) + actualValue = nodeAny.Value + if !bytes.Equal(expectedValue, actualValue) { + t.Errorf("expected content to see %v, saw %v", expectedValue, actualValue) + } + + // test bleve.SearchRequest + searchReq := bleve.NewSearchRequest(bleve.NewQueryStringQuery("blast")) + + searchReqAny := &any.Any{} + err = UnmarshalAny(searchReq, searchReqAny) + if err != nil { + t.Errorf("%v", err) + } + + expectedType = "bleve.SearchRequest" + actualType = searchReqAny.TypeUrl + if expectedType != actualType { + t.Errorf("expected content to see %s, saw %s", expectedType, actualType) + } + + expectedValue = []byte(`{"query":{"query":"blast"},"size":10,"from":0,"highlight":null,"fields":null,"facets":null,"explain":false,"sort":["-_score"],"includeLocations":false}`) + actualValue = searchReqAny.Value + if !bytes.Equal(expectedValue, actualValue) { + t.Errorf("expected content to see %v, saw %v", expectedValue, actualValue) + } + + // test bleve.SearchResult + searchReslt := &bleve.SearchResult{ + Total: 10, + } + + searchResltAny := &any.Any{} + err = UnmarshalAny(searchReslt, searchResltAny) + if err != nil { + t.Errorf("%v", err) + } + + expectedType = "bleve.SearchResult" + actualType = searchResltAny.TypeUrl + if expectedType != actualType { + t.Errorf("expected content to see %s, saw %s", expectedType, actualType) + } + + expectedValue = []byte(`{"status":null,"request":null,"hits":null,"total_hits":10,"max_score":0,"took":0,"facets":null}`) + actualValue = searchResltAny.Value + if !bytes.Equal(expectedValue, actualValue) { + t.Errorf("expected content to see %v, saw %v", expectedValue, actualValue) + } +} + +func TestUnmarshalAny(t *testing.T) { + // test map[string]interface{} + dataAny := &any.Any{ + TypeUrl: "map[string]interface {}", + Value: []byte(`{"a":1,"b":2,"c":3}`), + } + + data, err := MarshalAny(dataAny) + if err != nil { + t.Errorf("%v", err) + } + dataMap := *data.(*map[string]interface{}) + + if dataMap["a"] != float64(1) { + t.Errorf("expected content to see %v, saw %v", 1, dataMap["a"]) + } + if dataMap["b"] != float64(2) { + t.Errorf("expected content to see %v, saw %v", 2, dataMap["b"]) + } + if dataMap["c"] != float64(3) { + t.Errorf("expected content to see %v, saw %v", 3, dataMap["c"]) + } + + // index.Document + dataAny = &any.Any{ + TypeUrl: "index.Document", + Value: []byte(`{"id":"1","fields":{"type_url":"map[string]interface {}","value":"eyJmMSI6ImFhYSIsImYyIjoyMjIsImYzIjoiY2NjIn0="}}`), + } + + data, err = MarshalAny(dataAny) + if err != nil { + t.Errorf("%v", err) + } + dataDoc := data.(*index.Document) + + if dataDoc.Id != "1" { + t.Errorf("expected content to see %v, saw %v", "1", dataDoc.Id) + } + if dataDoc.Fields.TypeUrl != "map[string]interface {}" { + t.Errorf("expected content to see %v, saw %v", "map[string]interface {}", dataDoc.Fields.TypeUrl) + } + if !bytes.Equal(dataDoc.Fields.Value, []byte(`{"f1":"aaa","f2":222,"f3":"ccc"}`)) { + t.Errorf("expected content to see %v, saw %v", []byte("eyJmMSI6ImFhYSIsImYyIjoyMjIsImYzIjoiY2NjIn0="), dataDoc.Fields.Value) + } + + // raft.Node + dataAny = &any.Any{ + TypeUrl: "raft.Node", + Value: []byte(`{"id":"node1","bind_addr":":6060","grpc_addr":":5050","http_addr":":8080","leader":true,"data_dir":"/tmp/blast/index1"}`), + } + + data, err = MarshalAny(dataAny) + if err != nil { + t.Errorf("%v", err) + } + dataNode := data.(*raft.Node) + + if dataNode.Id != "node1" { + t.Errorf("expected content to see %v, saw %v", "node1", dataNode.Id) + } + if dataNode.HttpAddr != ":8080" { + t.Errorf("expected content to see %v, saw %v", ":8080", dataNode.HttpAddr) + } + if dataNode.BindAddr != ":6060" { + t.Errorf("expected content to see %v, saw %v", ":6060", dataNode.BindAddr) + } + if dataNode.GrpcAddr != ":5050" { + t.Errorf("expected content to see %v, saw %v", ":5050", dataNode.BindAddr) + } + if dataNode.DataDir != "/tmp/blast/index1" { + t.Errorf("expected content to see %v, saw %v", "/tmp/blast/index1", dataNode.DataDir) + } + if dataNode.Leader != true { + t.Errorf("expected content to see %v, saw %v", true, dataNode.Leader) + } + + // test bleve.SearchRequest + dataAny = &any.Any{ + TypeUrl: "bleve.SearchRequest", + Value: []byte(`{"query":{"query":"blast"},"size":10,"from":0,"highlight":null,"fields":null,"facets":null,"explain":false,"sort":["-_score"],"includeLocations":false}`), + } + + data, err = MarshalAny(dataAny) + if err != nil { + t.Errorf("%v", err) + } + searchRequest := data.(*bleve.SearchRequest) + + if searchRequest.Query.(*query.QueryStringQuery).Query != bleve.NewQueryStringQuery("blast").Query { + t.Errorf("expected content to see %v, saw %v", bleve.NewQueryStringQuery("blast").Query, searchRequest.Query.(*query.QueryStringQuery).Query) + } + + // test blast.SearchResult + dataAny = &any.Any{ + TypeUrl: "bleve.SearchResult", + Value: []byte(`{"status":null,"request":null,"hits":null,"total_hits":10,"max_score":0,"took":0,"facets":null}`), + } + + data, err = MarshalAny(dataAny) + if err != nil { + t.Errorf("%v", err) + } + searchResult := data.(*bleve.SearchResult) + + if searchResult.Total != 10 { + t.Errorf("expected content to see %v, saw %v", 10, searchResult.Total) + } + +} From 809857a3e5d01d0ecdfab3ea229ba44d1ce7e189 Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Thu, 21 Mar 2019 10:23:55 +0900 Subject: [PATCH 2/3] Updte CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 9ee8659..03ae67b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added +- Support bulk update #41 - Support Badger #38 - Add index stats #37 - Add Wikipedia example #35 From 80373b41909b62acdfc1a32ef231d43f9bf1646c Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Thu, 21 Mar 2019 10:31:16 +0900 Subject: [PATCH 3/3] Update README.md --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index ef8b3c9..a755c8a 100644 --- a/README.md +++ b/README.md @@ -723,10 +723,9 @@ $ ./WikiExtractor.py -o ~/tmp/enwiki --json ~/tmp/enwiki-20190101-pages-articles ```bash $ for FILE in $(find ~/tmp/enwiki -type f -name '*' | sort) do - echo "${FILE}" + echo "Indexing ${FILE}" TIMESTAMP=$(date -u "+%Y-%m-%dT%H:%M:%SZ") DOCS=$(cat ${FILE} | jq -r '. + {fields: {url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'", _type: "enwiki"}} | del(.url) | del(.title) | del(.text) | del(.fields.id)' | jq -s) - echo "${DOCS}" curl -s -X PUT -H 'Content-Type: application/json' "http://127.0.0.1:8080/documents" -d "${DOCS}" done ```