From 695ac6daa262abd6d782d0b77061dd1fa1fc536e Mon Sep 17 00:00:00 2001 From: mb Date: Mon, 22 Aug 2022 15:54:36 +0300 Subject: [PATCH] implemented notifications and messages --- .gitignore | 5 +- harsh-client/src/commands.rs | 166 ++++++++++++++++++ harsh-client/src/main.rs | 166 +++--------------- harsh-common/src/client.rs | 190 ++++++++++++++++++-- harsh-common/src/server.rs | 233 +++++++++++++++++++++++- harsh-server/src/gateway.rs | 167 ++++++++++++++---- harsh-server/src/main.rs | 12 +- harsh-server/src/sessions.rs | 33 +++- harsh-server/src/storage.rs | 273 +++++++++++++++++------------ harsh-server/src/storage/models.rs | 23 ++- harsh-server/src/storage/tests.rs | 64 +++++++ harsh-server/src/utils.rs | 6 + 12 files changed, 1023 insertions(+), 315 deletions(-) create mode 100644 harsh-client/src/commands.rs create mode 100644 harsh-server/src/storage/tests.rs diff --git a/.gitignore b/.gitignore index ffcc601..cbe6f8d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ -/db -/target \ No newline at end of file + +*.test +target \ No newline at end of file diff --git a/harsh-client/src/commands.rs b/harsh-client/src/commands.rs new file mode 100644 index 0000000..1d0f98f --- /dev/null +++ b/harsh-client/src/commands.rs @@ -0,0 +1,166 @@ +use harsh_common::ClientRequest; + +pub enum Command { + Help, + Request(ClientRequest), +} + +pub fn parse(input: &str) -> Option { + let mut parts = smart_split(input).into_iter(); + let command = match parts.next()?.as_str() { + "help" => return Some(Command::Help), + "ping" => { + let rest = parts.collect::>(); + let content = rest.join(" "); + ClientRequest::new_ping(content) + } + "chanls" => ClientRequest::new_channel_list(), + "chanadd" => { + let name = parts.next()?; + ClientRequest::new_channel_create(name) + } + "chandel" => { + let id = parts.next()?.parse().ok()?; + ClientRequest::new_channel_delete(id) + } + "changname" => { + let id = parts.next()?.parse().ok()?; + ClientRequest::new_channel_get_name(id) + } + "chansname" => { + let id = parts.next()?.parse().ok()?; + let name = parts.next()?; + ClientRequest::new_channel_set_name(id, name) + } + "msgls" => { + let channel_id = parts.next()?.parse().ok()?; + ClientRequest::new_message_list(channel_id) + } + "msgadd" => { + let channel_id = parts.next()?.parse().ok()?; + let content = parts.next()?; + ClientRequest::new_message_create(channel_id, content) + } + "msgdel" => { + let channel_id = parts.next()?.parse().ok()?; + let id = parts.next()?.parse().ok()?; + ClientRequest::new_message_delete(channel_id, id) + } + "msggcont" => { + let channel_id = parts.next()?.parse().ok()?; + let id = parts.next()?.parse().ok()?; + ClientRequest::new_message_get_content(channel_id, id) + } + "msgscont" => { + let channel_id = parts.next()?.parse().ok()?; + let id = parts.next()?.parse().ok()?; + let content = parts.next()?; + ClientRequest::new_message_set_content(channel_id, id, content) + } + _ => return None, + }; + + Some(Command::Request(command)) +} + +pub const CMDS: &'static [Description] = &[ + // all commands + Description::new("help", &[], "returns a help message"), + Description::new( + "ping", + &["content"], + "sends a ping with the specified content", + ), + Description::new("chanls", &[], "list channels"), + Description::new("chanadd", &["name"], "creates a new channel"), + Description::new("chandel", &["id"], "delete a channel by its id"), + Description::new("changname", &["id"], "get a channel's name"), + Description::new("chansname", &["id", "name"], "set a channel's name"), + Description::new("msgls", &["channel_id"], "list messages"), + Description::new("msgadd", &["channel_id", "content"], "create a message"), + Description::new("msgdel", &["channel_id", "id"], "delete a message"), + Description::new("msggcont", &["channel_id", "id"], "get a message's content"), + Description::new( + "msgscont", + &["channel_id", "id", "content"], + "set a message's content", + ), +]; + +pub fn smart_split(input: &str) -> Vec { + let input = input.trim(); + let mut result = Vec::new(); + + let mut capturing = false; + let mut ignoring = false; + let mut current = String::new(); + for char in input.chars() { + let char: char = char; + if ignoring { + current.push(char); + ignoring = false; + continue; + } + + match char { + '\\' => { + ignoring = true; + } + '"' => { + capturing = !capturing; + } + ' ' if !capturing => { + result.push(current); + current = String::new(); + } + _ => current.push(char), + } + } + result.push(current); + result +} + +#[test] +fn test_smart_split() { + assert_eq!( + smart_split("hello world"), + vec!["hello".to_string(), "world".to_string()] + ); + assert_eq!( + smart_split(r#""lorem ipsum" "dolor amit""#), + vec!["lorem ipsum".to_string(), "dolor amit".to_string()] + ); + assert_eq!( + smart_split(r#"lorem "ipsum do"lor "amit""#), + vec![ + "lorem".to_string(), + "ipsum dolor".to_string(), + "amit".to_string() + ] + ); +} + +pub struct Description { + name: &'static str, + params: &'static [&'static str], + desc: &'static str, +} + +impl Description { + pub const fn new( + name: &'static str, + params: &'static [&'static str], + desc: &'static str, + ) -> Self { + Self { name, desc, params } + } +} + +pub fn help() { + for &Description { name, params, desc } in CMDS { + let mut usage = params.iter().map(|s| s.to_string()).collect::>(); + usage.insert(0, name.to_string()); + let usage = usage.join(" "); + println!("{name}:\n\tusage:\t\t{usage}\n\tdescription:\t{desc}"); + } +} diff --git a/harsh-client/src/main.rs b/harsh-client/src/main.rs index 992c31d..73d4442 100644 --- a/harsh-client/src/main.rs +++ b/harsh-client/src/main.rs @@ -1,25 +1,38 @@ -use std::time::Duration; +use std::{ + io::{stdout, Write}, + process::exit, +}; +use harsh_common::ServerRequest; use tokio::{ io::{stdin, AsyncBufReadExt, AsyncWriteExt, BufReader}, net::TcpStream, - time, }; +const ADDRESS: &'static str = "localhost:42069"; + #[tokio::main] async fn main() { - println!("starting client ..."); - let stream = TcpStream::connect("localhost:8080").await.unwrap(); - println!("connected to 'localhost:8080'"); + println!("[main/info] starting client ..."); + let stream = TcpStream::connect(ADDRESS).await.unwrap(); + println!("[main/info] connected to '{ADDRESS}'"); let (reader, writer) = stream.into_split(); tokio::spawn(async { let mut reader = BufReader::new(reader); loop { let mut line = String::new(); - reader.read_line(&mut line).await.unwrap(); - println!("received '{line}'"); - time::sleep(Duration::from_millis(100)).await; + match reader.read_line(&mut line).await { + Ok(0) => { + break; + } + _ => (), + } + if let Some(parsed) = ServerRequest::try_parse(&line) { + println!("[main/info] received '{parsed:?}'"); + } } + println!("[main/info] connection closed, goodbye."); + exit(0); }); let input_loop = tokio::spawn(async { @@ -27,14 +40,16 @@ async fn main() { let mut writer = writer; loop { + print!("$> "); + stdout().lock().flush().unwrap(); let mut line = String::new(); input.read_line(&mut line).await.unwrap(); let input = commands::parse(&line); match input { - None => println!("failed to parse command"), + None => println!("[main/warn] failed to parse command"), Some(commands::Command::Help) => commands::help(), Some(commands::Command::Request(cmd)) => { - println!("sending.."); + println!("[main/info] sending.."); writer.write_all(cmd.serialize().as_bytes()).await.unwrap(); writer.write_all(b"\n").await.unwrap(); } @@ -42,135 +57,8 @@ async fn main() { } }); - println!("awaiting input ..."); + println!("[main/info] awaiting input ..."); input_loop.await.unwrap(); } -mod commands { - use harsh_common::ClientRequest; - - pub enum Command { - Help, - Request(ClientRequest), - } - - pub fn parse(input: &str) -> Option { - let mut parts = smart_split(input).into_iter(); - let command = match parts.next()?.as_str() { - "help" => return Some(Command::Help), - "ping" => { - let rest = parts.collect::>(); - let content = rest.join(" "); - ClientRequest::new_ping(content) - } - "chanls" => ClientRequest::new_channel_list(), - "chanadd" => { - let name = parts.next()?; - ClientRequest::new_channel_create(name) - } - "chandel" => { - let id = parts.next()?.parse().ok()?; - ClientRequest::new_channel_delete(id) - } - "changetname" => { - let id = parts.next()?.parse().ok()?; - ClientRequest::new_channel_get_name(id) - } - _ => return None, - }; - - Some(Command::Request(command)) - } - - pub const CMDS: &'static [Description] = &[ - // all commands - Description::new("help", &[], "returns a help message"), - Description::new( - "ping", - &["content"], - "sends a ping with the specified content", - ), - Description::new("chanls", &[], "list channels"), - Description::new("chanadd", &["name"], "creates a new channel"), - Description::new("chandel", &["id"], "delete a channel by its id"), - Description::new("changetname", &["id"], "get a channel's name"), - ]; - - pub fn smart_split(input: &str) -> Vec { - let input = input.trim(); - let mut result = Vec::new(); - - let mut capturing = false; - let mut ignoring = false; - let mut current = String::new(); - for char in input.chars() { - let char: char = char; - if ignoring { - current.push(char); - ignoring = false; - continue; - } - - match char { - '\\' => { - ignoring = true; - } - '"' => { - capturing = !capturing; - } - ' ' if !capturing => { - result.push(current); - current = String::new(); - } - _ => current.push(char), - } - } - result.push(current); - result - } - - #[test] - fn test_smart_split() { - assert_eq!( - smart_split("hello world"), - vec!["hello".to_string(), "world".to_string()] - ); - assert_eq!( - smart_split(r#""lorem ipsum" "dolor amit""#), - vec!["lorem ipsum".to_string(), "dolor amit".to_string()] - ); - assert_eq!( - smart_split(r#"lorem "ipsum do"lor "amit""#), - vec![ - "lorem".to_string(), - "ipsum dolor".to_string(), - "amit".to_string() - ] - ); - } - - pub struct Description { - name: &'static str, - params: &'static [&'static str], - desc: &'static str, - } - - impl Description { - pub const fn new( - name: &'static str, - params: &'static [&'static str], - desc: &'static str, - ) -> Self { - Self { name, desc, params } - } - } - - pub fn help() { - for &Description { name, params, desc } in CMDS { - let mut usage = params.iter().map(|s| s.to_string()).collect::>(); - usage.insert(0, name.to_string()); - let usage = usage.join(" "); - println!("{name}:\n\tusage:\n\t\t{usage}\n\n\tdescription:\n\t\t{desc}\n"); - } - } -} +mod commands; diff --git a/harsh-common/src/client.rs b/harsh-common/src/client.rs index 7b82ea3..173e2c4 100644 --- a/harsh-common/src/client.rs +++ b/harsh-common/src/client.rs @@ -13,13 +13,45 @@ pub struct ChannelCreate { #[derive(Debug)] pub struct ChannelDelete { - pub channel_id: u64, + pub id: u64, } #[derive(Debug)] pub struct ChannelGetName { + pub id: u64, +} + +#[derive(Debug)] +pub struct ChannelSetName { + pub id: u64, + pub name: String, +} + +#[derive(Debug)] +pub struct MessageList { pub channel_id: u64, } +#[derive(Debug)] +pub struct MessageCreate { + pub channel_id: u64, + pub content: String, +} +#[derive(Debug)] +pub struct MessageDelete { + pub channel_id: u64, + pub id: u64, +} +#[derive(Debug)] +pub struct MessageGetContent { + pub channel_id: u64, + pub id: u64, +} +#[derive(Debug)] +pub struct MessageSetContent { + pub channel_id: u64, + pub id: u64, + pub content: String, +} #[derive(Debug)] pub enum ClientRequest { @@ -28,6 +60,12 @@ pub enum ClientRequest { ChannelCreate(ChannelCreate), ChannelDelete(ChannelDelete), ChannelGetName(ChannelGetName), + ChannelSetName(ChannelSetName), + MessageList(MessageList), + MessageCreate(MessageCreate), + MessageDelete(MessageDelete), + MessageGetContent(MessageGetContent), + MessageSetContent(MessageSetContent), } impl ClientRequest { @@ -44,11 +82,41 @@ impl ClientRequest { } pub fn new_channel_delete(channel_id: u64) -> Self { - Self::ChannelDelete(ChannelDelete { channel_id }) + Self::ChannelDelete(ChannelDelete { id: channel_id }) } pub fn new_channel_get_name(channel_id: u64) -> Self { - Self::ChannelGetName(ChannelGetName { channel_id }) + Self::ChannelGetName(ChannelGetName { id: channel_id }) + } + + pub fn new_channel_set_name(channel_id: u64, name: String) -> Self { + Self::ChannelSetName(ChannelSetName { + id: channel_id, + name, + }) + } + + pub fn new_message_list(channel_id: u64) -> Self { + Self::MessageList(MessageList { channel_id }) + } + pub fn new_message_create(channel_id: u64, content: String) -> Self { + Self::MessageCreate(MessageCreate { + channel_id, + content, + }) + } + pub fn new_message_delete(channel_id: u64, id: u64) -> Self { + Self::MessageDelete(MessageDelete { channel_id, id }) + } + pub fn new_message_get_content(channel_id: u64, id: u64) -> Self { + Self::MessageGetContent(MessageGetContent { channel_id, id }) + } + pub fn new_message_set_content(channel_id: u64, id: u64, content: String) -> Self { + Self::MessageSetContent(MessageSetContent { + channel_id, + id, + content, + }) } pub fn try_parse(line: &str) -> Option { @@ -58,8 +126,42 @@ impl ClientRequest { ping { content } => Self::Ping(Ping { content }), channel_list {} => Self::ChannelList(ChannelList {}), channel_create { name } => Self::ChannelCreate(ChannelCreate { name }), - channel_delete { channel_id } => Self::ChannelDelete(ChannelDelete { channel_id }), - channel_get_name { channel_id } => Self::ChannelGetName(ChannelGetName { channel_id }), + channel_delete { id: channel_id } => { + Self::ChannelDelete(ChannelDelete { id: channel_id }) + } + channel_get_name { id: channel_id } => { + Self::ChannelGetName(ChannelGetName { id: channel_id }) + } + channel_set_name { + id: channel_id, + name, + } => Self::ChannelSetName(ChannelSetName { + id: channel_id, + name, + }), + message_list { channel_id } => Self::MessageList(MessageList { channel_id }), + message_create { + channel_id, + content, + } => Self::MessageCreate(MessageCreate { + channel_id, + content, + }), + message_delete { id, channel_id } => { + Self::MessageDelete(MessageDelete { id, channel_id }) + } + message_get_content { id, channel_id } => { + Self::MessageGetContent(MessageGetContent { id, channel_id }) + } + message_set_content { + id, + channel_id, + content, + } => Self::MessageSetContent(MessageSetContent { + content, + id, + channel_id, + }), }; Some(mapped) } @@ -70,8 +172,42 @@ impl ClientRequest { Self::Ping(Ping { content }) => ping { content }, Self::ChannelList(ChannelList {}) => repr::Command::channel_list {}, Self::ChannelCreate(ChannelCreate { name }) => channel_create { name }, - Self::ChannelDelete(ChannelDelete { channel_id }) => channel_delete { channel_id }, - Self::ChannelGetName(ChannelGetName { channel_id }) => channel_get_name { channel_id }, + Self::ChannelDelete(ChannelDelete { id: channel_id }) => { + channel_delete { id: channel_id } + } + Self::ChannelGetName(ChannelGetName { id: channel_id }) => { + channel_get_name { id: channel_id } + } + Self::ChannelSetName(ChannelSetName { + id: channel_id, + name, + }) => channel_set_name { + id: channel_id, + name, + }, + Self::MessageList(MessageList { channel_id }) => message_list { channel_id }, + Self::MessageCreate(MessageCreate { + channel_id, + content, + }) => message_create { + channel_id, + content, + }, + Self::MessageDelete(MessageDelete { id, channel_id }) => { + message_delete { id, channel_id } + } + Self::MessageGetContent(MessageGetContent { id, channel_id }) => { + message_get_content { id, channel_id } + } + Self::MessageSetContent(MessageSetContent { + content, + id, + channel_id, + }) => message_set_content { + id, + channel_id, + content, + }, }; serde_json::to_string(&mapped).unwrap() } @@ -85,10 +221,42 @@ mod repr { #[derive(Serialize, Deserialize)] #[serde(tag = "type")] pub enum Command { - ping { content: String }, + ping { + content: String, + }, channel_list {}, - channel_create { name: String }, - channel_delete { channel_id: u64 }, - channel_get_name { channel_id: u64 }, + channel_create { + name: String, + }, + channel_delete { + id: u64, + }, + channel_get_name { + id: u64, + }, + channel_set_name { + id: u64, + name: String, + }, + message_list { + channel_id: u64, + }, + message_create { + channel_id: u64, + content: String, + }, + message_delete { + channel_id: u64, + id: u64, + }, + message_get_content { + channel_id: u64, + id: u64, + }, + message_set_content { + channel_id: u64, + id: u64, + content: String, + }, } } diff --git a/harsh-common/src/server.rs b/harsh-common/src/server.rs index d27e4a4..dee59b2 100644 --- a/harsh-common/src/server.rs +++ b/harsh-common/src/server.rs @@ -1,19 +1,78 @@ +#[derive(Debug)] pub struct Pong { pub content: String, } +#[derive(Debug)] pub struct ChannelList { pub channels: Vec, } +#[derive(Debug)] pub struct ChannelGetName { pub id: u64, pub name: Option, } + +#[derive(Debug)] +pub struct ChannelCreate { + pub id: u64, + pub name: String, +} + +#[derive(Debug)] +pub struct ChannelDelete { + pub id: u64, +} + +#[derive(Debug)] +pub struct ChannelSetName { + pub id: u64, + pub name: String, +} + +#[derive(Debug)] +pub struct MessageList { + pub channel_id: u64, + pub messages: Vec, +} +#[derive(Debug)] +pub struct MessageCreate { + pub channel_id: u64, + pub id: u64, + pub content: String, +} +#[derive(Debug)] +pub struct MessageDelete { + pub channel_id: u64, + pub id: u64, +} +#[derive(Debug)] +pub struct MessageGetContent { + pub channel_id: u64, + pub id: u64, + pub content: Option, +} +#[derive(Debug)] +pub struct MessageSetContent { + pub channel_id: u64, + pub id: u64, + pub content: String, +} + +#[derive(Debug)] pub enum ServerRequest { Pong(Pong), + ChannelCreate(ChannelCreate), + ChannelDelete(ChannelDelete), ChannelList(ChannelList), ChannelGetName(ChannelGetName), + ChannelSetName(ChannelSetName), + MessageList(MessageList), + MessageCreate(MessageCreate), + MessageDelete(MessageDelete), + MessageGetContent(MessageGetContent), + MessageSetContent(MessageSetContent), } impl ServerRequest { @@ -29,6 +88,52 @@ impl ServerRequest { Self::ChannelGetName(ChannelGetName { name, id }) } + pub fn new_channel_create(id: u64, name: String) -> Self { + Self::ChannelCreate(ChannelCreate { id, name }) + } + + pub fn new_channel_delete(id: u64) -> Self { + Self::ChannelDelete(ChannelDelete { id }) + } + + pub fn new_channel_set_name(id: u64, name: String) -> Self { + Self::ChannelSetName(ChannelSetName { id, name }) + } + + pub fn new_message_list(channel_id: u64, messages: Vec) -> Self { + Self::MessageList(MessageList { + channel_id, + messages, + }) + } + + pub fn new_message_create(channel_id: u64, id: u64, content: String) -> Self { + Self::MessageCreate(MessageCreate { + channel_id, + content, + id, + }) + } + pub fn new_message_delete(channel_id: u64, id: u64) -> Self { + Self::MessageDelete(MessageDelete { channel_id, id }) + } + + pub fn new_message_get_content(channel_id: u64, id: u64, content: Option) -> Self { + Self::MessageGetContent(MessageGetContent { + channel_id, + content, + id, + }) + } + + pub fn new_message_set_content(channel_id: u64, id: u64, content: String) -> Self { + Self::MessageSetContent(MessageSetContent { + channel_id, + content, + id, + }) + } + pub fn try_parse(line: &str) -> Option { use repr::Command::*; let command: repr::Command = serde_json::from_str(line).ok()?; @@ -36,6 +141,46 @@ impl ServerRequest { pong { content } => Self::Pong(Pong { content }), channel_list { channels } => Self::ChannelList(ChannelList { channels }), channel_get_name { id, name } => Self::ChannelGetName(ChannelGetName { id, name }), + channel_create { id, name } => Self::ChannelCreate(ChannelCreate { id, name }), + channel_set_name { id, name } => Self::ChannelSetName(ChannelSetName { id, name }), + channel_delete { id } => Self::ChannelDelete(ChannelDelete { id }), + message_list { + channel_id, + messages, + } => Self::MessageList(MessageList { + channel_id, + messages, + }), + message_create { + channel_id, + id, + content, + } => Self::MessageCreate(MessageCreate { + channel_id, + content, + id, + }), + message_delete { channel_id, id } => { + Self::MessageDelete(MessageDelete { channel_id, id }) + } + message_get_content { + channel_id, + id, + content, + } => Self::MessageGetContent(MessageGetContent { + channel_id, + content, + id, + }), + message_set_content { + channel_id, + id, + content, + } => Self::MessageSetContent(MessageSetContent { + channel_id, + content, + id, + }), }; Some(mapped) } @@ -46,6 +191,47 @@ impl ServerRequest { Self::Pong(Pong { content }) => pong { content }, Self::ChannelList(ChannelList { channels }) => channel_list { channels }, Self::ChannelGetName(ChannelGetName { id, name }) => channel_get_name { id, name }, + Self::ChannelCreate(ChannelCreate { id, name }) => channel_create { id, name }, + Self::ChannelSetName(ChannelSetName { id, name }) => channel_set_name { id, name }, + Self::ChannelDelete(ChannelDelete { id }) => channel_delete { id }, + + Self::MessageList(MessageList { + channel_id, + messages, + }) => message_list { + channel_id, + messages, + }, + Self::MessageCreate(MessageCreate { + channel_id, + content, + id, + }) => message_create { + channel_id, + id, + content, + }, + Self::MessageDelete(MessageDelete { channel_id, id }) => { + message_delete { channel_id, id } + } + Self::MessageGetContent(MessageGetContent { + channel_id, + content, + id, + }) => message_get_content { + channel_id, + id, + content, + }, + Self::MessageSetContent(MessageSetContent { + channel_id, + content, + id, + }) => message_set_content { + channel_id, + id, + content, + }, }; serde_json::to_string(&mapped).unwrap() } @@ -59,8 +245,49 @@ mod repr { #[derive(Serialize, Deserialize)] #[serde(tag = "type")] pub enum Command { - pong { content: String }, - channel_list { channels: Vec }, - channel_get_name { id: u64, name: Option }, + pong { + content: String, + }, + channel_list { + channels: Vec, + }, + channel_get_name { + id: u64, + name: Option, + }, + channel_create { + id: u64, + name: String, + }, + channel_delete { + id: u64, + }, + channel_set_name { + id: u64, + name: String, + }, + message_list { + channel_id: u64, + messages: Vec, + }, + message_create { + channel_id: u64, + id: u64, + content: String, + }, + message_delete { + channel_id: u64, + id: u64, + }, + message_get_content { + channel_id: u64, + id: u64, + content: Option, + }, + message_set_content { + channel_id: u64, + id: u64, + content: String, + }, } } diff --git a/harsh-server/src/gateway.rs b/harsh-server/src/gateway.rs index 8d85444..d78f5a4 100644 --- a/harsh-server/src/gateway.rs +++ b/harsh-server/src/gateway.rs @@ -15,48 +15,139 @@ pub struct GatewayProc { } impl GatewayProc { + async fn handle_request(&mut self, address: Addr, request: ClientRequest) { + match request { + ClientRequest::Ping(client::Ping { content }) => { + self.on_ping(content, address); + } + + ClientRequest::ChannelCreate(client::ChannelCreate { name }) => { + self.on_channel_create(name).await; + } + + ClientRequest::ChannelDelete(client::ChannelDelete { id }) => { + self.on_channel_delete(id); + } + + ClientRequest::ChannelList(client::ChannelList {}) => { + self.on_channel_list(address).await; + } + + ClientRequest::ChannelGetName(client::ChannelGetName { id }) => { + self.on_channel_get_name(id, address).await; + } + + ClientRequest::ChannelSetName(client::ChannelSetName { id, name }) => { + self.on_channel_set_name(id, name); + } + + ClientRequest::MessageList(client::MessageList { channel_id }) => { + let (cmd, rec) = StorageCmd::new_message_list(channel_id.into()); + self.storage.send(cmd).unwrap(); + let messages = rec.await.unwrap().iter().map(Id::to_u64).collect(); + let request = ServerRequest::new_message_list(channel_id, messages); + let command = SessionCmd::new_send(address, request); + self.sessions.send(command).unwrap(); + } + ClientRequest::MessageCreate(client::MessageCreate { + channel_id, + content, + }) => { + let (cmd, rec) = StorageCmd::new_message_create(channel_id.into(), content.clone()); + self.storage.send(cmd).unwrap(); + let id = rec.await.unwrap(); + let request = ServerRequest::new_message_create(channel_id, id.to_u64(), content); + let command = SessionCmd::new_broadcast(request); + self.sessions.send(command).unwrap(); + } + + ClientRequest::MessageDelete(client::MessageDelete { id, channel_id }) => { + let command = StorageCmd::new_message_delete(channel_id.into(), id.into()); + self.storage.send(command).unwrap(); + let request = ServerRequest::new_message_delete(channel_id, id); + let command = SessionCmd::new_broadcast(request); + self.sessions.send(command).unwrap(); + } + + ClientRequest::MessageGetContent(client::MessageGetContent { id, channel_id }) => { + let (cmd, rec) = StorageCmd::new_message_get_content(channel_id.into(), id.into()); + self.storage.send(cmd).unwrap(); + let request = + ServerRequest::new_message_get_content(channel_id, id, rec.await.unwrap()); + let command = SessionCmd::new_send(address, request); + self.sessions.send(command).unwrap(); + } + + ClientRequest::MessageSetContent(client::MessageSetContent { + content, + id, + channel_id, + }) => { + let command = StorageCmd::new_message_set_content( + channel_id.into(), + id.into(), + content.clone(), + ); + self.storage.send(command).unwrap(); + let request = ServerRequest::new_message_set_content(channel_id, id, content); + let command = SessionCmd::new_broadcast(request); + self.sessions.send(command).unwrap(); + } + } + } + pub fn new(sessions: Remote, storage: Remote) -> Self { Self { sessions, storage } } - async fn handle_request(&mut self, address: Addr, request: ClientRequest) { - match request { - ClientRequest::Ping(client::Ping { content }) => { - println!("received ping! '{content:?}'"); - let request = ServerRequest::Pong(server::Pong { content }); - self.sessions - .send(SessionCmd::new_send(address, request)) - .unwrap(); - } - ClientRequest::ChannelList(client::ChannelList {}) => { - let (cmd, rec) = StorageCmd::new_channel_list(); - self.storage.send(cmd).unwrap(); - let channels = rec.await.unwrap().iter().map(|id| id.to_u64()).collect(); - let request = ServerRequest::new_channel_list(channels); - self.sessions - .send(SessionCmd::new_send(address, request)) - .unwrap(); - } - ClientRequest::ChannelCreate(client::ChannelCreate { name }) => { - let (cmd, rec) = StorageCmd::new_channel_create(name); - let _id = rec.await.unwrap(); - self.storage.send(cmd).unwrap(); - } - ClientRequest::ChannelDelete(client::ChannelDelete { channel_id }) => { - self.storage - .send(StorageCmd::ChannelDelete(Id::from_u64(channel_id))) - .unwrap(); - } - ClientRequest::ChannelGetName(client::ChannelGetName { channel_id }) => { - let (cmd, rec) = StorageCmd::new_channel_get_name(Id::from_u64(channel_id)); - self.storage.send(cmd).unwrap(); - let name = rec.await.unwrap(); - let request = ServerRequest::new_channel_get_name(channel_id, name); - self.sessions - .send(SessionCmd::new_send(address, request)) - .unwrap(); - } - } + fn on_ping(&mut self, content: String, address: Addr) { + println!("[gateway/PING] '{content:?}'"); + let request = ServerRequest::Pong(server::Pong { content }); + let command = SessionCmd::new_send(address, request); + self.sessions.send(command).unwrap(); + } + + async fn on_channel_create(&mut self, name: String) { + let (cmd, rec) = StorageCmd::new_channel_create(name.clone()); + self.storage.send(cmd).unwrap(); + let id = rec.await.unwrap().to_u64(); + let request = ServerRequest::new_channel_create(id, name); + let command = SessionCmd::new_broadcast(request); + self.sessions.send(command).unwrap(); + } + + fn on_channel_delete(&mut self, id: u64) { + let command = StorageCmd::new_channel_delete(id.into()); + self.storage.send(command).unwrap(); + let request = ServerRequest::new_channel_delete(id); + let command = SessionCmd::new_broadcast(request); + self.sessions.send(command).unwrap(); + } + + async fn on_channel_list(&mut self, address: Addr) { + let (cmd, rec) = StorageCmd::new_channel_list(); + self.storage.send(cmd).unwrap(); + let channels = rec.await.unwrap().iter().map(|id| id.to_u64()).collect(); + let request = ServerRequest::new_channel_list(channels); + let command = SessionCmd::new_send(address, request); + self.sessions.send(command).unwrap(); + } + + async fn on_channel_get_name(&mut self, id: u64, address: Addr) { + let (cmd, rec) = StorageCmd::new_channel_get_name(id.into()); + self.storage.send(cmd).unwrap(); + let name = rec.await.unwrap(); + let request = ServerRequest::new_channel_get_name(id, name); + let command = SessionCmd::new_send(address, request); + self.sessions.send(command).unwrap(); + } + + fn on_channel_set_name(&mut self, id: u64, name: String) { + let command = StorageCmd::new_channel_set_name(id.into(), name.clone()); + self.storage.send(command).unwrap(); + let request = ServerRequest::new_channel_set_name(id, name); + let command = SessionCmd::new_broadcast(request); + self.sessions.send(command).unwrap(); } } diff --git a/harsh-server/src/main.rs b/harsh-server/src/main.rs index 4da1dcc..bad085c 100644 --- a/harsh-server/src/main.rs +++ b/harsh-server/src/main.rs @@ -1,26 +1,26 @@ use telecomande::{Executor, SimpleExecutor}; use tokio::net::TcpListener; -#[tokio::main] +const ADDRESS: &'static str = "localhost:42069"; +const DB_PATH: &'static str = "./db.test"; +#[tokio::main] async fn main() { println!("[main/info] starting server ..."); let sessions = SimpleExecutor::new(SessionProc::default()).spawn(); println!("[main/info] spawned sessions"); - let storage = SimpleExecutor::new(StorageProc::new("/tmp/db.test")).spawn(); + let storage = SimpleExecutor::new(StorageProc::new(DB_PATH)).spawn(); println!("[main/info] spawned storage"); let gateway = SimpleExecutor::new(GatewayProc::new(sessions.remote(), storage.remote())).spawn(); println!("[main/info] spawned gateway"); - let listener = TcpListener::bind("localhost:8080").await.unwrap(); - println!("[main/info] listening on 'localhost:8080' ..."); + let listener = TcpListener::bind(ADDRESS).await.unwrap(); + println!("[main/info] listening on '{ADDRESS}' ..."); let client_handler = sessions.remote(); loop { let (stream, address) = listener.accept().await.unwrap(); - println!("[main/info] new connection from '{address:?}'"); - client_handler .send(sessions::SessionCmd::AddSession( stream, diff --git a/harsh-server/src/sessions.rs b/harsh-server/src/sessions.rs index cb27a7a..a52955e 100644 --- a/harsh-server/src/sessions.rs +++ b/harsh-server/src/sessions.rs @@ -17,6 +17,7 @@ pub enum SessionCmd { AddSession(TcpStream, SocketAddr, Remote), RemoveSession(Addr), Send(Addr, String), + Broadcast(String), } impl SessionCmd { @@ -24,6 +25,11 @@ impl SessionCmd { let content = request.serialize(); Self::Send(address, content) } + + pub fn new_broadcast(request: ServerRequest) -> Self { + let content = request.serialize(); + Self::Broadcast(content) + } } #[derive(Debug, Default)] @@ -53,18 +59,30 @@ impl Processor for SessionProc { async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> { match command { SessionCmd::AddSession(stream, address, remote) => { + println!("[sessions/info] new connection from '{address:?}'"); let address = Addr::new(address); self.add_client(stream, address, remote) } SessionCmd::RemoveSession(address) => { - self.clients.remove(&address); + println!("[sessions/info] closed connection from '{address:?}'"); + if let Some((_writer, handle)) = self.clients.remove(&address) { + handle.await.unwrap(); + } } SessionCmd::Send(address, content) => { if let Some((client, _)) = self.clients.get_mut(&address) { + println!("[session/info] sending '{content}' to '{address:?}'"); client.write_all(content.as_bytes()).await.unwrap(); client.write_all(b"\n").await.unwrap(); } else { - println!("failed to find session with address '{address:?}'") + eprintln!("failed to find session with address '{address:?}'") + } + } + SessionCmd::Broadcast(content) => { + for (client, _) in self.clients.values_mut() { + println!("[session/info] broadcasting '{content}'"); + client.write_all(content.as_bytes()).await.unwrap(); + client.write_all(b"\n").await.unwrap(); } } }; @@ -76,9 +94,14 @@ async fn session(address: Addr, reader: OwnedReadHalf, remote: Remote { + eprintln!("[session/error] {error}"); + } + Ok(0) => { + break; + } + _ => (), } remote .send(gateway::GatewayCmd::Request(address.clone(), line.clone())) diff --git a/harsh-server/src/storage.rs b/harsh-server/src/storage.rs index 2ca8d7e..4bb68e2 100644 --- a/harsh-server/src/storage.rs +++ b/harsh-server/src/storage.rs @@ -6,28 +6,67 @@ use crate::Id; #[derive(Debug)] pub enum StorageCmd { + ChannelList(Sender>), ChannelCreate(String, Sender), ChannelDelete(Id), - ChannelList(Sender>), ChannelGetName(Id, Sender>), + ChannelSetName(Id, String), + MessageList(Id, Sender>), + MessageCreate(Id, String, Sender), + MessageDelete(Id, Id), + MessageGetContent(Id, Id, Sender>), + MessageSetContent(Id, Id, String), } impl StorageCmd { - pub fn new_channel_create(name: impl ToString) -> (Self, Receiver) { - let (s, r) = oneshot::channel(); - (Self::ChannelCreate(name.to_string(), s), r) - } - pub fn new_channel_delete(id: Id) -> Self { - Self::ChannelDelete(id) - } pub fn new_channel_list() -> (Self, Receiver>) { let (s, r) = oneshot::channel(); (Self::ChannelList(s), r) } + + pub fn new_channel_create(name: impl ToString) -> (Self, Receiver) { + let (s, r) = oneshot::channel(); + (Self::ChannelCreate(name.to_string(), s), r) + } + + pub fn new_channel_delete(id: Id) -> Self { + Self::ChannelDelete(id) + } + pub fn new_channel_get_name(id: Id) -> (Self, Receiver>) { let (s, r) = oneshot::channel(); (Self::ChannelGetName(id, s), r) } + + pub fn new_channel_set_name(id: Id, name: String) -> Self { + Self::ChannelSetName(id, name) + } + + pub fn new_message_list(channel_id: Id) -> (Self, Receiver>) { + let (sender, receiver) = oneshot::channel(); + let cmd = Self::MessageList(channel_id, sender); + (cmd, receiver) + } + + pub fn new_message_create(channel_id: Id, content: String) -> (Self, Receiver) { + let (sender, receiver) = oneshot::channel(); + let cmd = Self::MessageCreate(channel_id, content, sender); + (cmd, receiver) + } + + pub fn new_message_delete(channel_id: Id, id: Id) -> Self { + Self::MessageDelete(channel_id, id) + } + + pub fn new_message_get_content(channel_id: Id, id: Id) -> (Self, Receiver>) { + let (sender, receiver) = oneshot::channel(); + let cmd = Self::MessageGetContent(channel_id, id, sender); + (cmd, receiver) + } + + pub fn new_message_set_content(channel_id: Id, id: Id, content: String) -> Self { + Self::MessageSetContent(channel_id, id, content) + } } pub struct StorageProc { @@ -50,6 +89,7 @@ impl StorageProc { T: SerDeser, { let path = path.to_string(); + println!("[storage/info] setting entry at '{path}'"); T::read(&self.base, path) } fn set(&self, path: S, item: T) @@ -58,12 +98,15 @@ impl StorageProc { T: SerDeser, { let path = path.to_string(); + println!("[storage/info] getting entry at '{path}'"); item.write(&self.base, path) } fn list(&self, path: impl ToString) -> Vec { let path = path.to_string(); - list(&self.base, path).collect() // TODO: turn into iterator with limits + println!("[storage/info] listing entries in '{path}'"); + let db = &self.base; + list(db, path) } // firsts (x) @@ -73,8 +116,112 @@ impl StorageProc { fn remove(&self, path: impl ToString) { let path = path.to_string(); - remove(&self.base, path) + println!("[storage/info] removing entry at '{path}'"); + self.base.remove(path).unwrap(); } + + async fn handle_command(&mut self, command: StorageCmd) { + match command { + StorageCmd::ChannelList(sender) => { + self.on_channel_list(sender); + } + StorageCmd::ChannelCreate(name, sender) => { + self.on_channel_create(name, sender); + } + StorageCmd::ChannelDelete(id) => self.on_channel_remove(id), + StorageCmd::ChannelGetName(id, sender) => { + self.on_channel_get_name(id, sender); + } + StorageCmd::ChannelSetName(id, name) => { + self.on_channel_set_name(id, name); + } + // ChannelGetParent + + // + StorageCmd::MessageList(channel_id, sender) => { + self.on_message_list(channel_id, sender); + } + StorageCmd::MessageCreate(channel_id, content, sender) => { + self.on_message_create(channel_id, content, sender); + } + StorageCmd::MessageDelete(channel_id, id) => { + self.on_message_delete(channel_id, id); + } + StorageCmd::MessageGetContent(channel_id, id, sender) => { + self.on_message_get_content(channel_id, id, sender); + } + StorageCmd::MessageSetContent(channel_id, id, content) => { + self.on_message_set_content(channel_id, id, content); + } + }; + } + + fn on_message_set_content(&mut self, channel_id: Id, id: Id, content: String) { + let path = format!("/messages/{channel_id}/{id}"); + if let Some(mut message) = self.get::<_, Message>(&path) { + message.set_content(content); + self.set(path, message); + } + } + + fn on_message_get_content(&mut self, channel_id: Id, id: Id, sender: Sender>) { + let message = self.get::<_, Message>(format!("/messages/{channel_id}/{id}")); + let content = message.map(|m| m.get_content().to_string()); + sender.send(content).unwrap() + } + + fn on_message_delete(&mut self, channel_id: Id, id: Id) { + self.remove(format!("/messages/{channel_id}/{id}")); + } + + fn on_message_create(&mut self, channel_id: Id, content: String, sender: Sender) { + let message = Message::new(content); + let id = message.get_id(); + self.set(format!("/messages/{channel_id}/{id}"), message); + sender.send(id).unwrap(); + } + + fn on_message_list(&mut self, channel_id: Id, sender: Sender>) { + let items = self.list(format!("/messages/{channel_id}/")); + sender.send(items).unwrap(); + } + + // + // Channels + // + fn on_channel_list(&mut self, sender: Sender>) { + let results = self.list("/channels/"); + sender.send(results).unwrap(); + } + + fn on_channel_create(&mut self, name: String, sender: Sender) { + let item = Channel::new(name); + let id = item.get_id(); + self.set(format!("/channels/{id}"), item); + sender.send(id).unwrap(); + } + + fn on_channel_remove(&mut self, id: Id) { + self.remove(format!("/channels/{id}")) + } + + fn on_channel_get_name(&mut self, id: Id, sender: Sender>) { + let channel = self.get::<_, Channel>(format!("/channels/{id}")); + let name = channel.map(|channel| channel.get_name().to_string()); + sender.send(name).unwrap(); + } + + fn on_channel_set_name(&mut self, id: Id, name: String) { + let path = format!("/channels/{id}"); + if let Some(mut channel) = self.get::<_, Channel>(&path) { + channel.set_name(name); + self.set(path, channel); + } + } + + // + // Messages + // } #[telecomande::async_trait] @@ -84,43 +231,15 @@ impl Processor for StorageProc { type Error = (); async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> { - match command { - // channels - StorageCmd::ChannelDelete(id) => self.remove(format!("/channels/{id}")), - StorageCmd::ChannelCreate(name, sender) => { - let item = Channel::new(name); - let id = item.get_id(); - self.set(format!("/channels/{id}"), item); - sender.send(id).unwrap(); - } - StorageCmd::ChannelList(sender) => { - let results = self.list("/channels/"); - sender.send(results).unwrap(); - } - StorageCmd::ChannelGetName(id, sender) => { - let result = self - .get::<_, Channel>(format!("/channels/{id}")) - .map(|channel| channel.get_name().to_string()); - sender.send(result).unwrap(); - } // - // ChannelGetParent - - // messages - // c - // d - // l - // gcontent - }; - + self.handle_command(command).await; Ok(()) } } mod models; -pub use models::{Channel, Msg, SerDeser, User}; +pub use models::{Channel, Message, SerDeser, User}; -fn list(db: &Db, path: impl ToString) -> impl Iterator { - let path = path.to_string(); +fn list(db: &Db, path: String) -> Vec { let len = path.len(); db.scan_prefix(path) .filter_map(move |result| -> Option { @@ -129,74 +248,8 @@ fn list(db: &Db, path: impl ToString) -> impl Iterator { let suffix = &string[len..]; Id::from_string(suffix) }) + .collect() // TODO: turn into iterator with limits } -fn remove(db: &Db, path: impl ToString) { - let path = path.to_string(); - db.remove(path).unwrap(); -} - -#[test] -fn test_list() { - let db = sled::open("/tmp/test-db").unwrap(); - db.insert("/some/path/123", b"hello1").unwrap(); - db.insert("/some/path/1234", b"hello2").unwrap(); - db.insert("/some/path/12345", b"hello3").unwrap(); - let results = list(&db, "/some/path/".to_string()); - let vec = results.collect::>(); - assert_eq!( - vec, - vec![ - Id::from_string("123").unwrap(), - Id::from_string("1234").unwrap(), - Id::from_string("12345").unwrap() - ] - ); -} - -#[tokio::test] -async fn test_channels() { - use telecomande::{Executor, SimpleExecutor}; - // cleaning; - std::fs::remove_dir_all("/tmp/db-test").ok(); - - // instantiation - let store = SimpleExecutor::new(StorageProc::new("/tmp/db-test")).spawn(); - let remote = store.remote(); - - // insertion - let (cmd, rec) = StorageCmd::new_channel_create("a-channel"); - remote.send(cmd).unwrap(); - let id = rec.await.unwrap(); - - // query all - let (cmd, rec) = StorageCmd::new_channel_list(); - remote.send(cmd).unwrap(); - let result = rec.await.unwrap(); - assert_eq!(result.len(), 1); - let first = result[0]; - assert_eq!(first, id); - - // query property - let (cmd, rec) = StorageCmd::new_channel_get_name(id); - remote.send(cmd).unwrap(); - let result = rec.await.unwrap(); - assert_eq!(result.unwrap(), "a-channel".to_string()); - - // insertion - let (cmd, rec) = StorageCmd::new_channel_create("b-channel"); - remote.send(cmd).unwrap(); - let id2 = rec.await.unwrap(); - - // query all - let (cmd, rec) = StorageCmd::new_channel_list(); - remote.send(cmd).unwrap(); - let result = rec.await.unwrap(); - assert_eq!(result.len(), 2); - - // query property - let (cmd, rec) = StorageCmd::new_channel_get_name(id2); - remote.send(cmd).unwrap(); - let result = rec.await.unwrap(); - assert_eq!(result.unwrap(), "b-channel".to_string()); -} +#[cfg(test)] +mod tests; diff --git a/harsh-server/src/storage/models.rs b/harsh-server/src/storage/models.rs index c576281..c1a6c3c 100644 --- a/harsh-server/src/storage/models.rs +++ b/harsh-server/src/storage/models.rs @@ -22,6 +22,10 @@ impl Channel { pub fn get_name(&self) -> &str { &self.name } + + pub fn set_name(&mut self, name: String) { + self.name = name; + } } #[derive(Debug, Serialize, Deserialize)] @@ -31,11 +35,28 @@ pub struct User { } #[derive(Debug, Serialize, Deserialize)] -pub struct Msg { +pub struct Message { id: Id, content: String, } +impl Message { + pub fn new(content: String) -> Self { + let id = Id::from_now(); + Self { id, content } + } + + pub fn get_id(&self) -> Id { + self.id + } + pub fn get_content(&self) -> &str { + &self.content + } + pub fn set_content(&mut self, content: String) { + self.content = content; + } +} + pub trait SerDeser: Serialize + DeserializeOwned { fn ser(&self) -> Vec; fn deser(input: &[u8]) -> Option; diff --git a/harsh-server/src/storage/tests.rs b/harsh-server/src/storage/tests.rs new file mode 100644 index 0000000..657c6bb --- /dev/null +++ b/harsh-server/src/storage/tests.rs @@ -0,0 +1,64 @@ +use super::*; +#[test] +fn test_list() { + let db = sled::open("/tmp/test-db").unwrap(); + db.insert("/some/path/123", b"hello1").unwrap(); + db.insert("/some/path/1234", b"hello2").unwrap(); + db.insert("/some/path/12345", b"hello3").unwrap(); + let results = list(&db, "/some/path/".to_string()); + assert_eq!( + results, + vec![ + Id::from_string("123").unwrap(), + Id::from_string("1234").unwrap(), + Id::from_string("12345").unwrap() + ] + ); +} + +#[tokio::test] +async fn test_channels() { + use telecomande::{Executor, SimpleExecutor}; + // cleaning; + std::fs::remove_dir_all("/tmp/db-test").ok(); + + // instantiation + let store = SimpleExecutor::new(StorageProc::new("/tmp/db-test")).spawn(); + let remote = store.remote(); + + // insertion + let (cmd, rec) = StorageCmd::new_channel_create("a-channel"); + remote.send(cmd).unwrap(); + let id = rec.await.unwrap(); + + // query all + let (cmd, rec) = StorageCmd::new_channel_list(); + remote.send(cmd).unwrap(); + let result = rec.await.unwrap(); + assert_eq!(result.len(), 1); + let first = result[0]; + assert_eq!(first, id); + + // query property + let (cmd, rec) = StorageCmd::new_channel_get_name(id); + remote.send(cmd).unwrap(); + let result = rec.await.unwrap(); + assert_eq!(result.unwrap(), "a-channel".to_string()); + + // insertion + let (cmd, rec) = StorageCmd::new_channel_create("b-channel"); + remote.send(cmd).unwrap(); + let id2 = rec.await.unwrap(); + + // query all + let (cmd, rec) = StorageCmd::new_channel_list(); + remote.send(cmd).unwrap(); + let result = rec.await.unwrap(); + assert_eq!(result.len(), 2); + + // query property + let (cmd, rec) = StorageCmd::new_channel_get_name(id2); + remote.send(cmd).unwrap(); + let result = rec.await.unwrap(); + assert_eq!(result.unwrap(), "b-channel".to_string()); +} diff --git a/harsh-server/src/utils.rs b/harsh-server/src/utils.rs index 997bb6b..620cfbe 100644 --- a/harsh-server/src/utils.rs +++ b/harsh-server/src/utils.rs @@ -37,6 +37,12 @@ impl Id { } } +impl From for Id { + fn from(input: u64) -> Self { + Self::from_u64(input) + } +} + #[test] fn test_string_convertion() { let id = Id::from_now();