one commit contains all impl

This commit is contained in:
2022-10-21 15:09:22 +08:00
commit e871ec2b29
17 changed files with 2117 additions and 0 deletions

1
vanityd/src/consts.rs Normal file
View File

@ -0,0 +1 @@
pub(crate) const REDIS_PUBSUB_PATTERN_RESULT: &str = "vanity:result:";

31
vanityd/src/main.rs Normal file
View File

@ -0,0 +1,31 @@
pub(crate) mod consts;
pub(crate) mod server;
pub(crate) mod worker;
use std::net::ToSocketAddrs;
use libvanity::{
vanity::vanity_service_server::VanityServiceServer, worker::worker_server::WorkerServer,
};
pub use libvanity;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.init();
let redis = redis::Client::open("redis://127.0.0.1:6379")?;
let vanity_server = server::VanityService::new(redis.clone());
let worker_server = worker::WorkerService::new(redis.clone());
tonic::transport::Server::builder()
.add_service(VanityServiceServer::new(vanity_server))
.add_service(WorkerServer::new(worker_server))
.serve("127.0.0.1:8877".to_socket_addrs()?.next().unwrap())
.await?;
Ok(())
}

56
vanityd/src/server.rs Normal file
View File

@ -0,0 +1,56 @@
use libvanity::vanity::{vanity_service_server as service_server, VanityRequest, VanityResponse};
use tokio_stream::StreamExt;
use tonic::async_trait;
use crate::consts::*;
use redis::AsyncCommands;
pub(crate) struct VanityService {
counter: std::sync::atomic::AtomicU64,
redis: redis::Client,
}
impl VanityService {
pub fn new(redis: redis::Client) -> Self {
Self {
redis,
counter: std::sync::atomic::AtomicU64::new(0),
}
}
}
#[async_trait]
impl service_server::VanityService for VanityService {
async fn request_vanity(
&self,
req: tonic::Request<VanityRequest>,
) -> Result<tonic::Response<VanityResponse>, tonic::Status> {
let id = self
.counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let pattern = req.into_inner().pattern;
let mut redis = self.redis.get_async_connection().await.expect("msg");
redis
.publish::<&str, String, ()>("vanity:new", format!("{}|{}", id, pattern))
.await
.expect("should not fail");
let mut pubsub = redis.into_pubsub();
pubsub
.subscribe(format!("{}:{}", REDIS_PUBSUB_PATTERN_RESULT, id))
.await
.expect("msg");
let mut msgs = pubsub.on_message();
loop {
if let Some(msg) = msgs.next().await {
let mut response = VanityResponse::default();
response.result = String::from_utf8_lossy(msg.get_payload_bytes()).to_string();
return Ok(tonic::Response::new(response));
}
}
}
}

146
vanityd/src/worker.rs Normal file
View File

@ -0,0 +1,146 @@
use futures::StreamExt;
use tonic::async_trait;
use redis::AsyncCommands;
use crate::consts::*;
pub struct WorkerService {
redis: redis::Client,
}
impl WorkerService {
pub fn new(r: redis::Client) -> Self {
Self { redis: r }
}
}
#[async_trait]
impl libvanity::worker::worker_server::Worker for WorkerService {
type PollStream =
tokio_stream::wrappers::ReceiverStream<tonic::Result<libvanity::worker::Request>>;
async fn poll(
&self,
result: tonic::Request<tonic::Streaming<libvanity::worker::Result>>,
) -> Result<tonic::Response<Self::PollStream>, tonic::Status> {
tracing::trace!("entered polling method");
let (tx, rx) = tokio::sync::mpsc::channel(1);
tracing::debug!("getting redis connection");
let redis_new_request = match self.redis.get_async_connection().await {
Ok(conn) => conn,
Err(err) => return Err(tonic::Status::unavailable(err.to_string())),
};
// handle new requests or results
let new_tx = tx.clone();
tokio::spawn(async move {
tracing::debug!("entering puller closure");
let tx = new_tx.clone();
let mut pubsub = redis_new_request.into_pubsub();
match pubsub.subscribe("vanity:new").await {
Err(err) => {
tx.send(Err(tonic::Status::unavailable(err.to_string())))
.await
.ok();
}
_ => {}
}
match pubsub
.psubscribe(format!("{}*", REDIS_PUBSUB_PATTERN_RESULT))
.await
{
Err(err) => {
tx.send(Err(tonic::Status::unavailable(err.to_string())))
.await
.ok();
}
_ => {}
}
let mut msgs = pubsub.on_message();
while let Some(msg) = msgs.next().await {
let mut request = libvanity::worker::Request::default();
if let Ok(pattern) = msg.get_channel::<String>() {
// result,
tracing::info!("pattern = {}", pattern);
if let Some(key) = pattern.strip_prefix(&REDIS_PUBSUB_PATTERN_RESULT) {
tracing::info!("key = {}", key);
if let Ok(id) = u64::from_str_radix(key.trim_start_matches(":"), 10) {
request.new_or_done =
Some(libvanity::worker::request::NewOrDone::Done(id));
tracing::info!("done -> {}", id.clone());
}
} else {
// new pattern request
let payload: String = msg.get_payload().unwrap();
if let Some((idstr, pattern)) = payload.rsplit_once('|') {
if let Ok(id) = u64::from_str_radix(idstr, 10) {
let mut new_request = libvanity::worker::NewRequest::default();
new_request.id = id.clone();
new_request.pattern = String::from(pattern);
request.new_or_done =
Some(libvanity::worker::request::NewOrDone::New(
// New
new_request,
));
tracing::info!("new -> {}", id.clone());
}
}
}
}
if request.new_or_done == None {
continue;
}
tracing::info!("sending {:?}", request.new_or_done);
if let Err(err) = tx.send(Ok(request)).await {
tx.send(Err(tonic::Status::from_error(Box::new(err))))
.await
.ok();
};
}
});
let mut in_stream = result.into_inner();
let redis = self.redis.clone();
tokio::spawn(async move {
tracing::debug!("entering trans closure");
let redis = redis.clone();
while let Some(result) = in_stream.next().await {
match result {
Ok(result) => {
let mut redis = match redis.get_async_connection().await {
Ok(redis) => redis,
Err(err) => {
tx.send(Err(tonic::Status::unavailable(err.to_string())))
.await
.ok();
continue;
}
};
tracing::info!("request done: {}", result.id);
let _: u64 = redis
.publish(
format!("{}:{}", REDIS_PUBSUB_PATTERN_RESULT, result.id),
result.result,
)
.await
.unwrap();
}
Err(err) => {
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was droped
}
}
}
}
});
tracing::debug!("returned a response stream.( working in background )");
Ok(tonic::Response::new(
tokio_stream::wrappers::ReceiverStream::new(rx),
))
}
}