mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-06-26 12:29:37 +02:00
fix: filer authenticate with with volume server (#5480)
This commit is contained in:
parent
9e07a87fcb
commit
5c8e6014ba
|
@ -107,7 +107,7 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi
|
|||
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
|
||||
return err
|
||||
}
|
||||
err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
|
||||
err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
|
|||
return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
|
||||
}
|
||||
|
||||
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
|
||||
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
|
||||
|
||||
var shouldRetry bool
|
||||
var totalWritten int
|
||||
|
@ -132,7 +132,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe
|
|||
for _, urlString := range urlStrings {
|
||||
var localProcessed int
|
||||
var writeErr error
|
||||
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
|
||||
shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
|
||||
if totalWritten > localProcessed {
|
||||
toBeSkipped := totalWritten - localProcessed
|
||||
if len(data) <= toBeSkipped {
|
||||
|
|
|
@ -69,11 +69,17 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
|
|||
|
||||
type DoStreamContent func(writer io.Writer) error
|
||||
|
||||
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
|
||||
return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
|
||||
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
|
||||
return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
|
||||
}
|
||||
|
||||
func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
|
||||
type VolumeServerJwtFunction func(fileId string) string
|
||||
|
||||
func noJwtFunc(string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
|
||||
glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
|
||||
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
|
||||
|
||||
|
@ -119,7 +125,8 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
|
|||
}
|
||||
urlStrings := fileId2Url[chunkView.FileId]
|
||||
start := time.Now()
|
||||
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
|
||||
jwt := jwtFunc(chunkView.FileId)
|
||||
err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
|
||||
offset += int64(chunkView.ViewSize)
|
||||
remaining -= int64(chunkView.ViewSize)
|
||||
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
|
||||
|
@ -143,7 +150,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
|
|||
}
|
||||
|
||||
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
|
||||
streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size)
|
||||
streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -91,6 +91,7 @@ type FilerServer struct {
|
|||
secret security.SigningKey
|
||||
filer *filer.Filer
|
||||
filerGuard *security.Guard
|
||||
volumeGuard *security.Guard
|
||||
grpcDialOption grpc.DialOption
|
||||
|
||||
// metrics read from the master
|
||||
|
@ -113,6 +114,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
|||
v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
|
||||
readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
|
||||
|
||||
volumeSigningKey := v.GetString("jwt.signing.key")
|
||||
v.SetDefault("jwt.signing.expires_after_seconds", 10)
|
||||
volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
|
||||
|
||||
volumeReadSigningKey := v.GetString("jwt.signing.read.key")
|
||||
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
|
||||
volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
|
||||
|
||||
v.SetDefault("cors.allowed_origins.values", "*")
|
||||
|
||||
allowedOrigins := v.GetString("cors.allowed_origins.values")
|
||||
|
@ -145,6 +154,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
|||
fs.filer.Cipher = option.Cipher
|
||||
// we do not support IP whitelist right now
|
||||
fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
|
||||
fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec)
|
||||
|
||||
fs.checkWithMaster()
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package weed_server
|
|||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
"io"
|
||||
|
@ -20,6 +21,26 @@ func init() {
|
|||
}}
|
||||
}
|
||||
|
||||
func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
|
||||
encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
|
||||
|
||||
if encodedJwt == "" {
|
||||
return
|
||||
}
|
||||
|
||||
r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
|
||||
}
|
||||
|
||||
func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrite bool) string {
|
||||
var encodedJwt security.EncodedJwt
|
||||
if isWrite {
|
||||
encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.SigningKey, fs.volumeGuard.ExpiresAfterSec, fileId)
|
||||
} else {
|
||||
encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId)
|
||||
}
|
||||
return string(encodedJwt)
|
||||
}
|
||||
|
||||
func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
|
||||
|
||||
urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
|
@ -261,7 +262,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
}
|
||||
|
||||
streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs)
|
||||
streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
|
||||
if err != nil {
|
||||
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
|
||||
glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
|
||||
|
@ -277,3 +278,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
|||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (fs *FilerServer) maybeGetVolumeReadJwtAuthorizationToken(fileId string) string {
|
||||
return string(security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId))
|
||||
}
|
||||
|
|
|
@ -53,11 +53,15 @@ func Post(url string, values url.Values) ([]byte, error) {
|
|||
// github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
|
||||
// may need increasing http.Client.Timeout
|
||||
func Get(url string) ([]byte, bool, error) {
|
||||
return GetAuthenticated(url, "")
|
||||
}
|
||||
|
||||
func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
|
||||
request, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
maybeAddAuth(request, jwt)
|
||||
request.Header.Add("Accept-Encoding", "gzip")
|
||||
|
||||
response, err := client.Do(request)
|
||||
|
@ -101,11 +105,15 @@ func Head(url string) (http.Header, error) {
|
|||
return r.Header, nil
|
||||
}
|
||||
|
||||
func Delete(url string, jwt string) error {
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
func maybeAddAuth(req *http.Request, jwt string) {
|
||||
if jwt != "" {
|
||||
req.Header.Set("Authorization", "BEARER "+string(jwt))
|
||||
}
|
||||
}
|
||||
|
||||
func Delete(url string, jwt string) error {
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
maybeAddAuth(req, jwt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -133,9 +141,7 @@ func Delete(url string, jwt string) error {
|
|||
|
||||
func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
if jwt != "" {
|
||||
req.Header.Set("Authorization", "BEARER "+string(jwt))
|
||||
}
|
||||
maybeAddAuth(req, jwt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -193,9 +199,7 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head
|
|||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
if len(jwt) > 0 {
|
||||
req.Header.Set("Authorization", "BEARER "+jwt)
|
||||
}
|
||||
maybeAddAuth(req, jwt)
|
||||
|
||||
response, err := client.Do(req)
|
||||
if err != nil {
|
||||
|
@ -229,7 +233,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
|
|||
|
||||
if cipherKey != nil {
|
||||
var n int
|
||||
_, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
|
||||
_, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
|
||||
n = copy(buf, data)
|
||||
})
|
||||
return int64(n), err
|
||||
|
@ -298,11 +302,16 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
|
|||
}
|
||||
|
||||
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
|
||||
return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
||||
}
|
||||
|
||||
func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
|
||||
if cipherKey != nil {
|
||||
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
||||
return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", fileUrl, nil)
|
||||
maybeAddAuth(req, jwt)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -354,8 +363,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
|
|||
|
||||
}
|
||||
|
||||
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
|
||||
encryptedData, retryable, err := Get(fileUrl)
|
||||
func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
|
||||
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
|
||||
if err != nil {
|
||||
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
|
||||
}
|
||||
|
@ -392,9 +401,7 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt
|
|||
req.Header.Add("Accept-Encoding", "gzip")
|
||||
}
|
||||
|
||||
if len(jwt) > 0 {
|
||||
req.Header.Set("Authorization", "BEARER "+jwt)
|
||||
}
|
||||
maybeAddAuth(req, jwt)
|
||||
|
||||
r, err := client.Do(req)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue