first commit
This commit is contained in:
163
pkg/impl/signal-server/signaling.go
Normal file
163
pkg/impl/signal-server/signaling.go
Normal file
@ -0,0 +1,163 @@
|
||||
package signal_server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
proto "git.jeffthecoder.xyz/guochao/meow-signaling.jeffthecoder.xyz/pkg/proto/signal-server"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type SignalingServer struct {
|
||||
proto.UnimplementedSignalingServer
|
||||
|
||||
redisKeyPrefix string
|
||||
|
||||
redis redis.UniversalClient
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
RedisServers []string
|
||||
RedisDatabase int
|
||||
|
||||
RedisKeyPrefix string
|
||||
}
|
||||
|
||||
func New(options Options) (*SignalingServer, error) {
|
||||
redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
|
||||
Addrs: options.RedisServers,
|
||||
DB: options.RedisDatabase,
|
||||
})
|
||||
|
||||
if err := redisClient.Ping(context.Background()).Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SignalingServer{
|
||||
redis: redisClient,
|
||||
|
||||
redisKeyPrefix: options.RedisKeyPrefix,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (signalingServer SignalingServer) handleStream(ctx context.Context, errGroup *errgroup.Group, stream proto.Signaling_ConnectServer) func() error {
|
||||
return func() error {
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err == nil {
|
||||
switch innerMsg := msg.Message.(type) {
|
||||
case *proto.SignalingMessage_DiscoverRequest:
|
||||
// ignore msg.Receiver, from sender to whole channel
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover", msg.Sender).Result(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Printf("peers received discover request: %v", received)
|
||||
}
|
||||
errGroup.Go(signalingServer.handleRedisPubSub(ctx, msg.Sender, msg.Room, stream))
|
||||
case *proto.SignalingMessage_DiscoverResponse:
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover:"+*msg.Receiver, msg.Sender).Result(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Printf("peers received discover response: %v", received)
|
||||
}
|
||||
case *proto.SignalingMessage_SessionOffer:
|
||||
payload := &bytes.Buffer{}
|
||||
json.NewEncoder(payload).Encode(innerMsg.SessionOffer)
|
||||
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":offer:"+*msg.Receiver, payload.String()).Result(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Printf("peers received discover response: %v", received)
|
||||
}
|
||||
case *proto.SignalingMessage_SessionAnswer:
|
||||
payload := &bytes.Buffer{}
|
||||
json.NewEncoder(payload).Encode(innerMsg.SessionAnswer)
|
||||
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":answer:"+*msg.Receiver, payload.String()).Result(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Printf("peers received discover response: %v", received)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, name, room string, stream proto.Signaling_ConnectServer) func() error {
|
||||
return func() error {
|
||||
pubsub := signalingServer.redis.Subscribe(ctx,
|
||||
signalingServer.redisKeyPrefix+":"+room+":discover",
|
||||
signalingServer.redisKeyPrefix+":"+room+":discover:"+name,
|
||||
signalingServer.redisKeyPrefix+":"+room+":offer:"+name,
|
||||
signalingServer.redisKeyPrefix+":"+room+":answer:"+name,
|
||||
)
|
||||
defer pubsub.Close()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for msg := range ch {
|
||||
switch msg.Channel {
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":discover":
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Message: &proto.SignalingMessage_DiscoverRequest{},
|
||||
})
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":discover:" + name:
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Receiver: &msg.Payload,
|
||||
Message: &proto.SignalingMessage_DiscoverResponse{},
|
||||
})
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":offer:" + name:
|
||||
sdpMessage := &proto.SDPMessage{}
|
||||
json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage)
|
||||
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Receiver: &msg.Payload,
|
||||
Message: &proto.SignalingMessage_SessionOffer{
|
||||
SessionOffer: sdpMessage,
|
||||
},
|
||||
})
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":answer:" + name:
|
||||
sdpMessage := &proto.SDPMessage{}
|
||||
json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage)
|
||||
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Receiver: &msg.Payload,
|
||||
Message: &proto.SignalingMessage_SessionOffer{
|
||||
SessionOffer: sdpMessage,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (signalingServer SignalingServer) Connect(stream proto.Signaling_ConnectServer) error {
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
|
||||
errGroup := &errgroup.Group{}
|
||||
|
||||
errGroup.Go(signalingServer.handleStream(ctx, errGroup, stream))
|
||||
|
||||
return errGroup.Wait()
|
||||
}
|
Reference in New Issue
Block a user