1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-06 00:57:10 +02:00

assign a different volume on large file copying

This commit is contained in:
Chris Lu 2018-05-30 01:05:26 -07:00
parent 2fe0d479f1
commit 26e7cd8c75

View file

@ -135,9 +135,18 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
chunkCount = int(fi.Size()/chunkSize) + 1
}
if chunkCount == 1 {
return uploadFileAsOne(host, path, f, fi)
}
return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize)
}
func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool {
// assign a volume
assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
Count: uint64(chunkCount),
Count: 1,
Replication: *copy.replication,
Collection: *copy.collection,
Ttl: *copy.ttl,
@ -146,14 +155,6 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
}
if chunkCount == 1 {
return uploadFileAsOne(host, path, assignResult, f, fi)
}
return uploadFileInChunks(host, path, assignResult, f, fi, chunkCount, chunkSize)
}
func uploadFileAsOne(filerUrl string, urlFolder string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool {
// upload the file content
mimeType := detectMimeType(f)
@ -182,17 +183,24 @@ func uploadFileAsOne(filerUrl string, urlFolder string, assignResult *operation.
return true
}
func uploadFileInChunks(filerUrl string, urlFolder string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
var chunks []*filer_pb.FileChunk
for i := int64(0); i < int64(chunkCount); i++ {
fileId := assignResult.Fid
if i > 0 {
fileId += "_" + strconv.FormatInt(i, 10)
// assign a volume
assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
Count: 1,
Replication: *copy.replication,
Collection: *copy.collection,
Ttl: *copy.ttl,
})
if err != nil {
fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
}
targetUrl := "http://" + assignResult.Url + "/" + fileId
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
uploadResult, err := operation.Upload(targetUrl,
f.Name()+"-"+strconv.FormatInt(i+1, 10),
@ -207,12 +215,12 @@ func uploadFileInChunks(filerUrl string, urlFolder string, assignResult *operati
return false
}
chunks = append(chunks, &filer_pb.FileChunk{
FileId: fileId,
FileId: assignResult.Fid,
Offset: i * chunkSize,
Size: uint64(uploadResult.Size),
Mtime: time.Now().UnixNano(),
})
fmt.Printf("uploaded %s split %d => %s\n", f.Name(), i, targetUrl)
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", f.Name(), i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}
if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
@ -232,10 +240,6 @@ func uploadFileInChunks(filerUrl string, urlFolder string, assignResult *operati
},
}
fmt.Printf("%s%s set chunks: %v", urlFolder, f.Name(), len(chunks))
for i, chunk := range chunks {
fmt.Printf("%s%s chunks %d: %v [%d,%d)\n", urlFolder, f.Name(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
if _, err := client.CreateEntry(context.Background(), request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
@ -245,6 +249,8 @@ func uploadFileInChunks(filerUrl string, urlFolder string, assignResult *operati
return false
}
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), filerUrl, urlFolder, f.Name())
return true
}