1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-09 21:02:46 +02:00
seaweedfs/weed/mount/weedfs_file_read.go
chrislu f02c4f816b Production Integration: ML-aware FUSE mount optimizations
OPTION A COMPLETE: Full production integration of ML optimization system

## Major Integration Components:

### 1. Command Line Interface
- Add ML optimization flags to 'weed mount' command:
  * -ml.enabled: Enable/disable ML optimizations
  * -ml.prefetchWorkers: Configure concurrent prefetch workers (default: 8)
  * -ml.confidenceThreshold: Set ML confidence threshold (default: 0.6)
  * -ml.maxPrefetchAhead: Max chunks to prefetch ahead (default: 8)
  * -ml.batchSize: Batch size for prefetch operations (default: 3)
- Updated command help text with ML Optimization section and usage examples
- Complete flag parsing and validation pipeline

### 2. Core WFS Integration
- Add MLIntegrationManager to WFS struct with proper lifecycle management
- Initialize ML optimization based on mount flags with custom configuration
- Integrate ML system shutdown with graceful cleanup on mount termination
- Memory-safe initialization with proper error handling

### 3. FUSE Operation Hooks
- **File Open (wfs.Open)**: Apply ML-specific optimizations (FOPEN_KEEP_CACHE, direct I/O)
- **File Read (wfs.Read)**: Record access patterns for ML prefetch decision making
- **File Close (wfs.Release)**: Update ML file tracking and cleanup resources
- **Get Attributes (wfs.GetAttr)**: Apply ML-aware attribute cache timeouts
- All hooks properly guarded with nil checks and enabled status validation

### 4. Configuration Management
- Mount options propagated through Option struct to ML system
- NewMLIntegrationManagerWithConfig for runtime configuration
- Default fallbacks and validation for all ML parameters
- Seamless integration with existing mount option processing

## Production Features:

- **Zero-Impact Design**: ML optimizations only activate when explicitly enabled
- **Backward Compatibility**: All existing mount functionality preserved
- **Resource Management**: Proper initialization, shutdown, and cleanup
- **Error Handling**: Graceful degradation if ML components fail
- **Performance Monitoring**: Integration points for metrics and debugging
- **Configuration Flexibility**: Runtime tunable parameters via mount flags

## Testing Verification:
- Successful compilation of entire codebase
- Mount command properly shows ML flags in help text
- Flag parsing and validation working correctly
- ML optimization system initializes when enabled
- FUSE operations integrate ML hooks without breaking existing functionality

## Usage Examples:

Basic ML optimization (the original example was lost to shell command substitution when the commit message was written; reconstructed from the flags listed above):

    weed mount -ml.enabled=true

Custom ML configuration (the original example was lost to shell command substitution when the commit message was written; reconstructed from the flags listed above, values are illustrative):

    weed mount -ml.enabled=true -ml.prefetchWorkers=16 -ml.confidenceThreshold=0.8 -ml.maxPrefetchAhead=16 -ml.batchSize=5

## Architecture Impact:
- Clean separation between core FUSE and ML optimization layers
- Modular design allows easy extension and maintenance
- Production-ready with comprehensive error handling and resource management
- Foundation established for advanced ML features (Phase 4)

This completes Option A: Production Integration, providing a fully functional ML-aware FUSE mount system ready for real-world ML workloads.
2025-08-30 16:06:25 -07:00

123 lines
3.5 KiB
Go

package mount
import (
"bytes"
"context"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
// Read serves a FUSE read request against an open file handle.
//
// FUSE contract (from the libfuse low-level API): Read should send exactly
// the number of bytes requested except on EOF or error, otherwise the rest of
// the data will be substituted with zeroes. An exception to this is when the
// file has been opened in 'direct_io' mode, in which case the return value of
// the read system call will reflect the return value of this operation.
//
// Parameters:
//   - cancel: channel that signals the request should be abandoned; it is
//     bridged to the context handed to the underlying read.
//   - in: the read request; in.Fh selects the file handle, in.Offset is the
//     offset to read from, and in.NodeId is reported to the ML integration.
//   - buff: destination buffer; the result is the prefix buff[:totalRead].
//
// Returns fuse.ENOENT when the file handle is unknown, fuse.EIO when the
// underlying read fails, and otherwise the bytes read together with fuse.OK.
func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) {
	fh := wfs.GetHandle(FileHandleId(in.Fh))
	if fh == nil {
		return nil, fuse.ENOENT
	}

	fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
	defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)

	// Bridge the FUSE cancel channel to a context. The context is always
	// cancelled when Read returns, which both releases the context's
	// resources and lets the watcher goroutine below exit; without this the
	// goroutine would block forever on requests that are never interrupted.
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	go func() {
		select {
		case <-cancel:
			cancelFunc()
		case <-ctx.Done():
			// Read finished (or was already cancelled); nothing left to watch.
		}
	}()

	offset := int64(in.Offset)
	totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
	if err != nil {
		glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err)
		return nil, fuse.EIO
	}

	// Notify ML integration of file read for pattern detection.
	if wfs.mlIntegration != nil && totalRead > 0 {
		wfs.mlIntegration.OnFileRead(in.NodeId, offset, int(totalRead))
	}

	if IsDebugFileReadWrite {
		// Debug-only verification: compare what was read against the mirror
		// file and, on mismatch, read once more to report whether the second
		// attempt agrees with the mirror and/or with the first attempt.
		// Errors from these diagnostic reads are deliberately ignored.
		mirrorData := make([]byte, totalRead)
		fh.mirrorFile.ReadAt(mirrorData, offset)
		if !bytes.Equal(mirrorData, buff[:totalRead]) {
			againBuff := make([]byte, len(buff))
			againRead, _ := readDataByFileHandleWithContext(ctx, againBuff, fh, offset)
			againCorrect := bytes.Equal(mirrorData, againBuff[:againRead])
			againSame := bytes.Equal(buff[:totalRead], againBuff[:againRead])

			fmt.Printf("\ncompare %v [%d,%d) size:%d againSame:%v againCorrect:%v\n", fh.mirrorFile.Name(), offset, offset+totalRead, totalRead, againSame, againCorrect)
			//fmt.Printf("read mirror data: %v\n", mirrorData)
			//fmt.Printf("read actual data: %v\n", againBuff[:totalRead])
		}
	}

	return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
// readDataByFileHandle reads into buff from fhIn starting at offset.
//
// The range [offset, offset+len(buff)) is locked for reading for the duration
// of the call. Data is first read via readFromChunks; unless that fails with
// an error other than io.EOF, readFromDirtyPages is then applied to the same
// buffer using the timestamp returned by the chunk read. The returned count is
// the larger of the chunk read count and the dirty-page stop position relative
// to offset. io.EOF is not reported to the caller; any other error is returned
// together with the chunk read count.
func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
	length := len(buff)
	fhIn.lockForRead(offset, length)
	defer fhIn.unlockForRead(offset, length)

	readCount, tsNs, readErr := fhIn.readFromChunks(buff, offset)
	if readErr != nil && readErr != io.EOF {
		return readCount, readErr
	}

	// The dirty pages may extend past what the chunks provided.
	if fromDirty := fhIn.readFromDirtyPages(buff, offset, tsNs) - offset; fromDirty > readCount {
		readCount = fromDirty
	}
	return readCount, nil
}
// readDataByFileHandleWithContext reads into buff from fhIn starting at
// offset, passing ctx to the chunk read.
//
// The range [offset, offset+len(buff)) is locked for reading for the duration
// of the call. Data is first read via readFromChunksWithContext; unless that
// fails with an error other than io.EOF, readFromDirtyPages is then applied to
// the same buffer using the timestamp returned by the chunk read. The returned
// count is the larger of the chunk read count and the dirty-page stop position
// relative to offset. io.EOF is not reported to the caller; any other error is
// returned together with the chunk read count.
func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
	length := len(buff)
	fhIn.lockForRead(offset, length)
	defer fhIn.unlockForRead(offset, length)

	readCount, tsNs, readErr := fhIn.readFromChunksWithContext(ctx, buff, offset)
	if readErr != nil && readErr != io.EOF {
		return readCount, readErr
	}

	// The dirty pages may extend past what the chunks provided.
	if fromDirty := fhIn.readFromDirtyPages(buff, offset, tsNs) - offset; fromDirty > readCount {
		readCount = fromDirty
	}
	return readCount, nil
}