From 2f243f5b0b7c037eb13ae14f58bd1f8608fbc4d0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 12 May 2020 08:48:00 -0700 Subject: [PATCH] refactor --- .../broker/broker_grpc_server_publish.go | 4 +- .../broker/broker_grpc_server_subscribe.go | 4 +- weed/messaging/broker/broker_server.go | 6 +-- .../{topic_lock.go => topic_manager.go} | 52 +++++++++---------- 4 files changed, 33 insertions(+), 33 deletions(-) rename weed/messaging/broker/{topic_lock.go => topic_manager.go} (57%) diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index 573706c06..dc11061af 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -57,8 +57,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis return fmt.Errorf("channel is already closed") } - tl := broker.topicLocks.RequestLock(tp, topicConfig, true) - defer broker.topicLocks.ReleaseLock(tp, true) + tl := broker.topicManager.RequestLock(tp, topicConfig, true) + defer broker.topicManager.ReleaseLock(tp, true) md5hash := md5.New() // process each message diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 76cbdef24..e7cbb6441 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -47,8 +47,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs Topic: in.Init.Topic, Partition: in.Init.Partition, } - lock := broker.topicLocks.RequestLock(tp, topicConfig, false) - defer broker.topicLocks.ReleaseLock(tp, false) + lock := broker.topicManager.RequestLock(tp, topicConfig, false) + defer broker.topicManager.ReleaseLock(tp, false) lastReadTime := time.Now() switch in.Init.StartPosition { diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index e6ff2cf00..0c04d2841 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -24,7 +24,7 @@ type MessageBrokerOption struct { type MessageBroker struct { option *MessageBrokerOption grpcDialOption grpc.DialOption - topicLocks *TopicLocks + topicManager *TopicManager } func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { @@ -34,7 +34,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio grpcDialOption: grpcDialOption, } - messageBroker.topicLocks = NewTopicLocks(messageBroker) + messageBroker.topicManager = NewTopicManager(messageBroker) messageBroker.checkFilers() @@ -58,7 +58,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { Name: broker.option.Ip, GrpcPort: uint32(broker.option.Port), } - for _, tp := range broker.topicLocks.ListTopicPartitions() { + for _, tp := range broker.topicManager.ListTopicPartitions() { initRequest.Resources = append(initRequest.Resources, tp.String()) } if err := stream.Send(&filer_pb.KeepConnectedRequest{ diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_manager.go similarity index 57% rename from weed/messaging/broker/topic_lock.go rename to weed/messaging/broker/topic_manager.go index 9ae446df3..21594dea5 100644 --- a/weed/messaging/broker/topic_lock.go +++ b/weed/messaging/broker/topic_manager.go @@ -25,7 +25,7 @@ func (tp *TopicPartition) String() string { return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) } -type TopicLock struct { +type TopicControl struct { sync.Mutex cond *sync.Cond subscriberCount int @@ -33,20 +33,20 @@ type TopicLock struct { logBuffer *log_buffer.LogBuffer } -type TopicLocks struct { +type TopicManager struct { sync.Mutex - locks map[TopicPartition]*TopicLock - broker *MessageBroker + topicControls map[TopicPartition]*TopicControl + broker *MessageBroker } -func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks { - return &TopicLocks{ - locks: make(map[TopicPartition]*TopicLock), - broker: messageBroker, +func NewTopicManager(messageBroker *MessageBroker) *TopicManager { + return &TopicManager{ + topicControls: make(map[TopicPartition]*TopicControl), + broker: messageBroker, } } -func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { +func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { flushFn := func(startTime, stopTime time.Time, buf []byte) { @@ -63,7 +63,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC tp.Partition, ) - if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil { + if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil { glog.V(0).Infof("log write failed %s: %v", targetFile, err) } } @@ -74,16 +74,16 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC return logBuffer } -func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock { - tl.Lock() - defer tl.Unlock() +func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { + tm.Lock() + defer tm.Unlock() - lock, found := tl.locks[partition] + lock, found := tm.topicControls[partition] if !found { - lock = &TopicLock{} + lock = &TopicControl{} lock.cond = sync.NewCond(&lock.Mutex) - tl.locks[partition] = lock - lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig) + tm.topicControls[partition] = lock + lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) } if isPublisher { lock.publisherCount++ @@ -93,11 +93,11 @@ func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messagi return lock } -func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { - tl.Lock() - defer tl.Unlock() +func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { + tm.Lock() + defer tm.Unlock() - lock, found := tl.locks[partition] + lock, found := tm.topicControls[partition] if !found { return } @@ -107,16 +107,16 @@ func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { lock.subscriberCount-- } if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { - delete(tl.locks, partition) + delete(tm.topicControls, partition) lock.logBuffer.Shutdown() } } -func (tl *TopicLocks) ListTopicPartitions() (tps []TopicPartition) { - tl.Lock() - defer tl.Unlock() +func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { + tm.Lock() + defer tm.Unlock() - for k := range tl.locks { + for k := range tm.topicControls { tps = append(tps, k) } return