实现Ling Transmit V1.1协议

This commit is contained in:
2025-01-18 23:35:32 +08:00
commit 72f332db47
12 changed files with 738 additions and 0 deletions

175
src/server/Client.rs Normal file
View File

@@ -0,0 +1,175 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在LingTransmit中使用此文件中的代码。
//
// 由 ling 创建于 2025/1/18.
#![allow(non_snake_case)]
use crate::close_sender::CloseSender;
use crate::packet::code::{SERVER_ACK, SERVER_ERROR};
use crate::packet::{read_packet, NetworkPackets};
use crate::server::accept::{OwnedReadHalfAbstraction, OwnedWriteHalfAbstraction, SocketAddr};
use crate::server::ClientID;
use crate::ssl::ServerCert;
use log::{error, info};
use openssl::rsa::Padding;
use std::io;
use std::string::FromUtf8Error;
use std::sync::atomic::{AtomicBool, AtomicI32};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
pub type ReadSoc = Mutex<Option<Box<dyn OwnedReadHalfAbstraction>>>;
pub type WriteSoc = Mutex<Box<dyn OwnedWriteHalfAbstraction>>;
/// 服务器侧的Client抽象
pub struct Client {
/// 服务器关闭
server_close: CloseSender,
/// 连接关闭
client_close: CloseSender,
read_soc: ReadSoc,
write_soc: WriteSoc,
pub id: ClientID,
pub addr: SocketAddr,
/// 协议版本
pub syn_version: AtomicI32,
/// 是否正在进行密钥协商
pub is_key_negotiation: AtomicBool,
/// 会话密钥
pub key: OnceLock<String>,
/// 服务器证书
cert: Arc<ServerCert>,
}
impl Client {
pub fn new(
server_close: CloseSender,
client_close: CloseSender,
read_soc: Box<dyn OwnedReadHalfAbstraction>,
write_soc: Box<dyn OwnedWriteHalfAbstraction>,
id: ClientID,
addr: SocketAddr,
cert: Arc<ServerCert>,
) -> Self {
Client {
server_close,
client_close,
read_soc: Mutex::new(Some(read_soc)),
write_soc: Mutex::new(write_soc),
id,
addr,
syn_version: AtomicI32::new(0),
is_key_negotiation: AtomicBool::new(false),
key: OnceLock::new(),
cert,
}
}
/// 挂断连接
pub(super) async fn close(&self) {
let mut lock = self.read_soc.lock().await;
*lock = None;
drop(lock);
let mut lock = self.write_soc.lock().await;
if let Err(err) = lock.shutdown().await {
error!("关闭 {} 号客户端的写入端失败:{}", self.id, err.to_string());
}
}
/// 开始处理该客户端的请求
pub async fn start(&self) {
loop {
//从字节流中读取一个数据包
let packet = tokio::select! {
packet = async {
let mut lock = self.read_soc.lock().await;
match *lock {
None => {
Err(io::Error::new(io::ErrorKind::NotFound,"读取端已经被挂断"))
}
Some(ref mut val) => {
read_packet(val,self).await
}
}
} => {
match packet {
Ok(val) => {val}
Err(err) => {
info!("{} 号连接读取数据包出错:{}",self.id,err.to_string());
return;
}
}
}
_ = self.server_close.wait_close() => {
info!("{} 号连接被挂断",self.id);
return;
}
_ = self.client_close.wait_close() => {
info!("{} 号连接被挂断",self.id);
return;
}
};
//处理客户端请求
//将数据包的读取和处理放在不同的select!中,确保数据包的处理过程不会被关闭信号打断
tokio::select! {
result = self.process_packet(packet) => {
if let Err(err) = result {
error!("{} 号连接请求处理出错:{}", self.id, err.to_string());
return;
}
}
_ = tokio::time::sleep(Duration::from_secs(15)) => {
error!("{} 号连接的请求处理超时!",self.id);
return;
}
}
}
}
/// 处理客户端请求
async fn process_packet(&self, packet: NetworkPackets) -> io::Result<()> {
match packet {
NetworkPackets::SynV1 => self.syn_v1().await,
NetworkPackets::UserAsk(_) => Ok(()),
NetworkPackets::PushAesKey(data) => self.client_push_key(&data).await,
}
}
/// 客户端推送会话密钥
async fn client_push_key(&self, buff: &Vec<u8>) -> io::Result<()> {
let rsa = self.cert.private_key.rsa()?;
let mut data = Vec::new();
data.resize(rsa.size() as usize, 0u8);
//解密会话密钥
rsa.private_decrypt(buff, &mut data, Padding::PKCS1)?;
let key = match String::from_utf8(data) {
Ok(key) => key,
Err(_) => {
return Err(io::Error::new(io::ErrorKind::NotFound, "解密会话密钥失败"));
}
};
let mut send = self.write_soc.lock().await;
if let Err(_) = self.key.set(key.clone()) {
//重复推送,拒绝密钥
send.write_i32(SERVER_ERROR).await?;
return Ok(());
}
send.write_i32(SERVER_ACK).await?;
Ok(())
}
/// 发送服务器证书
async fn syn_v1(&self) -> io::Result<()> {
let certificate = self.cert.certificate.to_pem()?;
let mut send = self.write_soc.lock().await;
send.write_i64(certificate.len() as i64).await?;
send.write(&certificate).await?;
Ok(())
}
}

116
src/server/accept.rs Normal file
View File

@@ -0,0 +1,116 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在LingTransmit中使用此文件中的代码。
//
// 由 ling 创建于 2025/1/18.
#![allow(non_snake_case)]
use async_trait::async_trait;
use tokio::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{tcp, unix, TcpListener, UnixListener};
/// 读取抽象,使用小端序
#[async_trait]
pub trait OwnedReadHalfAbstraction: AsyncRead + Unpin + Send + Sync {
async fn read_i32(&mut self) -> io::Result<i32> {
let mut buffer = [0u8; 4];
self.read_exact(&mut buffer).await?;
Ok(i32::from_le_bytes(buffer))
}
async fn read_i64(&mut self) -> io::Result<i64> {
let mut buffer = [0u8; 8];
self.read_exact(&mut buffer).await?;
Ok(i64::from_le_bytes(buffer))
}
async fn read_u32(&mut self) -> io::Result<u32> {
let mut buffer = [0u8; 4];
self.read_exact(&mut buffer).await?;
Ok(u32::from_le_bytes(buffer))
}
async fn read_u64(&mut self) -> io::Result<u64> {
let mut buffer = [0u8; 8];
self.read_exact(&mut buffer).await?;
Ok(u64::from_le_bytes(buffer))
}
}
/// 写入抽象,使用小端序
#[async_trait]
pub trait OwnedWriteHalfAbstraction: AsyncWrite + Unpin + Send + Sync {
async fn write_i32(&mut self, value: i32) -> io::Result<usize> {
self.write(&value.to_le_bytes()).await
}
async fn write_i64(&mut self, value: i64) -> io::Result<usize> {
self.write(&value.to_le_bytes()).await
}
async fn write_u32(&mut self, value: u32) -> io::Result<usize> {
self.write(&value.to_le_bytes()).await
}
async fn write_u64(&mut self, value: u64) -> io::Result<usize> {
self.write(&value.to_le_bytes()).await
}
}
#[async_trait]
impl OwnedReadHalfAbstraction for tcp::OwnedReadHalf {}
#[async_trait]
impl OwnedReadHalfAbstraction for unix::OwnedReadHalf {}
#[async_trait]
impl OwnedWriteHalfAbstraction for tcp::OwnedWriteHalf {}
#[async_trait]
impl OwnedWriteHalfAbstraction for unix::OwnedWriteHalf {}
#[async_trait]
pub trait AcceptSocket {
async fn accept(
&self,
) -> io::Result<(
Box<dyn OwnedReadHalfAbstraction>,
Box<dyn OwnedWriteHalfAbstraction>,
SocketAddr,
)>;
}
pub enum SocketAddr {
TCP(std::net::SocketAddr),
Unix(unix::SocketAddr),
}
#[async_trait]
impl AcceptSocket for TcpListener {
async fn accept(
&self,
) -> io::Result<(
Box<dyn OwnedReadHalfAbstraction>,
Box<dyn OwnedWriteHalfAbstraction>,
SocketAddr,
)> {
let (socket, addr) = self.accept().await?;
let (read, write) = socket.into_split();
Ok((Box::new(read), Box::new(write), SocketAddr::TCP(addr)))
}
}
#[async_trait]
impl AcceptSocket for UnixListener {
async fn accept(
&self,
) -> io::Result<(
Box<dyn OwnedReadHalfAbstraction>,
Box<dyn OwnedWriteHalfAbstraction>,
SocketAddr,
)> {
let (socket, addr) = self.accept().await?;
let (read, write) = socket.into_split();
Ok((Box::new(read), Box::new(write), SocketAddr::Unix(addr)))
}
}

126
src/server/mod.rs Normal file
View File

@@ -0,0 +1,126 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在LingTransmit中使用此文件中的代码。
//
// 由 ling 创建于 2025/1/18.
#![allow(non_snake_case)]
pub mod Client;
pub mod accept;
use crate::close_sender::CloseSender;
use crate::server::accept::AcceptSocket;
use crate::ssl::ServerCert;
use async_trait::async_trait;
use log::{debug, error};
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc};
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::UnixListener;
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::sync::{broadcast, Mutex};
pub type ClientID = u64;
pub type ClientList = Arc<Mutex<HashMap<ClientID, Arc<Client::Client>>>>;
/// 服务器抽象
pub struct Server {
listener: Box<dyn AcceptSocket>,
close_sender: CloseSender,
client_list: ClientList,
next_id: AtomicU64,
cert: Arc<ServerCert>,
}
impl Server {
fn new(listener: Box<dyn AcceptSocket>, cert: ServerCert) -> Self {
Server {
listener,
close_sender: CloseSender::new(),
client_list: Arc::new(Mutex::new(HashMap::new())),
next_id: AtomicU64::new(0),
cert: Arc::new(cert),
}
}
pub async fn new_tcp<A: ToSocketAddrs>(addr: A, cert: ServerCert) -> io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
Ok(Server::new(Box::new(listener), cert))
}
pub async fn new_unix<P>(path: P, cert: ServerCert) -> io::Result<Self>
where
P: AsRef<Path>,
{
let unix = UnixListener::bind(path)?;
Ok(Server::new(Box::new(unix), cert))
}
/// 广播关闭消息
pub async fn close(&self) {
self.close_sender.send_close().await;
}
/// 获取在线客户端列表
pub fn get_client_list(&self) -> ClientList {
self.client_list.clone()
}
/// 挂断一个客户端
pub async fn close_client(&self, id: ClientID) {
Self::close_client_form_arc(&self.client_list, id).await;
}
pub async fn close_client_form_arc(list: &ClientList, id: ClientID) {
let mut lock = list.lock().await;
if let Some(client) = lock.get(&id) {
client.close().await;
lock.remove(&id);
}
}
/// 开始接收客户端连接
pub async fn start_accept(&self) {
tokio::select! {
_ = async {
loop {
if let Err(err) = self.accept_client().await {
error!("接受连接失败:{}",err.to_string());
}
}
} => {}
_ = self.close_sender.wait_close() => {
debug!("停止接受客户端连接");
}
}
}
async fn accept_client(&self) -> io::Result<()> {
let (read, write, addr) = self.listener.accept().await?;
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let close = CloseSender::new();
let client = Arc::new(Client::Client::new(
self.close_sender.clone().await,
close,
read,
write,
id,
addr,
self.cert.clone(),
));
let mut lock = self.client_list.lock().await;
lock.insert(id, client.clone());
drop(lock);
let list = self.get_client_list();
tokio::spawn(async move {
client.start().await;
//当连接的事件轮退出,则自动挂断
Self::close_client_form_arc(&list, id).await;
});
Ok(())
}
}