implemented notifications and messages

This commit is contained in:
mb 2022-08-22 15:54:36 +03:00
parent 28a3812812
commit 695ac6daa2
12 changed files with 1023 additions and 315 deletions

View file

@ -15,48 +15,139 @@ pub struct GatewayProc {
}
impl GatewayProc {
async fn handle_request(&mut self, address: Addr, request: ClientRequest) {
match request {
ClientRequest::Ping(client::Ping { content }) => {
self.on_ping(content, address);
}
ClientRequest::ChannelCreate(client::ChannelCreate { name }) => {
self.on_channel_create(name).await;
}
ClientRequest::ChannelDelete(client::ChannelDelete { id }) => {
self.on_channel_delete(id);
}
ClientRequest::ChannelList(client::ChannelList {}) => {
self.on_channel_list(address).await;
}
ClientRequest::ChannelGetName(client::ChannelGetName { id }) => {
self.on_channel_get_name(id, address).await;
}
ClientRequest::ChannelSetName(client::ChannelSetName { id, name }) => {
self.on_channel_set_name(id, name);
}
ClientRequest::MessageList(client::MessageList { channel_id }) => {
let (cmd, rec) = StorageCmd::new_message_list(channel_id.into());
self.storage.send(cmd).unwrap();
let messages = rec.await.unwrap().iter().map(Id::to_u64).collect();
let request = ServerRequest::new_message_list(channel_id, messages);
let command = SessionCmd::new_send(address, request);
self.sessions.send(command).unwrap();
}
ClientRequest::MessageCreate(client::MessageCreate {
channel_id,
content,
}) => {
let (cmd, rec) = StorageCmd::new_message_create(channel_id.into(), content.clone());
self.storage.send(cmd).unwrap();
let id = rec.await.unwrap();
let request = ServerRequest::new_message_create(channel_id, id.to_u64(), content);
let command = SessionCmd::new_broadcast(request);
self.sessions.send(command).unwrap();
}
ClientRequest::MessageDelete(client::MessageDelete { id, channel_id }) => {
let command = StorageCmd::new_message_delete(channel_id.into(), id.into());
self.storage.send(command).unwrap();
let request = ServerRequest::new_message_delete(channel_id, id);
let command = SessionCmd::new_broadcast(request);
self.sessions.send(command).unwrap();
}
ClientRequest::MessageGetContent(client::MessageGetContent { id, channel_id }) => {
let (cmd, rec) = StorageCmd::new_message_get_content(channel_id.into(), id.into());
self.storage.send(cmd).unwrap();
let request =
ServerRequest::new_message_get_content(channel_id, id, rec.await.unwrap());
let command = SessionCmd::new_send(address, request);
self.sessions.send(command).unwrap();
}
ClientRequest::MessageSetContent(client::MessageSetContent {
content,
id,
channel_id,
}) => {
let command = StorageCmd::new_message_set_content(
channel_id.into(),
id.into(),
content.clone(),
);
self.storage.send(command).unwrap();
let request = ServerRequest::new_message_set_content(channel_id, id, content);
let command = SessionCmd::new_broadcast(request);
self.sessions.send(command).unwrap();
}
}
}
pub fn new(sessions: Remote<SessionProc>, storage: Remote<StorageProc>) -> Self {
Self { sessions, storage }
}
async fn handle_request(&mut self, address: Addr, request: ClientRequest) {
match request {
ClientRequest::Ping(client::Ping { content }) => {
println!("received ping! '{content:?}'");
let request = ServerRequest::Pong(server::Pong { content });
self.sessions
.send(SessionCmd::new_send(address, request))
.unwrap();
}
ClientRequest::ChannelList(client::ChannelList {}) => {
let (cmd, rec) = StorageCmd::new_channel_list();
self.storage.send(cmd).unwrap();
let channels = rec.await.unwrap().iter().map(|id| id.to_u64()).collect();
let request = ServerRequest::new_channel_list(channels);
self.sessions
.send(SessionCmd::new_send(address, request))
.unwrap();
}
ClientRequest::ChannelCreate(client::ChannelCreate { name }) => {
let (cmd, rec) = StorageCmd::new_channel_create(name);
let _id = rec.await.unwrap();
self.storage.send(cmd).unwrap();
}
ClientRequest::ChannelDelete(client::ChannelDelete { channel_id }) => {
self.storage
.send(StorageCmd::ChannelDelete(Id::from_u64(channel_id)))
.unwrap();
}
ClientRequest::ChannelGetName(client::ChannelGetName { channel_id }) => {
let (cmd, rec) = StorageCmd::new_channel_get_name(Id::from_u64(channel_id));
self.storage.send(cmd).unwrap();
let name = rec.await.unwrap();
let request = ServerRequest::new_channel_get_name(channel_id, name);
self.sessions
.send(SessionCmd::new_send(address, request))
.unwrap();
}
}
fn on_ping(&mut self, content: String, address: Addr) {
println!("[gateway/PING] '{content:?}'");
let request = ServerRequest::Pong(server::Pong { content });
let command = SessionCmd::new_send(address, request);
self.sessions.send(command).unwrap();
}
async fn on_channel_create(&mut self, name: String) {
let (cmd, rec) = StorageCmd::new_channel_create(name.clone());
self.storage.send(cmd).unwrap();
let id = rec.await.unwrap().to_u64();
let request = ServerRequest::new_channel_create(id, name);
let command = SessionCmd::new_broadcast(request);
self.sessions.send(command).unwrap();
}
fn on_channel_delete(&mut self, id: u64) {
let command = StorageCmd::new_channel_delete(id.into());
self.storage.send(command).unwrap();
let request = ServerRequest::new_channel_delete(id);
let command = SessionCmd::new_broadcast(request);
self.sessions.send(command).unwrap();
}
async fn on_channel_list(&mut self, address: Addr) {
let (cmd, rec) = StorageCmd::new_channel_list();
self.storage.send(cmd).unwrap();
let channels = rec.await.unwrap().iter().map(|id| id.to_u64()).collect();
let request = ServerRequest::new_channel_list(channels);
let command = SessionCmd::new_send(address, request);
self.sessions.send(command).unwrap();
}
async fn on_channel_get_name(&mut self, id: u64, address: Addr) {
let (cmd, rec) = StorageCmd::new_channel_get_name(id.into());
self.storage.send(cmd).unwrap();
let name = rec.await.unwrap();
let request = ServerRequest::new_channel_get_name(id, name);
let command = SessionCmd::new_send(address, request);
self.sessions.send(command).unwrap();
}
fn on_channel_set_name(&mut self, id: u64, name: String) {
let command = StorageCmd::new_channel_set_name(id.into(), name.clone());
self.storage.send(command).unwrap();
let request = ServerRequest::new_channel_set_name(id, name);
let command = SessionCmd::new_broadcast(request);
self.sessions.send(command).unwrap();
}
}

View file

@ -1,26 +1,26 @@
use telecomande::{Executor, SimpleExecutor};
use tokio::net::TcpListener;
#[tokio::main]
const ADDRESS: &'static str = "localhost:42069";
const DB_PATH: &'static str = "./db.test";
#[tokio::main]
async fn main() {
println!("[main/info] starting server ...");
let sessions = SimpleExecutor::new(SessionProc::default()).spawn();
println!("[main/info] spawned sessions");
let storage = SimpleExecutor::new(StorageProc::new("/tmp/db.test")).spawn();
let storage = SimpleExecutor::new(StorageProc::new(DB_PATH)).spawn();
println!("[main/info] spawned storage");
let gateway =
SimpleExecutor::new(GatewayProc::new(sessions.remote(), storage.remote())).spawn();
println!("[main/info] spawned gateway");
let listener = TcpListener::bind("localhost:8080").await.unwrap();
println!("[main/info] listening on 'localhost:8080' ...");
let listener = TcpListener::bind(ADDRESS).await.unwrap();
println!("[main/info] listening on '{ADDRESS}' ...");
let client_handler = sessions.remote();
loop {
let (stream, address) = listener.accept().await.unwrap();
println!("[main/info] new connection from '{address:?}'");
client_handler
.send(sessions::SessionCmd::AddSession(
stream,

View file

@ -17,6 +17,7 @@ pub enum SessionCmd {
AddSession(TcpStream, SocketAddr, Remote<gateway::GatewayProc>),
RemoveSession(Addr),
Send(Addr, String),
Broadcast(String),
}
impl SessionCmd {
@ -24,6 +25,11 @@ impl SessionCmd {
let content = request.serialize();
Self::Send(address, content)
}
pub fn new_broadcast(request: ServerRequest) -> Self {
let content = request.serialize();
Self::Broadcast(content)
}
}
#[derive(Debug, Default)]
@ -53,18 +59,30 @@ impl Processor for SessionProc {
async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> {
match command {
SessionCmd::AddSession(stream, address, remote) => {
println!("[sessions/info] new connection from '{address:?}'");
let address = Addr::new(address);
self.add_client(stream, address, remote)
}
SessionCmd::RemoveSession(address) => {
self.clients.remove(&address);
println!("[sessions/info] closed connection from '{address:?}'");
if let Some((_writer, handle)) = self.clients.remove(&address) {
handle.await.unwrap();
}
}
SessionCmd::Send(address, content) => {
if let Some((client, _)) = self.clients.get_mut(&address) {
println!("[session/info] sending '{content}' to '{address:?}'");
client.write_all(content.as_bytes()).await.unwrap();
client.write_all(b"\n").await.unwrap();
} else {
println!("failed to find session with address '{address:?}'")
eprintln!("failed to find session with address '{address:?}'")
}
}
SessionCmd::Broadcast(content) => {
for (client, _) in self.clients.values_mut() {
println!("[session/info] broadcasting '{content}'");
client.write_all(content.as_bytes()).await.unwrap();
client.write_all(b"\n").await.unwrap();
}
}
};
@ -76,9 +94,14 @@ async fn session(address: Addr, reader: OwnedReadHalf, remote: Remote<gateway::G
let mut reader = BufReader::new(reader);
loop {
let mut line = String::new();
if let Err(error) = reader.read_line(&mut line).await {
eprintln!("[session/error] {error}");
break;
match reader.read_line(&mut line).await {
Err(error) => {
eprintln!("[session/error] {error}");
}
Ok(0) => {
break;
}
_ => (),
}
remote
.send(gateway::GatewayCmd::Request(address.clone(), line.clone()))

View file

@ -6,28 +6,67 @@ use crate::Id;
#[derive(Debug)]
pub enum StorageCmd {
ChannelList(Sender<Vec<Id>>),
ChannelCreate(String, Sender<Id>),
ChannelDelete(Id),
ChannelList(Sender<Vec<Id>>),
ChannelGetName(Id, Sender<Option<String>>),
ChannelSetName(Id, String),
MessageList(Id, Sender<Vec<Id>>),
MessageCreate(Id, String, Sender<Id>),
MessageDelete(Id, Id),
MessageGetContent(Id, Id, Sender<Option<String>>),
MessageSetContent(Id, Id, String),
}
impl StorageCmd {
pub fn new_channel_create(name: impl ToString) -> (Self, Receiver<Id>) {
let (s, r) = oneshot::channel();
(Self::ChannelCreate(name.to_string(), s), r)
}
pub fn new_channel_delete(id: Id) -> Self {
Self::ChannelDelete(id)
}
pub fn new_channel_list() -> (Self, Receiver<Vec<Id>>) {
let (s, r) = oneshot::channel();
(Self::ChannelList(s), r)
}
pub fn new_channel_create(name: impl ToString) -> (Self, Receiver<Id>) {
let (s, r) = oneshot::channel();
(Self::ChannelCreate(name.to_string(), s), r)
}
pub fn new_channel_delete(id: Id) -> Self {
Self::ChannelDelete(id)
}
pub fn new_channel_get_name(id: Id) -> (Self, Receiver<Option<String>>) {
let (s, r) = oneshot::channel();
(Self::ChannelGetName(id, s), r)
}
pub fn new_channel_set_name(id: Id, name: String) -> Self {
Self::ChannelSetName(id, name)
}
pub fn new_message_list(channel_id: Id) -> (Self, Receiver<Vec<Id>>) {
let (sender, receiver) = oneshot::channel();
let cmd = Self::MessageList(channel_id, sender);
(cmd, receiver)
}
pub fn new_message_create(channel_id: Id, content: String) -> (Self, Receiver<Id>) {
let (sender, receiver) = oneshot::channel();
let cmd = Self::MessageCreate(channel_id, content, sender);
(cmd, receiver)
}
pub fn new_message_delete(channel_id: Id, id: Id) -> Self {
Self::MessageDelete(channel_id, id)
}
pub fn new_message_get_content(channel_id: Id, id: Id) -> (Self, Receiver<Option<String>>) {
let (sender, receiver) = oneshot::channel();
let cmd = Self::MessageGetContent(channel_id, id, sender);
(cmd, receiver)
}
pub fn new_message_set_content(channel_id: Id, id: Id, content: String) -> Self {
Self::MessageSetContent(channel_id, id, content)
}
}
pub struct StorageProc {
@ -50,6 +89,7 @@ impl StorageProc {
T: SerDeser,
{
let path = path.to_string();
println!("[storage/info] setting entry at '{path}'");
T::read(&self.base, path)
}
fn set<S, T>(&self, path: S, item: T)
@ -58,12 +98,15 @@ impl StorageProc {
T: SerDeser,
{
let path = path.to_string();
println!("[storage/info] getting entry at '{path}'");
item.write(&self.base, path)
}
fn list(&self, path: impl ToString) -> Vec<Id> {
let path = path.to_string();
list(&self.base, path).collect() // TODO: turn into iterator with limits
println!("[storage/info] listing entries in '{path}'");
let db = &self.base;
list(db, path)
}
// firsts (x)
@ -73,8 +116,112 @@ impl StorageProc {
fn remove(&self, path: impl ToString) {
let path = path.to_string();
remove(&self.base, path)
println!("[storage/info] removing entry at '{path}'");
self.base.remove(path).unwrap();
}
async fn handle_command(&mut self, command: StorageCmd) {
match command {
StorageCmd::ChannelList(sender) => {
self.on_channel_list(sender);
}
StorageCmd::ChannelCreate(name, sender) => {
self.on_channel_create(name, sender);
}
StorageCmd::ChannelDelete(id) => self.on_channel_remove(id),
StorageCmd::ChannelGetName(id, sender) => {
self.on_channel_get_name(id, sender);
}
StorageCmd::ChannelSetName(id, name) => {
self.on_channel_set_name(id, name);
}
// ChannelGetParent
//
StorageCmd::MessageList(channel_id, sender) => {
self.on_message_list(channel_id, sender);
}
StorageCmd::MessageCreate(channel_id, content, sender) => {
self.on_message_create(channel_id, content, sender);
}
StorageCmd::MessageDelete(channel_id, id) => {
self.on_message_delete(channel_id, id);
}
StorageCmd::MessageGetContent(channel_id, id, sender) => {
self.on_message_get_content(channel_id, id, sender);
}
StorageCmd::MessageSetContent(channel_id, id, content) => {
self.on_message_set_content(channel_id, id, content);
}
};
}
fn on_message_set_content(&mut self, channel_id: Id, id: Id, content: String) {
let path = format!("/messages/{channel_id}/{id}");
if let Some(mut message) = self.get::<_, Message>(&path) {
message.set_content(content);
self.set(path, message);
}
}
fn on_message_get_content(&mut self, channel_id: Id, id: Id, sender: Sender<Option<String>>) {
let message = self.get::<_, Message>(format!("/messages/{channel_id}/{id}"));
let content = message.map(|m| m.get_content().to_string());
sender.send(content).unwrap()
}
fn on_message_delete(&mut self, channel_id: Id, id: Id) {
self.remove(format!("/messages/{channel_id}/{id}"));
}
fn on_message_create(&mut self, channel_id: Id, content: String, sender: Sender<Id>) {
let message = Message::new(content);
let id = message.get_id();
self.set(format!("/messages/{channel_id}/{id}"), message);
sender.send(id).unwrap();
}
fn on_message_list(&mut self, channel_id: Id, sender: Sender<Vec<Id>>) {
let items = self.list(format!("/messages/{channel_id}/"));
sender.send(items).unwrap();
}
//
// Channels
//
fn on_channel_list(&mut self, sender: Sender<Vec<Id>>) {
let results = self.list("/channels/");
sender.send(results).unwrap();
}
fn on_channel_create(&mut self, name: String, sender: Sender<Id>) {
let item = Channel::new(name);
let id = item.get_id();
self.set(format!("/channels/{id}"), item);
sender.send(id).unwrap();
}
fn on_channel_remove(&mut self, id: Id) {
self.remove(format!("/channels/{id}"))
}
fn on_channel_get_name(&mut self, id: Id, sender: Sender<Option<String>>) {
let channel = self.get::<_, Channel>(format!("/channels/{id}"));
let name = channel.map(|channel| channel.get_name().to_string());
sender.send(name).unwrap();
}
fn on_channel_set_name(&mut self, id: Id, name: String) {
let path = format!("/channels/{id}");
if let Some(mut channel) = self.get::<_, Channel>(&path) {
channel.set_name(name);
self.set(path, channel);
}
}
//
// Messages
//
}
#[telecomande::async_trait]
@ -84,43 +231,15 @@ impl Processor for StorageProc {
type Error = ();
async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> {
match command {
// channels
StorageCmd::ChannelDelete(id) => self.remove(format!("/channels/{id}")),
StorageCmd::ChannelCreate(name, sender) => {
let item = Channel::new(name);
let id = item.get_id();
self.set(format!("/channels/{id}"), item);
sender.send(id).unwrap();
}
StorageCmd::ChannelList(sender) => {
let results = self.list("/channels/");
sender.send(results).unwrap();
}
StorageCmd::ChannelGetName(id, sender) => {
let result = self
.get::<_, Channel>(format!("/channels/{id}"))
.map(|channel| channel.get_name().to_string());
sender.send(result).unwrap();
} //
// ChannelGetParent
// messages
// c
// d
// l
// gcontent
};
self.handle_command(command).await;
Ok(())
}
}
mod models;
pub use models::{Channel, Msg, SerDeser, User};
pub use models::{Channel, Message, SerDeser, User};
fn list(db: &Db, path: impl ToString) -> impl Iterator<Item = Id> {
let path = path.to_string();
fn list(db: &Db, path: String) -> Vec<Id> {
let len = path.len();
db.scan_prefix(path)
.filter_map(move |result| -> Option<Id> {
@ -129,74 +248,8 @@ fn list(db: &Db, path: impl ToString) -> impl Iterator<Item = Id> {
let suffix = &string[len..];
Id::from_string(suffix)
})
.collect() // TODO: turn into iterator with limits
}
fn remove(db: &Db, path: impl ToString) {
let path = path.to_string();
db.remove(path).unwrap();
}
#[test]
fn test_list() {
let db = sled::open("/tmp/test-db").unwrap();
db.insert("/some/path/123", b"hello1").unwrap();
db.insert("/some/path/1234", b"hello2").unwrap();
db.insert("/some/path/12345", b"hello3").unwrap();
let results = list(&db, "/some/path/".to_string());
let vec = results.collect::<Vec<_>>();
assert_eq!(
vec,
vec![
Id::from_string("123").unwrap(),
Id::from_string("1234").unwrap(),
Id::from_string("12345").unwrap()
]
);
}
#[tokio::test]
async fn test_channels() {
use telecomande::{Executor, SimpleExecutor};
// cleaning;
std::fs::remove_dir_all("/tmp/db-test").ok();
// instantiation
let store = SimpleExecutor::new(StorageProc::new("/tmp/db-test")).spawn();
let remote = store.remote();
// insertion
let (cmd, rec) = StorageCmd::new_channel_create("a-channel");
remote.send(cmd).unwrap();
let id = rec.await.unwrap();
// query all
let (cmd, rec) = StorageCmd::new_channel_list();
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.len(), 1);
let first = result[0];
assert_eq!(first, id);
// query property
let (cmd, rec) = StorageCmd::new_channel_get_name(id);
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.unwrap(), "a-channel".to_string());
// insertion
let (cmd, rec) = StorageCmd::new_channel_create("b-channel");
remote.send(cmd).unwrap();
let id2 = rec.await.unwrap();
// query all
let (cmd, rec) = StorageCmd::new_channel_list();
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.len(), 2);
// query property
let (cmd, rec) = StorageCmd::new_channel_get_name(id2);
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.unwrap(), "b-channel".to_string());
}
#[cfg(test)]
mod tests;

View file

@ -22,6 +22,10 @@ impl Channel {
pub fn get_name(&self) -> &str {
&self.name
}
pub fn set_name(&mut self, name: String) {
self.name = name;
}
}
#[derive(Debug, Serialize, Deserialize)]
@ -31,11 +35,28 @@ pub struct User {
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Msg {
pub struct Message {
id: Id,
content: String,
}
impl Message {
pub fn new(content: String) -> Self {
let id = Id::from_now();
Self { id, content }
}
pub fn get_id(&self) -> Id {
self.id
}
pub fn get_content(&self) -> &str {
&self.content
}
pub fn set_content(&mut self, content: String) {
self.content = content;
}
}
pub trait SerDeser: Serialize + DeserializeOwned {
fn ser(&self) -> Vec<u8>;
fn deser(input: &[u8]) -> Option<Self>;

View file

@ -0,0 +1,64 @@
use super::*;
#[test]
fn test_list() {
let db = sled::open("/tmp/test-db").unwrap();
db.insert("/some/path/123", b"hello1").unwrap();
db.insert("/some/path/1234", b"hello2").unwrap();
db.insert("/some/path/12345", b"hello3").unwrap();
let results = list(&db, "/some/path/".to_string());
assert_eq!(
results,
vec![
Id::from_string("123").unwrap(),
Id::from_string("1234").unwrap(),
Id::from_string("12345").unwrap()
]
);
}
#[tokio::test]
async fn test_channels() {
use telecomande::{Executor, SimpleExecutor};
// cleaning;
std::fs::remove_dir_all("/tmp/db-test").ok();
// instantiation
let store = SimpleExecutor::new(StorageProc::new("/tmp/db-test")).spawn();
let remote = store.remote();
// insertion
let (cmd, rec) = StorageCmd::new_channel_create("a-channel");
remote.send(cmd).unwrap();
let id = rec.await.unwrap();
// query all
let (cmd, rec) = StorageCmd::new_channel_list();
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.len(), 1);
let first = result[0];
assert_eq!(first, id);
// query property
let (cmd, rec) = StorageCmd::new_channel_get_name(id);
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.unwrap(), "a-channel".to_string());
// insertion
let (cmd, rec) = StorageCmd::new_channel_create("b-channel");
remote.send(cmd).unwrap();
let id2 = rec.await.unwrap();
// query all
let (cmd, rec) = StorageCmd::new_channel_list();
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.len(), 2);
// query property
let (cmd, rec) = StorageCmd::new_channel_get_name(id2);
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.unwrap(), "b-channel".to_string());
}

View file

@ -37,6 +37,12 @@ impl Id {
}
}
impl From<u64> for Id {
fn from(input: u64) -> Self {
Self::from_u64(input)
}
}
#[test]
fn test_string_convertion() {
let id = Id::from_now();