package main import ( "bytes" "context" "encoding/json" "flag" "fmt" "io" "strings" proto "git.jeffthecoder.xyz/guochao/meow-signaling.jeffthecoder.xyz/pkg/proto/signal-server" "github.com/charmbracelet/bubbles/textarea" "github.com/charmbracelet/bubbles/viewport" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/pion/webrtc/v3" ) type errMsg error type peerMsg struct { Peer string Message string } type systemMsg string type SignalClientUI struct { viewport viewport.Model textarea textarea.Model } var ( StyleSender = lipgloss.NewStyle() StyleSenderBold = StyleSender.Bold(true) StyleError = lipgloss.NewStyle().Foreground(lipgloss.Color("red")) StylePeer = lipgloss.NewStyle() StylePeerSelected = lipgloss.NewStyle().Background(lipgloss.Color("gray")) ) type SignalClient struct { UI SignalClientUI Program *tea.Program Room string Name string Messages []string Error error Ready bool MessageIsValid bool Message string Receiver string PeerConns map[string]*webrtc.PeerConnection Channels map[string]*webrtc.DataChannel } func New(room, name string) *SignalClient { ta := textarea.New() ta.Prompt = "Send a message" ta.Focus() ta.Prompt = " | " ta.ShowLineNumbers = false ta.SetHeight(1) ta.FocusedStyle.CursorLine = lipgloss.NewStyle() vp := viewport.New(30, 5) vp.SetContent(`Welcome to the chat room! Type a message and press Enter to send.`) ta.KeyMap.InsertNewline.SetEnabled(false) client := &SignalClient{ UI: SignalClientUI{ viewport: vp, textarea: ta, }, Room: room, Name: name, PeerConns: make(map[string]*webrtc.PeerConnection), Channels: make(map[string]*webrtc.DataChannel), } return client } func (client *SignalClient) Init() tea.Cmd { go client.ConnectServer(context.Background()) return textarea.Blink } func (client *SignalClient) Update(msg tea.Msg) (tea.Model, tea.Cmd) { var ( tiCmd tea.Cmd vpCmd tea.Cmd ) client.UI.textarea, tiCmd = client.UI.textarea.Update(msg) client.UI.viewport, vpCmd = client.UI.viewport.Update(msg) switch msg := msg.(type) { case tea.KeyMsg: switch msg.Type { case tea.KeyCtrlC, tea.KeyEsc, tea.KeyCtrlD: fmt.Println() return client, tea.Quit case tea.KeyEnter: if client.MessageIsValid { client.Channels[client.Receiver].SendText(client.Message) client.Messages = append(client.Messages, StyleSenderBold.Render("You -> "+client.Receiver+": ")+client.Message) client.UI.viewport.SetContent(strings.Join(client.Messages, "\n")) client.UI.textarea.Reset() client.UI.viewport.GotoBottom() } } case tea.WindowSizeMsg: client.UI.textarea.SetWidth(msg.Width) client.UI.viewport.Width = msg.Width client.UI.viewport.Height = msg.Height - 1 // We handle errors just like any other message case errMsg: client.Error = msg return client, nil case peerMsg: client.Messages = append(client.Messages, StyleSenderBold.Render(msg.Peer+" -> You: ")+msg.Message) client.UI.viewport.SetContent(strings.Join(client.Messages, "\n")) client.UI.viewport.GotoBottom() case systemMsg: client.Messages = append(client.Messages, string(msg)) client.UI.viewport.SetContent(strings.Join(client.Messages, "\n")) client.UI.viewport.GotoBottom() } if selected, message, ok := strings.Cut(client.UI.textarea.Value(), ">"); ok { selected = strings.TrimSpace(selected) message = strings.TrimLeft(message, " \t") if _, ok := client.PeerConns[selected]; ok { client.MessageIsValid = true client.Message = message client.Receiver = selected } else { client.MessageIsValid = false } } else { client.MessageIsValid = false } if client.MessageIsValid { client.UI.textarea.Prompt = " | " } else { client.UI.textarea.Prompt = " ? " } return client, tea.Batch(tiCmd, vpCmd) } func (client *SignalClient) View() string { return fmt.Sprint( client.UI.viewport.View()+"\n", StyleError.String()+client.UI.textarea.View()+"\n", ) } func (client *SignalClient) ConnectServer(ctx context.Context) { client.Program.Send(systemMsg("Dialing to server...")) grpcClient, err := grpc.Dial("127.0.0.1:4444", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic(err) } client.Program.Send(systemMsg("Connecting to room...")) signal_server := proto.NewSignalingClient(grpcClient) stream, err := signal_server.Connect(context.Background()) if err != nil { panic(err) } client.Program.Send(systemMsg("Connected.")) go client.HandleConnection(ctx, grpcClient, stream) } func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *grpc.ClientConn, stream proto.Signaling_ConnectClient) { defer grpcClient.Close() room := client.Room clientId := client.Name client.Program.Send(systemMsg("Waiting for server to be bootstrapped.")) stream.Send(&proto.SignalingMessage{ Room: room, Sender: clientId, Message: &proto.SignalingMessage_Bootstrap{}, }) client.Program.Send(systemMsg("Bootstrapped.")) webrtcConfig := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:nhz.jeffthecoder.xyz:3478", "stun:nhz.jeffthecoder.xyz:3479"}, }, }, } for { msg, err := stream.Recv() if err == io.EOF { break } switch inner := msg.Message.(type) { case *proto.SignalingMessage_Bootstrap: stream.Send(&proto.SignalingMessage{ Room: room, Sender: clientId, Message: &proto.SignalingMessage_DiscoverRequest{}, }) case *proto.SignalingMessage_DiscoverRequest: stream.Send(&proto.SignalingMessage{ Room: room, Sender: clientId, Receiver: &msg.Sender, Message: &proto.SignalingMessage_DiscoverResponse{}, }) case *proto.SignalingMessage_DiscoverResponse: peerConnection, ok := client.PeerConns[msg.Sender] if !ok { pc, err := webrtc.NewPeerConnection(webrtcConfig) if err != nil { continue } dataChannel, err := pc.CreateDataChannel("chan", nil) if err != nil { client.Program.Send(systemMsg(fmt.Sprint("failed to create answer: ", err))) continue } client.Channels[msg.Sender] = dataChannel dataChannel.OnOpen(func() { client.Program.Send(systemMsg(fmt.Sprint("Connected to client: ", msg.Sender))) }) dataChannel.OnMessage(func(dcMsg webrtc.DataChannelMessage) { if dcMsg.IsString { client.Program.Send(peerMsg{ Peer: msg.Sender, Message: string(dcMsg.Data), }) } }) peerConnection = pc } sdp, err := peerConnection.CreateOffer(&webrtc.OfferOptions{}) if err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to create offer for peer "+msg.Sender+": ", err))) peerConnection.Close() continue } peerConnection.SetLocalDescription(sdp) buffer := &bytes.Buffer{} if err := json.NewEncoder(buffer).Encode(sdp); err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to encode offer for peer "+msg.Sender+": ", err))) peerConnection.Close() continue } client.PeerConns[msg.Sender] = peerConnection stream.Send(&proto.SignalingMessage{ Room: room, Sender: clientId, Receiver: &msg.Sender, Message: &proto.SignalingMessage_SessionOffer{ SessionOffer: &proto.SDPMessage{ SDP: buffer.String(), Type: proto.SDPMessageType_Data, Sender: clientId, }, }, }) defer peerConnection.Close() case *proto.SignalingMessage_SessionOffer: peerConnection, ok := client.PeerConns[msg.Sender] if !ok { pc, err := webrtc.NewPeerConnection(webrtcConfig) if err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to create peer connection for peer "+msg.Sender+": ", err))) continue } pc.OnDataChannel(func(dc *webrtc.DataChannel) { client.Program.Send(systemMsg(fmt.Sprint("Connected to peer " + msg.Sender + ": " + dc.Label()))) client.Channels[msg.Sender] = dc dc.OnMessage(func(dcMsg webrtc.DataChannelMessage) { if dcMsg.IsString { client.Program.Send(peerMsg{ Peer: msg.Sender, Message: string(dcMsg.Data), }) } }) }) client.PeerConns[msg.Sender] = pc peerConnection = pc } var offer webrtc.SessionDescription if err := json.NewDecoder(strings.NewReader(inner.SessionOffer.SDP)).Decode(&offer); err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to decode offer for peer"+msg.Sender+": ", err))) continue } peerConnection.SetRemoteDescription(offer) sdp, err := peerConnection.CreateAnswer(nil) if err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to create answer for peer "+msg.Sender+": ", err))) continue } peerConnection.SetLocalDescription(sdp) gatherComplete := webrtc.GatheringCompletePromise(peerConnection) <-gatherComplete buffer := &bytes.Buffer{} if err := json.NewEncoder(buffer).Encode(peerConnection.LocalDescription()); err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to encode answer for peer"+msg.Sender+": ", err))) continue } stream.Send(&proto.SignalingMessage{ Room: room, Sender: clientId, Receiver: &msg.Sender, Message: &proto.SignalingMessage_SessionAnswer{ SessionAnswer: &proto.SDPMessage{ SDP: buffer.String(), Type: proto.SDPMessageType_Data, Sender: clientId, }, }, }) case *proto.SignalingMessage_SessionAnswer: peerConnection, ok := client.PeerConns[msg.Sender] if !ok { continue } var answer webrtc.SessionDescription if err := json.NewDecoder(strings.NewReader(inner.SessionAnswer.SDP)).Decode(&answer); err != nil { client.Program.Send(systemMsg(fmt.Sprint("Failed to decode answer for peer"+msg.Sender+": ", err))) continue } peerConnection.SetRemoteDescription(answer) gatherComplete := webrtc.GatheringCompletePromise(peerConnection) <-gatherComplete } } } func (client *SignalClient) OnBootstrapReady(ctx context.Context) error { panic("not implemented") } func (client *SignalClient) OnDiscoverRequest(ctx context.Context, sender string) error { panic("not implemented") } func (client *SignalClient) OnDiscoverResponse(ctx context.Context, sender string) error { panic("not implemented") } func (client *SignalClient) OnOffer(ctx context.Context, sender, sdp string) error { panic("not implemented") } func (client *SignalClient) OnAnswer(ctx context.Context, sender, sdp string) error { panic("not implemented") } func main() { flag.Parse() if flag.NArg() != 2 { panic("invalid usage") } signalClient := New(flag.Arg(0), flag.Arg(1)) signalClient.Program = tea.NewProgram(signalClient) if _, err := signalClient.Program.Run(); err != nil { panic("err") } // room := flag.Arg(0) // clientId := flag.Arg(1) // log.Println("dialing...") // client, err := grpc.Dial("127.0.0.1:4444", grpc.WithTransportCredentials(insecure.NewCredentials())) // if err != nil { // panic(err) // } // defer client.Close() // log.Println("connecting...client id: ", clientId) // signal_server := proto.NewSignalingClient(client) // stream, err := signal_server.Connect(context.Background()) // if err != nil { // panic(err) // } // log.Println("connected. discovering ", clientId, " -> ", room) // stream.Send(&proto.SignalingMessage{ // Room: room, // Sender: clientId, // Message: &proto.SignalingMessage_Bootstrap{}, // }) // webrtcConfig := webrtc.Configuration{ // ICEServers: []webrtc.ICEServer{ // { // URLs: []string{"stun:nhz.jeffthecoder.xyz:3478", "stun:nhz.jeffthecoder.xyz:3479"}, // }, // }, // } // connections := make(map[string]*webrtc.PeerConnection) // channels := make(map[string]*webrtc.DataChannel) // for { // msg, err := stream.Recv() // if err == io.EOF { // break // } // switch inner := msg.Message.(type) { // case *proto.SignalingMessage_Bootstrap: // stream.Send(&proto.SignalingMessage{ // Room: room, // Sender: clientId, // Message: &proto.SignalingMessage_DiscoverRequest{}, // }) // case *proto.SignalingMessage_DiscoverRequest: // time.Sleep(time.Second * 3) // log.Println("received discover request from ", msg.Sender, ", responding") // stream.Send(&proto.SignalingMessage{ // Room: room, // Sender: clientId, // Receiver: &msg.Sender, // Message: &proto.SignalingMessage_DiscoverResponse{}, // }) // case *proto.SignalingMessage_DiscoverResponse: // log.Println("received discover response from ", msg.Sender, ", offering") // peerConnection, ok := connections[msg.Sender] // if !ok { // pc, err := webrtc.NewPeerConnection(webrtcConfig) // if err != nil { // log.Println("failed to create peer connection: ", err) // continue // } // pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { // log.Printf("Peer Connection(%v) State has changed: %s\n", msg.Sender, pcs) // }) // pc.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { // log.Printf("ICE Connection(%v) State has changed: %s\n", msg.Sender, is) // }) // pc.OnICECandidate(func(i *webrtc.ICECandidate) { // log.Printf("ICE Candidate for %v: %s\n", msg.Sender, i) // }) // dataChannel, err := pc.CreateDataChannel("chan", nil) // if err != nil { // log.Println("failed to create answer: ", err) // continue // } // channels[msg.Sender] = dataChannel // dataChannel.OnOpen(func() { // log.Println("data channel opened") // }) // peerConnection = pc // } // sdp, err := peerConnection.CreateOffer(&webrtc.OfferOptions{}) // if err != nil { // log.Println("failed to create offer: ", err) // peerConnection.Close() // continue // } // log.Print("set local: ", sdp) // peerConnection.SetLocalDescription(sdp) // buffer := &bytes.Buffer{} // if err := json.NewEncoder(buffer).Encode(sdp); err != nil { // log.Println("failed to encode offer: ", err) // peerConnection.Close() // continue // } // connections[msg.Sender] = peerConnection // stream.Send(&proto.SignalingMessage{ // Room: room, // Sender: clientId, // Receiver: &msg.Sender, // Message: &proto.SignalingMessage_SessionOffer{ // SessionOffer: &proto.SDPMessage{ // SDP: buffer.String(), // Type: proto.SDPMessageType_Data, // Sender: clientId, // }, // }, // }) // defer peerConnection.Close() // case *proto.SignalingMessage_SessionOffer: // log.Println("received session offer: ", inner.SessionOffer.SDP) // peerConnection, ok := connections[msg.Sender] // if !ok { // pc, err := webrtc.NewPeerConnection(webrtcConfig) // if err != nil { // log.Println("failed to create peer connection: ", err) // continue // } // pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { // log.Printf("Peer Connection(%v) State has changed: %s\n", msg.Sender, pcs) // }) // pc.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { // log.Printf("ICE Connection(%v) State has changed: %s\n", msg.Sender, is) // }) // pc.OnICECandidate(func(i *webrtc.ICECandidate) { // log.Printf("ICE Candidate for %v: %s\n", msg.Sender, i) // }) // pc.OnDataChannel(func(dc *webrtc.DataChannel) { // log.Println("DataChannel ", dc.Label()) // channels[msg.Sender] = dc // }) // connections[msg.Sender] = pc // peerConnection = pc // } // var offer webrtc.SessionDescription // if err := json.NewDecoder(strings.NewReader(inner.SessionOffer.SDP)).Decode(&offer); err != nil { // log.Println("failed to decode offer: ", err) // continue // } // log.Println("set remote: ", offer) // peerConnection.SetRemoteDescription(offer) // sdp, err := peerConnection.CreateAnswer(nil) // if err != nil { // log.Println("failed to create answer: ", err) // continue // } // log.Println("set local: ", sdp) // peerConnection.SetLocalDescription(sdp) // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) // <-gatherComplete // buffer := &bytes.Buffer{} // if err := json.NewEncoder(buffer).Encode(peerConnection.LocalDescription()); err != nil { // log.Println("failed to encode answer: ", err) // continue // } // log.Println("answering: ", buffer.String()) // stream.Send(&proto.SignalingMessage{ // Room: room, // Sender: clientId, // Receiver: &msg.Sender, // Message: &proto.SignalingMessage_SessionAnswer{ // SessionAnswer: &proto.SDPMessage{ // SDP: buffer.String(), // Type: proto.SDPMessageType_Data, // Sender: clientId, // }, // }, // }) // case *proto.SignalingMessage_SessionAnswer: // log.Println("received session anser: ", inner.SessionAnswer.SDP) // peerConnection, ok := connections[msg.Sender] // if !ok { // log.Println("no connection found. there might be some mistakes") // continue // } // var answer webrtc.SessionDescription // if err := json.NewDecoder(strings.NewReader(inner.SessionAnswer.SDP)).Decode(&answer); err != nil { // log.Println("failed to decode answer: ", err) // continue // } // log.Println("set remote: ", answer) // peerConnection.SetRemoteDescription(answer) // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) // <-gatherComplete // } // } }