diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index fcd8e61aa..131345f9c 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/proto" "net/url" "path" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -49,33 +50,44 @@ func getPath(rawUrl string) string { } type GoCDKPubSub struct { - topicURL string - topic *pubsub.Topic + topicURL string + topic *pubsub.Topic + topicLock sync.RWMutex } func (k *GoCDKPubSub) GetName() string { return "gocdk_pub_sub" } +func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) { + k.topicLock.Lock() + k.topic = topic + k.topicLock.Unlock() + k.doReconnect() +} + func (k *GoCDKPubSub) doReconnect() { var conn *amqp.Connection + k.topicLock.RLock() + defer k.topicLock.RUnlock() if k.topic.As(&conn) { - go func() { - <-conn.NotifyClose(make(chan *amqp.Error)) - conn.Close() + go func(c *amqp.Connection) { + <-c.NotifyClose(make(chan *amqp.Error)) + c.Close() + k.topicLock.RLock() k.topic.Shutdown(context.Background()) + k.topicLock.RUnlock() for { glog.Info("Try reconnect") conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) if err == nil { - k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil) - k.doReconnect() + k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)) break } glog.Error(err) time.Sleep(time.Second) } - }() + }(conn) } } @@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string if err != nil { glog.Fatalf("Failed to open topic: %v", err) } - k.topic = topic - k.doReconnect() + k.setTopic(topic) return nil } @@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error { if err != nil { return err } + k.topicLock.RLock() + defer k.topicLock.RUnlock() err = k.topic.Send(context.Background(), &pubsub.Message{ Body: bytes, Metadata: map[string]string{"key": key},