package page_writer

import (
	"sync/atomic"
)

// LockForRead marks every logic chunk overlapping [startOffset, stopOffset)
// as having an active reader, so the chunk is not released while it is still
// being read.
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
	startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
	stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
	if stopOffset%up.ChunkSize > 0 {
		stopLogicChunkIndex += 1
	}
	up.activeReadChunksLock.Lock()
	defer up.activeReadChunksLock.Unlock()
	for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
		// increment the reader count for this chunk
		if count, found := up.activeReadChunks[i]; found {
			up.activeReadChunks[i] = count + 1
		} else {
			up.activeReadChunks[i] = 1
		}
	}
}

// UnlockForRead decrements the reader count for every logic chunk overlapping
// [startOffset, stopOffset), deleting the entry once the count reaches zero.
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
	startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
	stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
	if stopOffset%up.ChunkSize > 0 {
		stopLogicChunkIndex += 1
	}
	up.activeReadChunksLock.Lock()
	defer up.activeReadChunksLock.Unlock()
	for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
		if count, found := up.activeReadChunks[i]; found {
			if count == 1 {
				delete(up.activeReadChunks, i)
			} else {
				up.activeReadChunks[i] = count - 1
			}
		}
	}
}

// IsLocked reports whether the given logic chunk currently has active readers.
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
	up.activeReadChunksLock.Lock()
	defer up.activeReadChunksLock.Unlock()
	if count, found := up.activeReadChunks[logicChunkIndex]; found {
		return count > 0
	}
	return false
}

// waitForCurrentWritersToComplete blocks until all in-flight uploaders have
// finished, i.e. until uploaderCount drops to zero.
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
	up.uploaderCountCond.L.Lock()
	for atomic.LoadInt32(&up.uploaderCount) > 0 {
		up.uploaderCountCond.Wait()
	}
	up.uploaderCountCond.L.Unlock()
}