diff --git a/drivers/s3/other.go b/drivers/s3/other.go index c5f6b83e..e299dae0 100644 --- a/drivers/s3/other.go +++ b/drivers/s3/other.go @@ -14,24 +14,31 @@ import ( "github.com/aws/aws-sdk-go/service/s3" ) -type archiveRequest struct { +const ( + OtherMethodArchive = "archive" + OtherMethodArchiveStatus = "archive_status" + OtherMethodThaw = "thaw" + OtherMethodThawStatus = "thaw_status" +) + +type ArchiveRequest struct { StorageClass string `json:"storage_class"` } -type thawRequest struct { +type ThawRequest struct { Days int64 `json:"days"` Tier string `json:"tier"` } -type objectDescriptor struct { +type ObjectDescriptor struct { Path string `json:"path"` Bucket string `json:"bucket"` Key string `json:"key"` } -type archiveResponse struct { +type ArchiveResponse struct { Action string `json:"action"` - Object objectDescriptor `json:"object"` + Object ObjectDescriptor `json:"object"` StorageClass string `json:"storage_class"` RequestID string `json:"request_id,omitempty"` VersionID string `json:"version_id,omitempty"` @@ -39,14 +46,14 @@ type archiveResponse struct { LastModified string `json:"last_modified,omitempty"` } -type thawResponse struct { +type ThawResponse struct { Action string `json:"action"` - Object objectDescriptor `json:"object"` + Object ObjectDescriptor `json:"object"` RequestID string `json:"request_id,omitempty"` - Status *restoreStatus `json:"status,omitempty"` + Status *RestoreStatus `json:"status,omitempty"` } -type restoreStatus struct { +type RestoreStatus struct { Ongoing bool `json:"ongoing"` Expiry string `json:"expiry,omitempty"` Raw string `json:"raw"` @@ -76,14 +83,14 @@ func (d *S3) Other(ctx context.Context, args model.OtherArgs) (interface{}, erro func (d *S3) archive(ctx context.Context, args model.OtherArgs) (interface{}, error) { key := getKey(args.Obj.GetPath(), false) - payload := archiveRequest{} - if err := decodeOtherArgs(args.Data, &payload); err != nil { + payload := ArchiveRequest{} + if err := DecodeOtherArgs(args.Data, &payload); err != nil { return nil, fmt.Errorf("parse archive request: %w", err) } if payload.StorageClass == "" { return nil, fmt.Errorf("storage_class is required") } - storageClass := normalizeStorageClass(payload.StorageClass) + storageClass := NormalizeStorageClass(payload.StorageClass) input := &s3.CopyObjectInput{ Bucket: &d.Bucket, Key: &key, @@ -97,7 +104,7 @@ func (d *S3) archive(ctx context.Context, args model.OtherArgs) (interface{}, er return nil, err } - resp := archiveResponse{ + resp := ArchiveResponse{ Action: "archive", Object: d.describeObject(args.Obj, key), StorageClass: storageClass, @@ -126,7 +133,7 @@ func (d *S3) archiveStatus(ctx context.Context, args model.OtherArgs) (interface if err != nil { return nil, err } - return archiveResponse{ + return ArchiveResponse{ Action: "archive_status", Object: d.describeObject(args.Obj, key), StorageClass: status.StorageClass, @@ -135,8 +142,8 @@ func (d *S3) archiveStatus(ctx context.Context, args model.OtherArgs) (interface func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error) { key := getKey(args.Obj.GetPath(), false) - payload := thawRequest{Days: 1} - if err := decodeOtherArgs(args.Data, &payload); err != nil { + payload := ThawRequest{Days: 1} + if err := DecodeOtherArgs(args.Data, &payload); err != nil { return nil, fmt.Errorf("parse thaw request: %w", err) } if payload.Days <= 0 { @@ -145,7 +152,7 @@ func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error restoreRequest := &s3.RestoreRequest{ Days: aws.Int64(payload.Days), } - if tier := normalizeRestoreTier(payload.Tier); tier != "" { + if tier := NormalizeRestoreTier(payload.Tier); tier != "" { restoreRequest.GlacierJobParameters = &s3.GlacierJobParameters{Tier: aws.String(tier)} } input := &s3.RestoreObjectInput{ @@ -159,7 +166,7 @@ func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error return nil, err } status, _ := d.describeObjectStatus(ctx, key) - resp := thawResponse{ + resp := ThawResponse{ Action: "thaw", Object: d.describeObject(args.Obj, key), RequestID: restoreReq.RequestID, @@ -176,15 +183,15 @@ func (d *S3) thawStatus(ctx context.Context, args model.OtherArgs) (interface{}, if err != nil { return nil, err } - return thawResponse{ + return ThawResponse{ Action: "thaw_status", Object: d.describeObject(args.Obj, key), Status: status.Restore, }, nil } -func (d *S3) describeObject(obj model.Obj, key string) objectDescriptor { - return objectDescriptor{ +func (d *S3) describeObject(obj model.Obj, key string) ObjectDescriptor { + return ObjectDescriptor{ Path: obj.GetPath(), Bucket: d.Bucket, Key: key, @@ -193,7 +200,7 @@ func (d *S3) describeObject(obj model.Obj, key string) objectDescriptor { type objectStatus struct { StorageClass string - Restore *restoreStatus + Restore *RestoreStatus } func (d *S3) describeObjectStatus(ctx context.Context, key string) (*objectStatus, error) { @@ -208,7 +215,7 @@ func (d *S3) describeObjectStatus(ctx context.Context, key string) (*objectStatu return status, nil } -func parseRestoreHeader(header *string) *restoreStatus { +func parseRestoreHeader(header *string) *RestoreStatus { if header == nil { return nil } @@ -216,7 +223,7 @@ func parseRestoreHeader(header *string) *restoreStatus { if value == "" { return nil } - status := &restoreStatus{Raw: value} + status := &RestoreStatus{Raw: value} parts := strings.Split(value, ",") for _, part := range parts { part = strings.TrimSpace(part) @@ -240,7 +247,7 @@ func parseRestoreHeader(header *string) *restoreStatus { return status } -func decodeOtherArgs(data interface{}, target interface{}) error { +func DecodeOtherArgs(data interface{}, target interface{}) error { if data == nil { return nil } @@ -251,7 +258,7 @@ func decodeOtherArgs(data interface{}, target interface{}) error { return json.Unmarshal(raw, target) } -func normalizeStorageClass(value string) string { +func NormalizeStorageClass(value string) string { normalized := strings.ToLower(strings.TrimSpace(strings.ReplaceAll(value, "-", "_"))) if normalized == "" { return value @@ -262,7 +269,7 @@ func normalizeStorageClass(value string) string { return value } -func normalizeRestoreTier(value string) string { +func NormalizeRestoreTier(value string) string { normalized := strings.ToLower(strings.TrimSpace(value)) switch normalized { case "", "default": diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index c67e3029..8f05b84a 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -37,6 +37,18 @@ func InitTaskManager() { if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted CleanTempDir() } + workers := conf.Conf.Tasks.S3Transition.Workers + if workers < 0 { + workers = 0 + } + fs.S3TransitionTaskManager = tache.NewManager[*fs.S3TransitionTask]( + tache.WithWorks(workers), + tache.WithPersistFunction( + db.GetTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant), + db.UpdateTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant), + ), + tache.WithMaxRetry(conf.Conf.Tasks.S3Transition.MaxRetry), + ) fs.ArchiveDownloadTaskManager = tache.NewManager[*fs.ArchiveDownloadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant), db.UpdateTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Decompress.MaxRetry)) op.RegisterSettingChangingCallback(func() { fs.ArchiveDownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers))) diff --git a/internal/conf/config.go b/internal/conf/config.go index cdb86fee..cf7cde0b 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -60,6 +60,7 @@ type TasksConfig struct { Copy TaskConfig `json:"copy" envPrefix:"COPY_"` Decompress TaskConfig `json:"decompress" envPrefix:"DECOMPRESS_"` DecompressUpload TaskConfig `json:"decompress_upload" envPrefix:"DECOMPRESS_UPLOAD_"` + S3Transition TaskConfig `json:"s3_transition" envPrefix:"S3_TRANSITION_"` AllowRetryCanceled bool `json:"allow_retry_canceled" env:"ALLOW_RETRY_CANCELED"` } @@ -184,6 +185,11 @@ func DefaultConfig() *Config { Workers: 5, MaxRetry: 2, }, + S3Transition: TaskConfig{ + Workers: 5, + MaxRetry: 2, + // TaskPersistant: true, + }, AllowRetryCanceled: false, }, Cors: Cors{ diff --git a/internal/fs/other.go b/internal/fs/other.go index 85b7b1d1..14f8f63d 100644 --- a/internal/fs/other.go +++ b/internal/fs/other.go @@ -2,10 +2,15 @@ package fs import ( "context" + "encoding/json" + stdpath "path" + "strings" + "github.com/alist-org/alist/v3/drivers/s3" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/internal/task" "github.com/pkg/errors" ) @@ -53,6 +58,38 @@ func other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) { if err != nil { return nil, errors.WithMessage(err, "failed get storage") } + originalPath := args.Path + + if _, ok := storage.(*s3.S3); ok { + method := strings.ToLower(strings.TrimSpace(args.Method)) + if method == s3.OtherMethodArchive || method == s3.OtherMethodThaw { + if S3TransitionTaskManager == nil { + return nil, errors.New("s3 transition task manager is not initialized") + } + var payload json.RawMessage + if args.Data != nil { + raw, err := json.Marshal(args.Data) + if err != nil { + return nil, errors.WithMessage(err, "failed to encode request payload") + } + payload = raw + } + taskCreator, _ := ctx.Value("user").(*model.User) + tsk := &S3TransitionTask{ + TaskExtension: task.TaskExtension{Creator: taskCreator}, + status: "queued", + StorageMountPath: storage.GetStorage().MountPath, + ObjectPath: actualPath, + DisplayPath: originalPath, + ObjectName: stdpath.Base(actualPath), + Transition: method, + Payload: payload, + } + S3TransitionTaskManager.Add(tsk) + return map[string]string{"task_id": tsk.GetID()}, nil + } + } + args.Path = actualPath return op.Other(ctx, storage, args) } diff --git a/internal/fs/s3_transition.go b/internal/fs/s3_transition.go new file mode 100644 index 00000000..395f3c5b --- /dev/null +++ b/internal/fs/s3_transition.go @@ -0,0 +1,310 @@ +package fs + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/alist-org/alist/v3/drivers/s3" + "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/internal/task" + "github.com/pkg/errors" + "github.com/xhofe/tache" +) + +const s3TransitionPollInterval = 15 * time.Second + +// S3TransitionTask represents an asynchronous S3 archive/thaw request that is +// tracked via the task manager so that clients can monitor the progress of the +// operation. +type S3TransitionTask struct { + task.TaskExtension + status string + + StorageMountPath string `json:"storage_mount_path"` + ObjectPath string `json:"object_path"` + DisplayPath string `json:"display_path"` + ObjectName string `json:"object_name"` + Transition string `json:"transition"` + Payload json.RawMessage `json:"payload,omitempty"` + + TargetStorageClass string `json:"target_storage_class,omitempty"` + RequestID string `json:"request_id,omitempty"` + VersionID string `json:"version_id,omitempty"` + + storage driver.Driver `json:"-"` +} + +// S3TransitionTaskManager holds asynchronous S3 archive/thaw tasks. +var S3TransitionTaskManager *tache.Manager[*S3TransitionTask] + +var _ task.TaskExtensionInfo = (*S3TransitionTask)(nil) + +func (t *S3TransitionTask) GetName() string { + action := strings.ToLower(t.Transition) + if action == "" { + action = "transition" + } + display := t.DisplayPath + if display == "" { + display = t.ObjectPath + } + if display == "" { + display = t.ObjectName + } + return fmt.Sprintf("s3 %s %s", action, display) +} + +func (t *S3TransitionTask) GetStatus() string { + return t.status +} + +func (t *S3TransitionTask) Run() error { + t.ReinitCtx() + t.ClearEndTime() + start := time.Now() + t.SetStartTime(start) + defer func() { t.SetEndTime(time.Now()) }() + + if err := t.ensureStorage(); err != nil { + t.status = fmt.Sprintf("locate storage failed: %v", err) + return err + } + + payload, err := t.decodePayload() + if err != nil { + t.status = fmt.Sprintf("decode payload failed: %v", err) + return err + } + + method := strings.ToLower(strings.TrimSpace(t.Transition)) + switch method { + case s3.OtherMethodArchive: + t.status = "submitting archive request" + t.SetProgress(0) + resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodArchive, + Data: payload, + }) + if err != nil { + t.status = fmt.Sprintf("archive request failed: %v", err) + return err + } + archiveResp, ok := toArchiveResponse(resp) + if ok { + if t.TargetStorageClass == "" { + t.TargetStorageClass = archiveResp.StorageClass + } + t.RequestID = archiveResp.RequestID + t.VersionID = archiveResp.VersionID + if archiveResp.StorageClass != "" { + t.status = fmt.Sprintf("archive requested, waiting for %s", archiveResp.StorageClass) + } else { + t.status = "archive requested" + } + } else if sc := t.extractTargetStorageClass(); sc != "" { + t.TargetStorageClass = sc + t.status = fmt.Sprintf("archive requested, waiting for %s", sc) + } else { + t.status = "archive requested" + } + if t.TargetStorageClass != "" { + t.TargetStorageClass = s3.NormalizeStorageClass(t.TargetStorageClass) + } + t.SetProgress(25) + return t.waitForArchive() + case s3.OtherMethodThaw: + t.status = "submitting thaw request" + t.SetProgress(0) + resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodThaw, + Data: payload, + }) + if err != nil { + t.status = fmt.Sprintf("thaw request failed: %v", err) + return err + } + thawResp, ok := toThawResponse(resp) + if ok { + t.RequestID = thawResp.RequestID + if thawResp.Status != nil && !thawResp.Status.Ongoing { + t.SetProgress(100) + t.status = thawCompletionMessage(thawResp.Status) + return nil + } + } + t.status = "thaw requested" + t.SetProgress(25) + return t.waitForThaw() + default: + return errors.Errorf("unsupported transition method: %s", t.Transition) + } +} + +func (t *S3TransitionTask) ensureStorage() error { + if t.storage != nil { + return nil + } + storage, err := op.GetStorageByMountPath(t.StorageMountPath) + if err != nil { + return err + } + t.storage = storage + return nil +} + +func (t *S3TransitionTask) decodePayload() (interface{}, error) { + if len(t.Payload) == 0 { + return nil, nil + } + var payload interface{} + if err := json.Unmarshal(t.Payload, &payload); err != nil { + return nil, err + } + return payload, nil +} + +func (t *S3TransitionTask) extractTargetStorageClass() string { + if len(t.Payload) == 0 { + return "" + } + var req s3.ArchiveRequest + if err := json.Unmarshal(t.Payload, &req); err != nil { + return "" + } + return s3.NormalizeStorageClass(req.StorageClass) +} + +func (t *S3TransitionTask) waitForArchive() error { + ticker := time.NewTicker(s3TransitionPollInterval) + defer ticker.Stop() + + ctx := t.Ctx() + for { + select { + case <-ctx.Done(): + t.status = "archive canceled" + return ctx.Err() + case <-ticker.C: + resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodArchiveStatus, + }) + if err != nil { + t.status = fmt.Sprintf("archive status error: %v", err) + return err + } + archiveResp, ok := toArchiveResponse(resp) + if !ok { + t.status = fmt.Sprintf("unexpected archive status response: %T", resp) + return errors.Errorf("unexpected archive status response: %T", resp) + } + currentClass := strings.TrimSpace(archiveResp.StorageClass) + target := strings.TrimSpace(t.TargetStorageClass) + if target == "" { + target = currentClass + t.TargetStorageClass = currentClass + } + if currentClass == "" { + t.status = "waiting for storage class update" + t.SetProgress(50) + continue + } + if strings.EqualFold(currentClass, target) { + t.SetProgress(100) + t.status = fmt.Sprintf("archive complete (%s)", currentClass) + return nil + } + t.status = fmt.Sprintf("storage class %s (target %s)", currentClass, target) + t.SetProgress(75) + } + } +} + +func (t *S3TransitionTask) waitForThaw() error { + ticker := time.NewTicker(s3TransitionPollInterval) + defer ticker.Stop() + + ctx := t.Ctx() + for { + select { + case <-ctx.Done(): + t.status = "thaw canceled" + return ctx.Err() + case <-ticker.C: + resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{ + Path: t.ObjectPath, + Method: s3.OtherMethodThawStatus, + }) + if err != nil { + t.status = fmt.Sprintf("thaw status error: %v", err) + return err + } + thawResp, ok := toThawResponse(resp) + if !ok { + t.status = fmt.Sprintf("unexpected thaw status response: %T", resp) + return errors.Errorf("unexpected thaw status response: %T", resp) + } + status := thawResp.Status + if status == nil { + t.status = "waiting for thaw status" + t.SetProgress(50) + continue + } + if status.Ongoing { + t.status = fmt.Sprintf("thaw in progress (%s)", status.Raw) + t.SetProgress(75) + continue + } + t.SetProgress(100) + t.status = thawCompletionMessage(status) + return nil + } + } +} + +func thawCompletionMessage(status *s3.RestoreStatus) string { + if status == nil { + return "thaw complete" + } + if status.Expiry != "" { + return fmt.Sprintf("thaw complete, expires %s", status.Expiry) + } + return "thaw complete" +} + +func toArchiveResponse(v interface{}) (s3.ArchiveResponse, bool) { + switch resp := v.(type) { + case s3.ArchiveResponse: + return resp, true + case *s3.ArchiveResponse: + if resp != nil { + return *resp, true + } + } + return s3.ArchiveResponse{}, false +} + +func toThawResponse(v interface{}) (s3.ThawResponse, bool) { + switch resp := v.(type) { + case s3.ThawResponse: + return resp, true + case *s3.ThawResponse: + if resp != nil { + return *resp, true + } + } + return s3.ThawResponse{}, false +} + +// Ensure compatibility with persistence when tasks are restored. +func (t *S3TransitionTask) OnRestore() { + // The storage handle is not persisted intentionally; it will be lazily + // re-fetched on the next Run invocation. + t.storage = nil +} diff --git a/server/handles/task.go b/server/handles/task.go index 6d49f9e5..a4dbce0f 100644 --- a/server/handles/task.go +++ b/server/handles/task.go @@ -220,6 +220,7 @@ func SetupTaskRoute(g *gin.RouterGroup) { taskRoute(g.Group("/copy"), fs.CopyTaskManager) taskRoute(g.Group("/offline_download"), tool.DownloadTaskManager) taskRoute(g.Group("/offline_download_transfer"), tool.TransferTaskManager) + taskRoute(g.Group("/s3_transition"), fs.S3TransitionTaskManager) taskRoute(g.Group("/decompress"), fs.ArchiveDownloadTaskManager) taskRoute(g.Group("/decompress_upload"), fs.ArchiveContentUploadTaskManager) }