From 0bfd9962f0abcae2597335116bc6cc31dc4977cd Mon Sep 17 00:00:00 2001 From: Avii Date: Tue, 4 Mar 2025 21:40:00 +0100 Subject: [PATCH] things and stuff --- Cargo.lock | 105 ++++++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 246 +++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 299 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb1833f..7e678b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,95 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -232,6 +321,7 @@ dependencies = [ name = "irc" version = "0.1.0" dependencies = [ + "futures", "irc-rust", "rcgen", "rustls", @@ -421,6 +511,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "powerfmt" version = "0.2.0" @@ -619,6 +715,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.14.0" diff --git a/Cargo.toml b/Cargo.toml index c87974d..68f5a05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +futures = "0.3.31" irc-rust = "0.4.0" rcgen = "0.13.2" rustls = "0.23.23" diff --git a/src/main.rs b/src/main.rs index 4fdf6f0..4c19819 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,115 @@ -use std::{io::Cursor, net::SocketAddr, sync::Arc}; +use std::collections::HashMap; +use std::{io::Cursor, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use irc_rust::Message; -use rcgen::{generate_simple_self_signed, CertifiedKey}; -use rustls::pki_types::{pem::PemObject, CertificateDer, PrivateKeyDer}; -use tokio::{net::{TcpListener, TcpStream}, sync::mpsc}; -use tokio_rustls::{server::TlsStream, TlsAcceptor}; -use tokio::io::AsyncReadExt; +use rcgen::{CertifiedKey, generate_simple_self_signed}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::RwLock; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::mpsc::{self, Receiver, Sender}, +}; +use tokio_rustls::{TlsAcceptor, server::TlsStream}; type Result = std::result::Result>; +#[derive(Debug)] +struct Client { + peer_addr: SocketAddr, + rx: Receiver, + tx: Sender, + nick: Option, + + c2s_tx: Option>, + s2c_rx: Option>, +} + +impl Client { + pub fn new(peer_addr: SocketAddr) -> Self { + let (c2s_tx, c2s_rx) = mpsc::channel(32); + let (s2c_tx, s2c_rx) = mpsc::channel(32); + + Self { + peer_addr, + tx: s2c_tx, + rx: c2s_rx, + nick: None, + + c2s_tx: Some(c2s_tx), + s2c_rx: Some(s2c_rx), + } + } + + pub fn take_channels(&mut self) -> Result<(Sender, Receiver)> { + Ok(( + self.c2s_tx.take().expect("Already taken"), + self.s2c_rx.take().expect("Already taken"), + )) + } +} + +#[derive(Default, Debug)] +struct AppState { + clients: RwLock>, +} + +impl AppState { + pub async fn add_client(&self, peer_addr: SocketAddr) -> (Sender, Receiver) { + let mut client = Client::new(peer_addr); + let Ok((c2s_tx, s2c_rx)) = client.take_channels() else { + eprintln!("Unable to take channels for {}", peer_addr); + unreachable!(); + }; + self.clients.write().await.insert(peer_addr, client); + + (c2s_tx, s2c_rx) + } + + // pub sync fn set_nick(&self, peer_addr: SocketAddr, nick: &str) { + // // ... + // } + + // clients still needs mod/op status stuff + + // join channel + // leave channel + // create channel + + // pub async fn get(&self, peer_addr: SocketAddr) -> Option<&Client> { + // self.clients.read().await.get(&peer_addr) + // } + + // pub async fn get_mut(&self, peer_addr: SocketAddr) -> Option<&mut Client> { + // self.clients.write().await.get_mut(&peer_addr) + // } + + pub async fn get_messages(&self) -> Vec<(SocketAddr, Message)> { + let mut messages = vec![]; + let mut clients = self.clients.write().await; + for (peer_addr, client) in clients.iter_mut() { + if let Ok(msg) = client.rx.try_recv() { + messages.push((*peer_addr, msg)); + } + } + + messages + } + + pub async fn send(&self, peer_addr: SocketAddr, msg: Message) -> Result<()> { + let clients = self.clients.read().await; + let Some(client) = clients.get(&peer_addr) else { + return Err("Client not found".into()); + }; + client.tx.send(msg).await?; + Ok(()) + } +} + #[tokio::main] async fn main() -> Result<()> { - - let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec![ - "127.0.0.1".to_string(), - "localhost".to_string() - ])?; + let CertifiedKey { cert, key_pair } = + generate_simple_self_signed(vec!["127.0.0.1".to_string(), "localhost".to_string()])?; let cert = cert.pem(); let key = key_pair.serialize_pem(); @@ -27,65 +121,111 @@ async fn main() -> Result<()> { .with_no_client_auth() .with_single_cert(certs, key)?; - let acceptor = TlsAcceptor::from(Arc::new(config)); + let app_state = Arc::new(AppState::default()); + let acceptor = TlsAcceptor::from(Arc::new(config)); let listener = TcpListener::bind("0.0.0.0:6697").await?; - // todo: need a connection-pool here - // HashMap or something - // which means, this goes into yet another thread - loop { - println!("Awaiting connection..."); - let acceptor = acceptor.clone(); - let (stream, peer_addr) = listener.accept().await?; - - // todo: Create the channel here, store corrosponding tx/rx in the connection pool defined above - - tokio::spawn(async move { - - let Ok(stream) = acceptor.accept(stream).await else { - eprintln!("Unable to accept connection"); - return; + let state: Arc = app_state.clone(); + tokio::spawn(async move { + loop { + let acceptor = acceptor.clone(); + let Ok((stream, peer_addr)) = listener.accept().await else { + eprintln!("Error accepting connection..."); + continue; }; - - // todo: Pass the channel along with process - if let Err(e) = process(peer_addr, stream).await { - eprintln!("{:#?}", e); - } - }); - } + + let (c2s_tx, s2c_rx) = state.add_client(peer_addr).await; + + tokio::spawn(async move { + let Ok(stream) = acceptor.accept(stream).await else { + eprintln!("Unable to accept connection"); + return; + }; + + // todo: Pass the channel along with process + if let Err(e) = connect(peer_addr, stream, c2s_tx, s2c_rx).await { + eprintln!("{:#?}", e); + } + }); + } + }); // todo: the "processing" loop goes here... + let state: Arc = app_state.clone(); + tokio::spawn(async move { + loop { + let messages = state.get_messages().await; + // println!("Got {} messages", messages.len()); + + for (peer_addr, msg) in messages { + println!("{}: {}", peer_addr, msg); + if let Err(e) = handle(msg, peer_addr, state.clone()).await { + eprintln!("Error handling command: {:?}", e); + } + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + }); + + tokio::signal::ctrl_c().await?; + + Ok(()) } -async fn process(peer_addr: SocketAddr, stream: TlsStream) -> Result<()> { +async fn handle( + msg: Message, + peer_addr: SocketAddr, + state: Arc, +) -> Result> { + match msg.command()? { + "CAP" => { + let _ = state.send(peer_addr, Message::from_str("NONE")?).await; + } + _ => { + return Err(format!("Unknown Command: {:?}", msg.command()?).into()); + } + }; + + Ok(None) +} + +async fn connect( + peer_addr: SocketAddr, + stream: TlsStream, + c2s_tx: Sender, + mut s2c_rx: Receiver, +) -> Result<()> { println!("Got connection from: {:?}", peer_addr); let (mut rx, mut tx) = tokio::io::split(stream); - - // todo: instead of creating it here.. it's too late to accumulate them all here - let (c2s_tx, mut c2s_rx) = mpsc::channel(32); - // fire off a receive loop tokio::spawn(async move { let mut buffer = vec![]; 'outer: loop { let mut buf = [0u8; 512]; if let Ok(b) = rx.read(&mut buf).await { - if b == 0 { break; } + if b == 0 { + break; + } let len = buffer.len(); buffer.append(&mut buf.to_vec()); buffer.truncate(len + b); - if b >= 512 { continue; } + if b >= 512 { + continue; + } - let lines = String::from_utf8_lossy(&buffer).split("\r\n").filter_map(|i| { - if i.len() == 0 { - None - } else { - Some(Message::from(i)) - } - }).collect::>(); + let lines = String::from_utf8_lossy(&buffer) + .split("\r\n") + .filter_map(|i| { + if i.is_empty() { + None + } else { + Some(Message::from(i)) + } + }) + .collect::>(); buffer.clear(); for i in lines { @@ -100,13 +240,13 @@ async fn process(peer_addr: SocketAddr, stream: TlsStream) -> Result< // fire off a send loop tokio::spawn(async move { - while let Some(i) = c2s_rx.recv().await { - dbg!(i); + while let Some(i) = s2c_rx.recv().await { + if let Err(e) = tx.write(i.to_string().as_bytes()).await { + eprintln!("Error sending message: {:?}", e); + break; + } } }); - // fire off a processing loop that combines both with channels - Ok(()) } -