From 036566629ac327deaa66a296387901b56b6fb325 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Aug 2022 00:06:18 -0700 Subject: [PATCH] filer.sync: fix synchronization logic in active-active mode fix https://github.com/seaweedfs/seaweedfs/issues/3328 --- weed/filer/filechunks.go | 5 ++++- weed/filer/filechunks2_test.go | 39 ++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 3b99e0785..00f4c2921 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -114,9 +114,12 @@ func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_p fileIds := make(map[string]bool) for _, interval := range bs { fileIds[interval.GetFileIdString()] = true + fileIds[interval.GetSourceFileId()] = true } for _, chunk := range as { - if _, found := fileIds[chunk.GetSourceFileId()]; !found { + _, sourceFileIdFound := fileIds[chunk.GetSourceFileId()] + _, fileIdFound := fileIds[chunk.GetFileId()] + if !sourceFileIdFound && !fileIdFound { delta = append(delta, chunk) } } diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go index fe35b4a05..7aa00864b 100644 --- a/weed/filer/filechunks2_test.go +++ b/weed/filer/filechunks2_test.go @@ -1,13 +1,52 @@ package filer import ( + "github.com/stretchr/testify/assert" "golang.org/x/exp/slices" + "log" "testing" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) +func TestDoMinusChunks(t *testing.T) { + // https://github.com/seaweedfs/seaweedfs/issues/3328 + + // clusterA and clusterB using filer.sync to sync file: hello.txt + // clusterA append a new line and then clusterB also append a new line + // clusterA append a new line again + chunksInA := []*filer_pb.FileChunk{ + {Offset: 0, Size: 3, FileId: "11", Mtime: 100}, + {Offset: 3, Size: 3, FileId: "22", SourceFileId: "2", Mtime: 200}, + {Offset: 6, Size: 3, FileId: "33", Mtime: 300}, + } + 
chunksInB := []*filer_pb.FileChunk{ + {Offset: 0, Size: 3, FileId: "1", SourceFileId: "11", Mtime: 100}, + {Offset: 3, Size: 3, FileId: "2", Mtime: 200}, + {Offset: 6, Size: 3, FileId: "3", SourceFileId: "33", Mtime: 300}, + } + + // clusterB uses the command "echo 'content' > hello.txt" to overwrite the file + // clusterA will receive two eventNotifications: it needs to empty the whole file content first and then add the new content + // in the first one, oldEntry is chunksInB and newEntry is empty fileChunks + firstOldEntry := chunksInB + var firstNewEntry []*filer_pb.FileChunk + + // clusterA receives the first event and is going to empty all chunks, according to the code in filer_sink 194 + // we can get the deleted chunks and newChunks + firstDeletedChunks := DoMinusChunks(firstOldEntry, firstNewEntry) + log.Println("first deleted chunks:", firstDeletedChunks) + //firstNewEntry := DoMinusChunks(firstNewEntry, firstOldEntry) + + // clusterA needs to delete all chunks in firstDeletedChunks + emptiedChunksInA := DoMinusChunksBySourceFileId(chunksInA, firstDeletedChunks) + // chunksInA is supposed to be empty after subtracting the deletedChunks, but it only deleted the chunk that was synced from clusterB + log.Println("clusterA synced empty chunks event result:", emptiedChunksInA) + // clusterB emptied its chunks and clusterA must sync the change and empty its chunks too + assert.Equalf(t, firstNewEntry, emptiedChunksInA, "empty") +} + func TestCompactFileChunksRealCase(t *testing.T) { chunks := []*filer_pb.FileChunk{