1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/gateway/server.go
2025-09-18 01:04:53 -07:00

256 lines
6.7 KiB
Go

package gateway
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// resolveAdvertisedAddress resolves the appropriate address to advertise to Kafka clients
// when the server binds to all interfaces (:: or 0.0.0.0)
func resolveAdvertisedAddress() string {
// Try to find a non-loopback interface
interfaces, err := net.Interfaces()
if err != nil {
glog.V(1).Infof("Failed to get network interfaces, using localhost: %v", err)
return "127.0.0.1"
}
for _, iface := range interfaces {
// Skip loopback and inactive interfaces
if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
continue
}
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
// Prefer IPv4 addresses for better Kafka client compatibility
if ipv4 := ipNet.IP.To4(); ipv4 != nil {
return ipv4.String()
}
}
}
}
// Fallback to localhost if no suitable interface found
glog.V(1).Infof("No non-loopback interface found, using localhost")
return "127.0.0.1"
}
type Options struct {
Listen string
Masters string // SeaweedFS master servers
FilerGroup string // filer group name (optional)
}
type Server struct {
opts Options
ln net.Listener
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
handler *protocol.Handler
coordinatorRegistry *CoordinatorRegistry
}
func NewServer(opts Options) *Server {
ctx, cancel := context.WithCancel(context.Background())
var handler *protocol.Handler
var err error
// Create SeaweedMQ handler - masters are required for production
if opts.Masters == "" {
glog.Fatalf("SeaweedMQ masters are required for Kafka gateway - provide masters addresses")
}
// Use the intended listen address as the client host for master registration
clientHost := opts.Listen
if clientHost == "" {
clientHost = "127.0.0.1:9092" // Default Kafka port
}
handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup, clientHost)
if err != nil {
glog.Fatalf("Failed to create SeaweedMQ handler with masters %s: %v", opts.Masters, err)
}
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
server := &Server{
opts: opts,
ctx: ctx,
cancel: cancel,
handler: handler,
}
return server
}
func (s *Server) Start() error {
ln, err := net.Listen("tcp", s.opts.Listen)
if err != nil {
return err
}
s.ln = ln
// Get gateway address for coordinator registry
host, port := s.GetListenerAddr()
gatewayAddress := fmt.Sprintf("%s:%d", host, port)
glog.V(1).Infof("Kafka gateway started at %s, will advertise SMQ brokers in Metadata responses", gatewayAddress)
// Set gateway address in handler for coordinator registry
s.handler.SetGatewayAddress(gatewayAddress)
// Initialize coordinator registry for distributed coordinator assignment (only if masters are configured)
if s.opts.Masters != "" {
seedFiler := pb.ServerAddress(strings.Split(s.opts.Masters, ",")[0]) // Use first master as seed filer
grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
s.coordinatorRegistry = NewCoordinatorRegistry(gatewayAddress, seedFiler, grpcDialOption)
s.handler.SetCoordinatorRegistry(s.coordinatorRegistry)
// Start coordinator registry
if err := s.coordinatorRegistry.Start(); err != nil {
glog.Errorf("Failed to start coordinator registry: %v", err)
return err
}
glog.V(1).Infof("Started coordinator registry for gateway %s", gatewayAddress)
} else {
glog.V(1).Infof("No masters configured, skipping coordinator registry setup (test mode)")
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
conn, err := s.ln.Accept()
if err != nil {
select {
case <-s.ctx.Done():
return
default:
return
}
}
s.wg.Add(1)
go func(c net.Conn) {
defer s.wg.Done()
if err := s.handler.HandleConn(s.ctx, c); err != nil {
glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err)
}
}(conn)
}
}()
return nil
}
func (s *Server) Wait() error {
s.wg.Wait()
return nil
}
func (s *Server) Close() error {
s.cancel()
// Stop coordinator registry
if s.coordinatorRegistry != nil {
if err := s.coordinatorRegistry.Stop(); err != nil {
glog.Warningf("Error stopping coordinator registry: %v", err)
}
}
if s.ln != nil {
_ = s.ln.Close()
}
// Wait for goroutines to finish with a timeout to prevent hanging
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
// Normal shutdown
case <-time.After(1 * time.Second):
// Timeout - force shutdown
glog.Warningf("Server shutdown timed out after 1 second, forcing close")
}
// Close the handler (important for SeaweedMQ mode)
if s.handler != nil {
if err := s.handler.Close(); err != nil {
glog.Warningf("Error closing handler: %v", err)
}
}
return nil
}
// Removed registerWithBrokerLeader - no longer needed
// Addr returns the bound address of the server listener, or empty if not started.
func (s *Server) Addr() string {
if s.ln == nil {
return ""
}
// Normalize to an address reachable by clients
host, port := s.GetListenerAddr()
return net.JoinHostPort(host, strconv.Itoa(port))
}
// GetHandler returns the protocol handler (for testing)
func (s *Server) GetHandler() *protocol.Handler {
return s.handler
}
// GetListenerAddr returns the actual listening address and port
func (s *Server) GetListenerAddr() (string, int) {
if s.ln == nil {
// Return empty values to indicate address not available yet
// The caller should handle this appropriately
return "", 0
}
addr := s.ln.Addr().String()
// Parse [::]:port or host:port format - use exact match for kafka-go compatibility
if strings.HasPrefix(addr, "[::]:") {
port := strings.TrimPrefix(addr, "[::]:")
if p, err := strconv.Atoi(port); err == nil {
// Resolve appropriate address when bound to IPv6 all interfaces
return resolveAdvertisedAddress(), p
}
}
// Handle host:port format
if host, port, err := net.SplitHostPort(addr); err == nil {
if p, err := strconv.Atoi(port); err == nil {
// Resolve appropriate address when bound to all interfaces
if host == "::" || host == "" || host == "0.0.0.0" {
host = resolveAdvertisedAddress()
}
return host, p
}
}
// This should not happen if the listener was set up correctly
glog.Warningf("Unable to parse listener address: %s", addr)
return "", 0
}