first implementation

guochao 2024-05-29 18:49:28 +08:00
commit 3079e602c8
31 changed files with 5415 additions and 0 deletions

1
.envrc Normal file

@ -0,0 +1 @@
use nix --expr '(import <nixpkgs> {}).callPackage ./shell.nix {}'

3
.gitignore vendored Normal file

@ -0,0 +1,3 @@
.direnv
.vscode
/target

3824
Cargo.lock generated Normal file

File diff suppressed because it is too large

6
Cargo.toml Normal file

@ -0,0 +1,6 @@
[workspace]
resolver = "2"
members = [
"./signaling",
"./chat-signaling-server"
]

15
README.md Normal file

@ -0,0 +1,15 @@
# Rust project scaffold
## Use this repo
```
nix-shell -p cargo --run 'cargo update'
nix-shell --expr 'with import <nixpkgs> {}; rustPackages.callPackage ./shell.nix {}'
```
or with flakes
```
nix shell nixpkgs#cargo --command cargo update
nix develop
```
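or with direnv, since the repo ships an `.envrc` that loads `shell.nix` (assuming direnv is installed)
```
direnv allow
```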

57
chat-signaling-server/Cargo.toml Normal file

@ -0,0 +1,57 @@
[package]
name = "chat-signaling-server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "server"
[dependencies]
signaling = { path = "../signaling" }
futures = "0.3"
tokio = { version = "1", features = ["full"] }
axum = { version = "0.6", features = ["http2", "ws"] }
tonic = "0.11"
tonic-web = "0.11"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = { version = "0.22", optional = true }
opentelemetry = { version = "0.21", optional = true }
opentelemetry_sdk = { version = "0.21", optional = true }
opentelemetry-stdout = { version = "0.2", features = ["trace"], optional = true }
redis = { version = "0.25", features = ["tokio-comp"] }
sea-orm = { version = "0", features = [ "sqlx-sqlite", "runtime-tokio-rustls", "macros", "mock", "with-chrono", "with-json", "with-uuid" ] }
tera = "1"
serde = "1"
serde_json = "1"
include_dir = { version = "0.7", features = ["metadata", "glob"], optional = true }
tower = { version = "0.4", optional = true }
tower-http = { version = "0.4", features = ["trace"] }
mime_guess = { version = "2", optional = true }
clap = { version = "4", features = ["derive", "env"] }
anyhow = "1"
itertools = "0.13"
names = "0.14"
lazy_static = "1"
[features]
default = [ "embed-templates", "serve-static" ]
otlp = [ "dep:tracing-opentelemetry", "dep:opentelemetry", "dep:opentelemetry_sdk" ]
debug = [ "dep:opentelemetry-stdout" ]
embed-templates = [ "dep:include_dir" ]
embed-static = [ ]
serve-static = ["embed-static", "dep:mime_guess", "dep:tower" ]
extract-static = ["embed-static"]

72
chat-signaling-server/src/main.rs Normal file

@ -0,0 +1,72 @@
use std::net::ToSocketAddrs;
use clap::Parser;
use futures::StreamExt;
#[derive(clap::Parser)]
#[command(author, version, about, long_about = None)]
struct Opts {
#[command(subcommand)]
subcommand: Option<Subcommand>,
}
#[derive(clap::Subcommand)]
enum Subcommand {
Serve {
#[clap(long, short = 'l', default_value = "127.0.0.1", env)]
listen_address: String,
#[clap(long, short = 'p', default_value = "3000", env)]
listen_port: u16,
},
#[cfg(feature = "extract-static")]
Extract { destination: String },
}
impl Default for Subcommand {
fn default() -> Self {
Subcommand::Serve {
listen_address: "127.0.0.1".into(),
listen_port: 3000,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let opts = Opts::parse();
tracing_subscriber::fmt::init();
match opts.subcommand.unwrap_or_default() {
Subcommand::Serve {
listen_address,
listen_port,
} => {
let addrs: Vec<_> = (listen_address.as_str(), listen_port)
.to_socket_addrs()?
.collect();
let service = chat_signaling_server::web::routes::make_service()?;
let mut futures =
futures::stream::FuturesUnordered::from_iter(addrs.iter().map(|addr| async {
let addr = addr.clone();
tracing::info! {?addr, "app is running..."};
axum::Server::bind(&addr).serve(service.clone()).await
}));
while let Some(Err(error)) = futures.next().await {
tracing::error!(?error, "failed");
unreachable!()
}
}
#[cfg(feature = "extract-static")]
Subcommand::Extract { destination } => {
staticfiles::extract(destination).await?;
}
}
Ok(())
}

3
chat-signaling-server/src/lib.rs Normal file

@ -0,0 +1,3 @@
pub mod signaling;
pub mod types;
pub mod web;

3
chat-signaling-server/src/signaling/mod.rs Normal file

@ -0,0 +1,3 @@
pub mod grpc;
mod redisexchange;
pub mod websocket;

157
chat-signaling-server/src/signaling/grpc.rs Normal file

@ -0,0 +1,157 @@
use std::{pin::Pin, sync::Arc};
use futures::stream::*;
use signaling::{signaling_server::*, SignalingMessage};
use tonic::Status;
use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};
/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
///
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
/// [`Stream`]: trait@futures::Stream
#[derive(Debug)]
pub struct ReceiverStream<T> {
inner: Receiver<T>,
}
impl<T> ReceiverStream<T> {
/// Create a new `ReceiverStream`.
pub fn new(recv: Receiver<T>) -> Self {
Self { inner: recv }
}
/// Get back the inner `Receiver`.
pub fn into_inner(self) -> Receiver<T> {
self.inner
}
/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered. Any
/// outstanding [`Permit`] values will still be able to send messages.
///
/// To guarantee no messages are dropped, after calling `close()`, you must
/// receive all items from the stream until `None` is returned.
///
/// [`Permit`]: struct@tokio::sync::mpsc::Permit
pub fn close(&mut self) {
self.inner.close();
}
}
impl<T> Stream for ReceiverStream<T> {
type Item = T;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context<'_>,
) -> futures::task::Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}
impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
fn as_ref(&self) -> &Receiver<T> {
&self.inner
}
}
impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
fn as_mut(&mut self) -> &mut Receiver<T> {
&mut self.inner
}
}
impl<T> From<Receiver<T>> for ReceiverStream<T> {
fn from(recv: Receiver<T>) -> Self {
Self::new(recv)
}
}
pub struct SignalingService {
pub redis: redis::Client,
}
impl SignalingService {
async fn copy_from_tonic_stream_to_sender(
mut stream: tonic::Streaming<SignalingMessage>,
stream_tx: Sender<SignalingMessage>,
) -> Result<(), tonic::Status> {
while let Some(message_or_error) = stream.next().await {
match message_or_error {
Ok(message) => {
let _ = stream_tx.send(message).await;
}
Err(status) => return Err(status),
}
}
Ok(())
}
async fn copy_from_receiver_to_sender(
mut stream_rx: Receiver<SignalingMessage>,
stream: tokio::sync::mpsc::Sender<Result<SignalingMessage, tonic::Status>>,
) -> Result<(), tonic::Status> {
while let Some(msg) = stream_rx.recv().await {
let _ = stream.send(Ok(msg)).await;
}
Ok(())
}
async fn handle_with_redis_exchange(
stream: tonic::Streaming<SignalingMessage>,
tx: Sender<Result<SignalingMessage, Status>>,
redis: redis::aio::MultiplexedConnection,
pubsub: Arc<Mutex<redis::aio::PubSub>>,
) {
let (stream_tx, stream_rx) = tokio::sync::mpsc::channel(128);
let (result_tx, result_rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(SignalingService::copy_from_tonic_stream_to_sender(
stream, stream_tx,
));
tokio::spawn(SignalingService::copy_from_receiver_to_sender(
result_rx, tx,
));
super::redisexchange::send_message_to_peers(stream_rx, result_tx, redis, pubsub).await;
tracing::debug!("stream terminated");
}
}
#[tonic::async_trait]
impl Signaling for SignalingService {
type BiuStream =
Pin<Box<dyn Stream<Item = Result<signaling::SignalingMessage, tonic::Status>> + Send>>;
async fn biu(
&self,
request: tonic::Request<tonic::Streaming<signaling::SignalingMessage>>,
) -> Result<tonic::Response<Self::BiuStream>, Status> {
let in_stream = request.into_inner();
let (tx, rx) = tokio::sync::mpsc::channel(128);
let redis = match self.redis.get_multiplexed_tokio_connection().await {
Ok(redis) => redis,
Err(err) => return Err(Status::unknown(err.to_string())),
};
let pubsub = match self.redis.get_async_pubsub().await {
Ok(pubsub) => pubsub,
Err(err) => return Err(Status::unknown(err.to_string())),
};
tokio::spawn(SignalingService::handle_with_redis_exchange(
in_stream,
tx,
redis,
Arc::new(Mutex::new(pubsub)),
));
let output_stream = ReceiverStream::new(rx);
Ok(tonic::Response::new(
Box::pin(output_stream) as Self::BiuStream
))
}
}

342
chat-signaling-server/src/signaling/redisexchange.rs Normal file

@ -0,0 +1,342 @@
use std::sync::Arc;
use redis::AsyncCommands;
use signaling::{IceCandidate, SdpMessage, SignalingMessage};
use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};
use futures::StreamExt;
enum RedisChannel {
DiscoverRequest,
DiscoverResponse(String),
SessionOffer(String),
SessionAnswer(String),
SessionIceCandidate(String),
}
impl ToString for RedisChannel {
fn to_string(&self) -> String {
match self {
RedisChannel::DiscoverRequest => "discover".to_string(),
RedisChannel::DiscoverResponse(name) => format!("discover:{name}"),
RedisChannel::SessionOffer(name) => format!("offer:{name}"),
RedisChannel::SessionAnswer(name) => format!("answer:{name}"),
RedisChannel::SessionIceCandidate(name) => format!("icecandidate:{name}"),
}
}
}
impl TryFrom<&str> for RedisChannel {
type Error = String;
fn try_from(value: &str) -> Result<Self, Self::Error> {
if let Some((left, right)) = value.split_once(":") {
if left == "discover" {
return Ok(Self::DiscoverResponse(right.to_string()));
}
if left == "offer" {
return Ok(Self::SessionOffer(right.to_string()));
}
if left == "answer" {
return Ok(Self::SessionAnswer(right.to_string()));
}
if left == "icecandidate" {
return Ok(Self::SessionIceCandidate(right.to_string()));
}
}
if value == "discover" {
return Ok(Self::DiscoverRequest);
}
return Err(value.to_string());
}
}
pub async fn send_message_to_peers(
mut messages_to_peers: Receiver<SignalingMessage>,
messages_from_peers: Sender<SignalingMessage>,
mut redis: redis::aio::MultiplexedConnection,
pubsub: Arc<Mutex<redis::aio::PubSub>>,
) {
let mut name = String::default();
let (closing_tx, closing_rx) = tokio::sync::mpsc::channel(1);
let closing_rx = Arc::new(Mutex::new(closing_rx));
while let Some(message) = messages_to_peers.recv().await {
let inner_message = match message.message {
Some(inner_message) => inner_message,
None => continue, // ignore frames without a payload
};
let room: String = message.room;
match inner_message {
signaling::signaling_message::Message::Bootstrap(()) => {
if message.sender != "" {
name = message.sender.to_string();
} else {
let mut gen = names::Generator::default();
name = gen.next().unwrap();
}
tokio::spawn(receive_message_from_peers(
name.clone(),
room,
messages_from_peers.clone(),
pubsub.clone(),
closing_rx.clone(),
));
}
signaling::signaling_message::Message::DiscoverRequest(()) => {
let peers = match redis
.publish::<_, _, u64>(format!("chatchat:{room}:discover"), name.clone())
.await
{
Ok(peers) => peers,
Err(error) => {
tracing::error!(?error, "failed to publish discover request");
break;
}
};
tracing::info!(peers, room, sender = name, "broadcasting discover")
}
signaling::signaling_message::Message::DiscoverResponse(()) => {
let receiver = message.receiver.unwrap();
let peers = match redis
.publish::<_, _, u64>(
format!("chatchat:{room}:discover:{receiver}"),
name.clone(),
)
.await
{
Ok(peers) => peers,
Err(error) => {
tracing::error!(?error, "failed to publish discover request");
break;
}
};
tracing::info!(peers, room, sender = name, receiver, "reply to discover");
}
signaling::signaling_message::Message::SessionOffer(sdp) => {
let msg = serde_json::to_string(&sdp).unwrap();
let receiver = message.receiver.unwrap();
let peers = match redis
.publish::<_, _, u64>(format!("chatchat:{room}:offer:{receiver}"), msg)
.await
{
Ok(peers) => peers,
Err(error) => {
tracing::error!(?error, "failed to publish offer");
break;
}
};
tracing::info!(peers, room, sender = name, receiver, "offering")
}
signaling::signaling_message::Message::SessionAnswer(sdp) => {
let msg = serde_json::to_string(&sdp).unwrap();
let receiver = message.receiver.unwrap();
let peers = match redis
.publish::<_, _, u64>(format!("chatchat:{room}:answer:{receiver}"), msg)
.await
{
Ok(peers) => peers,
Err(error) => {
tracing::error!(?error, "failed to publish offer");
break;
}
};
tracing::info!(peers, room, sender = name, receiver, "answering")
}
signaling::signaling_message::Message::IceCandidate(candidate) => {
let msg = serde_json::to_string(&candidate).unwrap();
let receiver = message.receiver.unwrap();
let peers = match redis
.publish::<_, _, u64>(format!("chatchat:{room}:icecandidate:{receiver}"), msg)
.await
{
Ok(peers) => peers,
Err(error) => {
tracing::error!(?error, "failed to publish offer");
break;
}
};
tracing::info!(peers, room, sender = name, receiver, "candidate")
}
}
}
tracing::debug!(name, "stopped send to peer");
let _ = closing_tx.send(()).await;
}
pub async fn receive_message_from_peers(
name: String,
room: String,
tx: Sender<SignalingMessage>,
pubsub: Arc<Mutex<redis::aio::PubSub>>,
closing_rx: Arc<Mutex<Receiver<()>>>,
) {
let mut closing_rx = closing_rx.lock().await;
let mut pubsub = pubsub.lock().await;
let discover_request_channel = format!(
"chatchat:{room}:{}",
RedisChannel::DiscoverRequest.to_string()
);
let discover_response_channel = format!(
"chatchat:{room}:{}",
RedisChannel::DiscoverResponse(name.clone()).to_string()
);
let session_offer_channel = format!(
"chatchat:{room}:{}",
RedisChannel::SessionOffer(name.clone()).to_string()
);
let session_answer_channel = format!(
"chatchat:{room}:{}",
RedisChannel::SessionAnswer(name.clone()).to_string()
);
let session_ice_channel = format!(
"chatchat:{room}:{}",
RedisChannel::SessionIceCandidate(name.clone()).to_string()
);
tracing::trace!(room, name, channels=?[
&discover_request_channel,
&discover_response_channel,
&session_offer_channel,
&session_answer_channel,
&session_ice_channel,
], "subscribed");
pubsub
.subscribe(&[
discover_request_channel,
discover_response_channel,
session_offer_channel,
session_answer_channel,
session_ice_channel,
])
.await
.unwrap();
let mut messages = pubsub.on_message();
tracing::info!(room, name = name.clone(), "connection ready");
let _ = tx
.send(SignalingMessage {
room: room.clone(),
sender: name.clone(),
receiver: None,
message: Some(signaling::signaling_message::Message::Bootstrap(())),
})
.await;
loop {
tokio::select! {
_ = closing_rx.recv() => {
break;
},
maybe_message = messages.next() => {
let message = if let Some(message) = maybe_message {
message
} else {
break;
};
let channel = match message
.get_channel_name()
.strip_prefix(&format!("chatchat:{}:", room.clone()))
{
Some(channel) => channel,
_ => continue,
};
let channel = match RedisChannel::try_from(channel) {
Ok(channel) => channel,
Err(unrecognized) => {
tracing::warn!(unrecognized, "unrecognized");
continue;
}
};
let payload: String = match message.get_payload() {
Ok(msg) => msg,
_ => continue,
};
match channel {
RedisChannel::DiscoverRequest => {
if payload == name {
continue;
}
tracing::info!(room, sender = payload, name, "new peer in room");
let _ = tx
.send(SignalingMessage {
room: room.clone(),
sender: payload,
receiver: None,
message: Some(signaling::signaling_message::Message::DiscoverRequest(())),
})
.await;
}
RedisChannel::DiscoverResponse(name) => {
tracing::info!(room, sender = payload, name, "peer connecting");
let _ = tx
.send(SignalingMessage {
room: room.clone(),
sender: payload,
receiver: Some(name),
message: Some(signaling::signaling_message::Message::DiscoverResponse(())),
})
.await;
}
RedisChannel::SessionOffer(name) => {
let sdp: SdpMessage = match serde_json::from_str(&payload) {
Ok(sdp) => sdp,
_ => continue,
};
tracing::info!(room, sender = sdp.sender, name, "peer offering");
let _ = tx
.send(SignalingMessage {
room: room.clone(),
sender: sdp.sender.clone(),
receiver: Some(name),
message: Some(signaling::signaling_message::Message::SessionOffer(sdp)),
})
.await;
}
RedisChannel::SessionAnswer(name) => {
let sdp: SdpMessage = match serde_json::from_str(&payload) {
Ok(sdp) => sdp,
_ => continue,
};
tracing::info!(room, sender = sdp.sender, name, "peer answer");
let _ = tx
.send(SignalingMessage {
room: room.clone(),
sender: sdp.sender.clone(),
receiver: Some(name),
message: Some(signaling::signaling_message::Message::SessionAnswer(sdp)),
})
.await;
}
RedisChannel::SessionIceCandidate(name) => {
let candidate: IceCandidate = match serde_json::from_str(&payload) {
Ok(candidate) => candidate,
_ => continue,
};
tracing::info!(room, sender = candidate.sender, name, "peer candidate");
let _ = tx
.send(SignalingMessage {
room: room.clone(),
sender: candidate.sender.clone(),
receiver: Some(name.clone()),
message: Some(signaling::signaling_message::Message::IceCandidate(
candidate,
)),
})
.await;
}
}
}
}
}
tracing::debug!(name, "stopped recv from peer");
}

79
chat-signaling-server/src/signaling/websocket.rs Normal file

@ -0,0 +1,79 @@
use axum::{extract::State, response::IntoResponse};
use crate::{signaling::redisexchange, types::AppState};
use axum::extract::ws::Message;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
pub async fn handle_message_from_frontend(
mut ws: axum::extract::ws::WebSocket,
redis: redis::aio::MultiplexedConnection,
pubsub: Arc<Mutex<redis::aio::PubSub>>,
) {
let (peer_tx, mut peer_rx) = tokio::sync::mpsc::channel(128);
let (browser_tx, browser_rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(redisexchange::send_message_to_peers(
browser_rx, peer_tx, redis, pubsub,
));
loop {
tokio::select! {
msg_from_peer = peer_rx.recv() => {
let msg = match msg_from_peer {
Some(msg) => msg,
None => break
};
let _ = ws.send(axum::extract::ws::Message::Text(serde_json::to_string(&msg).unwrap())).await;
},
msg_from_ws = ws.recv() => {
let msg = match msg_from_ws {
Some(msg) => msg,
None => break
};
let msg = match msg {
Ok(msg) => msg,
Err(error) => {
tracing::error!(?error, "websocket read error");
break;
}
};
match msg {
Message::Text(text) => {
let msg = match serde_json::from_str(&text) {
Ok(msg) => msg,
Err(error) => {
tracing::warn!(?error, "failed to decode websocket");
continue;
},
};
let _ = browser_tx.send(msg).await;
}
_ => continue,
}
}
}
}
// browser_rx.close();
}
pub async fn handle_ws_upgrade(
upgrade: axum::extract::WebSocketUpgrade,
State(state): State<Arc<RwLock<AppState>>>,
) -> Result<impl IntoResponse, String> {
let state = state.read().await;
let redis: redis::aio::MultiplexedConnection =
match state.redis.get_multiplexed_tokio_connection().await {
Ok(redis) => redis,
Err(err) => return Err(err.to_string()),
};
let pubsub = match state.redis.get_async_pubsub().await {
Ok(pubsub) => pubsub,
Err(err) => return Err(err.to_string()),
};
Ok(upgrade
.on_upgrade(|ws| handle_message_from_frontend(ws, redis, Arc::new(Mutex::new(pubsub)))))
}

12
chat-signaling-server/src/types.rs Normal file

@ -0,0 +1,12 @@
use serde::Serialize;
#[derive(Serialize, Default)]
pub struct PageContext {
navigation: String,
}
#[derive(Clone)]
pub struct AppState {
pub templates: tera::Tera,
pub redis: redis::Client,
}

3
chat-signaling-server/src/web/mod.rs Normal file

@ -0,0 +1,3 @@
pub mod routes;
pub mod staticfiles;
pub mod templates;

85
chat-signaling-server/src/web/routes.rs Normal file

@ -0,0 +1,85 @@
use std::sync::Arc;
use axum::{
extract::State,
response::{ErrorResponse, Html},
routing::{get, post, IntoMakeService},
Json, Router,
};
use serde::Serialize;
use tokio::sync::RwLock;
use tower_http::trace::TraceLayer;
use crate::{
signaling::grpc::SignalingService as GrpcSignalingService,
types::{AppState, PageContext},
web::templates::load_templates,
};
#[cfg(feature = "serve-static")]
use crate::web::staticfiles;
async fn root(State(state): State<Arc<RwLock<AppState>>>) -> axum::response::Result<Html<String>> {
let state = state.read().await;
let ctx = match tera::Context::from_serialize(&PageContext::default()) {
Ok(ctx) => ctx,
Err(err) => return Err(ErrorResponse::from(format!("{err}"))),
};
match state.templates.render("index.html", &ctx) {
Ok(result) => Ok(Html::from(result)),
Err(err) => Err(ErrorResponse::from(format!("{err}"))),
}
}
#[derive(Serialize, Default)]
struct ReloadResult {
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
async fn reload(State(state): State<Arc<RwLock<AppState>>>) -> Json<ReloadResult> {
let mut state = state.write_owned().await;
if let Err(err) = state.templates.full_reload() {
return Json(ReloadResult {
error: Some(err.to_string()),
});
}
drop(state);
Json(ReloadResult::default())
}
pub fn make_service() -> Result<IntoMakeService<Router<()>>, Box<dyn std::error::Error>> {
let templates = load_templates()?;
let router = Router::new();
#[cfg(feature = "serve-static")]
let router = router.route_service("/static/*path", staticfiles::StaticFiles::strip("/static/"));
let redis = redis::Client::open("redis://127.0.0.1")?;
let grpc_service = tonic_web::enable(signaling::signaling_server::SignalingServer::new(
GrpcSignalingService {
redis: redis.clone(),
},
));
Ok(router
.route("/", get(root))
.route("/reload", post(reload))
.route(
"/signaling.Signaling/*rpc",
axum::routing::any_service(grpc_service.clone()),
)
.route("/ws", get(crate::signaling::websocket::handle_ws_upgrade))
.with_state(Arc::new(RwLock::new(crate::types::AppState {
templates,
redis,
// names: Arc::new(Mutex::new(names::Generator::default())),
})))
.layer(TraceLayer::new_for_http())
// .into_make_service_with_connect_info()
.into_make_service())
}

155
chat-signaling-server/src/web/staticfiles.rs Normal file

@ -0,0 +1,155 @@
#[cfg(feature = "embed-static")]
static STATIC_DIR: include_dir::Dir<'_> = include_dir::include_dir!("$CARGO_MANIFEST_DIR/static");
#[cfg(feature = "extract-static")]
pub async fn extract<P: AsRef<std::path::Path>>(to: P) -> std::io::Result<()> {
use include_dir::DirEntry;
let base_path = to.as_ref();
let mut dirs = vec![STATIC_DIR.clone()];
tracing::info!("extracing static assets...");
while let Some(dir) = dirs.pop() {
for entry in dir.entries() {
let path = base_path.join(entry.path());
match entry {
DirEntry::Dir(d) => {
tracing::trace!(dir=?d, "directory queued for extraction");
tokio::fs::create_dir_all(&path).await?;
dirs.insert(0, d.clone());
}
DirEntry::File(f) => {
tracing::trace!(?path, "file extracted");
tokio::fs::write(path, f.contents()).await?;
}
}
}
}
Ok(())
}
#[cfg(feature = "serve-static")]
pub use with_service::router;
#[cfg(feature = "serve-static")]
pub use with_service::StaticFiles;
#[cfg(feature = "serve-static")]
mod with_service {
use std::{convert::Infallible, task::Poll};
use axum::{
body::{Bytes, Full},
extract::Path,
http::{Request, StatusCode},
response::{IntoResponse, Response},
routing::{MethodFilter, MethodRouter},
};
use futures::Future;
use include_dir::Dir;
use super::STATIC_DIR;
async fn head(Path(static_path): Path<String>) -> Response {
if super::STATIC_DIR.contains(static_path) {
(StatusCode::OK, "").into_response()
} else {
(StatusCode::NOT_FOUND, "").into_response()
}
}
async fn get(Path(static_path): Path<String>) -> Response {
if let Some(file) = super::STATIC_DIR.get_file(static_path) {
(StatusCode::OK, file.contents()).into_response()
} else {
(StatusCode::NOT_FOUND, "").into_response()
}
}
#[derive(Clone)]
pub enum ResponseFuture {
OpenFileFuture(&'static [u8], String),
NotFoundFuture,
}
impl Future for ResponseFuture {
type Output = Result<Response, Infallible>;
fn poll(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.clone() {
ResponseFuture::OpenFileFuture(content, mime_type) => {
Poll::Ready(Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", mime_type.clone())
.body(axum::body::boxed(Full::new(Bytes::copy_from_slice(
content,
))))
.unwrap()))
}
ResponseFuture::NotFoundFuture => {
Poll::Ready(Ok(StatusCode::NOT_FOUND.into_response()))
}
}
}
}
#[derive(Clone, Debug)]
pub struct StaticFiles(Dir<'static>, String);
impl<B> tower::Service<Request<B>> for StaticFiles
where
B: Send + 'static,
{
type Response = Response;
type Error = Infallible;
type Future = ResponseFuture;
fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<B>) -> Self::Future {
let path = if let Some(path) = req.uri().path().strip_prefix(&self.1) {
path
} else {
return ResponseFuture::NotFoundFuture;
};
let mime_type = mime_guess::from_path(path)
.first()
.map_or("application/octet-stream".to_string(), |m| {
m.essence_str().to_string()
});
tracing::trace!(path, "trying to get static");
if let Some(file) = self.0.get_file(path) {
ResponseFuture::OpenFileFuture(file.contents(), mime_type.to_string())
} else {
ResponseFuture::NotFoundFuture
}
}
}
impl StaticFiles {
pub fn new() -> Self {
StaticFiles(STATIC_DIR.clone(), "/".to_string())
}
pub fn strip<S: ToString>(prefix: S) -> Self {
StaticFiles(STATIC_DIR.clone(), prefix.to_string())
}
}
pub fn router() -> MethodRouter {
MethodRouter::new()
.on(MethodFilter::HEAD, head)
.on(MethodFilter::GET, get)
}
}

37
chat-signaling-server/src/web/templates.rs Normal file

@ -0,0 +1,37 @@
use itertools::Itertools;
#[cfg(feature = "embed-templates")]
static TEMPLATES_DIR: include_dir::Dir<'_> =
include_dir::include_dir!("$CARGO_MANIFEST_DIR/templates");
pub fn load_templates() -> Result<tera::Tera, Box<dyn std::error::Error>> {
let mut templates = tera::Tera::parse("templates/**/*.html")?;
#[cfg(feature = "embed-templates")]
{
let template_names: std::collections::HashSet<_> = templates
.get_template_names()
.map(|s| s.to_string())
.collect();
for entry in TEMPLATES_DIR
.find("**/*.html")?
.into_iter()
.sorted_by(|a, b| Ord::cmp(a.path(), b.path()))
{
if let Some(file) = entry.as_file() {
let path = file.path();
let path = path.to_string_lossy().to_string();
if template_names.contains(&path) {
continue;
}
if let Some(content) = file.contents_utf8() {
templates.add_raw_template(&path, content)?;
}
}
}
}
templates.build_inheritance_chains()?;
Ok(templates)
}

23
chat-signaling-server/static/index.css Normal file

@ -0,0 +1,23 @@
#app {
width: 100%;
height: 100%;
display: flex;
flex-direction: column;
flex-grow: 1;
}
#message-list {
width: 100%;
flex-grow: 1;
overflow-y: scroll;
}
#inputs {
display: flex;
flex-direction: row;
}
#inputs>#message-input {
flex-grow: 1;
}

14
chat-signaling-server/static/style.css Normal file

@ -0,0 +1,14 @@
body {
margin: 0;
padding: 0;
width: 100vw;
height: 100vh;
display: flex;
flex-direction: column;
}
#content {
width: 100%;
flex-grow: 1;
}

26
chat-signaling-server/templates/_layout.html Normal file

@ -0,0 +1,26 @@
<!DOCTYPE html>
<html lang="en">
<head>
{% block head %}
<title>{% block fulltitle %}{% block title %}{% endblock %} - {% block sitename %}Demo{% endblock %}{% endblock %}</title>
{% block stylesheets %}
<link rel="stylesheet" href="/static/style.css" />
{% block pagestylesheet %}{% endblock %}
{% endblock %}
{% endblock %}
</head>
<body>
<div id="content">{% block content %}{% endblock %}</div>
<div id="footer">
{% block footer %}
&copy; Copyright 2008 by <a href="http://domain.invalid/">you</a>.
{% endblock %}
</div>
{% block scripts %}
{% block pagescripts %}{% endblock %}
{% endblock %}
</body>
</html>

267
chat-signaling-server/templates/index.html Normal file

@ -0,0 +1,267 @@
{% extends "_layout.html" %}
{% block title %}ChatBox{% endblock %}
{% block pagestylesheet %}
<link rel="stylesheet" href="/static/index.css" />
{% endblock %}
{% block content %}
<div id="app">
<div id="message-list">
</div>
<div id="inputs">
<select id="peers">
</select>
<input id="message-input" />
</div>
</div>
{% endblock %}
{% block pagescripts %}
<script>
(() => {
let message_list = document.getElementById("message-list")
let peers = document.getElementById("peers")
let inputbox = document.getElementById("message-input")
let search = new URLSearchParams(location.search);
let sender = search.get("name") || "";
let room = search.get("room") || "public";
let peerMap = new Map();
let channelMap = new Map();
let ws;
let wsReconnectInterval = 5000;
const MessageBootstrap = "Bootstrap";
const MessageDiscoverRequest = "DiscoverRequest";
const MessageDiscoverResponse = "DiscoverResponse";
const MessageSessionOffer = "SessionOffer";
const MessageSessionAnswer = "SessionAnswer";
const MessageICECandidate = "IceCandidate";
const display = (message) => {
let newNode = document.createElement("div")
newNode.innerHTML = `${new Date().toTimeString()}: ${message}`;
message_list.appendChild(newNode)
}
inputbox.addEventListener("keyup", (e) => {
if (!(e.key === 'Enter' || e.keyCode === 13)) {
return
}
if (!inputbox.value || inputbox.value.trim().length == 0) {
return
}
let channel = channelMap.get(peers.value)
if (!channel) {
return
}
console.log(`You -> ${peers.value}: ${inputbox.value}`)
display(`You -> ${peers.value}: ${inputbox.value}`)
channel.send(inputbox.value)
inputbox.value = ""
})
const addPeer = (peerName) => {
let newNode = document.createElement("option")
newNode.id = peerName
newNode.setAttribute("value", peerName)
newNode.innerText = peerName;
newNode.innerHTML = peerName;
peers.appendChild(newNode)
}
const removePeer = (peerName) => {
let el = document.getElementById(peerName);
if(el) el.remove()
}
let timeout;
let reconnect = () => {
if(ws && (ws.readyState == WebSocket.CONNECTING || ws.readyState == WebSocket.OPEN)) {
console.log("ws ok", ws)
return;
}
let protocol = "ws://";
if (location.protocol === "https:") {
protocol = "wss://"
}
ws = new WebSocket(protocol+location.host+"/ws");
ws.addEventListener("error", reconnect)
ws.addEventListener("close", reconnect)
ws.addEventListener("message", ({data}) => {
handle_ws_message(JSON.parse(data))
})
ws.addEventListener("open", () => {
display("server connected. waiting for a name to be assigned for you...")
ws.send(JSON.stringify({
message: {
type: MessageBootstrap,
},
room,
sender,
}))
})
console.log("connecting...")
}
let handle_ws_message = (wsMessage) => {
let recreateAndSetupPeer = async (peerName) => {
if (!peerMap.has(peerName)) {
let peer = new RTCPeerConnection({
iceServers: [{ urls: ["stun:nhz.jeffthecoder.xyz:3478", "stun:nhz.jeffthecoder.xyz:3479", "stun:nhz.jeffthecoder.xyz:13478"] }]
})
peer.addEventListener("signalingstatechange", (ev) => {
console.log("signaling state changed: ", peer.signalingState)
})
peer.addEventListener("connectionstatechange", (ev) => {
console.log("peer connection state changed: ", peer.connectionState)
switch (peer.connectionState) {
case "closed":
break;
case "disconnected":
case "failed":
peer.restartIce()
break
}
})
peer.addEventListener("icecandidate", (ev) => {
if (!ev.candidate) {
console.log("gather end")
return
}
let candidate = ev.candidate.toJSON()
ws.send(JSON.stringify({
message: {
type: MessageICECandidate,
candidate: JSON.stringify(candidate),
sender,
kind: 3,
},
room,
sender,
receiver: wsMessage.sender,
}))
})
peer.addEventListener("icegatheringstatechange", ev => {
console.log("gather", peer.iceGatheringState)
})
peer.addEventListener("datachannel", ({channel}) => {
channelMap.set(peerName, channel);
channel.addEventListener("open", (ev) => {
display("connected in event")
addPeer(peerName)
})
channel.addEventListener("message", (ev) => {
display(`${peerName} -> You: ${ev.data}`)
})
channel.addEventListener("close", () => {
removePeer(peerName)
})
})
peerMap.set(peerName, peer);
}
let peer = peerMap.get(peerName);
let resultSdp = null, resultMessageType;
if(wsMessage.message.type === MessageDiscoverResponse) {
let channel = peer.createDataChannel("chat");
channel.addEventListener("open", (ev) => {
display("connected in event")
addPeer(peerName)
})
channel.addEventListener("message", (ev) => {
display(`${peerName} -> You: ${ev.data}`)
})
channel.addEventListener("close", () => {
removePeer(peerName)
})
channelMap.set(peerName, channel);
resultSdp = await peer.createOffer();
resultMessageType = MessageSessionOffer;
peer.setLocalDescription(resultSdp)
console.log("set local offer")
} else {
peer.setRemoteDescription(JSON.parse(wsMessage.message.sdp))
console.log("set remote offer")
resultSdp = await peer.createAnswer();
resultMessageType = MessageSessionAnswer;
peer.setLocalDescription(resultSdp)
console.log("set local answer")
}
ws.send(JSON.stringify({
message: {
type: resultMessageType,
sdp: JSON.stringify(resultSdp),
sender,
kind: 3,
},
room,
sender,
receiver: wsMessage.sender,
}))
}
let peer = peerMap.get(wsMessage.sender)
switch (wsMessage.message.type) {
case MessageBootstrap:
sender = wsMessage.sender;
ws.send(JSON.stringify({
message: {
type: MessageDiscoverRequest,
},
room,
sender: wsMessage.sender,
}))
display(`You are ${sender}. Searching for peers...`)
break;
case MessageDiscoverRequest:
display("connecting to peer " + wsMessage.sender)
ws.send(JSON.stringify({
message: {
type: MessageDiscoverResponse,
},
room,
sender,
receiver: wsMessage.sender,
}))
break
case MessageDiscoverResponse:
recreateAndSetupPeer(wsMessage.sender)
break;
case MessageSessionOffer:
recreateAndSetupPeer(wsMessage.sender)
break
case MessageSessionAnswer:
display("receiving connection to peer: " + wsMessage.sender)
console.log("set remote answer")
peer.setRemoteDescription(JSON.parse(wsMessage.message.sdp))
peer.restartIce()
break
case MessageICECandidate:
let candidate = JSON.parse(wsMessage.message.candidate);
if(!peer) {
console.warn("candidate dropped", candidate)
return
}
peer.addIceCandidate(candidate)
}
}
reconnect()
console.log("document loaded")
})()
</script>
{% endblock %}

20
default.nix Normal file

@ -0,0 +1,20 @@
{ rustPlatform, lib, callPackage, project-config ? callPackage ./project-config.nix { }, ... }: rustPlatform.buildRustPackage {
pname = "hello";
version = "1.0.0";
nativeBuildInputs = project-config.buildTools;
buildInputs = project-config.libraries;
src = ./.;
cargoLock = {
lockFile = ./Cargo.lock;
};
meta = with lib; {
description = "rust project scaffold";
homepage = "https://git.jeffthecoder.xyz/public/os-flakes";
license = licenses.unlicense;
maintainers = [ ];
};
}

3
extra-tools.nix Normal file

@ -0,0 +1,3 @@
{ rustPackages, ... }: {
kopium = rustPackages.callPackage ./kopium.nix {};
}

29
flake.nix Normal file

@ -0,0 +1,29 @@
{
inputs = {
nixpkgs.url = "nixpkgs/nixos-unstable";
};
outputs = { nixpkgs, ... }:
let
systems = [ "x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" ];
foreachSystem = nixpkgs.lib.genAttrs systems;
in
rec {
packages = foreachSystem (system:
let
pkgs = import nixpkgs { inherit system; };
project-config = pkgs.rustPlatform.callPackage ./project-config.nix { };
in
rec {
hello = pkgs.callPackage ./default.nix { };
default = hello;
});
devShells = foreachSystem
(system:
let
pkgs = import nixpkgs { inherit system; };
in
pkgs.callPackage ./shell.nix { }
);
};
}

15
kopium.nix Normal file

@ -0,0 +1,15 @@
{ rustPlatform, lib, fetchCrate, stdenv, darwin, ...}: rustPlatform.buildRustPackage rec {
pname = "kopium";
version = "0.16.2";
cargoHash = "sha256-tpCPRKwA3vv+1p6ab7whIhwVvw6IXqAQaOIw9br6qEY=";
buildInputs = lib.optional stdenv.isDarwin darwin.apple_sdk.frameworks.Security;
doCheck = false;
src = fetchCrate {
inherit pname version;
hash = "sha256-i4tBu/k3EiJA1WcMpTksQzoVWAiD/hvGGyPoPKZD6fQ";
};
}

41
project-config.nix Normal file

@ -0,0 +1,41 @@
{ pkg-config
, callPackage
, sqlite
, cargo-tarpaulin
, protobuf
, rustfmt
, sea-orm-cli
, extra-tools ? callPackage ./extra-tools.nix { }
, ...
}: {
buildTools = [ pkg-config ];
libraries = [ sqlite ];
developmentTools = [
# bpf-linker
cargo-tarpaulin
# cargo-espflash
# cargo-generate
# cargo-make
# cargo-mobile2
# cargo-tauri
# cargo-watch
# TODO: cargo-xcode
# TODO: create-tauri-app
# cargo-espflash
# TODO: kopium
# TODO: ldproxy
# TODO: paperclip
# sea-orm-cli
# perseus-cli
# trunk
# wasm-bindgen-cli
protobuf
rustfmt
# extra-tools.kopium
];
}

13
shell.nix Normal file

@ -0,0 +1,13 @@
{ rustPackages, stdenv, rustc, rust-analyzer, project-config ? rustPackages.callPackage ./project-config.nix { }, ... }: let
package = rustPackages.callPackage ./default.nix { inherit project-config; };
in
package.overrideAttrs (final: (prevAttrs: {
nativeBuildInputs = prevAttrs.nativeBuildInputs ++ project-config.developmentTools ++ [rust-analyzer];
RUST_SRC_PATH = stdenv.mkDerivation {
inherit (rustc) src;
inherit (rustc.src) name;
phases = [ "unpackPhase" "installPhase" ];
installPhase = ''cp -r library $out'';
};
}))

16
signaling/Cargo.toml Normal file

@ -0,0 +1,16 @@
[package]
name = "signaling"
version = "0.1.0"
edition = "2021"
[dependencies]
tonic = "0.11"
prost = "0.12"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
[build-dependencies]
tonic-build = "0.11"
[dev-dependencies]
serde_json = "1"

21
signaling/build.rs Normal file

@ -0,0 +1,21 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.type_attribute(
"Message",
"#[derive(serde::Deserialize, serde::Serialize)]\n#[serde(tag = \"type\")]",
)
.type_attribute(
"SignalingMessage",
"#[derive(serde::Deserialize, serde::Serialize)]\n#[serde(default)]",
)
.type_attribute(
"SDPMessage",
"#[derive(serde::Deserialize, serde::Serialize)]",
)
.type_attribute(
"ICECandidate",
"#[derive(serde::Deserialize, serde::Serialize)]",
)
.compile(&["proto/signaling.proto"], &["proto"])?;
Ok(())
}

44
signaling/proto/signaling.proto Normal file

@ -0,0 +1,44 @@
syntax = "proto3";
package signaling;
option go_package = "git.jeffthecoder.xyz/public/chat-signaling-server/pkg/proto/signaling";
import "google/protobuf/empty.proto";
enum SDPMessageKind {
Video = 0;
Audio = 1;
VideoAudio = 2;
Data = 3;
}
message SDPMessage {
string SDP = 1;
SDPMessageKind Kind = 2;
string Sender = 3;
}
message ICECandidate {
string Candidate = 1;
string Sender = 2;
}
message SignalingMessage {
string Room = 1;
string Sender = 2;
optional string Receiver = 3;
oneof Message {
google.protobuf.Empty Bootstrap = 10;
google.protobuf.Empty DiscoverRequest = 11;
google.protobuf.Empty DiscoverResponse = 12;
SDPMessage SessionOffer = 13;
SDPMessage SessionAnswer = 14;
ICECandidate ICECandidate = 15;
};
}
service Signaling {
rpc Biu(stream SignalingMessage) returns (stream SignalingMessage);
}
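For reference, a minimal sketch of how a client could drive the bidirectional `Biu` stream using the bindings tonic generates from this proto. The endpoint URL, room name, and the `tokio`/`futures` dependencies are illustrative assumptions, and it presumes a plain gRPC endpoint; the server in this commit fronts the service with tonic-web on its HTTP port.
```
use futures::StreamExt;
use signaling::{signaling_client::SignalingClient, signaling_message::Message, SignalingMessage};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; point this at wherever the Signaling service is exposed.
    let mut client = SignalingClient::connect("http://127.0.0.1:3000").await?;

    // Outbound half of the stream: announce ourselves with a Bootstrap message.
    // An empty sender asks the server to generate a peer name.
    let outbound = futures::stream::iter(vec![SignalingMessage {
        room: "public".into(),
        sender: String::new(),
        receiver: None,
        message: Some(Message::Bootstrap(())),
    }]);

    // Inbound half: the server replies on the same stream with Bootstrap,
    // Discover and SDP/ICE messages relayed from other peers in the room.
    let mut inbound = client.biu(outbound).await?.into_inner();
    while let Some(message) = inbound.next().await {
        println!("{:?}", message?);
    }
    Ok(())
}
```
Sending a Bootstrap with an empty sender mirrors what the bundled web page does over `/ws`: the server assigns a generated peer name and then relays Discover/SDP/ICE traffic for the room back on the inbound half.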

29
signaling/src/lib.rs Normal file

@ -0,0 +1,29 @@
tonic::include_proto!("signaling"); // The string specified here must match the proto package name
#[cfg(test)]
mod test {
#[test]
fn serde_serialization() {
println!(
"{}",
serde_json::to_string(&crate::SignalingMessage {
message: Some(crate::signaling_message::Message::Bootstrap(())),
..Default::default()
})
.unwrap()
);
println!(
"{}",
serde_json::to_string(&crate::SignalingMessage {
message: Some(crate::signaling_message::Message::SessionOffer(
crate::SdpMessage {
..Default::default()
}
)),
..Default::default()
})
.unwrap()
);
}
}