diff --git a/metastore/storage_etcd.go b/metastore/storage_etcd.go index fcb360c..bd2010e 100644 --- a/metastore/storage_etcd.go +++ b/metastore/storage_etcd.go @@ -8,38 +8,16 @@ import ( "github.com/mosuka/phalanx/clients" "github.com/mosuka/phalanx/errors" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) -func makeEtcdStorageEvent(event *clientv3.Event) (*StorageEvent, error) { - switch event.Type { - case mvccpb.PUT: - return &StorageEvent{ - Type: StorageEventTypePut, - Path: string(event.Kv.Key), - Value: event.Kv.Value, - }, nil - case mvccpb.DELETE: - return &StorageEvent{ - Type: StorageEventTypeDelete, - Path: string(event.Kv.Key), - Value: event.Kv.Value, - }, nil - default: - err := errors.ErrUnsupportedMetastoreEvent - return nil, err - } -} - type EtcdStorage struct { client *clientv3.Client kv clientv3.KV root string logger *zap.Logger ctx context.Context - stopWatcher chan bool events chan StorageEvent requestTimeout time.Duration } @@ -67,50 +45,13 @@ func NewEtcdStorageWithUri(uri string, logger *zap.Logger) (*EtcdStorage, error) root := filepath.ToSlash(filepath.Join(string(filepath.Separator), u.Host, u.Path)) - stopWatching := make(chan bool) - events := make(chan StorageEvent, 10) - - // Start etcd watcher - go func(root string, client *clientv3.Client, stopWatcher chan bool, events chan StorageEvent, logger *zap.Logger) { - watchPath := root + "/" - opts := []clientv3.OpOption{ - clientv3.WithFromKey(), - } - ctx := context.Background() - - watchChan := client.Watch(ctx, watchPath, opts...) - - for { - select { - case cancel := <-stopWatcher: - // check - if cancel { - return - } - case result := <-watchChan: - for _, event := range result.Events { - logger.Info("received etcd event", zap.Any("event", event)) - - metastoreEvent, err := makeEtcdStorageEvent(event) - if err != nil { - logger.Warn(err.Error(), zap.Any("event", event)) - continue - } - - events <- *metastoreEvent - } - } - } - }(root, client, stopWatching, events, metastorelogger) - return &EtcdStorage{ client: client, kv: clientv3.NewKV(client), root: root, logger: metastorelogger, ctx: context.Background(), - stopWatcher: stopWatching, - events: events, + events: make(chan StorageEvent, 10), requestTimeout: 3 * time.Second, }, nil } @@ -178,6 +119,14 @@ func (m *EtcdStorage) Put(path string, content []byte) error { return err } + // Send event to the event channel. + storageEvent := &StorageEvent{ + Type: StorageEventTypePut, + Path: fullPath, + Value: content, + } + m.events <- *storageEvent + return nil } @@ -192,6 +141,14 @@ func (m *EtcdStorage) Delete(path string) error { return err } + // Send event to the event channel. + storageEvent := &StorageEvent{ + Type: StorageEventTypeDelete, + Path: fullPath, + Value: []byte{}, + } + m.events <- *storageEvent + return nil } @@ -215,8 +172,6 @@ func (m *EtcdStorage) Exists(path string) (bool, error) { } func (m *EtcdStorage) Close() error { - m.stopWatcher <- true - if err := m.client.Close(); err != nil { m.logger.Error(err.Error()) return err diff --git a/metastore_test/storage_etcd_test.go b/metastore_test/storage_etcd_test.go index 1a00818..eb2a87c 100644 --- a/metastore_test/storage_etcd_test.go +++ b/metastore_test/storage_etcd_test.go @@ -213,3 +213,41 @@ func TestEtcdStorageList(t *testing.T) { t.Fatalf("unexpected %v\v", paths) } } + +func TestEtcdStorageWatch(t *testing.T) { + defer testutil.AfterTest(t) + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseTCP: true}) + defer cluster.Terminate(t) + + etcdEndpoints := cluster.RandClient().Endpoints()[0] + u, err := url.Parse(etcdEndpoints) + if err != nil { + t.Fatalf("%v\n", err) + } + endpoints := fmt.Sprintf("%s:%s", u.Hostname(), u.Port()) + + uri := fmt.Sprintf("etcd://phalanx-test/metastore?endpoints=%s", endpoints) + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) + if err != nil { + t.Fatalf("%v\n", err) + } + defer etcdStorage.Close() + + etcdStorage.Put("/test/hello.txt", []byte("hello")) + etcdStorage.Delete("/test/hello.txt") + + event := <-etcdStorage.Events() + expected := metastore.StorageEventTypePut + if event.Type != expected { + t.Fatalf("%v\n", err) + } + + event = <-etcdStorage.Events() + expected = metastore.StorageEventTypeDelete + if event.Type != expected { + t.Fatalf("%v\n", err) + } +}