From dac9c28d05cd61402cf5fbac42507b49b5a4f7a5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 5 Dec 2022 12:32:27 -0800 Subject: [PATCH] Revert "refactor: moved to locked entry" (#4035) * Revert "refactor: moved to locked entry" This reverts commit 94bc9afd9d3f8e049219c1cdc9f0d6e0eb4cf456. * only add LockedEntry, no changes to entryLock * fix compilation --- weed/mount/filehandle.go | 14 +++++++++----- weed/mount/weedfs_attr.go | 4 ++++ weed/mount/weedfs_dir_lookup.go | 2 ++ weed/mount/weedfs_file_copy_range.go | 4 ++++ weed/mount/weedfs_file_lseek.go | 2 ++ weed/mount/weedfs_file_sync.go | 3 +++ weed/mount/weedfs_xattr.go | 12 ++++++++++-- 7 files changed, 34 insertions(+), 7 deletions(-) diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 7281ede66..b6ec3d2da 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -8,16 +8,18 @@ import ( "golang.org/x/exp/slices" "golang.org/x/sync/semaphore" "math" + "sync" ) type FileHandleId uint64 type FileHandle struct { - fh FileHandleId - counter int64 - entry *LockedEntry - inode uint64 - wfs *WFS + fh FileHandleId + counter int64 + entry *LockedEntry + entryLock sync.Mutex + inode uint64 + wfs *WFS // cache file has been written to dirtyMetadata bool @@ -69,6 +71,8 @@ func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entr } func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() if fh.entry == nil { return diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 3550066e0..1d58e0852 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -44,6 +44,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } if size, ok := input.GetSize(); ok && entry != nil { glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks())) diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index 015f04e99..2d3ea8ae5 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -58,10 +58,12 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true) if fh, found := wfs.fhmap.FindFileHandle(inode); found { + fh.entryLock.Lock() if entry := fh.GetEntry(); entry != nil { glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry)) localEntry = filer.FromPbEntry(string(dirPath), entry) } + fh.entryLock.Unlock() } wfs.outputFilerEntry(out, inode, localEntry) diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index 4b0d22137..bc092a252 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -46,6 +46,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) // lock source and target file handles fhOut.orderedMutex.Acquire(context.Background(), 1) defer fhOut.orderedMutex.Release(1) + fhOut.entryLock.Lock() + defer fhOut.entryLock.Unlock() if fhOut.entry == nil { return 0, fuse.ENOENT @@ -54,6 +56,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) if fhIn.fh != fhOut.fh { fhIn.orderedMutex.Acquire(context.Background(), 1) defer fhIn.orderedMutex.Release(1) + fhIn.entryLock.Lock() + defer fhIn.entryLock.Unlock() } // directories are not supported diff --git a/weed/mount/weedfs_file_lseek.go b/weed/mount/weedfs_file_lseek.go index 69edf7144..9d6402f96 100644 --- a/weed/mount/weedfs_file_lseek.go +++ b/weed/mount/weedfs_file_lseek.go @@ -38,6 +38,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO // lock the file until the proper offset was calculated fh.orderedMutex.Acquire(context.Background(), 1) defer fh.orderedMutex.Release(1) + fh.entryLock.Lock() + defer fh.entryLock.Unlock() fileSize := int64(filer.FileSize(fh.GetEntry())) offset := max(int64(in.Offset), 0) diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 244963ad3..a08a37f9c 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + entry := fh.GetEntry() if entry == nil { return nil diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go index 7a5109ee8..b03fa01f1 100644 --- a/weed/mount/weedfs_xattr.go +++ b/weed/mount/weedfs_xattr.go @@ -103,13 +103,17 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st } } - path, _, entry, status := wfs.maybeReadEntry(input.NodeId) + path, fh, entry, status := wfs.maybeReadEntry(input.NodeId) if status != fuse.OK { return status } if entry == nil { return fuse.ENOENT } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } if entry.Extended == nil { entry.Extended = make(map[string][]byte) @@ -177,13 +181,17 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr if len(attr) == 0 { return fuse.EINVAL } - path, _, entry, status := wfs.maybeReadEntry(header.NodeId) + path, fh, entry, status := wfs.maybeReadEntry(header.NodeId) if status != fuse.OK { return status } if entry == nil { return fuse.OK } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } if entry.Extended == nil { return fuse.ENOATTR