use std::{io::Cursor, net::SocketAddr, sync::Arc}; use irc_rust::Message; use rcgen::{generate_simple_self_signed, CertifiedKey}; use rustls::pki_types::{pem::PemObject, CertificateDer, PrivateKeyDer}; use tokio::{net::{TcpListener, TcpStream}, sync::mpsc}; use tokio_rustls::{server::TlsStream, TlsAcceptor}; use tokio::io::AsyncReadExt; type Result = std::result::Result>; #[tokio::main] async fn main() -> Result<()> { let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec![ "127.0.0.1".to_string(), "localhost".to_string() ])?; let cert = cert.pem(); let key = key_pair.serialize_pem(); let certs = vec![CertificateDer::from_pem_reader(Cursor::new(&cert))?]; let key = PrivateKeyDer::from_pem_reader(Cursor::new(&key))?; let config = rustls::ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key)?; let acceptor = TlsAcceptor::from(Arc::new(config)); let listener = TcpListener::bind("0.0.0.0:6697").await?; // todo: need a connection-pool here // HashMap or something // which means, this goes into yet another thread loop { println!("Awaiting connection..."); let acceptor = acceptor.clone(); let (stream, peer_addr) = listener.accept().await?; // todo: Create the channel here, store corrosponding tx/rx in the connection pool defined above tokio::spawn(async move { let Ok(stream) = acceptor.accept(stream).await else { eprintln!("Unable to accept connection"); return; }; // todo: Pass the channel along with process if let Err(e) = process(peer_addr, stream).await { eprintln!("{:#?}", e); } }); } // todo: the "processing" loop goes here... } async fn process(peer_addr: SocketAddr, stream: TlsStream) -> Result<()> { println!("Got connection from: {:?}", peer_addr); let (mut rx, mut tx) = tokio::io::split(stream); // todo: instead of creating it here.. it's too late to accumulate them all here let (c2s_tx, mut c2s_rx) = mpsc::channel(32); // fire off a receive loop tokio::spawn(async move { let mut buffer = vec![]; 'outer: loop { let mut buf = [0u8; 512]; if let Ok(b) = rx.read(&mut buf).await { if b == 0 { break; } let len = buffer.len(); buffer.append(&mut buf.to_vec()); buffer.truncate(len + b); if b >= 512 { continue; } let lines = String::from_utf8_lossy(&buffer).split("\r\n").filter_map(|i| { if i.len() == 0 { None } else { Some(Message::from(i)) } }).collect::>(); buffer.clear(); for i in lines { if c2s_tx.send(i).await.is_err() { break 'outer; } } } } println!("{:?} closed", peer_addr); }); // fire off a send loop tokio::spawn(async move { while let Some(i) = c2s_rx.recv().await { dbg!(i); } }); // fire off a processing loop that combines both with channels Ok(()) }