diff --git a/harsh-client/src/main.rs b/harsh-client/src/main.rs index 9d03c4b..992c31d 100644 --- a/harsh-client/src/main.rs +++ b/harsh-client/src/main.rs @@ -1,6 +1,9 @@ +use std::time::Duration; + use tokio::{ io::{stdin, AsyncBufReadExt, AsyncWriteExt, BufReader}, net::TcpStream, + time, }; #[tokio::main] @@ -15,6 +18,7 @@ async fn main() { let mut line = String::new(); reader.read_line(&mut line).await.unwrap(); println!("received '{line}'"); + time::sleep(Duration::from_millis(100)).await; } }); @@ -43,10 +47,11 @@ async fn main() { } mod commands { + use harsh_common::ClientRequest; pub enum Command { Help, - Request(harsh_common::ClientRequest), + Request(ClientRequest), } pub fn parse(input: &str) -> Option { @@ -56,7 +61,20 @@ mod commands { "ping" => { let rest = parts.collect::>(); let content = rest.join(" "); - harsh_common::ClientRequest::new_ping(content) + ClientRequest::new_ping(content) + } + "chanls" => ClientRequest::new_channel_list(), + "chanadd" => { + let name = parts.next()?; + ClientRequest::new_channel_create(name) + } + "chandel" => { + let id = parts.next()?.parse().ok()?; + ClientRequest::new_channel_delete(id) + } + "changetname" => { + let id = parts.next()?.parse().ok()?; + ClientRequest::new_channel_get_name(id) } _ => return None, }; @@ -64,6 +82,20 @@ mod commands { Some(Command::Request(command)) } + pub const CMDS: &'static [Description] = &[ + // all commands + Description::new("help", &[], "returns a help message"), + Description::new( + "ping", + &["content"], + "sends a ping with the specified content", + ), + Description::new("chanls", &[], "list channels"), + Description::new("chanadd", &["name"], "creates a new channel"), + Description::new("chandel", &["id"], "delete a channel by its id"), + Description::new("changetname", &["id"], "get a channel's name"), + ]; + pub fn smart_split(input: &str) -> Vec { let input = input.trim(); let mut result = Vec::new(); @@ -131,20 +163,10 @@ mod commands { ) -> Self { Self { name, desc, params } } - - pub const ALL: &'static [Self] = &[ - // all commands - Self::new("help", &[], "returns a help message"), - Self::new( - "ping", - &["content"], - "sends a ping with the specified content", - ), - ]; } pub fn help() { - for &Description { name, params, desc } in Description::ALL { + for &Description { name, params, desc } in CMDS { let mut usage = params.iter().map(|s| s.to_string()).collect::>(); usage.insert(0, name.to_string()); let usage = usage.join(" "); diff --git a/harsh-common/src/client.rs b/harsh-common/src/client.rs index 5bc5c0f..7b82ea3 100644 --- a/harsh-common/src/client.rs +++ b/harsh-common/src/client.rs @@ -1,29 +1,77 @@ -#[derive(Debug)] -pub enum ClientRequest { - Ping(Ping), -} - #[derive(Debug)] pub struct Ping { pub content: String, } +#[derive(Debug)] +pub struct ChannelList {} + +#[derive(Debug)] +pub struct ChannelCreate { + pub name: String, +} + +#[derive(Debug)] +pub struct ChannelDelete { + pub channel_id: u64, +} + +#[derive(Debug)] +pub struct ChannelGetName { + pub channel_id: u64, +} + +#[derive(Debug)] +pub enum ClientRequest { + Ping(Ping), + ChannelList(ChannelList), + ChannelCreate(ChannelCreate), + ChannelDelete(ChannelDelete), + ChannelGetName(ChannelGetName), +} + impl ClientRequest { pub fn new_ping(content: String) -> Self { Self::Ping(Ping { content }) } + pub fn new_channel_list() -> Self { + Self::ChannelList(ChannelList {}) + } + + pub fn new_channel_create(name: String) -> Self { + Self::ChannelCreate(ChannelCreate { name }) + } + + pub fn new_channel_delete(channel_id: u64) -> Self { + Self::ChannelDelete(ChannelDelete { channel_id }) + } + + pub fn new_channel_get_name(channel_id: u64) -> Self { + Self::ChannelGetName(ChannelGetName { channel_id }) + } + pub fn try_parse(line: &str) -> Option { + use repr::Command::*; let command: repr::Command = serde_json::from_str(line).ok()?; let mapped = match command { - repr::Command::ping { content } => Self::Ping(Ping { content }), + ping { content } => Self::Ping(Ping { content }), + channel_list {} => Self::ChannelList(ChannelList {}), + channel_create { name } => Self::ChannelCreate(ChannelCreate { name }), + channel_delete { channel_id } => Self::ChannelDelete(ChannelDelete { channel_id }), + channel_get_name { channel_id } => Self::ChannelGetName(ChannelGetName { channel_id }), }; Some(mapped) } pub fn serialize(self) -> String { + use repr::Command::*; let mapped = match self { - Self::Ping(Ping { content }) => repr::Command::ping { content }, + Self::Ping(Ping { content }) => ping { content }, + Self::ChannelList(ChannelList {}) => repr::Command::channel_list {}, + Self::ChannelCreate(ChannelCreate { name }) => channel_create { name }, + Self::ChannelDelete(ChannelDelete { channel_id }) => channel_delete { channel_id }, + Self::ChannelGetName(ChannelGetName { channel_id }) => channel_get_name { channel_id }, }; serde_json::to_string(&mapped).unwrap() } @@ -38,5 +86,9 @@ mod repr { #[serde(tag = "type")] pub enum Command { ping { content: String }, + channel_list {}, + channel_create { name: String }, + channel_delete { channel_id: u64 }, + channel_get_name { channel_id: u64 }, } } diff --git a/harsh-common/src/lib.rs b/harsh-common/src/lib.rs index bd36c97..38e31f5 100644 --- a/harsh-common/src/lib.rs +++ b/harsh-common/src/lib.rs @@ -7,8 +7,8 @@ mod tests { } } -pub use client::{ClientRequest, Ping}; -mod client; +pub use client::ClientRequest; +pub mod client; -pub use server::{Pong, ServerRequest}; -mod server; +pub use server::ServerRequest; +pub mod server; diff --git a/harsh-common/src/server.rs b/harsh-common/src/server.rs index 6a7041b..d27e4a4 100644 --- a/harsh-common/src/server.rs +++ b/harsh-common/src/server.rs @@ -1,27 +1,51 @@ -pub enum ServerRequest { - Pong(Pong), -} - pub struct Pong { pub content: String, } +pub struct ChannelList { + pub channels: Vec, +} + +pub struct ChannelGetName { + pub id: u64, + pub name: Option, +} +pub enum ServerRequest { + Pong(Pong), + ChannelList(ChannelList), + ChannelGetName(ChannelGetName), +} + impl ServerRequest { pub fn new_pong(content: String) -> Self { Self::Pong(Pong { content }) } + pub fn new_channel_list(channels: Vec) -> Self { + Self::ChannelList(ChannelList { channels }) + } + + pub fn new_channel_get_name(id: u64, name: Option) -> Self { + Self::ChannelGetName(ChannelGetName { name, id }) + } + pub fn try_parse(line: &str) -> Option { + use repr::Command::*; let command: repr::Command = serde_json::from_str(line).ok()?; let mapped = match command { - repr::Command::pong { content } => Self::Pong(Pong { content }), + pong { content } => Self::Pong(Pong { content }), + channel_list { channels } => Self::ChannelList(ChannelList { channels }), + channel_get_name { id, name } => Self::ChannelGetName(ChannelGetName { id, name }), }; Some(mapped) } pub fn serialize(self) -> String { + use repr::Command::*; let mapped = match self { - Self::Pong(Pong { content }) => repr::Command::pong { content }, + Self::Pong(Pong { content }) => pong { content }, + Self::ChannelList(ChannelList { channels }) => channel_list { channels }, + Self::ChannelGetName(ChannelGetName { id, name }) => channel_get_name { id, name }, }; serde_json::to_string(&mapped).unwrap() } @@ -36,5 +60,7 @@ mod repr { #[serde(tag = "type")] pub enum Command { pong { content: String }, + channel_list { channels: Vec }, + channel_get_name { id: u64, name: Option }, } } diff --git a/harsh-server/src/gateway.rs b/harsh-server/src/gateway.rs index 49a7a0c..8d85444 100644 --- a/harsh-server/src/gateway.rs +++ b/harsh-server/src/gateway.rs @@ -1,9 +1,7 @@ -use harsh_common::{Ping, Pong, ServerRequest}; +use harsh_common::{client, server, ClientRequest, ServerRequest}; use telecomande::{Processor, Remote}; -use harsh_common::ClientRequest; - -use crate::{sessions, Addr, SessionProc, StorageProc}; +use crate::{Addr, Id, SessionCmd, SessionProc, StorageCmd, StorageProc}; #[derive(Debug)] pub enum GatewayCmd { @@ -12,26 +10,50 @@ pub enum GatewayCmd { } pub struct GatewayProc { - client_handler: Remote, + sessions: Remote, storage: Remote, } impl GatewayProc { - pub fn new(client_handler: Remote, storage: Remote) -> Self { - Self { - client_handler, - storage, - } + pub fn new(sessions: Remote, storage: Remote) -> Self { + Self { sessions, storage } } async fn handle_request(&mut self, address: Addr, request: ClientRequest) { match request { - ClientRequest::Ping(Ping { content }) => { + ClientRequest::Ping(client::Ping { content }) => { println!("received ping! '{content:?}'"); - let response = ServerRequest::Pong(Pong { content }); - let content = response.serialize(); - self.client_handler - .send(sessions::SessionCmd::Send(address, content)) + let request = ServerRequest::Pong(server::Pong { content }); + self.sessions + .send(SessionCmd::new_send(address, request)) + .unwrap(); + } + ClientRequest::ChannelList(client::ChannelList {}) => { + let (cmd, rec) = StorageCmd::new_channel_list(); + self.storage.send(cmd).unwrap(); + let channels = rec.await.unwrap().iter().map(|id| id.to_u64()).collect(); + let request = ServerRequest::new_channel_list(channels); + self.sessions + .send(SessionCmd::new_send(address, request)) + .unwrap(); + } + ClientRequest::ChannelCreate(client::ChannelCreate { name }) => { + let (cmd, rec) = StorageCmd::new_channel_create(name); + let _id = rec.await.unwrap(); + self.storage.send(cmd).unwrap(); + } + ClientRequest::ChannelDelete(client::ChannelDelete { channel_id }) => { + self.storage + .send(StorageCmd::ChannelDelete(Id::from_u64(channel_id))) + .unwrap(); + } + ClientRequest::ChannelGetName(client::ChannelGetName { channel_id }) => { + let (cmd, rec) = StorageCmd::new_channel_get_name(Id::from_u64(channel_id)); + self.storage.send(cmd).unwrap(); + let name = rec.await.unwrap(); + let request = ServerRequest::new_channel_get_name(channel_id, name); + self.sessions + .send(SessionCmd::new_send(address, request)) .unwrap(); } } @@ -46,14 +68,15 @@ impl Processor for GatewayProc { match command { GatewayCmd::Request(address, request) => { if let Some(request) = ClientRequest::try_parse(&request) { + println!("[session/info] received command '{request:?}'"); self.handle_request(address, request).await; } else { - println!("failed to parse command"); + println!("[session/warn] failed to parse command"); } } GatewayCmd::ClosedConnection(address) => self - .client_handler - .send(sessions::SessionCmd::RemoveSession(address)) + .sessions + .send(SessionCmd::RemoveSession(address)) .unwrap(), } Ok(()) diff --git a/harsh-server/src/main.rs b/harsh-server/src/main.rs index d292071..4da1dcc 100644 --- a/harsh-server/src/main.rs +++ b/harsh-server/src/main.rs @@ -4,20 +4,22 @@ use tokio::net::TcpListener; #[tokio::main] async fn main() { - println!("starting server ..."); - let client_handler = SimpleExecutor::new(SessionProc::default()).spawn(); - let storage = SimpleExecutor::new(StorageProc::new("./db")).spawn(); + println!("[main/info] starting server ..."); + let sessions = SimpleExecutor::new(SessionProc::default()).spawn(); + println!("[main/info] spawned sessions"); + let storage = SimpleExecutor::new(StorageProc::new("/tmp/db.test")).spawn(); + println!("[main/info] spawned storage"); let gateway = - SimpleExecutor::new(GatewayProc::new(client_handler.remote(), storage.remote())).spawn(); - println!("spawned gateway"); + SimpleExecutor::new(GatewayProc::new(sessions.remote(), storage.remote())).spawn(); + println!("[main/info] spawned gateway"); let listener = TcpListener::bind("localhost:8080").await.unwrap(); - println!("listening on 'localhost:8080' ..."); + println!("[main/info] listening on 'localhost:8080' ..."); - let client_handler = client_handler.remote(); + let client_handler = sessions.remote(); loop { let (stream, address) = listener.accept().await.unwrap(); - println!("new connection from '{address:?}'"); + println!("[main/info] new connection from '{address:?}'"); client_handler .send(sessions::SessionCmd::AddSession( @@ -38,5 +40,5 @@ pub use gateway::{GatewayCmd, GatewayProc}; mod sessions; pub use sessions::{SessionCmd, SessionProc}; -pub use storage::{StorageCmd, StorageProc}; mod storage; +pub use storage::{StorageCmd, StorageProc}; diff --git a/harsh-server/src/sessions.rs b/harsh-server/src/sessions.rs index be8b354..cb27a7a 100644 --- a/harsh-server/src/sessions.rs +++ b/harsh-server/src/sessions.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, net::SocketAddr}; +use harsh_common::ServerRequest; use telecomande::{Processor, Remote}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -18,6 +19,13 @@ pub enum SessionCmd { Send(Addr, String), } +impl SessionCmd { + pub fn new_send(address: Addr, request: ServerRequest) -> Self { + let content = request.serialize(); + Self::Send(address, content) + } +} + #[derive(Debug, Default)] pub struct SessionProc { clients: HashMap)>, @@ -69,7 +77,7 @@ async fn session(address: Addr, reader: OwnedReadHalf, remote: Remote), ChannelDelete(Id), - ChannelGetAll(Sender>), + ChannelList(Sender>), ChannelGetName(Id, Sender>), } impl StorageCmd { - fn new_channel_create(name: impl ToString) -> (Self, Receiver) { + pub fn new_channel_create(name: impl ToString) -> (Self, Receiver) { let (s, r) = oneshot::channel(); (Self::ChannelCreate(name.to_string(), s), r) } - fn new_channel_delete(id: Id) -> Self { + pub fn new_channel_delete(id: Id) -> Self { Self::ChannelDelete(id) } - fn new_channel_get_all() -> (Self, Receiver>) { + pub fn new_channel_list() -> (Self, Receiver>) { let (s, r) = oneshot::channel(); - (Self::ChannelGetAll(s), r) + (Self::ChannelList(s), r) } - fn new_channel_get_name(id: Id) -> (Self, Receiver>) { + pub fn new_channel_get_name(id: Id) -> (Self, Receiver>) { let (s, r) = oneshot::channel(); (Self::ChannelGetName(id, s), r) } @@ -93,7 +93,7 @@ impl Processor for StorageProc { self.set(format!("/channels/{id}"), item); sender.send(id).unwrap(); } - StorageCmd::ChannelGetAll(sender) => { + StorageCmd::ChannelList(sender) => { let results = self.list("/channels/"); sender.send(results).unwrap(); } @@ -170,7 +170,7 @@ async fn test_channels() { let id = rec.await.unwrap(); // query all - let (cmd, rec) = StorageCmd::new_channel_get_all(); + let (cmd, rec) = StorageCmd::new_channel_list(); remote.send(cmd).unwrap(); let result = rec.await.unwrap(); assert_eq!(result.len(), 1); @@ -189,7 +189,7 @@ async fn test_channels() { let id2 = rec.await.unwrap(); // query all - let (cmd, rec) = StorageCmd::new_channel_get_all(); + let (cmd, rec) = StorageCmd::new_channel_list(); remote.send(cmd).unwrap(); let result = rec.await.unwrap(); assert_eq!(result.len(), 2); diff --git a/harsh-server/src/utils.rs b/harsh-server/src/utils.rs index 1e45be1..997bb6b 100644 --- a/harsh-server/src/utils.rs +++ b/harsh-server/src/utils.rs @@ -27,6 +27,14 @@ impl Id { let inner: u64 = input.parse().ok()?; Some(Self(inner)) } + + pub fn from_u64(input: u64) -> Self { + Self(input) + } + + pub fn to_u64(&self) -> u64 { + self.0 + } } #[test] diff --git a/watch-server.sh b/watch-server.sh old mode 100644 new mode 100755