mirror of
https://github.com/AlistGo/alist.git
synced 2025-12-19 11:00:06 +08:00
feat(bitqiu-upload): Implement chunked file upload support
- Implement multi-part chunked upload logic for the BitQiu service. - Introduce `UploadInitData` and `ChunkUploadResponse` structs for structured API communication. - Refactor the `Save` method to orchestrate initial upload, chunked data transfer, and finalization. - Add `uploadFileInChunks` function to handle sequential uploading of file parts. - Add `completeChunkUpload` function to finalize the chunked upload process on the server. - Ensure proper temporary file cleanup using `defer tmpFile.Close()`.
This commit is contained in:
parent
6755e0e755
commit
6ba3dc9725
@ -2,7 +2,9 @@ package bitqiu
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http/cookiejar"
|
"net/http/cookiejar"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -24,6 +26,7 @@ const (
|
|||||||
loginURL = baseURL + "/loginServer/login"
|
loginURL = baseURL + "/loginServer/login"
|
||||||
listURL = baseURL + "/apiToken/cfi/fs/resources/pages"
|
listURL = baseURL + "/apiToken/cfi/fs/resources/pages"
|
||||||
uploadInitializeURL = baseURL + "/apiToken/cfi/fs/upload/v2/initialize"
|
uploadInitializeURL = baseURL + "/apiToken/cfi/fs/upload/v2/initialize"
|
||||||
|
uploadCompleteURL = baseURL + "/apiToken/cfi/fs/upload/v2/complete"
|
||||||
downloadURL = baseURL + "/download/getUrl"
|
downloadURL = baseURL + "/download/getUrl"
|
||||||
createDirURL = baseURL + "/resource/create"
|
createDirURL = baseURL + "/resource/create"
|
||||||
moveResourceURL = baseURL + "/resource/remove"
|
moveResourceURL = baseURL + "/resource/remove"
|
||||||
@ -41,6 +44,7 @@ const (
|
|||||||
const (
|
const (
|
||||||
copyPollInterval = time.Second
|
copyPollInterval = time.Second
|
||||||
copyPollMaxAttempts = 60
|
copyPollMaxAttempts = 60
|
||||||
|
chunkSize = int64(1 << 20)
|
||||||
)
|
)
|
||||||
|
|
||||||
type BitQiu struct {
|
type BitQiu struct {
|
||||||
@ -408,12 +412,17 @@ func (d *BitQiu) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea
|
|||||||
}
|
}
|
||||||
|
|
||||||
up(0)
|
up(0)
|
||||||
_, md5sum, err := streamPkg.CacheFullInTempFileAndHash(file, utils.MD5)
|
tmpFile, md5sum, err := streamPkg.CacheFullInTempFileAndHash(file, utils.MD5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer tmpFile.Close()
|
||||||
|
|
||||||
parentID := d.resolveParentID(dstDir)
|
parentID := d.resolveParentID(dstDir)
|
||||||
|
parentPath := ""
|
||||||
|
if dstDir != nil {
|
||||||
|
parentPath = dstDir.GetPath()
|
||||||
|
}
|
||||||
form := map[string]string{
|
form := map[string]string{
|
||||||
"parentId": parentID,
|
"parentId": parentID,
|
||||||
"name": file.GetName(),
|
"name": file.GetName(),
|
||||||
@ -422,23 +431,131 @@ func (d *BitQiu) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea
|
|||||||
"sampleMd5": md5sum,
|
"sampleMd5": md5sum,
|
||||||
"org_channel": orgChannel,
|
"org_channel": orgChannel,
|
||||||
}
|
}
|
||||||
var resp Response[Resource]
|
var resp Response[json.RawMessage]
|
||||||
if err = d.postForm(ctx, uploadInitializeURL, form, &resp); err != nil {
|
if err = d.postForm(ctx, uploadInitializeURL, form, &resp); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if resp.Code != uploadSuccessCode {
|
if resp.Code != uploadSuccessCode {
|
||||||
if resp.Code == successCode {
|
switch resp.Code {
|
||||||
return nil, fmt.Errorf("upload requires additional steps not implemented: %s", resp.Message)
|
case successCode:
|
||||||
|
var initData UploadInitData
|
||||||
|
if err := json.Unmarshal(resp.Data, &initData); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse upload init response failed: %w", err)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("upload failed: %s", resp.Message)
|
serverCode, err := d.uploadFileInChunks(ctx, tmpFile, file.GetSize(), md5sum, initData, up)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
obj, err := d.completeChunkUpload(ctx, initData, parentID, parentPath, file.GetName(), file.GetSize(), md5sum, serverCode)
|
||||||
obj, err := resp.Data.toObject(parentID, dstDir.GetPath())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
up(100)
|
up(100)
|
||||||
return obj, nil
|
return obj, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("upload failed: %s", resp.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var resource Resource
|
||||||
|
if err := json.Unmarshal(resp.Data, &resource); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse upload response failed: %w", err)
|
||||||
|
}
|
||||||
|
obj, err := resource.toObject(parentID, parentPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
up(100)
|
||||||
|
return obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *BitQiu) uploadFileInChunks(ctx context.Context, tmpFile model.File, size int64, md5sum string, initData UploadInitData, up driver.UpdateProgress) (string, error) {
|
||||||
|
if d.client == nil {
|
||||||
|
return "", fmt.Errorf("client not initialized")
|
||||||
|
}
|
||||||
|
if size <= 0 {
|
||||||
|
return "", fmt.Errorf("invalid file size")
|
||||||
|
}
|
||||||
|
buf := make([]byte, chunkSize)
|
||||||
|
offset := int64(0)
|
||||||
|
var finishedFlag string
|
||||||
|
|
||||||
|
for offset < size {
|
||||||
|
chunkLen := chunkSize
|
||||||
|
remaining := size - offset
|
||||||
|
if remaining < chunkLen {
|
||||||
|
chunkLen = remaining
|
||||||
|
}
|
||||||
|
|
||||||
|
reader := io.NewSectionReader(tmpFile, offset, chunkLen)
|
||||||
|
chunkBuf := buf[:chunkLen]
|
||||||
|
if _, err := io.ReadFull(reader, chunkBuf); err != nil {
|
||||||
|
return "", fmt.Errorf("read chunk failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
headers := map[string]string{
|
||||||
|
"accept": "*/*",
|
||||||
|
"content-type": "application/octet-stream",
|
||||||
|
"appid": initData.AppID,
|
||||||
|
"token": initData.Token,
|
||||||
|
"userid": strconv.FormatInt(initData.UserID, 10),
|
||||||
|
"serialnumber": initData.SerialNumber,
|
||||||
|
"hash": md5sum,
|
||||||
|
"len": strconv.FormatInt(chunkLen, 10),
|
||||||
|
"offset": strconv.FormatInt(offset, 10),
|
||||||
|
}
|
||||||
|
|
||||||
|
var chunkResp ChunkUploadResponse
|
||||||
|
req := d.client.R().
|
||||||
|
SetContext(ctx).
|
||||||
|
SetHeaders(headers).
|
||||||
|
SetBody(chunkBuf).
|
||||||
|
SetResult(&chunkResp)
|
||||||
|
|
||||||
|
if _, err := req.Post(initData.UploadURL); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if chunkResp.ErrCode != 0 {
|
||||||
|
return "", fmt.Errorf("chunk upload failed with code %d", chunkResp.ErrCode)
|
||||||
|
}
|
||||||
|
finishedFlag = chunkResp.FinishedFlag
|
||||||
|
offset += chunkLen
|
||||||
|
up(float64(offset) * 100 / float64(size))
|
||||||
|
}
|
||||||
|
|
||||||
|
if finishedFlag == "" {
|
||||||
|
return "", fmt.Errorf("upload finished without server code")
|
||||||
|
}
|
||||||
|
return finishedFlag, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *BitQiu) completeChunkUpload(ctx context.Context, initData UploadInitData, parentID, parentPath, name string, size int64, md5sum, serverCode string) (model.Obj, error) {
|
||||||
|
form := map[string]string{
|
||||||
|
"currentPage": "1",
|
||||||
|
"limit": "1",
|
||||||
|
"userId": strconv.FormatInt(initData.UserID, 10),
|
||||||
|
"status": "0",
|
||||||
|
"parentId": parentID,
|
||||||
|
"name": name,
|
||||||
|
"fileUid": initData.FileUID,
|
||||||
|
"fileSid": initData.FileSID,
|
||||||
|
"size": strconv.FormatInt(size, 10),
|
||||||
|
"serverCode": serverCode,
|
||||||
|
"snapTime": "",
|
||||||
|
"hash": md5sum,
|
||||||
|
"sampleMd5": md5sum,
|
||||||
|
"org_channel": orgChannel,
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp Response[Resource]
|
||||||
|
if err := d.postForm(ctx, uploadCompleteURL, form, &resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resp.Code != successCode {
|
||||||
|
return nil, fmt.Errorf("complete upload failed: %s", resp.Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.Data.toObject(parentID, parentPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *BitQiu) login(ctx context.Context) error {
|
func (d *BitQiu) login(ctx context.Context) error {
|
||||||
|
|||||||
@ -81,3 +81,23 @@ func (t AsyncTask) ErrorMessage() string {
|
|||||||
}
|
}
|
||||||
return "unknown error"
|
return "unknown error"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type UploadInitData struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Size int64 `json:"size"`
|
||||||
|
Token string `json:"token"`
|
||||||
|
FileUID string `json:"fileUid"`
|
||||||
|
FileSID string `json:"fileSid"`
|
||||||
|
ParentID string `json:"parentId"`
|
||||||
|
UserID int64 `json:"userId"`
|
||||||
|
SerialNumber string `json:"serialNumber"`
|
||||||
|
UploadURL string `json:"uploadUrl"`
|
||||||
|
AppID string `json:"appId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChunkUploadResponse struct {
|
||||||
|
ErrCode int `json:"errCode"`
|
||||||
|
Offset int64 `json:"offset"`
|
||||||
|
Finished int `json:"finished"`
|
||||||
|
FinishedFlag string `json:"finishedFlag"`
|
||||||
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user