// 版权所有 (c) ling 保留所有权利。 // 除非另行说明,否则仅允许在LingTransmit中使用此文件中的代码。 // // 由 ling 创建于 2025/1/18. #![allow(non_snake_case)] pub mod Client; pub mod accept; pub mod event; use crate::close_sender::CloseSender; use crate::server::accept::AcceptSocket; use crate::server::event::ServerEvent; use crate::ssl::ServerCert; use async_trait::async_trait; use log::{debug, error}; use std::collections::HashMap; use std::path::Path; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{mpsc, Arc}; use tokio::io; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::UnixListener; use tokio::net::{TcpListener, ToSocketAddrs}; use tokio::sync::{broadcast, Mutex}; pub type ClientID = u64; pub type ClientList = Arc>>>; /// 服务器抽象 pub struct Server { listener: Arc, close_sender: CloseSender, client_list: ClientList, next_id: AtomicU64, cert: Arc, event: Arc, } impl Server { fn new(listener: Arc, cert: ServerCert, event: Arc) -> Self { Server { listener, close_sender: CloseSender::new(), client_list: Arc::new(Mutex::new(HashMap::new())), next_id: AtomicU64::new(0), cert: Arc::new(cert), event, } } pub async fn new_tcp( addr: A, cert: ServerCert, event: Arc, ) -> io::Result { let listener = TcpListener::bind(addr).await?; Ok(Server::new(Arc::new(listener), cert, event)) } pub async fn new_unix

( path: P, cert: ServerCert, event: Arc, ) -> io::Result where P: AsRef, { let unix = UnixListener::bind(path)?; Ok(Server::new(Arc::new(unix), cert, event)) } /// 广播关闭消息 pub async fn close(&self) { self.close_sender.send_close().await; } /// 获取在线客户端列表 pub fn get_client_list(&self) -> ClientList { self.client_list.clone() } /// 挂断一个客户端 pub async fn close_client(&self, client: &Arc) { Self::close_client_form_arc(&self.client_list, client).await; } pub async fn close_client_form_arc(list: &ClientList, client: &Arc) { let client_id = client.id; let mut lock = list.lock().await; if let Some(client) = lock.get(&client_id) { //向使用者报告客户端关闭 client.event.client_close_listener(client.clone()).await; client.close().await; lock.remove(&client_id); } } /// 开始接收客户端连接 pub async fn start_accept(&self) { tokio::select! { _ = async { loop { if let Err(err) = self.accept_client().await { error!("接受连接失败:{}",err.to_string()); } } } => {} _ = self.close_sender.wait_close() => { debug!("停止接受客户端连接"); } } } async fn accept_client(&self) -> io::Result<()> { let (read, write, addr) = self.listener.accept().await?; let id = self.next_id.fetch_add(1, Ordering::SeqCst); let close = CloseSender::new(); let client = Arc::new(Client::Client::new( self.close_sender.clone().await, close, read, write, id, addr, self.cert.clone(), self.event.clone(), )); let mut lock = self.client_list.lock().await; lock.insert(id, client.clone()); drop(lock); //向使用者报告新的客户端连入 self.event.client_linker_listener(client.clone()).await; let list = self.get_client_list(); tokio::spawn(async move { client.clone().start().await; //当连接的事件轮退出,则自动挂断 Self::close_client_form_arc(&list, &client).await; }); Ok(()) } }