diff --git a/go.mod b/go.mod
index 98ac2b4e5..fbb9764ee 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
 	github.com/aws/aws-sdk-go v1.33.5
 	github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
 	github.com/cespare/xxhash v1.1.0
-	github.com/chrislusf/raft v1.0.1
+	github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011
 	github.com/coreos/go-semver v0.3.0 // indirect
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/disintegration/imaging v1.6.2
diff --git a/go.sum b/go.sum
index 22f44c9c0..b9ceb80fc 100644
--- a/go.sum
+++ b/go.sum
@@ -69,6 +69,8 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/chrislusf/raft v1.0.1 h1:Wa4ffkmkysW7cX3T/gMC/Mk3PhnOXhsqOVwQJcMndhw=
 github.com/chrislusf/raft v1.0.1/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
+github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011 h1:vN1GvfLgDg8kIPCdhuVKAjlYpxG1B86jiKejB6MC/Q0=
+github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
diff --git a/weed/command/master.go b/weed/command/master.go
index 144962f63..a42983259 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -41,6 +41,7 @@ type MasterOptions struct {
 	disableHttp        *bool
 	metricsAddress     *string
 	metricsIntervalSec *int
+	raftResumeState    *bool
 }
 
 func init() {
@@ -59,6 +60,7 @@ func init() {
 	m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
 	m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address :")
 	m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+	m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
 }
 
 var cmdMaster = &Command{
@@ -118,10 +120,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 		glog.Fatalf("Master startup error: %v", e)
 	}
 	// start raftServer
-	raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
-		peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5)
+	raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
+		peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5, *masterOption.raftResumeState)
 	if raftServer == nil {
-		glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
+		glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
 	}
 	ms.SetRaftServer(raftServer)
 	r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
diff --git a/weed/command/server.go b/weed/command/server.go
index 7efc45475..1b2aada23 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -81,6 +81,7 @@ func init() {
 	masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
 	masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
 	masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+	masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
 
 	filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
 	filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 958680d2b..7045437e8 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -28,7 +28,37 @@ type RaftServer struct {
 	*raft.GrpcServer
 }
 
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+// StateMachine is handed to raft.NewServer so the topology's max volume id can be saved and recovered.
+type StateMachine struct {
+	raft.StateMachine
+	topo *topology.Topology
+}
+
+// Save serializes the topology's current max volume id as a JSON MaxVolumeIdCommand.
+func (s StateMachine) Save() ([]byte, error) {
+	state := topology.MaxVolumeIdCommand{
+		MaxVolumeId: s.topo.GetMaxVolumeId(),
+	}
+	glog.V(1).Infof("Save raft state %+v", state)
+	return json.Marshal(state)
+}
+
+// Recovery decodes a JSON MaxVolumeIdCommand and passes its id to topo.UpAdjustMaxVolumeId.
+func (s StateMachine) Recovery(data []byte) error {
+	state := topology.MaxVolumeIdCommand{}
+	err := json.Unmarshal(data, &state)
+	if err != nil {
+		return err
+	}
+	glog.V(1).Infof("Recovery raft state %+v", state)
+	s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+	return nil
+}
+
+// NewRaftServer creates and starts a raft server for serverAddr and adds each of peers to it.
+// Unless raftResumeState is set, the conf, log and snapshot directories under dataDir are
+// removed first. On failure it returns a nil server together with the error.
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int, raftResumeState bool) (*RaftServer, error) {
 	s := &RaftServer{
 		peers:      peers,
 		serverAddr: serverAddr,
@@ -46,26 +76,41 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
 	transporter := raft.NewGrpcTransporter(grpcDialOption)
 	glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
 
-	// always clear previous metadata
-	os.RemoveAll(path.Join(s.dataDir, "conf"))
-	os.RemoveAll(path.Join(s.dataDir, "log"))
-	os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+	if !raftResumeState {
+		// not resuming: clear previous metadata so the server starts from scratch
+		os.RemoveAll(path.Join(s.dataDir, "conf"))
+		os.RemoveAll(path.Join(s.dataDir, "log"))
+		os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+	}
+	if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
+		return nil, err
+	}
+
 	// Clear old cluster configurations if peers are changed
 	if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
 		glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
 	}
 
-	s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
+	stateMachine := StateMachine{topo: topo}
+	s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
 	if err != nil {
 		glog.V(0).Infoln(err)
-		return nil
+		return nil, err
 	}
 	s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
 	s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
-	s.raftServer.Start()
+	if err := s.raftServer.LoadSnapshot(); err != nil {
+		return nil, err
+	}
+	if err := s.raftServer.Start(); err != nil {
+		return nil, err
+	}
 
 	for _, peer := range s.peers {
-		s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
+		if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+			return nil, err
+		}
+
 	}
 
 	s.GrpcServer = raft.NewGrpcServer(s.raftServer)
@@ -81,13 +126,13 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
 
 		if err != nil {
 			glog.V(0).Infoln(err)
-			return nil
+			return nil, err
 		}
 	}
 
 	glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
 
-	return s
+	return s, nil
 }
 
 func (s *RaftServer) Peers() (members []string) {