diff --git a/.gitignore b/.gitignore index 37e6c12..6ffd79e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ bin/phalanx dist/ +.env + *.pem *.csr
diff --git a/clients/s3.go b/clients/s3.go new file mode 100644 index 0000000..cd0d001 --- /dev/null +++ b/clients/s3.go @@ -0,0 +1,116 @@ +package clients + +import ( + "context" + "net/url" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/mosuka/phalanx/errors" +) + +// NewS3ClientWithUri builds an S3 client from an s3:// URI. Only the query +// string is consulted; each setting is read from its query parameter and +// falls back to an environment variable when the parameter is absent: +// profile (AWS_PROFILE), access_key_id (AWS_ACCESS_KEY_ID), +// secret_access_key (AWS_SECRET_ACCESS_KEY), session_token +// (AWS_SESSION_TOKEN), region (AWS_DEFAULT_REGION), endpoint (AWS_ENDPOINT) +// and use_path_style (AWS_USE_PATH_STYLE; only "true" enables it). +// It returns errors.ErrInvalidUri when the scheme is not "s3". +func NewS3ClientWithUri(uri string) (*s3.Client, error) { + // Parse URI. + u, err := url.Parse(uri) + if err != nil { + return nil, err + } + + if u.Scheme != "s3" { + return nil, errors.ErrInvalidUri + } + + // Parse the query string once rather than on every lookup. + q := u.Query() + + ctx := context.Background() + + profile := os.Getenv("AWS_PROFILE") + if str := q.Get("profile"); str != "" { + profile = str + } + var loadOpts []func(*config.LoadOptions) error + if profile != "" { + loadOpts = append(loadOpts, config.WithSharedConfigProfile(profile)) + } + cfg, err := config.LoadDefaultConfig(ctx, loadOpts...) + if err != nil { + return nil, err + } + + // Static credentials, when any part is given, replace those resolved by the default chain. + useCredentials := false + accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID") + if str := q.Get("access_key_id"); str != "" { + accessKeyId = str + } + if accessKeyId != "" { + useCredentials = true + } + + secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + if str := q.Get("secret_access_key"); str != "" { + secretAccessKey = str + } + if secretAccessKey != "" { + useCredentials = true + } + + sessionToken := os.Getenv("AWS_SESSION_TOKEN") + if str := q.Get("session_token"); str != "" { + sessionToken = str + } + if sessionToken != "" { + useCredentials = true + } + + if useCredentials { + cfg.Credentials = aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(accessKeyId, secretAccessKey, sessionToken)) + } + + region := os.Getenv("AWS_DEFAULT_REGION") + if str := q.Get("region"); str != "" { + region = str + } + if region != "" { + cfg.Region = region + } + + endpoint := os.Getenv("AWS_ENDPOINT") + if str := q.Get("endpoint"); str != "" { + endpoint = str + } + if endpoint != "" { + // Send every request to the custom endpoint, signing with the configured region. + customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: endpoint, + SigningRegion: cfg.Region, + }, nil + }) + cfg.EndpointResolverWithOptions = customResolver + } + + // Path-style addressing puts the bucket in the URL path instead of the host name. + usePathStyleStr := os.Getenv("AWS_USE_PATH_STYLE") + if str := q.Get("use_path_style"); str != "" { + usePathStyleStr = str + } + usePathStyle := usePathStyleStr == "true" + + return s3.NewFromConfig(cfg, func(options *s3.Options) { + options.UsePathStyle = usePathStyle + }), nil +}
diff --git a/clients_integration_test/etcd_test.go b/clients_integration_test/etcd_test.go new file mode 100644 index 0000000..ded1fdd --- /dev/null +++ b/clients_integration_test/etcd_test.go @@ -0,0 +1,17 @@ +//go:build integration + +package clients_integration_test + +import ( + "testing" + + "github.com/mosuka/phalanx/clients" +) + +func TestNewEtcdClientWithUri(t *testing.T) { + uri := "etcd://phalanx-test/metastore/test?endpoints=localhost:2379" + + if _, err := clients.NewEtcdClientWithUri(uri); err != nil { + t.Fatalf("error %v\n", err) + } +}
diff --git a/clients_integration_test/minio_test.go b/clients_integration_test/minio_test.go new file mode 100644 index 0000000..4a4fe68 --- /dev/null +++ b/clients_integration_test/minio_test.go @@ -0,0 +1,17 @@ +//go:build integration + +package clients_integration_test + +import ( + "testing" + + "github.com/mosuka/phalanx/clients" +) + +func TestNewMinioClientWithUri(t *testing.T) { + uri := "minio://phalanx-test/indexes/test?endpoint=localhost:9000" + + if _, err := clients.NewMinioClientWithUri(uri); err != nil { + t.Fatalf("error %v\n", err) + } +}
diff --git a/clients_integration_test/s3_test.go b/clients_integration_test/s3_test.go new file mode 100644 index 0000000..4ff0801 --- /dev/null +++ b/clients_integration_test/s3_test.go @@ -0,0 +1,17 @@ +//go:build integration + +package clients_integration_test + +import ( + "testing" + + "github.com/mosuka/phalanx/clients" +) + +func TestNewS3ClientWithUri(t *testing.T) { + uri := "s3://phalanx-test/indexes/test?endpoint=http://localhost:4566" + + if _, err := clients.NewS3ClientWithUri(uri); err != nil { + t.Fatalf("error %v\n", err) + } +}
diff --git a/cluster/cluster.go b/cluster/cluster.go index 6fd919b..8a8a55c 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -57,8 +57,6 @@ func NewCluster(host string, bindPort int, nodeMetadata NodeMetadata, isSeedNode // } // members.UpdateNode(10 * time.Second) - fmt.Println("cluster created", memberList.LocalNode().Name) - return &Cluster{ memberList: memberList, nodeEventDeliegate: nodeEventDeliegate,
diff --git a/directory/directory.go b/directory/directory.go index 4f1b4e5..db588d0 100644 --- a/directory/directory.go +++ b/directory/directory.go @@ -21,8 +21,15 @@ const ( SchemeTypeMem SchemeTypeFile SchemeTypeMinio + SchemeTypeS3 ) +type uint64Slice []uint64 + +func (e uint64Slice) Len() int { return len(e) } +func (e uint64Slice) Swap(i, j int) { e[i], e[j] = e[j], e[i] } +func (e uint64Slice) Less(i, j int) bool { return e[i] < e[j] } + // Enum value maps for SchemeType.
var ( SchemeType_name = map[SchemeType]string{ @@ -30,12 +37,14 @@ var ( SchemeTypeMem: "mem", SchemeTypeFile: "file", SchemeTypeMinio: "minio", + SchemeTypeS3: "s3", } SchemeType_value = map[string]SchemeType{ "unknown": SchemeTypeUnknown, "mem": SchemeTypeMem, "file": SchemeTypeFile, "minio": SchemeTypeMinio, + "s3": SchemeTypeS3, } ) @@ -64,6 +73,8 @@ func NewIndexConfigWithUri(uri string, lockUri string, logger *zap.Logger) (blug return FileSystemIndexConfig(uri, directoryLogger), nil case SchemeType_name[SchemeTypeMinio]: return MinioIndexConfig(uri, lockUri, directoryLogger), nil + case SchemeType_name[SchemeTypeS3]: + return S3IndexConfig(uri, lockUri, directoryLogger), nil default: err := errors.ErrUnsupportedDirectoryType directoryLogger.Error(err.Error(), zap.String("scheme", u.Scheme)) diff --git a/directory/directory_minio.go b/directory/directory_minio.go index 5e2bdfd..e72f420 100644 --- a/directory/directory_minio.go +++ b/directory/directory_minio.go @@ -23,13 +23,6 @@ import ( "go.uber.org/zap" ) -type uint64Slice []uint64 - -func (e uint64Slice) Len() int { return len(e) } -func (e uint64Slice) Swap(i, j int) { e[i], e[j] = e[j], e[i] } -func (e uint64Slice) Less(i, j int) bool { return e[i] < e[j] } - -// func MinioIndexConfig(uri string, lockManager lock.LockManager, logger *zap.Logger) bluge.Config { func MinioIndexConfig(uri string, lockUri string, logger *zap.Logger) bluge.Config { return bluge.DefaultConfigWithDirectory(func() index.Directory { return NewMinioDirectoryWithUri(uri, lockUri, logger) @@ -47,7 +40,6 @@ type MinioDirectory struct { logger *zap.Logger } -// func NewMinioDirectoryWithUri(uri string, lockManager lock.LockManager, logger *zap.Logger) *MinioDirectory { func NewMinioDirectoryWithUri(uri string, lockUri string, logger *zap.Logger) *MinioDirectory { directoryLogger := logger.Named("minio") @@ -120,8 +112,7 @@ func (d *MinioDirectory) Setup(readOnly bool) error { Region: region, } - err = d.client.MakeBucket(ctx, 
d.bucket, opts) - if err != nil { + if err = d.client.MakeBucket(ctx, d.bucket, opts); err != nil { d.logger.Error(err.Error(), zap.String("region", region), zap.String("bucket", d.bucket)) return err } @@ -236,8 +227,7 @@ func (d *MinioDirectory) Remove(kind string, id uint64) error { ctx, cancel := context.WithTimeout(d.ctx, d.requestTimeout) defer cancel() - err := d.client.RemoveObject(ctx, d.bucket, path, opts) - if err != nil { + if err := d.client.RemoveObject(ctx, d.bucket, path, opts); err != nil { d.logger.Error(err.Error(), zap.String("bucket", d.bucket), zap.String("path", path), zap.Any("opts", opts)) return err }
diff --git a/directory/directory_s3.go b/directory/directory_s3.go new file mode 100644 index 0000000..912f624 --- /dev/null +++ b/directory/directory_s3.go @@ -0,0 +1,335 @@ +package directory + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net/url" + "path" + "sort" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/index" + segment "github.com/blugelabs/bluge_segment_api" + "github.com/mosuka/phalanx/clients" + phalanxerrors "github.com/mosuka/phalanx/errors" + "github.com/mosuka/phalanx/lock" + "go.uber.org/zap" +) + +// S3IndexConfig returns a bluge config whose index directory is the +// S3Directory described by uri, with lockUri used by its Lock method. +func S3IndexConfig(uri string, lockUri string, logger *zap.Logger) bluge.Config { + return bluge.DefaultConfigWithDirectory(func() index.Directory { + return NewS3DirectoryWithUri(uri, lockUri, logger) + }) +} + +// S3Directory implements bluge's index.Directory by storing index items as +// objects in an S3 bucket under a key prefix. +type S3Directory struct { + bucket string + path string + client *s3.Client + ctx context.Context + requestTimeout time.Duration + lockUri string + lockManager lock.LockManager + logger *zap.Logger +} + +// NewS3DirectoryWithUri creates an S3Directory from uri +// (s3://<bucket>/<path>?...): the URI host is the bucket and the URI path +// is the key prefix. It logs and returns nil if the client cannot be +// created, the URI cannot be parsed, or the scheme is not "s3". +func NewS3DirectoryWithUri(uri string, lockUri string, logger *zap.Logger) *S3Directory { + directoryLogger := logger.Named("s3") + + client, err := clients.NewS3ClientWithUri(uri) + if err != nil { + directoryLogger.Error(err.Error(), zap.String("uri", uri)) + return nil + } + + // Parse URI. + u, err := url.Parse(uri) + if err != nil { + directoryLogger.Error(err.Error(), zap.String("uri", uri)) + return nil + } + if u.Scheme != SchemeType_name[SchemeTypeS3] { + err := phalanxerrors.ErrInvalidUri + directoryLogger.Error(err.Error(), zap.String("uri", uri)) + return nil + } + + return &S3Directory{ + client: client, + bucket: u.Host, + path: u.Path, + ctx: context.Background(), + requestTimeout: 3 * time.Second, + lockUri: lockUri, + lockManager: nil, + logger: directoryLogger, + } +} + +// fileName returns the object name of an item: id as 12 hex digits followed by kind. +func (d *S3Directory) fileName(kind string, id uint64) string { + return fmt.Sprintf("%012x", id) + kind +} + +// objectKey returns the S3 key of an item. S3 keys are always slash-separated, +// so package path is used instead of path/filepath. +func (d *S3Directory) objectKey(kind string, id uint64) string { + return path.Join(d.path, d.fileName(kind, id)) +} + +// keyPrefix returns the prefix shared by every key objectKey produces, ending +// in "/" so that listing "/indexes/test" does not also match objects under +// "/indexes/test2". +func (d *S3Directory) keyPrefix() string { + if d.path == "" { + return "" + } + prefix := path.Clean(d.path) + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + return prefix +} + +// Setup creates the bucket if it does not exist yet. A bucket that already +// exists is not an error. readOnly is currently ignored. +func (d *S3Directory) Setup(readOnly bool) error { + d.logger.Info("setup", zap.String("bucket", d.bucket), zap.String("path", d.path)) + + input := &s3.CreateBucketInput{ + Bucket: aws.String(d.bucket), + } + + // NOTE(review): outside us-east-1, AWS S3 expects + // CreateBucketConfiguration.LocationConstraint to be set; confirm for the target region. + if _, err := d.client.CreateBucket(d.ctx, input); err != nil { + var exists *types.BucketAlreadyExists + var owned *types.BucketAlreadyOwnedByYou + if !errors.As(err, &owned) && !errors.As(err, &exists) { + d.logger.Error(err.Error(), zap.String("bucket", d.bucket)) + return err + } + // An existing bucket (e.g. created by an earlier run) is fine. + d.logger.Info(err.Error(), zap.String("bucket", d.bucket)) + } + + return nil +} + +// List returns the ids of the items of the given kind (a file extension such +// as ".seg") stored under this directory's key prefix, in descending order. +// Every result page is read, so listings are not cut off at one page. +func (d *S3Directory) List(kind string) ([]uint64, error) { + ctx, cancel := context.WithTimeout(d.ctx, d.requestTimeout) + defer cancel() + + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.bucket), + Prefix: aws.String(d.keyPrefix()), + } + + var rv uint64Slice + paginator := s3.NewListObjectsV2Paginator(d.client, input) + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + d.logger.Error(err.Error(), zap.String("bucket", d.bucket)) + return nil, err + } + + for _, object := range page.Contents { + key := aws.ToString(object.Key) + if path.Ext(key) != kind { + continue + } + + // E.g. indexes/wikipedia_en/000000000004.seg -> 000000000004 + base := path.Base(key) + base = base[:len(base)-len(kind)] + + epoch, err := strconv.ParseUint(base, 16, 64) + if err != nil { + d.logger.Error(err.Error(), zap.String("base", base)) + return nil, err + } + rv = append(rv, epoch) + } + } + + sort.Sort(sort.Reverse(rv)) + + return rv, nil +} + +// Load downloads the item identified by kind and id. The object body is read +// fully and closed here, so the returned io.Closer is always nil. +func (d *S3Directory) Load(kind string, id uint64) (*segment.Data, io.Closer, error) { + key := d.objectKey(kind, id) + + ctx, cancel := context.WithTimeout(d.ctx, d.requestTimeout) + defer cancel() + + input := &s3.GetObjectInput{ + Bucket: aws.String(d.bucket), + Key: aws.String(key), + } + + object, err := d.client.GetObject(ctx, input) + if err != nil { + d.logger.Error(err.Error(), zap.String("bucket", d.bucket), zap.String("path", key)) + return nil, nil, err + } + // Close the response body so the underlying connection is released. + defer object.Body.Close() + + data, err := ioutil.ReadAll(object.Body) + if err != nil { + d.logger.Error(err.Error()) + return nil, nil, err + } + + return segment.NewDataBytes(data), nil, nil +} + +// Persist serializes w into memory and uploads it as the object for kind and id. +func (d *S3Directory) Persist(kind string, id uint64, w index.WriterTo, closeCh chan struct{}) error { + var buf bytes.Buffer + size, err := w.WriteTo(&buf, closeCh) + if err != nil { + d.logger.Error(err.Error()) + return err + } + + reader := bytes.NewReader(buf.Bytes()) + + key := d.objectKey(kind, id) + + ctx, cancel := context.WithTimeout(d.ctx, d.requestTimeout) + defer cancel() + + input := &s3.PutObjectInput{ + Bucket: aws.String(d.bucket), + Key: aws.String(key), + Body: reader, + } + + if _, err := d.client.PutObject(ctx, input); err != nil { + d.logger.Error(err.Error(), zap.String("bucket", d.bucket), zap.String("path", key), zap.Int64("size", size)) + + // TODO: Remove the failed file. + return err + } + + return nil +} + +// Remove deletes the object for kind and id. +func (d *S3Directory) Remove(kind string, id uint64) error { + key := d.objectKey(kind, id) + + ctx, cancel := context.WithTimeout(d.ctx, d.requestTimeout) + defer cancel() + + input := &s3.DeleteObjectInput{ + Bucket: aws.String(d.bucket), + Key: aws.String(key), + } + + if _, err := d.client.DeleteObject(ctx, input); err != nil { + d.logger.Error(err.Error(), zap.String("bucket", d.bucket), zap.String("path", key)) + return err + } + + return nil +} + +// Stats returns the number of objects and their total size in bytes under +// this directory's key prefix, reading every result page. It logs and +// returns 0, 0 if listing fails. +func (d *S3Directory) Stats() (uint64, uint64) { + numFilesOnDisk := uint64(0) + numBytesUsedDisk := uint64(0) + + ctx, cancel := context.WithTimeout(d.ctx, d.requestTimeout) + defer cancel() + + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.bucket), + Prefix: aws.String(d.keyPrefix()), + } + + paginator := s3.NewListObjectsV2Paginator(d.client, input) + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + d.logger.Error(err.Error(), zap.String("bucket", d.bucket)) + return 0, 0 + } + + for _, object := range page.Contents { + numFilesOnDisk++ + numBytesUsedDisk += uint64(object.Size) + } + } + + return numFilesOnDisk, numBytesUsedDisk +} + +// Sync is a no-op: Persist uploads each object synchronously. +func (d *S3Directory) Sync() error { + return nil +} + +// Lock creates a lock manager from lockUri and acquires the lock. +func (d *S3Directory) Lock() error { + // Create lock manager + lockManager, err := lock.NewLockManagerWithUri(d.lockUri, d.logger) + if err != nil { + d.logger.Error(err.Error(), zap.String("lock_uri", d.lockUri)) + return err + } + d.lockManager = lockManager + + if _, err := d.lockManager.Lock(); err != nil { + d.logger.Error(err.Error()) + return err + } + + return nil +} + +// Unlock releases the lock taken by Lock and closes the lock manager. It +// is a no-op if Lock has not created a lock manager. +func (d *S3Directory) Unlock() error { + if d.lockManager == nil { + return nil + } + + if err := d.lockManager.Unlock(); err != nil { + d.logger.Error(err.Error()) + return err + } + + d.lockManager.Close() + d.lockManager = nil + + return nil +}
diff --git a/directory_integration_test/directory_minio_test.go b/directory_integration_test/directory_minio_test.go new file mode 100644 index 0000000..e1c4c8f --- /dev/null +++ b/directory_integration_test/directory_minio_test.go @@ -0,0 +1,38 @@ +//go:build integration + +package
directory_integration_test_test + +import ( + "testing" + + "github.com/mosuka/phalanx/directory" + "github.com/mosuka/phalanx/logging" +) + +func TestNewMinioDirectoryWithUri(t *testing.T) { + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + uri := "minio://phalanx-test/indexes/test?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false&region=us-east-1" + lockUri := "etcd://phalanx-test/locks/test?endpoints=localhost:2379" + + directory := directory.NewMinioDirectoryWithUri(uri, lockUri, logger) + if directory == nil { + t.Fatalf("failed to create MinIO directory\n") + } +} + +func TestMinIODirectorySetup(t *testing.T) { + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + uri := "minio://phalanx-test/indexes/test?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false&region=us-east-1" + lockUri := "etcd://phalanx-test/locks/test?endpoints=localhost:2379" + + directory := directory.NewMinioDirectoryWithUri(uri, lockUri, logger) + if directory == nil { + t.Fatalf("failed to create MinIO directory\n") + } + + if err := directory.Setup(false); err != nil { + t.Fatalf("failed to setup MinIO directory: %v\n", err) + } +}
diff --git a/directory_integration_test/directory_s3_test.go b/directory_integration_test/directory_s3_test.go new file mode 100644 index 0000000..a2ba2e1 --- /dev/null +++ b/directory_integration_test/directory_s3_test.go @@ -0,0 +1,38 @@ +//go:build integration + +package directory_integration_test_test + +import ( + "testing" + + "github.com/mosuka/phalanx/directory" + "github.com/mosuka/phalanx/logging" +) + +func TestNewS3DirectoryWithUri(t *testing.T) { + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + uri := "s3://phalanx-test/indexes/test?endpoint=http://localhost:4566" + lockUri := "etcd://phalanx-test/locks/test?endpoints=localhost:2379" + + directory := directory.NewS3DirectoryWithUri(uri, lockUri, logger) + if directory == nil { + t.Fatalf("failed to create S3 directory\n") + } +} + +func TestS3DirectorySetup(t *testing.T) { + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + uri := "s3://phalanx-test/indexes/test?endpoint=http://localhost:4566" + lockUri := "etcd://phalanx-test/locks/test?endpoints=localhost:2379" + + directory := directory.NewS3DirectoryWithUri(uri, lockUri, logger) + if directory == nil { + t.Fatalf("failed to create S3 directory\n") + } + + if err := directory.Setup(false); err != nil { + t.Fatalf("failed to setup S3 directory: %v\n", err) + } +}
diff --git a/directory_test/directory_fs_test.go b/directory_test/directory_fs_test.go new file mode 100644 index 0000000..46a0f7f --- /dev/null +++ b/directory_test/directory_fs_test.go @@ -0,0 +1,29 @@ +package directory_test + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/mosuka/phalanx/directory" + "github.com/mosuka/phalanx/logging" +) + +func TestNewFileSystemDirectoryWithUri(t *testing.T) { + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + tmpDir, err := ioutil.TempDir("", "phalanx-test") + if err != nil { + t.Fatalf("%v\n", err) + } + defer os.RemoveAll(tmpDir) + + path := filepath.ToSlash(tmpDir) + uri := "file://" + path + + directory := directory.NewFileSystemDirectoryWithUri(uri, logger) + if directory == nil { + t.Fatalf("failed to create file system directory\n") + } +}
diff --git a/directory_test/directory_mem_test.go b/directory_test/directory_mem_test.go new file mode 100644 index 0000000..2d6a151 --- /dev/null +++ b/directory_test/directory_mem_test.go @@ -0,0 +1,21 @@ +package directory_test + +import ( + "testing" + + "github.com/mosuka/phalanx/directory" + "github.com/mosuka/phalanx/logging" + "github.com/thanhpk/randstr" +) + +func TestNewInMemoryDirectoryWithUri(t *testing.T) { + logger := logging.NewLogger("WARN", "", 500, 3, 30, false) + + path := randstr.String(8) + uri := "mem://" + path + + directory := directory.NewInMemoryDirectoryWithUri(uri, logger) + if directory == nil { + t.Fatalf("failed to create In-Memory directory\n") + } +}
diff --git a/directory_test/directory_minio_test.go b/directory_test/directory_minio_test.go deleted file mode 100644 index b3f256e..0000000 --- a/directory_test/directory_minio_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package directory_test - -// import ( -// "testing" - -// "github.com/blugelabs/bluge/index" -// "github.com/mosuka/phalanx/lock" -// "github.com/mosuka/phalanx/logging" -// ) - -// func TestNewMinioDirectoryWithUri(t *testing.T) { -// logger := logging.NewLogger("WARN", "", 500, 3, 30, false) - -// lock_uri := "etcd://phalanx-test/locks/wikipedia_en?endpoints=localhost:2379" -// lockManager, err := lock.NewLockManagerWithUri(lock_uri, logger) -// if err != nil { -// t.Fatalf("%v\n", err) -// } - -// uri := "minio://phalanx-test/indexes/wikipedia_en?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false&region=us-east-1" -// directory := NewMinioDirectoryWithUri(uri, lockManager, logger) -// if directory == nil { -// t.Fatalf("failed to create MinIO directory\n") -// } -// } - -// func TestMinioDirectorySetup(t *testing.T) { -// logger := logging.NewLogger("WARN", "", 500, 3, 30, false) - -// lock_uri := "etcd://phalanx-test/locks/wikipedia_en?endpoints=localhost:2379" -// lockManager, err := lock.NewLockManagerWithUri(lock_uri, logger) -// if err != nil { -// t.Fatalf("%v\n", err) -// } - -// uri := "minio://phalanx-test/indexes/wikipedia_en?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false&region=us-east-1" -// directory := NewMinioDirectoryWithUri(uri, lockManager, logger) -// if directory == nil { -// t.Fatalf("failed to create MinIO directory\n") -// } - -// if err := directory.Setup(false); err != nil { -// t.Fatalf("%v\n", err) -// } -// } - -// func TestMinioDirectoryList(t *testing.T) { -// logger := logging.NewLogger("WARN", "", 500, 3, 30, false) - -// lock_uri := "etcd://phalanx-test/locks/wikipedia_en?endpoints=localhost:2379" -//
lockManager, err := lock.NewLockManagerWithUri(lock_uri, logger) -// if err != nil { -// t.Fatalf("%v\n", err) -// } - -// uri := "minio://phalanx-test/indexes/wikipedia_en?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false®ion=us-east-1" -// directory := NewMinioDirectoryWithUri(uri, lockManager, logger) -// if directory == nil { -// t.Fatalf("failed to create MinIO directory\n") -// } - -// if err := directory.Setup(false); err != nil { -// t.Fatalf("%v\n", err) -// } - -// kind := index.ItemKindSegment - -// _, err = directory.List(kind) -// if err != nil { -// t.Fatalf("%v\n", err) -// } -// } - -// func TestMinioDirectoryLoad(t *testing.T) { -// logger := logging.NewLogger("WARN", "", 500, 3, 30, false) - -// lock_uri := "etcd://phalanx-test/locks/wikipedia_en?endpoints=localhost:2379" -// lockManager, err := lock.NewLockManagerWithUri(lock_uri, logger) -// if err != nil { -// t.Fatalf("%v\n", err) -// } - -// uri := "minio://phalanx-test/indexes/wikipedia_en?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false®ion=us-east-1" -// directory := NewMinioDirectoryWithUri(uri, lockManager, logger) -// if directory == nil { -// t.Fatalf("failed to create MinIO directory\n") -// } - -// if err := directory.Setup(false); err != nil { -// t.Fatalf("%v\n", err) -// } - -// kind := index.ItemKindSegment - -// ids, err := directory.List(kind) -// if err != nil { -// t.Fatalf("%v\n", err) -// } - -// id := ids[0] - -// _, _, err = directory.Load(kind, id) -// if err != nil { -// t.Fatalf("%v\n", err) -// } -// } - -// func TestMinioDirectoryLock(t *testing.T) { -// logger := logging.NewLogger("WARN", "", 500, 3, 30, false) - -// lock_uri := "etcd://phalanx-test/locks/wikipedia_en?endpoints=localhost:2379" -// lockManager, err := lock.NewLockManagerWithUri(lock_uri, logger) -// if err != nil { -// t.Fatalf("%v\n", err) -// } - -// uri := 
"minio://phalanx-test/indexes/wikipedia_en?endpoint=localhost:9000&access_key=minio&secret_key=miniosecret&secure=false®ion=us-east-1" -// directory := NewMinioDirectoryWithUri(uri, lockManager, logger) -// if directory == nil { -// t.Fatalf("failed to create MinIO directory\n") -// } - -// if err := directory.Setup(false); err != nil { -// t.Fatalf("%v\n", err) -// } - -// if err := directory.Lock(); err != nil { -// t.Fatalf("%v\n", err) -// } -// defer directory.Unlock() -// } diff --git a/docker-compose.yml b/docker-compose.yml index 820d112..ac6693d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.7' services: etcd: container_name: etcd - image: quay.io/coreos/etcd:v3.5.1 + image: quay.io/coreos/etcd:latest entrypoint: /usr/local/bin/etcd command: - '--name=etcd' @@ -23,7 +23,7 @@ services: etcdkeeper: container_name: etcdkeeper - image: evildecay/etcdkeeper:v0.7.6 + image: evildecay/etcdkeeper:latest environment: HOST: etcdkeeper ports: @@ -34,7 +34,7 @@ services: minio: container_name: minio - image: minio/minio:RELEASE.2021-09-24T00-24-24Z + image: minio/minio:latest ports: - "9000:9000" - "9001:9001" @@ -49,6 +49,14 @@ services: retries: 3 restart: always + localstack: + image: localstack/localstack:latest + environment: + SERVICES: ${LOCALSTACK_SERVICES} + DEFAULT_REGION: ${LOCALSTACK_DEFAULT_REGION} + ports: + - "4566:4566" + phalanx: container_name: phalanx image: mosuka/phalanx:latest diff --git a/docs_md/SUMMARY.md b/docs_md/SUMMARY.md index a71f9e5..e1cb383 100644 --- a/docs_md/SUMMARY.md +++ b/docs_md/SUMMARY.md @@ -3,6 +3,7 @@ * [Build](/build.md) * [Get started](/get_started.md) * [Run with MinIO and etcd](/run_with_minio_etcd.md) +* [Run with AWS](/run_with_aws.md) * [Index Mapping](/index_mapping.md) * [Analyzer](/analyzer.md) * [Char filters](/analyzer/char_filters.md) diff --git a/docs_md/run_with_aws.md b/docs_md/run_with_aws.md new file mode 100644 index 0000000..6d874b1 --- /dev/null +++ 
b/docs_md/run_with_aws.md @@ -0,0 +1,93 @@ +# Run with AWS (LocalStack) + +To try Phalanx with Amazon S3 as the index storage, let's start Phalanx with [LocalStack](https://localstack.cloud/), which emulates S3 locally, and [etcd](https://etcd.io/). +This repository has a docker-compose.yml file. With it, you can easily launch LocalStack and etcd on Docker. + +``` +% docker-compose up etcd etcdkeeper localstack +``` + +Once the containers have started, LocalStack serves the S3 API and ETCD Keeper lets you browse the etcd data at the following URLs. + +- LocalStack (S3 API endpoint) +http://localhost:4566 + +- ETCD Keeper +http://localhost:8080/etcdkeeper/ + + +## Start Phalanx with etcd metastore + +Start Phalanx with etcd specified as the metastore. S3 settings are taken from the index URI query string or, when absent, from environment variables (`AWS_ENDPOINT`, `AWS_USE_PATH_STYLE`, credentials and region); `example.env` contains values for LocalStack: + +``` +% ./bin/phalanx --index-metastore-uri=etcd://phalanx/metastore +``` + +### Create index with S3 and etcd + +If you have started Phalanx to use LocalStack (S3) and etcd, you can use this command to create an index. + +``` +% curl -XPUT -H 'Content-type: application/json' http://localhost:8000/v1/indexes/example --data-binary ' +{ + "index_uri": "s3://phalanx/indexes/example", + "lock_uri": "etcd://phalanx/locks/example", + "index_mapping": { + "id": { + "type": "numeric", + "options": { + "index": true, + "store": true, + "sortable": true, + "aggregatable": true + } + }, + "text": { + "type": "text", + "options": { + "index": true, + "store": true, + "term_positions": true, + "highlight": true, + "sortable": true, + "aggregatable": true + }, + "analyzer": { + "char_filters": [ + { + "name": "ascii_folding" + }, + { + "name": "unicode_normalize", + "options": { + "form": "NFKC" + } + } + ], + "tokenizer": { + "name": "unicode" + }, + "token_filters": [ + { + "name": "lower_case" + } + ] + } + } + }, + "num_shards": 1, + "default_search_field": "_all", + "default_analyzer": { + "tokenizer": { + "name": "unicode" + }, + "token_filters": [ + { + "name": "lower_case" + } + ] + } +} +' +``` diff --git a/errors/errors.go b/errors/errors.go index b807206..a47bb55 100644 --- a/errors/errors.go +++ b/errors/errors.go @@
-46,4 +46,6 @@ var ( ErrNodeDoesNotFound = errors.New("node not found") ErrInvalidData = errors.New("invalid data") + + ErrInvalidCredentials = errors.New("invalid credentials") ) diff --git a/.env b/example.env similarity index 56% rename from .env rename to example.env index d3ea36e..3df0fe4 100644 --- a/.env +++ b/example.env @@ -9,4 +9,14 @@ MINIO_SESSION_TOKEN= MINIO_ENDPOINT=127.0.0.1:9000 MINIO_SECURE=false +AWS_ACCESS_KEY_ID=dummy +AWS_SECRET_ACCESS_KEY=dummy +AWS_SESSION_TOKEN= +AWS_DEFAULT_REGION=ap-northeast-1 +AWS_ENDPOINT=http://localhost:4566 +AWS_USE_PATH_STYLE=true + ETCD_ENDPOINTS=127.0.0.1:2379 + +LOCALSTACK_SERVICES=s3 +LOCALSTACK_DEFAULT_REGION=ap-northeast-1 diff --git a/go.mod b/go.mod index 352b560..20e97e6 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,21 @@ require ( require ( github.com/RoaringBitmap/roaring v0.9.1 // indirect github.com/armon/go-metrics v0.3.10 // indirect + github.com/aws/aws-sdk-go-v2 v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.1.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.7.0 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.9.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.1.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.6.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.6.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.10.0 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.23.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.13.0 // indirect + github.com/aws/smithy-go v1.9.1 // indirect github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f // indirect 
github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect diff --git a/go.sum b/go.sum index 88d670c..0715f98 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,36 @@ github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8 github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/aws/aws-sdk-go-v2 v1.12.0 h1:z5bijqy+eXLK/QqF6eQcwCN2qw1k+m9OUDicqCZygu0= +github.com/aws/aws-sdk-go-v2 v1.12.0/go.mod h1:tWhQI5N5SiMawto3uMAQJU5OUN/1ivhDDHq7HTsJvZ0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.1.0 h1:Wkxd2/y6/QFlNQYD8ueQqGy/9BYBq/E7v7fNeLV2P8o= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.1.0/go.mod h1:VFAAzjEWnl0aWGwxREbyuC6qJOVnTANhKY4KYq2TPP0= +github.com/aws/aws-sdk-go-v2/config v1.12.0 h1:WOhIzj5HdixjlvQ4SLYAOk6OUUsuu88RwcsTzexa9cg= +github.com/aws/aws-sdk-go-v2/config v1.12.0/go.mod h1:GQONFVSDdG6RRho1C730SGNyDhS1kSTnxpOE76ptBqo= +github.com/aws/aws-sdk-go-v2/credentials v1.7.0 h1:KFuKwPs7i5SE5a0LxqAxz75qxSjr2HnHnhu0UPGlvpM= +github.com/aws/aws-sdk-go-v2/credentials v1.7.0/go.mod h1:Kmq64kahHJtXfmnEwnvRKeNjLBqkdP++Itln9BmQerE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.9.0 h1:fPq3oloONbHaA0O8KX/KYUQk7pG9JjKBwYQvQsQDK84= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.9.0/go.mod h1:19SxQ+9zANyJCnNaoF3ovl8bFil4TaqCYEDdqNGKM+A= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.3 h1:YPNiEXnuWdkpNOwBFHhcLwkSmewwQRcPFO9dHmxU0qg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.3/go.mod h1:L72JSFj9OwHwyukeuKFFyTj6uFWE4AjB0IQp97bd9Lc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.1.0 h1:ArRd27pSm66f7cCBDPS77wvxiS4IRjFatpzVBD7Aojc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.1.0/go.mod 
h1:KdVvdk4gb7iatuHZgIkIqvJlWHBtjCJLUtD/uO/FkWw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.3 h1:fmGqMNlFTHr9Y48qmYYv2qIo+TAsST3qZa2d1HcwBeo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.3/go.mod h1:N4dv+zawriMFZBO/6UKg3zt+XO6xWOQo1neAA0lFbo4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.6.0 h1:zQlcDaAP0sk7jVSkBnBd4fc07M8bSAi6k1WjL48tB9M= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.6.0/go.mod h1:lzucjNKa47J5dstwdXwRrDLMEeWwOYK2+BgUKR3xthI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.6.0 h1:rwE0kWa5qm0yEoNPwC3zhrt1tFVXTmkWRlUxLayAwyc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.6.0/go.mod h1:wTgFkG6t7jS/6Y0SILXwfspV3IXowb6ngsAlSajW0Kc= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.10.0 h1:BIqXLjEbWh7vTj1pQ/63czJUsfck6UwSLpJjhsiZezI= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.10.0/go.mod h1:63zwSPj+6owUqMTuMk12LQBJobiEsCy286evNW+/Mhk= +github.com/aws/aws-sdk-go-v2/service/s3 v1.23.0 h1:4CUrngIysbIQpC56JchMWDNJpQCGVCElS5osSbr5qLc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.23.0/go.mod h1:l+Y3grd9VGhuO7IlmFwAFNSDPFIDi/5oNa9jlk89KIc= +github.com/aws/aws-sdk-go-v2/service/sso v1.8.0 h1:X77LUt6Djy3Z02r6tW7Z+4FNr6GCnEG54EXfskc19M4= +github.com/aws/aws-sdk-go-v2/service/sso v1.8.0/go.mod h1:AB6v3BedyhVRIbPQbJnUsBmtup2pFiikpp5n3YyB6Ac= +github.com/aws/aws-sdk-go-v2/service/sts v1.13.0 h1:n8+dZMOvwkGtmhub8B2wYvRHut45/NB7DeNhNcUnBpg= +github.com/aws/aws-sdk-go-v2/service/sts v1.13.0/go.mod h1:jQto17aC9pJ6xRa1g29uXZhbcS6qNT3PSnKfPShq4sY= +github.com/aws/smithy-go v1.9.1 h1:5vetTooLk4hPWV8q6ym6+lXKAT1Urnm49YkrRKo2J8o= +github.com/aws/smithy-go v1.9.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f h1:y06x6vGnFYfXUoVMbrcP1Uzpj4JG01eB5vRps9G8agM= github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s= 
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= @@ -342,6 +372,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/jinzhu/copier v0.3.4 h1:mfU6jI9PtCeUjkjQ322dlff9ELjGDu975C2p/nrubVI= github.com/jinzhu/copier v0.3.4/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= diff --git a/metastore_integration_test/storage_etcd_test.go b/metastore_integration_test/storage_etcd_test.go index cba513b..4c1d590 100644 --- a/metastore_integration_test/storage_etcd_test.go +++ b/metastore_integration_test/storage_etcd_test.go @@ -3,16 +3,19 @@ package metastore_integration_test import ( + "fmt" "reflect" "sort" "testing" "github.com/mosuka/phalanx/logging" "github.com/mosuka/phalanx/metastore" + "github.com/thanhpk/randstr" ) func TestEtcdStorageWithUri(t *testing.T) { - uri := "etcd://phalanx-test/metastore?endpoints=localhost:2379" + tmpDir := randstr.String(8) + uri := fmt.Sprintf("etcd://phalanx-test/metastore/newtest/%s?endpoints=localhost:2379", tmpDir) logger := logging.NewLogger("WARN", "", 500, 3, 30, false) etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) @@ -23,7 +26,8 @@ func TestEtcdStorageWithUri(t *testing.T) { } func TestEtcdStoragePut(t *testing.T) { - uri := "etcd://phalanx-test/metastore/puttest?endpoints=localhost:2379" + tmpDir := randstr.String(8) + uri := 
fmt.Sprintf("etcd://phalanx-test/metastore/puttest/%s?endpoints=localhost:2379", tmpDir) logger := logging.NewLogger("WARN", "", 500, 3, 30, false) etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) @@ -39,7 +43,8 @@ func TestEtcdStoragePut(t *testing.T) { } func TestEtcdStorageGet(t *testing.T) { - uri := "etcd://phalanx-test/metastore/gettest?endpoints=localhost:2379" + tmpDir := randstr.String(8) + uri := fmt.Sprintf("etcd://phalanx-test/metastore/gettest/%s?endpoints=localhost:2379", tmpDir) logger := logging.NewLogger("WARN", "", 500, 3, 30, false) etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) @@ -61,7 +66,8 @@ func TestEtcdStorageGet(t *testing.T) { } func TestEtcdStorageDelete(t *testing.T) { - uri := "etcd://phalanx-test/metastore/deletetest?endpoints=localhost:2379" + tmpDir := randstr.String(8) + uri := fmt.Sprintf("etcd://phalanx-test/metastore/deletetest/%s?endpoints=localhost:2379", tmpDir) logger := logging.NewLogger("WARN", "", 500, 3, 30, false) etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) @@ -82,7 +88,8 @@ func TestEtcdStorageDelete(t *testing.T) { } func TestEtcdStorageExists(t *testing.T) { - uri := "etcd://phalanx-test/metastore/existstest?endpoints=localhost:2379" + tmpDir := randstr.String(8) + uri := fmt.Sprintf("etcd://phalanx-test/metastore/existstest/%s?endpoints=localhost:2379", tmpDir) logger := logging.NewLogger("WARN", "", 500, 3, 30, false) etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) @@ -116,7 +123,8 @@ func TestEtcdStorageExists(t *testing.T) { } func TestEtcdStorageList(t *testing.T) { - uri := "etcd://phalanx-test/metastore/listtest?endpoints=localhost:2379" + tmpDir := randstr.String(8) + uri := fmt.Sprintf("etcd://phalanx-test/metastore/listtest/%s?endpoints=localhost:2379", tmpDir) logger := logging.NewLogger("WARN", "", 500, 3, 30, false) etcdStorage, err := metastore.NewEtcdStorageWithUri(uri, logger) diff --git a/server/index_service.go 
b/server/index_service.go index ec5c577..30f8497 100644 --- a/server/index_service.go +++ b/server/index_service.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "math/rand" + "net/url" + "path" "strings" "sync" "time" @@ -534,11 +536,28 @@ func (s *IndexService) CreateIndex(req *proto.CreateIndexRequest) (*proto.Create // If the index lock is omitted, the shard lock is also omitted. shardLockUri := "" if req.LockUri != "" { - shardLockUri = fmt.Sprintf("%s/%s", req.LockUri, shardName) + // Parse URI. + lu, err := url.Parse(req.LockUri) + if err != nil { + return nil, err + } + lu.Path = path.Join(lu.Path, shardName) + + //shardLockUri = fmt.Sprintf("%s/%s", req.LockUri, shardName) + shardLockUri = lu.String() + } + + // Parse URI. + iu, err := url.Parse(req.IndexUri) + if err != nil { + return nil, err } + iu.Path = path.Join(iu.Path, shardName) + shardMetadata := &phalanxmetastore.ShardMetadata{ - ShardName: shardName, - ShardUri: fmt.Sprintf("%s/%s", req.IndexUri, shardName), + ShardName: shardName, + //ShardUri: fmt.Sprintf("%s/%s", req.IndexUri, shardName), + ShardUri: iu.String(), ShardLockUri: shardLockUri, }