1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-05-07 03:50:11 +02:00

refactor client subscribe metadata

This commit is contained in:
Chris Lu 2021-08-04 16:25:46 -07:00
parent b9ecf1e3a8
commit 6b743dbbf9
11 changed files with 175 additions and 314 deletions

View file

@ -1,14 +1,12 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
"io"
"strconv"
"time"
)
@ -74,38 +72,9 @@ func startGenerateMetadata() {
func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
lastTsNs := int64(0)
tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail",
*dir, 0, 0, eachEntryFunc, false)
tailErr := pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "tail",
PathPrefix: *dir,
SinceNs: lastTsNs,
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err = eachEntryFunc(resp); err != nil {
glog.V(0).Infof("tail last record:%+v", time.Unix(0, lastTsNs))
return err
}
lastTsNs = resp.TsNs
}
})
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}

View file

@ -1,16 +1,13 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"io"
"time"
)
@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "backup_" + dataSink.GetName(),
PathPrefix: sourcePath,
SinceNs: startFrom.UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
var counter int64
var lastWriteTime time.Time
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
return fmt.Errorf("processEventFn: %v", err)
}
counter++
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
return fmt.Errorf("setOffset: %v", err)
}
}
}
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_" + dataSink.GetName(),
sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
}

View file

@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
"io"
"reflect"
"time"
@ -190,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "meta_backup",
PathPrefix: *metaBackup.filerDirectory,
SinceNs: startTime.UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
var counter int64
var lastWriteTime time.Time
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err = eachEntryFunc(resp); err != nil {
return err
}
counter++
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
return err2
}
}
}
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
return metaBackup.setOffset(lastTime)
})
return tailErr
return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
*metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {

View file

@ -3,16 +3,15 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@ -104,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "tail",
PathPrefix: *tailTarget,
SinceNs: time.Now().Add(-*tailStart).UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
*tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
return nil
}
if listenErr != nil {
return listenErr
}
if !shouldPrint(resp) {
continue
}
if err = eachEntryFunc(resp); err != nil {
if err := eachEntryFunc(resp); err != nil {
return err
}
}
return nil
}, false)
})
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}

View file

@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
"io"
"strings"
"time"
)
@ -166,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "syncTo_" + targetFiler,
PathPrefix: sourcePath,
SinceNs: sourceFilerOffsetTsNs,
Signature: targetFilerSignature,
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
var counter int64
var lastWriteTime time.Time
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
return err
}
counter++
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
return err
}
}
}
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_" + targetFiler,
sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}
const (

View file

@ -3,10 +3,12 @@ package filer
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"math"
"strings"
@ -141,4 +143,41 @@ func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *fil
}
return
}
}
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return readErr
}); readErr != nil {
return nil, readErr
}
mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
if readErr != nil {
return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
}
return
}
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {
return nil, readErr
}
// unmarshal storage configuration
conf = &filer_pb.RemoteConf{}
if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
return
}
return
}

View file

@ -2,12 +2,9 @@ package meta_cache
import (
"context"
"fmt"
"io"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@ -62,38 +59,8 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
for {
err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "mount",
PathPrefix: dir,
SinceNs: lastTsNs,
Signature: selfSignature,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
}
return util.Retry("followMetaUpdates", func() error {
return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true)
})
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
glog.Fatalf("process %v: %v", resp, err)
}
lastTsNs = resp.TsNs
}
})
if err != nil {
glog.Errorf("subscribing filer meta change: %v", err)
}
time.Sleep(time.Second)
}
}

94
weed/pb/filer_pb_tail.go Normal file
View file

@ -0,0 +1,94 @@
package pb
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
"io"
"time"
)
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption,
clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(
clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
return err
}
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
err := filerClient.WithFilerClient(makeFunc(
clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
return nil
}
func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
return func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: clientName,
PathPrefix: pathPrefix,
SinceNs: lastTsNs,
Signature: selfSignature,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
if fatalOnError {
glog.Fatalf("process %v: %v", resp, err)
} else {
glog.Errorf("process %v: %v", resp, err)
}
}
lastTsNs = resp.TsNs
}
}
}
func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
var counter int64
var lastWriteTime time.Time
return func(resp *filer_pb.SubscribeMetadataResponse) error {
if err := processEventFn(resp); err != nil {
return err
}
counter++
if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
counter = 0
lastWriteTime = time.Now()
if err := offsetFunc(counter, resp.TsNs); err != nil {
return err
}
}
return nil
}
}

View file

@ -1,46 +0,0 @@
package remote_storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
)
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)
return readErr
}); readErr != nil {
return nil, readErr
}
mappings, readErr = filer.UnmarshalRemoteStorageMappings(oldContent)
if readErr != nil {
return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
}
return
}
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, storageName+filer.REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {
return nil, readErr
}
// unmarshal storage configuration
conf = &filer_pb.RemoteConf{}
if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
readErr = fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, storageName+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
return
}
return
}

View file

@ -1,13 +1,11 @@
package s3api
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"time"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error {
@ -34,37 +32,8 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
return nil
}
for {
err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: clientName,
PathPrefix: prefix,
SinceNs: lastTsNs,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
}
return util.Retry("followIamChanges", func() error {
return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true)
})
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
glog.Fatalf("process %v: %v", resp, err)
}
lastTsNs = resp.TsNs
}
})
if err != nil {
glog.Errorf("subscribing filer meta change: %v", err)
}
time.Sleep(time.Second)
}
}

View file

@ -9,7 +9,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"io"
)
@ -79,7 +78,7 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io
func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (err error) {
// read current mapping
mappings, readErr := remote_storage.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
mappings, readErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
if readErr != nil {
return readErr
}
@ -95,7 +94,7 @@ func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *Command
func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) {
return remote_storage.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)
return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)
}