From f8ef25099c63a5b0603f781e5d514e0c07bbf325 Mon Sep 17 00:00:00 2001
From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>
Date: Wed, 14 Sep 2022 10:33:35 +0500
Subject: [PATCH] Removing chunks on failed writes to replicas (#3591)

* Removing chunks on failed writes to replicas

  https://github.com/seaweedfs/seaweedfs/issues/3578

* put within the util.Retry

* just purge on any error
---
 .../filer_server_handlers_write_upload.go | 27 ++++++++++++-------
 weed/topology/store_replicate.go          |  2 +-
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 4dc588055..95920583d 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -107,7 +107,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
 			wg.Done()
 		}()
 
-		chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
+		chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
 		if toChunkErr != nil {
 			uploadErrLock.Lock()
 			if uploadErr == nil {
@@ -115,12 +115,14 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
 			}
 			uploadErrLock.Unlock()
 		}
-		if chunk != nil {
+		if chunks != nil {
 			fileChunksLock.Lock()
-			fileChunks = append(fileChunks, chunk)
-			fileChunksSize := len(fileChunks)
+			fileChunksSize := len(fileChunks) + len(chunks)
+			for _, chunk := range chunks {
+				fileChunks = append(fileChunks, chunk)
+				glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
+			}
 			fileChunksLock.Unlock()
-			glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
 		}
 
 	}(chunkOffset)
@@ -169,7 +171,7 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
 	return uploadResult, err, data
 }
 
-func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) (*filer_pb.FileChunk, error) {
+func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
 	dataReader := util.NewBytesReader(data)
 
 	// retry to assign a different file id
@@ -177,6 +179,7 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
 	var auth security.EncodedJwt
 	var uploadErr error
 	var uploadResult *operation.UploadResult
+	var failedFileChunks []*filer_pb.FileChunk
 
 	err := util.Retry("filerDataToChunk", func() error {
 		// assign one file id for one chunk
@@ -191,19 +194,25 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
 		if uploadErr != nil {
 			glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
 			stats.FilerRequestCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
+			fid, _ := filer_pb.ToFileIdObject(fileId)
+			fileChunk := filer_pb.FileChunk{
+				FileId: fileId,
+				Offset: chunkOffset,
+				Fid:    fid,
+			}
+			failedFileChunks = append(failedFileChunks, &fileChunk)
 			return uploadErr
 		}
 		return nil
 	})
 	if err != nil {
 		glog.Errorf("upload error: %v", err)
-		return nil, err
+		return failedFileChunks, err
 	}
 
 	// if last chunk exhausted the reader exactly at the border
 	if uploadResult.Size == 0 {
 		return nil, nil
 	}
-
-	return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil
+	return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil
 }
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 816f1fbe4..bbca859ac 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -197,7 +197,7 @@ func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOpt
 			}
 		}
 	} else {
-		err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
+		err = fmt.Errorf("replicating lookup failed for %d: %v", volumeId, lookupErr)
 		return
 	}
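
Note on usage: dataToChunk now returns the chunk records it created even when the
replicated write fails, so the caller can purge partially written chunks instead of
leaking them on the volume servers. Below is a minimal sketch of such a purge. It
assumes the filer package's DeleteChunks helper; the purgeOnUploadError wrapper, its
call site, and the exact import paths are illustrative, not taken from this patch.

package main // illustrative sketch, not part of the patch

import (
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// purgeOnUploadError hands the chunks from a failed upload to the filer's
// deletion pipeline. Because dataToChunk records a FileChunk for every
// assigned file id, including ids whose replication failed, the slice covers
// everything that might exist remotely.
func purgeOnUploadError(f *filer.Filer, chunks []*filer_pb.FileChunk, uploadErr error) error {
	if uploadErr == nil {
		return nil // all chunks were written and replicated; keep them
	}
	// Purge the whole slice unconditionally: deleting a chunk that never
	// fully landed on a volume server is harmless.
	f.DeleteChunks(chunks)
	return uploadErr
}

This is also why the "just purge on any error" note above is safe: rather than
tracking which individual writes succeeded, the caller can delete every returned
file id, and the volume servers simply ignore ids that were never written.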