fix signal-server implementation
This commit is contained in:
@ -54,23 +54,26 @@ func (signalingServer SignalingServer) handleStream(ctx context.Context, errGrou
|
||||
return nil
|
||||
} else if err == nil {
|
||||
switch innerMsg := msg.Message.(type) {
|
||||
case *proto.SignalingMessage_Bootstrap:
|
||||
errGroup.Go(signalingServer.handleRedisPubSub(ctx, msg.Sender, msg.Room, stream))
|
||||
case *proto.SignalingMessage_DiscoverRequest:
|
||||
// ignore msg.Receiver, from sender to whole channel
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover", msg.Sender).Result(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Printf("peers received discover request: %v", received)
|
||||
log.Printf("peers received discover request %v -> %v(all): %v", msg.Sender, msg.Room, received)
|
||||
}
|
||||
errGroup.Go(signalingServer.handleRedisPubSub(ctx, msg.Sender, msg.Room, stream))
|
||||
case *proto.SignalingMessage_DiscoverResponse:
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover:"+*msg.Receiver, msg.Sender).Result(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Printf("peers received discover response: %v", received)
|
||||
log.Printf("peers received discover response %v -> %v(%v): %v", msg.Sender, msg.Room, *msg.Receiver, received)
|
||||
}
|
||||
case *proto.SignalingMessage_SessionOffer:
|
||||
payload := &bytes.Buffer{}
|
||||
json.NewEncoder(payload).Encode(innerMsg.SessionOffer)
|
||||
if err := json.NewEncoder(payload).Encode(innerMsg.SessionOffer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":offer:"+*msg.Receiver, payload.String()).Result(); err != nil {
|
||||
return err
|
||||
@ -79,7 +82,9 @@ func (signalingServer SignalingServer) handleStream(ctx context.Context, errGrou
|
||||
}
|
||||
case *proto.SignalingMessage_SessionAnswer:
|
||||
payload := &bytes.Buffer{}
|
||||
json.NewEncoder(payload).Encode(innerMsg.SessionAnswer)
|
||||
if err := json.NewEncoder(payload).Encode(innerMsg.SessionAnswer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":answer:"+*msg.Receiver, payload.String()).Result(); err != nil {
|
||||
return err
|
||||
@ -102,48 +107,69 @@ func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, na
|
||||
signalingServer.redisKeyPrefix+":"+room+":offer:"+name,
|
||||
signalingServer.redisKeyPrefix+":"+room+":answer:"+name,
|
||||
)
|
||||
defer pubsub.Unsubscribe(ctx)
|
||||
defer pubsub.Close()
|
||||
|
||||
if err := stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Message: &proto.SignalingMessage_Bootstrap{},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for msg := range ch {
|
||||
switch msg.Channel {
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":discover":
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
if err := stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Sender: msg.Payload,
|
||||
Message: &proto.SignalingMessage_DiscoverRequest{},
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":discover:" + name:
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
if err := stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Receiver: &msg.Payload,
|
||||
Sender: msg.Payload,
|
||||
Receiver: &name,
|
||||
Message: &proto.SignalingMessage_DiscoverResponse{},
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":offer:" + name:
|
||||
sdpMessage := &proto.SDPMessage{}
|
||||
json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage)
|
||||
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
if err := stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Receiver: &msg.Payload,
|
||||
Sender: sdpMessage.Sender,
|
||||
Receiver: &name,
|
||||
Message: &proto.SignalingMessage_SessionOffer{
|
||||
SessionOffer: sdpMessage,
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
case signalingServer.redisKeyPrefix + ":" + room + ":answer:" + name:
|
||||
sdpMessage := &proto.SDPMessage{}
|
||||
json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage)
|
||||
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stream.Send(&proto.SignalingMessage{
|
||||
if err := stream.Send(&proto.SignalingMessage{
|
||||
Room: room,
|
||||
Sender: name,
|
||||
Receiver: &msg.Payload,
|
||||
Message: &proto.SignalingMessage_SessionOffer{
|
||||
SessionOffer: sdpMessage,
|
||||
Sender: sdpMessage.Sender,
|
||||
Receiver: &name,
|
||||
Message: &proto.SignalingMessage_SessionAnswer{
|
||||
SessionAnswer: sdpMessage,
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user