diff --git a/weed/command/master.go b/weed/command/master.go index fb09a24b4..3d33f4f7a 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -100,8 +100,11 @@ func runMaster(cmd *Command, args []string) bool { } func startMaster(masterOption MasterOptions, masterWhiteList []string) { + + myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) + r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList)) + ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port) glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress) masterListener, e := util.NewListener(listeningAddress, 0) @@ -109,7 +112,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("Master startup error: %v", e) } // start raftServer - myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds) if raftServer == nil { @@ -131,6 +133,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort) go grpcS.Serve(grpcL) + go ms.MasterClient.KeepConnectedToMaster() + // start http server httpS := &http.Server{Handler: r} go httpS.Serve(masterListener) @@ -152,11 +156,10 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st } } - peerCount := len(cleanedPeers) if !hasSelf { - peerCount += 1 + cleanedPeers = append(cleanedPeers, masterAddress) } - if peerCount%2 == 0 { + if len(cleanedPeers)%2 == 0 { glog.Fatalf("Only odd number of masters are supported!") } return diff --git a/weed/command/server.go b/weed/command/server.go index d39abd9ae..1c6439edc 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -5,7 +5,6 @@ import ( "os" "runtime" "runtime/pprof" - "strconv" "strings" "time" @@ -122,14 +121,13 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - master := *serverIp + ":" + strconv.Itoa(*masterOptions.port) masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = &master + filerOptions.masters = masterOptions.peers filerOptions.ip = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp - serverOptions.v.masters = &master + serverOptions.v.masters = masterOptions.peers serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index fea93d57f..25e27e504 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -15,7 +15,7 @@ func (f *Filer) loopProcessingDeletion() { lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { m := make(map[string]operation.LookupResult) for _, vid := range vids { - locs := f.MasterClient.GetVidLocations(vid) + locs, _ := f.MasterClient.GetVidLocations(vid) var locations []operation.Location for _, loc := range locs { locations = append(locations, operation.Location{ diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 8eea2441e..0b395701d 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -95,7 +95,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return nil, err } var locs []*filer_pb.Location - for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) { + locations, found := fs.filer.MasterClient.GetLocations(uint32(vid)) + if !found { + continue + } + for _, loc := range locations { locs = append(locs, &filer_pb.Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 3689b5495..e9eb32cca 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,8 +1,10 @@ package weed_server import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/shell" + "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" "net/http" "net/http/httputil" @@ -56,9 +58,11 @@ type MasterServer struct { clientChans map[string]chan *master_pb.VolumeLocation grpcDialOpiton grpc.DialOption + + MasterClient *wdclient.MasterClient } -func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { v := viper.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -73,11 +77,14 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { if option.VolumePreallocate { preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } + + grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master") ms := &MasterServer{ option: option, preallocateSize: preallocateSize, clientChans: make(map[string]chan *master_pb.VolumeLocation), - grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), + grpcDialOpiton: grpcDialOption, + MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() @@ -92,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) r.HandleFunc("/ui/index.html", ms.uiStatusHandler) r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler)) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) @@ -102,7 +109,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) + r.HandleFunc("/{fileId}", ms.redirectHandler) } ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize) diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 5c7ff41cf..93f983375 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -22,21 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume if _, ok := volumeLocations[vid]; ok { continue } - volumeId, err := needle.NewVolumeId(vid) - if err == nil { - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil { - var ret []operation.Location - for _, dn := range machines { - ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl}) - } - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret} - } else { - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("volumeId %s not found.", vid)} - } - } else { - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("Unknown volumeId format: %s", vid)} - } + volumeLocations[vid] = ms.findVolumeLocation(collection, vid) } return } @@ -59,10 +45,8 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) vid = fileId[0:commaSep] } } - vids := []string{vid} collection := r.FormValue("collection") //optional, but can be faster if too many collections - volumeLocations := ms.lookupVolumeId(vids, collection) - location := volumeLocations[vid] + location := ms.findVolumeLocation(collection, vid) httpStatus := http.StatusOK if location.Error != "" { httpStatus = http.StatusNotFound @@ -74,6 +58,35 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) writeJsonQuiet(w, r, httpStatus, location) } +// findVolumeLocation finds the volume location from master topo if it is leader, +// or from master client if not leader +func (ms *MasterServer) findVolumeLocation(collection string, vid string) operation.LookupResult { + var locations []operation.Location + var err error + if ms.Topo.IsLeader() { + volumeId, newVolumeIdErr := needle.NewVolumeId(vid) + machines := ms.Topo.Lookup(collection, volumeId) + for _, loc := range machines { + locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) + } + err = newVolumeIdErr + } else { + machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) + for _, loc := range machines { + locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl}) + } + err = getVidLocationsErr + } + ret := operation.LookupResult{ + VolumeId: vid, + Locations: locations, + } + if err != nil { + ret.Error = err.Error() + } + return ret +} + func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { stats.AssignRequest() requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 343bcb8da..6b5da1132 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -95,23 +95,19 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := needle.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } collection := r.FormValue("collection") - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil && len(machines) > 0 { + location := ms.findVolumeLocation(collection, vid) + if location.Error == "" { + loc := location.Locations[rand.Intn(len(location.Locations))] var url string if r.URL.RawQuery != "" { - url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery + url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery } else { - url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path } http.Redirect(w, r, url, http.StatusMovedPermanently) } else { - writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection)) + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error)) } } diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 8ad0d51c8..f07cb93f9 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -87,8 +87,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location - locations := commandEnv.MasterClient.GetLocations(uint32(vid)) - if len(locations) == 0 { + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { return fmt.Errorf("volume %d not found", vid) } diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 06308944d..01d9cdaed 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -64,20 +64,25 @@ func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err er return serverUrl, nil } -func (vc *vidMap) GetVidLocations(vid string) (locations []Location) { +func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) - return nil + return nil, fmt.Errorf("Unknown volume id %s", vid) } - return vc.GetLocations(uint32(id)) + foundLocations, found := vc.GetLocations(uint32(id)) + if found { + return foundLocations, nil + } + return nil, fmt.Errorf("volume id %s not found", vid) } -func (vc *vidMap) GetLocations(vid uint32) (locations []Location) { +func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { vc.RLock() defer vc.RUnlock() - return vc.vid2Locations[vid] + locations, found = vc.vid2Locations[vid] + return } func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) {