添加Transmit传输过程

This commit is contained in:
2024-04-18 19:50:32 +08:00
parent 4f2ca772a0
commit eb471bd8dd
9 changed files with 351 additions and 3 deletions

View File

@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.16)
project(exec)
project(Transmission)
add_subdirectory(src)

View File

@@ -1,7 +1,23 @@
cmake_minimum_required(VERSION 3.16)
project(Transmission)
find_package(Protobuf REQUIRED)
set(CMAKE_CXX_STANDARD 20)
FILE(GLOB protofiles "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/*.proto")
message(STATUS ${protofiles})
PROTOBUF_GENERATE_CPP(PROTOSRCS PROTOHDRS ${protofiles})
add_library(proto STATIC ${PROTOSRCS} ${PROTOHDRS})
target_include_directories(proto PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
add_library(Transmission
Client.cpp)
Client.cpp
Transmit.cpp
CharBuff.cpp
"Exception/PrepareDataException.h"
)
target_link_libraries(Transmission protobuf proto)

78
src/CharBuff.cpp Normal file
View File

@@ -0,0 +1,78 @@
#include <stdexcept>
#include <cstring>
#include <iostream>
#include "CharBuff.h"
namespace Transmission {
CharBuff::CharBuff(int64_t size) {
if (size <= 0)
throw std::runtime_error("缓冲区大小不能为0或者负数");
ptr = new unsigned char[size];
ptrSize = size;
}
CharBuff::~CharBuff() {
delete[] ptr;
}
void CharBuff::put(const void *buff, int64_t start, int64_t size) {
if ((this->ptrSize - this->flag) < size) {
//剩余容量不足以容纳新数据
if (this->ptrSize - (this->flag - this->startFlag) >= size) {
//丢弃缓冲区内已经被处理的数据后,可以继续添加数据。
//避免频繁的new delete造成不必要的性能消耗
memmove(this->ptr, this->ptr + this->startFlag, this->flag - this->startFlag);
this->flag -= this->startFlag;
this->startFlag = 0;
} else {
//缓冲区剩余容量无法继续添加数据,需要调整容量
//调整容量时,顺带丢弃已经被读取的内存数据
//调整后缓冲区大小
int64_t addSize = (this->ptrSize - this->startFlag) + (size * 2);
//不能缩小到低于256字节防止频繁调整容量
if (addSize < 256)
addSize = 256;
if (addSize > 1024 * 1024 * 8)
//出于安全考虑缓冲区大小最大不能超过8mb
throw std::bad_alloc();
auto *tmp = new unsigned char[addSize];
this->ptrSize = addSize;
memcpy(tmp, this->ptr + this->startFlag, this->flag - this->startFlag);
delete[] this->ptr;
this->flag -= this->startFlag;
this->startFlag = 0;
this->ptr = tmp;
}
}
memcpy(this->ptr + this->flag, (char *) (buff) + start, size);
this->flag += size;
}
unsigned char CharBuff::get() {
return *(ptr + startFlag++);
}
int64_t CharBuff::getSize() const {
return this->flag - this->startFlag;
}
int64_t CharBuff::getBuffSize() const {
return this->ptrSize;
}
CharBuff::CharBuff(const unsigned char *ptr, int64_t start, int64_t size) {
this->ptr = new unsigned char[size];
memcpy(this->ptr, ptr + start, size);
this->flag = size;
this->ptrSize = size;
}
void CharBuff::get(unsigned char *buff, int64_t size) {
if (this->startFlag + size > this->flag) {
size = this->flag - this->startFlag;
}
memcpy(buff, this->ptr + startFlag, size);
this->startFlag += size;
}
} // ling

56
src/CharBuff.h Normal file
View File

@@ -0,0 +1,56 @@
#ifndef MEMORYPLUGKERNEL_CHARBUFF_H
#define MEMORYPLUGKERNEL_CHARBUFF_H
#include <mutex>
namespace Transmission {
/**
* 一个轻量级的字节缓冲区,线程不安全
*/
class CharBuff {
private:
int64_t ptrSize;
unsigned char *ptr;
//指针位置
int64_t flag = 0;
int64_t startFlag = 0;
public:
/**
* 创建缓冲区默认分配64字节空间
* @param size
*/
explicit CharBuff(int64_t size = 64);
CharBuff(const unsigned char *ptr, int64_t start, int64_t size);
virtual ~CharBuff();
/**
* 添加数据到缓冲区末尾
* @param buff 数据
* @param start 开始位置
* @param size 长度
*/
void put(const void *buff, int64_t start, int64_t size);
unsigned char get();
void get(unsigned char *buff, int64_t size);
/**
* 获取数据大小
* @return
*/
[[nodiscard]] int64_t getSize() const;
/**
* 获取缓冲区大小
* @return
*/
[[nodiscard]] int64_t getBuffSize() const;
};
} // ling
#endif //MEMORYPLUGKERNEL_CHARBUFF_H

View File

@@ -8,7 +8,7 @@
#define TRANSMISSION_CLIENT_H
namespace Transmission {
/// 客户端类
class Client {
};

View File

@@ -0,0 +1,24 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#ifndef TRANSMISSION_PREPAREDATAEXCEPTION_H
#define TRANSMISSION_PREPAREDATAEXCEPTION_H
#include <stdexcept>
namespace Transmission {
/// 准备数据时发生异常
class PrepareDataException : public std::runtime_error {
public:
explicit PrepareDataException(const std::string &e) : std::runtime_error(e) {
}
explicit PrepareDataException(const char *e) : std::runtime_error(e) {
}
};
}
#endif //TRANSMISSION_PREPAREDATAEXCEPTION_H

112
src/Transmit.cpp Normal file
View File

@@ -0,0 +1,112 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#include "Transmit.h"
#include <lzma.h>
#include "Exception/PrepareDataException.h"
#include <utility>
#include <transmission.pb.h>
#define DATA_STOP 0x20030507
namespace Transmission {
Transmit::Transmit(SOCKET fd, std::string ip) : ip(std::move(ip)), fd(fd) {
}
void Transmit::dataArrives(unsigned char *data, size_t size) {
std::unique_lock<std::mutex> lock(dataMutex);
if (this->packSize < 0) {
if (charBuff.getSize() >= 4) {
//读取下一个数据包的长度
this->packSize =
(charBuff.get() << 0) + (charBuff.get() << 8) + (charBuff.get() << 16) + (charBuff.get() << 24);
} else {
return;
}
}
//一个格式正确的数据包应该由如下部分构成:
// 头部:包体大小(int32_t)
// 包体:数据包正文段,长度不定
// 尾部MagicNumber(int32_t)
if (charBuff.getSize() < this->packSize + sizeof(int32_t)) {
//数据不够,等待填充数据
return;
}
int32_t magicNumber = (charBuff.get() << 0) + (charBuff.get() << 8) + (charBuff.get() << 16) + (charBuff.get() << 24);
if (magicNumber != DATA_STOP)
throw PrepareDataException("魔数错误");
auto buff = std::shared_ptr<unsigned char>(new unsigned char[this->packSize], [](const unsigned char *p) {
delete[] p;
});
charBuff.get(buff.get(), this->packSize);
DataPackets dataPackets;
if (!dataPackets.ParseFromArray(buff.get(), this->packSize))
throw PrepareDataException("反序列化出错");
switch (dataPackets.type()) {
//无压缩
case CompressAlgorithm::NOT: {
auto pack = copyMem(dataPackets.data());
packetReady(dataPackets.type(), pack, dataPackets.data().size());
break;
}
//lzma压缩
case CompressAlgorithm::LZMA: {
size_t un_size;
auto pack = unLzma(dataPackets.data(), un_size);
packetReady(dataPackets.type(), pack, un_size);
break;
}
default:
throw PrepareDataException("未知的压缩算法");
}
}
std::shared_ptr<unsigned char> Transmit::copyMem(const std::string &str) {
auto buff = std::shared_ptr<unsigned char>(new unsigned char[str.size()], [](const unsigned char *p) {
delete[] p;
});
auto mem = reinterpret_cast<const unsigned char *>(str.data());
memcpy(buff.get(), mem, str.size());
return buff;
}
std::shared_ptr<unsigned char> Transmit::unLzma(const std::string &str, size_t &size) {
lzma_stream stream = LZMA_STREAM_INIT;
lzma_ret ret = lzma_easy_encoder(&stream, LZMA_PRESET_DEFAULT, LZMA_CHECK_CRC64);
if (ret != LZMA_OK)
throw PrepareDataException("LZMA初始化失败");
// 解压缩数据
stream.next_in = reinterpret_cast<const uint8_t *>(str.data());
stream.avail_in = str.size();
CharBuff buff((int64_t) lzma_stream_buffer_bound(str.size()));
size_t lastSize = 0;
do {
size_t bufferSize = lzma_stream_buffer_bound(str.size());
auto ptr = std::shared_ptr<unsigned char>(new unsigned char[bufferSize], [](const unsigned char *p) {
delete[] p;
});
stream.next_out = reinterpret_cast<uint8_t *>(ptr.get());
stream.avail_out = bufferSize;
ret = lzma_code(&stream, LZMA_FINISH);
lastSize = stream.total_out - lastSize;
buff.put(ptr.get(), 0, (int64_t) lastSize);
} while (ret == LZMA_BUF_ERROR);
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
throw PrepareDataException("LZMA解压缩失败");
auto ptr = std::shared_ptr<unsigned char>(new unsigned char[buff.getSize()], [](const unsigned char *p) {
delete[] p;
});
size = buff.getSize();
buff.get(ptr.get(), buff.getSize());
return ptr;
}
} // Transmission

47
src/Transmit.h Normal file
View File

@@ -0,0 +1,47 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#ifndef TRANSMISSION_TRANSMIT_H
#define TRANSMISSION_TRANSMIT_H
#include <string>
#include <pcap/socket.h>
#include <memory>
#include "CharBuff.h"
namespace Transmission {
/// 传输层实现
/// 传输过程使用lzma算法压缩
class Transmit {
private:
const std::string ip;
const SOCKET fd;
CharBuff charBuff;
int32_t packSize = -1;
std::mutex dataMutex;
protected:
virtual std::shared_ptr<unsigned char> copyMem(const std::string &str);
/// 解压lzma数据包
virtual std::shared_ptr<unsigned char> unLzma(const std::string &str, size_t &size);
public:
explicit Transmit(SOCKET fd, std::string ip);
/// 收到来自网络的数据
virtual void dataArrives(unsigned char *data, size_t size);
/// 数据包就绪
/// @param type 数据包类型
/// @param data 解压缩后的数据包
virtual void packetReady(int type, std::shared_ptr<unsigned char> data, size_t size) = 0;
};
} // Transmission
#endif //TRANSMISSION_TRANSMIT_H

View File

@@ -0,0 +1,15 @@
package Transmission;
enum CompressAlgorithm {
NOT = 1;
LZMA = 2;
}
//基础数据包
message DataPackets {
//压缩算法
required int64 algorithm = 1;
//数据包类型
required int32 type = 2;
required bytes data = 3;
}