From b9d0f0b548decf3b8dcc1173df0e1db7dd9cbdd5 Mon Sep 17 00:00:00 2001 From: guochao Date: Thu, 30 May 2024 01:55:13 +0800 Subject: [PATCH] reimplement with fred --- Cargo.lock | 114 +++-- Cargo.nix | 477 ++++++++++-------- chat-signaling-server/Cargo.toml | 2 +- chat-signaling-server/src/signaling.rs | 5 +- chat-signaling-server/src/signaling/grpc.rs | 34 +- .../src/signaling/redisexchange.rs | 102 ++-- .../signaling/{websocket.rs => websockets.rs} | 30 +- chat-signaling-server/src/types.rs | 2 +- chat-signaling-server/src/web/routes.rs | 11 +- 9 files changed, 434 insertions(+), 343 deletions(-) rename chat-signaling-server/src/signaling/{websocket.rs => websockets.rs} (76%) diff --git a/Cargo.lock b/Cargo.lock index c62d15e..2507d07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,6 +132,12 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.7.4" @@ -397,6 +403,16 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cc" version = "1.0.98" @@ -422,6 +438,7 @@ dependencies = [ "anyhow", "axum", "clap 4.5.4", + "fred", "futures", "include_dir", "itertools 0.13.0", @@ -431,7 +448,6 @@ dependencies = [ "opentelemetry", "opentelemetry-stdout", "opentelemetry_sdk", - "redis", "sea-orm", "serde", "serde_json", @@ -567,26 +583,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" -[[package]] -name = "combine" -version = "4.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" -dependencies = [ - "bytes", - "futures-core", - "memchr", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "const-oid" version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -617,6 +625,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -786,6 +800,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.11.0" @@ -812,6 +835,32 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fred" +version = "9.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915e065b377f6e16d5c01eae96bf31eeaf81e1e300b76f938761b3c21307cad8" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "crossbeam-queue", + "float-cmp", + "futures", + "log", + "parking_lot", + "rand", + "redis-protocol", + "semver", + "socket2", + "tokio", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + [[package]] name = "funty" version = "2.0.0" @@ -2044,24 +2093,17 @@ dependencies = [ ] [[package]] -name = "redis" -version = "0.25.3" +name = "redis-protocol" +version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" dependencies = [ - "async-trait", "bytes", - "combine", - "futures-util", - "itoa", - "percent-encoding", - "pin-project-lite", - "ryu", - "sha1_smol", - "socket2", - "tokio", - "tokio-util", - "url", + "bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom", ] [[package]] @@ -2380,6 +2422,12 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.203" @@ -2444,12 +2492,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.10.8" diff --git a/Cargo.nix b/Cargo.nix index 037c32a..c5ebe87 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -414,6 +414,18 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; + "arc-swap" = rec { + crateName = "arc-swap"; + version = "1.7.1"; + edition = "2018"; + sha256 = "0mrl9a9r9p9bln74q6aszvf22q1ijiw089jkrmabfqkbj31zixv9"; + authors = [ + "Michal 'vorner' Vaner " + ]; + features = { + "serde" = [ "dep:serde" ]; + }; + }; "arrayvec" = rec { crateName = "arrayvec"; version = "0.7.4"; @@ -1221,6 +1233,33 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; + "bytes-utils" = rec { + crateName = "bytes-utils"; + version = "0.1.4"; + edition = "2021"; + sha256 = "0dcd0lxfpj367j9nwm7izj4mkib3slg61rg4wqmpw0kvfnlf7bvx"; + authors = [ + "Michal 'vorner' Vaner " + ]; + dependencies = [ + { + name = "bytes"; + packageId = "bytes"; + usesDefaultFeatures = false; + } + { + name = "either"; + packageId = "either"; + usesDefaultFeatures = false; + } + ]; + features = { + "default" = [ "std" ]; + "serde" = [ "dep:serde" "bytes/serde" ]; + "std" = [ "bytes/default" ]; + }; + resolvedDefaultFeatures = [ "default" "std" ]; + }; "cc" = rec { crateName = "cc"; version = "1.0.98"; @@ -1291,6 +1330,11 @@ rec { packageId = "clap 4.5.4"; features = [ "derive" "env" ]; } + { + name = "fred"; + packageId = "fred"; + features = [ "subscriber-client" ]; + } { name = "futures"; packageId = "futures"; @@ -1334,11 +1378,6 @@ rec { packageId = "opentelemetry_sdk"; optional = true; } - { - name = "redis"; - packageId = "redis"; - features = [ "tokio-comp" ]; - } { name = "sea-orm"; packageId = "sea-orm"; @@ -1399,13 +1438,13 @@ rec { ]; features = { "debug" = [ "dep:opentelemetry-stdout" ]; - "default" = [ "embed-templates" "serve-static" ]; + "default" = [ "embed-templates" "serve-static" "websocket" ]; "embed-templates" = [ "dep:include_dir" ]; "extract-static" = [ "embed-static" ]; "otlp" = [ "dep:tracing-opentelemetry" "dep:opentelemetry" "dep:opentelemetry_sdk" ]; "serve-static" = [ "embed-static" "dep:mime_guess" "dep:tower" ]; }; - resolvedDefaultFeatures = [ "debug" "default" "embed-static" "embed-templates" "extract-static" "otlp" "serve-static" ]; + resolvedDefaultFeatures = [ "debug" "default" "embed-static" "embed-templates" "extract-static" "grpc" "otlp" "serve-static" "websocket" ]; }; "chrono" = rec { crateName = "chrono"; @@ -1787,85 +1826,6 @@ rec { sha256 = "08h4jsrd2j5k6lp1b9v5p1f1g7cmyzm4djsvb3ydywdb4hmqashb"; }; - "combine" = rec { - crateName = "combine"; - version = "4.6.7"; - edition = "2018"; - sha256 = "1z8rh8wp59gf8k23ar010phgs0wgf5i8cx4fg01gwcnzfn5k0nms"; - authors = [ - "Markus Westerlind " - ]; - dependencies = [ - { - name = "bytes"; - packageId = "bytes"; - optional = true; - } - { - name = "futures-core"; - packageId = "futures-core"; - rename = "futures-core-03"; - optional = true; - usesDefaultFeatures = false; - } - { - name = "memchr"; - packageId = "memchr"; - usesDefaultFeatures = false; - } - { - name = "pin-project-lite"; - packageId = "pin-project-lite"; - optional = true; - } - { - name = "tokio"; - packageId = "tokio"; - rename = "tokio-dep"; - optional = true; - usesDefaultFeatures = false; - } - { - name = "tokio-util"; - packageId = "tokio-util"; - optional = true; - usesDefaultFeatures = false; - features = [ "codec" ]; - } - ]; - devDependencies = [ - { - name = "bytes"; - packageId = "bytes"; - } - { - name = "tokio"; - packageId = "tokio"; - rename = "tokio-dep"; - features = [ "fs" "macros" "rt" "rt-multi-thread" "io-util" ]; - } - ]; - features = { - "bytes" = [ "dep:bytes" ]; - "bytes_05" = [ "dep:bytes_05" ]; - "default" = [ "std" ]; - "futures-03" = [ "pin-project" "std" "futures-core-03" "futures-io-03" "pin-project-lite" ]; - "futures-core-03" = [ "dep:futures-core-03" ]; - "futures-io-03" = [ "dep:futures-io-03" ]; - "pin-project" = [ "pin-project-lite" ]; - "pin-project-lite" = [ "dep:pin-project-lite" ]; - "regex" = [ "dep:regex" ]; - "std" = [ "memchr/std" "bytes" "alloc" ]; - "tokio" = [ "tokio-dep" "tokio-util/io" "futures-core-03" "pin-project-lite" ]; - "tokio-02" = [ "pin-project" "std" "tokio-02-dep" "futures-core-03" "pin-project-lite" "bytes_05" ]; - "tokio-02-dep" = [ "dep:tokio-02-dep" ]; - "tokio-03" = [ "pin-project" "std" "tokio-03-dep" "futures-core-03" "pin-project-lite" ]; - "tokio-03-dep" = [ "dep:tokio-03-dep" ]; - "tokio-dep" = [ "dep:tokio-dep" ]; - "tokio-util" = [ "dep:tokio-util" ]; - }; - resolvedDefaultFeatures = [ "alloc" "bytes" "futures-core-03" "pin-project-lite" "std" "tokio" "tokio-dep" "tokio-util" ]; - }; "const-oid" = rec { crateName = "const-oid"; version = "0.9.6"; @@ -1878,6 +1838,20 @@ rec { "arbitrary" = [ "dep:arbitrary" ]; }; }; + "cookie-factory" = rec { + crateName = "cookie-factory"; + version = "0.3.2"; + edition = "2018"; + sha256 = "0sqjmw85ckqhppff6gjwmvjpkii35441a51xx7cv0ih3jy2fjv9r"; + authors = [ + "Geoffroy Couprie " + "Pierre Chifflier " + ]; + features = { + "default" = [ "std" ]; + }; + resolvedDefaultFeatures = [ "default" "std" ]; + }; "core-foundation-sys" = rec { crateName = "core-foundation-sys"; version = "0.8.6"; @@ -1949,6 +1923,16 @@ rec { "Akhil Velagapudi " ]; + }; + "crc16" = rec { + crateName = "crc16"; + version = "0.4.0"; + edition = "2015"; + sha256 = "1zzwb5iv51wnh96532cxkk4aa8ys47rhzrjy98wqcys25ks8k01k"; + authors = [ + "AIkorsky " + ]; + }; "crossbeam-channel" = rec { crateName = "crossbeam-channel"; @@ -2393,6 +2377,30 @@ rec { "serde" = [ "dep:serde" ]; }; }; + "float-cmp" = rec { + crateName = "float-cmp"; + version = "0.9.0"; + edition = "2018"; + sha256 = "1i799ksbq7fj9rm9m82g1yqgm6xi3jnrmylddmqknmksajylpplq"; + libName = "float_cmp"; + authors = [ + "Mike Dilger " + ]; + dependencies = [ + { + name = "num-traits"; + packageId = "num-traits"; + optional = true; + usesDefaultFeatures = false; + } + ]; + features = { + "default" = [ "ratio" ]; + "num-traits" = [ "dep:num-traits" ]; + "ratio" = [ "num-traits" ]; + }; + resolvedDefaultFeatures = [ "default" "num-traits" "ratio" ]; + }; "flume" = rec { crateName = "flume"; version = "0.11.0"; @@ -2467,6 +2475,132 @@ rec { }; resolvedDefaultFeatures = [ "alloc" "default" "std" ]; }; + "fred" = rec { + crateName = "fred"; + version = "9.0.3"; + edition = "2021"; + sha256 = "1n6a0w9w5cv1hy9nzdq0wghq3bzf66zrdbhyq3aicvkz6xdhcpli"; + authors = [ + "Alec Embke " + ]; + dependencies = [ + { + name = "arc-swap"; + packageId = "arc-swap"; + } + { + name = "async-trait"; + packageId = "async-trait"; + } + { + name = "bytes"; + packageId = "bytes"; + } + { + name = "bytes-utils"; + packageId = "bytes-utils"; + } + { + name = "crossbeam-queue"; + packageId = "crossbeam-queue"; + } + { + name = "float-cmp"; + packageId = "float-cmp"; + } + { + name = "futures"; + packageId = "futures"; + features = [ "std" ]; + } + { + name = "log"; + packageId = "log"; + } + { + name = "parking_lot"; + packageId = "parking_lot"; + } + { + name = "rand"; + packageId = "rand"; + } + { + name = "redis-protocol"; + packageId = "redis-protocol"; + features = [ "resp2" "resp3" "bytes" ]; + } + { + name = "semver"; + packageId = "semver"; + } + { + name = "socket2"; + packageId = "socket2"; + } + { + name = "tokio"; + packageId = "tokio"; + features = [ "net" "sync" "rt" "rt-multi-thread" "macros" ]; + } + { + name = "tokio-stream"; + packageId = "tokio-stream"; + } + { + name = "tokio-util"; + packageId = "tokio-util"; + features = [ "codec" ]; + } + { + name = "url"; + packageId = "url"; + } + { + name = "urlencoding"; + packageId = "urlencoding"; + } + ]; + devDependencies = [ + { + name = "tokio-stream"; + packageId = "tokio-stream"; + features = [ "sync" ]; + } + ]; + features = { + "blocking-encoding" = [ "tokio/rt-multi-thread" ]; + "default" = [ "transactions" "i-std" ]; + "dns" = [ "trust-dns-resolver" "trust-dns-resolver/tokio" ]; + "enable-native-tls" = [ "native-tls" "tokio-native-tls" ]; + "enable-rustls" = [ "rustls" "tokio-rustls" "rustls-native-certs" "rustls/std" "tokio-rustls/logging" "tokio-rustls/tls12" "tokio-rustls/aws_lc_rs" ]; + "enable-rustls-ring" = [ "rustls" "tokio-rustls" "rustls-native-certs" "rustls/std" "tokio-rustls/logging" "tokio-rustls/tls12" "tokio-rustls/ring" ]; + "full-tracing" = [ "partial-tracing" ]; + "i-all" = [ "i-acl" "i-client" "i-cluster" "i-config" "i-geo" "i-hashes" "i-hyperloglog" "i-keys" "i-lists" "i-scripts" "i-memory" "i-pubsub" "i-server" "i-streams" "i-tracking" "i-sorted-sets" "i-slowlog" "i-sets" ]; + "i-geo" = [ "i-sorted-sets" ]; + "i-redis-json" = [ "serde-json" ]; + "i-redis-stack" = [ "i-redis-json" "i-time-series" ]; + "i-std" = [ "i-hashes" "i-keys" "i-lists" "i-sets" "i-streams" "i-pubsub" "i-sorted-sets" "i-server" ]; + "i-tracking" = [ "i-client" "i-pubsub" ]; + "monitor" = [ "nom" ]; + "native-tls" = [ "dep:native-tls" ]; + "nom" = [ "dep:nom" ]; + "partial-tracing" = [ "tracing" "tracing-futures" ]; + "rustls" = [ "dep:rustls" ]; + "rustls-native-certs" = [ "dep:rustls-native-certs" ]; + "serde-json" = [ "serde_json" ]; + "serde_json" = [ "dep:serde_json" ]; + "sha-1" = [ "dep:sha-1" ]; + "subscriber-client" = [ "i-pubsub" ]; + "tokio-native-tls" = [ "dep:tokio-native-tls" ]; + "tokio-rustls" = [ "dep:tokio-rustls" ]; + "tracing" = [ "dep:tracing" ]; + "tracing-futures" = [ "dep:tracing-futures" ]; + "trust-dns-resolver" = [ "dep:trust-dns-resolver" ]; + "vendored-openssl" = [ "enable-native-tls" "native-tls/vendored" ]; + }; + resolvedDefaultFeatures = [ "default" "i-hashes" "i-keys" "i-lists" "i-pubsub" "i-server" "i-sets" "i-sorted-sets" "i-std" "i-streams" "subscriber-client" "transactions" ]; + }; "funty" = rec { crateName = "funty"; version = "2.0.0"; @@ -5953,146 +6087,61 @@ rec { }; resolvedDefaultFeatures = [ "alloc" "getrandom" "std" ]; }; - "redis" = rec { - crateName = "redis"; - version = "0.25.3"; + "redis-protocol" = rec { + crateName = "redis-protocol"; + version = "5.0.1"; edition = "2021"; - sha256 = "1z9vlyn4v2ydczyr0p5ynr47ghggzicxw2y5ybl7566095cq4wk4"; + sha256 = "17y9vm2f9sfqa4x1ai9mhn098xsk0afdac1ah5pjpfqza34vgpk5"; + libName = "redis_protocol"; + authors = [ + "Alec Embke " + ]; dependencies = [ - { - name = "async-trait"; - packageId = "async-trait"; - optional = true; - } { name = "bytes"; packageId = "bytes"; optional = true; - } - { - name = "combine"; - packageId = "combine"; usesDefaultFeatures = false; - features = [ "std" ]; } { - name = "futures-util"; - packageId = "futures-util"; + name = "bytes-utils"; + packageId = "bytes-utils"; optional = true; usesDefaultFeatures = false; } { - name = "itoa"; - packageId = "itoa"; - } - { - name = "percent-encoding"; - packageId = "percent-encoding"; - } - { - name = "pin-project-lite"; - packageId = "pin-project-lite"; - optional = true; - } - { - name = "ryu"; - packageId = "ryu"; - } - { - name = "sha1_smol"; - packageId = "sha1_smol"; - optional = true; - } - { - name = "socket2"; - packageId = "socket2"; - optional = true; + name = "cookie-factory"; + packageId = "cookie-factory"; usesDefaultFeatures = false; } { - name = "tokio"; - packageId = "tokio"; - optional = true; - features = [ "rt" "net" "time" ]; + name = "crc16"; + packageId = "crc16"; } { - name = "tokio-util"; - packageId = "tokio-util"; - optional = true; + name = "log"; + packageId = "log"; } { - name = "url"; - packageId = "url"; - } - ]; - devDependencies = [ - { - name = "socket2"; - packageId = "socket2"; - } - { - name = "tokio"; - packageId = "tokio"; - features = [ "rt" "macros" "rt-multi-thread" "time" ]; + name = "nom"; + packageId = "nom"; + usesDefaultFeatures = false; } ]; features = { - "ahash" = [ "dep:ahash" ]; - "aio" = [ "bytes" "pin-project-lite" "futures-util" "futures-util/alloc" "futures-util/sink" "tokio/io-util" "tokio-util" "tokio-util/codec" "tokio/sync" "combine/tokio" "async-trait" ]; - "arc-swap" = [ "dep:arc-swap" ]; - "async-native-tls" = [ "dep:async-native-tls" ]; - "async-std" = [ "dep:async-std" ]; - "async-std-comp" = [ "aio" "async-std" ]; - "async-std-native-tls-comp" = [ "async-std-comp" "async-native-tls" "tls-native-tls" ]; - "async-std-rustls-comp" = [ "async-std-comp" "futures-rustls" "tls-rustls" ]; - "async-std-tls-comp" = [ "async-std-native-tls-comp" ]; - "async-trait" = [ "dep:async-trait" ]; - "bigdecimal" = [ "dep:bigdecimal" ]; - "bytes" = [ "dep:bytes" ]; - "cluster" = [ "crc16" "rand" ]; - "cluster-async" = [ "cluster" "futures" "futures-util" "log" ]; - "connection-manager" = [ "arc-swap" "futures" "aio" "tokio-retry" ]; - "crc16" = [ "dep:crc16" ]; - "default" = [ "acl" "streams" "geospatial" "script" "keep-alive" ]; - "futures" = [ "dep:futures" ]; - "futures-rustls" = [ "dep:futures-rustls" ]; - "futures-util" = [ "dep:futures-util" ]; - "json" = [ "serde" "serde/derive" "serde_json" ]; - "keep-alive" = [ "socket2" ]; - "log" = [ "dep:log" ]; - "native-tls" = [ "dep:native-tls" ]; - "num-bigint" = [ "dep:num-bigint" ]; - "pin-project-lite" = [ "dep:pin-project-lite" ]; - "r2d2" = [ "dep:r2d2" ]; - "rand" = [ "dep:rand" ]; - "rust_decimal" = [ "dep:rust_decimal" ]; - "rustls" = [ "dep:rustls" ]; - "rustls-native-certs" = [ "dep:rustls-native-certs" ]; - "rustls-pemfile" = [ "dep:rustls-pemfile" ]; - "rustls-pki-types" = [ "dep:rustls-pki-types" ]; - "script" = [ "sha1_smol" ]; - "sentinel" = [ "rand" ]; - "serde" = [ "dep:serde" ]; - "serde_json" = [ "dep:serde_json" ]; - "sha1_smol" = [ "dep:sha1_smol" ]; - "socket2" = [ "dep:socket2" ]; - "tls" = [ "tls-native-tls" ]; - "tls-native-tls" = [ "native-tls" ]; - "tls-rustls" = [ "rustls" "rustls-native-certs" "rustls-pemfile" "rustls-pki-types" ]; - "tls-rustls-insecure" = [ "tls-rustls" ]; - "tls-rustls-webpki-roots" = [ "tls-rustls" "webpki-roots" ]; - "tokio" = [ "dep:tokio" ]; - "tokio-comp" = [ "aio" "tokio" "tokio/net" ]; - "tokio-native-tls" = [ "dep:tokio-native-tls" ]; - "tokio-native-tls-comp" = [ "tokio-comp" "tls-native-tls" "tokio-native-tls" ]; - "tokio-retry" = [ "dep:tokio-retry" ]; - "tokio-rustls" = [ "dep:tokio-rustls" ]; - "tokio-rustls-comp" = [ "tokio-comp" "tls-rustls" "tokio-rustls" ]; + "alloc" = [ "nom/alloc" ]; + "bytes" = [ "dep:bytes" "bytes-utils" ]; + "bytes-utils" = [ "dep:bytes-utils" ]; + "codec" = [ "tokio-util" "bytes" ]; + "default" = [ "std" "resp2" "resp3" ]; + "hashbrown" = [ "dep:hashbrown" ]; + "index-map" = [ "indexmap" ]; + "indexmap" = [ "dep:indexmap" ]; + "libm" = [ "dep:libm" ]; + "std" = [ "cookie-factory/default" "nom/default" ]; "tokio-util" = [ "dep:tokio-util" ]; - "uuid" = [ "dep:uuid" ]; - "webpki-roots" = [ "dep:webpki-roots" ]; }; - resolvedDefaultFeatures = [ "acl" "aio" "async-trait" "bytes" "default" "futures-util" "geospatial" "keep-alive" "pin-project-lite" "script" "sha1_smol" "socket2" "streams" "tokio" "tokio-comp" "tokio-util" ]; + resolvedDefaultFeatures = [ "bytes" "bytes-utils" "default" "resp2" "resp3" "std" ]; }; "redox_syscall 0.4.1" = rec { crateName = "redox_syscall"; @@ -7419,6 +7468,20 @@ rec { }; resolvedDefaultFeatures = [ "default" ]; }; + "semver" = rec { + crateName = "semver"; + version = "1.0.23"; + edition = "2018"; + sha256 = "12wqpxfflclbq4dv8sa6gchdh92ahhwn4ci1ls22wlby3h57wsb1"; + authors = [ + "David Tolnay " + ]; + features = { + "default" = [ "std" ]; + "serde" = [ "dep:serde" ]; + }; + resolvedDefaultFeatures = [ "default" "std" ]; + }; "serde" = rec { crateName = "serde"; version = "1.0.203"; @@ -7614,18 +7677,6 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; - "sha1_smol" = rec { - crateName = "sha1_smol"; - version = "1.0.0"; - edition = "2018"; - sha256 = "04nhbhvsk5ms1zbshs80iq5r1vjszp2xnm9f0ivj38q3dhc4f6mf"; - authors = [ - "Armin Ronacher " - ]; - features = { - "serde" = [ "dep:serde" ]; - }; - }; "sha2" = rec { crateName = "sha2"; version = "0.10.8"; diff --git a/chat-signaling-server/Cargo.toml b/chat-signaling-server/Cargo.toml index 691a519..edc85cf 100644 --- a/chat-signaling-server/Cargo.toml +++ b/chat-signaling-server/Cargo.toml @@ -25,7 +25,7 @@ opentelemetry = { version = "0.21", optional = true } opentelemetry_sdk = { version = "0.21", optional = true } opentelemetry-stdout = { version = "0.2", features = ["trace"], optional = true } -redis = { version = "0.25", features = ["tokio-comp"] } +fred = { version = "9", features = ["subscriber-client"] } sea-orm = { version = "0", features = [ "sqlx-sqlite", "runtime-tokio-rustls", "macros", "mock", "with-chrono", "with-json", "with-uuid" ] } tera = "1" diff --git a/chat-signaling-server/src/signaling.rs b/chat-signaling-server/src/signaling.rs index 18a8530..c30ea71 100644 --- a/chat-signaling-server/src/signaling.rs +++ b/chat-signaling-server/src/signaling.rs @@ -1,3 +1,4 @@ -pub mod grpc; mod redisexchange; -pub mod websocket; + +pub mod grpc; +pub mod websockets; diff --git a/chat-signaling-server/src/signaling/grpc.rs b/chat-signaling-server/src/signaling/grpc.rs index 4181f83..afb0e19 100644 --- a/chat-signaling-server/src/signaling/grpc.rs +++ b/chat-signaling-server/src/signaling/grpc.rs @@ -1,13 +1,12 @@ -use std::{pin::Pin, sync::Arc}; +use std::pin::Pin; + +use fred::prelude::*; use futures::stream::*; use signaling::{signaling_server::*, SignalingMessage}; use tonic::Status; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - Mutex, -}; +use tokio::sync::mpsc::{Receiver, Sender}; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. /// @@ -74,7 +73,7 @@ impl From> for ReceiverStream { } pub struct SignalingService { - pub redis: redis::Client, + pub redis: fred::types::RedisConfig, } impl SignalingService { @@ -105,8 +104,8 @@ impl SignalingService { async fn handle_with_redis_exchange( stream: tonic::Streaming, tx: Sender>, - redis: redis::aio::MultiplexedConnection, - pubsub: Arc>, + redis: fred::clients::RedisClient, + pubsub: fred::clients::SubscriberClient, ) { let (stream_tx, stream_rx) = tokio::sync::mpsc::channel(128); let (result_tx, result_rx) = tokio::sync::mpsc::channel(128); @@ -135,18 +134,19 @@ impl Signaling for SignalingService { let in_stream = request.into_inner(); let (tx, rx) = tokio::sync::mpsc::channel(128); - let redis = self.redis.get_multiplexed_tokio_connection().await.unwrap(); + let builder = fred::types::Builder::from_config(self.redis.clone()); + let redis = builder.build().unwrap(); + if let Err(err) = redis.init().await { + return Err(Status::internal(err.to_string())); + } - let pubsub = match self.redis.get_async_pubsub().await { - Ok(pubsub) => pubsub, - Err(err) => return Err(Status::unknown(err.to_string())), - }; + let pubsub = builder.build_subscriber_client().unwrap(); + if let Err(err) = pubsub.init().await { + return Err(Status::internal(err.to_string())); + } tokio::spawn(SignalingService::handle_with_redis_exchange( - in_stream, - tx, - redis, - Arc::new(Mutex::new(pubsub)), + in_stream, tx, redis, pubsub, )); let output_stream = ReceiverStream::new(rx); diff --git a/chat-signaling-server/src/signaling/redisexchange.rs b/chat-signaling-server/src/signaling/redisexchange.rs index 73ade5a..b511c91 100644 --- a/chat-signaling-server/src/signaling/redisexchange.rs +++ b/chat-signaling-server/src/signaling/redisexchange.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use redis::AsyncCommands; +use fred::prelude::*; use signaling::{IceCandidate, SdpMessage, SignalingMessage}; use tokio::sync::{ @@ -8,8 +8,6 @@ use tokio::sync::{ Mutex, }; -use futures::StreamExt; - enum RedisChannel { DiscoverRequest, DiscoverResponse(String), @@ -58,8 +56,8 @@ impl TryFrom<&str> for RedisChannel { pub async fn send_message_to_peers( mut messages_to_peers: Receiver, messages_from_peers: Sender, - mut redis: redis::aio::MultiplexedConnection, - pubsub: Arc>, + redis: fred::clients::RedisClient, + pubsub: fred::clients::SubscriberClient, ) { let mut name = String::default(); @@ -88,8 +86,8 @@ pub async fn send_message_to_peers( )); } signaling::signaling_message::Message::DiscoverRequest(()) => { - let peers = match redis - .publish::<_, _, u64>(format!("chatchat:{room}:discover"), name.clone()) + let peers: usize = match redis + .publish(format!("chatchat:{room}:discover"), name.clone()) .await { Ok(peers) => peers, @@ -98,15 +96,12 @@ pub async fn send_message_to_peers( break; } }; - tracing::info!(peers, room, sender = name, "broadcasting discover") + tracing::info!(peers, room, sender = name, "broadcasting discover"); } signaling::signaling_message::Message::DiscoverResponse(()) => { let receiver = message.receiver.unwrap(); - let peers = match redis - .publish::<_, _, u64>( - format!("chatchat:{room}:discover:{receiver}"), - name.clone(), - ) + let peers: usize = match redis + .publish(format!("chatchat:{room}:discover:{receiver}"), name.clone()) .await { Ok(peers) => peers, @@ -120,8 +115,8 @@ pub async fn send_message_to_peers( signaling::signaling_message::Message::SessionOffer(sdp) => { let msg = serde_json::to_string(&sdp).unwrap(); let receiver = message.receiver.unwrap(); - let peers = match redis - .publish::<_, _, u64>(format!("chatchat:{room}:offer:{receiver}"), msg) + let peers: usize = match redis + .publish(format!("chatchat:{room}:offer:{receiver}"), msg) .await { Ok(peers) => peers, @@ -136,8 +131,8 @@ pub async fn send_message_to_peers( signaling::signaling_message::Message::SessionAnswer(sdp) => { let msg = serde_json::to_string(&sdp).unwrap(); let receiver = message.receiver.unwrap(); - let peers = match redis - .publish::<_, _, u64>(format!("chatchat:{room}:answer:{receiver}"), msg) + let peers: usize = match redis + .publish(format!("chatchat:{room}:answer:{receiver}"), msg) .await { Ok(peers) => peers, @@ -151,8 +146,8 @@ pub async fn send_message_to_peers( signaling::signaling_message::Message::IceCandidate(candidate) => { let msg = serde_json::to_string(&candidate).unwrap(); let receiver = message.receiver.unwrap(); - let peers = match redis - .publish::<_, _, u64>(format!("chatchat:{room}:icecandidate:{receiver}"), msg) + let peers: usize = match redis + .publish(format!("chatchat:{room}:icecandidate:{receiver}"), msg) .await { Ok(peers) => peers, @@ -166,17 +161,17 @@ pub async fn send_message_to_peers( } } tracing::debug!(name, "stopped send to peer"); - let _ = closing_tx.send(()).await; + let _ = closing_tx.send(String::from("websocket exited")).await; } pub async fn receive_message_from_peers( name: String, room: String, tx: Sender, - pubsub: Arc>, - closing_rx: Arc>>, + pubsub: fred::clients::SubscriberClient, + closing_rx: Arc>>, ) { let mut closing_rx = closing_rx.lock().await; - let mut pubsub = pubsub.lock().await; + // let mut pubsub = pubsub.lock().await; let discover_request_channel = format!( "chatchat:{room}:{}", @@ -199,25 +194,19 @@ pub async fn receive_message_from_peers( RedisChannel::SessionIceCandidate(name.clone()).to_string() ); - tracing::trace!(room, name, channels=?[ - &discover_request_channel, - &discover_response_channel, - &session_offer_channel, - &session_answer_channel, - &session_ice_channel, - ], "subscribed"); - pubsub - .subscribe(&[ - discover_request_channel, - discover_response_channel, - session_offer_channel, - session_answer_channel, - session_ice_channel, - ]) - .await - .unwrap(); + let channels = [ + &discover_request_channel, + &discover_response_channel, + &session_offer_channel, + &session_answer_channel, + &session_ice_channel, + ]; - let mut messages = pubsub.on_message(); + pubsub.subscribe(&channels).await.unwrap(); + tracing::info!(?channels, "subscribed"); + + // let mut messages = pubsub.on_message(); + let mut messages = pubsub.message_rx(); tracing::info!(room, name = name.clone(), "connection ready"); let _ = tx @@ -231,34 +220,41 @@ pub async fn receive_message_from_peers( loop { tokio::select! { - _ = closing_rx.recv() => { + message = closing_rx.recv() => { + tracing::warn!(message, "exit due to closing rx"); break; }, - maybe_message = messages.next() => { - let message = if let Some(message) = maybe_message { - message - } else { - break; + maybe_message = messages.recv() => { + let message = match maybe_message { + Ok(message) => message, + Err(error) => { + tracing::warn!(?error, "failed to recv message from redis"); + break; + } }; let channel = match message - .get_channel_name() + .channel.to_string() .strip_prefix(&format!("chatchat:{}:", room.clone())) { - Some(channel) => channel, + Some(channel) => channel.to_string(), _ => continue, }; - let channel = match RedisChannel::try_from(channel) { + let channel = match RedisChannel::try_from(channel.as_str()) { Ok(channel) => channel, Err(unrecognized) => { tracing::warn!(unrecognized, "unrecognized"); continue; } }; - let payload: String = match message.get_payload() { - Ok(msg) => msg, - _ => continue, + let msg_kind = message.value.kind(); + let payload = if let fred::types::RedisValue::String(ref s) = message.value { + s.to_string() + } else { + tracing::warn!(?msg_kind, "message is not string"); + continue; }; + match channel { RedisChannel::DiscoverRequest => { if payload == name { diff --git a/chat-signaling-server/src/signaling/websocket.rs b/chat-signaling-server/src/signaling/websockets.rs similarity index 76% rename from chat-signaling-server/src/signaling/websocket.rs rename to chat-signaling-server/src/signaling/websockets.rs index 9f7e5ae..4879abc 100644 --- a/chat-signaling-server/src/signaling/websocket.rs +++ b/chat-signaling-server/src/signaling/websockets.rs @@ -1,14 +1,15 @@ use axum::{extract::State, response::IntoResponse}; +use fred::prelude::*; use crate::{signaling::redisexchange, types::AppState}; use axum::extract::ws::Message; use std::sync::Arc; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; pub async fn handle_message_from_frontend( mut ws: axum::extract::ws::WebSocket, - redis: redis::aio::MultiplexedConnection, - pubsub: Arc>, + redis: fred::clients::RedisClient, + pubsub: fred::clients::SubscriberClient, ) { let (peer_tx, mut peer_rx) = tokio::sync::mpsc::channel(128); let (browser_tx, browser_rx) = tokio::sync::mpsc::channel(128); @@ -55,7 +56,6 @@ pub async fn handle_message_from_frontend( } } } - // browser_rx.close(); } pub async fn handle_ws_upgrade( @@ -63,17 +63,17 @@ pub async fn handle_ws_upgrade( State(state): State>>, ) -> Result { let state = state.read().await; - let redis: redis::aio::MultiplexedConnection = - match state.redis.get_multiplexed_tokio_connection().await { - Ok(redis) => redis, - Err(err) => return Err(err.to_string()), - }; - let pubsub = match state.clone().redis.get_async_pubsub().await { - Ok(pubsub) => pubsub, - Err(err) => return Err(err.to_string()), - }; + let builder = fred::types::Builder::from_config(state.redis.clone()); + let redis = builder.build().unwrap(); + if let Err(err) = redis.init().await { + return Err(err.to_string()); + } - Ok(upgrade - .on_upgrade(|ws| handle_message_from_frontend(ws, redis, Arc::new(Mutex::new(pubsub))))) + let pubsub = builder.build_subscriber_client().unwrap(); + if let Err(err) = pubsub.init().await { + return Err(err.to_string()); + } + + Ok(upgrade.on_upgrade(|ws| handle_message_from_frontend(ws, redis, pubsub))) } diff --git a/chat-signaling-server/src/types.rs b/chat-signaling-server/src/types.rs index 8fb1661..98d9fad 100644 --- a/chat-signaling-server/src/types.rs +++ b/chat-signaling-server/src/types.rs @@ -8,5 +8,5 @@ pub struct PageContext { #[derive(Clone)] pub struct AppState { pub templates: tera::Tera, - pub redis: redis::Client, + pub redis: fred::types::RedisConfig, } diff --git a/chat-signaling-server/src/web/routes.rs b/chat-signaling-server/src/web/routes.rs index 8c3dfd9..101ab77 100644 --- a/chat-signaling-server/src/web/routes.rs +++ b/chat-signaling-server/src/web/routes.rs @@ -57,13 +57,14 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { - redis_addr: String::from("redis://127.0.0.1") + redis_addr: String::from("redis://127.0.0.1"), } } } - -pub fn make_service(config: Option) -> Result>, Box> { +pub fn make_service( + config: Option, +) -> Result>, Box> { let config = config.unwrap_or_default(); let templates = load_templates()?; @@ -72,7 +73,7 @@ pub fn make_service(config: Option) -> Result #[cfg(feature = "serve-static")] let router = router.route_service("/static/*path", staticfiles::StaticFiles::strip("/static/")); - let redis = redis::Client::open(config.redis_addr)?; + let redis = fred::types::RedisConfig::from_url(&config.redis_addr)?; let grpc_service = tonic_web::enable(signaling::signaling_server::SignalingServer::new( GrpcSignalingService { @@ -87,7 +88,7 @@ pub fn make_service(config: Option) -> Result "/signaling.Signaling/*rpc", axum::routing::any_service(grpc_service.clone()), ) - .route("/ws", get(crate::signaling::websocket::handle_ws_upgrade)) + .route("/ws", get(crate::signaling::websockets::handle_ws_upgrade)) .with_state(Arc::new(RwLock::new(crate::types::AppState { templates: templates, redis,