1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-02 23:26:42 +02:00

filer: retryable data chunk upload

This commit is contained in:
chrislu 2022-08-20 19:15:44 -07:00
parent 4081d50607
commit f8fa430257

View file

@ -256,25 +256,37 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) { return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) {
// assign one file id for one chunk var fileId string
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so) var uploadResult *operation.UploadResult
if assignErr != nil {
return nil, assignErr
}
// upload the chunk to the volume server err := util.Retry("saveAsChunk", func() error {
uploadOption := &operation.UploadOption{ // assign one file id for one chunk
UploadUrl: urlLocation, assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
Filename: name, if assignErr != nil {
Cipher: fs.option.Cipher, return assignErr
IsInputCompressed: false, }
MimeType: "",
PairMap: nil, fileId = assignedFileId
Jwt: auth,
} // upload the chunk to the volume server
uploadResult, uploadErr, _ := operation.Upload(reader, uploadOption) uploadOption := &operation.UploadOption{
if uploadErr != nil { UploadUrl: urlLocation,
return nil, uploadErr Filename: name,
Cipher: fs.option.Cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: auth,
}
var uploadErr error
uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption)
if uploadErr != nil {
return uploadErr
}
return nil
})
if err != nil {
return nil, err
} }
return uploadResult.ToPbFileChunk(fileId, offset), nil return uploadResult.ToPbFileChunk(fileId, offset), nil