implemented channel queries

This commit is contained in:
mb 2022-08-22 11:25:49 +03:00
parent 62788c1b26
commit 28a3812812
10 changed files with 208 additions and 67 deletions

View file

@ -1,6 +1,9 @@
use std::time::Duration;
use tokio::{ use tokio::{
io::{stdin, AsyncBufReadExt, AsyncWriteExt, BufReader}, io::{stdin, AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpStream, net::TcpStream,
time,
}; };
#[tokio::main] #[tokio::main]
@ -15,6 +18,7 @@ async fn main() {
let mut line = String::new(); let mut line = String::new();
reader.read_line(&mut line).await.unwrap(); reader.read_line(&mut line).await.unwrap();
println!("received '{line}'"); println!("received '{line}'");
time::sleep(Duration::from_millis(100)).await;
} }
}); });
@ -43,10 +47,11 @@ async fn main() {
} }
mod commands { mod commands {
use harsh_common::ClientRequest;
pub enum Command { pub enum Command {
Help, Help,
Request(harsh_common::ClientRequest), Request(ClientRequest),
} }
pub fn parse(input: &str) -> Option<Command> { pub fn parse(input: &str) -> Option<Command> {
@ -56,7 +61,20 @@ mod commands {
"ping" => { "ping" => {
let rest = parts.collect::<Box<[_]>>(); let rest = parts.collect::<Box<[_]>>();
let content = rest.join(" "); let content = rest.join(" ");
harsh_common::ClientRequest::new_ping(content) ClientRequest::new_ping(content)
}
"chanls" => ClientRequest::new_channel_list(),
"chanadd" => {
let name = parts.next()?;
ClientRequest::new_channel_create(name)
}
"chandel" => {
let id = parts.next()?.parse().ok()?;
ClientRequest::new_channel_delete(id)
}
"changetname" => {
let id = parts.next()?.parse().ok()?;
ClientRequest::new_channel_get_name(id)
} }
_ => return None, _ => return None,
}; };
@ -64,6 +82,20 @@ mod commands {
Some(Command::Request(command)) Some(Command::Request(command))
} }
pub const CMDS: &'static [Description] = &[
// all commands
Description::new("help", &[], "returns a help message"),
Description::new(
"ping",
&["content"],
"sends a ping with the specified content",
),
Description::new("chanls", &[], "list channels"),
Description::new("chanadd", &["name"], "creates a new channel"),
Description::new("chandel", &["id"], "delete a channel by its id"),
Description::new("changetname", &["id"], "get a channel's name"),
];
pub fn smart_split(input: &str) -> Vec<String> { pub fn smart_split(input: &str) -> Vec<String> {
let input = input.trim(); let input = input.trim();
let mut result = Vec::new(); let mut result = Vec::new();
@ -131,20 +163,10 @@ mod commands {
) -> Self { ) -> Self {
Self { name, desc, params } Self { name, desc, params }
} }
pub const ALL: &'static [Self] = &[
// all commands
Self::new("help", &[], "returns a help message"),
Self::new(
"ping",
&["content"],
"sends a ping with the specified content",
),
];
} }
pub fn help() { pub fn help() {
for &Description { name, params, desc } in Description::ALL { for &Description { name, params, desc } in CMDS {
let mut usage = params.iter().map(|s| s.to_string()).collect::<Vec<_>>(); let mut usage = params.iter().map(|s| s.to_string()).collect::<Vec<_>>();
usage.insert(0, name.to_string()); usage.insert(0, name.to_string());
let usage = usage.join(" "); let usage = usage.join(" ");

View file

@ -1,29 +1,77 @@
#[derive(Debug)]
pub enum ClientRequest {
Ping(Ping),
}
#[derive(Debug)] #[derive(Debug)]
pub struct Ping { pub struct Ping {
pub content: String, pub content: String,
} }
#[derive(Debug)]
pub struct ChannelList {}
#[derive(Debug)]
pub struct ChannelCreate {
pub name: String,
}
#[derive(Debug)]
pub struct ChannelDelete {
pub channel_id: u64,
}
#[derive(Debug)]
pub struct ChannelGetName {
pub channel_id: u64,
}
#[derive(Debug)]
pub enum ClientRequest {
Ping(Ping),
ChannelList(ChannelList),
ChannelCreate(ChannelCreate),
ChannelDelete(ChannelDelete),
ChannelGetName(ChannelGetName),
}
impl ClientRequest { impl ClientRequest {
pub fn new_ping(content: String) -> Self { pub fn new_ping(content: String) -> Self {
Self::Ping(Ping { content }) Self::Ping(Ping { content })
} }
pub fn new_channel_list() -> Self {
Self::ChannelList(ChannelList {})
}
pub fn new_channel_create(name: String) -> Self {
Self::ChannelCreate(ChannelCreate { name })
}
pub fn new_channel_delete(channel_id: u64) -> Self {
Self::ChannelDelete(ChannelDelete { channel_id })
}
pub fn new_channel_get_name(channel_id: u64) -> Self {
Self::ChannelGetName(ChannelGetName { channel_id })
}
pub fn try_parse(line: &str) -> Option<Self> { pub fn try_parse(line: &str) -> Option<Self> {
use repr::Command::*;
let command: repr::Command = serde_json::from_str(line).ok()?; let command: repr::Command = serde_json::from_str(line).ok()?;
let mapped = match command { let mapped = match command {
repr::Command::ping { content } => Self::Ping(Ping { content }), ping { content } => Self::Ping(Ping { content }),
channel_list {} => Self::ChannelList(ChannelList {}),
channel_create { name } => Self::ChannelCreate(ChannelCreate { name }),
channel_delete { channel_id } => Self::ChannelDelete(ChannelDelete { channel_id }),
channel_get_name { channel_id } => Self::ChannelGetName(ChannelGetName { channel_id }),
}; };
Some(mapped) Some(mapped)
} }
pub fn serialize(self) -> String { pub fn serialize(self) -> String {
use repr::Command::*;
let mapped = match self { let mapped = match self {
Self::Ping(Ping { content }) => repr::Command::ping { content }, Self::Ping(Ping { content }) => ping { content },
Self::ChannelList(ChannelList {}) => repr::Command::channel_list {},
Self::ChannelCreate(ChannelCreate { name }) => channel_create { name },
Self::ChannelDelete(ChannelDelete { channel_id }) => channel_delete { channel_id },
Self::ChannelGetName(ChannelGetName { channel_id }) => channel_get_name { channel_id },
}; };
serde_json::to_string(&mapped).unwrap() serde_json::to_string(&mapped).unwrap()
} }
@ -38,5 +86,9 @@ mod repr {
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum Command { pub enum Command {
ping { content: String }, ping { content: String },
channel_list {},
channel_create { name: String },
channel_delete { channel_id: u64 },
channel_get_name { channel_id: u64 },
} }
} }

View file

@ -7,8 +7,8 @@ mod tests {
} }
} }
pub use client::{ClientRequest, Ping}; pub use client::ClientRequest;
mod client; pub mod client;
pub use server::{Pong, ServerRequest}; pub use server::ServerRequest;
mod server; pub mod server;

View file

@ -1,27 +1,51 @@
pub enum ServerRequest {
Pong(Pong),
}
pub struct Pong { pub struct Pong {
pub content: String, pub content: String,
} }
pub struct ChannelList {
pub channels: Vec<u64>,
}
pub struct ChannelGetName {
pub id: u64,
pub name: Option<String>,
}
pub enum ServerRequest {
Pong(Pong),
ChannelList(ChannelList),
ChannelGetName(ChannelGetName),
}
impl ServerRequest { impl ServerRequest {
pub fn new_pong(content: String) -> Self { pub fn new_pong(content: String) -> Self {
Self::Pong(Pong { content }) Self::Pong(Pong { content })
} }
pub fn new_channel_list(channels: Vec<u64>) -> Self {
Self::ChannelList(ChannelList { channels })
}
pub fn new_channel_get_name(id: u64, name: Option<String>) -> Self {
Self::ChannelGetName(ChannelGetName { name, id })
}
pub fn try_parse(line: &str) -> Option<Self> { pub fn try_parse(line: &str) -> Option<Self> {
use repr::Command::*;
let command: repr::Command = serde_json::from_str(line).ok()?; let command: repr::Command = serde_json::from_str(line).ok()?;
let mapped = match command { let mapped = match command {
repr::Command::pong { content } => Self::Pong(Pong { content }), pong { content } => Self::Pong(Pong { content }),
channel_list { channels } => Self::ChannelList(ChannelList { channels }),
channel_get_name { id, name } => Self::ChannelGetName(ChannelGetName { id, name }),
}; };
Some(mapped) Some(mapped)
} }
pub fn serialize(self) -> String { pub fn serialize(self) -> String {
use repr::Command::*;
let mapped = match self { let mapped = match self {
Self::Pong(Pong { content }) => repr::Command::pong { content }, Self::Pong(Pong { content }) => pong { content },
Self::ChannelList(ChannelList { channels }) => channel_list { channels },
Self::ChannelGetName(ChannelGetName { id, name }) => channel_get_name { id, name },
}; };
serde_json::to_string(&mapped).unwrap() serde_json::to_string(&mapped).unwrap()
} }
@ -36,5 +60,7 @@ mod repr {
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum Command { pub enum Command {
pong { content: String }, pong { content: String },
channel_list { channels: Vec<u64> },
channel_get_name { id: u64, name: Option<String> },
} }
} }

View file

@ -1,9 +1,7 @@
use harsh_common::{Ping, Pong, ServerRequest}; use harsh_common::{client, server, ClientRequest, ServerRequest};
use telecomande::{Processor, Remote}; use telecomande::{Processor, Remote};
use harsh_common::ClientRequest; use crate::{Addr, Id, SessionCmd, SessionProc, StorageCmd, StorageProc};
use crate::{sessions, Addr, SessionProc, StorageProc};
#[derive(Debug)] #[derive(Debug)]
pub enum GatewayCmd { pub enum GatewayCmd {
@ -12,26 +10,50 @@ pub enum GatewayCmd {
} }
pub struct GatewayProc { pub struct GatewayProc {
client_handler: Remote<SessionProc>, sessions: Remote<SessionProc>,
storage: Remote<StorageProc>, storage: Remote<StorageProc>,
} }
impl GatewayProc { impl GatewayProc {
pub fn new(client_handler: Remote<SessionProc>, storage: Remote<StorageProc>) -> Self { pub fn new(sessions: Remote<SessionProc>, storage: Remote<StorageProc>) -> Self {
Self { Self { sessions, storage }
client_handler,
storage,
}
} }
async fn handle_request(&mut self, address: Addr, request: ClientRequest) { async fn handle_request(&mut self, address: Addr, request: ClientRequest) {
match request { match request {
ClientRequest::Ping(Ping { content }) => { ClientRequest::Ping(client::Ping { content }) => {
println!("received ping! '{content:?}'"); println!("received ping! '{content:?}'");
let response = ServerRequest::Pong(Pong { content }); let request = ServerRequest::Pong(server::Pong { content });
let content = response.serialize(); self.sessions
self.client_handler .send(SessionCmd::new_send(address, request))
.send(sessions::SessionCmd::Send(address, content)) .unwrap();
}
ClientRequest::ChannelList(client::ChannelList {}) => {
let (cmd, rec) = StorageCmd::new_channel_list();
self.storage.send(cmd).unwrap();
let channels = rec.await.unwrap().iter().map(|id| id.to_u64()).collect();
let request = ServerRequest::new_channel_list(channels);
self.sessions
.send(SessionCmd::new_send(address, request))
.unwrap();
}
ClientRequest::ChannelCreate(client::ChannelCreate { name }) => {
let (cmd, rec) = StorageCmd::new_channel_create(name);
let _id = rec.await.unwrap();
self.storage.send(cmd).unwrap();
}
ClientRequest::ChannelDelete(client::ChannelDelete { channel_id }) => {
self.storage
.send(StorageCmd::ChannelDelete(Id::from_u64(channel_id)))
.unwrap();
}
ClientRequest::ChannelGetName(client::ChannelGetName { channel_id }) => {
let (cmd, rec) = StorageCmd::new_channel_get_name(Id::from_u64(channel_id));
self.storage.send(cmd).unwrap();
let name = rec.await.unwrap();
let request = ServerRequest::new_channel_get_name(channel_id, name);
self.sessions
.send(SessionCmd::new_send(address, request))
.unwrap(); .unwrap();
} }
} }
@ -46,14 +68,15 @@ impl Processor for GatewayProc {
match command { match command {
GatewayCmd::Request(address, request) => { GatewayCmd::Request(address, request) => {
if let Some(request) = ClientRequest::try_parse(&request) { if let Some(request) = ClientRequest::try_parse(&request) {
println!("[session/info] received command '{request:?}'");
self.handle_request(address, request).await; self.handle_request(address, request).await;
} else { } else {
println!("failed to parse command"); println!("[session/warn] failed to parse command");
} }
} }
GatewayCmd::ClosedConnection(address) => self GatewayCmd::ClosedConnection(address) => self
.client_handler .sessions
.send(sessions::SessionCmd::RemoveSession(address)) .send(SessionCmd::RemoveSession(address))
.unwrap(), .unwrap(),
} }
Ok(()) Ok(())

View file

@ -4,20 +4,22 @@ use tokio::net::TcpListener;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
println!("starting server ..."); println!("[main/info] starting server ...");
let client_handler = SimpleExecutor::new(SessionProc::default()).spawn(); let sessions = SimpleExecutor::new(SessionProc::default()).spawn();
let storage = SimpleExecutor::new(StorageProc::new("./db")).spawn(); println!("[main/info] spawned sessions");
let storage = SimpleExecutor::new(StorageProc::new("/tmp/db.test")).spawn();
println!("[main/info] spawned storage");
let gateway = let gateway =
SimpleExecutor::new(GatewayProc::new(client_handler.remote(), storage.remote())).spawn(); SimpleExecutor::new(GatewayProc::new(sessions.remote(), storage.remote())).spawn();
println!("spawned gateway"); println!("[main/info] spawned gateway");
let listener = TcpListener::bind("localhost:8080").await.unwrap(); let listener = TcpListener::bind("localhost:8080").await.unwrap();
println!("listening on 'localhost:8080' ..."); println!("[main/info] listening on 'localhost:8080' ...");
let client_handler = client_handler.remote(); let client_handler = sessions.remote();
loop { loop {
let (stream, address) = listener.accept().await.unwrap(); let (stream, address) = listener.accept().await.unwrap();
println!("new connection from '{address:?}'"); println!("[main/info] new connection from '{address:?}'");
client_handler client_handler
.send(sessions::SessionCmd::AddSession( .send(sessions::SessionCmd::AddSession(
@ -38,5 +40,5 @@ pub use gateway::{GatewayCmd, GatewayProc};
mod sessions; mod sessions;
pub use sessions::{SessionCmd, SessionProc}; pub use sessions::{SessionCmd, SessionProc};
pub use storage::{StorageCmd, StorageProc};
mod storage; mod storage;
pub use storage::{StorageCmd, StorageProc};

View file

@ -1,5 +1,6 @@
use std::{collections::HashMap, net::SocketAddr}; use std::{collections::HashMap, net::SocketAddr};
use harsh_common::ServerRequest;
use telecomande::{Processor, Remote}; use telecomande::{Processor, Remote};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
@ -18,6 +19,13 @@ pub enum SessionCmd {
Send(Addr, String), Send(Addr, String),
} }
impl SessionCmd {
pub fn new_send(address: Addr, request: ServerRequest) -> Self {
let content = request.serialize();
Self::Send(address, content)
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct SessionProc { pub struct SessionProc {
clients: HashMap<Addr, (OwnedWriteHalf, JoinHandle<()>)>, clients: HashMap<Addr, (OwnedWriteHalf, JoinHandle<()>)>,
@ -69,7 +77,7 @@ async fn session(address: Addr, reader: OwnedReadHalf, remote: Remote<gateway::G
loop { loop {
let mut line = String::new(); let mut line = String::new();
if let Err(error) = reader.read_line(&mut line).await { if let Err(error) = reader.read_line(&mut line).await {
eprintln!("{error}"); eprintln!("[session/error] {error}");
break; break;
} }
remote remote

View file

@ -8,23 +8,23 @@ use crate::Id;
pub enum StorageCmd { pub enum StorageCmd {
ChannelCreate(String, Sender<Id>), ChannelCreate(String, Sender<Id>),
ChannelDelete(Id), ChannelDelete(Id),
ChannelGetAll(Sender<Vec<Id>>), ChannelList(Sender<Vec<Id>>),
ChannelGetName(Id, Sender<Option<String>>), ChannelGetName(Id, Sender<Option<String>>),
} }
impl StorageCmd { impl StorageCmd {
fn new_channel_create(name: impl ToString) -> (Self, Receiver<Id>) { pub fn new_channel_create(name: impl ToString) -> (Self, Receiver<Id>) {
let (s, r) = oneshot::channel(); let (s, r) = oneshot::channel();
(Self::ChannelCreate(name.to_string(), s), r) (Self::ChannelCreate(name.to_string(), s), r)
} }
fn new_channel_delete(id: Id) -> Self { pub fn new_channel_delete(id: Id) -> Self {
Self::ChannelDelete(id) Self::ChannelDelete(id)
} }
fn new_channel_get_all() -> (Self, Receiver<Vec<Id>>) { pub fn new_channel_list() -> (Self, Receiver<Vec<Id>>) {
let (s, r) = oneshot::channel(); let (s, r) = oneshot::channel();
(Self::ChannelGetAll(s), r) (Self::ChannelList(s), r)
} }
fn new_channel_get_name(id: Id) -> (Self, Receiver<Option<String>>) { pub fn new_channel_get_name(id: Id) -> (Self, Receiver<Option<String>>) {
let (s, r) = oneshot::channel(); let (s, r) = oneshot::channel();
(Self::ChannelGetName(id, s), r) (Self::ChannelGetName(id, s), r)
} }
@ -93,7 +93,7 @@ impl Processor for StorageProc {
self.set(format!("/channels/{id}"), item); self.set(format!("/channels/{id}"), item);
sender.send(id).unwrap(); sender.send(id).unwrap();
} }
StorageCmd::ChannelGetAll(sender) => { StorageCmd::ChannelList(sender) => {
let results = self.list("/channels/"); let results = self.list("/channels/");
sender.send(results).unwrap(); sender.send(results).unwrap();
} }
@ -170,7 +170,7 @@ async fn test_channels() {
let id = rec.await.unwrap(); let id = rec.await.unwrap();
// query all // query all
let (cmd, rec) = StorageCmd::new_channel_get_all(); let (cmd, rec) = StorageCmd::new_channel_list();
remote.send(cmd).unwrap(); remote.send(cmd).unwrap();
let result = rec.await.unwrap(); let result = rec.await.unwrap();
assert_eq!(result.len(), 1); assert_eq!(result.len(), 1);
@ -189,7 +189,7 @@ async fn test_channels() {
let id2 = rec.await.unwrap(); let id2 = rec.await.unwrap();
// query all // query all
let (cmd, rec) = StorageCmd::new_channel_get_all(); let (cmd, rec) = StorageCmd::new_channel_list();
remote.send(cmd).unwrap(); remote.send(cmd).unwrap();
let result = rec.await.unwrap(); let result = rec.await.unwrap();
assert_eq!(result.len(), 2); assert_eq!(result.len(), 2);

View file

@ -27,6 +27,14 @@ impl Id {
let inner: u64 = input.parse().ok()?; let inner: u64 = input.parse().ok()?;
Some(Self(inner)) Some(Self(inner))
} }
pub fn from_u64(input: u64) -> Self {
Self(input)
}
pub fn to_u64(&self) -> u64 {
self.0
}
} }
#[test] #[test]

0
watch-server.sh Normal file → Executable file
View file