diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index e241a904e..080312aa8 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -125,7 +125,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ",")) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ",")) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 35f4cdc6a..d9c250127 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -44,16 +44,15 @@ type Filer struct { } func NewFiler(masters []string, grpcDialOption grpc.DialOption, - filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { + filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication - go f.loopProcessingDeletion() return f diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index b07f81129..44625a7d4 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -11,7 +11,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil) + testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 065bb3251..dc93ae062 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -89,7 +89,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher diff --git a/weed/server/master_server.go b/weed/server/master_server.go index cc1c4b2ad..ccc94ebac 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -93,7 +93,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste preallocateSize: preallocateSize, clientChans: make(map[string]chan *master_pb.VolumeLocation), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), adminLocks: NewAdminLocks(), } ms.bounedLeaderChan = make(chan int, 16) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 1a937ad53..0e285214b 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -45,7 +45,7 @@ var ( func NewCommandEnv(options ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")), option: options, } ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient) diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index e91e6f28e..e39b9dfdf 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -24,14 +24,14 @@ type MasterClient struct { vidMap } -func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, masters []string) *MasterClient { +func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, clientDataCenter string, masters []string) *MasterClient { return &MasterClient{ clientType: clientType, clientHost: clientHost, grpcPort: clientGrpcPort, masters: masters, grpcDialOption: grpcDialOption, - vidMap: newVidMap(), + vidMap: newVidMap(clientDataCenter), } } @@ -89,7 +89,7 @@ func (mc *MasterClient) tryAllMasters() { } mc.currentMaster = "" - mc.vidMap = newVidMap() + mc.vidMap = newVidMap("") } } diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index b72ac3f55..09b9eb71c 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -24,13 +24,14 @@ type Location struct { type vidMap struct { sync.RWMutex vid2Locations map[uint32][]Location - - cursor int32 + DataCenter string + cursor int32 } -func newVidMap() vidMap { +func newVidMap(dataCenter string) vidMap { return vidMap{ vid2Locations: make(map[uint32][]Location), + DataCenter: dataCenter, cursor: -1, } } @@ -57,7 +58,11 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er return nil, fmt.Errorf("volume %d not found", id) } for _, loc := range locations { - serverUrls = append(serverUrls, loc.Url) + if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter { + serverUrls = append(serverUrls, loc.Url) + } else { + serverUrls = append([]string{loc.Url}, serverUrls...) + } } return } @@ -93,7 +98,6 @@ func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { vc.RLock() defer vc.RUnlock() - locations, found = vc.vid2Locations[vid] return } diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index 87be2fc25..0cea698ac 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -45,7 +45,7 @@ func TestLocationIndex(t *testing.T) { mustOk(7, maxCursorIndex, 0) // test with constructor - vm = newVidMap() + vm = newVidMap("") length := 7 for i := 0; i < 100; i++ { got, err := vm.getLocationIndex(length)