From d693e7741852db9f77b542d9e07a3a6620448a83 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 9 May 2020 00:43:53 -0700 Subject: [PATCH] add pub sub md5 --- weed/messaging/broker/broker_grpc_server.go | 2 +- weed/messaging/msgclient/pub_chan.go | 14 +++++++++++++- weed/messaging/msgclient/sub_chan.go | 17 +++++++++++++---- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go index 305213622..6918a28a6 100644 --- a/weed/messaging/broker/broker_grpc_server.go +++ b/weed/messaging/broker/broker_grpc_server.go @@ -33,5 +33,5 @@ func genTopicDir(namespace, topic string) string { } func genTopicDirEntry(namespace, topic string) (dir, entry string) { - return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace), topic + return fmt.Sprintf("%s/%s", filer2.TopicsDir, namespace), topic } diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go index ccf301a6a..9bc88f7c0 100644 --- a/weed/messaging/msgclient/pub_chan.go +++ b/weed/messaging/msgclient/pub_chan.go @@ -1,6 +1,8 @@ package msgclient import ( + "crypto/md5" + "hash" "io" "log" @@ -13,6 +15,7 @@ import ( type PubChannel struct { client messaging_pb.SeaweedMessaging_PublishClient grpcConnection *grpc.ClientConn + md5hash hash.Hash } func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { @@ -32,15 +35,20 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { return &PubChannel{ client: pc, grpcConnection: grpcConnection, + md5hash: md5.New(), }, nil } func (pc *PubChannel) Publish(m []byte) error { - return pc.client.Send(&messaging_pb.PublishRequest{ + err := pc.client.Send(&messaging_pb.PublishRequest{ Data: &messaging_pb.Message{ Value: m, }, }) + if err == nil { + pc.md5hash.Write(m) + } + return err } func (pc *PubChannel) Close() error { @@ -62,3 +70,7 @@ func (pc *PubChannel) Close() error { } return nil } + +func (pc *PubChannel) Md5() []byte { + return pc.md5hash.Sum(nil) +} diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go index edd4d1049..aae5c0c71 100644 --- a/weed/messaging/msgclient/sub_chan.go +++ b/weed/messaging/msgclient/sub_chan.go @@ -1,6 +1,8 @@ package msgclient import ( + "crypto/md5" + "hash" "io" "log" "time" @@ -10,8 +12,9 @@ import ( ) type SubChannel struct { - ch chan []byte - stream messaging_pb.SeaweedMessaging_SubscribeClient + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient + md5hash hash.Hash } func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { @@ -30,8 +33,9 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { } t := &SubChannel{ - ch: make(chan []byte), - stream: sc, + ch: make(chan []byte), + stream: sc, + md5hash: md5.New(), } go func() { @@ -51,6 +55,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { close(t.ch) return } + t.md5hash.Write(resp.Data.Value) t.ch <- resp.Data.Value } }() @@ -61,3 +66,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { func (sc *SubChannel) Channel() chan []byte { return sc.ch } + +func (sc *SubChannel) Md5() []byte { + return sc.md5hash.Sum(nil) +}