From 75e749039ba3ae441281266d7eb6abeb07c161bd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 7 Sep 2018 13:11:43 -0700 Subject: [PATCH] fix data writes error when consecutive calls overlaps with previous writes --- weed/filesys/dirty_page.go | 30 +++++++++++++++++----- weed/filesys/filehandle.go | 3 ++- weed/server/volume_server_handlers_read.go | 2 +- 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index f4e47950e..dd080f721 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "sync" ) type ContinuousDirtyPages struct { @@ -17,6 +18,7 @@ type ContinuousDirtyPages struct { Size int64 Data []byte f *File + lock sync.Mutex } func newDirtyPages(file *File) *ContinuousDirtyPages { @@ -28,6 +30,9 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { + pages.lock.Lock() + defer pages.lock.Unlock() + var chunk *filer_pb.FileChunk if len(data) > len(pages.Data) { @@ -37,13 +42,14 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { if chunk != nil { glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) } - chunks = append(chunks, chunk) } else { glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } pages.Size = 0 + pages.Offset = 0 // flush the big page if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { @@ -77,12 +83,18 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da return } pages.Offset = offset - pages.Size = int64(len(data)) copy(pages.Data, data) + pages.Size = int64(len(data)) return } - copy(pages.Data[offset-pages.Offset:], data) + if offset != pages.Offset+pages.Size { + // when this happens, debug shows the data overlapping with existing data is empty + // the data is not just append + copy(pages.Data[pages.Size:], data[pages.Offset+pages.Size-offset:]) + } else { + copy(pages.Data[offset-pages.Offset:], data) + } pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset) return @@ -90,12 +102,16 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) { + pages.lock.Lock() + defer pages.lock.Unlock() + if pages.Size == 0 { return nil, nil } if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { pages.Size = 0 + pages.Offset = 0 if chunk != nil { glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) } @@ -104,14 +120,16 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f } func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) { + + if pages.Size == 0 { + return nil, nil + } + return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset) } func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { - if pages.Size == 0 { - return nil, nil - } var fileId, host string diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 0c13db984..192bf6d52 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -140,6 +140,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) if err != nil { + glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err) return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err) } @@ -179,7 +180,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { chunk, err := fh.dirtyPages.FlushToStorage(ctx) if err != nil { - glog.V(0).Infof("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + glog.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) } if chunk != nil { diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 64b90b8e7..66f8a048e 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -68,7 +68,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) count, e := vs.store.ReadVolumeNeedle(volumeId, n) glog.V(4).Infoln("read bytes", count, "error", e) if e != nil || count < 0 { - glog.V(0).Infoln("read error:", e, r.URL.Path) + glog.V(0).Infof("read %s error:", r.URL.Path, e) w.WriteHeader(http.StatusNotFound) return }