diff --git a/docker/compose/test-etcd-filer.yml b/docker/compose/test-etcd-filer.yml index 37c074974..a856b9e14 100644 --- a/docker/compose/test-etcd-filer.yml +++ b/docker/compose/test-etcd-filer.yml @@ -30,6 +30,7 @@ services: environment: WEED_LEVELDB2_ENABLED: 'false' WEED_ETCD_ENABLED: 'true' + WEED_ETCD_KEY_PREFIX: 'seaweedfs.' WEED_ETCD_SERVERS: "http://etcd:2379" volumes: - ./s3.json:/etc/seaweedfs/s3.json diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 231e7510a..30a9cae51 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -270,6 +270,12 @@ username = "" password = "" key_prefix = "seaweedfs." timeout = "3s" +# Set the CA certificate path +tls_ca_file="" +# Set the client certificate path +tls_client_crt_file="" +# Set the client private key path +tls_client_key_file="" [mongodb] enabled = false diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 9f96405a9..12261827c 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -1,9 +1,10 @@ package etcd import ( - "bytes" "context" + "crypto/tls" "fmt" + "go.etcd.io/etcd/client/pkg/v3/transport" "strings" "time" @@ -26,49 +27,78 @@ func init() { type EtcdStore struct { client *clientv3.Client etcdKeyPrefix string + timeout time.Duration } func (store *EtcdStore) GetName() string { return "etcd" } -func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { +func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) error { + configuration.SetDefault(prefix+"servers", "localhost:2379") + configuration.SetDefault(prefix+"timeout", "3s") + servers := configuration.GetString(prefix + "servers") - if servers == "" { - servers = "localhost:2379" - } username := configuration.GetString(prefix + "username") password := configuration.GetString(prefix + "password") store.etcdKeyPrefix = configuration.GetString(prefix + "key_prefix") - timeout := configuration.GetString(prefix + "timeout") - if timeout == "" { - timeout = "3s" + timeoutStr := configuration.GetString(prefix + "timeout") + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("parse etcd store timeout: %v", err) + } + store.timeout = timeout + + certFile := configuration.GetString(prefix + "tls_client_crt_file") + keyFile := configuration.GetString(prefix + "tls_client_key_file") + caFile := configuration.GetString(prefix + "tls_ca_file") + + var tlsConfig *tls.Config + if caFile != "" { + tlsInfo := transport.TLSInfo{ + CertFile: certFile, + KeyFile: keyFile, + TrustedCAFile: caFile, + } + var err error + tlsConfig, err = tlsInfo.ClientConfig() + if err != nil { + return fmt.Errorf("TLS client configuration error: %v", err) + } } - return store.initialize(servers, username, password, timeout) + return store.initialize(servers, username, password, store.timeout, tlsConfig) } -func (store *EtcdStore) initialize(servers string, username string, password string, timeout string) (err error) { +func (store *EtcdStore) initialize(servers, username, password string, timeout time.Duration, tlsConfig *tls.Config) error { glog.Infof("filer store etcd: %s", servers) - to, err := time.ParseDuration(timeout) - if err != nil { - return fmt.Errorf("parse timeout %s: %s", timeout, err) - } - - store.client, err = clientv3.New(clientv3.Config{ + client, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(servers, ","), Username: username, Password: password, - DialTimeout: to, + DialTimeout: timeout, + TLS: tlsConfig, }) if err != nil { return fmt.Errorf("connect to etcd %s: %s", servers, err) } - return + ctx, cancel := context.WithTimeout(context.Background(), store.timeout) + defer cancel() + + resp, err := client.Status(ctx, client.Endpoints()[0]) + if err != nil { + client.Close() + return fmt.Errorf("error checking etcd connection: %s", err) + } + + glog.V(0).Infof("сonnection to etcd has been successfully verified. etcd version: %s", resp.Version) + store.client = client + + return nil } func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -159,15 +189,13 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u } resp, err := store.client.Get(ctx, store.etcdKeyPrefix+string(lastFileStart), - clientv3.WithFromKey(), clientv3.WithLimit(limit+1)) + clientv3.WithRange(clientv3.GetPrefixRangeEnd(store.etcdKeyPrefix+string(directoryPrefix))), + clientv3.WithLimit(limit+1)) if err != nil { return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } for _, kv := range resp.Kvs { - if !bytes.HasPrefix(kv.Key, directoryPrefix) { - break - } fileName := getNameFromKey(kv.Key) if fileName == "" { continue diff --git a/weed/filer/etcd/etcd_store_test.go b/weed/filer/etcd/etcd_store_test.go index 31e451e00..6abb74697 100644 --- a/weed/filer/etcd/etcd_store_test.go +++ b/weed/filer/etcd/etcd_store_test.go @@ -10,7 +10,7 @@ func TestStore(t *testing.T) { // to set up local env if false { store := &EtcdStore{} - store.initialize("localhost:2379", "", "", "3s") + store.initialize("localhost:2379", "", "", 3, nil) store_test.TestFilerStore(t, store) } }