This commit is contained in:
2024-01-26 22:59:42 +08:00
parent b51f725480
commit 56cc4cb993
8 changed files with 151 additions and 77 deletions

View File

@ -10,6 +10,7 @@ import (
proto "git.jeffthecoder.xyz/public/chat-signaling-server/pkg/proto/signaling"
"github.com/goombaio/namegenerator"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
@ -17,9 +18,10 @@ import (
type SignalingServer struct {
proto.UnimplementedSignalingServer
redisKeyPrefix string
names namegenerator.Generator
redis redis.UniversalClient
redisKeyPrefix string
redis redis.UniversalClient
}
type Options struct {
@ -27,6 +29,8 @@ type Options struct {
RedisDatabase int
RedisKeyPrefix string
NameRandomSeed int64
}
func New(options Options) (*SignalingServer, error) {
@ -40,13 +44,14 @@ func New(options Options) (*SignalingServer, error) {
}
return &SignalingServer{
redis: redisClient,
redis: redisClient,
redisKeyPrefix: options.RedisKeyPrefix,
names: namegenerator.NewNameGenerator(options.NameRandomSeed),
}, nil
}
func (signalingServer SignalingServer) handleStream(ctx context.Context, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error {
func (signalingServer SignalingServer) handleStream(ctx context.Context, name string, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error {
return func() error {
for {
msg, err := stream.Recv()
@ -55,19 +60,19 @@ func (signalingServer SignalingServer) handleStream(ctx context.Context, errGrou
} else if err == nil {
switch innerMsg := msg.Message.(type) {
case *proto.SignalingMessage_Bootstrap:
errGroup.Go(signalingServer.handleRedisPubSub(ctx, msg.Sender, msg.Room, stream))
errGroup.Go(signalingServer.handleRedisPubSub(ctx, name, msg.Room, stream))
case *proto.SignalingMessage_DiscoverRequest:
// ignore msg.Receiver, from sender to whole channel
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover", msg.Sender).Result(); err != nil {
return err
} else {
log.Printf("peers received discover request %v -> %v(all): %v", msg.Sender, msg.Room, received)
log.Printf("peers received discover request %v -> %v(all): %v", name, msg.Room, received)
}
case *proto.SignalingMessage_DiscoverResponse:
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover:"+*msg.Receiver, msg.Sender).Result(); err != nil {
return err
} else {
log.Printf("peers received discover response %v -> %v(%v): %v", msg.Sender, msg.Room, *msg.Receiver, received)
log.Printf("peers received discover response %v -> %v(%v): %v", name, msg.Room, *msg.Receiver, received)
}
case *proto.SignalingMessage_SessionOffer:
payload := &bytes.Buffer{}
@ -181,8 +186,6 @@ func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, na
}
}
}
return nil
}
}
@ -190,9 +193,11 @@ func (signalingServer SignalingServer) Biu(stream proto.Signaling_BiuServer) err
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
name := signalingServer.names.Generate()
errGroup := &errgroup.Group{}
errGroup.Go(signalingServer.handleStream(ctx, errGroup, stream))
errGroup.Go(signalingServer.handleStream(ctx, name, errGroup, stream))
return errGroup.Wait()
}