websocket protocol

This commit is contained in:
2024-10-19 16:00:31 +02:00
parent 5e651b382d
commit b94b3cf44f
13 changed files with 440 additions and 217 deletions

33
Cargo.lock generated
View File

@@ -201,9 +201,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.89" version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95"
[[package]] [[package]]
name = "anymap2" name = "anymap2"
@@ -374,6 +374,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"argon2", "argon2",
"avam-protocol",
"axum", "axum",
"axum-extra", "axum-extra",
"axum-macros", "axum-macros",
@@ -413,6 +414,7 @@ name = "avam-client"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"avam-protocol",
"base64 0.22.1", "base64 0.22.1",
"clap", "clap",
"config", "config",
@@ -444,6 +446,17 @@ dependencies = [
"winreg 0.52.0", "winreg 0.52.0",
] ]
[[package]]
name = "avam-protocol"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"flate2",
"serde",
"thiserror",
]
[[package]] [[package]]
name = "avam-wasm" name = "avam-wasm"
version = "0.1.0" version = "0.1.0"
@@ -1665,6 +1678,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"libz-rs-sys",
"miniz_oxide 0.8.0", "miniz_oxide 0.8.0",
] ]
@@ -3391,6 +3405,15 @@ dependencies = [
"x11", "x11",
] ]
[[package]]
name = "libz-rs-sys"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "009b9249eef9fd7f6bbc96969f38de54a10f6be687f6d0a2ed98c4e4dcdc566f"
dependencies = [
"zlib-rs",
]
[[package]] [[package]]
name = "linear-map" name = "linear-map"
version = "1.2.0" version = "1.2.0"
@@ -7470,6 +7493,12 @@ dependencies = [
"syn 2.0.79", "syn 2.0.79",
] ]
[[package]]
name = "zlib-rs"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b947c9af34afbf71a8ad64bedb8f3c26b562b1dad562218b265edd6f095731a"
[[package]] [[package]]
name = "zune-core" name = "zune-core"
version = "0.4.12" version = "0.4.12"

View File

@@ -1,5 +1,5 @@
[workspace] [workspace]
members = [".", "avam-client", "avam-wasm"] members = [".", "avam-client", "avam-protocol", "avam-wasm"]
resolver = "2" resolver = "2"
[package] [package]
@@ -28,6 +28,8 @@ panic = "abort"
hydrate = ["leptos/hydrate", "leptos_meta/hydrate", "leptos_router/hydrate"] hydrate = ["leptos/hydrate", "leptos_meta/hydrate", "leptos_router/hydrate"]
ssr = [ ssr = [
"dep:avam-protocol",
"dep:argon2", "dep:argon2",
"dep:axum", "dep:axum",
@@ -60,6 +62,7 @@ ssr = [
] ]
[dependencies] [dependencies]
avam-protocol = { path = "./avam-protocol", optional = true }
# Utilities # Utilities
anyhow = { version = "1.0.89", optional = false } anyhow = { version = "1.0.89", optional = false }
argon2 = { version = "0.5.3", optional = true } argon2 = { version = "0.5.3", optional = true }

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
avam-protocol = { path = "../avam-protocol" }
anyhow = { version = "1.0" } anyhow = { version = "1.0" }
base64 = { version = "0.22.1", default-features = false } base64 = { version = "0.22.1", default-features = false }
clap = { version = "4.5.20", features = ["derive"] } clap = { version = "4.5.20", features = ["derive"] }

View File

@@ -1,90 +1,180 @@
use std::{borrow::Cow, time::Duration}; use std::time::Duration;
use avam_protocol::{Packet, Packets, SimConnectPacket, SystemPacket};
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use reqwest::StatusCode; use reqwest::StatusCode;
use tokio::{ use tokio::{
sync::broadcast::{Receiver, Sender}, sync::broadcast::{channel, Receiver, Sender},
time::sleep, task::JoinSet,
time::{sleep, timeout},
}; };
use tokio_tungstenite::{ use tokio_tungstenite::{
connect_async, connect_async,
tungstenite::{ tungstenite::{
self, self,
protocol::{frame::coding::CloseCode, CloseFrame}, protocol::{frame::coding::CloseCode, CloseFrame},
ClientRequestBuilder, ClientRequestBuilder, Message,
}, },
}; };
use crate::{state_machine::Event, BASE_URL}; use crate::{config::Config, state_machine::Event, BASE_URL};
pub async fn start( pub async fn start(
config: Config,
simconnect_sender: Sender<SimConnectPacket>,
socket_receiver: Receiver<Packets>,
event_sender: Sender<Event>, event_sender: Sender<Event>,
mut event_receiver: Receiver<Event>, mut event_receiver: Receiver<Event>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut writer = None;
let uri: tungstenite::http::Uri = format!("{}/ws", BASE_URL.replace("https", "wss")).parse()?; let uri: tungstenite::http::Uri = format!("{}/ws", BASE_URL.replace("https", "wss")).parse()?;
let mut futures = JoinSet::new();
let (ia_sender, ia_receiver) = channel(10);
loop { loop {
if let Ok(event) = &event_receiver.try_recv() { if let Ok(event) = &event_receiver.try_recv() {
match event { match event {
Event::TokenReceived { token } => { Event::TokenReceived => {
let Some(token) = config.token() else {
let _ = event_sender.send(Event::Logout);
sleep(Duration::from_millis(100)).await;
continue;
};
let builder = ClientRequestBuilder::new(uri.clone()) let builder = ClientRequestBuilder::new(uri.clone())
.with_header("Authorization", format!("Bearer {}", token)); .with_header("Authorization", format!("Bearer {}", token));
let (socket, response) = connect_async(builder).await?; tracing::info!("Connecting");
let Ok(Ok((socket, response))) =
timeout(Duration::from_secs(10), connect_async(builder)).await
else {
tracing::error!("Unable to connect");
let _ = event_sender.send(Event::Disconnected);
sleep(Duration::from_millis(100)).await;
continue;
};
if response.status() != StatusCode::SWITCHING_PROTOCOLS { if response.status() != StatusCode::SWITCHING_PROTOCOLS {
tracing::error!("{:#?}", response); tracing::error!("Unable to connect: {:#?}", response);
let _ = event_sender.send(Event::Disconnected);
sleep(Duration::from_millis(100)).await;
continue; continue;
} }
let (write, mut read) = socket.split(); let (mut write, mut read) = socket.split();
writer = Some(write);
tokio::spawn(async move { let mut ia_receiver: Receiver<Packets> = ia_receiver.resubscribe();
let message = match read.next().await { futures.spawn(async move {
Some(data) => match data { loop {
Ok(message) => message, if let Ok(d) = ia_receiver.try_recv() {
Err(e) => { let message = match d {
tracing::error!("{:?}", e); Packets::System(SystemPacket::Close { reason }) => {
return; Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: reason.into(),
}))
}
Packets::System(SystemPacket::Pong) => {
let Ok(encoded_message) = d.encode() else {
tracing::error!(
"Unable to encode message for sending: {:#?}",
d
);
sleep(Duration::from_millis(100)).await;
continue;
};
Message::Binary(encoded_message)
}
d => {
tracing::info!("sending packet: {:?}", &d);
let Ok(encoded_message) = d.encode() else {
tracing::error!(
"Unable to encode message for sending: {:#?}",
d
);
sleep(Duration::from_millis(100)).await;
continue;
};
Message::Binary(encoded_message)
}
};
match write.send(message).await {
Err(tungstenite::Error::AlreadyClosed) => break,
Err(e) => {
tracing::error!("Error writing to socket: {:?}", e);
break;
}
Ok(()) => {}
};
}
sleep(Duration::from_millis(100)).await;
}
});
let ias = ia_sender.clone();
let es = event_sender.clone();
let scs = simconnect_sender.clone();
futures.spawn(async move {
loop {
let message = match read.next().await {
Some(data) => match data {
Ok(message) => message,
Err(e) => {
tracing::error!("{:?}", e);
let _ = es.send(Event::Disconnected);
sleep(Duration::from_millis(100)).await;
continue;
}
},
None => break,
};
if let Ok(data) = Packets::decode(&message.into_data()) {
if data == Packets::System(SystemPacket::Ping) {
let _ = ias.send(Packets::System(SystemPacket::Pong));
continue;
} }
},
None => return,
};
let data = message.to_text(); // From Socket -> SimConnect
tracing::debug!("{:?}", data); if let Packets::SimConnect(sim_connect_packet) = data {
tracing::info!("packet received: {:?}", &sim_connect_packet);
let _ = scs.send(sim_connect_packet);
}
}
sleep(Duration::from_millis(100)).await;
}
});
// Data from simconnect -> Socket
let mut socket_receiver = socket_receiver.resubscribe();
let ias = ia_sender.clone();
futures.spawn(async move {
loop {
if let Ok(message) = socket_receiver.try_recv() {
let _ = ias.send(message);
}
sleep(Duration::from_millis(100)).await;
}
}); });
tracing::info!("Connected"); tracing::info!("Connected");
let _ = event_sender.send(Event::Connected); let _ = event_sender.send(Event::Connected);
} }
Event::Logout => { Event::Logout => {
if let Some(mut write) = writer { let _ = ia_sender.send(Packets::System(SystemPacket::Close {
write reason: "User Logout".to_string(),
.send(tungstenite::Message::Close(Some(CloseFrame { }));
code: CloseCode::Normal, sleep(Duration::from_millis(200)).await;
reason: Cow::from("User Logout"), futures.abort_all();
})))
.await?;
writer = None;
tracing::debug!("Disconnected");
event_sender.send(Event::Disconnected)?;
}
} }
Event::Quit => { Event::Quit => {
tracing::info!("Shutting down Client"); tracing::info!("Shutting down Client");
if let Some(mut write) = writer { let _ = ia_sender.send(Packets::System(SystemPacket::Close {
write reason: "Quit".to_string(),
.send(tungstenite::Message::Close(Some(CloseFrame { }));
code: CloseCode::Normal, sleep(Duration::from_millis(200)).await;
reason: Cow::from("Application Shutdown"), futures.abort_all();
})))
.await?;
tracing::debug!("Disconnected");
}
break; break;
} }
_ => {} _ => {}

View File

@@ -18,7 +18,7 @@ use lock::Lock;
use oauth::{start_code_listener, start_code_to_token}; use oauth::{start_code_listener, start_code_to_token};
use pipe::Pipe; use pipe::Pipe;
use state_machine::Event; use state_machine::Event;
use tokio::task::JoinSet; use tokio::{sync::broadcast::channel, task::JoinSet};
use tracing::Level; use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
@@ -44,8 +44,7 @@ async fn main() -> Result<(), anyhow::Error> {
init_logging()?; init_logging()?;
// let (socket_sender, socket_receiver) = tokio::sync::broadcast::channel(1); let (event_sender, event_receiver) = channel(10);
let (event_sender, event_receiver) = tokio::sync::broadcast::channel(10);
let args = Arguments::parse(); let args = Arguments::parse();
if handle_single_instance(&args).await? { if handle_single_instance(&args).await? {
@@ -93,14 +92,21 @@ async fn main() -> Result<(), anyhow::Error> {
let receiver = event_receiver.resubscribe(); let receiver = event_receiver.resubscribe();
futures.spawn(start_code_to_token(c, pipe_receiver, sender, receiver)); futures.spawn(start_code_to_token(c, pipe_receiver, sender, receiver));
// Prepare channels for socket <-> simconnect
let (simconnect_sender, __simconnect_receiver) = channel(10);
let (__socket_sender, socket_receiver) = channel(10);
// Start the websocket client // Start the websocket client
// The socket client will just sit there until TokenReceivedEvent comes in to authenticate with the socket server let c = config.clone();
// The server needs to not accept any messages until the authentication is verified
let sender = event_sender.clone(); let sender = event_sender.clone();
let receiver = event_receiver.resubscribe(); let receiver = event_receiver.resubscribe();
futures.spawn(client::start(sender, receiver)); futures.spawn(client::start(
c,
// We need 2 way channels (2 channels, both with tx/rx) to send data from the socket to simconnect and back simconnect_sender,
socket_receiver,
sender,
receiver,
));
// Start the simconnect listener // Start the simconnect listener
// The simconnect sends data to the webscoket // The simconnect sends data to the webscoket

View File

@@ -106,7 +106,7 @@ pub async fn start_code_to_token(
config.set_token(Some(token.clone()))?; config.set_token(Some(token.clone()))?;
event_sender.send(Event::TokenReceived { token })?; event_sender.send(Event::TokenReceived)?;
} }
} }
} }

View File

@@ -1,8 +1,8 @@
pub struct Client { pub struct SimConnect {
// whatever we need // whatever we need
} }
impl Client { impl SimConnect {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
// websocket receiver // websocket receiver
@@ -25,5 +25,5 @@ impl Client {
} }
pub async fn start() -> Result<(), anyhow::Error> { pub async fn start() -> Result<(), anyhow::Error> {
Client::new().run().await? SimConnect::new().run().await?
} }

View File

@@ -5,18 +5,15 @@ use tokio::{
time::sleep, time::sleep,
}; };
use crate::{ use crate::{config::Config, oauth};
config::Config,
models::{CodeChallengeMethod, CodeVerifier},
oauth,
};
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub enum State { pub enum State {
Init, Init,
AppStart, AppStart,
Shutdown,
Authenticate, Authenticate,
Connect { token: String }, Connect,
WaitForSim, WaitForSim,
InSim, InSim,
} }
@@ -24,8 +21,8 @@ pub enum State {
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub enum Event { pub enum Event {
Ready, Ready,
StartAuthenticate, // should not be string StartAuthenticate, // should not be string
TokenReceived { token: String }, // AppStart and Authenticate can fire off TokenReceived to transition into Connect TokenReceived, // AppStart and Authenticate can fire off TokenReceived to transition into Connect
Connected, // Once connected to the socket, and properly authenticated, fire off Connected to transition to WaitForSim Connected, // Once connected to the socket, and properly authenticated, fire off Connected to transition to WaitForSim
Disconnected, // If for whatever reason we're disconnected from the backend, we need to transition back to Connect Disconnected, // If for whatever reason we're disconnected from the backend, we need to transition back to Connect
SimConnected, // SimConnect is connected, we're in the world and ready to send data, transition to Running SimConnected, // SimConnect is connected, we're in the world and ready to send data, transition to Running
@@ -42,16 +39,20 @@ impl State {
(_, Event::Logout) => State::AppStart, (_, Event::Logout) => State::AppStart,
(_, Event::StartAuthenticate) => Self::Authenticate, // Goto Authenticate (_, Event::StartAuthenticate) => Self::Authenticate, // Goto Authenticate
(_, Event::TokenReceived { token }) => State::Connect { token }, (_, Event::TokenReceived) => State::Connect,
(_, Event::Connected) => State::WaitForSim, // Goto WaitForSim (_, Event::Connected) => State::WaitForSim, // Goto WaitForSim
(_, Event::SimConnected) => todo!(), // Goto InSim (_, Event::Disconnected) => {
sleep(Duration::from_secs(5)).await; // wait 5 seconds before reconnecting
tracing::info!("Attempting reconnect");
State::AppStart // Goto Connect
}
(_, Event::Disconnected) => State::AppStart, // Goto Connect (_, Event::SimConnected) => State::InSim, // Goto InSim
(_, Event::SimDisconnected) => State::WaitForSim, // Goto WaitForSim (_, Event::SimDisconnected) => State::WaitForSim, // Goto WaitForSim
(_, Event::Quit) => todo!(), // All events can go into quit, to shutdown the application (_, Event::Quit) => State::Shutdown, // All events can go into quit, to shutdown the application
} }
} }
@@ -59,10 +60,8 @@ impl State {
match self { match self {
State::Init => Ok(()), State::Init => Ok(()),
State::AppStart => { State::AppStart => {
if let Some(token) = config.token() { if config.token().is_some() {
signal.send(Event::TokenReceived { signal.send(Event::TokenReceived)?;
token: token.to_string(),
})?;
} else { } else {
signal.send(Event::StartAuthenticate)?; signal.send(Event::StartAuthenticate)?;
} }
@@ -75,12 +74,10 @@ impl State {
Ok(()) Ok(())
} }
State::Connect { .. } => Ok(()), State::Connect => Ok(()),
State::WaitForSim => { State::WaitForSim => Ok(()),
tracing::info!("Waiting for sim!");
Ok(())
}
State::InSim => Ok(()), State::InSim => Ok(()),
State::Shutdown => Ok(()),
} }
} }
} }

11
avam-protocol/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "avam-protocol"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.90"
bincode = "1.3.3"
flate2 = { version = "1.0.34", features = ["zlib-rs"] }
serde = { version = "1.0.210", features = ["derive"] }
thiserror = "1.0.64"

59
avam-protocol/src/lib.rs Normal file
View File

@@ -0,0 +1,59 @@
use serde::{de, Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub enum SystemPacket {
Ping,
Pong,
Close { reason: String },
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub enum SimConnectPacket {
// ..
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub enum Packets {
System(SystemPacket),
SimConnect(SimConnectPacket),
}
impl Packet for Packets {}
pub trait Packet: Serialize + de::DeserializeOwned + Sized {
fn encode(&self) -> Result<Vec<u8>, anyhow::Error> {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;
let encoded = bincode::serialize(&self)?;
let mut compressor = GzEncoder::new(Vec::new(), Compression::best());
compressor.write_all(&encoded)?;
Ok(compressor.finish()?)
}
fn decode(data: &[u8]) -> Result<Self, anyhow::Error> {
use flate2::read::GzDecoder;
use std::io::Read;
let mut decompressor = GzDecoder::new(data);
let mut buffer = Vec::new();
decompressor.read_to_end(&mut buffer)?;
Ok(bincode::deserialize(&buffer)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let msg = Packets::System(SystemPacket::Ping);
let decoded = Packet::decode(&msg.encode().unwrap()).unwrap();
assert_eq!(msg, decoded);
}
}

View File

@@ -1,8 +1,9 @@
use std::{borrow::Cow, net::SocketAddr, ops::ControlFlow}; use std::{net::SocketAddr, ops::ControlFlow, sync::Arc, time::Duration};
use avam_protocol::{Packet, Packets, SimConnectPacket, SystemPacket};
use axum::{ use axum::{
extract::{ extract::{
ws::{CloseFrame, Message, WebSocket}, ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade, ConnectInfo, State, WebSocketUpgrade,
}, },
response::IntoResponse, response::IntoResponse,
@@ -14,6 +15,10 @@ use axum_extra::{
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use http::StatusCode; use http::StatusCode;
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use tokio::{
sync::mpsc,
time::{sleep, Instant},
};
use crate::{ use crate::{
domain::api::{ domain::api::{
@@ -70,156 +75,151 @@ pub async fn ws_handler<S: ApiService>(
return Err(StatusCode::UNAUTHORIZED); return Err(StatusCode::UNAUTHORIZED);
}; };
Ok(ws.on_upgrade(move |socket| handle_socket(socket, user, addr))) if app_state.has_connection(&user).await {
return Err(StatusCode::CONFLICT);
}
Ok(ws.on_upgrade(move |socket| handle_socket(app_state.clone(), socket, user, addr)))
} }
/// Actual websocket statemachine (one will be spawned per connection) /// Actual websocket statemachine (one will be spawned per connection)
async fn handle_socket(mut socket: WebSocket, user: User, who: SocketAddr) { async fn handle_socket<S: ApiService>(
// send a ping (unsupported by some browsers) just to kick things off and get a response app_state: AppState<S>,
if socket socket: WebSocket,
.send(Message::Text(format!("Hello {}!", user.email()))) user: User,
.await who: SocketAddr,
.is_ok() ) {
{ // AppState needs to store user to channel handles or something
tracing::debug!("Pinged {who}..."); // It'd know who's connected at all times and be able to send messages
} else {
tracing::debug!("Could not send ping {who}!");
// no Error here since the only thing we can do is to close the connection.
// If we can not send messages, there is no way to salvage the statemachine anyway.
return;
}
// receive single message from a client (we can either receive or send with socket). let (sender, mut receiver) = mpsc::channel(10);
// this will likely be the Pong for our Ping or a hello message from client. app_state.add_connection(&user, sender).await;
// waiting for message from a client will block this task, but will not block other client's
// connections.
if let Some(msg) = socket.recv().await {
if let Ok(msg) = msg {
if process_message(msg, who).is_break() {
return;
}
} else {
tracing::debug!("client {who} abruptly disconnected");
return;
}
}
// Since each client gets individual statemachine, we can pause handling // This can probably be hella-abstracted away and be made a lot cleaner
// when necessary to wait for some external event (in this case illustrated by sleeping). let (mut writer, mut reader) = socket.split();
// Waiting for this client to finish getting its greetings does not prevent other clients from // This probably needs a (mpsc?) channel, then a dedicated thread to send ping on that channel
// connecting to server and receiving their greetings. let mut ping_timer = Instant::now();
for i in 1..5 { ping_timer.checked_add(Duration::from_secs(15)); // add 15 seconds so we instantly trigger a ping
if socket let u = user.clone();
.send(Message::Text(format!("Hi {i} times!"))) let writer_handle = tokio::spawn(async move {
.await loop {
.is_err() let msg = match receiver.try_recv() {
{ Ok(msg) => Some(msg),
tracing::debug!("client {who} abruptly disconnected"); Err(mpsc::error::TryRecvError::Disconnected) => break,
return; _ => None,
} };
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// By splitting socket we can send and receive at the same time. In this example we will send if let Some(msg) = msg {
// unsolicited messages to client based on some sort of server's internal event (i.e .timer). let Ok(encoded_message) = msg.encode() else {
let (mut sender, mut receiver) = socket.split(); tracing::error!("Unable to encode message for sending: {:#?}", msg);
sleep(Duration::from_millis(100)).await;
// Spawn a task that will push several messages to the client (does not matter what client does) continue;
let mut send_task = tokio::spawn(async move { };
let n_msg = 20; tracing::info!("> [{}]: {:?}", &u.email(), msg);
for i in 0..n_msg { let _ = writer.send(Message::Binary(encoded_message)).await;
// In case of any websocket error, we exit.
if sender
.send(Message::Text(format!("Server message {i} ...")))
.await
.is_err()
{
return i;
} }
tokio::time::sleep(std::time::Duration::from_millis(300)).await; if ping_timer.elapsed() >= Duration::from_secs(15) {
} tracing::trace!(
"> [{}]: {:?}",
&u.email(),
Packets::System(SystemPacket::Ping)
);
let _ = writer
.send(Message::Binary(
Packets::System(SystemPacket::Ping).encode().unwrap(),
))
.await;
ping_timer = Instant::now();
}
tracing::debug!("Sending close to {who}..."); sleep(Duration::from_millis(100)).await;
if let Err(e) = sender
.send(Message::Close(Some(CloseFrame {
code: axum::extract::ws::close_code::NORMAL,
reason: Cow::from("Goodbye"),
})))
.await
{
tracing::debug!("Could not send Close due to {e}, probably it is ok?");
} }
n_msg
}); });
// This second task will receive messages from client and print them on server console let u = user.clone();
let mut recv_task = tokio::spawn(async move { let a_s = app_state.clone();
let mut cnt = 0; let reader_handle = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await { loop {
cnt += 1; if let Some(Ok(data)) = reader.next().await {
// print message and break if instructed to do so let Ok(packet) = Packets::decode(&data.clone().into_data()) else {
if process_message(msg, who).is_break() { if let Message::Close(Some(c)) = data {
break; tracing::info!("{} disconnected: {}", u.email(), c.reason);
break;
}
tracing::error!("Invalid packet received {:#?}", data);
sleep(Duration::from_millis(100)).await;
continue;
};
match packet {
Packets::System(system_packet) => {
process_system_packet(a_s.clone(), &u, system_packet).await;
}
Packets::SimConnect(sim_connect_packet) => {
process_message(a_s.api_service().clone(), &u, sim_connect_packet).await;
}
}
} }
sleep(Duration::from_millis(100)).await;
} }
cnt
}); });
// If any one of the tasks exit, abort the other.
tokio::select! { tokio::select! {
rv_a = (&mut send_task) => { _ = reader_handle => {
match rv_a { tracing::debug!("reader closed");
Ok(a) => tracing::debug!("{a} messages sent to {who}"),
Err(a) => tracing::debug!("Error sending messages {a:?}")
}
recv_task.abort();
},
rv_b = (&mut recv_task) => {
match rv_b {
Ok(b) => tracing::debug!("Received {b} messages"),
Err(b) => tracing::debug!("Error receiving messages {b:?}")
}
send_task.abort();
} }
} _ = writer_handle => {
tracing::debug!("writer closed");
}
};
// remove the user/channel from AppState
app_state.remove_connection(&user).await;
// returning from the handler closes the websocket connection // returning from the handler closes the websocket connection
tracing::debug!("Websocket context {who} destroyed"); tracing::debug!("Websocket context {who} destroyed");
} }
async fn process_system_packet<S>(
app_state: AppState<S>,
user: &User,
packet: SystemPacket,
) -> ControlFlow<(), ()>
where
S: ApiService,
{
tracing::trace!("< [{}]: {:?}", user.email(), packet);
match packet {
SystemPacket::Ping => {
// send back pong
let _ = app_state
.send(user, Packets::System(SystemPacket::Pong))
.await;
}
SystemPacket::Pong => {
// noop
}
SystemPacket::Close { reason } => {
tracing::debug!("{} disconnect: {}", user.email(), reason);
return ControlFlow::Break(()); // whatever this means
}
};
ControlFlow::Continue(())
}
async fn process_message<S>(
api_service: Arc<S>,
user: &User,
packet: SimConnectPacket,
) -> ControlFlow<(), ()>
where
S: ApiService,
{
tracing::info!("< [{}]: {:?}", user.email(), packet);
// On incoming packets, we use the internal api to store stuff to the database
// We'll use Server Side Events (SSE) to keep the dashboard up to date with the state of the database
/// helper to print contents of messages to stdout. Has special treatment for Close.
fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
tracing::debug!(">>> {who} sent str: {t:?}");
}
Message::Binary(d) => {
tracing::debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
}
Message::Close(c) => {
if let Some(cf) = c {
tracing::debug!(
">>> {} sent close with code {} and reason `{}`",
who,
cf.code,
cf.reason
);
} else {
tracing::debug!(">>> {who} somehow sent close message without CloseFrame");
}
return ControlFlow::Break(());
}
Message::Pong(v) => {
tracing::debug!(">>> {who} sent pong with {v:?}");
}
// You should never need to manually handle Message::Ping, as axum's websocket library
// will do so for you automagically by replying with Pong and copying the v according to
// spec. But if you need the contents of the pings you can see them here.
Message::Ping(v) => {
tracing::debug!(">>> {who} sent ping with {v:?}");
}
}
ControlFlow::Continue(()) ControlFlow::Continue(())
} }

View File

@@ -1,11 +1,16 @@
use std::sync::Arc; use std::{collections::HashMap, sync::Arc};
use avam_protocol::Packets;
use axum::extract::FromRef; use axum::extract::FromRef;
use leptos::get_configuration; use leptos::get_configuration;
use tokio::sync::{mpsc::Sender, RwLock};
use crate::{config::Config, domain::api::ports::ApiService}; use crate::{
config::Config,
domain::api::{ports::ApiService, prelude::User},
};
#[derive(Debug, Clone)] #[derive(Clone)]
/// The global application state shared between all request handlers. /// The global application state shared between all request handlers.
pub struct AppState<S> pub struct AppState<S>
where where
@@ -14,6 +19,7 @@ where
pub leptos_options: leptos::LeptosOptions, pub leptos_options: leptos::LeptosOptions,
config: Arc<Config>, config: Arc<Config>,
api_service: Arc<S>, api_service: Arc<S>,
connections: Arc<RwLock<HashMap<uuid::Uuid, Sender<Packets>>>>,
} }
impl<S> AppState<S> impl<S> AppState<S>
@@ -25,6 +31,7 @@ where
config: Arc::new(config), config: Arc::new(config),
leptos_options: get_configuration(None).await.unwrap().leptos_options, leptos_options: get_configuration(None).await.unwrap().leptos_options,
api_service: Arc::new(api_service), api_service: Arc::new(api_service),
connections: Arc::new(RwLock::new(HashMap::new())),
} }
} }
@@ -37,6 +44,30 @@ where
} }
} }
impl<S> AppState<S>
where
S: ApiService,
{
pub async fn add_connection(&self, user: &User, sender: Sender<Packets>) {
self.connections.write().await.insert(user.id(), sender);
}
pub async fn has_connection(&self, user: &User) -> bool {
self.connections.read().await.contains_key(&user.id())
}
pub async fn remove_connection(&self, user: &User) {
self.connections.write().await.remove(&user.id());
}
pub async fn send(&self, user: &User, packet: Packets) -> Result<(), anyhow::Error> {
if let Some(sender) = self.connections.read().await.get(&user.id()) {
sender.send(packet).await?;
}
Ok(())
}
}
impl<S> FromRef<AppState<S>> for leptos::LeptosOptions impl<S> FromRef<AppState<S>> for leptos::LeptosOptions
where where
S: ApiService, S: ApiService,

View File

@@ -1444,10 +1444,6 @@ html {
margin-top: auto; margin-top: auto;
} }
.block {
display: block;
}
.flex { .flex {
display: flex; display: flex;
} }