diff --git a/Cargo.lock b/Cargo.lock index b30d3d9..21d658a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,9 +201,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95" [[package]] name = "anymap2" @@ -374,6 +374,7 @@ version = "0.1.0" dependencies = [ "anyhow", "argon2", + "avam-protocol", "axum", "axum-extra", "axum-macros", @@ -413,6 +414,7 @@ name = "avam-client" version = "0.1.0" dependencies = [ "anyhow", + "avam-protocol", "base64 0.22.1", "clap", "config", @@ -444,6 +446,17 @@ dependencies = [ "winreg 0.52.0", ] +[[package]] +name = "avam-protocol" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "flate2", + "serde", + "thiserror", +] + [[package]] name = "avam-wasm" version = "0.1.0" @@ -1665,6 +1678,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" dependencies = [ "crc32fast", + "libz-rs-sys", "miniz_oxide 0.8.0", ] @@ -3391,6 +3405,15 @@ dependencies = [ "x11", ] +[[package]] +name = "libz-rs-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "009b9249eef9fd7f6bbc96969f38de54a10f6be687f6d0a2ed98c4e4dcdc566f" +dependencies = [ + "zlib-rs", +] + [[package]] name = "linear-map" version = "1.2.0" @@ -7470,6 +7493,12 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "zlib-rs" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b947c9af34afbf71a8ad64bedb8f3c26b562b1dad562218b265edd6f095731a" + [[package]] name = "zune-core" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index d11a26a..056e3ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "avam-client", "avam-wasm"] +members = [".", "avam-client", "avam-protocol", "avam-wasm"] resolver = "2" [package] @@ -28,6 +28,8 @@ panic = "abort" hydrate = ["leptos/hydrate", "leptos_meta/hydrate", "leptos_router/hydrate"] ssr = [ + "dep:avam-protocol", + "dep:argon2", "dep:axum", @@ -60,6 +62,7 @@ ssr = [ ] [dependencies] +avam-protocol = { path = "./avam-protocol", optional = true } # Utilities anyhow = { version = "1.0.89", optional = false } argon2 = { version = "0.5.3", optional = true } diff --git a/avam-client/Cargo.toml b/avam-client/Cargo.toml index fd29340..4221801 100644 --- a/avam-client/Cargo.toml +++ b/avam-client/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +avam-protocol = { path = "../avam-protocol" } anyhow = { version = "1.0" } base64 = { version = "0.22.1", default-features = false } clap = { version = "4.5.20", features = ["derive"] } diff --git a/avam-client/src/client.rs b/avam-client/src/client.rs index 0bbc278..7551a8f 100644 --- a/avam-client/src/client.rs +++ b/avam-client/src/client.rs @@ -1,90 +1,180 @@ -use std::{borrow::Cow, time::Duration}; +use std::time::Duration; +use avam_protocol::{Packet, Packets, SimConnectPacket, SystemPacket}; use futures_util::{SinkExt, StreamExt}; use reqwest::StatusCode; use tokio::{ - sync::broadcast::{Receiver, Sender}, - time::sleep, + sync::broadcast::{channel, Receiver, Sender}, + task::JoinSet, + time::{sleep, timeout}, }; use tokio_tungstenite::{ connect_async, tungstenite::{ self, protocol::{frame::coding::CloseCode, CloseFrame}, - ClientRequestBuilder, + ClientRequestBuilder, Message, }, }; -use crate::{state_machine::Event, BASE_URL}; +use crate::{config::Config, state_machine::Event, BASE_URL}; pub async fn start( + config: Config, + simconnect_sender: Sender, + socket_receiver: Receiver, event_sender: Sender, mut event_receiver: Receiver, ) -> Result<(), anyhow::Error> { - let mut writer = None; - let uri: tungstenite::http::Uri = format!("{}/ws", BASE_URL.replace("https", "wss")).parse()?; + let mut futures = JoinSet::new(); + + let (ia_sender, ia_receiver) = channel(10); loop { if let Ok(event) = &event_receiver.try_recv() { match event { - Event::TokenReceived { token } => { + Event::TokenReceived => { + let Some(token) = config.token() else { + let _ = event_sender.send(Event::Logout); + sleep(Duration::from_millis(100)).await; + continue; + }; + let builder = ClientRequestBuilder::new(uri.clone()) .with_header("Authorization", format!("Bearer {}", token)); - let (socket, response) = connect_async(builder).await?; + tracing::info!("Connecting"); + let Ok(Ok((socket, response))) = + timeout(Duration::from_secs(10), connect_async(builder)).await + else { + tracing::error!("Unable to connect"); + let _ = event_sender.send(Event::Disconnected); + sleep(Duration::from_millis(100)).await; + continue; + }; if response.status() != StatusCode::SWITCHING_PROTOCOLS { - tracing::error!("{:#?}", response); + tracing::error!("Unable to connect: {:#?}", response); + let _ = event_sender.send(Event::Disconnected); + sleep(Duration::from_millis(100)).await; continue; } - let (write, mut read) = socket.split(); - writer = Some(write); + let (mut write, mut read) = socket.split(); - tokio::spawn(async move { - let message = match read.next().await { - Some(data) => match data { - Ok(message) => message, - Err(e) => { - tracing::error!("{:?}", e); - return; + let mut ia_receiver: Receiver = ia_receiver.resubscribe(); + futures.spawn(async move { + loop { + if let Ok(d) = ia_receiver.try_recv() { + let message = match d { + Packets::System(SystemPacket::Close { reason }) => { + Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: reason.into(), + })) + } + Packets::System(SystemPacket::Pong) => { + let Ok(encoded_message) = d.encode() else { + tracing::error!( + "Unable to encode message for sending: {:#?}", + d + ); + sleep(Duration::from_millis(100)).await; + continue; + }; + Message::Binary(encoded_message) + } + d => { + tracing::info!("sending packet: {:?}", &d); + let Ok(encoded_message) = d.encode() else { + tracing::error!( + "Unable to encode message for sending: {:#?}", + d + ); + sleep(Duration::from_millis(100)).await; + continue; + }; + Message::Binary(encoded_message) + } + }; + + match write.send(message).await { + Err(tungstenite::Error::AlreadyClosed) => break, + Err(e) => { + tracing::error!("Error writing to socket: {:?}", e); + break; + } + Ok(()) => {} + }; + } + sleep(Duration::from_millis(100)).await; + } + }); + + let ias = ia_sender.clone(); + let es = event_sender.clone(); + let scs = simconnect_sender.clone(); + futures.spawn(async move { + loop { + let message = match read.next().await { + Some(data) => match data { + Ok(message) => message, + Err(e) => { + tracing::error!("{:?}", e); + let _ = es.send(Event::Disconnected); + sleep(Duration::from_millis(100)).await; + continue; + } + }, + None => break, + }; + + if let Ok(data) = Packets::decode(&message.into_data()) { + if data == Packets::System(SystemPacket::Ping) { + let _ = ias.send(Packets::System(SystemPacket::Pong)); + continue; } - }, - None => return, - }; - let data = message.to_text(); - tracing::debug!("{:?}", data); + // From Socket -> SimConnect + if let Packets::SimConnect(sim_connect_packet) = data { + tracing::info!("packet received: {:?}", &sim_connect_packet); + let _ = scs.send(sim_connect_packet); + } + } + sleep(Duration::from_millis(100)).await; + } + }); + + // Data from simconnect -> Socket + let mut socket_receiver = socket_receiver.resubscribe(); + let ias = ia_sender.clone(); + futures.spawn(async move { + loop { + if let Ok(message) = socket_receiver.try_recv() { + let _ = ias.send(message); + } + sleep(Duration::from_millis(100)).await; + } }); tracing::info!("Connected"); let _ = event_sender.send(Event::Connected); } Event::Logout => { - if let Some(mut write) = writer { - write - .send(tungstenite::Message::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: Cow::from("User Logout"), - }))) - .await?; - writer = None; - tracing::debug!("Disconnected"); - event_sender.send(Event::Disconnected)?; - } + let _ = ia_sender.send(Packets::System(SystemPacket::Close { + reason: "User Logout".to_string(), + })); + sleep(Duration::from_millis(200)).await; + futures.abort_all(); } Event::Quit => { tracing::info!("Shutting down Client"); - if let Some(mut write) = writer { - write - .send(tungstenite::Message::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: Cow::from("Application Shutdown"), - }))) - .await?; - tracing::debug!("Disconnected"); - } + let _ = ia_sender.send(Packets::System(SystemPacket::Close { + reason: "Quit".to_string(), + })); + sleep(Duration::from_millis(200)).await; + futures.abort_all(); break; } _ => {} diff --git a/avam-client/src/main.rs b/avam-client/src/main.rs index d869fc0..a63f25b 100644 --- a/avam-client/src/main.rs +++ b/avam-client/src/main.rs @@ -18,7 +18,7 @@ use lock::Lock; use oauth::{start_code_listener, start_code_to_token}; use pipe::Pipe; use state_machine::Event; -use tokio::task::JoinSet; +use tokio::{sync::broadcast::channel, task::JoinSet}; use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; @@ -44,8 +44,7 @@ async fn main() -> Result<(), anyhow::Error> { init_logging()?; - // let (socket_sender, socket_receiver) = tokio::sync::broadcast::channel(1); - let (event_sender, event_receiver) = tokio::sync::broadcast::channel(10); + let (event_sender, event_receiver) = channel(10); let args = Arguments::parse(); if handle_single_instance(&args).await? { @@ -93,14 +92,21 @@ async fn main() -> Result<(), anyhow::Error> { let receiver = event_receiver.resubscribe(); futures.spawn(start_code_to_token(c, pipe_receiver, sender, receiver)); + // Prepare channels for socket <-> simconnect + let (simconnect_sender, __simconnect_receiver) = channel(10); + let (__socket_sender, socket_receiver) = channel(10); + // Start the websocket client - // The socket client will just sit there until TokenReceivedEvent comes in to authenticate with the socket server - // The server needs to not accept any messages until the authentication is verified + let c = config.clone(); let sender = event_sender.clone(); let receiver = event_receiver.resubscribe(); - futures.spawn(client::start(sender, receiver)); - - // We need 2 way channels (2 channels, both with tx/rx) to send data from the socket to simconnect and back + futures.spawn(client::start( + c, + simconnect_sender, + socket_receiver, + sender, + receiver, + )); // Start the simconnect listener // The simconnect sends data to the webscoket diff --git a/avam-client/src/oauth.rs b/avam-client/src/oauth.rs index e1ec849..57e5913 100644 --- a/avam-client/src/oauth.rs +++ b/avam-client/src/oauth.rs @@ -106,7 +106,7 @@ pub async fn start_code_to_token( config.set_token(Some(token.clone()))?; - event_sender.send(Event::TokenReceived { token })?; + event_sender.send(Event::TokenReceived)?; } } } diff --git a/avam-client/src/simconnect.rs b/avam-client/src/simconnect.rs index 4515a03..ef868b0 100644 --- a/avam-client/src/simconnect.rs +++ b/avam-client/src/simconnect.rs @@ -1,8 +1,8 @@ -pub struct Client { +pub struct SimConnect { // whatever we need } -impl Client { +impl SimConnect { pub fn new() -> Self { Self { // websocket receiver @@ -25,5 +25,5 @@ impl Client { } pub async fn start() -> Result<(), anyhow::Error> { - Client::new().run().await? + SimConnect::new().run().await? } diff --git a/avam-client/src/state_machine.rs b/avam-client/src/state_machine.rs index 493769d..0e3eeca 100644 --- a/avam-client/src/state_machine.rs +++ b/avam-client/src/state_machine.rs @@ -5,18 +5,15 @@ use tokio::{ time::sleep, }; -use crate::{ - config::Config, - models::{CodeChallengeMethod, CodeVerifier}, - oauth, -}; +use crate::{config::Config, oauth}; #[derive(Debug, Clone, PartialEq)] pub enum State { Init, AppStart, + Shutdown, Authenticate, - Connect { token: String }, + Connect, WaitForSim, InSim, } @@ -24,8 +21,8 @@ pub enum State { #[derive(Debug, Clone, PartialEq)] pub enum Event { Ready, - StartAuthenticate, // should not be string - TokenReceived { token: String }, // AppStart and Authenticate can fire off TokenReceived to transition into Connect + StartAuthenticate, // should not be string + TokenReceived, // AppStart and Authenticate can fire off TokenReceived to transition into Connect Connected, // Once connected to the socket, and properly authenticated, fire off Connected to transition to WaitForSim Disconnected, // If for whatever reason we're disconnected from the backend, we need to transition back to Connect SimConnected, // SimConnect is connected, we're in the world and ready to send data, transition to Running @@ -42,16 +39,20 @@ impl State { (_, Event::Logout) => State::AppStart, (_, Event::StartAuthenticate) => Self::Authenticate, // Goto Authenticate - (_, Event::TokenReceived { token }) => State::Connect { token }, + (_, Event::TokenReceived) => State::Connect, (_, Event::Connected) => State::WaitForSim, // Goto WaitForSim - (_, Event::SimConnected) => todo!(), // Goto InSim + (_, Event::Disconnected) => { + sleep(Duration::from_secs(5)).await; // wait 5 seconds before reconnecting + tracing::info!("Attempting reconnect"); + State::AppStart // Goto Connect + } - (_, Event::Disconnected) => State::AppStart, // Goto Connect + (_, Event::SimConnected) => State::InSim, // Goto InSim (_, Event::SimDisconnected) => State::WaitForSim, // Goto WaitForSim - (_, Event::Quit) => todo!(), // All events can go into quit, to shutdown the application + (_, Event::Quit) => State::Shutdown, // All events can go into quit, to shutdown the application } } @@ -59,10 +60,8 @@ impl State { match self { State::Init => Ok(()), State::AppStart => { - if let Some(token) = config.token() { - signal.send(Event::TokenReceived { - token: token.to_string(), - })?; + if config.token().is_some() { + signal.send(Event::TokenReceived)?; } else { signal.send(Event::StartAuthenticate)?; } @@ -75,12 +74,10 @@ impl State { Ok(()) } - State::Connect { .. } => Ok(()), - State::WaitForSim => { - tracing::info!("Waiting for sim!"); - Ok(()) - } + State::Connect => Ok(()), + State::WaitForSim => Ok(()), State::InSim => Ok(()), + State::Shutdown => Ok(()), } } } diff --git a/avam-protocol/Cargo.toml b/avam-protocol/Cargo.toml new file mode 100644 index 0000000..ff6155d --- /dev/null +++ b/avam-protocol/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "avam-protocol" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.90" +bincode = "1.3.3" +flate2 = { version = "1.0.34", features = ["zlib-rs"] } +serde = { version = "1.0.210", features = ["derive"] } +thiserror = "1.0.64" diff --git a/avam-protocol/src/lib.rs b/avam-protocol/src/lib.rs new file mode 100644 index 0000000..8d8b9a4 --- /dev/null +++ b/avam-protocol/src/lib.rs @@ -0,0 +1,59 @@ +use serde::{de, Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum SystemPacket { + Ping, + Pong, + Close { reason: String }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum SimConnectPacket { + // .. +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum Packets { + System(SystemPacket), + SimConnect(SimConnectPacket), +} + +impl Packet for Packets {} + +pub trait Packet: Serialize + de::DeserializeOwned + Sized { + fn encode(&self) -> Result, anyhow::Error> { + use flate2::write::GzEncoder; + use flate2::Compression; + use std::io::Write; + + let encoded = bincode::serialize(&self)?; + let mut compressor = GzEncoder::new(Vec::new(), Compression::best()); + compressor.write_all(&encoded)?; + + Ok(compressor.finish()?) + } + + fn decode(data: &[u8]) -> Result { + use flate2::read::GzDecoder; + use std::io::Read; + let mut decompressor = GzDecoder::new(data); + let mut buffer = Vec::new(); + decompressor.read_to_end(&mut buffer)?; + + Ok(bincode::deserialize(&buffer)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let msg = Packets::System(SystemPacket::Ping); + + let decoded = Packet::decode(&msg.encode().unwrap()).unwrap(); + + assert_eq!(msg, decoded); + } +} diff --git a/src/lib/inbound/http/handlers/websocket.rs b/src/lib/inbound/http/handlers/websocket.rs index 155ad33..f4408e7 100644 --- a/src/lib/inbound/http/handlers/websocket.rs +++ b/src/lib/inbound/http/handlers/websocket.rs @@ -1,8 +1,9 @@ -use std::{borrow::Cow, net::SocketAddr, ops::ControlFlow}; +use std::{net::SocketAddr, ops::ControlFlow, sync::Arc, time::Duration}; +use avam_protocol::{Packet, Packets, SimConnectPacket, SystemPacket}; use axum::{ extract::{ - ws::{CloseFrame, Message, WebSocket}, + ws::{Message, WebSocket}, ConnectInfo, State, WebSocketUpgrade, }, response::IntoResponse, @@ -14,6 +15,10 @@ use axum_extra::{ use futures::{SinkExt, StreamExt}; use http::StatusCode; use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; +use tokio::{ + sync::mpsc, + time::{sleep, Instant}, +}; use crate::{ domain::api::{ @@ -70,156 +75,151 @@ pub async fn ws_handler( return Err(StatusCode::UNAUTHORIZED); }; - Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, addr))) + if app_state.has_connection(&user).await { + return Err(StatusCode::CONFLICT); + } + + Ok(ws.on_upgrade(move |socket| handle_socket(app_state.clone(), socket, user, addr))) } /// Actual websocket statemachine (one will be spawned per connection) -async fn handle_socket(mut socket: WebSocket, user: User, who: SocketAddr) { - // send a ping (unsupported by some browsers) just to kick things off and get a response - if socket - .send(Message::Text(format!("Hello {}!", user.email()))) - .await - .is_ok() - { - tracing::debug!("Pinged {who}..."); - } else { - tracing::debug!("Could not send ping {who}!"); - // no Error here since the only thing we can do is to close the connection. - // If we can not send messages, there is no way to salvage the statemachine anyway. - return; - } +async fn handle_socket( + app_state: AppState, + socket: WebSocket, + user: User, + who: SocketAddr, +) { + // AppState needs to store user to channel handles or something + // It'd know who's connected at all times and be able to send messages - // receive single message from a client (we can either receive or send with socket). - // this will likely be the Pong for our Ping or a hello message from client. - // waiting for message from a client will block this task, but will not block other client's - // connections. - if let Some(msg) = socket.recv().await { - if let Ok(msg) = msg { - if process_message(msg, who).is_break() { - return; - } - } else { - tracing::debug!("client {who} abruptly disconnected"); - return; - } - } + let (sender, mut receiver) = mpsc::channel(10); + app_state.add_connection(&user, sender).await; - // Since each client gets individual statemachine, we can pause handling - // when necessary to wait for some external event (in this case illustrated by sleeping). - // Waiting for this client to finish getting its greetings does not prevent other clients from - // connecting to server and receiving their greetings. - for i in 1..5 { - if socket - .send(Message::Text(format!("Hi {i} times!"))) - .await - .is_err() - { - tracing::debug!("client {who} abruptly disconnected"); - return; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } + // This can probably be hella-abstracted away and be made a lot cleaner + let (mut writer, mut reader) = socket.split(); + // This probably needs a (mpsc?) channel, then a dedicated thread to send ping on that channel + let mut ping_timer = Instant::now(); + ping_timer.checked_add(Duration::from_secs(15)); // add 15 seconds so we instantly trigger a ping + let u = user.clone(); + let writer_handle = tokio::spawn(async move { + loop { + let msg = match receiver.try_recv() { + Ok(msg) => Some(msg), + Err(mpsc::error::TryRecvError::Disconnected) => break, + _ => None, + }; - // By splitting socket we can send and receive at the same time. In this example we will send - // unsolicited messages to client based on some sort of server's internal event (i.e .timer). - let (mut sender, mut receiver) = socket.split(); - - // Spawn a task that will push several messages to the client (does not matter what client does) - let mut send_task = tokio::spawn(async move { - let n_msg = 20; - for i in 0..n_msg { - // In case of any websocket error, we exit. - if sender - .send(Message::Text(format!("Server message {i} ..."))) - .await - .is_err() - { - return i; + if let Some(msg) = msg { + let Ok(encoded_message) = msg.encode() else { + tracing::error!("Unable to encode message for sending: {:#?}", msg); + sleep(Duration::from_millis(100)).await; + continue; + }; + tracing::info!("> [{}]: {:?}", &u.email(), msg); + let _ = writer.send(Message::Binary(encoded_message)).await; } - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - } + if ping_timer.elapsed() >= Duration::from_secs(15) { + tracing::trace!( + "> [{}]: {:?}", + &u.email(), + Packets::System(SystemPacket::Ping) + ); + let _ = writer + .send(Message::Binary( + Packets::System(SystemPacket::Ping).encode().unwrap(), + )) + .await; + ping_timer = Instant::now(); + } - tracing::debug!("Sending close to {who}..."); - if let Err(e) = sender - .send(Message::Close(Some(CloseFrame { - code: axum::extract::ws::close_code::NORMAL, - reason: Cow::from("Goodbye"), - }))) - .await - { - tracing::debug!("Could not send Close due to {e}, probably it is ok?"); + sleep(Duration::from_millis(100)).await; } - n_msg }); - // This second task will receive messages from client and print them on server console - let mut recv_task = tokio::spawn(async move { - let mut cnt = 0; - while let Some(Ok(msg)) = receiver.next().await { - cnt += 1; - // print message and break if instructed to do so - if process_message(msg, who).is_break() { - break; + let u = user.clone(); + let a_s = app_state.clone(); + let reader_handle = tokio::spawn(async move { + loop { + if let Some(Ok(data)) = reader.next().await { + let Ok(packet) = Packets::decode(&data.clone().into_data()) else { + if let Message::Close(Some(c)) = data { + tracing::info!("{} disconnected: {}", u.email(), c.reason); + break; + } + tracing::error!("Invalid packet received {:#?}", data); + sleep(Duration::from_millis(100)).await; + continue; + }; + + match packet { + Packets::System(system_packet) => { + process_system_packet(a_s.clone(), &u, system_packet).await; + } + Packets::SimConnect(sim_connect_packet) => { + process_message(a_s.api_service().clone(), &u, sim_connect_packet).await; + } + } } + sleep(Duration::from_millis(100)).await; } - cnt }); - // If any one of the tasks exit, abort the other. tokio::select! { - rv_a = (&mut send_task) => { - match rv_a { - Ok(a) => tracing::debug!("{a} messages sent to {who}"), - Err(a) => tracing::debug!("Error sending messages {a:?}") - } - recv_task.abort(); - }, - rv_b = (&mut recv_task) => { - match rv_b { - Ok(b) => tracing::debug!("Received {b} messages"), - Err(b) => tracing::debug!("Error receiving messages {b:?}") - } - send_task.abort(); + _ = reader_handle => { + tracing::debug!("reader closed"); } - } + _ = writer_handle => { + tracing::debug!("writer closed"); + } + }; + + // remove the user/channel from AppState + app_state.remove_connection(&user).await; // returning from the handler closes the websocket connection tracing::debug!("Websocket context {who} destroyed"); } +async fn process_system_packet( + app_state: AppState, + user: &User, + packet: SystemPacket, +) -> ControlFlow<(), ()> +where + S: ApiService, +{ + tracing::trace!("< [{}]: {:?}", user.email(), packet); + match packet { + SystemPacket::Ping => { + // send back pong + let _ = app_state + .send(user, Packets::System(SystemPacket::Pong)) + .await; + } + SystemPacket::Pong => { + // noop + } + SystemPacket::Close { reason } => { + tracing::debug!("{} disconnect: {}", user.email(), reason); + return ControlFlow::Break(()); // whatever this means + } + }; + + ControlFlow::Continue(()) +} + +async fn process_message( + api_service: Arc, + user: &User, + packet: SimConnectPacket, +) -> ControlFlow<(), ()> +where + S: ApiService, +{ + tracing::info!("< [{}]: {:?}", user.email(), packet); + + // On incoming packets, we use the internal api to store stuff to the database + // We'll use Server Side Events (SSE) to keep the dashboard up to date with the state of the database -/// helper to print contents of messages to stdout. Has special treatment for Close. -fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> { - match msg { - Message::Text(t) => { - tracing::debug!(">>> {who} sent str: {t:?}"); - } - Message::Binary(d) => { - tracing::debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d); - } - Message::Close(c) => { - if let Some(cf) = c { - tracing::debug!( - ">>> {} sent close with code {} and reason `{}`", - who, - cf.code, - cf.reason - ); - } else { - tracing::debug!(">>> {who} somehow sent close message without CloseFrame"); - } - return ControlFlow::Break(()); - } - - Message::Pong(v) => { - tracing::debug!(">>> {who} sent pong with {v:?}"); - } - // You should never need to manually handle Message::Ping, as axum's websocket library - // will do so for you automagically by replying with Pong and copying the v according to - // spec. But if you need the contents of the pings you can see them here. - Message::Ping(v) => { - tracing::debug!(">>> {who} sent ping with {v:?}"); - } - } ControlFlow::Continue(()) } diff --git a/src/lib/inbound/http/state.rs b/src/lib/inbound/http/state.rs index 6eeee95..41c31d3 100644 --- a/src/lib/inbound/http/state.rs +++ b/src/lib/inbound/http/state.rs @@ -1,11 +1,16 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; +use avam_protocol::Packets; use axum::extract::FromRef; use leptos::get_configuration; +use tokio::sync::{mpsc::Sender, RwLock}; -use crate::{config::Config, domain::api::ports::ApiService}; +use crate::{ + config::Config, + domain::api::{ports::ApiService, prelude::User}, +}; -#[derive(Debug, Clone)] +#[derive(Clone)] /// The global application state shared between all request handlers. pub struct AppState where @@ -14,6 +19,7 @@ where pub leptos_options: leptos::LeptosOptions, config: Arc, api_service: Arc, + connections: Arc>>>, } impl AppState @@ -25,6 +31,7 @@ where config: Arc::new(config), leptos_options: get_configuration(None).await.unwrap().leptos_options, api_service: Arc::new(api_service), + connections: Arc::new(RwLock::new(HashMap::new())), } } @@ -37,6 +44,30 @@ where } } +impl AppState +where + S: ApiService, +{ + pub async fn add_connection(&self, user: &User, sender: Sender) { + self.connections.write().await.insert(user.id(), sender); + } + + pub async fn has_connection(&self, user: &User) -> bool { + self.connections.read().await.contains_key(&user.id()) + } + + pub async fn remove_connection(&self, user: &User) { + self.connections.write().await.remove(&user.id()); + } + + pub async fn send(&self, user: &User, packet: Packets) -> Result<(), anyhow::Error> { + if let Some(sender) = self.connections.read().await.get(&user.id()) { + sender.send(packet).await?; + } + Ok(()) + } +} + impl FromRef> for leptos::LeptosOptions where S: ApiService, diff --git a/style/main.scss b/style/main.scss index 77b4412..40fa8de 100644 --- a/style/main.scss +++ b/style/main.scss @@ -1444,10 +1444,6 @@ html { margin-top: auto; } -.block { - display: block; -} - .flex { display: flex; }