1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-02 07:06:44 +02:00

fs: synchronized meta file writing

fix https://github.com/chrislusf/seaweedfs/issues/1175
This commit is contained in:
Chris Lu 2019-12-29 20:19:51 -08:00
parent 509f314350
commit 09043c8e5a

View file

@ -74,6 +74,19 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
}
defer dst.Close()
var wg sync.WaitGroup
wg.Add(1)
outputChan := make(chan []byte, 1024)
go func() {
sizeBuf := make([]byte, 4)
for b := range outputChan {
util.Uint32toBytes(sizeBuf, uint32(len(b)))
dst.Write(sizeBuf)
dst.Write(b)
}
wg.Done()
}()
var dirCount, fileCount uint64
err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) {
@ -89,11 +102,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
return
}
sizeBuf := make([]byte, 4)
util.Uint32toBytes(sizeBuf, uint32(len(bytes)))
dst.Write(sizeBuf)
dst.Write(bytes)
outputChan <- bytes
if entry.IsDirectory {
atomic.AddUint64(&dirCount, 1)
@ -107,9 +116,13 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
})
close(outputChan)
wg.Wait()
if err == nil {
fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount)
fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName)
fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName)
}
return err