diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 4ed5ed767..586e05b3f 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -12,6 +12,8 @@ import ( type LogicChunkIndex int type UploadPipeline struct { + uploaderCount int32 + uploaderCountCond *sync.Cond filepath util.FullPath ChunkSize int64 writableChunks map[LogicChunkIndex]PageChunk @@ -19,8 +21,6 @@ type UploadPipeline struct { sealedChunks map[LogicChunkIndex]*SealedChunk sealedChunksLock sync.Mutex uploaders *util.LimitedConcurrentExecutor - uploaderCount int32 - uploaderCountCond *sync.Cond saveToStorageFn SaveToStorageFunc activeReadChunks map[LogicChunkIndex]int activeReadChunksLock sync.Mutex diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index a243b07f9..7b57c68c7 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -72,6 +72,9 @@ type FilerOption struct { } type FilerServer struct { + inFlightDataSize int64 + inFlightDataLimitCond *sync.Cond + filer_pb.UnimplementedSeaweedFilerServer option *FilerOption secret security.SigningKey @@ -90,9 +93,6 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 - - inFlightDataSize int64 - inFlightDataLimitCond *sync.Cond } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { diff --git a/weed/util/limiter.go b/weed/util/limiter.go index 2debaaa85..9d63c12a1 100644 --- a/weed/util/limiter.go +++ b/weed/util/limiter.go @@ -50,10 +50,10 @@ func (c *LimitedConcurrentExecutor) Execute(job func()) { type OperationRequest func() type LimitedOutOfOrderProcessor struct { - processorSlots uint32 - processors []chan OperationRequest processorLimit int32 processorLimitCond *sync.Cond + processorSlots uint32 + processors []chan OperationRequest currentProcessor int32 }