diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 5fbddc07e..977e2a6b8 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -331,8 +331,6 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err var mimeType string var chunks []*filer_pb.FileChunk - var assignResult *filer_pb.AssignVolumeResponse - var assignError error if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 { @@ -342,66 +340,32 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return err } - err = util.Retry("upload", func() error { - // assign a volume - assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, - DiskType: *worker.options.diskType, - Path: task.destinationUrlPath, - } - - assignResult, assignError = client.AssignVolume(context.Background(), request) - if assignError != nil { - return fmt.Errorf("assign volume failure %v: %v", request, assignError) - } - if assignResult.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) - } - if assignResult.Location.Url == "" { - return fmt.Errorf("assign volume failure %v: %v", request, assignResult) - } - return nil - }) - if assignErr != nil { - return assignErr - } - - // upload data - targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId - uploadOption := &operation.UploadOption{ - UploadUrl: targetUrl, + finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + worker, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + DiskType: *worker.options.diskType, + Path: task.destinationUrlPath, + }, + &operation.UploadOption{ Filename: fileName, Cipher: worker.options.cipher, IsInputCompressed: false, MimeType: mimeType, PairMap: nil, - Jwt: security.EncodedJwt(assignResult.Auth), - } - uploadResult, err := operation.UploadData(data, uploadOption) - if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - } - if *worker.options.verbose { - fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) - } - - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) - chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - - return nil - }) - if err != nil { - return fmt.Errorf("upload %v: %v\n", fileName, err) + }, + func(host, fileId string) string { + return fmt.Sprintf("http://%s/%s", host, fileId) + }, + util.NewBytesReader(data), + ) + if flushErr != nil { + return flushErr } - + chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0)) } if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {