这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 17 additions & 62 deletions metastore/storage_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,16 @@ import (

"github.com/mosuka/phalanx/clients"
"github.com/mosuka/phalanx/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// makeEtcdStorageEvent translates an etcd watch event into a StorageEvent.
//
// PUT events become StorageEventTypePut and DELETE events become
// StorageEventTypeDelete; in both cases the etcd key is used as the path and
// the etcd value as the payload. Every other event type is rejected with
// errors.ErrUnsupportedMetastoreEvent and a nil event.
func makeEtcdStorageEvent(event *clientv3.Event) (*StorageEvent, error) {
	// Reject unsupported types before touching the key-value pair.
	if event.Type != mvccpb.PUT && event.Type != mvccpb.DELETE {
		return nil, errors.ErrUnsupportedMetastoreEvent
	}

	// Both supported types share the same path and value mapping; only the
	// event type differs.
	storageEvent := &StorageEvent{
		Type:  StorageEventTypePut,
		Path:  string(event.Kv.Key),
		Value: event.Kv.Value,
	}
	if event.Type == mvccpb.DELETE {
		storageEvent.Type = StorageEventTypeDelete
	}

	return storageEvent, nil
}

// EtcdStorage is a metastore storage backed by an etcd cluster.
type EtcdStorage struct {
// client is the etcd client; it is closed by Close.
client *clientv3.Client
// kv is the key-value API created from client.
kv clientv3.KV
// root is the slash-separated key prefix built from the host and path of
// the storage URI.
root string
// logger receives the storage's log output.
logger *zap.Logger
// ctx is set to context.Background() by the constructor.
ctx context.Context
// stopWatcher signals the watcher goroutine; Close sends true on it and the
// goroutine returns when it receives true.
stopWatcher chan bool
// events carries storage events to consumers; the constructor creates it
// with a buffer of 10.
events chan StorageEvent
// requestTimeout is set to 3 seconds by the constructor.
// NOTE(review): presumably bounds each etcd request; confirm against the
// methods that use it.
requestTimeout time.Duration
}
Expand Down Expand Up @@ -67,50 +45,13 @@ func NewEtcdStorageWithUri(uri string, logger *zap.Logger) (*EtcdStorage, error)

root := filepath.ToSlash(filepath.Join(string(filepath.Separator), u.Host, u.Path))

stopWatching := make(chan bool)
events := make(chan StorageEvent, 10)

// Start etcd watcher
go func(root string, client *clientv3.Client, stopWatcher chan bool, events chan StorageEvent, logger *zap.Logger) {
watchPath := root + "/"
opts := []clientv3.OpOption{
clientv3.WithFromKey(),
}
ctx := context.Background()

watchChan := client.Watch(ctx, watchPath, opts...)

for {
select {
case cancel := <-stopWatcher:
// check
if cancel {
return
}
case result := <-watchChan:
for _, event := range result.Events {
logger.Info("received etcd event", zap.Any("event", event))

metastoreEvent, err := makeEtcdStorageEvent(event)
if err != nil {
logger.Warn(err.Error(), zap.Any("event", event))
continue
}

events <- *metastoreEvent
}
}
}
}(root, client, stopWatching, events, metastorelogger)

return &EtcdStorage{
client: client,
kv: clientv3.NewKV(client),
root: root,
logger: metastorelogger,
ctx: context.Background(),
stopWatcher: stopWatching,
events: events,
events: make(chan StorageEvent, 10),
requestTimeout: 3 * time.Second,
}, nil
}
Expand Down Expand Up @@ -178,6 +119,14 @@ func (m *EtcdStorage) Put(path string, content []byte) error {
return err
}

// Send event to the event channel.
storageEvent := &StorageEvent{
Type: StorageEventTypePut,
Path: fullPath,
Value: content,
}
m.events <- *storageEvent

return nil
}

Expand All @@ -192,6 +141,14 @@ func (m *EtcdStorage) Delete(path string) error {
return err
}

// Send event to the event channel.
storageEvent := &StorageEvent{
Type: StorageEventTypeDelete,
Path: fullPath,
Value: []byte{},
}
m.events <- *storageEvent

return nil
}

Expand All @@ -215,8 +172,6 @@ func (m *EtcdStorage) Exists(path string) (bool, error) {
}

func (m *EtcdStorage) Close() error {
m.stopWatcher <- true

if err := m.client.Close(); err != nil {
m.logger.Error(err.Error())
return err
Expand Down
38 changes: 38 additions & 0 deletions metastore_test/storage_etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,41 @@ func TestEtcdStorageList(t *testing.T) {
t.Fatalf("unexpected %v\v", paths)
}
}

// TestEtcdStorageWatch checks that writing and then removing a key through
// EtcdStorage produces a put event followed by a delete event on the channel
// returned by Events.
func TestEtcdStorageWatch(t *testing.T) {
	defer testutil.AfterTest(t)
	integration.BeforeTest(t)
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseTCP: true})
	defer cluster.Terminate(t)

	// Build a host:port pair from the cluster endpoint URL for the
	// "endpoints" query parameter of the storage URI.
	etcdEndpoints := cluster.RandClient().Endpoints()[0]
	u, err := url.Parse(etcdEndpoints)
	if err != nil {
		t.Fatalf("%v\n", err)
	}
	endpoints := fmt.Sprintf("%s:%s", u.Hostname(), u.Port())

	uri := fmt.Sprintf("etcd://phalanx-test/metastore?endpoints=%s", endpoints)
	logger := logging.NewLogger("WARN", "", 500, 3, 30, false)

	etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger)
	if err != nil {
		t.Fatalf("%v\n", err)
	}
	defer etcdStorage.Close()

	// Fail fast if either operation errors; otherwise the receives below
	// would wait for events that are never produced.
	if err := etcdStorage.Put("/test/hello.txt", []byte("hello")); err != nil {
		t.Fatalf("%v\n", err)
	}
	if err := etcdStorage.Delete("/test/hello.txt"); err != nil {
		t.Fatalf("%v\n", err)
	}

	// Events must arrive in the order the operations were issued. Report the
	// mismatching types rather than the (nil) error variable.
	event := <-etcdStorage.Events()
	if expected := metastore.StorageEventTypePut; event.Type != expected {
		t.Fatalf("expected event type %v, but got %v\n", expected, event.Type)
	}

	event = <-etcdStorage.Events()
	if expected := metastore.StorageEventTypeDelete; event.Type != expected {
		t.Fatalf("expected event type %v, but got %v\n", expected, event.Type)
	}
}