完善服务端功能
This commit is contained in:
@@ -8,6 +8,7 @@ use crate::close_sender::CloseSender;
|
||||
use crate::packet::code::{SERVER_ACK, SERVER_ERROR};
|
||||
use crate::packet::{read_packet, NetworkPackets};
|
||||
use crate::server::accept::{OwnedReadHalfAbstraction, OwnedWriteHalfAbstraction, SocketAddr};
|
||||
use crate::server::event::ServerEvent;
|
||||
use crate::server::ClientID;
|
||||
use crate::ssl::ServerCert;
|
||||
use log::{error, info};
|
||||
@@ -41,6 +42,7 @@ pub struct Client {
|
||||
pub key: OnceLock<String>,
|
||||
/// 服务器证书
|
||||
cert: Arc<ServerCert>,
|
||||
pub(super) event: Arc<dyn ServerEvent>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
@@ -52,6 +54,7 @@ impl Client {
|
||||
id: ClientID,
|
||||
addr: SocketAddr,
|
||||
cert: Arc<ServerCert>,
|
||||
event: Arc<dyn ServerEvent>,
|
||||
) -> Self {
|
||||
Client {
|
||||
server_close,
|
||||
@@ -64,6 +67,7 @@ impl Client {
|
||||
is_key_negotiation: AtomicBool::new(false),
|
||||
key: OnceLock::new(),
|
||||
cert,
|
||||
event,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +83,7 @@ impl Client {
|
||||
}
|
||||
|
||||
/// 开始处理该客户端的请求
|
||||
pub async fn start(&self) {
|
||||
pub async fn start(self: Arc<Client>) {
|
||||
loop {
|
||||
//从字节流中读取一个数据包
|
||||
let packet = tokio::select! {
|
||||
@@ -90,14 +94,17 @@ impl Client {
|
||||
Err(io::Error::new(io::ErrorKind::NotFound,"读取端已经被挂断"))
|
||||
}
|
||||
Some(ref mut val) => {
|
||||
read_packet(val,self).await
|
||||
read_packet(val,self.clone()).await
|
||||
}
|
||||
}
|
||||
} => {
|
||||
match packet {
|
||||
Ok(val) => {val}
|
||||
Err(err) => {
|
||||
info!("{} 号连接读取数据包出错:{}",self.id,err.to_string());
|
||||
// 如果仅仅是读取到文件尾,说明客户端只是挂断了,不是什么大问题
|
||||
if err.kind() != io::ErrorKind::UnexpectedEof {
|
||||
error!("{} 号连接读取数据包出错:{}",self.id,err.to_string());
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -130,10 +137,10 @@ impl Client {
|
||||
}
|
||||
|
||||
/// 处理客户端请求
|
||||
async fn process_packet(&self, packet: NetworkPackets) -> io::Result<()> {
|
||||
async fn process_packet(self: &Arc<Client>, packet: NetworkPackets) -> io::Result<()> {
|
||||
match packet {
|
||||
NetworkPackets::SynV1 => self.syn_v1().await,
|
||||
NetworkPackets::UserAsk(_) => Ok(()),
|
||||
NetworkPackets::UserAsk(data) => self.event.client_user_data(self.clone(), data).await,
|
||||
NetworkPackets::PushAesKey(data) => self.client_push_key(&data).await,
|
||||
}
|
||||
}
|
||||
@@ -157,10 +164,10 @@ impl Client {
|
||||
let mut send = self.write_soc.lock().await;
|
||||
if let Err(_) = self.key.set(key.clone()) {
|
||||
//重复推送,拒绝密钥
|
||||
send.write_i32(SERVER_ERROR).await?;
|
||||
send.write_i32_le(SERVER_ERROR).await?;
|
||||
return Ok(());
|
||||
}
|
||||
send.write_i32(SERVER_ACK).await?;
|
||||
send.write_i32_le(SERVER_ACK).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -168,7 +175,7 @@ impl Client {
|
||||
async fn syn_v1(&self) -> io::Result<()> {
|
||||
let certificate = self.cert.certificate.to_pem()?;
|
||||
let mut send = self.write_soc.lock().await;
|
||||
send.write_i64(certificate.len() as i64).await?;
|
||||
send.write_i64_le(certificate.len() as i64).await?;
|
||||
send.write(&certificate).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -9,53 +9,13 @@ use tokio::io;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use tokio::net::{tcp, unix, TcpListener, UnixListener};
|
||||
|
||||
/// 读取抽象,使用小端序
|
||||
/// 读取抽象
|
||||
#[async_trait]
|
||||
pub trait OwnedReadHalfAbstraction: AsyncRead + Unpin + Send + Sync {
|
||||
async fn read_i32(&mut self) -> io::Result<i32> {
|
||||
let mut buffer = [0u8; 4];
|
||||
self.read_exact(&mut buffer).await?;
|
||||
Ok(i32::from_le_bytes(buffer))
|
||||
}
|
||||
pub trait OwnedReadHalfAbstraction: AsyncRead + Unpin + Send + Sync {}
|
||||
|
||||
async fn read_i64(&mut self) -> io::Result<i64> {
|
||||
let mut buffer = [0u8; 8];
|
||||
self.read_exact(&mut buffer).await?;
|
||||
Ok(i64::from_le_bytes(buffer))
|
||||
}
|
||||
|
||||
async fn read_u32(&mut self) -> io::Result<u32> {
|
||||
let mut buffer = [0u8; 4];
|
||||
self.read_exact(&mut buffer).await?;
|
||||
Ok(u32::from_le_bytes(buffer))
|
||||
}
|
||||
|
||||
async fn read_u64(&mut self) -> io::Result<u64> {
|
||||
let mut buffer = [0u8; 8];
|
||||
self.read_exact(&mut buffer).await?;
|
||||
Ok(u64::from_le_bytes(buffer))
|
||||
}
|
||||
}
|
||||
|
||||
/// 写入抽象,使用小端序
|
||||
/// 写入抽象
|
||||
#[async_trait]
|
||||
pub trait OwnedWriteHalfAbstraction: AsyncWrite + Unpin + Send + Sync {
|
||||
async fn write_i32(&mut self, value: i32) -> io::Result<usize> {
|
||||
self.write(&value.to_le_bytes()).await
|
||||
}
|
||||
|
||||
async fn write_i64(&mut self, value: i64) -> io::Result<usize> {
|
||||
self.write(&value.to_le_bytes()).await
|
||||
}
|
||||
|
||||
async fn write_u32(&mut self, value: u32) -> io::Result<usize> {
|
||||
self.write(&value.to_le_bytes()).await
|
||||
}
|
||||
|
||||
async fn write_u64(&mut self, value: u64) -> io::Result<usize> {
|
||||
self.write(&value.to_le_bytes()).await
|
||||
}
|
||||
}
|
||||
pub trait OwnedWriteHalfAbstraction: AsyncWrite + Unpin + Send + Sync {}
|
||||
|
||||
#[async_trait]
|
||||
impl OwnedReadHalfAbstraction for tcp::OwnedReadHalf {}
|
||||
|
||||
28
src/server/event.rs
Normal file
28
src/server/event.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
// 版权所有 (c) ling 保留所有权利。
|
||||
// 除非另行说明,否则仅允许在LingTransmit中使用此文件中的代码。
|
||||
//
|
||||
// 由 ling 创建于 2025/1/19.
|
||||
#![allow(non_snake_case)]
|
||||
|
||||
use crate::server::Client;
|
||||
use async_trait::async_trait;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::{RecvError, SendError};
|
||||
|
||||
#[async_trait]
|
||||
pub trait ServerEvent: Send + Sync {
|
||||
/// 客户端连入事件
|
||||
async fn client_linker_listener(&self, client: Arc<Client::Client>);
|
||||
|
||||
/// 客户端挂断事件
|
||||
async fn client_close_listener(&self, client: Arc<Client::Client>);
|
||||
|
||||
/// 客户端请求
|
||||
async fn client_user_data(
|
||||
&self,
|
||||
client: Arc<Client::Client>,
|
||||
packet: Vec<u8>,
|
||||
) -> io::Result<()>;
|
||||
}
|
||||
@@ -6,9 +6,11 @@
|
||||
|
||||
pub mod Client;
|
||||
pub mod accept;
|
||||
pub mod event;
|
||||
|
||||
use crate::close_sender::CloseSender;
|
||||
use crate::server::accept::AcceptSocket;
|
||||
use crate::server::event::ServerEvent;
|
||||
use crate::ssl::ServerCert;
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error};
|
||||
@@ -32,30 +34,40 @@ pub struct Server {
|
||||
client_list: ClientList,
|
||||
next_id: AtomicU64,
|
||||
cert: Arc<ServerCert>,
|
||||
event: Arc<dyn ServerEvent>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
fn new(listener: Box<dyn AcceptSocket>, cert: ServerCert) -> Self {
|
||||
fn new(listener: Box<dyn AcceptSocket>, cert: ServerCert, event: Arc<dyn ServerEvent>) -> Self {
|
||||
Server {
|
||||
listener,
|
||||
close_sender: CloseSender::new(),
|
||||
client_list: Arc::new(Mutex::new(HashMap::new())),
|
||||
next_id: AtomicU64::new(0),
|
||||
cert: Arc::new(cert),
|
||||
event,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn new_tcp<A: ToSocketAddrs>(addr: A, cert: ServerCert) -> io::Result<Self> {
|
||||
pub async fn new_tcp<A: ToSocketAddrs>(
|
||||
addr: A,
|
||||
cert: ServerCert,
|
||||
event: Arc<dyn ServerEvent>,
|
||||
) -> io::Result<Self> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
Ok(Server::new(Box::new(listener), cert))
|
||||
Ok(Server::new(Box::new(listener), cert, event))
|
||||
}
|
||||
|
||||
pub async fn new_unix<P>(path: P, cert: ServerCert) -> io::Result<Self>
|
||||
pub async fn new_unix<P>(
|
||||
path: P,
|
||||
cert: ServerCert,
|
||||
event: Arc<dyn ServerEvent>,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let unix = UnixListener::bind(path)?;
|
||||
Ok(Server::new(Box::new(unix), cert))
|
||||
Ok(Server::new(Box::new(unix), cert, event))
|
||||
}
|
||||
|
||||
/// 广播关闭消息
|
||||
@@ -69,15 +81,18 @@ impl Server {
|
||||
}
|
||||
|
||||
/// 挂断一个客户端
|
||||
pub async fn close_client(&self, id: ClientID) {
|
||||
Self::close_client_form_arc(&self.client_list, id).await;
|
||||
pub async fn close_client(&self, client: &Arc<Client::Client>) {
|
||||
Self::close_client_form_arc(&self.client_list, client).await;
|
||||
}
|
||||
|
||||
pub async fn close_client_form_arc(list: &ClientList, id: ClientID) {
|
||||
pub async fn close_client_form_arc(list: &ClientList, client: &Arc<Client::Client>) {
|
||||
let client_id = client.id;
|
||||
let mut lock = list.lock().await;
|
||||
if let Some(client) = lock.get(&id) {
|
||||
if let Some(client) = lock.get(&client_id) {
|
||||
//向使用者报告客户端关闭
|
||||
client.event.client_close_listener(client.clone()).await;
|
||||
client.close().await;
|
||||
lock.remove(&id);
|
||||
lock.remove(&client_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,17 +124,21 @@ impl Server {
|
||||
id,
|
||||
addr,
|
||||
self.cert.clone(),
|
||||
self.event.clone(),
|
||||
));
|
||||
|
||||
let mut lock = self.client_list.lock().await;
|
||||
lock.insert(id, client.clone());
|
||||
drop(lock);
|
||||
|
||||
//向使用者报告新的客户端连入
|
||||
self.event.client_linker_listener(client.clone()).await;
|
||||
|
||||
let list = self.get_client_list();
|
||||
tokio::spawn(async move {
|
||||
client.start().await;
|
||||
client.clone().start().await;
|
||||
//当连接的事件轮退出,则自动挂断
|
||||
Self::close_client_form_arc(&list, id).await;
|
||||
Self::close_client_form_arc(&list, &client).await;
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user