From d0cc4f33a6d06c8f515dc2876c53f63e2027e23d Mon Sep 17 00:00:00 2001 From: guochao Date: Wed, 16 Oct 2024 17:48:15 +0800 Subject: [PATCH] add a sse demo --- Cargo.lock | 33 ++++++---- Cargo.toml | 5 ++ src/bin/server.rs | 42 ++++++++++-- src/frontend.rs | 2 +- src/frontend/app.rs | 3 + src/frontend/components/nav.rs | 1 + src/frontend/pages.rs | 2 + src/frontend/pages/echo.rs | 114 +++++++++++++++++++++++++++++++++ 8 files changed, 180 insertions(+), 22 deletions(-) create mode 100644 src/frontend/pages/echo.rs diff --git a/Cargo.lock b/Cargo.lock index 7dc7787..e4a2bfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1102,9 +1102,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -1231,13 +1231,18 @@ dependencies = [ "log", "mime", "mime_guess", + "serde", "thiserror", "tokio", + "tokio-stream", "tower 0.5.1", "tower-http", "tracing", "tracing-subscriber", + "wasm-bindgen", + "wasm-bindgen-futures", "wasm-logger", + "web-sys", "yew", "yew-agent", "yew-router", @@ -2150,9 +2155,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -2161,9 +2166,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -2176,9 +2181,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.43" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -2188,9 +2193,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2198,9 +2203,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -2211,9 +2216,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "wasm-logger" diff --git a/Cargo.toml b/Cargo.toml index 3d4a837..49a1e98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,12 +24,17 @@ yew-router = "0.18" gloo = "0.10" yew-agent = "0.3.0" futures = "0.3" +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4.45" +serde = { version = "1", features = ["derive"]} +web-sys = "0.3.70" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] # server side axum = { version = "0.7", features = ["multipart", "tracing", "ws"] } console-subscriber = { version = "0.4.0", optional = true } tokio = { version = "1", features = ["full", "tracing"] } +tokio-stream = { version = "0.1", features = ["time"] } tower = { version = "0.5", features = ["tracing", "util"] } tower-http = { version = "0.6", features = ["util", "trace", "catch-panic"] } tracing = "0.1" diff --git a/src/bin/server.rs b/src/bin/server.rs index 4c268d3..903750c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,14 +1,18 @@ -use std::{collections::HashMap, str::FromStr}; - +use axum::response::sse::{Event, KeepAlive}; use axum::{ - extract::Path, + extract::{Path, Query}, http::{HeaderName, HeaderValue, Uri}, response::{Html, IntoResponse, Response}, }; -use networkd::{ServerApp, ServerAppProps, Route}; +use networkd::{Route, ServerApp, ServerAppProps}; +use serde::Deserialize; +use std::convert::Infallible; +use std::{collections::HashMap, str::FromStr}; +use tokio_stream::StreamExt as _; use tower_http::{ - trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse}, ServiceBuilderExt} -; + trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse}, + ServiceBuilderExt, +}; use tracing::Level; use yew::ServerRenderer; @@ -17,6 +21,28 @@ use yew_router::Routable; static INDEX_PAGE: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/dist/index.html")); static DIST_DIR: include_dir::Dir = include_dir::include_dir!("$CARGO_MANIFEST_DIR/dist"); +#[derive(Debug, Deserialize)] +struct SSEEchoParams { + #[serde(alias = "s", default)] + content: String, +} + +// echo characters one by one +async fn sse_echo(Query(query): Query) -> impl IntoResponse { + let items: Vec = query.content.chars().map(|c| c.to_string()).collect(); + + let stream = futures::stream::iter(items) + .map(|s| Ok::<_, Infallible>(Event::default().data(s.clone()))) + .chain(futures::stream::pending()) + .throttle(std::time::Duration::from_secs(1)); + + axum::response::sse::Sse::new(stream).keep_alive( + KeepAlive::new() + .interval(std::time::Duration::from_secs(1)) + .text("keepalive"), + ) +} + #[tracing::instrument] async fn rendering_pages( url: Uri, @@ -70,6 +96,8 @@ async fn main() -> anyhow::Result<()> { let api_router = axum::Router::new(); let oauth_router = axum::Router::new(); + let sse_router = axum::Router::new().route("/echo", axum::routing::get(sse_echo)); + let router = axum::Router::new() // not required to login .route("/static/*path", axum::routing::get(static_assets)) @@ -81,7 +109,7 @@ async fn main() -> anyhow::Result<()> { // nesting router .nest("/api/", api_router) .nest("/o/", oauth_router) - + .nest("/sse", sse_router) // fallback .fallback(rendering_pages) .layer( diff --git a/src/frontend.rs b/src/frontend.rs index 7abc2d0..16ced7e 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -1,6 +1,6 @@ pub mod app; -pub mod pages; pub mod components; +pub mod pages; pub mod worker; pub use app::*; diff --git a/src/frontend/app.rs b/src/frontend/app.rs index 7ede78f..e492d49 100644 --- a/src/frontend/app.rs +++ b/src/frontend/app.rs @@ -10,6 +10,8 @@ pub enum Route { Home, #[at("/counter")] Counter, + #[at("/echo")] + Echo, #[not_found] #[at("/404")] NotFound, @@ -22,6 +24,7 @@ fn switch(routes: Route) -> Html { Route::Home => html! { }, Route::Counter => html! { }, Route::NotFound => html! { }, + Route::Echo => html! { }, } } diff --git a/src/frontend/components/nav.rs b/src/frontend/components/nav.rs index b6ad3e4..eeebf82 100644 --- a/src/frontend/components/nav.rs +++ b/src/frontend/components/nav.rs @@ -42,6 +42,7 @@ pub fn nav_bar() -> Html { for (route, title) in [ (Route::Home, "Home"), (Route::Counter, "Counter"), + (Route::Echo, "Echo"), ] { let active_item = if let Some(ref location) = maybe_location { let path = location.path(); diff --git a/src/frontend/pages.rs b/src/frontend/pages.rs index c155a3d..7473732 100644 --- a/src/frontend/pages.rs +++ b/src/frontend/pages.rs @@ -1,7 +1,9 @@ pub mod index; pub mod counter; +pub mod echo; pub mod _404; pub use counter::CounterPage; +pub use echo::EchoPage; pub use index::IndexPage; pub use _404::NotFound; \ No newline at end of file diff --git a/src/frontend/pages/echo.rs b/src/frontend/pages/echo.rs new file mode 100644 index 0000000..b6e5f34 --- /dev/null +++ b/src/frontend/pages/echo.rs @@ -0,0 +1,114 @@ +use futures::{FutureExt, StreamExt}; +use std::rc::Rc; +use std::time::Duration; +use wasm_bindgen::JsCast; +use web_sys::HtmlInputElement; +use yew::prelude::*; +use yew::{function_component, html, use_state_eq, Callback, Html, InputEvent}; + +#[derive(Clone, PartialEq, Default)] +struct ReducibleString(String); +enum StringReducer { + Append(String), +} + +impl Reducible for ReducibleString { + type Action = StringReducer; + fn reduce(self: Rc, action: Self::Action) -> Rc { + match action { + StringReducer::Append(string) => Rc::new(Self(format!("{}{}", self.0, string))), + } + } +} + +#[function_component] +pub fn EchoPage() -> Html { + let response = use_reducer(ReducibleString::default); + + let name = use_state_eq(|| String::new()); + let name_setter_for_input = { + let name = name.clone(); + Callback::from(move |ev: InputEvent| { + let name_writer = name.setter(); + let Some(target) = ev.target() else { + return; + }; + // Events can bubble so this listener might catch events from child + // elements which are not of type HtmlInputElement + let input = target.dyn_into::().ok(); + + if let Some(input) = input { + name_writer.set(input.value()); + } + }) + }; + + let start_sse = { + let name = name.clone(); + let response = response.clone(); + Callback::from(move |_| { + let response = response.clone(); + let url = format!("/sse/echo?s={}", *name); + let mut eventsource = match gloo::net::eventsource::futures::EventSource::new(&url) { + Ok(eventsource) => eventsource, + Err(err) => { + gloo::console::error!(format!("failed to open event source: {}", err)); + return; + } + }; + let mut subscription = match eventsource.subscribe("message") { + Ok(subscription) => subscription, + Err(err) => { + gloo::console::error!(format!("failed to subscribe on event source: {}", err)); + return; + } + }; + wasm_bindgen_futures::spawn_local(async move { + loop { + futures::select_biased! { + message = subscription.next().fuse() => { + let Some(data) = message else { + continue + }; + let (_, event) = match data { + Ok(data) => data, + Err(err) => { + gloo::console::error!(format!("failed to fetch data from event source: {}", err)); + continue; + } + }; + + let Some(data) = event.data().as_string() else { + continue + }; + + response.dispatch(StringReducer::Append(data)); + }, // subscription.next + _ = gloo::timers::future::sleep(Duration::from_secs(5)).fuse() => { + break + }, + } + } + + gloo::console::log!("eventsource 5: ", format!("{:?}", eventsource.state())); + }); + }) + }; + + if (*response).0.len() > 0 { + let response = (*response).clone().0; + html! { + <> + {response} + + } + } else { + let name = (*name).clone(); + html! { + <> + + + + } + } +}