add proper command line options and logging
This commit is contained in:
parent
e871ec2b29
commit
05e5344b92
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1396,6 +1396,8 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tonic",
|
"tonic",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1,13 +1,41 @@
|
|||||||
use std::collections::HashMap;
|
use std::{collections::HashMap, str::FromStr};
|
||||||
|
|
||||||
use rand_core::OsRng;
|
use rand_core::OsRng;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use x25519_dalek::{PublicKey, StaticSecret};
|
use x25519_dalek::{PublicKey, StaticSecret};
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
|
||||||
|
#[derive(clap::Parser, Debug)]
|
||||||
|
struct Args {
|
||||||
|
#[clap(
|
||||||
|
short = 'c',
|
||||||
|
long = "connect",
|
||||||
|
help = "server address to connect to",
|
||||||
|
default_value = "grpc://127.0.0.1:8877"
|
||||||
|
)]
|
||||||
|
endpoint: String,
|
||||||
|
|
||||||
|
#[clap(short, long, default_value = "1")]
|
||||||
|
workers: u8,
|
||||||
|
|
||||||
|
#[clap(short, long, default_value = "INFO")]
|
||||||
|
log_level: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let client =
|
let args = Args::parse();
|
||||||
libvanity::worker::worker_client::WorkerClient::connect("grpc://127.0.0.1:8877").await?;
|
if args.workers < 1 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let level = tracing::Level::from_str(&args.log_level)?;
|
||||||
|
tracing_subscriber::FmtSubscriber::builder()
|
||||||
|
.with_max_level(level)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let client = libvanity::worker::worker_client::WorkerClient::connect(args.endpoint).await?;
|
||||||
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
||||||
|
|
||||||
@ -29,10 +57,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
match new_or_done {
|
match new_or_done {
|
||||||
New(request) => {
|
New(request) => {
|
||||||
let mut writer = requests.write().await;
|
let mut writer = requests.write().await;
|
||||||
|
tracing::debug!("new work received: {}", request.id);
|
||||||
writer.insert(request.id, request.pattern);
|
writer.insert(request.id, request.pattern);
|
||||||
}
|
}
|
||||||
Done(response) => {
|
Done(response) => {
|
||||||
let mut writer = requests.write().await;
|
let mut writer = requests.write().await;
|
||||||
|
tracing::debug!("stop signal received: {}", response);
|
||||||
writer.remove(&response);
|
writer.remove(&response);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -40,14 +70,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let worker_threads = Vec::from_iter(1..10).into_iter().map(|id| {
|
let worker_threads = Vec::from_iter(0..args.workers).into_iter().map(|id| {
|
||||||
let requests = requests.clone();
|
let requests = requests.clone();
|
||||||
let tx = tx.clone();
|
let tx = tx.clone();
|
||||||
let _id = id.clone();
|
let id = id.clone();
|
||||||
|
tracing::trace!("spawning worker {}", id);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
tracing::trace!("worker {} started", id);
|
||||||
loop {
|
loop {
|
||||||
let requests = requests.read().await.clone();
|
let requests = requests.read().await.clone();
|
||||||
if requests.is_empty() {
|
if requests.is_empty() {
|
||||||
|
tracing::trace!("no work found on {}", id);
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -60,6 +93,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let matcher = matcher.clone();
|
let matcher = matcher.clone();
|
||||||
let public_b64 = base64::encode(public.as_bytes());
|
let public_b64 = base64::encode(public.as_bytes());
|
||||||
if matcher.is_match(&public_b64) {
|
if matcher.is_match(&public_b64) {
|
||||||
|
tracing::trace!("match found on {}: {}", id, public_b64);
|
||||||
let mut result = libvanity::worker::Result::default();
|
let mut result = libvanity::worker::Result::default();
|
||||||
result.id = id.clone();
|
result.id = id.clone();
|
||||||
result.result = base64::encode(private.to_bytes());
|
result.result = base64::encode(private.to_bytes());
|
||||||
|
@ -12,5 +12,7 @@ tonic = "0.8"
|
|||||||
tokio = {version = "1", features = ["full"]}
|
tokio = {version = "1", features = ["full"]}
|
||||||
tokio-stream = { version = "0.1.11", features = ["net"] }
|
tokio-stream = { version = "0.1.11", features = ["net"] }
|
||||||
|
|
||||||
|
tracing-subscriber = { version = "0.3.16", features = ["json"] }
|
||||||
|
tracing = { version = "0.1.37", features = ["log"] }
|
||||||
|
|
||||||
libvanity = {path = "../libvanity"}
|
libvanity = {path = "../libvanity"}
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use libvanity::vanity::VanityRequest;
|
use libvanity::vanity::VanityRequest;
|
||||||
|
|
||||||
@ -11,12 +13,20 @@ struct Args {
|
|||||||
|
|
||||||
#[clap(help = "pattern of public key")]
|
#[clap(help = "pattern of public key")]
|
||||||
pattern: String,
|
pattern: String,
|
||||||
|
|
||||||
|
#[clap(short, long, default_value = "WARN")]
|
||||||
|
log_level: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
|
let level = tracing::Level::from_str(&args.log_level)?;
|
||||||
|
tracing_subscriber::FmtSubscriber::builder()
|
||||||
|
.with_max_level(level)
|
||||||
|
.init();
|
||||||
|
|
||||||
let mut client =
|
let mut client =
|
||||||
libvanity::vanity::vanity_service_client::VanityServiceClient::connect(args.endpoint)
|
libvanity::vanity::vanity_service_client::VanityServiceClient::connect(args.endpoint)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -2,21 +2,37 @@ pub(crate) mod consts;
|
|||||||
pub(crate) mod server;
|
pub(crate) mod server;
|
||||||
pub(crate) mod worker;
|
pub(crate) mod worker;
|
||||||
|
|
||||||
use std::net::ToSocketAddrs;
|
use std::{net::ToSocketAddrs, str::FromStr};
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
use libvanity::{
|
use libvanity::{
|
||||||
vanity::vanity_service_server::VanityServiceServer, worker::worker_server::WorkerServer,
|
vanity::vanity_service_server::VanityServiceServer, worker::worker_server::WorkerServer,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use libvanity;
|
pub use libvanity;
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
struct Args {
|
||||||
|
#[clap(short, long, default_value = "127.0.0.1:8877")]
|
||||||
|
bind_address: String,
|
||||||
|
|
||||||
|
#[clap(short, long, default_value = "redis://127.0.0.1:6379")]
|
||||||
|
redis_address: String,
|
||||||
|
|
||||||
|
#[clap(short, long, default_value = "INFO")]
|
||||||
|
log_level: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
|
let level = tracing::Level::from_str(&args.log_level)?;
|
||||||
tracing_subscriber::FmtSubscriber::builder()
|
tracing_subscriber::FmtSubscriber::builder()
|
||||||
.with_max_level(tracing::Level::DEBUG)
|
.with_max_level(level)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
let redis = redis::Client::open("redis://127.0.0.1:6379")?;
|
let redis = redis::Client::open(args.redis_address)?;
|
||||||
|
|
||||||
let vanity_server = server::VanityService::new(redis.clone());
|
let vanity_server = server::VanityService::new(redis.clone());
|
||||||
let worker_server = worker::WorkerService::new(redis.clone());
|
let worker_server = worker::WorkerService::new(redis.clone());
|
||||||
@ -24,7 +40,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
tonic::transport::Server::builder()
|
tonic::transport::Server::builder()
|
||||||
.add_service(VanityServiceServer::new(vanity_server))
|
.add_service(VanityServiceServer::new(vanity_server))
|
||||||
.add_service(WorkerServer::new(worker_server))
|
.add_service(WorkerServer::new(worker_server))
|
||||||
.serve("127.0.0.1:8877".to_socket_addrs()?.next().unwrap())
|
.serve(args.bind_address.to_socket_addrs()?.next().unwrap())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -31,17 +31,33 @@ impl service_server::VanityService for VanityService {
|
|||||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
let pattern = req.into_inner().pattern;
|
let pattern = req.into_inner().pattern;
|
||||||
|
|
||||||
let mut redis = self.redis.get_async_connection().await.expect("msg");
|
tracing::trace!("entered request method: pattern={}", pattern);
|
||||||
redis
|
|
||||||
|
let mut redis = match self.redis.get_async_connection().await {
|
||||||
|
Ok(redis) => redis,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!("failed to get redis connection: {}", err);
|
||||||
|
return Err(tonic::Status::unavailable(err.to_string()));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(err) = redis
|
||||||
.publish::<&str, String, ()>("vanity:new", format!("{}|{}", id, pattern))
|
.publish::<&str, String, ()>("vanity:new", format!("{}|{}", id, pattern))
|
||||||
.await
|
.await
|
||||||
.expect("should not fail");
|
{
|
||||||
|
tracing::warn!("failed to publish new requests: {}", err);
|
||||||
|
return Err(tonic::Status::unavailable(err.to_string()));
|
||||||
|
}
|
||||||
|
tracing::debug!("new requests published on new clients: {}", pattern);
|
||||||
|
|
||||||
let mut pubsub = redis.into_pubsub();
|
let mut pubsub = redis.into_pubsub();
|
||||||
pubsub
|
if let Err(err) = pubsub
|
||||||
.subscribe(format!("{}:{}", REDIS_PUBSUB_PATTERN_RESULT, id))
|
.subscribe(format!("{}:{}", REDIS_PUBSUB_PATTERN_RESULT, id))
|
||||||
.await
|
.await
|
||||||
.expect("msg");
|
{
|
||||||
|
tracing::warn!("failed to subscribe on result channel: {}", err);
|
||||||
|
return Err(tonic::Status::unavailable(err.to_string()));
|
||||||
|
};
|
||||||
|
tracing::debug!("listening on result channel: {}", pattern);
|
||||||
|
|
||||||
let mut msgs = pubsub.on_message();
|
let mut msgs = pubsub.on_message();
|
||||||
|
|
||||||
@ -49,6 +65,7 @@ impl service_server::VanityService for VanityService {
|
|||||||
if let Some(msg) = msgs.next().await {
|
if let Some(msg) = msgs.next().await {
|
||||||
let mut response = VanityResponse::default();
|
let mut response = VanityResponse::default();
|
||||||
response.result = String::from_utf8_lossy(msg.get_payload_bytes()).to_string();
|
response.result = String::from_utf8_lossy(msg.get_payload_bytes()).to_string();
|
||||||
|
tracing::trace!("returned from request method: pattern={}", pattern);
|
||||||
return Ok(tonic::Response::new(response));
|
return Ok(tonic::Response::new(response));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user