diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index d6a06bb57..2d0b00b76 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -24,18 +24,20 @@ type MasterClient struct { grpcDialOption grpc.DialOption vidMap + vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient { return &MasterClient{ - FilerGroup: filerGroup, - clientType: clientType, - clientHost: clientHost, - masters: masters, - grpcDialOption: grpcDialOption, - vidMap: newVidMap(clientDataCenter), + FilerGroup: filerGroup, + clientType: clientType, + clientHost: clientHost, + masters: masters, + grpcDialOption: grpcDialOption, + vidMap: newVidMap(clientDataCenter), + vidMapCacheSize: 5, } } @@ -175,10 +177,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() return nil } - mc.vidMap = newVidMap("") + //mc.vidMap = newVidMap("") + mc.resetVidMap() mc.updateVidMap(resp) } else { - mc.vidMap = newVidMap("") + mc.resetVidMap() + //mc.vidMap = newVidMap("") } mc.currentMaster = master @@ -263,3 +267,17 @@ func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb. }) }) } + +func (mc *MasterClient) resetVidMap() { + tail := &vidMap{vid2Locations: mc.vid2Locations, ecVid2Locations: mc.ecVid2Locations, cache: mc.cache} + mc.vidMap = newVidMap("") + mc.vidMap.cache = tail + + for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ { + if i == mc.vidMapCacheSize-1 { + tail.cache = nil + } else { + tail = tail.cache + } + } +} diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 754c77051..5771c112a 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -40,6 +40,7 @@ type vidMap struct { ecVid2Locations map[uint32][]Location DataCenter string cursor int32 + cache *vidMap } func newVidMap(dataCenter string) vidMap { @@ -119,17 +120,29 @@ func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) } func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { + glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations) + locations, found = vc.getLocations(vid) + if found && len(locations) > 0 { + return locations, found + } + + if vc.cache != nil { + return vc.cache.GetLocations(vid) + } + + return nil, false +} + +func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) { vc.RLock() defer vc.RUnlock() - glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations) - locations, found = vc.vid2Locations[vid] if found && len(locations) > 0 { return } locations, found = vc.ecVid2Locations[vid] - return locations, found && len(locations) > 0 + return } func (vc *vidMap) addLocation(vid uint32, location Location) { @@ -177,6 +190,10 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) { } func (vc *vidMap) deleteLocation(vid uint32, location Location) { + if vc.cache != nil { + vc.cache.deleteLocation(vid, location) + } + vc.Lock() defer vc.Unlock() @@ -193,10 +210,13 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) { break } } - } func (vc *vidMap) deleteEcLocation(vid uint32, location Location) { + if vc.cache != nil { + vc.cache.deleteLocation(vid, location) + } + vc.Lock() defer vc.Unlock() @@ -213,5 +233,4 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) { break } } - } diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index 0cea698ac..b1cd24490 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -2,6 +2,9 @@ package wdclient import ( "fmt" + "google.golang.org/grpc" + "strconv" + "sync" "testing" ) @@ -59,6 +62,76 @@ func TestLocationIndex(t *testing.T) { } } +func TestLookupFileId(t *testing.T) { + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", nil) + length := 5 + + //Construct a cache linked list of length 5 + for i := 0; i < length; i++ { + mc.addLocation(uint32(i), Location{Url: strconv.FormatInt(int64(i), 10)}) + mc.resetVidMap() + } + for i := 0; i < length; i++ { + locations, found := mc.GetLocations(uint32(i)) + if !found || len(locations) != 1 || locations[0].Url != strconv.FormatInt(int64(i), 10) { + t.Fatalf("urls of vid=%d is not valid.", i) + } + } + + //When continue to add nodes to the linked list, the previous node will be deleted, and the cache of the response will be gone. + for i := length; i < length+5; i++ { + mc.addLocation(uint32(i), Location{Url: strconv.FormatInt(int64(i), 10)}) + mc.resetVidMap() + } + for i := 0; i < length; i++ { + locations, found := mc.GetLocations(uint32(i)) + if found { + t.Fatalf("urls of vid[%d] should not exists, but found: %v", i, locations) + } + } + + //The delete operation will be applied to all cache nodes + _, found := mc.GetLocations(uint32(length)) + if !found { + t.Fatalf("urls of vid[%d] not found", length) + } + + //If the locations of the current node exist, return directly + newUrl := "abc" + mc.addLocation(uint32(length), Location{Url: newUrl}) + locations, found := mc.GetLocations(uint32(length)) + if !found || locations[0].Url != newUrl { + t.Fatalf("urls of vid[%d] not found", length) + } + + //After delete `abc`, cache nodes are searched + deleteLoc := Location{Url: newUrl} + mc.deleteLocation(uint32(length), deleteLoc) + locations, found = mc.GetLocations(uint32(length)) + if found && locations[0].Url != strconv.FormatInt(int64(length), 10) { + t.Fatalf("urls of vid[%d] not expected", length) + } + + //lock: concurrent test + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + for i := 0; i < 20; i++ { + _, _ = mc.GetLocations(uint32(i)) + } + } + }() + } + + for i := 0; i < 100; i++ { + mc.addLocation(uint32(i), Location{}) + } + wg.Wait() +} + func BenchmarkLocationIndex(b *testing.B) { b.SetParallelism(8) vm := vidMap{