diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index fbc163442..8cd7d5bf9 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -243,10 +243,15 @@ const ( MetaOffsetPrefix = "Meta" ) -func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) { - +func GetPeerMetaOffsetKey(peerSignature int32) []byte { key := []byte(MetaOffsetPrefix + "xxxx") util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + return key +} + +func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) { + + key := GetPeerMetaOffsetKey(peerSignature) value, err := f.Store.KvGet(context.Background(), key) @@ -263,8 +268,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignat func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) { - key := []byte(MetaOffsetPrefix + "xxxx") - util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + key := GetPeerMetaOffsetKey(peerSignature) value := make([]byte, 8) util.Uint64toBytes(value, uint64(lastTsNs)) diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 5c495b2e2..fc6725fad 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -5,10 +5,13 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "io" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -92,6 +95,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W } } + filerSignatures := make(map[*master_pb.ListClusterNodesResponse_ClusterNode]int32) fmt.Fprintf(writer, "* filers %d\n", len(filerNodes)) for _, node := range filerNodes { fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version) @@ -108,12 +112,29 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup) } fmt.Fprintf(writer, " signature: %d\n", resp.Signature) + filerSignatures[node] = resp.Signature } else { fmt.Fprintf(writer, " failed to connect: %v\n", err) } return err }) } + for _, node := range filerNodes { + pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address) + selfSignature := filerSignatures[node] + for peer, peerSignature := range filerSignatures { + if selfSignature == peerSignature { + continue + } + if resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: filer.GetPeerMetaOffsetKey(peerSignature)}); err == nil && len(resp.Value) == 8 { + lastTsNs := int64(util.BytesToUint64(resp.Value)) + fmt.Fprintf(writer, " %s: %v\n", peer.Address, time.Unix(0, lastTsNs).UTC()) + } + } + return nil + }) + } // collect volume servers var volumeServers []pb.ServerAddress