1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-01 08:12:02 +02:00
seaweedfs/weed/filer/filer.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

328 lines
9.7 KiB
Go

package filer
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
"strings"
"time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
const (
LogFlushInterval = time.Minute
PaginationSize = 1024
FilerStoreId = "filer.store.id"
)
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
)
type Filer struct {
Store VirtualFilerStore
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
FsyncBuckets []string
buckets *FilerBuckets
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
UniqueFileId uint32
}
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
UniqueFileId: uint32(util.RandomInt32()),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
go f.loopProcessingDeletion()
return f
}
func (f *Filer) AggregateFromPeers(self pb.ServerAddress) {
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
for _, peerUpdate := range f.ListExistingPeerUpdates() {
f.MetaAggregator.OnPeerUpdate(peerUpdate)
}
}
func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
for _, node := range resp.ClusterNodes {
existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
NodeType: cluster.FilerType,
Address: node.Address,
IsLeader: node.IsLeader,
IsAdd: true,
})
}
return err
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
}
return
}
func (f *Filer) SetStore(store FilerStore) {
f.Store = NewFilerStoreWrapper(store)
f.setOrLoadFilerStoreSignature(store)
}
func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
f.Signature = util.RandomInt32()
storeIdBytes = make([]byte, 4)
util.Uint32toBytes(storeIdBytes, uint32(f.Signature))
if err = store.KvPut(context.Background(), []byte(FilerStoreId), storeIdBytes); err != nil {
glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
}
glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
} else if err == nil && len(storeIdBytes) == 4 {
f.Signature = int32(util.BytesToUint32(storeIdBytes))
glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
} else {
glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
}
}
func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
}
func (fs *Filer) KeepMasterClientConnected() {
fs.MasterClient.KeepConnectedToMaster()
}
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
return f.Store.BeginTransaction(ctx)
}
func (f *Filer) CommitTransaction(ctx context.Context) error {
return f.Store.CommitTransaction(ctx)
}
func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.Store.RollbackTransaction(ctx)
}
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
if string(entry.FullPath) == "/" {
return nil
}
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
}
*/
if oldEntry == nil {
dirParts := strings.Split(string(entry.FullPath), "/")
if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
return err
}
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
if o_excl {
glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
}
glog.V(4).Infof("UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
glog.Errorf("update entry %s: %v", entry.FullPath, err)
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
return nil
}
func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
if level == 0 {
return nil
}
dirPath := "/" + util.Join(dirParts[:level]...)
// fmt.Printf("%d directory: %+v\n", i, dirPath)
// check the store directly
glog.V(4).Infof("find uncached directory: %s", dirPath)
dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
// no such existing directory
if dirEntry == nil {
// ensure parent directory
if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
return err
}
// create the directory
now := time.Now()
dirEntry = &Entry{
FullPath: util.FullPath(dirPath),
Attr: Attr{
Mtime: now,
Crtime: now,
Mode: os.ModeDir | entry.Mode | 0111,
Uid: entry.Uid,
Gid: entry.Gid,
Collection: entry.Collection,
Replication: entry.Replication,
UserName: entry.UserName,
GroupNames: entry.GroupNames,
},
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
}
} else if !dirEntry.IsDirectory() {
glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
return fmt.Errorf("%s is a file", dirPath)
}
return nil
}
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
if oldEntry.IsDirectory() && !entry.IsDirectory() {
glog.Errorf("existing %s is a directory", oldEntry.FullPath)
return fmt.Errorf("existing %s is a directory", oldEntry.FullPath)
}
if !oldEntry.IsDirectory() && entry.IsDirectory() {
glog.Errorf("existing %s is a file", oldEntry.FullPath)
return fmt.Errorf("existing %s is a file", oldEntry.FullPath)
}
}
return f.Store.UpdateEntry(ctx, entry)
}
var (
Root = &Entry{
FullPath: "/",
Attr: Attr{
Mtime: time.Now(),
Crtime: time.Now(),
Mode: os.ModeDir | 0755,
Uid: OS_UID,
Gid: OS_GID,
},
}
)
func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
if string(p) == "/" {
return Root, nil
}
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
return nil, filer_pb.ErrNotFound
}
}
return
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
expiredCount++
return true
}
}
return eachEntryFunc(entry)
})
if err != nil {
return expiredCount, lastFileName, err
}
return
}
func (f *Filer) Shutdown() {
f.LocalMetaLogBuffer.Shutdown()
f.Store.Shutdown()
}