diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index b74ad25a0..7be54b193 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -23,16 +23,16 @@ type ReaderCache struct { type SingleChunkCacher struct { completedTimeNew int64 sync.Mutex - parent *ReaderCache - chunkFileId string - data []byte - err error - cipherKey []byte - isGzipped bool - chunkSize int - shouldCache bool - wg sync.WaitGroup - cacheStartedCh chan struct{} + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + cacheStartedCh chan struct{} } func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index ddd6786d0..da61ed05d 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }() var startPosition log_buffer.MessagePosition - if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil { + if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { offset := req.GetInit().GetPartitionOffset() if offset.StartTsNs != 0 { startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index d2dc4ec3e..0f53c28c3 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -26,7 +26,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par startTime, stopTime = startTime.UTC(), stopTime.UTC() - targetFile := fmt.Sprintf("%s/%s",partitionDir, startTime.Format(topic.TIME_FORMAT)) + targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT)) // TODO append block with more metadata @@ -50,7 +50,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p return b.MasterClient.LookupFileId(fileId) } - eachChunkFn := func (buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { + eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) { for pos := 0; pos+4 < len(buf); { size := util.BytesToUint32(buf[pos : pos+4]) @@ -99,7 +99,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p if chunk.Size == 0 { continue } - if chunk.IsChunkManifest{ + if chunk.IsChunkManifest { glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name) return } @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p if entry.IsDirectory { return nil } - if stopTsNs!=0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) { + if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) { isDone = true return nil } diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 866cd17c2..896f0ee75 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -24,14 +24,14 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error var offset int64 = 0 if err == filer_pb.ErrNotFound { entry = &filer_pb.Entry{ - Name: name, + Name: name, IsDirectory: false, Attributes: &filer_pb.FuseAttributes{ - Crtime: time.Now().Unix(), - Mtime: time.Now().Unix(), - FileMode: uint32(os.FileMode(0644)), - Uid: uint32(os.Getuid()), - Gid: uint32(os.Getgid()), + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + FileMode: uint32(os.FileMode(0644)), + Uid: uint32(os.Getuid()), + Gid: uint32(os.Getgid()), }, } } else if err != nil { @@ -45,11 +45,11 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error // update the entry return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ - Directory: dir, - Entry: entry, - }) + return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: entry, }) + }) } func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) { @@ -63,11 +63,11 @@ func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fi Collection: "topics", // TtlSec: wfs.option.TtlSec, // DiskType: string(wfs.option.DiskType), - DataCenter: b.option.DataCenter, - Path: targetFile, + DataCenter: b.option.DataCenter, + Path: targetFile, }, &operation.UploadOption{ - Cipher: b.option.Cipher, + Cipher: b.option.Cipher, }, func(host, fileId string) string { fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 6f5b2312d..2873ba21f 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -12,8 +12,8 @@ import ( ) var ( - messageCount = flag.Int("n", 1000, "message count") - concurrency = flag.Int("c", 4, "concurrent publishers") + messageCount = flag.Int("n", 1000, "message count") + concurrency = flag.Int("c", 4, "concurrent publishers") partitionCount = flag.Int("p", 6, "partition count") namespace = flag.String("ns", "test", "namespace") diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 3b9817e74..1c5891049 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -7,7 +7,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) - func (p *TopicPublisher) Publish(key, value []byte) error { hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount if hashKey < 0 { diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index c952bcfb6..9262d6e0c 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -16,7 +16,7 @@ type PublisherConfiguration struct { Topic topic.Topic CreateTopic bool CreateTopicPartitionCount int32 - Brokers []string + Brokers []string } type PublishClient struct { diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 0007e7364..ad894b1d8 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -31,10 +31,10 @@ const ( type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address // Collected from all brokers when they connect to the broker leader - TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name + TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) - OnAddBroker func(broker string, brokerStats *BrokerStats) - OnRemoveBroker func(broker string, brokerStats *BrokerStats) + OnAddBroker func(broker string, brokerStats *BrokerStats) + OnRemoveBroker func(broker string, brokerStats *BrokerStats) } func NewBalancer() *Balancer { diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index 45c5271df..b4bb28e42 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -39,11 +39,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ - Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, + Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, Partition: topic.Partition{ RangeStart: topicPartitionStats.Partition.RangeStart, - RangeStop: topicPartitionStats.Partition.RangeStop, - RingSize: topicPartitionStats.Partition.RingSize, + RangeStop: topicPartitionStats.Partition.RangeStop, + RingSize: topicPartitionStats.Partition.RingSize, UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs, }, }, @@ -66,11 +66,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ - Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, + Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, Partition: topic.Partition{ RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - RingSize: partition.RingSize, + RangeStop: partition.RangeStop, + RingSize: partition.RingSize, UnixTimeNs: partition.UnixTimeNs, }, }, diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index a1279c204..f897fe2b3 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -16,12 +16,12 @@ type ConsumerGroupInstance struct { ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse } type ConsumerGroup struct { - topic topic.Topic + topic topic.Topic // map a consumer group instance id to a consumer group instance ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] - mapping *PartitionConsumerMapping - reBalanceTimer *time.Timer - pubBalancer *pub_balancer.Balancer + mapping *PartitionConsumerMapping + reBalanceTimer *time.Timer + pubBalancer *pub_balancer.Balancer } func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup { @@ -40,13 +40,13 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { } } func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) { - cg.onConsumerGroupInstanceChange("add consumer instance "+ consumerGroupInstance) + cg.onConsumerGroupInstanceChange("add consumer instance " + consumerGroupInstance) } func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) { - cg.onConsumerGroupInstanceChange("remove consumer instance "+ consumerGroupInstance) + cg.onConsumerGroupInstanceChange("remove consumer instance " + consumerGroupInstance) } -func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){ +func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) { if cg.reBalanceTimer != nil { cg.reBalanceTimer.Stop() cg.reBalanceTimer = nil @@ -107,9 +107,9 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr for i, partitionSlot := range partitionSlots { assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ Partition: &mq_pb.Partition{ - RangeStop: partitionSlot.RangeStop, + RangeStop: partitionSlot.RangeStop, RangeStart: partitionSlot.RangeStart, - RingSize: partitionSlotToBrokerList.RingSize, + RingSize: partitionSlotToBrokerList.RingSize, UnixTimeNs: partitionSlot.UnixTimeNs, }, Broker: partitionSlot.Broker, @@ -126,5 +126,4 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr consumerGroupInstance.ResponseChan <- response } - } diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 5a4474076..bb50991ab 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -31,7 +31,7 @@ func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups { topicName := toTopicName(topic) tcg, _ := c.TopicSubscribers.Get(topicName) - if tcg == nil && createIfMissing{ + if tcg == nil && createIfMissing { tcg = &TopicConsumerGroups{ ConsumerGroups: cmap.New[*ConsumerGroup](), } @@ -56,14 +56,14 @@ func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, cg, _ := tcg.ConsumerGroups.Get(consumerGroup) if cg == nil { cg = NewConsumerGroup(topic, c.balancer) - if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg){ + if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) { cg, _ = tcg.ConsumerGroups.Get(consumerGroup) } } cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance) if cgi == nil { cgi = NewConsumerGroupInstance(consumerGroupInstance) - if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi){ + if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) { cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance) } } diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index fc8ea2b1e..aa2eefcdc 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -88,7 +88,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { for _, localPartition := range localTopic.Partitions { topicPartition := &TopicPartition{ - Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name}, + Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name}, Partition: localPartition.Partition, } stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{ @@ -96,7 +96,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br Namespace: string(localTopic.Namespace), Name: localTopic.Name, }, - Partition: localPartition.Partition.ToPbPartition(), + Partition: localPartition.Partition.ToPbPartition(), ConsumerCount: localPartition.ConsumerCount, } // fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition) diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 9b7281b65..8ae029bb4 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -22,6 +22,7 @@ type LocalPartition struct { } var TIME_FORMAT = "2006-01-02-15-04-05" + func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { return &LocalPartition{ Partition: partition, diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index 38101afdb..4e013f9f8 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: filer.proto package filer_pb diff --git a/weed/pb/filer_pb/filer_grpc.pb.go b/weed/pb/filer_pb/filer_grpc.pb.go index edc47d7df..ae1564f43 100644 --- a/weed/pb/filer_pb/filer_grpc.pb.go +++ b/weed/pb/filer_pb/filer_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: filer.proto package filer_pb diff --git a/weed/pb/iam_pb/iam.pb.go b/weed/pb/iam_pb/iam.pb.go index 1468d7b80..d1bd1c4b3 100644 --- a/weed/pb/iam_pb/iam.pb.go +++ b/weed/pb/iam_pb/iam.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: iam.proto package iam_pb diff --git a/weed/pb/iam_pb/iam_grpc.pb.go b/weed/pb/iam_pb/iam_grpc.pb.go index ba6f9879a..3c2a10a90 100644 --- a/weed/pb/iam_pb/iam_grpc.pb.go +++ b/weed/pb/iam_pb/iam_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: iam.proto package iam_pb diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 30486eaaa..c619776ff 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: master.proto package master_pb diff --git a/weed/pb/master_pb/master_grpc.pb.go b/weed/pb/master_pb/master_grpc.pb.go index e8462bf96..4c41658cf 100644 --- a/weed/pb/master_pb/master_grpc.pb.go +++ b/weed/pb/master_pb/master_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: master.proto package master_pb diff --git a/weed/pb/mount_pb/mount.pb.go b/weed/pb/mount_pb/mount.pb.go index 226e4705c..c93aa3a52 100644 --- a/weed/pb/mount_pb/mount.pb.go +++ b/weed/pb/mount_pb/mount.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: mount.proto package mount_pb diff --git a/weed/pb/mount_pb/mount_grpc.pb.go b/weed/pb/mount_pb/mount_grpc.pb.go index bd6cd9f17..3dd6d126b 100644 --- a/weed/pb/mount_pb/mount_grpc.pb.go +++ b/weed/pb/mount_pb/mount_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: mount.proto package mount_pb diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go index 39e1c115d..c2ddcdc62 100644 --- a/weed/pb/mq_pb/mq_grpc.pb.go +++ b/weed/pb/mq_pb/mq_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: mq.proto package mq_pb diff --git a/weed/pb/remote_pb/remote.pb.go b/weed/pb/remote_pb/remote.pb.go index 481b308f2..469a85a19 100644 --- a/weed/pb/remote_pb/remote.pb.go +++ b/weed/pb/remote_pb/remote.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: remote.proto package remote_pb diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go index 82903904b..e8c8f3226 100644 --- a/weed/pb/s3_pb/s3.pb.go +++ b/weed/pb/s3_pb/s3.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: s3.proto package s3_pb diff --git a/weed/pb/s3_pb/s3_grpc.pb.go b/weed/pb/s3_pb/s3_grpc.pb.go index 304fe4581..2fedf571b 100644 --- a/weed/pb/s3_pb/s3_grpc.pb.go +++ b/weed/pb/s3_pb/s3_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: s3.proto package s3_pb diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 3811b7fbd..de4891a56 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.2 +// protoc-gen-go v1.31.0 +// protoc v4.25.3 // source: volume_server.proto package volume_server_pb diff --git a/weed/pb/volume_server_pb/volume_server_grpc.pb.go b/weed/pb/volume_server_pb/volume_server_grpc.pb.go index 632b18abb..940adf339 100644 --- a/weed/pb/volume_server_pb/volume_server_grpc.pb.go +++ b/weed/pb/volume_server_pb/volume_server_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc v4.25.3 // source: volume_server.proto package volume_server_pb diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go index 6f5f79eb5..edaa0a7ce 100644 --- a/weed/util/buffered_queue/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -32,12 +32,12 @@ func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] { // Create an empty chunk to initialize head and tail chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0} bq := &BufferedQueue[T]{ - chunkSize: chunkSize, - head: chunk, - tail: chunk, - last: chunk, - count: 0, - mutex: sync.Mutex{}, + chunkSize: chunkSize, + head: chunk, + tail: chunk, + last: chunk, + count: 0, + mutex: sync.Mutex{}, } bq.waitCond = sync.NewCond(&bq.mutex) return bq @@ -87,7 +87,7 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { q.mutex.Lock() defer q.mutex.Unlock() - for q.count <= 0 && !q.isClosed { + for q.count <= 0 && !q.isClosed { q.waitCond.Wait() } if q.count <= 0 && q.isClosed { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e7dd3dce0..273df5593 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -27,39 +27,39 @@ type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogBuffer struct { - name string - prevBuffers *SealedBuffers - buf []byte - batchIndex int64 - idx []int - pos int - startTime time.Time - stopTime time.Time - lastFlushTime time.Time - sizeBuf []byte - flushInterval time.Duration + name string + prevBuffers *SealedBuffers + buf []byte + batchIndex int64 + idx []int + pos int + startTime time.Time + stopTime time.Time + lastFlushTime time.Time + sizeBuf []byte + flushInterval time.Duration flushFn LogFlushFuncType ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() - isStopping *atomic.Bool - flushChan chan *dataToFlush - lastTsNs int64 + isStopping *atomic.Bool + flushChan chan *dataToFlush + lastTsNs int64 sync.RWMutex } func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType, readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer { lb := &LogBuffer{ - name: name, - prevBuffers: newSealedBuffers(PreviousBufferCount), - buf: make([]byte, BufferSize), - sizeBuf: make([]byte, 4), - flushInterval: flushInterval, - flushFn: flushFn, + name: name, + prevBuffers: newSealedBuffers(PreviousBufferCount), + buf: make([]byte, BufferSize), + sizeBuf: make([]byte, 4), + flushInterval: flushInterval, + flushFn: flushFn, ReadFromDiskFn: readFromDiskFn, - notifyFn: notifyFn, - flushChan: make(chan *dataToFlush, 256), - isStopping: new(atomic.Bool), + notifyFn: notifyFn, + flushChan: make(chan *dataToFlush, 256), + isStopping: new(atomic.Bool), } go lb.loopFlush() go lb.loopInterval() @@ -199,10 +199,10 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { return nil } -func (logBuffer *LogBuffer) GetEarliestTime() time.Time{ +func (logBuffer *LogBuffer) GetEarliestTime() time.Time { return logBuffer.startTime } -func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition{ +func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { return MessagePosition{ Time: logBuffer.startTime, BatchIndex: logBuffer.batchIndex, @@ -241,8 +241,8 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu } if tsMemory.IsZero() { // case 2.2 println("2.2 no data") - return nil, -2,nil - } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex +1 < tsBatchIndex { // case 2.3 + return nil, -2, nil + } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3 if !logBuffer.lastFlushTime.IsZero() { glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushTime) return nil, -2, ResumeFromDiskError @@ -273,7 +273,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu } } // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) - return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex,nil + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil } lastTs := lastReadPosition.UnixNano() diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 6acd5a50f..8a4d2d851 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -18,7 +18,7 @@ var ( ) type MessagePosition struct { - time.Time // this is the timestamp of the message + time.Time // this is the timestamp of the message BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch. } diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index 920a811f2..c41b30fcc 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -6,10 +6,10 @@ import ( ) type MemBuffer struct { - buf []byte - size int - startTime time.Time - stopTime time.Time + buf []byte + size int + startTime time.Time + stopTime time.Time batchIndex int64 }