diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index e0d764070..52308e0e5 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -84,7 +84,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader } chunk.Mtime = mtime pages.collection, pages.replication = collection, replication - pages.fh.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) pages.fh.entryViewCache = nil glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 8ce31a078..49918c104 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -13,12 +13,12 @@ import ( type FileHandleId uint64 type FileHandle struct { - fh FileHandleId - counter int64 - entry *filer_pb.Entry - chunkAddLock sync.Mutex - inode uint64 - wfs *WFS + fh FileHandleId + counter int64 + entry *filer_pb.Entry + entryLock sync.Mutex + inode uint64 + wfs *WFS // cache file has been written to dirtyMetadata bool @@ -53,7 +53,20 @@ func (fh *FileHandle) FullPath() util.FullPath { return fp } -func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) { +func (fh *FileHandle) GetEntry() *filer_pb.Entry { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + return fh.entry +} +func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + fh.entry = entry +} + +func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() // find the earliest incoming chunk newChunks := chunks @@ -82,10 +95,8 @@ func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) { glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks)) - fh.chunkAddLock.Lock() fh.entry.Chunks = append(fh.entry.Chunks, newChunks...) fh.entryViewCache = nil - fh.chunkAddLock.Unlock() } func (fh *FileHandle) Release() { diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index 5439b8bfd..88ab8612c 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -25,6 +25,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { fileFullPath := fh.FullPath() + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + entry := fh.entry if entry == nil { return 0, io.EOF diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 2ab82b3ed..6437499bf 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -131,10 +131,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle } var found bool if fh, found = wfs.fhmap.FindFileHandle(inode); found { - if fh.entry.Attributes == nil { - fh.entry.Attributes = &filer_pb.FuseAttributes{} + entry = fh.GetEntry() + if entry != nil && fh.entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} } - return path, fh, fh.entry, fuse.OK + return path, fh, entry, fuse.OK } entry, status = wfs.maybeLoadEntry(path) return diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index cb935d0e4..be504f5e2 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -43,6 +43,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } if size, ok := input.GetSize(); ok { glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks)) diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 1c80329c2..f3536ddce 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + entry := fh.entry if entry == nil { return nil diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go index c85a1b3a1..2fb2b0741 100644 --- a/weed/mount/weedfs_xattr.go +++ b/weed/mount/weedfs_xattr.go @@ -94,10 +94,15 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st } } - path, _, entry, status := wfs.maybeReadEntry(input.NodeId) + path, fh, entry, status := wfs.maybeReadEntry(input.NodeId) if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } + if entry.Extended == nil { entry.Extended = make(map[string][]byte) } @@ -154,10 +159,15 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr if len(attr) == 0 { return fuse.EINVAL } - path, _, entry, status := wfs.maybeReadEntry(header.NodeId) + path, fh, entry, status := wfs.maybeReadEntry(header.NodeId) if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } + if entry.Extended == nil { return fuse.ENOATTR }