From c14528653721f6de86b96631c58a918ed629e4f1 Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Thu, 4 Jul 2019 00:04:54 +0900 Subject: [PATCH] Improve indexing performance --- README.md | 2 +- indexer/index.go | 79 +++++++++++++++++++++++------- indexer/raft_fsm.go | 59 +++++++++++++++-------- indexer/raft_server.go | 106 +++++++++++++++++++---------------------- 4 files changed, 150 insertions(+), 96 deletions(-) diff --git a/README.md b/README.md index cae205c..fa8f646 100644 --- a/README.md +++ b/README.md @@ -935,7 +935,7 @@ $ for FILE in $(find ~/tmp/enwiki -type f -name '*' | sort) echo "Indexing ${FILE}" TIMESTAMP=$(date -u "+%Y-%m-%dT%H:%M:%SZ") DOCS=$(cat ${FILE} | jq -r '. + {fields: {url: .url, title_en: .title, text_en: .text, timestamp: "'${TIMESTAMP}'", _type: "enwiki"}} | del(.url) | del(.title) | del(.text) | del(.fields.id)' | jq -s) - curl -s -X PUT -H 'Content-Type: application/json' "http://127.0.0.1:8080/documents" -d "${DOCS}" + curl -s -X PUT -H 'Content-Type: application/json' "http://127.0.0.1:5002/documents" -d "${DOCS}" done ``` diff --git a/indexer/index.go b/indexer/index.go index c8de8ad..6303a95 100644 --- a/indexer/index.go +++ b/indexer/index.go @@ -140,45 +140,92 @@ func (i *Index) Search(request *bleve.SearchRequest) (*bleve.SearchResult, error } func (i *Index) Index(id string, fields map[string]interface{}) error { - // index - err := i.index.Index(id, fields) + doc := map[string]interface{}{ + "id": id, + "fields": fields, + } + _, err := i.BulkIndex([]map[string]interface{}{doc}) if err != nil { i.logger.Error(err.Error()) return err } - // map[string]interface{} -> bytes - fieldsBytes, err := json.Marshal(fields) - if err != nil { - i.logger.Error(err.Error()) - return err + return nil +} + +func (i *Index) BulkIndex(docs []map[string]interface{}) (int, error) { + batch := i.index.NewBatch() + + count := 0 + + for _, doc := range docs { + id, ok := doc["id"].(string) + if !ok { + i.logger.Error("missing document id") + continue + } + fields, ok := doc["fields"].(map[string]interface{}) + if !ok { + i.logger.Error("missing document fields") + continue + } + err := batch.Index(id, fields) + if err != nil { + i.logger.Error(err.Error()) + continue + } + + // set original document + fieldsBytes, err := json.Marshal(fields) + if err != nil { + i.logger.Error(err.Error()) + continue + } + batch.SetInternal([]byte(id), fieldsBytes) + + count++ } - // set original document - err = i.index.SetInternal([]byte(id), fieldsBytes) + err := i.index.Batch(batch) if err != nil { i.logger.Error(err.Error()) - return err + return -1, err } - return nil + return count, nil } func (i *Index) Delete(id string) error { - err := i.index.Delete(id) + _, err := i.BulkDelete([]string{id}) if err != nil { i.logger.Error(err.Error()) return err } - // delete original document - err = i.index.SetInternal([]byte(id), nil) + return nil +} + +func (i *Index) BulkDelete(ids []string) (int, error) { + batch := i.index.NewBatch() + + count := 0 + + for _, id := range ids { + batch.Delete(id) + + // delete original document + batch.SetInternal([]byte(id), nil) + + count++ + } + + err := i.index.Batch(batch) if err != nil { i.logger.Error(err.Error()) - return err + return -1, err } - return nil + return count, nil } func (i *Index) Config() (map[string]interface{}, error) { diff --git a/indexer/raft_fsm.go b/indexer/raft_fsm.go index 15fd3a4..252d62d 100644 --- a/indexer/raft_fsm.go +++ b/indexer/raft_fsm.go @@ -114,8 +114,6 @@ func (f *RaftFSM) applyDeleteMetadata(id string) interface{} { } func (f *RaftFSM) GetDocument(id string) (map[string]interface{}, error) { - f.logger.Debug("get a document", zap.String("id", id)) - fields, err := f.index.Get(id) if err != nil { f.logger.Error(err.Error()) @@ -126,8 +124,6 @@ func (f *RaftFSM) GetDocument(id string) (map[string]interface{}, error) { } func (f *RaftFSM) applyIndexDocument(id string, fields map[string]interface{}) error { - f.logger.Debug("apply to index a document", zap.String("id", id), zap.Any("fields", fields)) - err := f.index.Index(id, fields) if err != nil { f.logger.Error(err.Error()) @@ -137,9 +133,17 @@ func (f *RaftFSM) applyIndexDocument(id string, fields map[string]interface{}) e return nil } -func (f *RaftFSM) applyDeleteDocument(id string) error { - f.logger.Debug("apply to delete a document", zap.String("id", id)) +func (f *RaftFSM) applyIndexDocuments(docs []map[string]interface{}) (int, error) { + count, err := f.index.BulkIndex(docs) + if err != nil { + f.logger.Error(err.Error()) + return -1, err + } + + return count, nil +} +func (f *RaftFSM) applyDeleteDocument(id string) error { err := f.index.Delete(id) if err != nil { f.logger.Error(err.Error()) @@ -149,9 +153,17 @@ func (f *RaftFSM) applyDeleteDocument(id string) error { return nil } -func (f *RaftFSM) Search(request *bleve.SearchRequest) (*bleve.SearchResult, error) { - f.logger.Debug("search documents") +func (f *RaftFSM) applyDeleteDocuments(ids []string) (int, error) { + count, err := f.index.BulkDelete(ids) + if err != nil { + f.logger.Error(err.Error()) + return -1, err + } + + return count, nil +} +func (f *RaftFSM) Search(request *bleve.SearchRequest) (*bleve.SearchResult, error) { result, err := f.index.Search(request) if err != nil { f.logger.Error(err.Error()) @@ -165,9 +177,17 @@ type fsmResponse struct { error error } -func (f *RaftFSM) Apply(l *raft.Log) interface{} { - f.logger.Debug("apply a message") +type fsmIndexDocumentResponse struct { + count int + error error +} + +type fsmDeleteDocumentResponse struct { + count int + error error +} +func (f *RaftFSM) Apply(l *raft.Log) interface{} { var msg message err := json.Unmarshal(l.Data, &msg) if err != nil { @@ -182,7 +202,6 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { f.logger.Error(err.Error()) return &fsmResponse{error: err} } - err = f.applySetMetadata(data["id"].(string), data["metadata"].(map[string]interface{})) return &fsmResponse{error: err} case deleteNode: @@ -194,25 +213,23 @@ func (f *RaftFSM) Apply(l *raft.Log) interface{} { } return f.applyDeleteMetadata(data["id"].(string)) case indexDocument: - var data map[string]interface{} + var data []map[string]interface{} err := json.Unmarshal(msg.Data, &data) if err != nil { f.logger.Error(err.Error()) - return &fsmResponse{error: err} + return &fsmIndexDocumentResponse{count: -1, error: err} } - - err = f.applyIndexDocument(data["id"].(string), data["fields"].(map[string]interface{})) - return &fsmResponse{error: err} + count, err := f.applyIndexDocuments(data) + return &fsmIndexDocumentResponse{count: count, error: err} case deleteDocument: - var data string + var data []string err := json.Unmarshal(msg.Data, &data) if err != nil { f.logger.Error(err.Error()) - return &fsmResponse{error: err} + return &fsmDeleteDocumentResponse{count: -1, error: err} } - - err = f.applyDeleteDocument(data) - return &fsmResponse{error: err} + count, err := f.applyDeleteDocuments(data) + return &fsmDeleteDocumentResponse{count: count, error: err} default: err = errors.New("unsupported command") f.logger.Error(err.Error()) diff --git a/indexer/raft_server.go b/indexer/raft_server.go index dac9488..335e595 100644 --- a/indexer/raft_server.go +++ b/indexer/raft_server.go @@ -501,39 +501,34 @@ func (s *RaftServer) IndexDocument(docs []map[string]interface{}) (int, error) { return -1, raft.ErrNotLeader } - count := 0 - for _, doc := range docs { - msg, err := newMessage( - indexDocument, - doc, - ) - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } - - msgBytes, err := json.Marshal(msg) - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } + msg, err := newMessage( + indexDocument, + docs, + ) + if err != nil { + s.logger.Error(err.Error()) + return -1, err + } - f := s.raft.Apply(msgBytes, 10*time.Second) - err = f.Error() - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } - err = f.Response().(*fsmResponse).error - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } + msgBytes, err := json.Marshal(msg) + if err != nil { + s.logger.Error(err.Error()) + return -1, err + } - count++ + f := s.raft.Apply(msgBytes, 10*time.Second) + err = f.Error() + if err != nil { + s.logger.Error(err.Error()) + return -1, err + } + err = f.Response().(*fsmIndexDocumentResponse).error + if err != nil { + s.logger.Error(err.Error()) + return -1, err } - return count, nil + return f.Response().(*fsmIndexDocumentResponse).count, nil } func (s *RaftServer) DeleteDocument(ids []string) (int, error) { @@ -542,39 +537,34 @@ func (s *RaftServer) DeleteDocument(ids []string) (int, error) { return -1, raft.ErrNotLeader } - count := 0 - for _, id := range ids { - msg, err := newMessage( - deleteDocument, - id, - ) - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } - - msgBytes, err := json.Marshal(msg) - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } + msg, err := newMessage( + deleteDocument, + ids, + ) + if err != nil { + s.logger.Error(err.Error()) + return -1, err + } - f := s.raft.Apply(msgBytes, 10*time.Second) - err = f.Error() - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } - err = f.Response().(*fsmResponse).error - if err != nil { - s.logger.Error(err.Error()) - return -1, err - } + msgBytes, err := json.Marshal(msg) + if err != nil { + s.logger.Error(err.Error()) + return -1, err + } - count++ + f := s.raft.Apply(msgBytes, 10*time.Second) + err = f.Error() + if err != nil { + s.logger.Error(err.Error()) + return -1, err + } + err = f.Response().(*fsmDeleteDocumentResponse).error + if err != nil { + s.logger.Error(err.Error()) + return -1, err } - return count, nil + return f.Response().(*fsmDeleteDocumentResponse).count, nil } func (s *RaftServer) GetIndexConfig() (map[string]interface{}, error) {