This commit is like Batman, It has no parents
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/target
|
1091
Cargo.lock
generated
Normal file
1091
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
12
Cargo.toml
Normal file
12
Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "p2000-api"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
async-stream = "0.3.6"
|
||||
axum = { version = "0.8.1", features = ["ws", "http2", "macros"] }
|
||||
futures = "0.3.31"
|
||||
futures-util = "0.3.31"
|
||||
tokio = { version = "1.44.1", features = ["full"] }
|
||||
tokio-stream = { version = "0.1.17", features = ["sync"] }
|
5
build
Executable file
5
build
Executable file
@@ -0,0 +1,5 @@
|
||||
#!/bin/bash
|
||||
|
||||
cargo b -r
|
||||
scp target/release/p2000-api sdr:~/
|
||||
ssh sdr "sudo systemctl stop p2000-api ; sudo cp ~/p2000-api /usr/local/bin/ ; sudo systemctl start p2000-api"
|
120
src/main.rs
Normal file
120
src/main.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use axum::{
|
||||
Router,
|
||||
extract::{
|
||||
State, WebSocketUpgrade,
|
||||
ws::{Message, WebSocket},
|
||||
},
|
||||
response::{
|
||||
IntoResponse, Sse,
|
||||
sse::{Event, KeepAlive},
|
||||
},
|
||||
routing::{any, get},
|
||||
};
|
||||
use futures::Stream;
|
||||
use tokio::{
|
||||
net::TcpListener,
|
||||
sync::broadcast::{Sender, channel},
|
||||
task,
|
||||
};
|
||||
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ServerState {
|
||||
// ..
|
||||
event_stream: Sender<String>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
|
||||
|
||||
let (tx, _) = channel::<String>(10);
|
||||
let server_state = Arc::new(ServerState { event_stream: tx });
|
||||
|
||||
let state = server_state.clone();
|
||||
task::spawn(async move {
|
||||
input_listener(state).await;
|
||||
});
|
||||
|
||||
let router = Router::new()
|
||||
.route("/", get(index))
|
||||
.route("/sse", get(event_stream))
|
||||
.route("/ws", any(ws_handler))
|
||||
.with_state(server_state);
|
||||
|
||||
axum::serve(listener, router).await.unwrap();
|
||||
}
|
||||
|
||||
async fn index() -> impl IntoResponse {
|
||||
"navigate to /sse for server side events, or /ws for websocket"
|
||||
}
|
||||
|
||||
async fn input_listener(state: Arc<ServerState>) {
|
||||
let tx = &state.event_stream;
|
||||
let stdin = std::io::stdin();
|
||||
let mut buffer = String::new();
|
||||
while stdin.read_line(&mut buffer).unwrap() > 0 {
|
||||
if !buffer.contains('|') {
|
||||
buffer.clear();
|
||||
continue;
|
||||
}
|
||||
let line = buffer.trim();
|
||||
println!("{}", line);
|
||||
if tx.send(line.to_string()).is_err() {
|
||||
break;
|
||||
}
|
||||
buffer.clear();
|
||||
}
|
||||
}
|
||||
|
||||
async fn ws_handler(
|
||||
ws: WebSocketUpgrade,
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> impl IntoResponse {
|
||||
ws.on_upgrade(move |socket| ws_stream(socket, state))
|
||||
}
|
||||
|
||||
async fn ws_stream(mut socket: WebSocket, state: Arc<ServerState>) {
|
||||
let mut receiver = state.event_stream.subscribe();
|
||||
loop {
|
||||
let msg = match receiver.try_recv() {
|
||||
Ok(msg) => msg,
|
||||
Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break,
|
||||
Err(_) => {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if (socket.send(Message::Text(msg.into())).await).is_err() {
|
||||
break;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn event_stream(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
|
||||
let mut receiver = state.event_stream.subscribe();
|
||||
// let mystream = BroadcastStream::new(receiver);
|
||||
|
||||
// Sse::new(mystream.map(|e| Ok(Event::default().data(e?)))).keep_alive(KeepAlive::default())
|
||||
|
||||
Sse::new(try_stream! {
|
||||
yield Event::default().data("ACK");
|
||||
loop {
|
||||
let msg = match receiver.try_recv() {
|
||||
Ok(msg) => msg,
|
||||
Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break,
|
||||
Err(_) => {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
yield Event::default().data(msg);
|
||||
}
|
||||
})
|
||||
.keep_alive(KeepAlive::default())
|
||||
}
|
Reference in New Issue
Block a user