From 16bd3b5628f360d8eb929e072771edeb9944e439 Mon Sep 17 00:00:00 2001 From: Ryo Date: Wed, 27 Aug 2025 20:58:56 +0800 Subject: [PATCH] feat(infra): add object tagging support for PutObject and ListObjects (#1845) --- backend/infra/contract/storage/option.go | 12 ++ backend/infra/contract/storage/storage.go | 14 ++- backend/infra/impl/storage/minio/minio.go | 50 ++++++--- backend/infra/impl/storage/s3/s3.go | 106 ++++++++++++------ backend/infra/impl/storage/tos/tos.go | 44 ++++++-- .../infra/contract/storage/storage_mock.go | 12 +- 6 files changed, 171 insertions(+), 67 deletions(-) diff --git a/backend/infra/contract/storage/option.go b/backend/infra/contract/storage/option.go index 127e7e84..4d1af121 100644 --- a/backend/infra/contract/storage/option.go +++ b/backend/infra/contract/storage/option.go @@ -38,11 +38,23 @@ type PutOption struct { ContentDisposition *string ContentLanguage *string Expires *time.Time + Tagging map[string]string ObjectSize int64 } type PutOptFn func(option *PutOption) +func WithTagging(tag map[string]string) PutOptFn { + return func(o *PutOption) { + if len(tag) > 0 { + o.Tagging = make(map[string]string, len(tag)) + for k, v := range tag { + o.Tagging[k] = v + } + } + } +} + func WithContentType(v string) PutOptFn { return func(o *PutOption) { o.ContentType = &v diff --git a/backend/infra/contract/storage/storage.go b/backend/infra/contract/storage/storage.go index 4473b215..f3497561 100644 --- a/backend/infra/contract/storage/storage.go +++ b/backend/infra/contract/storage/storage.go @@ -24,15 +24,20 @@ import ( //go:generate mockgen -destination ../../../internal/mock/infra/contract/storage/storage_mock.go -package mock -source storage.go Factory type Storage interface { + // PutObject puts the object with the specified key. PutObject(ctx context.Context, objectKey string, content []byte, opts ...PutOptFn) error + // PutObjectWithReader puts the object with the specified key. 
PutObjectWithReader(ctx context.Context, objectKey string, content io.Reader, opts ...PutOptFn) error + // GetObject returns the object with the specified key. GetObject(ctx context.Context, objectKey string) ([]byte, error) + // DeleteObject deletes the object with the specified key. DeleteObject(ctx context.Context, objectKey string) error + // GetObjectUrl returns a presigned URL for the object. + // The URL is valid for the specified duration. GetObjectUrl(ctx context.Context, objectKey string, opts ...GetOptFn) (string, error) - // ListObjects returns all objects with the specified prefix. + // ListAllObjects returns the objects with the specified prefix, filling FileInfo.Tagging when withTagging is true; a backend may cap the result (the S3 implementation stops after collecting 10000 objects). // It may return a large number of objects, consider using ListObjectsPaginated for better performance. - ListObjects(ctx context.Context, prefix string) ([]*FileInfo, error) - + ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*FileInfo, error) // ListObjectsPaginated returns objects with pagination support. // Use this method when dealing with large number of objects. 
ListObjectsPaginated(ctx context.Context, input *ListObjectsPaginatedInput) (*ListObjectsPaginatedOutput, error) @@ -50,6 +55,8 @@ type ListObjectsPaginatedInput struct { Prefix string PageSize int Cursor string + // Include objects tagging in the listing + WithTagging bool } type ListObjectsPaginatedOutput struct { @@ -64,4 +71,5 @@ type FileInfo struct { LastModified time.Time ETag string Size int64 + Tagging map[string]string } diff --git a/backend/infra/impl/storage/minio/minio.go b/backend/infra/impl/storage/minio/minio.go index 6858d8ce..0a32953f 100644 --- a/backend/infra/impl/storage/minio/minio.go +++ b/backend/infra/impl/storage/minio/minio.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "io" - "log" "math/rand" "net/url" "time" @@ -99,34 +98,45 @@ func (m *minioClient) test() { ctx := context.Background() objectName := fmt.Sprintf("test-file-%d.txt", rand.Int()) - m.ListObjects(ctx, "") + err := m.PutObject(ctx, objectName, []byte("hello content"), + storage.WithContentType("text/plain"), storage.WithTagging(map[string]string{ + "uid": "7543149965070155780", + "conversation_id": "7543149965070155781", + "type": "user", + })) + if err != nil { + logs.CtxErrorf(ctx, "upload file failed: %v", err) + } - err := m.PutObject(ctx, objectName, []byte("hello content"), storage.WithContentType("text/plain")) + logs.CtxInfof(ctx, "upload file success") + + files, err := m.ListAllObjects(ctx, "test-file-", true) if err != nil { - log.Fatalf("upload file failed: %v", err) + logs.CtxErrorf(ctx, "list objects failed: %v", err) } - log.Printf("upload file success") + + logs.CtxInfof(ctx, "list objects success, files.len: %v", len(files)) url, err := m.GetObjectUrl(ctx, objectName) if err != nil { - log.Fatalf("get file url failed: %v", err) + logs.CtxErrorf(ctx, "get file url failed: %v", err) } - log.Printf("get file url success, url: %s", url) + logs.CtxInfof(ctx, "get file url success, url: %s", url) content, err := m.GetObject(ctx, objectName) if err != nil { - 
log.Fatalf("download file failed: %v", err) + logs.CtxErrorf(ctx, "download file failed: %v", err) } - log.Printf("download file success, content: %s", string(content)) + logs.CtxInfof(ctx, "download file success, content: %s", string(content)) err = m.DeleteObject(ctx, objectName) if err != nil { - log.Fatalf("delete object failed: %v", err) + logs.CtxErrorf(ctx, "delete object failed: %v", err) } - log.Printf("delete object success") + logs.CtxInfof(ctx, "delete object success") } func (m *minioClient) PutObject(ctx context.Context, objectKey string, content []byte, opts ...storage.PutOptFn) error { @@ -161,6 +171,10 @@ func (m *minioClient) PutObjectWithReader(ctx context.Context, objectKey string, minioOpts.Expires = *option.Expires } + if option.Tagging != nil { + minioOpts.UserTags = option.Tagging + } + _, err := m.client.PutObject(ctx, m.bucketName, objectKey, content, option.ObjectSize, minioOpts) if err != nil { @@ -223,7 +237,7 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L return nil, fmt.Errorf("page size must be positive") } - files, err := m.ListObjects(ctx, input.Prefix) + files, err := m.ListAllObjects(ctx, input.Prefix, input.WithTagging) if err != nil { return nil, err } @@ -235,10 +249,11 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L }, nil } -func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { +func (m *minioClient) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) { opts := minio.ListObjectsOptions{ - Prefix: prefix, - Recursive: true, + Prefix: prefix, + Recursive: true, + WithMetadata: withTagging, } objectCh := m.client.ListObjects(ctx, m.bucketName, opts) @@ -248,14 +263,17 @@ func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storag if object.Err != nil { return nil, object.Err } + files = append(files, &storage.FileInfo{ Key: object.Key, 
LastModified: object.LastModified, ETag: object.ETag, Size: object.Size, + Tagging: object.UserTags, }) - logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size) + logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d, tagging = %v", + object.Key, object.LastModified, object.ETag, object.Size, object.UserTags) } return files, nil diff --git a/backend/infra/impl/storage/s3/s3.go b/backend/infra/impl/storage/s3/s3.go index e21ef0fb..3b75aa2e 100644 --- a/backend/infra/impl/storage/s3/s3.go +++ b/backend/infra/impl/storage/s3/s3.go @@ -21,16 +21,19 @@ import ( "context" "fmt" "io" + "net/url" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/coze-dev/coze-studio/backend/infra/contract/storage" "github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy" "github.com/coze-dev/coze-studio/backend/pkg/logs" + "github.com/coze-dev/coze-studio/backend/pkg/taskgroup" ) type s3Client struct { @@ -178,6 +181,11 @@ func (t *s3Client) PutObjectWithReader(ctx context.Context, objectKey string, co input.ContentLength = aws.Int64(option.ObjectSize) } + if option.Tagging != nil { + tagging := mapToQueryParams(option.Tagging) + input.Tagging = aws.String(tagging) + } + // upload object _, err := client.PutObject(ctx, input) return err @@ -239,49 +247,36 @@ func (t *s3Client) GetObjectUrl(ctx context.Context, objectKey string, opts ...s return req.URL, nil } -func (t *s3Client) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { - client := t.client - bucket := t.bucketName +func (t *s3Client) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) { const ( DefaultPageSize = 100 MaxListObjects = 10000 ) - input := &s3.ListObjectsV2Input{ - 
Bucket: aws.String(bucket), - Prefix: aws.String(prefix), - MaxKeys: aws.Int32(DefaultPageSize), - } - - paginator := s3.NewListObjectsV2Paginator(client, input) - var files []*storage.FileInfo - for paginator.HasMorePages() { - page, err := paginator.NextPage(ctx) + var cursor string + for { + output, err := t.ListObjectsPaginated(ctx, &storage.ListObjectsPaginatedInput{ + Prefix: prefix, + PageSize: DefaultPageSize, + WithTagging: withTagging, + Cursor: cursor, + }) + if err != nil { - return nil, fmt.Errorf("failed to get page, %v", err) + return nil, err } - for _, obj := range page.Contents { - f := &storage.FileInfo{} - if obj.Key != nil { - f.Key = *obj.Key - } - if obj.LastModified != nil { - f.LastModified = *obj.LastModified - } - if obj.ETag != nil { - f.ETag = *obj.ETag - } - if obj.Size != nil { - f.Size = *obj.Size - } - - files = append(files, f) - } + cursor = output.Cursor + + files = append(files, output.Files...) if len(files) >= MaxListObjects { - logs.CtxErrorf(ctx, "[ListObjects] max list objects reached, total: %d", len(files)) + logs.CtxErrorf(ctx, "list objects failed, max list objects: %d", MaxListObjects) + break + } + + if !output.IsTruncated { break } } @@ -340,5 +335,52 @@ func (t *s3Client) ListObjectsPaginated(ctx context.Context, input *storage.List output.Cursor = *p.NextContinuationToken } + if input.WithTagging { + taskGroup := taskgroup.NewTaskGroup(ctx, 5) + for idx := range files { + f := files[idx] + taskGroup.Go(func() error { + tagging, err := client.GetObjectTagging(ctx, &s3.GetObjectTaggingInput{ + Bucket: aws.String(bucket), + Key: aws.String(f.Key), + }) + if err != nil { + return err + } + + f.Tagging = tagsToMap(tagging.TagSet) + return nil + }) + } + + if err := taskGroup.Wait(); err != nil { + return nil, err + } + } + return output, nil } + +func mapToQueryParams(tagging map[string]string) string { + if len(tagging) == 0 { + return "" + } + params := url.Values{} + for k, v := range tagging { + params.Set(k, v) + } 
+ return params.Encode() +} + +func tagsToMap(tags []types.Tag) map[string]string { + if len(tags) == 0 { + return nil + } + m := make(map[string]string, len(tags)) + for _, tag := range tags { + if tag.Key != nil && tag.Value != nil { + m[*tag.Key] = *tag.Value + } + } + return m +} diff --git a/backend/infra/impl/storage/tos/tos.go b/backend/infra/impl/storage/tos/tos.go index 4a383fb3..051902eb 100644 --- a/backend/infra/impl/storage/tos/tos.go +++ b/backend/infra/impl/storage/tos/tos.go @@ -73,15 +73,20 @@ func getTosClient(ctx context.Context, ak, sk, bucketName, endpoint, region stri func (t *tosClient) test() { // test list objects ctx := context.Background() - t.ListObjects(ctx, "") // test upload objectKey := fmt.Sprintf("test-%s.txt", time.Now().Format("20060102150405")) - err := t.PutObject(context.Background(), objectKey, []byte("hello world")) + err := t.PutObject(context.Background(), objectKey, []byte("hello world"), storage.WithTagging(map[string]string{ + "uid": "7543149965070155780", + "conversation_id": "7543149965070155781", + "type": "user", + })) if err != nil { logs.CtxErrorf(context.Background(), "PutObject failed, objectKey: %s, err: %v", objectKey, err) } + t.ListAllObjects(ctx, "", true) + // test download content, err := t.GetObject(context.Background(), objectKey) if err != nil { @@ -175,6 +180,10 @@ func (t *tosClient) PutObjectWithReader(ctx context.Context, objectKey string, c input.ContentLength = option.ObjectSize } + if len(option.Tagging) > 0 { + input.Meta = option.Tagging + } + _, err := client.PutObjectV2(ctx, input) return err @@ -251,9 +260,10 @@ func (t *tosClient) ListObjectsPaginated(ctx context.Context, input *storage.Lis output, err := t.client.ListObjectsV2(ctx, &tos.ListObjectsV2Input{ Bucket: t.bucketName, ListObjectsInput: tos.ListObjectsInput{ - MaxKeys: int(input.PageSize), - Marker: input.Cursor, - Prefix: input.Prefix, + MaxKeys: int(input.PageSize), + Marker: input.Cursor, + Prefix: input.Prefix, + FetchMeta: 
input.WithTagging, }, }) if err != nil { @@ -267,11 +277,23 @@ func (t *tosClient) ListObjectsPaginated(ctx context.Context, input *storage.Lis continue } + var tagging map[string]string + if obj.Meta != nil { + obj.Meta.Range(func(key, value string) bool { + if tagging == nil { + tagging = make(map[string]string) + } + tagging[key] = value + return true + }) + } + files = append(files, &storage.FileInfo{ Key: obj.Key, LastModified: obj.LastModified, ETag: obj.ETag, Size: obj.Size, + Tagging: tagging, }) } @@ -282,7 +304,7 @@ func (t *tosClient) ListObjectsPaginated(ctx context.Context, input *storage.Lis }, nil } -func (t *tosClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { +func (t *tosClient) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) { const ( DefaultPageSize = 100 MaxListObjects = 10000 @@ -293,16 +315,18 @@ func (t *tosClient) ListObjects(ctx context.Context, prefix string) ([]*storage. for { output, err := t.ListObjectsPaginated(ctx, &storage.ListObjectsPaginatedInput{ - Prefix: prefix, - PageSize: DefaultPageSize, - Cursor: cursor, + Prefix: prefix, + PageSize: DefaultPageSize, + Cursor: cursor, + WithTagging: withTagging, }) if err != nil { return nil, fmt.Errorf("list objects failed, prefix = %v, err: %v", prefix, err) } for _, object := range output.Files { - logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size) + logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d, tagging = %v", + object.Key, object.LastModified, object.ETag, object.Size, object.Tagging) files = append(files, object) } diff --git a/backend/internal/mock/infra/contract/storage/storage_mock.go b/backend/internal/mock/infra/contract/storage/storage_mock.go index 45cd6188..6749e4c2 100644 --- a/backend/internal/mock/infra/contract/storage/storage_mock.go +++ 
b/backend/internal/mock/infra/contract/storage/storage_mock.go @@ -91,19 +91,19 @@ func (mr *MockStorageMockRecorder) GetObjectUrl(ctx, objectKey any, opts ...any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectUrl", reflect.TypeOf((*MockStorage)(nil).GetObjectUrl), varargs...) } -// ListObjects mocks base method. -func (m *MockStorage) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { +// ListAllObjects mocks base method. +func (m *MockStorage) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListObjects", ctx, prefix) + ret := m.ctrl.Call(m, "ListAllObjects", ctx, prefix, withTagging) ret0, _ := ret[0].([]*storage.FileInfo) ret1, _ := ret[1].(error) return ret0, ret1 } -// ListObjects indicates an expected call of ListObjects. -func (mr *MockStorageMockRecorder) ListObjects(ctx, prefix any) *gomock.Call { +// ListAllObjects indicates an expected call of ListAllObjects. +func (mr *MockStorageMockRecorder) ListAllObjects(ctx, prefix, withTagging any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjects", reflect.TypeOf((*MockStorage)(nil).ListObjects), ctx, prefix) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllObjects", reflect.TypeOf((*MockStorage)(nil).ListAllObjects), ctx, prefix, withTagging) } // ListObjectsPaginated mocks base method.