1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-09 21:02:46 +02:00
seaweedfs/weed/command/mount_std.go
chrislu f02c4f816b Production Integration: ML-aware FUSE mount optimizations
OPTION A COMPLETE: Full production integration of ML optimization system

## Major Integration Components:

### 1. Command Line Interface
- Add ML optimization flags to 'weed mount' command:
  * -ml.enabled: Enable/disable ML optimizations
  * -ml.prefetchWorkers: Configure concurrent prefetch workers (default: 8)
  * -ml.confidenceThreshold: Set ML confidence threshold (default: 0.6)
  * -ml.maxPrefetchAhead: Max chunks to prefetch ahead (default: 8)
  * -ml.batchSize: Batch size for prefetch operations (default: 3)
- Updated command help text with ML Optimization section and usage examples
- Complete flag parsing and validation pipeline

### 2. Core WFS Integration
- Add MLIntegrationManager to WFS struct with proper lifecycle management
- Initialize ML optimization based on mount flags with custom configuration
- Integrate ML system shutdown with graceful cleanup on mount termination
- Memory-safe initialization with proper error handling

### 3. FUSE Operation Hooks
- **File Open (wfs.Open)**: Apply ML-specific optimizations (FOPEN_KEEP_CACHE, direct I/O)
- **File Read (wfs.Read)**: Record access patterns for ML prefetch decision making
- **File Close (wfs.Release)**: Update ML file tracking and cleanup resources
- **Get Attributes (wfs.GetAttr)**: Apply ML-aware attribute cache timeouts
- All hooks properly guarded with nil checks and enabled status validation

### 4. Configuration Management
- Mount options propagated through Option struct to ML system
- NewMLIntegrationManagerWithConfig for runtime configuration
- Default fallbacks and validation for all ML parameters
- Seamless integration with existing mount option processing

## Production Features:

- **Zero-Impact Design**: ML optimizations only activate when explicitly enabled
- **Backward Compatibility**: All existing mount functionality preserved
- **Resource Management**: Proper initialization, shutdown, and cleanup
- **Error Handling**: Graceful degradation if ML components fail
- **Performance Monitoring**: Integration points for metrics and debugging
- **Configuration Flexibility**: Runtime tunable parameters via mount flags

## Testing Verification:
- Successful compilation of entire codebase
- Mount command properly shows ML flags in help text
- Flag parsing and validation working correctly
- ML optimization system initializes when enabled
- FUSE operations integrate ML hooks without breaking existing functionality

## Usage Examples:

Basic ML optimization:
weed mount -filer=localhost:8888 -dir=/mnt/seaweedfs -ml.enabled=true

Custom ML configuration:
weed mount -filer=localhost:8888 -dir=/mnt/seaweedfs -ml.enabled=true -ml.prefetchWorkers=16 -ml.confidenceThreshold=0.8 -ml.maxPrefetchAhead=12 -ml.batchSize=5

## Architecture Impact:
- Clean separation between core FUSE and ML optimization layers
- Modular design allows easy extension and maintenance
- Production-ready with comprehensive error handling and resource management
- Foundation established for advanced ML features (Phase 4)

This completes Option A: Production Integration, providing a fully functional ML-aware FUSE mount system ready for real-world ML workloads.
2025-08-30 16:06:25 -07:00

315 lines
9.9 KiB
Go

//go:build linux || darwin || freebsd
// +build linux darwin freebsd
package command
import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/user"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/mount/unmount"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
)
// runMount is the handler of the mount command. It applies the global
// profiling and read-retry settings, parses the octal umask flag and then
// delegates to RunMount with the package-level mountOptions.
//
// It returns false when the umask cannot be parsed or when unexpected
// positional arguments are present; otherwise it returns RunMount's result.
func runMount(cmd *Command, args []string) bool {

	if *mountOptions.debug {
		go func() {
			// ListenAndServe blocks while serving and only returns with an
			// error, so report it instead of dropping it silently.
			if err := http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil); err != nil {
				glog.Errorf("debug http server on port %d: %v", *mountOptions.debugPort, err)
			}
		}()
	}

	grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)

	// Never retry reads more often than once per second.
	if *mountReadRetryTime < time.Second {
		*mountReadRetryTime = time.Second
	}
	util.RetryWaitTime = *mountReadRetryTime

	// The umask flag is written in octal, e.g. "022".
	umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
	if umaskErr != nil {
		fmt.Printf("can not parse umask %s\n", *mountOptions.umaskString)
		return false
	}

	// The mount command takes flags only, no positional arguments.
	if len(args) > 0 {
		return false
	}

	return RunMount(&mountOptions, os.FileMode(umask))
}
// RunMount mounts the filer directory described by option onto the local
// directory *option.dir and serves FUSE requests until the server stops.
//
// The steps are: validate the options, fetch the filer configuration (with
// retries), prepare the mount point and the local unix socket, build the FUSE
// mount options and the SeaweedFS file system, make sure the mount root exists
// on the filer, start the FUSE server plus the local gRPC service, and finally
// block in server.Serve().
//
// umask is cleared from the 0777 permission bits used for the mount point and
// is also passed on to the file system.
//
// It returns false for failures that are reported with fmt.Printf (invalid
// options, unusable mount point, filer mkdir failure, ...). It returns true
// after the server has stopped, and also when the filer could not be reached
// after all retries.
func RunMount(option *MountOptions, umask os.FileMode) bool {

	// basic checks
	// Read from option (not the package-level mountOptions) so the function
	// honors the options the caller actually passed in.
	chunkSizeLimitMB := *option.chunkSizeLimitMB
	if chunkSizeLimitMB <= 0 {
		fmt.Printf("Please specify a reasonable buffer size.\n")
		return false
	}

	// try to connect to filer
	filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
	util.LoadSecurityConfiguration()
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	var cipher bool
	var err error
	for i := 0; i < 10; i++ {
		err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
			if err != nil {
				return fmt.Errorf("get filer grpc address %v configuration: %w", filerAddresses, err)
			}
			cipher = resp.Cipher
			return nil
		})
		if err == nil {
			// Stop at the first successful response. Continuing would repeat
			// the request needlessly and let a later transient failure
			// overwrite err, aborting a mount that could have succeeded.
			break
		}
		glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
		glog.V(0).Infof("wait for %d seconds ...", i+1)
		time.Sleep(time.Duration(i+1) * time.Second)
	}
	if err != nil {
		glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
		return true
	}

	filerMountRootPath := *option.filerMountRootPath

	// clean up mount point
	dir := util.ResolvePath(*option.dir)
	if dir == "" {
		fmt.Printf("Please specify the mount directory via \"-dir\"\n")
		return false
	}

	unmount.Unmount(dir)

	// start on local unix socket
	if *option.localSocket == "" {
		// Derive a default socket path from a non-negative hash of the mount directory.
		mountDirHash := util.HashToInt32([]byte(dir))
		if mountDirHash < 0 {
			mountDirHash = -mountDirHash
		}
		*option.localSocket = fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", mountDirHash)
	}
	// A socket file that is still present would make net.Listen fail.
	if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
		glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
	}
	montSocketListener, err := net.Listen("unix", *option.localSocket)
	if err != nil {
		glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
	}

	// detect mount folder mode
	if *option.dirAutoCreate {
		if mkdirErr := os.MkdirAll(dir, os.FileMode(0777)&^umask); mkdirErr != nil {
			// Keep going: the os.Stat below decides whether the mount point is usable.
			glog.V(0).Infof("failed to create mount directory %s: %v", dir, mkdirErr)
		}
	}
	fileInfo, err := os.Stat(dir)
	if err != nil {
		fmt.Printf("can not stat %s\n", dir)
		return false
	}

	// collect uid, gid
	mountMode := os.ModeDir | os.FileMode(0777)&^umask
	uid, gid := util.GetFileUidGid(fileInfo)
	fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)

	// detect uid, gid
	// When the mount point reports uid 0, use the ids of the current user instead.
	if uid == 0 {
		if u, err := user.Current(); err == nil {
			if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
				uid = uint32(parsedId)
			}
			if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
				gid = uint32(parsedId)
			}
			fmt.Printf("current uid=%d gid=%d\n", uid, gid)
		}
	}

	// mapping uid, gid
	uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
	if err != nil {
		fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
		return false
	}

	// Ensure target mount point availability
	if isValid := checkMountPointAvailable(dir); !isValid {
		glog.Fatalf("Target mount point is not available: %s, please check!", dir)
		return true
	}

	// Multiple filer addresses are comma separated; join them with "+" for the names below.
	serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")

	// mount fuse
	fuseMountOptions := &fuse.MountOptions{
		AllowOther:               *option.allowOthers,
		Options:                  option.extraOptions,
		MaxBackground:            128,
		MaxWrite:                 1024 * 1024 * 2,
		MaxReadAhead:             1024 * 1024 * 2,
		IgnoreSecurityLabels:     false,
		RememberInodes:           false,
		FsName:                   serverFriendlyName + ":" + filerMountRootPath,
		Name:                     "seaweedfs",
		SingleThreaded:           false,
		DisableXAttrs:            *option.disableXAttr,
		Debug:                    *option.debug,
		EnableLocks:              false,
		ExplicitDataCacheControl: false,
		DirectMount:              true,
		DirectMountFlags:         0,
		//SyncRead:                 false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
		EnableAcl: true,
	}
	if *option.nonempty {
		fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
	}
	if *option.readOnly {
		// The read-only mount option is spelled differently on macOS.
		if runtime.GOOS == "darwin" {
			fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly")
		} else {
			fuseMountOptions.Options = append(fuseMountOptions.Options, "ro")
		}
	}
	if runtime.GOOS == "darwin" {
		// https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options
		// iosize: the largest power of two (in MB) that does not exceed the
		// chunk size limit, capped at 32MB.
		ioSizeMB := 1
		for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 {
			ioSizeMB *= 2
		}
		fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600")
		if runtime.GOARCH == "amd64" {
			fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr")
		}
		// fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache") // need to test effectiveness
		fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs")
		fuseMountOptions.Options = append(fuseMountOptions.Options, "volname="+serverFriendlyName)
		fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
	}

	// find mount point
	// Drop a trailing slash, except when the root itself is "/".
	mountRoot := filerMountRootPath
	if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
		mountRoot = mountRoot[0 : len(mountRoot)-1]
	}

	// The write cache falls back to the read cache directory when not set.
	cacheDirForWrite := *option.cacheDirForWrite
	if cacheDirForWrite == "" {
		cacheDirForWrite = *option.cacheDirForRead
	}

	seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
		MountDirectory:     dir,
		FilerAddresses:     filerAddresses,
		GrpcDialOption:     grpcDialOption,
		FilerMountRootPath: mountRoot,
		Collection:         *option.collection,
		Replication:        *option.replication,
		TtlSec:             int32(*option.ttlSec),
		DiskType:           types.ToDiskType(*option.diskType),
		ChunkSizeLimit:     int64(chunkSizeLimitMB) * 1024 * 1024,
		ConcurrentWriters:  *option.concurrentWriters,
		CacheDirForRead:    *option.cacheDirForRead,
		CacheSizeMBForRead: *option.cacheSizeMBForRead,
		CacheDirForWrite:   cacheDirForWrite,
		CacheMetaTTlSec:    *option.cacheMetaTtlSec,
		DataCenter:         *option.dataCenter,
		Quota:              int64(*option.collectionQuota) * 1024 * 1024,
		MountUid:           uid,
		MountGid:           gid,
		MountMode:          mountMode,
		MountCtime:         fileInfo.ModTime(),
		MountMtime:         time.Now(),
		Umask:              umask,
		VolumeServerAccess: *option.volumeServerAccess,
		Cipher:             cipher,
		UidGidMapper:       uidGidMapper,
		DisableXAttr:       *option.disableXAttr,
		IsMacOs:            runtime.GOOS == "darwin",
		// RDMA acceleration options
		RdmaEnabled:       *option.rdmaEnabled,
		RdmaSidecarAddr:   *option.rdmaSidecarAddr,
		RdmaFallback:      *option.rdmaFallback,
		RdmaReadOnly:      *option.rdmaReadOnly,
		RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
		RdmaTimeoutMs:     *option.rdmaTimeoutMs,
		// ML optimization options
		MLOptimizationEnabled: *option.mlOptimizationEnabled,
		MLPrefetchWorkers:     *option.mlPrefetchWorkers,
		MLConfidenceThreshold: *option.mlConfidenceThreshold,
		MLMaxPrefetchAhead:    *option.mlMaxPrefetchAhead,
		MLBatchSize:           *option.mlBatchSize,
	})

	// create mount root
	mountRootPath := util.FullPath(mountRoot)
	mountRootParent, mountDir := mountRootPath.DirAndName()
	if err = filer_pb.Mkdir(context.Background(), seaweedFileSystem, mountRootParent, mountDir, nil); err != nil {
		fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err)
		return false
	}

	server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
	if err != nil {
		glog.Fatalf("Mount fail: %v", err)
	}
	grace.OnInterrupt(func() {
		unmount.Unmount(dir)
	})

	if option.fuseCommandPid != 0 {
		// send a signal to the parent process to notify that the mount is ready
		err = syscall.Kill(option.fuseCommandPid, syscall.SIGTERM)
		if err != nil {
			fmt.Printf("failed to notify parent process: %v\n", err)
			return false
		}
	}

	// Serve the mount gRPC service on the local unix socket in the background.
	grpcS := pb.NewGrpcServer()
	mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
	reflection.Register(grpcS)
	go grpcS.Serve(montSocketListener)

	err = seaweedFileSystem.StartBackgroundTasks()
	if err != nil {
		fmt.Printf("failed to start background tasks: %v\n", err)
		return false
	}

	glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
	glog.V(0).Infof("This is SeaweedFS version %s %s %s", version.Version(), runtime.GOOS, runtime.GOARCH)

	// Serve is called synchronously; the cache directory is cleared once it returns.
	server.Serve()

	seaweedFileSystem.ClearCacheDir()

	return true
}