feat(s3): Add S3 object archive and thaw task management

This commit introduces comprehensive support for S3 object archive and thaw operations, managed asynchronously through a new task system.

- **S3 Transition Task System**:
  - Adds a new `S3Transition` task configuration, including workers, max retries, and persistence options.
  - Initializes `S3TransitionTaskManager` to handle asynchronous S3 archive/thaw requests.
  - Registers dedicated API routes for monitoring S3 transition tasks.

- **Integrate S3 archive/thaw with the `Other` API**:
  - Modifies the `Other` API handler to intercept `archive` and `thaw` methods for S3 storage drivers.
  - Dispatches these operations as `S3TransitionTask` instances to the task manager for background processing.
  - Returns a task ID to the client for tracking the status of the dispatched operation (see the request sketch after this list).

- **Refactor the S3 driver's `other.go` for improved API consistency**:
  - Exports the previously internal structs by renaming them: `archiveRequest` → `ArchiveRequest`, `thawRequest` → `ThawRequest`, `objectDescriptor` → `ObjectDescriptor`, `archiveResponse` → `ArchiveResponse`, `thawResponse` → `ThawResponse`, and `restoreStatus` → `RestoreStatus`.
  - Exports the helper functions `decodeOtherArgs`, `normalizeStorageClass`, and `normalizeRestoreTier` as `DecodeOtherArgs`, `NormalizeStorageClass`, and `NormalizeRestoreTier`.
  - Introduces the `OtherMethodArchive`, `OtherMethodArchiveStatus`, `OtherMethodThaw`, and `OtherMethodThawStatus` constants for the S3 `Other` API methods.
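
For context, a hedged sketch of the resulting client-side flow. The `/api/fs/other` route, the `{code, message, data}` response envelope, and all literal values are assumptions for illustration, not part of this commit:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Dispatch an archive request; for S3 storages the fs layer now queues a
	// background task instead of blocking on the driver call.
	body, _ := json.Marshal(map[string]interface{}{
		"path":   "/s3/backups/2024-01.tar.gz", // hypothetical mount path + key
		"method": "archive",
		"data":   map[string]string{"storage_class": "GLACIER"},
	})
	req, _ := http.NewRequest(http.MethodPost, "http://localhost:5244/api/fs/other", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "<token>") // hypothetical credential
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	var out struct {
		Code    int               `json:"code"`
		Message string            `json:"message"`
		Data    map[string]string `json:"data"`
	}
	_ = json.NewDecoder(resp.Body).Decode(&out)
	fmt.Println("task id:", out.Data["task_id"]) // track via the s3_transition task routes
}
```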
okatu-loli 2025-10-15 17:56:04 +08:00
parent 0add69c201
commit f1e43cf7d2
6 changed files with 400 additions and 27 deletions

View File

@@ -14,24 +14,31 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
)
type archiveRequest struct {
const (
OtherMethodArchive = "archive"
OtherMethodArchiveStatus = "archive_status"
OtherMethodThaw = "thaw"
OtherMethodThawStatus = "thaw_status"
)
type ArchiveRequest struct {
StorageClass string `json:"storage_class"`
}
type thawRequest struct {
type ThawRequest struct {
Days int64 `json:"days"`
Tier string `json:"tier"`
}
type objectDescriptor struct {
type ObjectDescriptor struct {
Path string `json:"path"`
Bucket string `json:"bucket"`
Key string `json:"key"`
}
type archiveResponse struct {
type ArchiveResponse struct {
Action string `json:"action"`
Object objectDescriptor `json:"object"`
Object ObjectDescriptor `json:"object"`
StorageClass string `json:"storage_class"`
RequestID string `json:"request_id,omitempty"`
VersionID string `json:"version_id,omitempty"`
@@ -39,14 +46,14 @@ type archiveResponse struct {
LastModified string `json:"last_modified,omitempty"`
}
type thawResponse struct {
type ThawResponse struct {
Action string `json:"action"`
Object objectDescriptor `json:"object"`
Object ObjectDescriptor `json:"object"`
RequestID string `json:"request_id,omitempty"`
Status *restoreStatus `json:"status,omitempty"`
Status *RestoreStatus `json:"status,omitempty"`
}
type restoreStatus struct {
type RestoreStatus struct {
Ongoing bool `json:"ongoing"`
Expiry string `json:"expiry,omitempty"`
Raw string `json:"raw"`
@@ -76,14 +83,14 @@ func (d *S3) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) {
func (d *S3) archive(ctx context.Context, args model.OtherArgs) (interface{}, error) {
key := getKey(args.Obj.GetPath(), false)
payload := archiveRequest{}
if err := decodeOtherArgs(args.Data, &payload); err != nil {
payload := ArchiveRequest{}
if err := DecodeOtherArgs(args.Data, &payload); err != nil {
return nil, fmt.Errorf("parse archive request: %w", err)
}
if payload.StorageClass == "" {
return nil, fmt.Errorf("storage_class is required")
}
storageClass := normalizeStorageClass(payload.StorageClass)
storageClass := NormalizeStorageClass(payload.StorageClass)
input := &s3.CopyObjectInput{
Bucket: &d.Bucket,
Key: &key,
@@ -97,7 +104,7 @@ func (d *S3) archive(ctx context.Context, args model.OtherArgs) (interface{}, error) {
return nil, err
}
resp := archiveResponse{
resp := ArchiveResponse{
Action: "archive",
Object: d.describeObject(args.Obj, key),
StorageClass: storageClass,
@@ -126,7 +133,7 @@ func (d *S3) archiveStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) {
if err != nil {
return nil, err
}
return archiveResponse{
return ArchiveResponse{
Action: "archive_status",
Object: d.describeObject(args.Obj, key),
StorageClass: status.StorageClass,
@@ -135,8 +142,8 @@ func (d *S3) archiveStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) {
func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error) {
key := getKey(args.Obj.GetPath(), false)
payload := thawRequest{Days: 1}
if err := decodeOtherArgs(args.Data, &payload); err != nil {
payload := ThawRequest{Days: 1}
if err := DecodeOtherArgs(args.Data, &payload); err != nil {
return nil, fmt.Errorf("parse thaw request: %w", err)
}
if payload.Days <= 0 {
@@ -145,7 +152,7 @@ func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error) {
restoreRequest := &s3.RestoreRequest{
Days: aws.Int64(payload.Days),
}
if tier := normalizeRestoreTier(payload.Tier); tier != "" {
if tier := NormalizeRestoreTier(payload.Tier); tier != "" {
restoreRequest.GlacierJobParameters = &s3.GlacierJobParameters{Tier: aws.String(tier)}
}
input := &s3.RestoreObjectInput{
@@ -159,7 +166,7 @@ func (d *S3) thaw(ctx context.Context, args model.OtherArgs) (interface{}, error) {
return nil, err
}
status, _ := d.describeObjectStatus(ctx, key)
resp := thawResponse{
resp := ThawResponse{
Action: "thaw",
Object: d.describeObject(args.Obj, key),
RequestID: restoreReq.RequestID,
@@ -176,15 +183,15 @@ func (d *S3) thawStatus(ctx context.Context, args model.OtherArgs) (interface{}, error) {
if err != nil {
return nil, err
}
return thawResponse{
return ThawResponse{
Action: "thaw_status",
Object: d.describeObject(args.Obj, key),
Status: status.Restore,
}, nil
}
func (d *S3) describeObject(obj model.Obj, key string) objectDescriptor {
return objectDescriptor{
func (d *S3) describeObject(obj model.Obj, key string) ObjectDescriptor {
return ObjectDescriptor{
Path: obj.GetPath(),
Bucket: d.Bucket,
Key: key,
@@ -193,7 +200,7 @@ func (d *S3) describeObject(obj model.Obj, key string) objectDescriptor {
type objectStatus struct {
StorageClass string
Restore *restoreStatus
Restore *RestoreStatus
}
func (d *S3) describeObjectStatus(ctx context.Context, key string) (*objectStatus, error) {
@@ -208,7 +215,7 @@ func (d *S3) describeObjectStatus(ctx context.Context, key string) (*objectStatus, error) {
return status, nil
}
func parseRestoreHeader(header *string) *restoreStatus {
func parseRestoreHeader(header *string) *RestoreStatus {
if header == nil {
return nil
}
@@ -216,7 +223,7 @@ func parseRestoreHeader(header *string) *restoreStatus {
if value == "" {
return nil
}
status := &restoreStatus{Raw: value}
status := &RestoreStatus{Raw: value}
parts := strings.Split(value, ",")
for _, part := range parts {
part = strings.TrimSpace(part)
@@ -240,7 +247,7 @@ func parseRestoreHeader(header *string) *restoreStatus {
return status
}
func decodeOtherArgs(data interface{}, target interface{}) error {
func DecodeOtherArgs(data interface{}, target interface{}) error {
if data == nil {
return nil
}
@@ -251,7 +258,7 @@ func decodeOtherArgs(data interface{}, target interface{}) error {
return json.Unmarshal(raw, target)
}
func normalizeStorageClass(value string) string {
func NormalizeStorageClass(value string) string {
normalized := strings.ToLower(strings.TrimSpace(strings.ReplaceAll(value, "-", "_")))
if normalized == "" {
return value
@@ -262,7 +269,7 @@ func normalizeStorageClass(value string) string {
return value
}
func normalizeRestoreTier(value string) string {
func NormalizeRestoreTier(value string) string {
normalized := strings.ToLower(strings.TrimSpace(value))
switch normalized {
case "", "default":

View File

@@ -37,6 +37,18 @@ func InitTaskManager() {
if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted
CleanTempDir()
}
workers := conf.Conf.Tasks.S3Transition.Workers
if workers < 0 {
workers = 0
}
fs.S3TransitionTaskManager = tache.NewManager[*fs.S3TransitionTask](
tache.WithWorks(workers),
tache.WithPersistFunction(
db.GetTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant),
db.UpdateTaskDataFunc("s3_transition", conf.Conf.Tasks.S3Transition.TaskPersistant),
),
tache.WithMaxRetry(conf.Conf.Tasks.S3Transition.MaxRetry),
)
fs.ArchiveDownloadTaskManager = tache.NewManager[*fs.ArchiveDownloadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant), db.UpdateTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Decompress.MaxRetry))
op.RegisterSettingChangingCallback(func() {
fs.ArchiveDownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)))

View File

@@ -60,6 +60,7 @@ type TasksConfig struct {
Copy TaskConfig `json:"copy" envPrefix:"COPY_"`
Decompress TaskConfig `json:"decompress" envPrefix:"DECOMPRESS_"`
DecompressUpload TaskConfig `json:"decompress_upload" envPrefix:"DECOMPRESS_UPLOAD_"`
S3Transition TaskConfig `json:"s3_transition" envPrefix:"S3_TRANSITION_"`
AllowRetryCanceled bool `json:"allow_retry_canceled" env:"ALLOW_RETRY_CANCELED"`
}
@@ -184,6 +185,11 @@ func DefaultConfig() *Config {
Workers: 5,
MaxRetry: 2,
},
S3Transition: TaskConfig{
Workers: 5,
MaxRetry: 2,
// TaskPersistant: true,
},
AllowRetryCanceled: false,
},
Cors: Cors{
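
For reference, a hedged sketch of the matching `config.json` fragment; the field names are assumed to follow `TaskConfig`'s json tags, and `task_persistant` mirrors the commented-out default above:

```json
{
  "tasks": {
    "s3_transition": {
      "workers": 5,
      "max_retry": 2,
      "task_persistant": false
    }
  }
}
```

Given the `S3_TRANSITION_` prefix declared on the struct field, the same knobs can presumably also be set via environment variables such as `S3_TRANSITION_WORKERS`.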

View File

@@ -2,10 +2,15 @@ package fs
import (
"context"
"encoding/json"
stdpath "path"
"strings"
"github.com/alist-org/alist/v3/drivers/s3"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/task"
"github.com/pkg/errors"
)
@@ -53,6 +58,38 @@ func other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) {
if err != nil {
return nil, errors.WithMessage(err, "failed get storage")
}
originalPath := args.Path
if _, ok := storage.(*s3.S3); ok {
method := strings.ToLower(strings.TrimSpace(args.Method))
if method == s3.OtherMethodArchive || method == s3.OtherMethodThaw {
if S3TransitionTaskManager == nil {
return nil, errors.New("s3 transition task manager is not initialized")
}
var payload json.RawMessage
if args.Data != nil {
raw, err := json.Marshal(args.Data)
if err != nil {
return nil, errors.WithMessage(err, "failed to encode request payload")
}
payload = raw
}
taskCreator, _ := ctx.Value("user").(*model.User)
tsk := &S3TransitionTask{
TaskExtension: task.TaskExtension{Creator: taskCreator},
status: "queued",
StorageMountPath: storage.GetStorage().MountPath,
ObjectPath: actualPath,
DisplayPath: originalPath,
ObjectName: stdpath.Base(actualPath),
Transition: method,
Payload: payload,
}
S3TransitionTaskManager.Add(tsk)
return map[string]string{"task_id": tsk.GetID()}, nil
}
}
args.Path = actualPath
return op.Other(ctx, storage, args)
}
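
Note that only `archive` and `thaw` are intercepted; `archive_status`, `thaw_status`, and non-S3 storages still fall through to `op.Other` synchronously. A hedged sketch of a status query, assuming the package's exported `Other` wrapper around this handler:

```go
package example

import (
	"context"
	"fmt"

	"github.com/alist-org/alist/v3/drivers/s3"
	"github.com/alist-org/alist/v3/internal/fs"
	"github.com/alist-org/alist/v3/internal/model"
)

// checkThaw queries restore progress synchronously; the path literal is illustrative.
func checkThaw(ctx context.Context) error {
	res, err := fs.Other(ctx, model.FsOtherArgs{
		Path:   "/s3/backups/2024-01.tar.gz",
		Method: s3.OtherMethodThawStatus, // not intercepted, goes straight to the driver
	})
	if err != nil {
		return err
	}
	if ts, ok := res.(s3.ThawResponse); ok && ts.Status != nil && !ts.Status.Ongoing {
		fmt.Println("thaw finished, expires:", ts.Status.Expiry)
	}
	return nil
}
```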

View File

@@ -0,0 +1,310 @@
package fs
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/alist-org/alist/v3/drivers/s3"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/task"
"github.com/pkg/errors"
"github.com/xhofe/tache"
)
const s3TransitionPollInterval = 15 * time.Second
// S3TransitionTask represents an asynchronous S3 archive/thaw request that is
// tracked via the task manager so that clients can monitor the progress of the
// operation.
type S3TransitionTask struct {
task.TaskExtension
status string
StorageMountPath string `json:"storage_mount_path"`
ObjectPath string `json:"object_path"`
DisplayPath string `json:"display_path"`
ObjectName string `json:"object_name"`
Transition string `json:"transition"`
Payload json.RawMessage `json:"payload,omitempty"`
TargetStorageClass string `json:"target_storage_class,omitempty"`
RequestID string `json:"request_id,omitempty"`
VersionID string `json:"version_id,omitempty"`
storage driver.Driver `json:"-"`
}
// S3TransitionTaskManager holds asynchronous S3 archive/thaw tasks.
var S3TransitionTaskManager *tache.Manager[*S3TransitionTask]
var _ task.TaskExtensionInfo = (*S3TransitionTask)(nil)
func (t *S3TransitionTask) GetName() string {
action := strings.ToLower(t.Transition)
if action == "" {
action = "transition"
}
display := t.DisplayPath
if display == "" {
display = t.ObjectPath
}
if display == "" {
display = t.ObjectName
}
return fmt.Sprintf("s3 %s %s", action, display)
}
func (t *S3TransitionTask) GetStatus() string {
return t.status
}
func (t *S3TransitionTask) Run() error {
t.ReinitCtx()
t.ClearEndTime()
start := time.Now()
t.SetStartTime(start)
defer func() { t.SetEndTime(time.Now()) }()
if err := t.ensureStorage(); err != nil {
t.status = fmt.Sprintf("locate storage failed: %v", err)
return err
}
payload, err := t.decodePayload()
if err != nil {
t.status = fmt.Sprintf("decode payload failed: %v", err)
return err
}
method := strings.ToLower(strings.TrimSpace(t.Transition))
switch method {
case s3.OtherMethodArchive:
t.status = "submitting archive request"
t.SetProgress(0)
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
Path: t.ObjectPath,
Method: s3.OtherMethodArchive,
Data: payload,
})
if err != nil {
t.status = fmt.Sprintf("archive request failed: %v", err)
return err
}
archiveResp, ok := toArchiveResponse(resp)
if ok {
if t.TargetStorageClass == "" {
t.TargetStorageClass = archiveResp.StorageClass
}
t.RequestID = archiveResp.RequestID
t.VersionID = archiveResp.VersionID
if archiveResp.StorageClass != "" {
t.status = fmt.Sprintf("archive requested, waiting for %s", archiveResp.StorageClass)
} else {
t.status = "archive requested"
}
} else if sc := t.extractTargetStorageClass(); sc != "" {
t.TargetStorageClass = sc
t.status = fmt.Sprintf("archive requested, waiting for %s", sc)
} else {
t.status = "archive requested"
}
if t.TargetStorageClass != "" {
t.TargetStorageClass = s3.NormalizeStorageClass(t.TargetStorageClass)
}
t.SetProgress(25)
return t.waitForArchive()
case s3.OtherMethodThaw:
t.status = "submitting thaw request"
t.SetProgress(0)
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
Path: t.ObjectPath,
Method: s3.OtherMethodThaw,
Data: payload,
})
if err != nil {
t.status = fmt.Sprintf("thaw request failed: %v", err)
return err
}
thawResp, ok := toThawResponse(resp)
if ok {
t.RequestID = thawResp.RequestID
if thawResp.Status != nil && !thawResp.Status.Ongoing {
t.SetProgress(100)
t.status = thawCompletionMessage(thawResp.Status)
return nil
}
}
t.status = "thaw requested"
t.SetProgress(25)
return t.waitForThaw()
default:
return errors.Errorf("unsupported transition method: %s", t.Transition)
}
}
func (t *S3TransitionTask) ensureStorage() error {
if t.storage != nil {
return nil
}
storage, err := op.GetStorageByMountPath(t.StorageMountPath)
if err != nil {
return err
}
t.storage = storage
return nil
}
func (t *S3TransitionTask) decodePayload() (interface{}, error) {
if len(t.Payload) == 0 {
return nil, nil
}
var payload interface{}
if err := json.Unmarshal(t.Payload, &payload); err != nil {
return nil, err
}
return payload, nil
}
func (t *S3TransitionTask) extractTargetStorageClass() string {
if len(t.Payload) == 0 {
return ""
}
var req s3.ArchiveRequest
if err := json.Unmarshal(t.Payload, &req); err != nil {
return ""
}
return s3.NormalizeStorageClass(req.StorageClass)
}
func (t *S3TransitionTask) waitForArchive() error {
ticker := time.NewTicker(s3TransitionPollInterval)
defer ticker.Stop()
ctx := t.Ctx()
for {
select {
case <-ctx.Done():
t.status = "archive canceled"
return ctx.Err()
case <-ticker.C:
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
Path: t.ObjectPath,
Method: s3.OtherMethodArchiveStatus,
})
if err != nil {
t.status = fmt.Sprintf("archive status error: %v", err)
return err
}
archiveResp, ok := toArchiveResponse(resp)
if !ok {
t.status = fmt.Sprintf("unexpected archive status response: %T", resp)
return errors.Errorf("unexpected archive status response: %T", resp)
}
currentClass := strings.TrimSpace(archiveResp.StorageClass)
target := strings.TrimSpace(t.TargetStorageClass)
if target == "" {
target = currentClass
t.TargetStorageClass = currentClass
}
if currentClass == "" {
t.status = "waiting for storage class update"
t.SetProgress(50)
continue
}
if strings.EqualFold(currentClass, target) {
t.SetProgress(100)
t.status = fmt.Sprintf("archive complete (%s)", currentClass)
return nil
}
t.status = fmt.Sprintf("storage class %s (target %s)", currentClass, target)
t.SetProgress(75)
}
}
}
func (t *S3TransitionTask) waitForThaw() error {
ticker := time.NewTicker(s3TransitionPollInterval)
defer ticker.Stop()
ctx := t.Ctx()
for {
select {
case <-ctx.Done():
t.status = "thaw canceled"
return ctx.Err()
case <-ticker.C:
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
Path: t.ObjectPath,
Method: s3.OtherMethodThawStatus,
})
if err != nil {
t.status = fmt.Sprintf("thaw status error: %v", err)
return err
}
thawResp, ok := toThawResponse(resp)
if !ok {
t.status = fmt.Sprintf("unexpected thaw status response: %T", resp)
return errors.Errorf("unexpected thaw status response: %T", resp)
}
status := thawResp.Status
if status == nil {
t.status = "waiting for thaw status"
t.SetProgress(50)
continue
}
if status.Ongoing {
t.status = fmt.Sprintf("thaw in progress (%s)", status.Raw)
t.SetProgress(75)
continue
}
t.SetProgress(100)
t.status = thawCompletionMessage(status)
return nil
}
}
}
func thawCompletionMessage(status *s3.RestoreStatus) string {
if status == nil {
return "thaw complete"
}
if status.Expiry != "" {
return fmt.Sprintf("thaw complete, expires %s", status.Expiry)
}
return "thaw complete"
}
func toArchiveResponse(v interface{}) (s3.ArchiveResponse, bool) {
switch resp := v.(type) {
case s3.ArchiveResponse:
return resp, true
case *s3.ArchiveResponse:
if resp != nil {
return *resp, true
}
}
return s3.ArchiveResponse{}, false
}
func toThawResponse(v interface{}) (s3.ThawResponse, bool) {
switch resp := v.(type) {
case s3.ThawResponse:
return resp, true
case *s3.ThawResponse:
if resp != nil {
return *resp, true
}
}
return s3.ThawResponse{}, false
}
// Ensure compatibility with persistence when tasks are restored.
func (t *S3TransitionTask) OnRestore() {
// The storage handle is not persisted intentionally; it will be lazily
// re-fetched on the next Run invocation.
t.storage = nil
}
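
Putting it together, a hedged dispatch sketch for the thaw side; `fs.Other` as the exported entry point and all literal values are assumptions:

```go
package example

import (
	"context"
	"fmt"

	"github.com/alist-org/alist/v3/drivers/s3"
	"github.com/alist-org/alist/v3/internal/fs"
	"github.com/alist-org/alist/v3/internal/model"
)

// dispatchThaw queues a thaw task; the returned map comes from the fs-layer interception.
func dispatchThaw(ctx context.Context) error {
	res, err := fs.Other(ctx, model.FsOtherArgs{
		Path:   "/s3/backups/2024-01.tar.gz",
		Method: s3.OtherMethodThaw, // intercepted for S3 storages
		Data:   map[string]interface{}{"days": 3, "tier": "Expedited"},
	})
	if err != nil {
		return err
	}
	if m, ok := res.(map[string]string); ok {
		// The task then polls thaw_status every 15s (s3TransitionPollInterval)
		// until Status.Ongoing is false.
		fmt.Println("queued as task:", m["task_id"])
	}
	return nil
}
```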

View File

@@ -220,6 +220,7 @@ func SetupTaskRoute(g *gin.RouterGroup) {
taskRoute(g.Group("/copy"), fs.CopyTaskManager)
taskRoute(g.Group("/offline_download"), tool.DownloadTaskManager)
taskRoute(g.Group("/offline_download_transfer"), tool.TransferTaskManager)
taskRoute(g.Group("/s3_transition"), fs.S3TransitionTaskManager)
taskRoute(g.Group("/decompress"), fs.ArchiveDownloadTaskManager)
taskRoute(g.Group("/decompress_upload"), fs.ArchiveContentUploadTaskManager)
}
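
With the group registered, the new manager should be reachable through the same per-task endpoints `taskRoute` wires up for the other managers. A hedged polling sketch; the `/api/admin/task` prefix, the `undone` endpoint name, and the token are assumptions:

```go
package example

import (
	"fmt"
	"io"
	"net/http"
)

// listUndoneTransitions fetches unfinished S3 transition tasks over HTTP.
func listUndoneTransitions() error {
	req, _ := http.NewRequest(http.MethodGet,
		"http://localhost:5244/api/admin/task/s3_transition/undone", nil)
	req.Header.Set("Authorization", "<token>") // hypothetical admin credential
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON list of queued/running transitions
	return nil
}
```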