From 53d1d2b78a480ac9b8432b9a78d6f6c48a6cfbf7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Apr 2024 22:29:53 -0700 Subject: [PATCH] save schema when configuring topic --- weed/mq/broker/broker_grpc_configure.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 6a6e92922..40ac8df23 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -27,6 +28,14 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return resp, err } + // validate the schema + if request.RecordType != nil { + if _, err = schema.NewSchema(request.RecordType); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid record type %+v: %v", request.RecordType, err) + } + } + + t := topic.FromPbTopic(request.Topic) var readErr, assignErr error resp, readErr = b.readTopicConfFromFiler(t) @@ -56,6 +65,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error()) } resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) + resp.RecordType = request.RecordType // save the topic configuration on filer if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {