fix hanging goroutine

This commit is contained in:
guochao 2024-01-26 16:27:16 +08:00
parent 03539164e5
commit 86f4adca3f

View File

@ -119,59 +119,65 @@ func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, na
}
ch := pubsub.Channel()
for msg := range ch {
switch msg.Channel {
case signalingServer.redisKeyPrefix + ":" + room + ":discover":
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: msg.Payload,
Message: &proto.SignalingMessage_DiscoverRequest{},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":discover:" + name:
if msg.Payload == name {
continue
}
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: msg.Payload,
Receiver: &name,
Message: &proto.SignalingMessage_DiscoverResponse{},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":offer:" + name:
sdpMessage := &proto.SDPMessage{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case msg := <-ch:
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: sdpMessage.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_SessionOffer{
SessionOffer: sdpMessage,
},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":answer:" + name:
sdpMessage := &proto.SDPMessage{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
return err
}
switch msg.Channel {
case signalingServer.redisKeyPrefix + ":" + room + ":discover":
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: msg.Payload,
Message: &proto.SignalingMessage_DiscoverRequest{},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":discover:" + name:
if msg.Payload == name {
continue
}
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: msg.Payload,
Receiver: &name,
Message: &proto.SignalingMessage_DiscoverResponse{},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":offer:" + name:
sdpMessage := &proto.SDPMessage{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
return err
}
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: sdpMessage.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_SessionAnswer{
SessionAnswer: sdpMessage,
},
}); err != nil {
return err
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: sdpMessage.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_SessionOffer{
SessionOffer: sdpMessage,
},
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":answer:" + name:
sdpMessage := &proto.SDPMessage{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(sdpMessage); err != nil {
return err
}
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: sdpMessage.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_SessionAnswer{
SessionAnswer: sdpMessage,
},
}); err != nil {
return err
}
}
}
}