From 05e5344b9290a3f579af7346506b6c00829dca05 Mon Sep 17 00:00:00 2001 From: guochao Date: Fri, 21 Oct 2022 17:51:47 +0800 Subject: [PATCH] add proper command line options and logging --- Cargo.lock | 2 ++ vanity-worker/src/main.rs | 44 ++++++++++++++++++++++++++++++++++----- vanity/Cargo.toml | 2 ++ vanity/src/main.rs | 10 +++++++++ vanityd/src/main.rs | 24 +++++++++++++++++---- vanityd/src/server.rs | 27 +++++++++++++++++++----- 6 files changed, 95 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d26732b..4068080 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1396,6 +1396,8 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/vanity-worker/src/main.rs b/vanity-worker/src/main.rs index a766ee7..44b6ac4 100644 --- a/vanity-worker/src/main.rs +++ b/vanity-worker/src/main.rs @@ -1,13 +1,41 @@ -use std::collections::HashMap; +use std::{collections::HashMap, str::FromStr}; use rand_core::OsRng; use tokio::sync::RwLock; use x25519_dalek::{PublicKey, StaticSecret}; +use clap::Parser; + +#[derive(clap::Parser, Debug)] +struct Args { + #[clap( + short = 'c', + long = "connect", + help = "server address to connect to", + default_value = "grpc://127.0.0.1:8877" + )] + endpoint: String, + + #[clap(short, long, default_value = "1")] + workers: u8, + + #[clap(short, long, default_value = "INFO")] + log_level: String, +} + #[tokio::main] async fn main() -> Result<(), Box> { - let client = - libvanity::worker::worker_client::WorkerClient::connect("grpc://127.0.0.1:8877").await?; + let args = Args::parse(); + if args.workers < 1 { + return Ok(()); + } + + let level = tracing::Level::from_str(&args.log_level)?; + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(level) + .init(); + + let client = libvanity::worker::worker_client::WorkerClient::connect(args.endpoint).await?; let (tx, rx) = tokio::sync::mpsc::channel(128); @@ -29,10 +57,12 @@ async fn main() -> Result<(), Box> { match new_or_done { New(request) => { let mut writer = requests.write().await; + tracing::debug!("new work received: {}", request.id); writer.insert(request.id, request.pattern); } Done(response) => { let mut writer = requests.write().await; + tracing::debug!("stop signal received: {}", response); writer.remove(&response); } } @@ -40,14 +70,17 @@ async fn main() -> Result<(), Box> { } }); - let worker_threads = Vec::from_iter(1..10).into_iter().map(|id| { + let worker_threads = Vec::from_iter(0..args.workers).into_iter().map(|id| { let requests = requests.clone(); let tx = tx.clone(); - let _id = id.clone(); + let id = id.clone(); + tracing::trace!("spawning worker {}", id); tokio::spawn(async move { + tracing::trace!("worker {} started", id); loop { let requests = requests.read().await.clone(); if requests.is_empty() { + tracing::trace!("no work found on {}", id); tokio::time::sleep(std::time::Duration::from_millis(10)).await; continue; } @@ -60,6 +93,7 @@ async fn main() -> Result<(), Box> { let matcher = matcher.clone(); let public_b64 = base64::encode(public.as_bytes()); if matcher.is_match(&public_b64) { + tracing::trace!("match found on {}: {}", id, public_b64); let mut result = libvanity::worker::Result::default(); result.id = id.clone(); result.result = base64::encode(private.to_bytes()); diff --git a/vanity/Cargo.toml b/vanity/Cargo.toml index 81b72b9..eb148c9 100644 --- a/vanity/Cargo.toml +++ b/vanity/Cargo.toml @@ -12,5 +12,7 @@ tonic = "0.8" tokio = {version = "1", features = ["full"]} tokio-stream = { version = "0.1.11", features = ["net"] } +tracing-subscriber = { version = "0.3.16", features = ["json"] } +tracing = { version = "0.1.37", features = ["log"] } libvanity = {path = "../libvanity"} diff --git a/vanity/src/main.rs b/vanity/src/main.rs index a6ee758..6a266e2 100644 --- a/vanity/src/main.rs +++ b/vanity/src/main.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use clap::Parser; use libvanity::vanity::VanityRequest; @@ -11,12 +13,20 @@ struct Args { #[clap(help = "pattern of public key")] pattern: String, + + #[clap(short, long, default_value = "WARN")] + log_level: String, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); + let level = tracing::Level::from_str(&args.log_level)?; + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(level) + .init(); + let mut client = libvanity::vanity::vanity_service_client::VanityServiceClient::connect(args.endpoint) .await?; diff --git a/vanityd/src/main.rs b/vanityd/src/main.rs index 9eb1ced..6ff5988 100644 --- a/vanityd/src/main.rs +++ b/vanityd/src/main.rs @@ -2,21 +2,37 @@ pub(crate) mod consts; pub(crate) mod server; pub(crate) mod worker; -use std::net::ToSocketAddrs; +use std::{net::ToSocketAddrs, str::FromStr}; +use clap::Parser; use libvanity::{ vanity::vanity_service_server::VanityServiceServer, worker::worker_server::WorkerServer, }; pub use libvanity; +#[derive(Parser)] +struct Args { + #[clap(short, long, default_value = "127.0.0.1:8877")] + bind_address: String, + + #[clap(short, long, default_value = "redis://127.0.0.1:6379")] + redis_address: String, + + #[clap(short, long, default_value = "INFO")] + log_level: String, +} + #[tokio::main] async fn main() -> Result<(), Box> { + let args = Args::parse(); + + let level = tracing::Level::from_str(&args.log_level)?; tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::DEBUG) + .with_max_level(level) .init(); - let redis = redis::Client::open("redis://127.0.0.1:6379")?; + let redis = redis::Client::open(args.redis_address)?; let vanity_server = server::VanityService::new(redis.clone()); let worker_server = worker::WorkerService::new(redis.clone()); @@ -24,7 +40,7 @@ async fn main() -> Result<(), Box> { tonic::transport::Server::builder() .add_service(VanityServiceServer::new(vanity_server)) .add_service(WorkerServer::new(worker_server)) - .serve("127.0.0.1:8877".to_socket_addrs()?.next().unwrap()) + .serve(args.bind_address.to_socket_addrs()?.next().unwrap()) .await?; Ok(()) diff --git a/vanityd/src/server.rs b/vanityd/src/server.rs index 5ab3ac0..8607dc2 100644 --- a/vanityd/src/server.rs +++ b/vanityd/src/server.rs @@ -31,17 +31,33 @@ impl service_server::VanityService for VanityService { .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let pattern = req.into_inner().pattern; - let mut redis = self.redis.get_async_connection().await.expect("msg"); - redis + tracing::trace!("entered request method: pattern={}", pattern); + + let mut redis = match self.redis.get_async_connection().await { + Ok(redis) => redis, + Err(err) => { + tracing::warn!("failed to get redis connection: {}", err); + return Err(tonic::Status::unavailable(err.to_string())); + } + }; + if let Err(err) = redis .publish::<&str, String, ()>("vanity:new", format!("{}|{}", id, pattern)) .await - .expect("should not fail"); + { + tracing::warn!("failed to publish new requests: {}", err); + return Err(tonic::Status::unavailable(err.to_string())); + } + tracing::debug!("new requests published on new clients: {}", pattern); let mut pubsub = redis.into_pubsub(); - pubsub + if let Err(err) = pubsub .subscribe(format!("{}:{}", REDIS_PUBSUB_PATTERN_RESULT, id)) .await - .expect("msg"); + { + tracing::warn!("failed to subscribe on result channel: {}", err); + return Err(tonic::Status::unavailable(err.to_string())); + }; + tracing::debug!("listening on result channel: {}", pattern); let mut msgs = pubsub.on_message(); @@ -49,6 +65,7 @@ impl service_server::VanityService for VanityService { if let Some(msg) = msgs.next().await { let mut response = VanityResponse::default(); response.result = String::from_utf8_lossy(msg.get_payload_bytes()).to_string(); + tracing::trace!("returned from request method: pattern={}", pattern); return Ok(tonic::Response::new(response)); } }