mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-05-09 13:00:45 +02:00
Compare commits
16 commits
a184cc46bb
...
2f1063c955
Author | SHA1 | Date | |
---|---|---|---|
2f1063c955 | |||
0d04264494 | |||
abf01a0eb7 | |||
855607c536 | |||
607927da60 | |||
2f3fee9bb9 | |||
6e4b9181f5 | |||
cc2885b4f2 | |||
850018101a | |||
9cd7022c17 | |||
374c5d2461 | |||
e7fc64f524 | |||
af6b029147 | |||
339541a71b | |||
3be3afc80e | |||
9d5c1a3cbf |
|
@ -1,6 +1,16 @@
|
|||
version: '3.9'
|
||||
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:13
|
||||
environment:
|
||||
- POSTGRES_DB=seaweedfs
|
||||
- POSTGRES_USER=seaweedfs
|
||||
- POSTGRES_PASSWORD=seaweedfs
|
||||
ports:
|
||||
- "5432:5432"
|
||||
#command: ["postgres", "-c", "log_statement=all"]
|
||||
command: ["postgres"]
|
||||
master:
|
||||
image: chrislusf/seaweedfs:local
|
||||
ports:
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
[leveldb2]
|
||||
enabled = true
|
||||
dir = "/data/filerldb2"
|
||||
dir = "/data/filerldb2
|
||||
|
|
3
docker/filer_leveldb3.toml
Normal file
3
docker/filer_leveldb3.toml
Normal file
|
@ -0,0 +1,3 @@
|
|||
[leveldb3]
|
||||
enabled = true
|
||||
dir = "/data/filer_leveldb3"
|
21
docker/filer_postgres2.toml
Normal file
21
docker/filer_postgres2.toml
Normal file
|
@ -0,0 +1,21 @@
|
|||
[postgres2]
|
||||
enabled = true
|
||||
port = 5432
|
||||
createTable = """
|
||||
CREATE TABLE IF NOT EXISTS "%s" (
|
||||
dirhash BIGINT,
|
||||
name VARCHAR(65535),
|
||||
directory VARCHAR(65535),
|
||||
meta bytea,
|
||||
PRIMARY KEY (dirhash, name)
|
||||
);
|
||||
"""
|
||||
hostname = "postgres"
|
||||
username = "seaweedfs"
|
||||
database = "seaweedfs" # create or use an existing database
|
||||
password = "seaweedfs"
|
||||
schema = ""
|
||||
sslmode = "disable"
|
||||
connection_max_idle = 5
|
||||
connection_max_open = 10
|
||||
connection_max_lifetime_seconds = 0
|
12
docker/filer_postgres_s3.toml
Normal file
12
docker/filer_postgres_s3.toml
Normal file
|
@ -0,0 +1,12 @@
|
|||
[postgres_s3]
|
||||
enabled = true
|
||||
port = 5432
|
||||
hostname = "postgres"
|
||||
username = "seaweedfs"
|
||||
database = "seaweedfs" # create or use an existing database
|
||||
password = "seaweedfs"
|
||||
schema = ""
|
||||
sslmode = "disable"
|
||||
connection_max_idle = 5
|
||||
connection_max_open = 10
|
||||
connection_max_lifetime_seconds = 0
|
28
go.mod
28
go.mod
|
@ -5,7 +5,7 @@ go 1.22.0
|
|||
require (
|
||||
cloud.google.com/go v0.112.1 // indirect
|
||||
cloud.google.com/go/pubsub v1.37.0
|
||||
cloud.google.com/go/storage v1.39.1
|
||||
cloud.google.com/go/storage v1.40.0
|
||||
github.com/Azure/azure-pipeline-go v0.2.3
|
||||
github.com/Azure/azure-storage-blob-go v0.15.0
|
||||
github.com/Shopify/sarama v1.38.1
|
||||
|
@ -18,7 +18,6 @@ require (
|
|||
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/disintegration/imaging v1.6.2
|
||||
github.com/dustin/go-humanize v1.0.1
|
||||
github.com/eapache/go-resiliency v1.3.0 // indirect
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
|
||||
|
@ -102,23 +101,22 @@ require (
|
|||
github.com/xdg-go/scram v1.1.2 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.5.12
|
||||
go.mongodb.org/mongo-driver v1.14.0
|
||||
go.etcd.io/etcd/client/v3 v3.5.13
|
||||
go.mongodb.org/mongo-driver v1.15.0
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
gocloud.dev v0.37.0
|
||||
gocloud.dev/pubsub/natspubsub v0.37.0
|
||||
gocloud.dev/pubsub/rabbitpubsub v0.36.0
|
||||
gocloud.dev/pubsub/rabbitpubsub v0.37.0
|
||||
golang.org/x/crypto v0.22.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3
|
||||
golang.org/x/image v0.15.0
|
||||
golang.org/x/net v0.24.0
|
||||
golang.org/x/oauth2 v0.18.0 // indirect
|
||||
golang.org/x/oauth2 v0.19.0 // indirect
|
||||
golang.org/x/sys v0.19.0
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/tools v0.17.0
|
||||
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
|
||||
google.golang.org/api v0.172.0
|
||||
google.golang.org/appengine v1.6.8 // indirect
|
||||
google.golang.org/api v0.176.0
|
||||
google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect
|
||||
google.golang.org/grpc v1.63.2
|
||||
google.golang.org/protobuf v1.33.0
|
||||
|
@ -143,6 +141,7 @@ require (
|
|||
github.com/aws/aws-sdk-go-v2/config v1.27.11
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4
|
||||
github.com/cognusion/imaging v1.0.1
|
||||
github.com/fluent/fluent-logger-golang v1.9.0
|
||||
github.com/getsentry/sentry-go v0.27.0
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1
|
||||
|
@ -165,9 +164,10 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go/compute v1.25.0 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.2.3 // indirect
|
||||
cloud.google.com/go/iam v1.1.6 // indirect
|
||||
cloud.google.com/go/auth v0.2.2 // indirect
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.1 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.3.0 // indirect
|
||||
cloud.google.com/go/iam v1.1.7 // indirect
|
||||
filippo.io/edwards25519 v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect
|
||||
|
@ -317,7 +317,7 @@ require (
|
|||
github.com/zeebo/blake3 v0.2.3 // indirect
|
||||
github.com/zeebo/errs v1.3.0 // indirect
|
||||
go.etcd.io/bbolt v1.3.8 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.12 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.13 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
|
||||
go.opentelemetry.io/otel v1.24.0 // indirect
|
||||
|
@ -329,8 +329,8 @@ require (
|
|||
golang.org/x/sync v0.6.0 // indirect
|
||||
golang.org/x/term v0.19.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240311173647-c811ad7063a7 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
|
||||
gopkg.in/validator.v2 v2.0.1 // indirect
|
||||
|
|
57
go.sum
57
go.sum
|
@ -16,20 +16,22 @@ cloud.google.com/go v0.63.0/go.mod h1:GmezbQc7T2snqkEXWfZ0sy0VfkB/ivI2DdtJL2DEml
|
|||
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
|
||||
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM=
|
||||
cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4=
|
||||
cloud.google.com/go/auth v0.2.2 h1:gmxNJs4YZYcw6YvKRtVBaF2fyUE6UrWPyzU8jHvYfmI=
|
||||
cloud.google.com/go/auth v0.2.2/go.mod h1:2bDNJWtWziDT3Pu1URxHHbkHE/BbOCuyUiKIGcNvafo=
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.1 h1:VSPmMmUlT8CkIZ2PzD9AlLN+R3+D1clXMWHHa6vG/Ag=
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.1/go.mod h1:tOdK/k+D2e4GEwfBRA48dKNQiDsqIXxLh7VU319eV0g=
|
||||
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
|
||||
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
|
||||
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
|
||||
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
|
||||
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
|
||||
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
|
||||
cloud.google.com/go/compute v1.25.0 h1:H1/4SqSUhjPFE7L5ddzHOfY2bCAvjwNRZPNl6Ni5oYU=
|
||||
cloud.google.com/go/compute v1.25.0/go.mod h1:GR7F0ZPZH8EhChlMo9FkLd7eUTwEymjqQagxzilIxIE=
|
||||
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
|
||||
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
|
||||
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
|
||||
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
|
||||
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
|
||||
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
|
||||
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
|
||||
cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI=
|
||||
cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM=
|
||||
cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA=
|
||||
cloud.google.com/go/kms v1.15.7 h1:7caV9K3yIxvlQPAcaFffhlT7d1qpxjB1wHBtjWa13SM=
|
||||
cloud.google.com/go/kms v1.15.7/go.mod h1:ub54lbsa6tDkUwnu4W7Yt1aAIFLnspgh0kPGToDukeI=
|
||||
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
|
||||
|
@ -43,8 +45,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
|
|||
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
|
||||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||
cloud.google.com/go/storage v1.39.1 h1:MvraqHKhogCOTXTlct/9C3K3+Uy2jBmFYb3/Sp6dVtY=
|
||||
cloud.google.com/go/storage v1.39.1/go.mod h1:xK6xZmxZmo+fyP7+DEF6FhNc24/JAe95OLyOHCXFH1o=
|
||||
cloud.google.com/go/storage v1.40.0 h1:VEpDQV5CJxFmJ6ueWNsKxcr1QAYOXEgxDa+sBbJahPw=
|
||||
cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2ucUMY7Ir0g=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
|
@ -230,6 +232,8 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH
|
|||
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/cognusion/imaging v1.0.1 h1:jJa1+jYHvr2zS5zZxoluYthH5KbVz4LEvD3xy/W2L90=
|
||||
github.com/cognusion/imaging v1.0.1/go.mod h1:ucYm08RsFoQvYXEV5XMsRBppxrWzD1AGxm6iod5/rvM=
|
||||
github.com/colinmarc/hdfs/v2 v2.4.0 h1:v6R8oBx/Wu9fHpdPoJJjpGSUxo8NhHIwrwsfhFvU9W0=
|
||||
github.com/colinmarc/hdfs/v2 v2.4.0/go.mod h1:0NAO+/3knbMx6+5pCv+Hcbaz4xn/Zzbn9+WIib2rKVI=
|
||||
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
|
||||
|
@ -250,8 +254,6 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
|
|||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c=
|
||||
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
|
||||
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
|
||||
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
|
||||
github.com/dropbox/dropbox-sdk-go-unofficial/v6 v6.0.5 h1:FT+t0UEDykcor4y3dMVKXIiWJETBpRgERYTGlmMd7HU=
|
||||
|
@ -979,14 +981,14 @@ go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8=
|
|||
go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M=
|
||||
go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
|
||||
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
|
||||
go.etcd.io/etcd/api/v3 v3.5.12 h1:W4sw5ZoU2Juc9gBWuLk5U6fHfNVyY1WC5g9uiXZio/c=
|
||||
go.etcd.io/etcd/api/v3 v3.5.12/go.mod h1:Ot+o0SWSyT6uHhA56al1oCED0JImsRiU9Dc26+C2a+4=
|
||||
go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4=
|
||||
go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8=
|
||||
go.etcd.io/etcd/client/v3 v3.5.12 h1:v5lCPXn1pf1Uu3M4laUE2hp/geOTc5uPcYYsNe1lDxg=
|
||||
go.etcd.io/etcd/client/v3 v3.5.12/go.mod h1:tSbBCakoWmmddL+BKVAJHa9km+O/E+bumDe9mSbPiqw=
|
||||
go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80=
|
||||
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
|
||||
go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js=
|
||||
go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI=
|
||||
go.mongodb.org/mongo-driver v1.15.0 h1:rJCKC8eEliewXjZGf0ddURtl7tTVy1TK3bfl0gkUSLc=
|
||||
go.mongodb.org/mongo-driver v1.15.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
|
||||
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
|
@ -1033,8 +1035,8 @@ gocloud.dev v0.37.0 h1:XF1rN6R0qZI/9DYjN16Uy0durAmSlf58DHOcb28GPro=
|
|||
gocloud.dev v0.37.0/go.mod h1:7/O4kqdInCNsc6LqgmuFnS0GRew4XNNYWpA44yQnwco=
|
||||
gocloud.dev/pubsub/natspubsub v0.37.0 h1:pnszLOYtyOkwB8oqiwYA11OcJpZ0MGJYBhnKl8c3tCs=
|
||||
gocloud.dev/pubsub/natspubsub v0.37.0/go.mod h1:EN6ORFOviXxR/KALVQjTNh34EUcYQzMke6BCC6Czb8g=
|
||||
gocloud.dev/pubsub/rabbitpubsub v0.36.0 h1:f7KXLhk5HVZ81a2LKfTQY/HBDyXF0teQ7aMuJN+iK7I=
|
||||
gocloud.dev/pubsub/rabbitpubsub v0.36.0/go.mod h1:xCbgXeLHXrb2Yqs3WOTvmTvk6WeIfMMn3uq34uOk9yM=
|
||||
gocloud.dev/pubsub/rabbitpubsub v0.37.0 h1:vNmxM7C+6O6WbErN0TBB8NtrrUFHq3OS0yfokGVsJhQ=
|
||||
gocloud.dev/pubsub/rabbitpubsub v0.37.0/go.mod h1:f3FInA6G/zOhFoTPLZKca1CH3WknrJEd0Nlryl2C6fw=
|
||||
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
|
||||
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
|
@ -1072,7 +1074,6 @@ golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu
|
|||
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
|
||||
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
||||
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
|
||||
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
|
@ -1160,8 +1161,8 @@ golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4Iltr
|
|||
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
|
||||
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
|
||||
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
|
||||
golang.org/x/oauth2 v0.19.0 h1:9+E/EZBCbTLNrbN35fHv/a/d/mOBatymz1zbtQrXpIg=
|
||||
golang.org/x/oauth2 v0.19.0/go.mod h1:vYi7skDa1x015PmRRYZ7+s1cWyPgrPiSYRe4rnsexc8=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
|
@ -1365,16 +1366,14 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
|
|||
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
|
||||
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
|
||||
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
|
||||
google.golang.org/api v0.172.0 h1:/1OcMZGPmW1rX2LCu2CmGUD1KXK1+pfzxotxyRUCCdk=
|
||||
google.golang.org/api v0.172.0/go.mod h1:+fJZq6QXWfa9pXhnIzsjx4yI22d4aI9ZpLb58gvXjis=
|
||||
google.golang.org/api v0.176.0 h1:dHj1/yv5Dm/eQTXiP9hNCRT3xzJHWXeNdRq29XbMxoE=
|
||||
google.golang.org/api v0.176.0/go.mod h1:Rra+ltKu14pps/4xTycZfobMgLpbosoaaL7c+SEMrO8=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
|
||||
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
|
||||
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
|
@ -1411,10 +1410,10 @@ google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEc
|
|||
google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
|
||||
google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 h1:ImUcDPHjTrAqNhlOkSocDLfG9rrNHH7w7uoKWPaWZ8s=
|
||||
google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7/go.mod h1:/3XmxOjePkvmKrHuBy4zNFw7IzxJXtAgdpXi8Ll990U=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240311173647-c811ad7063a7 h1:oqta3O3AnlWbmIE3bFnWbu4bRxZjfbWCp0cKSuZh01E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240311173647-c811ad7063a7/go.mod h1:VQW3tUculP/D4B+xVCo+VgSq8As6wA9ZjHl//pmk+6s=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c h1:kaI7oewGK5YnVwj+Y+EJBO/YN1ht8iTL9XkFHtVZLsc=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c/go.mod h1:VQW3tUculP/D4B+xVCo+VgSq8As6wA9ZjHl//pmk+6s=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
||||
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
|
||||
|
|
|
@ -72,7 +72,9 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
|
|||
return servers[i] < servers[j]
|
||||
})
|
||||
|
||||
r.Lock()
|
||||
r.lastUpdateTime = time.Now()
|
||||
r.Unlock()
|
||||
|
||||
r.addOneSnapshot(servers)
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
glog.V(0).Infof("delete directory %s: %v", p, err)
|
||||
glog.V(2).Infof("delete directory %s: %v", p, err)
|
||||
return fmt.Errorf("delete directory %s: %v", p, err)
|
||||
}
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
|
|||
}
|
||||
if lastFileName == "" && !isRecursive && len(entries) > 0 {
|
||||
// only for first iteration in the loop
|
||||
glog.V(0).Infof("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
|
||||
glog.V(2).Infof("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
|
||||
return fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
|
||||
}
|
||||
|
||||
|
|
33
weed/filer/postgres_s3/README.md
Normal file
33
weed/filer/postgres_s3/README.md
Normal file
|
@ -0,0 +1,33 @@
|
|||
# postgres_s3
|
||||
|
||||
The `postgres_s3` filer implementation uses postgres-specific features and data structures to improve upon previous SQL implementations based on the `abstract_sql` module.
|
||||
|
||||
Of note, the `postgres2` filer implementation may leak directory hierarchy metadata when frequent inserts and deletes are
|
||||
performed using the S3 API. If an application workload pattern creates directories, populates them temporarily, and then
|
||||
deletes all objects in the directory, the `postgres2` filer implementation will continue to indefinitely maintain
|
||||
information about the orphaned directories. While these directories will not be shown in S3 API list requests, the metadata
|
||||
remains and places the burden of an unbounded number of unused rows on postgres.
|
||||
|
||||
Seaweedfs provides the `-s3.allowEmptyFolder=false` CLI argument to automatically clean up orphaned directory entries, but
|
||||
this process necessarily races under high load and can cause unpredictable filer and postgres behavior.
|
||||
|
||||
To solve this problem, `postgres_s3` does the following:
|
||||
|
||||
1. One row in postgres _fully_ represents one object and its metadata
|
||||
2. Insert, update, get, and delete operate on a single row
|
||||
3. An array is stored of possible prefixes for each key
|
||||
4. List requests leverage the prefixes to dynamically assemble directory entries using a complex `SELECT` statement
|
||||
|
||||
In order to efficiently query directory entries during list requests, `postgres_s3` uses special features of
|
||||
postgres:
|
||||
|
||||
* An int64 array field called `prefixes` with a hash of each prefix found for a specific key
|
||||
* GIN indexing that provides fast set membership information on array fields
|
||||
* Special functions `split_part` (text parsing) and `cardinality` (length of the array field)
|
||||
|
||||
`postgres_s3` uses automatic upsert capability with `ON CONFLICT ... UPDATE` so that insert and update are the same
|
||||
race-free operation.
|
||||
|
||||
In the filer metadata tables, all objects start with `/`, causing prefix calculation to include the empty string (`""`).
|
||||
For space and index optimization, `postgres_s3` does not store the root prefix in the `prefixes` array, and instead
|
||||
relies on the condition `cardinality(prefixes) < 1` to discover objects at the root directory.
|
83
weed/filer/postgres_s3/postgres3_kvstore.go
Normal file
83
weed/filer/postgres_s3/postgres3_kvstore.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package postgres_s3
|
||||
|
||||
/*
|
||||
* Copyright 2022 Splunk Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"path"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
|
||||
)
|
||||
|
||||
func (store *PostgresS3Store) KvPut(ctx context.Context, key []byte, value []byte) error {
|
||||
db, _, _, err := store.getTxOrDB(ctx, "", false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("findDB: %v", err)
|
||||
}
|
||||
|
||||
prefixes := calculatePrefixes(string(key))
|
||||
hashedPrefixes := hashPrefixArray(prefixes)
|
||||
_, err = db.ExecContext(ctx, fmt.Sprintf(insertEntryPattern, abstract_sql.DEFAULT_TABLE), key, path.Base(string(key)), pq.Array(hashedPrefixes), value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("kv put: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
|
||||
db, _, _, err := store.getTxOrDB(ctx, "", false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("findDB: %v", err)
|
||||
}
|
||||
|
||||
row := db.QueryRowContext(ctx, fmt.Sprintf(findEntryPattern, abstract_sql.DEFAULT_TABLE), key)
|
||||
|
||||
err = row.Scan(&value)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, filer.ErrKvNotFound
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("kv get: %v", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) KvDelete(ctx context.Context, key []byte) error {
|
||||
db, _, _, err := store.getTxOrDB(ctx, "", false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("findDB: %v", err)
|
||||
}
|
||||
|
||||
res, err := db.ExecContext(ctx, fmt.Sprintf(deleteEntryPattern, abstract_sql.DEFAULT_TABLE), key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("kv delete: %s", err)
|
||||
}
|
||||
|
||||
_, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("kv delete no rows affected: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
511
weed/filer/postgres_s3/postgres3_store.go
Normal file
511
weed/filer/postgres_s3/postgres3_store.go
Normal file
|
@ -0,0 +1,511 @@
|
|||
package postgres_s3
|
||||
|
||||
/*
|
||||
* Copyright 2022 Splunk Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lib/pq"
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
const (
|
||||
CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"
|
||||
|
||||
createTablePattern = `CREATE TABLE IF NOT EXISTS "%s" (
|
||||
key varchar(65535) PRIMARY KEY,
|
||||
name varchar(65535),
|
||||
prefixes bigint[],
|
||||
meta bytea
|
||||
)`
|
||||
createTableIndexPattern = `CREATE INDEX on "%s" USING gin (prefixes);`
|
||||
deleteTablePattern = `DROP TABLE "%s";`
|
||||
insertEntryPattern = `INSERT INTO "%s" (key, name, prefixes, meta) VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (key)
|
||||
DO
|
||||
UPDATE SET meta = EXCLUDED.meta;`
|
||||
findEntryPattern = `SELECT meta FROM "%s" WHERE key = $1`
|
||||
deleteEntryPattern = `DELETE FROM "%s" WHERE key = $1`
|
||||
listEntryQueryPattern = `SELECT key, name, isdir, meta FROM
|
||||
(
|
||||
SELECT key, name, false as isdir, meta FROM "%s"
|
||||
WHERE prefixes @> $1 AND cardinality(prefixes) < $5
|
||||
AND name __COMPARISON__ $3 AND name LIKE $4 ORDER BY key ASC LIMIT $6
|
||||
) s1
|
||||
UNION
|
||||
(
|
||||
SELECT dir, dir, true isdir, NULL::bytea meta FROM
|
||||
(
|
||||
SELECT DISTINCT split_part(key, '/', $2) AS dir FROM "%s"
|
||||
WHERE prefixes @> $1 AND cardinality(prefixes) > $5 - 1 ORDER BY dir ASC
|
||||
) t1
|
||||
WHERE t1.dir > $3 AND t1.dir LIKE $4 ORDER BY dir ASC
|
||||
)
|
||||
ORDER BY name ASC LIMIT $6`
|
||||
deleteFolderChildrenPattern = `DELETE FROM "%s" WHERE prefixes @> $1 and key like $2`
|
||||
)
|
||||
|
||||
var (
|
||||
listEntryExclusivePattern string
|
||||
listEntryInclusivePattern string
|
||||
)
|
||||
|
||||
var _ filer.BucketAware = (*PostgresS3Store)(nil)
|
||||
|
||||
func init() {
|
||||
filer.Stores = append(filer.Stores, &PostgresS3Store{})
|
||||
|
||||
listEntryExclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">")
|
||||
listEntryInclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">=")
|
||||
}
|
||||
|
||||
type PostgresS3Store struct {
|
||||
DB *sql.DB
|
||||
SupportBucketTable bool
|
||||
dbs map[string]bool
|
||||
dbsLock sync.Mutex
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) GetName() string {
|
||||
return "postgres_s3"
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) Initialize(configuration util.Configuration, prefix string) error {
|
||||
return store.initialize(
|
||||
configuration.GetString(prefix+"username"),
|
||||
configuration.GetString(prefix+"password"),
|
||||
configuration.GetString(prefix+"hostname"),
|
||||
configuration.GetInt(prefix+"port"),
|
||||
configuration.GetString(prefix+"database"),
|
||||
configuration.GetString(prefix+"schema"),
|
||||
configuration.GetString(prefix+"sslmode"),
|
||||
configuration.GetInt(prefix+"connection_max_idle"),
|
||||
configuration.GetInt(prefix+"connection_max_open"),
|
||||
configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
|
||||
)
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
|
||||
store.SupportBucketTable = true
|
||||
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
|
||||
if user != "" {
|
||||
sqlUrl += " user=" + user
|
||||
}
|
||||
adaptedSqlUrl := sqlUrl
|
||||
if password != "" {
|
||||
sqlUrl += " password=" + password
|
||||
adaptedSqlUrl += " password=ADAPTED"
|
||||
}
|
||||
if database != "" {
|
||||
sqlUrl += " dbname=" + database
|
||||
adaptedSqlUrl += " dbname=" + database
|
||||
}
|
||||
if schema != "" {
|
||||
sqlUrl += " search_path=" + schema
|
||||
adaptedSqlUrl += " search_path=" + schema
|
||||
}
|
||||
var dbErr error
|
||||
store.DB, dbErr = sql.Open("postgres", sqlUrl)
|
||||
if dbErr != nil {
|
||||
store.DB.Close()
|
||||
store.DB = nil
|
||||
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
|
||||
}
|
||||
|
||||
store.DB.SetMaxIdleConns(maxIdle)
|
||||
store.DB.SetMaxOpenConns(maxOpen)
|
||||
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
|
||||
|
||||
if err = store.DB.Ping(); err != nil {
|
||||
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
|
||||
}
|
||||
|
||||
if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
|
||||
return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) CanDropWholeBucket() bool {
|
||||
return store.SupportBucketTable
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) OnBucketCreation(bucket string) {
|
||||
store.dbsLock.Lock()
|
||||
defer store.dbsLock.Unlock()
|
||||
|
||||
store.CreateTable(context.Background(), bucket)
|
||||
|
||||
if store.dbs == nil {
|
||||
return
|
||||
}
|
||||
store.dbs[bucket] = true
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) OnBucketDeletion(bucket string) {
|
||||
store.dbsLock.Lock()
|
||||
defer store.dbsLock.Unlock()
|
||||
|
||||
store.deleteTable(context.Background(), bucket)
|
||||
|
||||
if store.dbs == nil {
|
||||
return
|
||||
}
|
||||
delete(store.dbs, bucket)
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB abstract_sql.TxOrDB, bucket string, shortPath util.FullPath, err error) {
|
||||
|
||||
shortPath = fullpath
|
||||
bucket = abstract_sql.DEFAULT_TABLE
|
||||
|
||||
if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
|
||||
txOrDB = tx
|
||||
} else {
|
||||
txOrDB = store.DB
|
||||
}
|
||||
|
||||
if !store.SupportBucketTable {
|
||||
return
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(string(fullpath), "/buckets/") {
|
||||
return
|
||||
}
|
||||
|
||||
// detect bucket
|
||||
bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
|
||||
t := strings.Index(bucketAndObjectKey, "/")
|
||||
if t < 0 && !isForChildren {
|
||||
return
|
||||
}
|
||||
bucket = bucketAndObjectKey
|
||||
shortPath = "/"
|
||||
if t > 0 {
|
||||
bucket = bucketAndObjectKey[:t]
|
||||
shortPath = util.FullPath(bucketAndObjectKey[t:])
|
||||
}
|
||||
|
||||
if isValidBucket(bucket) {
|
||||
store.dbsLock.Lock()
|
||||
defer store.dbsLock.Unlock()
|
||||
|
||||
if store.dbs == nil {
|
||||
store.dbs = make(map[string]bool)
|
||||
}
|
||||
|
||||
if _, found := store.dbs[bucket]; !found {
|
||||
if err = store.CreateTable(ctx, bucket); err == nil {
|
||||
store.dbs[bucket] = true
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) InsertEntry(ctx context.Context, entry *filer.Entry) error {
|
||||
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
|
||||
}
|
||||
|
||||
if entry.IsDirectory() {
|
||||
if isValidBucket(bucket) && !strings.HasPrefix(string(shortPath), "/.uploads") {
|
||||
// Ignore directory creations, but not bucket creations or multipart uploads
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
meta, err := entry.EncodeAttributesAndChunks()
|
||||
if err != nil {
|
||||
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
|
||||
}
|
||||
|
||||
if len(entry.Chunks) > 50 {
|
||||
meta = util.MaybeGzipData(meta)
|
||||
}
|
||||
|
||||
prefixes := calculatePrefixes(string(shortPath))
|
||||
hashedPrefixes := hashPrefixArray(prefixes)
|
||||
_, err = db.ExecContext(ctx, fmt.Sprintf(insertEntryPattern, bucket), shortPath, path.Base(string(shortPath)), pq.Array(hashedPrefixes), meta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert/upsert %s: %s", entry.FullPath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
|
||||
return store.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
|
||||
|
||||
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
row := db.QueryRowContext(ctx, fmt.Sprintf(findEntryPattern, bucket), shortPath)
|
||||
|
||||
var data []byte
|
||||
if err := row.Scan(&data); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, filer_pb.ErrNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("find %s: %v", fullpath, err)
|
||||
}
|
||||
|
||||
entry := &filer.Entry{
|
||||
FullPath: fullpath,
|
||||
}
|
||||
if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
|
||||
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
|
||||
}
|
||||
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
|
||||
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("findDB %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
res, err := db.ExecContext(ctx, fmt.Sprintf(deleteEntryPattern, bucket), shortPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete %s: %s", fullpath, err)
|
||||
}
|
||||
|
||||
_, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
|
||||
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("findDB %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
if isValidBucket(bucket) && shortPath == "/" {
|
||||
if err = store.deleteTable(ctx, bucket); err == nil {
|
||||
store.dbsLock.Lock()
|
||||
delete(store.dbs, bucket)
|
||||
store.dbsLock.Unlock()
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sqlText := fmt.Sprintf(deleteFolderChildrenPattern, bucket)
|
||||
prefixes := calculatePrefixes(string(shortPath))
|
||||
hashedPrefixes := hashPrefixArray(prefixes)
|
||||
glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), sqlText, hashedPrefixes)
|
||||
res, err := db.ExecContext(ctx, sqlText, pq.Array(hashedPrefixes), string(shortPath)+"/%")
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
|
||||
}
|
||||
|
||||
_, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
||||
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
||||
db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
|
||||
if err != nil {
|
||||
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
|
||||
}
|
||||
|
||||
slashedShortPath := appendSlash(string(shortPath))
|
||||
shortPathParts := len(strings.Split(slashedShortPath, "/"))
|
||||
|
||||
sqlText := fmt.Sprintf(listEntryExclusivePattern, bucket, bucket)
|
||||
if includeStartFile {
|
||||
sqlText = fmt.Sprintf(listEntryInclusivePattern, bucket, bucket)
|
||||
}
|
||||
|
||||
prefixes := calculatePrefixes(string(slashedShortPath))
|
||||
hashedPrefixes := hashPrefixArray(prefixes)
|
||||
|
||||
rows, err := db.QueryContext(ctx, sqlText,
|
||||
pq.Array(hashedPrefixes),
|
||||
shortPathParts,
|
||||
startFileName,
|
||||
prefix+"%",
|
||||
shortPathParts-1,
|
||||
limit+1)
|
||||
|
||||
if err != nil {
|
||||
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var key string
|
||||
var name string
|
||||
var isDir bool
|
||||
var data []byte
|
||||
if err = rows.Scan(&key, &name, &isDir, &data); err != nil {
|
||||
glog.V(0).Infof("scan %s : %v", dirPath, err)
|
||||
return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
|
||||
}
|
||||
|
||||
if !isDir {
|
||||
lastFileName = name
|
||||
|
||||
entry := &filer.Entry{
|
||||
FullPath: util.NewFullPath(string(dirPath), name),
|
||||
}
|
||||
|
||||
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
|
||||
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
|
||||
return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
|
||||
}
|
||||
|
||||
if !eachEntryFunc(entry) {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
lastFileName = key
|
||||
dirName := key
|
||||
entry := &filer.Entry{
|
||||
FullPath: util.NewFullPath(string(dirPath), dirName),
|
||||
}
|
||||
|
||||
entry.Attr.Mode |= os.ModeDir | 0775
|
||||
if !eachEntryFunc(entry) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return lastFileName, nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||
tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
|
||||
Isolation: sql.LevelReadCommitted,
|
||||
ReadOnly: false,
|
||||
})
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
|
||||
return context.WithValue(ctx, "tx", tx), nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) CommitTransaction(ctx context.Context) error {
|
||||
if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
|
||||
return tx.Commit()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) RollbackTransaction(ctx context.Context) error {
|
||||
if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
|
||||
return tx.Rollback()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) Shutdown() {
|
||||
store.DB.Close()
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) CreateTable(ctx context.Context, bucket string) error {
|
||||
_, err := store.DB.ExecContext(ctx, fmt.Sprintf(createTablePattern, bucket))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create bucket table: %v", err)
|
||||
}
|
||||
|
||||
_, err = store.DB.ExecContext(ctx, fmt.Sprintf(createTableIndexPattern, bucket))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create bucket index: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (store *PostgresS3Store) deleteTable(ctx context.Context, bucket string) error {
|
||||
if !store.SupportBucketTable {
|
||||
return nil
|
||||
}
|
||||
_, err := store.DB.ExecContext(ctx, fmt.Sprintf(deleteTablePattern, bucket))
|
||||
return err
|
||||
}
|
||||
|
||||
func isValidBucket(bucket string) bool {
|
||||
return bucket != abstract_sql.DEFAULT_TABLE && bucket != ""
|
||||
}
|
||||
|
||||
// calculatePrefixes returns the prefixes for a given path. The root prefix "/" is ignored to
|
||||
// save space in the returned array
|
||||
func calculatePrefixes(fullPath string) []string {
|
||||
res := strings.Split(fullPath, "/")
|
||||
maxPrefixes := len(res)
|
||||
|
||||
var retval []string
|
||||
for i := 1; i < maxPrefixes; i++ {
|
||||
calculatedPrefix := strings.Join(res[0:i], "/") + "/"
|
||||
if calculatedPrefix == "/" {
|
||||
continue
|
||||
}
|
||||
retval = append(retval, calculatedPrefix)
|
||||
}
|
||||
return retval
|
||||
}
|
||||
|
||||
// hashPrefixArray converts input prefix array into int64 hashes
|
||||
func hashPrefixArray(a []string) []int64 {
|
||||
hashed := make([]int64, len(a))
|
||||
for i := range a {
|
||||
hashed[i] = util.HashStringToLong(a[i])
|
||||
}
|
||||
return hashed
|
||||
}
|
||||
|
||||
func appendSlash(s string) string {
|
||||
if !strings.HasSuffix(s, "/") {
|
||||
return s + "/"
|
||||
}
|
||||
return s
|
||||
}
|
41
weed/filer/postgres_s3/postgres3_store_test.go
Normal file
41
weed/filer/postgres_s3/postgres3_store_test.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package postgres_s3
|
||||
|
||||
/*
|
||||
* Copyright 2022 Splunk Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCalculatePrefixes(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
var path string
|
||||
var prefixes []string
|
||||
|
||||
path = "/test1"
|
||||
prefixes = calculatePrefixes(path)
|
||||
assert.Equal(prefixes, []string(nil))
|
||||
|
||||
path = "/test1/test2"
|
||||
prefixes = calculatePrefixes(path)
|
||||
assert.Equal(prefixes, []string{"/test1/"})
|
||||
|
||||
path = "/test1/test2/test3"
|
||||
prefixes = calculatePrefixes(path)
|
||||
assert.Equal(prefixes, []string{"/test1/", "/test1/test2/"})
|
||||
}
|
|
@ -8,7 +8,7 @@ import (
|
|||
"image/png"
|
||||
"io"
|
||||
|
||||
"github.com/disintegration/imaging"
|
||||
"github.com/cognusion/imaging"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
)
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
"image/png"
|
||||
"io"
|
||||
|
||||
"github.com/disintegration/imaging"
|
||||
"github.com/cognusion/imaging"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
_ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres_s3"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
|
||||
|
|
|
@ -89,7 +89,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
|
|||
|
||||
// delete all tags or specific tags
|
||||
hasDeletion := false
|
||||
for header, _ := range existingEntry.Extended {
|
||||
for header := range existingEntry.Extended {
|
||||
if strings.HasPrefix(header, needle.PairNamePrefix) {
|
||||
if len(deletions) == 0 {
|
||||
delete(existingEntry.Extended, header)
|
||||
|
|
|
@ -402,14 +402,12 @@ func adjustAfterMove(v *master_pb.VolumeInformationMessage, volumeReplicas map[u
|
|||
replica.location = &loc
|
||||
for diskType, diskInfo := range fullNode.info.DiskInfos {
|
||||
if diskType == v.DiskType {
|
||||
diskInfo.VolumeCount--
|
||||
diskInfo.FreeVolumeCount++
|
||||
addVolumeCount(diskInfo, -1)
|
||||
}
|
||||
}
|
||||
for diskType, diskInfo := range emptyNode.info.DiskInfos {
|
||||
if diskType == v.DiskType {
|
||||
diskInfo.VolumeCount++
|
||||
diskInfo.FreeVolumeCount--
|
||||
addVolumeCount(diskInfo, 1)
|
||||
}
|
||||
}
|
||||
return
|
||||
|
|
|
@ -4,16 +4,17 @@ import (
|
|||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"golang.org/x/exp/slices"
|
||||
"google.golang.org/grpc"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
|
@ -316,7 +317,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
|||
|
||||
if !takeAction {
|
||||
// adjust volume count
|
||||
dst.dataNode.DiskInfos[replica.info.DiskType].VolumeCount++
|
||||
addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -350,7 +351,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
|||
}
|
||||
|
||||
// adjust volume count
|
||||
dst.dataNode.DiskInfos[replica.info.DiskType].VolumeCount++
|
||||
addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -361,6 +362,14 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
|||
return nil
|
||||
}
|
||||
|
||||
func addVolumeCount(info *master_pb.DiskInfo, count int) {
|
||||
if info == nil {
|
||||
return
|
||||
}
|
||||
info.VolumeCount += int64(count)
|
||||
info.FreeVolumeCount -= int64(count)
|
||||
}
|
||||
|
||||
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
|
||||
fn := capacityByFreeVolumeCount(diskType)
|
||||
slices.SortFunc(dataNodes, func(a, b location) int {
|
||||
|
|
|
@ -5,15 +5,16 @@ import (
|
|||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
|
@ -212,7 +213,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
|
|||
hasFoundTarget = true
|
||||
|
||||
// adjust volume count
|
||||
dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
|
||||
addVolumeCount(dst.dataNode.DiskInfos[string(toDiskType)], 1)
|
||||
|
||||
destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
|
||||
c.queues[destServerAddress] <- volumeTierMoveJob{sourceVolumeServer, vid}
|
||||
|
|
|
@ -328,7 +328,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
|
|||
}
|
||||
defer CloseResponse(r)
|
||||
if r.StatusCode >= 400 {
|
||||
retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 500
|
||||
retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499
|
||||
return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
|
||||
}
|
||||
|
||||
|
|
|
@ -66,9 +66,17 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
|
|||
isDone = true
|
||||
return
|
||||
}
|
||||
logBuffer.RLock()
|
||||
lastTsNs := logBuffer.LastTsNs
|
||||
for lastTsNs == logBuffer.LastTsNs {
|
||||
logBuffer.RUnlock()
|
||||
loopTsNs := lastTsNs // make a copy
|
||||
|
||||
for lastTsNs == loopTsNs {
|
||||
if waitForDataFn() {
|
||||
// Update loopTsNs and loop again
|
||||
logBuffer.RLock()
|
||||
loopTsNs = logBuffer.LastTsNs
|
||||
logBuffer.RUnlock()
|
||||
continue
|
||||
} else {
|
||||
isDone = true
|
||||
|
|
Loading…
Reference in a new issue