use std::pin::Pin; use fred::prelude::*; use futures::stream::*; use signaling::{signaling_server::*, SignalingMessage}; use tonic::Status; use tokio::sync::mpsc::{Receiver, Sender}; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. /// /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] pub struct ReceiverStream { inner: Receiver, } impl ReceiverStream { /// Create a new `ReceiverStream`. pub fn new(recv: Receiver) -> Self { Self { inner: recv } } /// Get back the inner `Receiver`. pub fn into_inner(self) -> Receiver { self.inner } /// Closes the receiving half of a channel without dropping it. /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. Any /// outstanding [`Permit`] values will still be able to send messages. /// /// To guarantee no messages are dropped, after calling `close()`, you must /// receive all items from the stream until `None` is returned. /// /// [`Permit`]: struct@tokio::sync::mpsc::Permit pub fn close(&mut self) { self.inner.close(); } } impl Stream for ReceiverStream { type Item = T; fn poll_next( mut self: Pin<&mut Self>, cx: &mut futures::task::Context<'_>, ) -> futures::task::Poll> { self.inner.poll_recv(cx) } } impl AsRef> for ReceiverStream { fn as_ref(&self) -> &Receiver { &self.inner } } impl AsMut> for ReceiverStream { fn as_mut(&mut self) -> &mut Receiver { &mut self.inner } } impl From> for ReceiverStream { fn from(recv: Receiver) -> Self { Self::new(recv) } } pub struct SignalingService { pub redis: fred::types::RedisConfig, } impl SignalingService { async fn copy_from_tonic_stream_to_sender( mut stream: tonic::Streaming, stream_tx: Sender, ) -> Result<(), tonic::Status> { while let Some(message_or_error) = stream.next().await { match message_or_error { Ok(message) => { let _ = stream_tx.send(message).await; } Err(status) => return Err(status), } } Ok(()) } async fn copy_from_receiver_to_sender( mut stream_rx: Receiver, stream: tokio::sync::mpsc::Sender>, ) -> Result<(), tonic::Status> { while let Some(msg) = stream_rx.recv().await { let _ = stream.send(Ok(msg)).await; } Ok(()) } async fn handle_with_redis_exchange( stream: tonic::Streaming, tx: Sender>, redis: fred::clients::RedisClient, pubsub: fred::clients::SubscriberClient, ) { let (stream_tx, stream_rx) = tokio::sync::mpsc::channel(128); let (result_tx, result_rx) = tokio::sync::mpsc::channel(128); tokio::spawn(SignalingService::copy_from_tonic_stream_to_sender( stream, stream_tx, )); tokio::spawn(SignalingService::copy_from_receiver_to_sender( result_rx, tx, )); super::redisexchange::send_message_to_peers(stream_rx, result_tx, redis, pubsub).await; tracing::debug!("stream terminated"); } } #[tonic::async_trait] impl Signaling for SignalingService { type BiuStream = Pin> + Send>>; async fn biu( &self, request: tonic::Request>, ) -> Result, Status> { let in_stream = request.into_inner(); let (tx, rx) = tokio::sync::mpsc::channel(128); let builder = fred::types::Builder::from_config(self.redis.clone()); let redis = builder.build().unwrap(); if let Err(err) = redis.init().await { return Err(Status::internal(err.to_string())); } let pubsub = builder.build_subscriber_client().unwrap(); if let Err(err) = pubsub.init().await { return Err(Status::internal(err.to_string())); } tokio::spawn(SignalingService::handle_with_redis_exchange( in_stream, tx, redis, pubsub, )); let output_stream = ReceiverStream::new(rx); Ok(tonic::Response::new( Box::pin(output_stream) as Self::BiuStream )) } }