From 6ba3dc9725eb768bcba7b51976a8250f35dc7287 Mon Sep 17 00:00:00 2001 From: okatu-loli Date: Thu, 23 Oct 2025 17:09:26 +0800 Subject: [PATCH] feat(bitqiu-upload): Implement chunked file upload support - Implement multi-part chunked upload logic for the BitQiu service. - Introduce `UploadInitData` and `ChunkUploadResponse` structs for structured API communication. - Refactor the `Save` method to orchestrate initial upload, chunked data transfer, and finalization. - Add `uploadFileInChunks` function to handle sequential uploading of file parts. - Add `completeChunkUpload` function to finalize the chunked upload process on the server. - Ensure proper temporary file cleanup using `defer tmpFile.Close()`. --- drivers/bitqiu/driver.go | 129 +++++++++++++++++++++++++++++++++++++-- drivers/bitqiu/types.go | 20 ++++++ 2 files changed, 143 insertions(+), 6 deletions(-) diff --git a/drivers/bitqiu/driver.go b/drivers/bitqiu/driver.go index d77a2726..0463d8a1 100644 --- a/drivers/bitqiu/driver.go +++ b/drivers/bitqiu/driver.go @@ -2,7 +2,9 @@ package bitqiu import ( "context" + "encoding/json" "fmt" + "io" "net/http/cookiejar" "path" "strconv" @@ -24,6 +26,7 @@ const ( loginURL = baseURL + "/loginServer/login" listURL = baseURL + "/apiToken/cfi/fs/resources/pages" uploadInitializeURL = baseURL + "/apiToken/cfi/fs/upload/v2/initialize" + uploadCompleteURL = baseURL + "/apiToken/cfi/fs/upload/v2/complete" downloadURL = baseURL + "/download/getUrl" createDirURL = baseURL + "/resource/create" moveResourceURL = baseURL + "/resource/remove" @@ -41,6 +44,7 @@ const ( const ( copyPollInterval = time.Second copyPollMaxAttempts = 60 + chunkSize = int64(1 << 20) ) type BitQiu struct { @@ -408,12 +412,17 @@ func (d *BitQiu) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea } up(0) - _, md5sum, err := streamPkg.CacheFullInTempFileAndHash(file, utils.MD5) + tmpFile, md5sum, err := streamPkg.CacheFullInTempFileAndHash(file, utils.MD5) if err != nil { return nil, err } + defer tmpFile.Close() parentID := d.resolveParentID(dstDir) + parentPath := "" + if dstDir != nil { + parentPath = dstDir.GetPath() + } form := map[string]string{ "parentId": parentID, "name": file.GetName(), @@ -422,18 +431,37 @@ func (d *BitQiu) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea "sampleMd5": md5sum, "org_channel": orgChannel, } - var resp Response[Resource] + var resp Response[json.RawMessage] if err = d.postForm(ctx, uploadInitializeURL, form, &resp); err != nil { return nil, err } if resp.Code != uploadSuccessCode { - if resp.Code == successCode { - return nil, fmt.Errorf("upload requires additional steps not implemented: %s", resp.Message) + switch resp.Code { + case successCode: + var initData UploadInitData + if err := json.Unmarshal(resp.Data, &initData); err != nil { + return nil, fmt.Errorf("parse upload init response failed: %w", err) + } + serverCode, err := d.uploadFileInChunks(ctx, tmpFile, file.GetSize(), md5sum, initData, up) + if err != nil { + return nil, err + } + obj, err := d.completeChunkUpload(ctx, initData, parentID, parentPath, file.GetName(), file.GetSize(), md5sum, serverCode) + if err != nil { + return nil, err + } + up(100) + return obj, nil + default: + return nil, fmt.Errorf("upload failed: %s", resp.Message) } - return nil, fmt.Errorf("upload failed: %s", resp.Message) } - obj, err := resp.Data.toObject(parentID, dstDir.GetPath()) + var resource Resource + if err := json.Unmarshal(resp.Data, &resource); err != nil { + return nil, fmt.Errorf("parse upload response failed: %w", err) + } + obj, err := resource.toObject(parentID, parentPath) if err != nil { return nil, err } @@ -441,6 +469,95 @@ func (d *BitQiu) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea return obj, nil } +func (d *BitQiu) uploadFileInChunks(ctx context.Context, tmpFile model.File, size int64, md5sum string, initData UploadInitData, up driver.UpdateProgress) (string, error) { + if d.client == nil { + return "", fmt.Errorf("client not initialized") + } + if size <= 0 { + return "", fmt.Errorf("invalid file size") + } + buf := make([]byte, chunkSize) + offset := int64(0) + var finishedFlag string + + for offset < size { + chunkLen := chunkSize + remaining := size - offset + if remaining < chunkLen { + chunkLen = remaining + } + + reader := io.NewSectionReader(tmpFile, offset, chunkLen) + chunkBuf := buf[:chunkLen] + if _, err := io.ReadFull(reader, chunkBuf); err != nil { + return "", fmt.Errorf("read chunk failed: %w", err) + } + + headers := map[string]string{ + "accept": "*/*", + "content-type": "application/octet-stream", + "appid": initData.AppID, + "token": initData.Token, + "userid": strconv.FormatInt(initData.UserID, 10), + "serialnumber": initData.SerialNumber, + "hash": md5sum, + "len": strconv.FormatInt(chunkLen, 10), + "offset": strconv.FormatInt(offset, 10), + } + + var chunkResp ChunkUploadResponse + req := d.client.R(). + SetContext(ctx). + SetHeaders(headers). + SetBody(chunkBuf). + SetResult(&chunkResp) + + if _, err := req.Post(initData.UploadURL); err != nil { + return "", err + } + if chunkResp.ErrCode != 0 { + return "", fmt.Errorf("chunk upload failed with code %d", chunkResp.ErrCode) + } + finishedFlag = chunkResp.FinishedFlag + offset += chunkLen + up(float64(offset) * 100 / float64(size)) + } + + if finishedFlag == "" { + return "", fmt.Errorf("upload finished without server code") + } + return finishedFlag, nil +} + +func (d *BitQiu) completeChunkUpload(ctx context.Context, initData UploadInitData, parentID, parentPath, name string, size int64, md5sum, serverCode string) (model.Obj, error) { + form := map[string]string{ + "currentPage": "1", + "limit": "1", + "userId": strconv.FormatInt(initData.UserID, 10), + "status": "0", + "parentId": parentID, + "name": name, + "fileUid": initData.FileUID, + "fileSid": initData.FileSID, + "size": strconv.FormatInt(size, 10), + "serverCode": serverCode, + "snapTime": "", + "hash": md5sum, + "sampleMd5": md5sum, + "org_channel": orgChannel, + } + + var resp Response[Resource] + if err := d.postForm(ctx, uploadCompleteURL, form, &resp); err != nil { + return nil, err + } + if resp.Code != successCode { + return nil, fmt.Errorf("complete upload failed: %s", resp.Message) + } + + return resp.Data.toObject(parentID, parentPath) +} + func (d *BitQiu) login(ctx context.Context) error { if d.client == nil { return fmt.Errorf("client not initialized") diff --git a/drivers/bitqiu/types.go b/drivers/bitqiu/types.go index bc7b8cfa..4833c16f 100644 --- a/drivers/bitqiu/types.go +++ b/drivers/bitqiu/types.go @@ -81,3 +81,23 @@ func (t AsyncTask) ErrorMessage() string { } return "unknown error" } + +type UploadInitData struct { + Name string `json:"name"` + Size int64 `json:"size"` + Token string `json:"token"` + FileUID string `json:"fileUid"` + FileSID string `json:"fileSid"` + ParentID string `json:"parentId"` + UserID int64 `json:"userId"` + SerialNumber string `json:"serialNumber"` + UploadURL string `json:"uploadUrl"` + AppID string `json:"appId"` +} + +type ChunkUploadResponse struct { + ErrCode int `json:"errCode"` + Offset int64 `json:"offset"` + Finished int `json:"finished"` + FinishedFlag string `json:"finishedFlag"` +}