mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-09-14 21:10:36 +02:00
fix data writes error when consecutive calls overlaps with previous writes
This commit is contained in:
parent
43ff60cb23
commit
75e749039b
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ContinuousDirtyPages struct {
|
type ContinuousDirtyPages struct {
|
||||||
|
@ -17,6 +18,7 @@ type ContinuousDirtyPages struct {
|
||||||
Size int64
|
Size int64
|
||||||
Data []byte
|
Data []byte
|
||||||
f *File
|
f *File
|
||||||
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDirtyPages(file *File) *ContinuousDirtyPages {
|
func newDirtyPages(file *File) *ContinuousDirtyPages {
|
||||||
|
@ -28,6 +30,9 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
|
||||||
|
|
||||||
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
|
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
|
||||||
|
|
||||||
|
pages.lock.Lock()
|
||||||
|
defer pages.lock.Unlock()
|
||||||
|
|
||||||
var chunk *filer_pb.FileChunk
|
var chunk *filer_pb.FileChunk
|
||||||
|
|
||||||
if len(data) > len(pages.Data) {
|
if len(data) > len(pages.Data) {
|
||||||
|
@ -37,13 +42,14 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
|
||||||
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
||||||
if chunk != nil {
|
if chunk != nil {
|
||||||
glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
||||||
|
chunks = append(chunks, chunk)
|
||||||
}
|
}
|
||||||
chunks = append(chunks, chunk)
|
|
||||||
} else {
|
} else {
|
||||||
glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pages.Size = 0
|
pages.Size = 0
|
||||||
|
pages.Offset = 0
|
||||||
|
|
||||||
// flush the big page
|
// flush the big page
|
||||||
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
|
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
|
||||||
|
@ -77,12 +83,18 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pages.Offset = offset
|
pages.Offset = offset
|
||||||
pages.Size = int64(len(data))
|
|
||||||
copy(pages.Data, data)
|
copy(pages.Data, data)
|
||||||
|
pages.Size = int64(len(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
copy(pages.Data[offset-pages.Offset:], data)
|
if offset != pages.Offset+pages.Size {
|
||||||
|
// when this happens, debug shows the data overlapping with existing data is empty
|
||||||
|
// the data is not just append
|
||||||
|
copy(pages.Data[pages.Size:], data[pages.Offset+pages.Size-offset:])
|
||||||
|
} else {
|
||||||
|
copy(pages.Data[offset-pages.Offset:], data)
|
||||||
|
}
|
||||||
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
|
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -90,12 +102,16 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
|
||||||
|
|
||||||
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
|
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
|
||||||
|
|
||||||
|
pages.lock.Lock()
|
||||||
|
defer pages.lock.Unlock()
|
||||||
|
|
||||||
if pages.Size == 0 {
|
if pages.Size == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
||||||
pages.Size = 0
|
pages.Size = 0
|
||||||
|
pages.Offset = 0
|
||||||
if chunk != nil {
|
if chunk != nil {
|
||||||
glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
||||||
}
|
}
|
||||||
|
@ -104,14 +120,16 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
|
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
|
||||||
|
|
||||||
|
if pages.Size == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
|
return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
|
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
|
||||||
|
|
||||||
if pages.Size == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var fileId, host string
|
var fileId, host string
|
||||||
|
|
||||||
|
|
|
@ -140,6 +140,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
|
||||||
|
|
||||||
chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
|
chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
|
||||||
return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
|
return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +180,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||||
|
|
||||||
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
|
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
glog.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
||||||
return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
||||||
}
|
}
|
||||||
if chunk != nil {
|
if chunk != nil {
|
||||||
|
|
|
@ -68,7 +68,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
||||||
count, e := vs.store.ReadVolumeNeedle(volumeId, n)
|
count, e := vs.store.ReadVolumeNeedle(volumeId, n)
|
||||||
glog.V(4).Infoln("read bytes", count, "error", e)
|
glog.V(4).Infoln("read bytes", count, "error", e)
|
||||||
if e != nil || count < 0 {
|
if e != nil || count < 0 {
|
||||||
glog.V(0).Infoln("read error:", e, r.URL.Path)
|
glog.V(0).Infof("read %s error:", r.URL.Path, e)
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue