diff --git a/weed/command/master.go b/weed/command/master.go index 3d33f4f7a..55e3409ed 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -37,6 +37,9 @@ type MasterOptions struct { disableHttp *bool metricsAddress *string metricsIntervalSec *int + + sequencerType *string + etcdUrls *string } func init() { @@ -55,6 +58,9 @@ func init() { m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + m.sequencerType = cmdMaster.Flag.String("sequencerType", "memory", "Choose [memory|etcd] type for store the file sequence") + m.etcdUrls = cmdMaster.Flag.String("etcdUrls", "", + "when sequencerType=etcd, set etcdUrls for etcd cluster that store file sequence, example : http://127.0.0.1:2379,http://127.0.0.1:2389") } var cmdMaster = &Command{ diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go index 51e0ec93f..1fc378640 100644 --- a/weed/sequence/etcd_sequencer.go +++ b/weed/sequence/etcd_sequencer.go @@ -1,21 +1,30 @@ package sequence +/* +Note : +(1) store the sequence in the ETCD cluster, and local file(sequence.dat) +(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file +(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId. +*/ + import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "go.etcd.io/etcd/client" + "sync" + "time" + "io" "os" "strconv" "strings" - "sync" - "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "go.etcd.io/etcd/client" ) const ( + // EtcdKeyPrefix = "/seaweedfs" EtcdKeySequence = "/master/sequence" - EtcdKeyPrefix = "/seaweedfs" EtcdContextTimeoutSecond = 100 * time.Second DefaultEtcdSteps uint64 = 500 // internal counter SequencerFileName = "sequencer.dat" @@ -25,13 +34,12 @@ const ( type EtcdSequencer struct { sequenceLock sync.Mutex - // available sequence range : [steps, maxCounter) - maxCounter uint64 - steps uint64 + // available sequence range : [currentSeqId, maxSeqId) + currentSeqId uint64 + maxSeqId uint64 - etcdClient client.Client - keysAPI client.KeysAPI - seqFile *os.File + keysAPI client.KeysAPI + seqFile *os.File } func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) { @@ -50,6 +58,7 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error } keysApi := client.NewKeysAPI(cli) + // TODO: the current sequence id in local file is not used maxValue, _, err := readSequenceFile(file) if err != nil { return nil, fmt.Errorf("read sequence from file failed, %v", err) @@ -61,22 +70,19 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error return nil, err } - // make the step and max the same, and then they are fake, - // after invoking the NextFileId(), they are different and real - maxCounter, steps := newSeq, newSeq - sequencer := &EtcdSequencer{maxCounter: maxCounter, - steps: steps, - etcdClient: cli, - keysAPI: keysApi, - seqFile: file, + sequencer := &EtcdSequencer{maxSeqId: newSeq, + currentSeqId: newSeq, + keysAPI: keysApi, + seqFile: file, } return sequencer, nil } -func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) { +func (es *EtcdSequencer) NextFileId(count uint64) uint64 { es.sequenceLock.Lock() defer es.sequenceLock.Unlock() - if (es.steps + count) >= es.maxCounter { + + if (es.currentSeqId + count) >= es.maxSeqId { reqSteps := DefaultEtcdSteps if count > DefaultEtcdSteps { reqSteps += count @@ -85,18 +91,19 @@ func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) { glog.V(4).Infof("get max sequence id from etcd, %d", maxId) if err != nil { glog.Error(err) - return 0, 0 + return 0 } - es.steps, es.maxCounter = maxId-reqSteps, maxId - glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter) + es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId + glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId) - if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { + if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil { glog.Errorf("flush sequence to file failed, %v", err) } } - ret := es.steps - es.steps += count - return ret, count + + ret := es.currentSeqId + es.currentSeqId += count + return ret } /** @@ -106,13 +113,13 @@ the max value should be saved in local config file and ETCD cluster func (es *EtcdSequencer) SetMax(seenValue uint64) { es.sequenceLock.Lock() defer es.sequenceLock.Unlock() - if seenValue > es.maxCounter { + if seenValue > es.maxSeqId { maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue) if err != nil { glog.Errorf("set Etcd Max sequence failed : %v", err) return } - es.steps, es.maxCounter = maxId, maxId + es.currentSeqId, es.maxSeqId = maxId, maxId if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil { glog.Errorf("flush sequence to file failed, %v", err) @@ -121,11 +128,11 @@ func (es *EtcdSequencer) SetMax(seenValue uint64) { } func (es *EtcdSequencer) GetMax() uint64 { - return es.maxCounter + return es.maxSeqId } func (es *EtcdSequencer) Peek() uint64 { - return es.steps + return es.currentSeqId } func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) { @@ -164,8 +171,11 @@ func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) } /** - update the key of EtcdKeySequence in ETCD cluster with the parameter of maxSeq, -until the value of EtcdKeySequence is equal to or larger than the maxSeq +update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq, +when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq, +return the value of EtcdKeySequence in the ETCD cluster; +when the value of the EtcdKeySequence is less than the parameter maxSeq, +return the value of the parameter maxSeq */ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { maxSeqStr := strconv.FormatUint(maxSeq, 10) @@ -178,10 +188,10 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) { _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr) if err == nil { - continue // create ETCD key success, retry get ETCD value + continue } if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) { - continue // ETCD key exist, retry get ETCD value + continue } return 0, err } else { @@ -206,8 +216,6 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { return 0, err } } - - return maxSeq, nil } func openSequenceFile(file string) (*os.File, error) { @@ -227,7 +235,7 @@ func openSequenceFile(file string) (*os.File, error) { } /* - sequence : step 以冒号分割 +read sequence and step from sequence file */ func readSequenceFile(file *os.File) (uint64, uint64, error) { sequence := make([]byte, FileMaxSequenceLength) @@ -255,7 +263,7 @@ func readSequenceFile(file *os.File) (uint64, uint64, error) { } /** -先不存放step到文件中 +write the sequence and step to sequence file */ func writeSequenceFile(file *os.File, sequence, step uint64) error { _ = step @@ -276,103 +284,13 @@ func writeSequenceFile(file *os.File, sequence, step uint64) error { return nil } -func deleteEtcdKey(kvApi client.KeysAPI, key string) error { - ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) - defer cancel() - _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false}) - if err != nil { - return err - } - return nil -} - -//func (es *EtcdSequencer) Load() error { -// es.sequenceLock.Lock() -// defer es.sequenceLock.Unlock() -// reqSteps := DefaultEtcdSteps -// maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps) -// glog.V(4).Infof("get max sequence id from etcd, %d", maxId) -// if err != nil { -// glog.Error(err) -// return err -// } -// es.steps, es.maxCounter = maxId-reqSteps, maxId -// glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter) -// -// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { -// glog.Errorf("flush sequence to file failed, %v", err) -// return err -// } -// return nil -//} - -//func getEtcdKey(kv client.KeysAPI, key string) (string, error) { -// resp, err := kv.Get(context.Background(), key, &client.GetOptions{Recursive: false, Quorum: true}) -// if err != nil { -// glog.Warningf("key:%s result:%v", EtcdKeySequence, err) -// return "", err -// } -// if resp.Node == nil { -// return "", fmt.Errorf("the key is not exist, %s", key) -// } -// return resp.Node.Value, nil -//} - -//func (es *EtcdSequencer) setLocalSequence(maxValue uint64) { -// es.sequenceLock.Lock() -// defer es.sequenceLock.Unlock() -// if maxValue > es.maxCounter { -// es.maxCounter, es.steps = maxValue, maxValue-DefaultEtcdSteps -// -// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { -// glog.Errorf("flush sequence to file failed, %v", err) -// } -// } -//} - -//func getEtcdKeysApi(etcdUrls, user, passwd string) (client.KeysAPI, error) { -// cli, err := client.New(client.Config{ -// Endpoints: strings.Split(etcdUrls, ","), -// Username: user, -// Password: passwd, -// }) -// if err != nil { -// return nil, err -// } -// keysApi := client.NewKeysAPI(cli) -// return keysApi, nil -//} - -//func (es *EtcdSequencer) asyncStartWatcher() { -// es.startWatcher(es.keysAPI, EtcdKeySequence, func(value string, index uint64) { -// newValue, err := strconv.ParseUint(value, 10, 64) -// if err != nil { -// glog.Warning(err) -// } -// es.setLocalSequence(newValue) -// }) -//} - -//func (es *EtcdSequencer) startWatcher(kvApi client.KeysAPI, key string, callback func(value string, index uint64)) { -// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) -// defer cancel() -// ctx.Done() -// -// getResp, err := kvApi.Get(ctx, key, &client.GetOptions{Recursive: false, Quorum: true}) -// if err != nil { -// return -// } -// -// watcher := kvApi.Watcher(key, &client.WatcherOptions{AfterIndex: getResp.Index, Recursive: false}) -// go func(w client.Watcher) { -// for { -// resp, err := w.Next(context.Background()) -// if err != nil { -// glog.Error(err) -// continue -// } -// callback(resp.Node.Value, resp.Index) -// } -// }(watcher) -// return -//} +// the UT helper method +// func deleteEtcdKey(kvApi client.KeysAPI, key string) error { +// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) +// defer cancel() +// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false}) +// if err != nil { +// return err +// } +// return nil +// } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index cde583560..fd3236c53 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -39,6 +39,9 @@ type MasterOption struct { DisableHttp bool MetricsAddress string MetricsIntervalSec int + + sequencerType string + etcdUrls string } type MasterServer struct { @@ -87,7 +90,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), } ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewMemorySequencer() + + seq := ms.createSequencer(option) + if nil == seq { + glog.Fatalf("create sequencer failed.") + } ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") @@ -230,3 +237,22 @@ func (ms *MasterServer) startAdminScripts() { } }() } + +func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { + var seq sequence.Sequencer + glog.V(0).Infof("sequencer type [%s]", option.sequencerType) + switch strings.ToLower(option.sequencerType) { + case "memory": + seq = sequence.NewMemorySequencer() + case "etcd": + var err error + seq, err = sequence.NewEtcdSequencer(option.etcdUrls, option.MetaFolder) + if err != nil { + glog.Error(err) + seq = nil + } + default: + seq = sequence.NewMemorySequencer() + } + return seq +}