mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-06-30 06:12:15 +02:00
volume: add grpc file read operation
This is added more for performance benchmarking.
This commit is contained in:
parent
2cddc23ae8
commit
9b6296e77a
|
@ -19,6 +19,7 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
|
@ -40,6 +41,7 @@ type BenchmarkOptions struct {
|
||||||
maxCpu *int
|
maxCpu *int
|
||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
masterClient *wdclient.MasterClient
|
masterClient *wdclient.MasterClient
|
||||||
|
grpcRead *bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -64,6 +66,7 @@ func init() {
|
||||||
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
|
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
|
||||||
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
|
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||||
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
||||||
|
b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
|
||||||
sharedBytes = make([]byte, 1024)
|
sharedBytes = make([]byte, 1024)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,23 +281,61 @@ func readFiles(fileIdLineChan chan string, s *stat) {
|
||||||
fmt.Printf("reading file %s\n", fid)
|
fmt.Printf("reading file %s\n", fid)
|
||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
url, err := b.masterClient.LookupFileId(fid)
|
var bytesRead int
|
||||||
if err != nil {
|
var err error
|
||||||
s.failed++
|
if *b.grpcRead {
|
||||||
println("!!!! ", fid, " location not found!!!!!")
|
volumeServer, err := b.masterClient.LookupVolumeServer(fid)
|
||||||
continue
|
if err != nil {
|
||||||
|
s.failed++
|
||||||
|
println("!!!! ", fid, " location not found!!!!!")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
|
||||||
|
} else {
|
||||||
|
url, err := b.masterClient.LookupFileId(fid)
|
||||||
|
if err != nil {
|
||||||
|
s.failed++
|
||||||
|
println("!!!! ", fid, " location not found!!!!!")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var bytes []byte
|
||||||
|
bytes, err = util.Get(url)
|
||||||
|
bytesRead = len(bytes)
|
||||||
}
|
}
|
||||||
if bytesRead, err := util.Get(url); err == nil {
|
if err == nil {
|
||||||
s.completed++
|
s.completed++
|
||||||
s.transferred += int64(len(bytesRead))
|
s.transferred += int64(bytesRead)
|
||||||
readStats.addSample(time.Now().Sub(start))
|
readStats.addSample(time.Now().Sub(start))
|
||||||
} else {
|
} else {
|
||||||
s.failed++
|
s.failed++
|
||||||
fmt.Printf("Failed to read %s error:%v\n", url, err)
|
fmt.Printf("Failed to read %s error:%v\n", fid, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
|
||||||
|
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
fileGetClient, err := client.FileGet(ctx, &volume_server_pb.FileGetRequest{FileId: fid})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
resp, respErr := fileGetClient.Recv()
|
||||||
|
if resp != nil {
|
||||||
|
bytesRead += len(resp.Data)
|
||||||
|
}
|
||||||
|
if respErr != nil {
|
||||||
|
if respErr == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return respErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
|
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
|
||||||
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,6 +8,10 @@ service VolumeServer {
|
||||||
//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
|
//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
|
||||||
rpc BatchDelete (BatchDeleteRequest) returns (BatchDeleteResponse) {
|
rpc BatchDelete (BatchDeleteRequest) returns (BatchDeleteResponse) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpc FileGet (FileGetRequest) returns (stream FileGetResponse) {
|
||||||
|
}
|
||||||
|
|
||||||
rpc VacuumVolumeCheck (VacuumVolumeCheckRequest) returns (VacuumVolumeCheckResponse) {
|
rpc VacuumVolumeCheck (VacuumVolumeCheckRequest) returns (VacuumVolumeCheckResponse) {
|
||||||
}
|
}
|
||||||
rpc VacuumVolumeCompact (VacuumVolumeCompactRequest) returns (VacuumVolumeCompactResponse) {
|
rpc VacuumVolumeCompact (VacuumVolumeCompactRequest) returns (VacuumVolumeCompactResponse) {
|
||||||
|
@ -100,6 +104,22 @@ message DeleteResult {
|
||||||
uint32 version = 5;
|
uint32 version = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message FileGetRequest {
|
||||||
|
string file_id = 1;
|
||||||
|
bool accept_gzip = 2;
|
||||||
|
}
|
||||||
|
message FileGetResponse {
|
||||||
|
bytes data = 1;
|
||||||
|
uint32 content_length = 2;
|
||||||
|
string content_type = 3;
|
||||||
|
uint64 last_modified = 4;
|
||||||
|
string filename = 5;
|
||||||
|
string etag = 6;
|
||||||
|
bool is_gzipped = 7;
|
||||||
|
map<string, string> headers = 8;
|
||||||
|
int32 errorCode = 9;
|
||||||
|
}
|
||||||
|
|
||||||
message Empty {
|
message Empty {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load diff
130
weed/server/volume_grpc_file.go
Normal file
130
weed/server/volume_grpc_file.go
Normal file
|
@ -0,0 +1,130 @@
|
||||||
|
package weed_server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (vs *VolumeServer) FileGet(req *volume_server_pb.FileGetRequest, stream volume_server_pb.VolumeServer_FileGetServer) error {
|
||||||
|
|
||||||
|
headResponse := &volume_server_pb.FileGetResponse{}
|
||||||
|
n := new(needle.Needle)
|
||||||
|
|
||||||
|
commaIndex := strings.LastIndex(req.FileId, ",")
|
||||||
|
vid := req.FileId[:commaIndex]
|
||||||
|
fid := req.FileId[commaIndex+1:]
|
||||||
|
|
||||||
|
volumeId, err := needle.NewVolumeId(vid)
|
||||||
|
if err != nil {
|
||||||
|
headResponse.ErrorCode = http.StatusBadRequest
|
||||||
|
return stream.Send(headResponse)
|
||||||
|
}
|
||||||
|
err = n.ParsePath(fid)
|
||||||
|
if err != nil {
|
||||||
|
headResponse.ErrorCode = http.StatusBadRequest
|
||||||
|
return stream.Send(headResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
hasVolume := vs.store.HasVolume(volumeId)
|
||||||
|
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
|
||||||
|
|
||||||
|
if !hasVolume && !hasEcVolume {
|
||||||
|
headResponse.ErrorCode = http.StatusMovedPermanently
|
||||||
|
return stream.Send(headResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
cookie := n.Cookie
|
||||||
|
var count int
|
||||||
|
if hasVolume {
|
||||||
|
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
|
||||||
|
} else if hasEcVolume {
|
||||||
|
count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil || count < 0 {
|
||||||
|
headResponse.ErrorCode = http.StatusNotFound
|
||||||
|
return stream.Send(headResponse)
|
||||||
|
}
|
||||||
|
if n.Cookie != cookie {
|
||||||
|
headResponse.ErrorCode = http.StatusNotFound
|
||||||
|
return stream.Send(headResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.LastModified != 0 {
|
||||||
|
headResponse.LastModified = n.LastModified
|
||||||
|
}
|
||||||
|
|
||||||
|
headResponse.Etag = n.Etag()
|
||||||
|
|
||||||
|
if n.HasPairs() {
|
||||||
|
pairMap := make(map[string]string)
|
||||||
|
err = json.Unmarshal(n.Pairs, &pairMap)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(0).Infoln("Unmarshal pairs error:", err)
|
||||||
|
}
|
||||||
|
headResponse.Headers = pairMap
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// skip this, no redirection
|
||||||
|
if vs.tryHandleChunkedFile(n, filename, w, r) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
if n.NameSize > 0 {
|
||||||
|
headResponse.Filename = string(n.Name)
|
||||||
|
}
|
||||||
|
mtype := ""
|
||||||
|
if n.MimeSize > 0 {
|
||||||
|
mt := string(n.Mime)
|
||||||
|
if !strings.HasPrefix(mt, "application/octet-stream") {
|
||||||
|
mtype = mt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
headResponse.ContentType = mtype
|
||||||
|
|
||||||
|
headResponse.IsGzipped = n.IsGzipped()
|
||||||
|
|
||||||
|
if n.IsGzipped() && req.AcceptGzip {
|
||||||
|
if n.Data, err = util.UnGzipData(n.Data); err != nil {
|
||||||
|
glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
headResponse.ContentLength = uint32(len(n.Data))
|
||||||
|
bytesToRead := len(n.Data)
|
||||||
|
bytesRead := 0
|
||||||
|
|
||||||
|
t := headResponse
|
||||||
|
|
||||||
|
for bytesRead < bytesToRead {
|
||||||
|
|
||||||
|
stopIndex := bytesRead + BufferSizeLimit
|
||||||
|
if stopIndex > bytesToRead {
|
||||||
|
stopIndex = bytesToRead
|
||||||
|
}
|
||||||
|
|
||||||
|
if t == nil {
|
||||||
|
t = &volume_server_pb.FileGetResponse{}
|
||||||
|
}
|
||||||
|
t.Data = n.Data[bytesRead:stopIndex]
|
||||||
|
|
||||||
|
err = stream.Send(t)
|
||||||
|
t = nil
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
bytesRead = stopIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue