1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-18 00:24:06 +02:00

use pipeline to save some time

This commit is contained in:
Chris Lu 2021-10-06 20:40:16 -07:00
parent 7584f758b8
commit 2336a397dc
2 changed files with 31 additions and 10 deletions

View file

@ -66,6 +66,24 @@ There are multiple cases after finding the name for greater or equal node
return return
*/ */
func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) {
ctx := context.Background()
pipe := nl.client.TxPipeline()
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
countOperation := pipe.ZLexCount(ctx, key, "-", "+")
scoreOperationt := pipe.ZScore(ctx, key, name)
if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil{
return false, 0, err
}
if err == redis.Nil {
err = nil
}
alreadyContains = scoreOperationt.Err() == nil
nodeSize = int(countOperation.Val())
return
}
func (nl *ItemList) WriteName(name string) error { func (nl *ItemList) WriteName(name string) error {
lookupKey := []byte(name) lookupKey := []byte(name)
@ -93,13 +111,16 @@ func (nl *ItemList) WriteName(name string) error {
} }
if prevNode != nil { if prevNode != nil {
alreadyContains, nodeSize, err := nl.canAddMember(prevNode.Reference(), name)
if err != nil {
return err
}
if alreadyContains {
// case 2.1 // case 2.1
if nl.NodeContainsItem(prevNode.Reference(), name) {
return nil return nil
} }
// case 2.2 // case 2.2
nodeSize := nl.NodeSize(prevNode.Reference())
if nodeSize < nl.batchSize { if nodeSize < nl.batchSize {
return nl.NodeAddMember(prevNode.Reference(), name) return nl.NodeAddMember(prevNode.Reference(), name)
} }

View file

@ -58,14 +58,14 @@ func TestNameList(t *testing.T) {
nameList.WriteName(name) nameList.WriteName(name)
nameList.ListNames("", func(name string) bool { nameList.ListNames("", func(name string) bool {
// println(name) println(name)
return true return true
}) })
if nameList.HasChanges() { if nameList.HasChanges() {
data = nameList.ToBytes() data = nameList.ToBytes()
} }
// println() println()
} }
nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit) nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
@ -76,7 +76,7 @@ func TestNameList(t *testing.T) {
} }
func xBenchmarkNameList(b *testing.B) { func BenchmarkNameList(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{}) server, err := tempredis.Start(tempredis.Config{})
if err != nil { if err != nil {
@ -102,7 +102,7 @@ func xBenchmarkNameList(b *testing.B) {
} }
} }
func xBenchmarkRedis(b *testing.B) { func BenchmarkRedis(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{}) server, err := tempredis.Start(tempredis.Config{})
if err != nil { if err != nil {
@ -120,7 +120,7 @@ func xBenchmarkRedis(b *testing.B) {
} }
} }
func TestNameListAdd(t *testing.T) { func xTestNameListAdd(t *testing.T) {
server, err := tempredis.Start(tempredis.Config{}) server, err := tempredis.Start(tempredis.Config{})
if err != nil { if err != nil {
@ -169,7 +169,7 @@ func TestNameListAdd(t *testing.T) {
*/ */
} }
func BenchmarkNameList(b *testing.B) { func xBenchmarkNameList(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{}) server, err := tempredis.Start(tempredis.Config{})
if err != nil { if err != nil {
@ -196,7 +196,7 @@ func BenchmarkNameList(b *testing.B) {
} }
} }
func BenchmarkRedis(b *testing.B) { func xBenchmarkRedis(b *testing.B) {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "localhost:6379", Addr: "localhost:6379",