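// Package signal_server implements the Signaling gRPC service. Clients connect
// through the bidirectional Biu stream, and the server relays discovery and
// session offer/answer messages between them over Redis pub/sub channels.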
package signal_server
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"strings"

	proto "git.jeffthecoder.xyz/public/chat-signaling-server/pkg/proto/signaling"
	"github.com/goombaio/namegenerator"
	"github.com/redis/go-redis/v9"
	"golang.org/x/sync/errgroup"
)
type SignalingServer struct {
proto.UnimplementedSignalingServer
2024-01-26 22:59:42 +08:00
names namegenerator.Generator
2023-03-02 00:35:09 +08:00
2024-01-26 22:59:42 +08:00
redisKeyPrefix string
redis redis.UniversalClient
2023-03-02 00:35:09 +08:00
}
// Options configures a SignalingServer created with New.
type Options struct {
	// RedisServers lists the Redis addresses handed to the universal client.
	RedisServers []string
	// RedisDatabase is the Redis database index to select.
	RedisDatabase int
	// RedisKeyPrefix is prepended to every Redis channel name the server uses.
	RedisKeyPrefix string

	// NameRandomSeed seeds the generator that produces client names.
	NameRandomSeed int64
}
func New(options Options) (*SignalingServer, error) {
redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: options.RedisServers,
DB: options.RedisDatabase,
})
if err := redisClient.Ping(context.Background()).Err(); err != nil {
return nil, err
}
return &SignalingServer{
2024-01-26 22:59:42 +08:00
redis: redisClient,
2023-03-02 00:35:09 +08:00
redisKeyPrefix: options.RedisKeyPrefix,
2024-01-26 22:59:42 +08:00
names: namegenerator.NewNameGenerator(options.NameRandomSeed),
2023-03-02 00:35:09 +08:00
}, nil
}
func (signalingServer SignalingServer) handleStream(ctx context.Context, name string, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error {
2023-03-02 00:35:09 +08:00
return func() error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
} else if err == nil {
switch innerMsg := msg.Message.(type) {
2023-03-02 23:33:29 +08:00
case *proto.SignalingMessage_Bootstrap:
2024-01-26 22:59:42 +08:00
errGroup.Go(signalingServer.handleRedisPubSub(ctx, name, msg.Room, stream))
2023-03-02 00:35:09 +08:00
case *proto.SignalingMessage_DiscoverRequest:
// ignore msg.Receiver, from sender to whole channel
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover", msg.Sender).Result(); err != nil {
return err
} else {
2024-01-26 22:59:42 +08:00
log.Printf("peers received discover request %v -> %v(all): %v", name, msg.Room, received)
2023-03-02 00:35:09 +08:00
}
case *proto.SignalingMessage_DiscoverResponse:
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover:"+*msg.Receiver, msg.Sender).Result(); err != nil {
return err
} else {
2024-01-26 22:59:42 +08:00
log.Printf("peers received discover response %v -> %v(%v): %v", name, msg.Room, *msg.Receiver, received)
2023-03-02 00:35:09 +08:00
}
case *proto.SignalingMessage_SessionOffer:
payload := &bytes.Buffer{}
2023-03-02 23:33:29 +08:00
if err := json.NewEncoder(payload).Encode(innerMsg.SessionOffer); err != nil {
return err
}
2023-03-02 00:35:09 +08:00
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":offer:"+*msg.Receiver, payload.String()).Result(); err != nil {
return err
} else {
2024-05-26 14:00:14 +08:00
log.Printf("peers received session offer: %v", received)
2023-03-02 00:35:09 +08:00
}
case *proto.SignalingMessage_SessionAnswer:
payload := &bytes.Buffer{}
2023-03-02 23:33:29 +08:00
if err := json.NewEncoder(payload).Encode(innerMsg.SessionAnswer); err != nil {
return err
}
2023-03-02 00:35:09 +08:00
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":answer:"+*msg.Receiver, payload.String()).Result(); err != nil {
return err
} else {
2024-05-26 14:00:14 +08:00
log.Printf("peers received session answer: %v", received)
2023-03-02 00:35:09 +08:00
}
}
} else {
return err
}
}
}
}
func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, name, room string, stream proto.Signaling_BiuServer) func() error {
2023-03-02 00:35:09 +08:00
return func() error {
pubsub := signalingServer.redis.Subscribe(ctx,
signalingServer.redisKeyPrefix+":"+room+":discover",
signalingServer.redisKeyPrefix+":"+room+":discover:"+name,
signalingServer.redisKeyPrefix+":"+room+":offer:"+name,
signalingServer.redisKeyPrefix+":"+room+":answer:"+name,
)
2023-03-02 23:33:29 +08:00
defer pubsub.Unsubscribe(ctx)
2023-03-02 00:35:09 +08:00
defer pubsub.Close()
2023-03-02 23:33:29 +08:00
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: name,
Message: &proto.SignalingMessage_Bootstrap{},
}); err != nil {
return err
}
2023-03-02 00:35:09 +08:00
ch := pubsub.Channel()
2024-01-26 16:27:16 +08:00
for {
select {
case <-ctx.Done():
return nil
case msg := <-ch:
switch msg.Channel {
case signalingServer.redisKeyPrefix + ":" + room + ":discover":
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: msg.Payload,
Message: &proto.SignalingMessage_DiscoverRequest{},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":discover:" + name:
if msg.Payload == name {
continue
}
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: msg.Payload,
Receiver: &name,
Message: &proto.SignalingMessage_DiscoverResponse{},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":offer:" + name:
sdpMessage := &proto.SDPMessage{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
return err
}
2023-03-02 00:35:09 +08:00
2024-01-26 16:27:16 +08:00
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: sdpMessage.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_SessionOffer{
SessionOffer: sdpMessage,
},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":answer:" + name:
sdpMessage := &proto.SDPMessage{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
return err
}
2023-03-02 00:35:09 +08:00
2024-01-26 16:27:16 +08:00
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: sdpMessage.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_SessionAnswer{
SessionAnswer: sdpMessage,
},
}); err != nil {
return err
}
2023-03-02 23:33:29 +08:00
}
2023-03-02 00:35:09 +08:00
}
}
}
}
func (signalingServer SignalingServer) Biu(stream proto.Signaling_BiuServer) error {
2023-03-02 00:35:09 +08:00
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
2024-01-26 22:59:42 +08:00
name := signalingServer.names.Generate()
2023-03-02 00:35:09 +08:00
errGroup := &errgroup.Group{}
2024-01-26 22:59:42 +08:00
errGroup.Go(signalingServer.handleStream(ctx, name, errGroup, stream))
2023-03-02 00:35:09 +08:00
return errGroup.Wait()
}