things and stuff

This commit is contained in:
2025-03-04 21:40:00 +01:00
parent a44afb2e55
commit 0bfd9962f0
3 changed files with 299 additions and 53 deletions

View File

@@ -1,21 +1,115 @@
use std::{io::Cursor, net::SocketAddr, sync::Arc};
use std::collections::HashMap;
use std::{io::Cursor, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
use irc_rust::Message;
use rcgen::{generate_simple_self_signed, CertifiedKey};
use rustls::pki_types::{pem::PemObject, CertificateDer, PrivateKeyDer};
use tokio::{net::{TcpListener, TcpStream}, sync::mpsc};
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use tokio::io::AsyncReadExt;
use rcgen::{CertifiedKey, generate_simple_self_signed};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::RwLock;
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc::{self, Receiver, Sender},
};
use tokio_rustls::{TlsAcceptor, server::TlsStream};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
#[derive(Debug)]
struct Client {
peer_addr: SocketAddr,
rx: Receiver<Message>,
tx: Sender<Message>,
nick: Option<String>,
c2s_tx: Option<Sender<Message>>,
s2c_rx: Option<Receiver<Message>>,
}
impl Client {
pub fn new(peer_addr: SocketAddr) -> Self {
let (c2s_tx, c2s_rx) = mpsc::channel(32);
let (s2c_tx, s2c_rx) = mpsc::channel(32);
Self {
peer_addr,
tx: s2c_tx,
rx: c2s_rx,
nick: None,
c2s_tx: Some(c2s_tx),
s2c_rx: Some(s2c_rx),
}
}
pub fn take_channels(&mut self) -> Result<(Sender<Message>, Receiver<Message>)> {
Ok((
self.c2s_tx.take().expect("Already taken"),
self.s2c_rx.take().expect("Already taken"),
))
}
}
#[derive(Default, Debug)]
struct AppState {
clients: RwLock<HashMap<SocketAddr, Client>>,
}
impl AppState {
pub async fn add_client(&self, peer_addr: SocketAddr) -> (Sender<Message>, Receiver<Message>) {
let mut client = Client::new(peer_addr);
let Ok((c2s_tx, s2c_rx)) = client.take_channels() else {
eprintln!("Unable to take channels for {}", peer_addr);
unreachable!();
};
self.clients.write().await.insert(peer_addr, client);
(c2s_tx, s2c_rx)
}
// pub sync fn set_nick(&self, peer_addr: SocketAddr, nick: &str) {
// // ...
// }
// clients still needs mod/op status stuff
// join channel
// leave channel
// create channel
// pub async fn get(&self, peer_addr: SocketAddr) -> Option<&Client> {
// self.clients.read().await.get(&peer_addr)
// }
// pub async fn get_mut(&self, peer_addr: SocketAddr) -> Option<&mut Client> {
// self.clients.write().await.get_mut(&peer_addr)
// }
pub async fn get_messages(&self) -> Vec<(SocketAddr, Message)> {
let mut messages = vec![];
let mut clients = self.clients.write().await;
for (peer_addr, client) in clients.iter_mut() {
if let Ok(msg) = client.rx.try_recv() {
messages.push((*peer_addr, msg));
}
}
messages
}
pub async fn send(&self, peer_addr: SocketAddr, msg: Message) -> Result<()> {
let clients = self.clients.read().await;
let Some(client) = clients.get(&peer_addr) else {
return Err("Client not found".into());
};
client.tx.send(msg).await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec![
"127.0.0.1".to_string(),
"localhost".to_string()
])?;
let CertifiedKey { cert, key_pair } =
generate_simple_self_signed(vec!["127.0.0.1".to_string(), "localhost".to_string()])?;
let cert = cert.pem();
let key = key_pair.serialize_pem();
@@ -27,65 +121,111 @@ async fn main() -> Result<()> {
.with_no_client_auth()
.with_single_cert(certs, key)?;
let acceptor = TlsAcceptor::from(Arc::new(config));
let app_state = Arc::new(AppState::default());
let acceptor = TlsAcceptor::from(Arc::new(config));
let listener = TcpListener::bind("0.0.0.0:6697").await?;
// todo: need a connection-pool here
// HashMap<Socket, MessageQueue> or something
// which means, this goes into yet another thread
loop {
println!("Awaiting connection...");
let acceptor = acceptor.clone();
let (stream, peer_addr) = listener.accept().await?;
// todo: Create the channel here, store corrosponding tx/rx in the connection pool defined above
tokio::spawn(async move {
let Ok(stream) = acceptor.accept(stream).await else {
eprintln!("Unable to accept connection");
return;
let state: Arc<AppState> = app_state.clone();
tokio::spawn(async move {
loop {
let acceptor = acceptor.clone();
let Ok((stream, peer_addr)) = listener.accept().await else {
eprintln!("Error accepting connection...");
continue;
};
// todo: Pass the channel along with process
if let Err(e) = process(peer_addr, stream).await {
eprintln!("{:#?}", e);
}
});
}
let (c2s_tx, s2c_rx) = state.add_client(peer_addr).await;
tokio::spawn(async move {
let Ok(stream) = acceptor.accept(stream).await else {
eprintln!("Unable to accept connection");
return;
};
// todo: Pass the channel along with process
if let Err(e) = connect(peer_addr, stream, c2s_tx, s2c_rx).await {
eprintln!("{:#?}", e);
}
});
}
});
// todo: the "processing" loop goes here...
let state: Arc<AppState> = app_state.clone();
tokio::spawn(async move {
loop {
let messages = state.get_messages().await;
// println!("Got {} messages", messages.len());
for (peer_addr, msg) in messages {
println!("{}: {}", peer_addr, msg);
if let Err(e) = handle(msg, peer_addr, state.clone()).await {
eprintln!("Error handling command: {:?}", e);
}
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
tokio::signal::ctrl_c().await?;
Ok(())
}
async fn process(peer_addr: SocketAddr, stream: TlsStream<TcpStream>) -> Result<()> {
async fn handle(
msg: Message,
peer_addr: SocketAddr,
state: Arc<AppState>,
) -> Result<Option<Message>> {
match msg.command()? {
"CAP" => {
let _ = state.send(peer_addr, Message::from_str("NONE")?).await;
}
_ => {
return Err(format!("Unknown Command: {:?}", msg.command()?).into());
}
};
Ok(None)
}
async fn connect(
peer_addr: SocketAddr,
stream: TlsStream<TcpStream>,
c2s_tx: Sender<Message>,
mut s2c_rx: Receiver<Message>,
) -> Result<()> {
println!("Got connection from: {:?}", peer_addr);
let (mut rx, mut tx) = tokio::io::split(stream);
// todo: instead of creating it here.. it's too late to accumulate them all here
let (c2s_tx, mut c2s_rx) = mpsc::channel(32);
// fire off a receive loop
tokio::spawn(async move {
let mut buffer = vec![];
'outer: loop {
let mut buf = [0u8; 512];
if let Ok(b) = rx.read(&mut buf).await {
if b == 0 { break; }
if b == 0 {
break;
}
let len = buffer.len();
buffer.append(&mut buf.to_vec());
buffer.truncate(len + b);
if b >= 512 { continue; }
if b >= 512 {
continue;
}
let lines = String::from_utf8_lossy(&buffer).split("\r\n").filter_map(|i| {
if i.len() == 0 {
None
} else {
Some(Message::from(i))
}
}).collect::<Vec<Message>>();
let lines = String::from_utf8_lossy(&buffer)
.split("\r\n")
.filter_map(|i| {
if i.is_empty() {
None
} else {
Some(Message::from(i))
}
})
.collect::<Vec<Message>>();
buffer.clear();
for i in lines {
@@ -100,13 +240,13 @@ async fn process(peer_addr: SocketAddr, stream: TlsStream<TcpStream>) -> Result<
// fire off a send loop
tokio::spawn(async move {
while let Some(i) = c2s_rx.recv().await {
dbg!(i);
while let Some(i) = s2c_rx.recv().await {
if let Err(e) = tx.write(i.to_string().as_bytes()).await {
eprintln!("Error sending message: {:?}", e);
break;
}
}
});
// fire off a processing loop that combines both with channels
Ok(())
}