This commit is contained in:
JOLIMAITRE Matthieu 2022-08-22 01:59:56 +02:00
commit 62788c1b26
23 changed files with 1532 additions and 0 deletions

1
harsh-server/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

17
harsh-server/Cargo.toml Normal file
View file

@ -0,0 +1,17 @@
[package]
name = "harsh_server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
#sleded = "1.0"
sled = "0.34"
telecomande = "1.2"
tokio = { version = "1.20", features = ["full"] }
harsh_common = { path = "../harsh-common/" }
chrono = "0.4"
rand = "0.8.5"

View file

@ -0,0 +1,61 @@
use harsh_common::{Ping, Pong, ServerRequest};
use telecomande::{Processor, Remote};
use harsh_common::ClientRequest;
use crate::{sessions, Addr, SessionProc, StorageProc};
#[derive(Debug)]
pub enum GatewayCmd {
Request(Addr, String),
ClosedConnection(Addr),
}
pub struct GatewayProc {
client_handler: Remote<SessionProc>,
storage: Remote<StorageProc>,
}
impl GatewayProc {
pub fn new(client_handler: Remote<SessionProc>, storage: Remote<StorageProc>) -> Self {
Self {
client_handler,
storage,
}
}
async fn handle_request(&mut self, address: Addr, request: ClientRequest) {
match request {
ClientRequest::Ping(Ping { content }) => {
println!("received ping! '{content:?}'");
let response = ServerRequest::Pong(Pong { content });
let content = response.serialize();
self.client_handler
.send(sessions::SessionCmd::Send(address, content))
.unwrap();
}
}
}
}
#[telecomande::async_trait]
impl Processor for GatewayProc {
type Command = GatewayCmd;
type Error = ();
async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> {
match command {
GatewayCmd::Request(address, request) => {
if let Some(request) = ClientRequest::try_parse(&request) {
self.handle_request(address, request).await;
} else {
println!("failed to parse command");
}
}
GatewayCmd::ClosedConnection(address) => self
.client_handler
.send(sessions::SessionCmd::RemoveSession(address))
.unwrap(),
}
Ok(())
}
}

42
harsh-server/src/main.rs Normal file
View file

@ -0,0 +1,42 @@
use telecomande::{Executor, SimpleExecutor};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
println!("starting server ...");
let client_handler = SimpleExecutor::new(SessionProc::default()).spawn();
let storage = SimpleExecutor::new(StorageProc::new("./db")).spawn();
let gateway =
SimpleExecutor::new(GatewayProc::new(client_handler.remote(), storage.remote())).spawn();
println!("spawned gateway");
let listener = TcpListener::bind("localhost:8080").await.unwrap();
println!("listening on 'localhost:8080' ...");
let client_handler = client_handler.remote();
loop {
let (stream, address) = listener.accept().await.unwrap();
println!("new connection from '{address:?}'");
client_handler
.send(sessions::SessionCmd::AddSession(
stream,
address,
gateway.remote(),
))
.unwrap();
}
}
mod utils;
pub use utils::{Addr, Id};
mod gateway;
pub use gateway::{GatewayCmd, GatewayProc};
mod sessions;
pub use sessions::{SessionCmd, SessionProc};
pub use storage::{StorageCmd, StorageProc};
mod storage;

View file

@ -0,0 +1,82 @@
use std::{collections::HashMap, net::SocketAddr};
use telecomande::{Processor, Remote};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
task::JoinHandle,
};
use crate::{gateway, Addr};
#[derive(Debug)]
pub enum SessionCmd {
AddSession(TcpStream, SocketAddr, Remote<gateway::GatewayProc>),
RemoveSession(Addr),
Send(Addr, String),
}
#[derive(Debug, Default)]
pub struct SessionProc {
clients: HashMap<Addr, (OwnedWriteHalf, JoinHandle<()>)>,
}
impl SessionProc {
fn add_client(
&mut self,
stream: TcpStream,
address: Addr,
remote: Remote<gateway::GatewayProc>,
) {
let (reader, writer) = stream.into_split();
let handle = tokio::spawn(session(address.clone(), reader, remote));
self.clients.insert(address, (writer, handle));
}
}
#[telecomande::async_trait]
impl Processor for SessionProc {
type Command = SessionCmd;
type Error = ();
async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> {
match command {
SessionCmd::AddSession(stream, address, remote) => {
let address = Addr::new(address);
self.add_client(stream, address, remote)
}
SessionCmd::RemoveSession(address) => {
self.clients.remove(&address);
}
SessionCmd::Send(address, content) => {
if let Some((client, _)) = self.clients.get_mut(&address) {
client.write_all(content.as_bytes()).await.unwrap();
client.write_all(b"\n").await.unwrap();
} else {
println!("failed to find session with address '{address:?}'")
}
}
};
Ok(())
}
}
async fn session(address: Addr, reader: OwnedReadHalf, remote: Remote<gateway::GatewayProc>) {
let mut reader = BufReader::new(reader);
loop {
let mut line = String::new();
if let Err(error) = reader.read_line(&mut line).await {
eprintln!("{error}");
break;
}
remote
.send(gateway::GatewayCmd::Request(address.clone(), line.clone()))
.unwrap();
}
remote
.send(gateway::GatewayCmd::ClosedConnection(address))
.unwrap();
}

202
harsh-server/src/storage.rs Normal file
View file

@ -0,0 +1,202 @@
use sled::Db;
use telecomande::Processor;
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::Id;
#[derive(Debug)]
pub enum StorageCmd {
ChannelCreate(String, Sender<Id>),
ChannelDelete(Id),
ChannelGetAll(Sender<Vec<Id>>),
ChannelGetName(Id, Sender<Option<String>>),
}
impl StorageCmd {
fn new_channel_create(name: impl ToString) -> (Self, Receiver<Id>) {
let (s, r) = oneshot::channel();
(Self::ChannelCreate(name.to_string(), s), r)
}
fn new_channel_delete(id: Id) -> Self {
Self::ChannelDelete(id)
}
fn new_channel_get_all() -> (Self, Receiver<Vec<Id>>) {
let (s, r) = oneshot::channel();
(Self::ChannelGetAll(s), r)
}
fn new_channel_get_name(id: Id) -> (Self, Receiver<Option<String>>) {
let (s, r) = oneshot::channel();
(Self::ChannelGetName(id, s), r)
}
}
pub struct StorageProc {
base: Db,
}
impl StorageProc {
pub fn new<S>(path: S) -> Self
where
S: ToString,
{
let path = path.to_string();
let base = sled::open(path).unwrap();
Self { base }
}
fn get<S, T>(&self, path: S) -> Option<T>
where
S: ToString,
T: SerDeser,
{
let path = path.to_string();
T::read(&self.base, path)
}
fn set<S, T>(&self, path: S, item: T)
where
S: ToString,
T: SerDeser,
{
let path = path.to_string();
item.write(&self.base, path)
}
fn list(&self, path: impl ToString) -> Vec<Id> {
let path = path.to_string();
list(&self.base, path).collect() // TODO: turn into iterator with limits
}
// firsts (x)
// lasts (x)
// from (id, x)
// to (id, x)
fn remove(&self, path: impl ToString) {
let path = path.to_string();
remove(&self.base, path)
}
}
#[telecomande::async_trait]
impl Processor for StorageProc {
type Command = StorageCmd;
type Error = ();
async fn handle(&mut self, command: Self::Command) -> Result<(), Self::Error> {
match command {
// channels
StorageCmd::ChannelDelete(id) => self.remove(format!("/channels/{id}")),
StorageCmd::ChannelCreate(name, sender) => {
let item = Channel::new(name);
let id = item.get_id();
self.set(format!("/channels/{id}"), item);
sender.send(id).unwrap();
}
StorageCmd::ChannelGetAll(sender) => {
let results = self.list("/channels/");
sender.send(results).unwrap();
}
StorageCmd::ChannelGetName(id, sender) => {
let result = self
.get::<_, Channel>(format!("/channels/{id}"))
.map(|channel| channel.get_name().to_string());
sender.send(result).unwrap();
} //
// ChannelGetParent
// messages
// c
// d
// l
// gcontent
};
Ok(())
}
}
mod models;
pub use models::{Channel, Msg, SerDeser, User};
fn list(db: &Db, path: impl ToString) -> impl Iterator<Item = Id> {
let path = path.to_string();
let len = path.len();
db.scan_prefix(path)
.filter_map(move |result| -> Option<Id> {
let (key, _) = result.ok()?;
let string = String::from_utf8(key.iter().cloned().collect()).unwrap();
let suffix = &string[len..];
Id::from_string(suffix)
})
}
fn remove(db: &Db, path: impl ToString) {
let path = path.to_string();
db.remove(path).unwrap();
}
#[test]
fn test_list() {
let db = sled::open("/tmp/test-db").unwrap();
db.insert("/some/path/123", b"hello1").unwrap();
db.insert("/some/path/1234", b"hello2").unwrap();
db.insert("/some/path/12345", b"hello3").unwrap();
let results = list(&db, "/some/path/".to_string());
let vec = results.collect::<Vec<_>>();
assert_eq!(
vec,
vec![
Id::from_string("123").unwrap(),
Id::from_string("1234").unwrap(),
Id::from_string("12345").unwrap()
]
);
}
#[tokio::test]
async fn test_channels() {
use telecomande::{Executor, SimpleExecutor};
// cleaning;
std::fs::remove_dir_all("/tmp/db-test").ok();
// instantiation
let store = SimpleExecutor::new(StorageProc::new("/tmp/db-test")).spawn();
let remote = store.remote();
// insertion
let (cmd, rec) = StorageCmd::new_channel_create("a-channel");
remote.send(cmd).unwrap();
let id = rec.await.unwrap();
// query all
let (cmd, rec) = StorageCmd::new_channel_get_all();
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.len(), 1);
let first = result[0];
assert_eq!(first, id);
// query property
let (cmd, rec) = StorageCmd::new_channel_get_name(id);
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.unwrap(), "a-channel".to_string());
// insertion
let (cmd, rec) = StorageCmd::new_channel_create("b-channel");
remote.send(cmd).unwrap();
let id2 = rec.await.unwrap();
// query all
let (cmd, rec) = StorageCmd::new_channel_get_all();
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.len(), 2);
// query property
let (cmd, rec) = StorageCmd::new_channel_get_name(id2);
remote.send(cmd).unwrap();
let result = rec.await.unwrap();
assert_eq!(result.unwrap(), "b-channel".to_string());
}

View file

@ -0,0 +1,67 @@
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sled::Db;
use crate::Id;
#[derive(Debug, Serialize, Deserialize)]
pub struct Channel {
id: Id,
name: String,
}
impl Channel {
pub fn new(name: String) -> Self {
let id = Id::from_now();
Self { id, name }
}
pub fn get_id(&self) -> Id {
self.id
}
pub fn get_name(&self) -> &str {
&self.name
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct User {
id: Id,
name: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Msg {
id: Id,
content: String,
}
pub trait SerDeser: Serialize + DeserializeOwned {
fn ser(&self) -> Vec<u8>;
fn deser(input: &[u8]) -> Option<Self>;
fn read(db: &Db, path: String) -> Option<Self>;
fn write(&self, db: &Db, path: String);
}
impl<T> SerDeser for T
where
T: Serialize + DeserializeOwned,
{
fn ser(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap()
}
fn deser(input: &[u8]) -> Option<Self> {
serde_json::from_slice(input).ok()
}
fn read(db: &Db, path: String) -> Option<Self> {
let bytes = db.get(path).unwrap()?;
Self::deser(&bytes)
}
fn write(&self, db: &Db, path: String) {
let bytes = self.ser();
db.insert(path, bytes).unwrap();
}
}

56
harsh-server/src/utils.rs Normal file
View file

@ -0,0 +1,56 @@
use std::{fmt::Display, net::SocketAddr};
use rand::random;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Addr(String);
impl Addr {
pub fn new(address: SocketAddr) -> Self {
let string = format!("{address:?}");
Self(string)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Id(u64);
impl Id {
pub fn from_now() -> Self {
let ms = chrono::Utc::now().timestamp_millis() as u64;
let total = (ms * 1000) + rand_range(1000);
Self(total)
}
pub fn from_string(input: &str) -> Option<Self> {
let inner: u64 = input.parse().ok()?;
Some(Self(inner))
}
}
#[test]
fn test_string_convertion() {
let id = Id::from_now();
let str = id.to_string();
assert_eq!(id, Id::from_string(&str).unwrap());
}
fn rand_range(n: u64) -> u64 {
let random: u64 = random();
random % n
}
impl Display for Id {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.0;
let padded = format!("{inner:0>20}"); // pads to the left to make 20 chars of length
f.write_str(&padded)
}
}
#[test]
fn length_of_max() {
assert_eq!(u64::MAX, 18446744073709551615_u64);
assert_eq!(20, "18446744073709551615".len())
}