diff --git a/cmd/demo-signal-client/main.go b/cmd/demo-signal-client/main.go index 48f8d95..6594c10 100644 --- a/cmd/demo-signal-client/main.go +++ b/cmd/demo-signal-client/main.go @@ -223,21 +223,22 @@ func (client *SignalClient) ConnectServer(ctx context.Context) { panic(err) } - client.Program.Send(systemMsg("Connecting to room...")) - signal_server := proto.NewSignalingClient(grpcClient) - stream, err := signal_server.Biu(context.Background()) - if err != nil { - panic(err) - } - client.Program.Send(systemMsg("Connected.")) - - go client.HandleConnection(ctx, grpcClient, stream) + go client.HandleConnection(ctx, grpcClient) } -func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *grpc.ClientConn, stream proto.Signaling_BiuClient) { +func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *grpc.ClientConn) { defer grpcClient.Close() room := client.Room + signal_client := proto.NewSignalingClient(grpcClient) + + client.Program.Send(systemMsg("Connecting to room...")) + stream, err := signal_client.Biu(context.Background()) + if err != nil { + return + } + client.Program.Send(systemMsg("Connected.")) + client.Program.Send(systemMsg("Waiting for server to be bootstrapped.")) stream.Send(&proto.SignalingMessage{ @@ -248,12 +249,29 @@ func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *gr client.Program.Send(systemMsg("Bootstrapped.")) for { + if stream == nil { + var err error + client.Program.Send(systemMsg("Connecting to room...")) + stream, err = signal_client.Biu(context.Background()) + if err != nil { + continue + } + client.Program.Send(systemMsg("Connected.")) + stream.Send(&proto.SignalingMessage{ + Sender: client.Name, + Room: room, + Message: &proto.SignalingMessage_Bootstrap{}, + }) + } msg, err := stream.Recv() if err == io.EOF { break } if err != nil { - panic(err) + stream.CloseSend() + stream = nil + client.Program.Send(systemMsg("Reconnecting")) + continue } switch inner := msg.Message.(type) { case *proto.SignalingMessage_Bootstrap: @@ -449,7 +467,7 @@ func (client *SignalClient) OnAnswer(ctx context.Context, sender, sdp string) { func (client SignalClient) OnIceCandidate(ctx context.Context, sender, candidate string) { peerConnection, ok := client.PeerConns[sender] - if !ok { + if !ok || peerConnection == nil { return } candidateInit := webrtc.ICECandidateInit{} diff --git a/pkg/impl/signal-server/signaling.go b/pkg/impl/signal-server/signaling.go index adaf40e..cc85977 100644 --- a/pkg/impl/signal-server/signaling.go +++ b/pkg/impl/signal-server/signaling.go @@ -51,7 +51,8 @@ func New(options Options) (*SignalingServer, error) { }, nil } -func (signalingServer SignalingServer) handleStream(ctx context.Context, name string, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error { +func (signalingServer SignalingServer) handleStream(ctx context.Context, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error { + var name string return func() error { for { msg, err := stream.Recv() @@ -60,6 +61,11 @@ func (signalingServer SignalingServer) handleStream(ctx context.Context, name st } else if err == nil { switch innerMsg := msg.Message.(type) { case *proto.SignalingMessage_Bootstrap: + if msg.Sender != "" { + name = msg.Sender + } else { + name = signalingServer.names.Generate() + } errGroup.Go(signalingServer.handleRedisPubSub(ctx, name, msg.Room, stream)) case *proto.SignalingMessage_DiscoverRequest: // ignore msg.Receiver, from sender to whole channel @@ -219,11 +225,9 @@ func (signalingServer SignalingServer) Biu(stream proto.Signaling_BiuServer) err ctx, cancel := context.WithCancel(stream.Context()) defer cancel() - name := signalingServer.names.Generate() - errGroup := &errgroup.Group{} - errGroup.Go(signalingServer.handleStream(ctx, name, errGroup, stream)) + errGroup.Go(signalingServer.handleStream(ctx, errGroup, stream)) return errGroup.Wait() }