implement trickle ice

This commit is contained in:
guochao 2024-05-26 14:03:34 +08:00
parent f02c30bbdc
commit 7ab9485cb0
3 changed files with 99 additions and 4 deletions

View File

@ -266,6 +266,8 @@ func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *gr
client.OnOffer(ctx, stream, room, msg.Sender, inner.SessionOffer.SDP)
case *proto.SignalingMessage_SessionAnswer:
client.OnAnswer(ctx, msg.Sender, inner.SessionAnswer.SDP)
case *proto.SignalingMessage_ICECandidate:
client.OnIceCandidate(ctx, msg.Sender, inner.ICECandidate.Candidate)
}
}
}
@ -298,6 +300,27 @@ func (client *SignalClient) OnDiscoverResponse(ctx context.Context, stream proto
if err != nil {
return
}
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
return
}
buffer := &bytes.Buffer{}
if err := json.NewEncoder(buffer).Encode(i.ToJSON()); err != nil {
return
}
stream.Send(&proto.SignalingMessage{
Room: client.Room,
Sender: client.Name,
Receiver: &sender,
Message: &proto.SignalingMessage_ICECandidate{
ICECandidate: &proto.ICECandidate{
Candidate: buffer.String(),
Sender: client.Name,
},
},
})
})
dataChannel, err := peerConnection.CreateDataChannel(dataChannelName, nil)
if err != nil {
client.Program.Send(systemMsg(fmt.Sprint("failed to create answer: ", err)))
@ -314,8 +337,10 @@ func (client *SignalClient) OnDiscoverResponse(ctx context.Context, stream proto
}
peerConnection.SetLocalDescription(sdp)
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
<-gatherComplete
if !trickleIce {
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
<-gatherComplete
}
buffer := &bytes.Buffer{}
if err := json.NewEncoder(buffer).Encode(peerConnection.LocalDescription()); err != nil {
@ -347,6 +372,26 @@ func (client *SignalClient) OnOffer(ctx context.Context, stream proto.Signaling_
if err != nil {
return
}
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
return
}
buffer := &bytes.Buffer{}
if err := json.NewEncoder(buffer).Encode(i.ToJSON()); err != nil {
return
}
stream.Send(&proto.SignalingMessage{
Room: client.Room,
Sender: client.Name,
Receiver: &sender,
Message: &proto.SignalingMessage_ICECandidate{
ICECandidate: &proto.ICECandidate{
Candidate: buffer.String(),
Sender: client.Name,
},
},
})
})
var offer webrtc.SessionDescription
if err := json.NewDecoder(strings.NewReader(sdp)).Decode(&offer); err != nil {
client.Program.Send(systemMsg(fmt.Sprint("Failed to decode offer for peer"+sender+": ", err)))
@ -361,8 +406,10 @@ func (client *SignalClient) OnOffer(ctx context.Context, stream proto.Signaling_
}
peerConnection.SetLocalDescription(answer)
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
<-gatherComplete
if !trickleIce {
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
<-gatherComplete
}
buffer := &bytes.Buffer{}
if err := json.NewEncoder(buffer).Encode(peerConnection.LocalDescription()); err != nil {
@ -400,6 +447,20 @@ func (client *SignalClient) OnAnswer(ctx context.Context, sender, sdp string) {
}
func (client SignalClient) OnIceCandidate(ctx context.Context, sender, candidate string) {
peerConnection, ok := client.PeerConns[sender]
if !ok {
return
}
candidateInit := webrtc.ICECandidateInit{}
if err := json.NewDecoder(strings.NewReader(candidate)).Decode(&candidateInit); err != nil {
return
}
client.Program.Send(systemMsg("Client " + sender + " has offered candidate: " + candidateInit.Candidate))
peerConnection.AddICECandidate(candidateInit)
}
func (client SignalClient) GetOrCreatePeerConnection(sender string) (*webrtc.PeerConnection, error) {
client.Lock.Lock()
defer client.Lock.Unlock()
@ -469,6 +530,7 @@ var (
serverUrl string
serverRoom string
iceServers []string
trickleIce bool
cmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
@ -488,6 +550,7 @@ func main() {
cmd.Flags().StringVar(&serverUrl, "server", "https://chat.jeffthecoder.xyz", "")
cmd.Flags().StringVar(&serverRoom, "room", "public", "")
cmd.Flags().StringSliceVar(&iceServers, "ice-servers", []string{"stun:nhz.jeffthecoder.xyz:3478", "stun:nhz.jeffthecoder.xyz:3479"}, "")
cmd.Flags().BoolVar(&trickleIce, "trickle-ice", false, "")
cmd.Execute()
}

View File

@ -96,6 +96,16 @@ func (signalingServer SignalingServer) handleStream(ctx context.Context, name st
} else {
log.Printf("peers received session answer: %v", received)
}
case *proto.SignalingMessage_ICECandidate:
payload := &bytes.Buffer{}
if err := json.NewEncoder(payload).Encode(innerMsg.ICECandidate); err != nil {
return err
}
if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":icecandidate:"+*msg.Receiver, payload.String()).Result(); err != nil {
return err
} else {
log.Printf("peers received candidate: %v: %s", received, innerMsg.ICECandidate.Candidate)
}
}
} else {
return err
@ -111,6 +121,7 @@ func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, na
signalingServer.redisKeyPrefix+":"+room+":discover:"+name,
signalingServer.redisKeyPrefix+":"+room+":offer:"+name,
signalingServer.redisKeyPrefix+":"+room+":answer:"+name,
signalingServer.redisKeyPrefix+":"+room+":icecandidate:"+name,
)
defer pubsub.Unsubscribe(ctx)
defer pubsub.Close()
@ -183,6 +194,21 @@ func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, na
}); err != nil {
return err
}
case signalingServer.redisKeyPrefix + ":" + room + ":icecandidate:" + name:
candidate := &proto.ICECandidate{}
if err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(candidate); err != nil {
return err
}
if err := stream.Send(&proto.SignalingMessage{
Room: room,
Sender: candidate.Sender,
Receiver: &name,
Message: &proto.SignalingMessage_ICECandidate{
ICECandidate: candidate,
},
}); err != nil {
return err
}
}
}
}

View File

@ -19,6 +19,11 @@ message SDPMessage {
string Sender = 3;
}
message ICECandidate {
string Candidate = 1;
string Sender = 2;
}
message SignalingMessage {
string Room = 1;
string Sender = 2;
@ -30,6 +35,7 @@ message SignalingMessage {
SDPMessage SessionOffer = 13;
SDPMessage SessionAnswer = 14;
ICECandidate ICECandidate = 15;
};
}