diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 5a878e675..86f43348f 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -13,6 +13,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "path" "net/http" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "strconv" + "io" + "time" + "google.golang.org/grpc" + "context" ) var ( @@ -144,10 +150,10 @@ func doEachCopy(fileOrDir string, host string, path string) bool { return uploadFileAsOne(host, path, assignResult, f, fi) } - return uploadFileInChunks(host, path, assignResult, f, chunkCount) + return uploadFileInChunks(host, path, assignResult, f, fi, chunkCount, chunkSize) } -func uploadFileAsOne(filerUrl string, urlPath string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool { +func uploadFileAsOne(filerUrl string, urlFolder string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool { // upload the file content mimeType := detectMimeType(f) @@ -164,19 +170,82 @@ func uploadFileAsOne(filerUrl string, urlPath string, assignResult *operation.As fmt.Printf("upload %v to %s result: %v\n", f.Name(), targetUrl, uploadResult.Error) return false } + fmt.Printf("uploaded %s to %s\n", f.Name(), targetUrl) - if err = filer_operation.RegisterFile(filerUrl, filepath.Join(urlPath, f.Name()), assignResult.Fid, fi.Size(), + if err = filer_operation.RegisterFile(filerUrl, filepath.Join(urlFolder, f.Name()), assignResult.Fid, fi.Size(), os.Getuid(), os.Getgid(), copy.secret); err != nil { fmt.Printf("Failed to register file %s on %s: %v\n", f.Name(), filerUrl, err) return false } - fmt.Printf("Copied %s => http://%s%s\n", f.Name(), filerUrl, urlPath) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), filerUrl, urlFolder, f.Name()) return true } -func uploadFileInChunks(filerUrl string, path string, assignResult *operation.AssignResult, f *os.File, chunkCount int) bool { - return false +func uploadFileInChunks(filerUrl string, urlFolder string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { + + var chunks []*filer_pb.FileChunk + + for i := int64(0); i < int64(chunkCount); i++ { + fileId := assignResult.Fid + if i > 0 { + fileId += "_" + strconv.FormatInt(i, 10) + } + + targetUrl := "http://" + assignResult.Url + "/" + fileId + + uploadResult, err := operation.Upload(targetUrl, + f.Name()+"-"+strconv.FormatInt(i+1, 10), + io.LimitReader(f, chunkSize), + false, "application/octet-stream", nil, "") + if err != nil { + fmt.Printf("upload data %v to %s: %v\n", f.Name(), targetUrl, err) + return false + } + if uploadResult.Error != "" { + fmt.Printf("upload %v to %s result: %v\n", f.Name(), targetUrl, uploadResult.Error) + return false + } + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: fileId, + Offset: i * chunkSize, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + }) + fmt.Printf("uploaded %s split %d => %s\n", f.Name(), i, targetUrl) + } + + if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.CreateEntryRequest{ + Directory: urlFolder, + Entry: &filer_pb.Entry{ + Name: f.Name(), + Attributes: &filer_pb.FuseAttributes{ + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: uint32(os.Getgid()), + Uid: uint32(os.Getuid()), + FileSize: uint64(fi.Size()), + FileMode: uint32(fi.Mode()), + }, + Chunks: chunks, + }, + } + + fmt.Printf("%s%s set chunks: %v", urlFolder, f.Name(), len(chunks)) + for i, chunk := range chunks { + fmt.Printf("%s%s chunks %d: %v [%d,%d)\n", urlFolder, f.Name(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + if _, err := client.CreateEntry(context.Background(), request); err != nil { + return fmt.Errorf("update fh: %v", err) + } + return nil + }); err != nil { + fmt.Printf("upload data %v to http://%s%s%s: %v\n", f.Name(), filerUrl, urlFolder, f.Name(), err) + return false + } + + return true } func isGzipped(filename string) bool { @@ -195,3 +264,16 @@ func detectMimeType(f *os.File) string { mimeType := http.DetectContentType(head[:n]) return mimeType } + +func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", filerAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +}