这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/blugelabs/bluge v0.1.8
github.com/blugelabs/bluge_segment_api v0.2.0
github.com/blugelabs/query_string v0.1.0
github.com/fsnotify/fsnotify v1.5.1
github.com/gin-contrib/cors v1.3.1
github.com/gin-contrib/zap v0.0.1
github.com/gin-gonic/gin v1.7.4
Expand Down Expand Up @@ -40,6 +39,7 @@ require (

require (
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
151 changes: 21 additions & 130 deletions metastore/storage_fs.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
package metastore

import (
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"sync"

"github.com/fsnotify/fsnotify"
"github.com/mosuka/phalanx/errors"
"github.com/mosuka/phalanx/util"
"go.uber.org/zap"
)

// FileSystemStorage holds the state of a metastore storage backed by a
// directory on the local file system.
//
// NOTE(review): path, logger, events and mutex are listed twice below. This
// looks like the old and new field lists of a diff rendered together; confirm
// against the actual file which set of fields is current.
type FileSystemStorage struct {
// path is the root path of the metastore.
path string
// logger is the zap logger used for messages emitted by the storage.
logger *zap.Logger
// fsWatcher is the fsnotify watcher whose Events and Errors channels are
// consumed by the goroutine started in the constructor.
fsWatcher *fsnotify.Watcher
// stopWatcher carries the stop signal for that goroutine; receiving true
// makes it return.
stopWatcher chan bool
// events is the channel on which StorageEvent values are delivered; the
// constructor creates it with a buffer of 10.
events chan StorageEvent
// watchSet records the directories that have been added to fsWatcher,
// keyed by directory path.
watchSet map[string]bool
// mutex: its usage is not visible in this excerpt.
mutex sync.RWMutex
// Second field list (see the review note on the type).
path string
logger *zap.Logger
events chan StorageEvent
mutex sync.RWMutex
}

func NewFileSystemStorageWithUri(uri string, logger *zap.Logger) (*FileSystemStorage, error) {
Expand Down Expand Up @@ -52,99 +47,10 @@ func NewFileSystemStorageWithPath(path string, logger *zap.Logger) (*FileSystemS
}
}

fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
stopWatcher := make(chan bool)
events := make(chan StorageEvent, 10)

watchSet := make(map[string]bool)

// Add the root path of the metastore to the watch list.
watchSet[path] = true
if err := fsWatcher.Add(path); err != nil {
return nil, err
}

// Start file system watcher
go func(fsWatcher *fsnotify.Watcher, stopWatcher chan bool, event chan StorageEvent, logger *zap.Logger) {
for {
select {
case cancel := <-stopWatcher:
// Stop this goroutine only when the received value is true.
if cancel {
return
}
case event, ok := <-fsWatcher.Events:
if !ok {
err := fmt.Errorf("failed to receive event")
logger.Warn(err.Error())
continue
}

logger.Info("received file system event", zap.Any("event", event))

storageEvent := &StorageEvent{
Type: StorageEventTypeUnknown,
Path: event.Name,
Value: []byte{},
}

switch {
case event.Op&fsnotify.Create == fsnotify.Create:
storageEvent.Type = StorageEventTypePut
if util.IsFile(event.Name) {
content, err := ioutil.ReadFile(event.Name)
if err != nil {
logger.Error(err.Error(), zap.String("path", event.Name))
continue
}
storageEvent.Value = content
}
case event.Op&fsnotify.Write == fsnotify.Write:
storageEvent.Type = StorageEventTypePut
if util.IsFile(event.Name) {
content, err := ioutil.ReadFile(event.Name)
if err != nil {
logger.Error(err.Error(), zap.String("path", event.Name))
continue
}
storageEvent.Value = content
}
case event.Op&fsnotify.Remove == fsnotify.Remove:
storageEvent.Type = StorageEventTypeDelete
case event.Op&fsnotify.Rename == fsnotify.Rename:
// TODO: rename events are not handled yet; they are skipped for now.
continue
case event.Op&fsnotify.Chmod == fsnotify.Chmod:
// ignore
continue
default:
err := errors.ErrUnsupportedMetastoreEvent
logger.Warn(err.Error())
continue
}

events <- *storageEvent
case err, ok := <-fsWatcher.Errors:
if !ok {
err := fmt.Errorf("failed to receive error")
logger.Warn(err.Error())
continue
}
logger.Warn(err.Error())
}
}
}(fsWatcher, stopWatcher, events, fileLogger)

fsStorage := &FileSystemStorage{
path: path,
logger: fileLogger,
fsWatcher: fsWatcher,
stopWatcher: stopWatcher,
events: events,
watchSet: watchSet,
path: path,
logger: fileLogger,
events: make(chan StorageEvent, 10),
}

return fsStorage, nil
Expand Down Expand Up @@ -208,24 +114,20 @@ func (m *FileSystemStorage) Put(path string, content []byte) error {
return err
}

// Add the created path to the watch list.
// Do not add m.path to watchSet twice.
watchDir := filepath.Dir(fullPath)
if _, ok := m.watchSet[watchDir]; !ok && watchDir != m.path {
m.logger.Info("add to watch list", zap.String("path", watchDir))
if err := m.fsWatcher.Add(watchDir); err != nil {
m.logger.Warn(err.Error())
return err
}
m.watchSet[watchDir] = true
}

// Write file.
if err := ioutil.WriteFile(fullPath, content, 0600); err != nil {
m.logger.Error(err.Error(), zap.String("path", fullPath))
return err
}

// Send event to the event channel.
storageEvent := &StorageEvent{
Type: StorageEventTypePut,
Path: fullPath,
Value: content,
}
m.events <- *storageEvent

return nil
}

Expand All @@ -243,17 +145,13 @@ func (m *FileSystemStorage) Delete(path string) error {
return err
}

// Remove the removed path from the watch list.
// Do not remove m.path from watchSet.
watchDir := filepath.Dir(fullPath)
if _, ok := m.watchSet[watchDir]; ok && watchDir != m.path {
m.logger.Info("remove to watch list", zap.String("path", watchDir))
if err := m.fsWatcher.Remove(watchDir); err != nil {
m.logger.Warn(err.Error())
return err
}
delete(m.watchSet, watchDir)
// Send event to the event channel.
storageEvent := &StorageEvent{
Type: StorageEventTypeDelete,
Path: fullPath,
Value: []byte{},
}
m.events <- *storageEvent

return nil
}
Expand All @@ -268,13 +166,6 @@ func (m *FileSystemStorage) Exists(path string) (bool, error) {
}

// Close signals the watcher goroutine to stop and then closes the fsnotify
// watcher.
//
// It returns the error reported by fsWatcher.Close, after logging it, or nil
// when the watcher was closed successfully.
//
// NOTE(review): these statements sit inside a change hunk in this view;
// confirm which of them still exist in the current file.
func (m *FileSystemStorage) Close() error {
// stopWatcher is created without a buffer, so this send waits until the
// watcher goroutine receives the value.
m.stopWatcher <- true

// Release the fsnotify watcher; a failure is logged and returned to the caller.
if err := m.fsWatcher.Close(); err != nil {
m.logger.Error(err.Error())
return err
}

return nil
}

Expand Down