diff --git a/README.md b/README.md index 27c1473..e4ea1da 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ 非常简易用来练手的命令行聊天工具 +> 感谢群友帮忙找到 redis pubsub coroutine 不能释放的问题 + # Quick start - 启动一个redis,不需要存储,redis主要是用pubsub @@ -18,11 +20,10 @@ # 没有做的地方 - 身份认证:就是个练手项目要什么身份认证。需要了自己参考 https://grpc.io/docs/guides/auth/ 去做身份认证 -- 配置项或者 flag:就是个练手项目,就简单一点吧。需要了自己用环境变量或者 flag 或者 cobra/viper 来做配置吧 # 项目组成 - [pkg/proto/signaling](pkg/proto/signaling): 信令协议的部分,就是非常简单的交换信息 - [pkg/impl/signal-server](pkg/impl/signal-server/): 协议的服务器实现部分,具体来说依赖一个 redis的 stun - [cmd/signal-server](cmd/signal-server): 服务器端的命令,写死了很多东西。想要自己用可以自己加 cobra 啊 viper 啊诸如此类的东西。 -- [cmd/demo-signal-client](cmd/demo-signal-client): 客户端命令,依赖我开的一个 coturn/stun 服务,你也可以自己开一个,然后换掉里面的 stun:nhz.jeffthecoder.xyz。使用了 bubbletea,没有维护状态,只维护了名字到连接的一张表,对端有连接就能发,没有连接就发不了。 \ No newline at end of file +- [cmd/demo-signal-client](cmd/demo-signal-client): 客户端命令,依赖我开的一个 coturn/stun 服务,你也可以自己开一个,然后换掉里面的 stun:nhz.jeffthecoder.xyz。使用了 bubbletea,没有维护状态,只维护了名字到连接的一张表,对端有连接就能发,没有连接就发不了。 diff --git a/cmd/demo-signal-client/main.go b/cmd/demo-signal-client/main.go index 2d0a3d5..6d810f4 100644 --- a/cmd/demo-signal-client/main.go +++ b/cmd/demo-signal-client/main.go @@ -3,10 +3,11 @@ package main import ( "bytes" "context" + "crypto/tls" "encoding/json" - "flag" "fmt" "io" + "net/url" "strings" "sync" @@ -15,8 +16,10 @@ import ( "github.com/charmbracelet/bubbles/viewport" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" + "github.com/spf13/cobra" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "github.com/pion/webrtc/v3" @@ -52,6 +55,9 @@ type SignalClient struct { webrtcConfig webrtc.Configuration + Server string + Credential credentials.TransportCredentials + Room string Name string @@ -69,7 +75,7 @@ type SignalClient struct { Lock sync.Locker } -func New(room, name string) *SignalClient { +func New(server, room string, iceServers []string) *SignalClient { ta := textarea.New() ta.Prompt = "Send a message" ta.Focus() @@ -86,6 +92,15 @@ Type a message and press Enter to send.`) ta.KeyMap.InsertNewline.SetEnabled(false) + transportCredential := insecure.NewCredentials() + parsedUrl, err := url.Parse(server) + if err != nil { + panic(err) + } + if parsedUrl.Scheme == "https" { + transportCredential = credentials.NewTLS(&tls.Config{}) + } + client := &SignalClient{ UI: SignalClientUI{ viewport: vp, @@ -95,13 +110,15 @@ Type a message and press Enter to send.`) webrtcConfig: webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { - URLs: []string{"stun:nhz.jeffthecoder.xyz:3478", "stun:nhz.jeffthecoder.xyz:3479"}, + URLs: iceServers, }, }, }, + Server: fmt.Sprintf("%v:%v", parsedUrl.Hostname(), parsedUrl.Port()), + Credential: transportCredential, + Room: room, - Name: name, PeerConns: make(map[string]*webrtc.PeerConnection), Channels: make(map[string]*webrtc.DataChannel), @@ -192,7 +209,7 @@ func (client *SignalClient) View() string { func (client *SignalClient) ConnectServer(ctx context.Context) { client.Program.Send(systemMsg("Dialing to server...")) - grpcClient, err := grpc.Dial("127.0.0.1:4444", grpc.WithTransportCredentials(insecure.NewCredentials())) + grpcClient, err := grpc.Dial(client.Server, grpc.WithTransportCredentials(client.Credential)) if err != nil { panic(err) } @@ -211,13 +228,11 @@ func (client *SignalClient) ConnectServer(ctx context.Context) { func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *grpc.ClientConn, stream proto.Signaling_BiuClient) { defer grpcClient.Close() room := client.Room - clientId := client.Name client.Program.Send(systemMsg("Waiting for server to be bootstrapped.")) stream.Send(&proto.SignalingMessage{ Room: room, - Sender: clientId, Message: &proto.SignalingMessage_Bootstrap{}, }) @@ -233,13 +248,13 @@ func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *gr } switch inner := msg.Message.(type) { case *proto.SignalingMessage_Bootstrap: - client.OnBootstrapReady(ctx, stream, room, clientId) + client.OnBootstrapReady(ctx, stream, room, msg.Sender) case *proto.SignalingMessage_DiscoverRequest: - client.OnDiscoverRequest(ctx, stream, room, clientId, msg.Sender) + client.OnDiscoverRequest(ctx, stream, room, msg.Sender) case *proto.SignalingMessage_DiscoverResponse: - client.OnDiscoverResponse(ctx, stream, room, clientId, msg.Sender) + client.OnDiscoverResponse(ctx, stream, room, msg.Sender) case *proto.SignalingMessage_SessionOffer: - client.OnOffer(ctx, stream, room, clientId, msg.Sender, inner.SessionOffer.SDP) + client.OnOffer(ctx, stream, room, msg.Sender, inner.SessionOffer.SDP) case *proto.SignalingMessage_SessionAnswer: client.OnAnswer(ctx, msg.Sender, inner.SessionAnswer.SDP) } @@ -247,25 +262,27 @@ func (client *SignalClient) HandleConnection(ctx context.Context, grpcClient *gr } func (client *SignalClient) OnBootstrapReady(ctx context.Context, stream proto.Signaling_BiuClient, room, name string) { + client.Name = name + client.Program.Send(systemMsg("Server ready!")) stream.Send(&proto.SignalingMessage{ Room: room, - Sender: name, + Sender: client.Name, Message: &proto.SignalingMessage_DiscoverRequest{}, }) } -func (client *SignalClient) OnDiscoverRequest(ctx context.Context, stream proto.Signaling_BiuClient, room, name, sender string) { +func (client *SignalClient) OnDiscoverRequest(ctx context.Context, stream proto.Signaling_BiuClient, room, sender string) { client.Program.Send(systemMsg("Client " + sender + " is joining into the room " + room)) stream.Send(&proto.SignalingMessage{ Room: room, - Sender: name, + Sender: client.Name, Receiver: &sender, Message: &proto.SignalingMessage_DiscoverResponse{}, }) } -func (client *SignalClient) OnDiscoverResponse(ctx context.Context, stream proto.Signaling_BiuClient, room, name, sender string) { +func (client *SignalClient) OnDiscoverResponse(ctx context.Context, stream proto.Signaling_BiuClient, room, sender string) { client.Program.Send(systemMsg("Client " + sender + " ponged")) peerConnection, err := client.GetOrCreatePeerConnection(sender) @@ -302,19 +319,19 @@ func (client *SignalClient) OnDiscoverResponse(ctx context.Context, stream proto stream.Send(&proto.SignalingMessage{ Room: room, - Sender: name, + Sender: client.Name, Receiver: &sender, Message: &proto.SignalingMessage_SessionOffer{ SessionOffer: &proto.SDPMessage{ SDP: buffer.String(), Type: proto.SDPMessageType_Data, - Sender: name, + Sender: client.Name, }, }, }) } -func (client *SignalClient) OnOffer(ctx context.Context, stream proto.Signaling_BiuClient, room, name, sender, sdp string) { +func (client *SignalClient) OnOffer(ctx context.Context, stream proto.Signaling_BiuClient, room, sender, sdp string) { client.Program.Send(systemMsg("Client " + sender + " is offering")) peerConnection, err := client.GetOrCreatePeerConnection(sender) @@ -346,13 +363,13 @@ func (client *SignalClient) OnOffer(ctx context.Context, stream proto.Signaling_ stream.Send(&proto.SignalingMessage{ Room: room, - Sender: name, + Sender: client.Name, Receiver: &sender, Message: &proto.SignalingMessage_SessionAnswer{ SessionAnswer: &proto.SDPMessage{ SDP: buffer.String(), Type: proto.SDPMessageType_Data, - Sender: name, + Sender: client.Name, }, }, }) @@ -439,18 +456,29 @@ func (client *SignalClient) SetupDataChannel(pc *webrtc.PeerConnection, dc *webr }) } +var ( + serverUrl string + serverRoom string + iceServers []string + + cmd = &cobra.Command{ + Run: func(cmd *cobra.Command, args []string) { + + signalClient := New(serverUrl, serverRoom, iceServers) + + signalClient.Program = tea.NewProgram(signalClient) + + if _, err := signalClient.Program.Run(); err != nil { + panic("err") + } + }, + } +) + func main() { - flag.Parse() + cmd.Flags().StringVar(&serverUrl, "server", "https://chat.jeffthecoder.xyz", "") + cmd.Flags().StringVar(&serverRoom, "room", "public", "") + cmd.Flags().StringSliceVar(&iceServers, "ice-servers", []string{"stun:nhz.jeffthecoder.xyz:3478", "stun:nhz.jeffthecoder.xyz:3479"}, "") - if flag.NArg() != 2 { - panic("invalid usage") - } - - signalClient := New(flag.Arg(0), flag.Arg(1)) - - signalClient.Program = tea.NewProgram(signalClient) - - if _, err := signalClient.Program.Run(); err != nil { - panic("err") - } + cmd.Execute() } diff --git a/cmd/signal-server/main.go b/cmd/signal-server/main.go index 5ad7530..708ec7a 100644 --- a/cmd/signal-server/main.go +++ b/cmd/signal-server/main.go @@ -3,29 +3,44 @@ package main import ( "log" "net" + "time" signal_server "git.jeffthecoder.xyz/public/chat-signaling-server/pkg/impl/signal-server" proto "git.jeffthecoder.xyz/public/chat-signaling-server/pkg/proto/signaling" + "github.com/spf13/cobra" "google.golang.org/grpc" ) +var ( + serverAddr string + signalSvrOpt signal_server.Options + + cmd = &cobra.Command{ + Run: func(cmd *cobra.Command, args []string) { + impl, err := signal_server.New(signalSvrOpt) + if err != nil { + panic(err) + } + listener, err := net.Listen("tcp4", serverAddr) + if err != nil { + panic(err) + } + grpcServer := grpc.NewServer() + proto.RegisterSignalingServer(grpcServer, impl) + log.Println("listening on", serverAddr) + if err := grpcServer.Serve(listener); err != nil { + panic(err) + } + }, + } +) + func main() { - impl, err := signal_server.New(signal_server.Options{ - RedisServers: []string{"127.0.0.1:6379"}, - RedisDatabase: 0, - RedisKeyPrefix: "signaling", - }) - if err != nil { - panic(err) - } - listener, err := net.Listen("tcp4", "0.0.0.0:4444") - if err != nil { - panic(err) - } - grpcServer := grpc.NewServer() - proto.RegisterSignalingServer(grpcServer, impl) - log.Println("listening on 4444") - if err := grpcServer.Serve(listener); err != nil { - panic(err) - } + cmd.Flags().StringVar(&serverAddr, "listen", "0.0.0.0:4444", "") + cmd.Flags().StringSliceVar(&signalSvrOpt.RedisServers, "redis-server", []string{"127.0.0.1:6379"}, "") + cmd.Flags().IntVar(&signalSvrOpt.RedisDatabase, "redis-db", 0, "") + cmd.Flags().StringVar(&signalSvrOpt.RedisKeyPrefix, "redis-key-prefix", "signaling", "") + cmd.Flags().Int64Var(&signalSvrOpt.NameRandomSeed, "random-seed", time.Now().Unix(), "") + + cmd.Execute() } diff --git a/flake.lock b/flake.lock index 21a40b6..4acf8b7 100644 --- a/flake.lock +++ b/flake.lock @@ -2,16 +2,16 @@ "nodes": { "nixpkgs": { "locked": { - "lastModified": 1706173671, - "narHash": "sha256-lciR7kQUK2FCAYuszyd7zyRRmTaXVeoZsCyK6QFpGdk=", + "lastModified": 1706253973, + "narHash": "sha256-zhqTp2IJjpaYME+8fcYkTGozJTHTthEwqvDYJR+oYGA=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "4fddc9be4eaf195d631333908f2a454b03628ee5", + "rev": "e9f86cbf65f8ad31b9191395e79a2fc561fbf781", "type": "github" }, "original": { "owner": "NixOS", - "ref": "nixpkgs-unstable", + "ref": "release-23.11", "repo": "nixpkgs", "type": "github" } diff --git a/flake.nix b/flake.nix index 7a3b81e..cbc4106 100644 --- a/flake.nix +++ b/flake.nix @@ -1,33 +1,44 @@ { inputs = { - nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; + nixpkgs.url = "github:NixOS/nixpkgs/release-23.11"; }; - outputs = { nixpkgs, ... }: { + outputs = { nixpkgs, ... }: rec { packages = nixpkgs.lib.genAttrs [ "x86_64-linux" "x86_64-darwin" ] (system: let pkgs = import nixpkgs { inherit system; }; in { default = pkgs.buildGoModule { pname = "demo-signaling-server"; version = "0.0.1"; + nativeBuildInputs = with pkgs; [ + protobuf + protoc-gen-go + protoc-gen-go-grpc + ]; + src = ./.; CGO_ENABLED = "0"; GOFLAGS = "-ldflags='-extldflags=-static -w -s"; - - vendorHash = "sha256-IUPGl5vHLyzbTVYsCLu4lIWoyq0h96deQ7q/nnVkPjc="; + vendorHash = "sha256-fxjQPK/6IWxnezix8aMMxw3+MZj8XxqnYD5Z9WUsdM4="; }; }); devShells = nixpkgs.lib.genAttrs [ "x86_64-linux" "x86_64-darwin" ] (system: let pkgs = import nixpkgs { inherit system; }; in { - default = pkgs.mkShell { - packages = with pkgs; [ - go - + default = packages."${system}".default.overrideAttrs (prevAttrs: { + GOFLAGS=""; + nativeBuildInputs = prevAttrs.nativeBuildInputs ++ (with pkgs; [ gopls golangci-lint - ]; - }; + delve + gosec + go-outline + gotools + gomodifytags + impl + gotests + ]); + }); }); }; } diff --git a/go.mod b/go.mod index 032bbc5..bc4f327 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,8 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-localereader v0.0.1 // indirect @@ -46,6 +48,8 @@ require ( github.com/pion/turn/v2 v2.1.0 // indirect github.com/pion/udp/v2 v2.0.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect + github.com/spf13/cobra v1.8.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/go.sum b/go.sum index 5b96c80..30e2a4c 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,7 @@ github.com/charmbracelet/lipgloss v0.6.0 h1:1StyZB9vBSOyuZxQUcUwGr17JmojPNm87ini github.com/charmbracelet/lipgloss v0.6.0/go.mod h1:tHh2wr34xcHjC2HCXIlGSG1jaDF0S0atAUvBMP6Ppuk= github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -40,7 +41,11 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e h1:XmA6L9IPRdUr28a+SK/oMchGgQy159wvzXA5tJ7l+40= +github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e/go.mod h1:AFIo+02s+12CEg8Gzz9kzhCbmbq6JcKNrhHffCGA9z4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -121,8 +126,13 @@ github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEt github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sahilm/fuzzy v0.1.0/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/pkg/impl/signal-server/signaling.go b/pkg/impl/signal-server/signaling.go index 10813de..2a56a21 100644 --- a/pkg/impl/signal-server/signaling.go +++ b/pkg/impl/signal-server/signaling.go @@ -10,6 +10,7 @@ import ( proto "git.jeffthecoder.xyz/public/chat-signaling-server/pkg/proto/signaling" + "github.com/goombaio/namegenerator" "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" ) @@ -17,9 +18,10 @@ import ( type SignalingServer struct { proto.UnimplementedSignalingServer - redisKeyPrefix string + names namegenerator.Generator - redis redis.UniversalClient + redisKeyPrefix string + redis redis.UniversalClient } type Options struct { @@ -27,6 +29,8 @@ type Options struct { RedisDatabase int RedisKeyPrefix string + + NameRandomSeed int64 } func New(options Options) (*SignalingServer, error) { @@ -40,13 +44,14 @@ func New(options Options) (*SignalingServer, error) { } return &SignalingServer{ - redis: redisClient, - + redis: redisClient, redisKeyPrefix: options.RedisKeyPrefix, + + names: namegenerator.NewNameGenerator(options.NameRandomSeed), }, nil } -func (signalingServer SignalingServer) handleStream(ctx context.Context, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error { +func (signalingServer SignalingServer) handleStream(ctx context.Context, name string, errGroup *errgroup.Group, stream proto.Signaling_BiuServer) func() error { return func() error { for { msg, err := stream.Recv() @@ -55,19 +60,19 @@ func (signalingServer SignalingServer) handleStream(ctx context.Context, errGrou } else if err == nil { switch innerMsg := msg.Message.(type) { case *proto.SignalingMessage_Bootstrap: - errGroup.Go(signalingServer.handleRedisPubSub(ctx, msg.Sender, msg.Room, stream)) + errGroup.Go(signalingServer.handleRedisPubSub(ctx, name, msg.Room, stream)) case *proto.SignalingMessage_DiscoverRequest: // ignore msg.Receiver, from sender to whole channel if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover", msg.Sender).Result(); err != nil { return err } else { - log.Printf("peers received discover request %v -> %v(all): %v", msg.Sender, msg.Room, received) + log.Printf("peers received discover request %v -> %v(all): %v", name, msg.Room, received) } case *proto.SignalingMessage_DiscoverResponse: if received, err := signalingServer.redis.Publish(ctx, signalingServer.redisKeyPrefix+":"+msg.Room+":discover:"+*msg.Receiver, msg.Sender).Result(); err != nil { return err } else { - log.Printf("peers received discover response %v -> %v(%v): %v", msg.Sender, msg.Room, *msg.Receiver, received) + log.Printf("peers received discover response %v -> %v(%v): %v", name, msg.Room, *msg.Receiver, received) } case *proto.SignalingMessage_SessionOffer: payload := &bytes.Buffer{} @@ -181,8 +186,6 @@ func (signalingServer SignalingServer) handleRedisPubSub(ctx context.Context, na } } } - - return nil } } @@ -190,9 +193,11 @@ func (signalingServer SignalingServer) Biu(stream proto.Signaling_BiuServer) err ctx, cancel := context.WithCancel(stream.Context()) defer cancel() + name := signalingServer.names.Generate() + errGroup := &errgroup.Group{} - errGroup.Go(signalingServer.handleStream(ctx, errGroup, stream)) + errGroup.Go(signalingServer.handleStream(ctx, name, errGroup, stream)) return errGroup.Wait() }