diff --git a/CMakeLists.txt b/CMakeLists.txt index 18017e5..e21ada0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.16) -project(exec) +project(Transmission) add_subdirectory(src) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e123af3..9b18955 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,23 @@ cmake_minimum_required(VERSION 3.16) project(Transmission) +find_package(Protobuf REQUIRED) + set(CMAKE_CXX_STANDARD 20) + +FILE(GLOB protofiles "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/*.proto") +message(STATUS ${protofiles}) +PROTOBUF_GENERATE_CPP(PROTOSRCS PROTOHDRS ${protofiles}) + +add_library(proto STATIC ${PROTOSRCS} ${PROTOHDRS}) +target_include_directories(proto PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) + add_library(Transmission - Client.cpp) \ No newline at end of file + Client.cpp + Transmit.cpp + CharBuff.cpp + "Exception/PrepareDataException.h" +) + +target_link_libraries(Transmission protobuf proto) \ No newline at end of file diff --git a/src/CharBuff.cpp b/src/CharBuff.cpp new file mode 100644 index 0000000..e80a873 --- /dev/null +++ b/src/CharBuff.cpp @@ -0,0 +1,78 @@ + +#include +#include +#include +#include "CharBuff.h" + +namespace Transmission { + CharBuff::CharBuff(int64_t size) { + if (size <= 0) + throw std::runtime_error("缓冲区大小不能为0或者负数"); + ptr = new unsigned char[size]; + ptrSize = size; + } + + CharBuff::~CharBuff() { + delete[] ptr; + } + + void CharBuff::put(const void *buff, int64_t start, int64_t size) { + if ((this->ptrSize - this->flag) < size) { + //剩余容量不足以容纳新数据 + if (this->ptrSize - (this->flag - this->startFlag) >= size) { + //丢弃缓冲区内已经被处理的数据后,可以继续添加数据。 + //避免频繁的new delete,造成不必要的性能消耗 + memmove(this->ptr, this->ptr + this->startFlag, this->flag - this->startFlag); + this->flag -= this->startFlag; + this->startFlag = 0; + } else { + //缓冲区剩余容量无法继续添加数据,需要调整容量 + //调整容量时,顺带丢弃已经被读取的内存数据 + //调整后缓冲区大小 + int64_t addSize = (this->ptrSize - this->startFlag) + (size * 2); + //不能缩小到低于256字节,防止频繁调整容量 + if (addSize < 256) + addSize = 256; + if (addSize > 1024 * 1024 * 8) + //出于安全考虑,缓冲区大小最大不能超过8mb + throw std::bad_alloc(); + auto *tmp = new unsigned char[addSize]; + this->ptrSize = addSize; + memcpy(tmp, this->ptr + this->startFlag, this->flag - this->startFlag); + delete[] this->ptr; + this->flag -= this->startFlag; + this->startFlag = 0; + this->ptr = tmp; + } + } + memcpy(this->ptr + this->flag, (char *) (buff) + start, size); + this->flag += size; + } + + unsigned char CharBuff::get() { + return *(ptr + startFlag++); + } + + int64_t CharBuff::getSize() const { + return this->flag - this->startFlag; + } + + int64_t CharBuff::getBuffSize() const { + return this->ptrSize; + } + + CharBuff::CharBuff(const unsigned char *ptr, int64_t start, int64_t size) { + this->ptr = new unsigned char[size]; + memcpy(this->ptr, ptr + start, size); + this->flag = size; + this->ptrSize = size; + } + + void CharBuff::get(unsigned char *buff, int64_t size) { + if (this->startFlag + size > this->flag) { + size = this->flag - this->startFlag; + } + memcpy(buff, this->ptr + startFlag, size); + this->startFlag += size; + } +} // ling \ No newline at end of file diff --git a/src/CharBuff.h b/src/CharBuff.h new file mode 100644 index 0000000..3829398 --- /dev/null +++ b/src/CharBuff.h @@ -0,0 +1,56 @@ + +#ifndef MEMORYPLUGKERNEL_CHARBUFF_H +#define MEMORYPLUGKERNEL_CHARBUFF_H + +#include + +namespace Transmission { + /** + * 一个轻量级的字节缓冲区,线程不安全 + */ + class CharBuff { + private: + int64_t ptrSize; + unsigned char *ptr; + //指针位置 + int64_t flag = 0; + int64_t startFlag = 0; + public: + /** + * 创建缓冲区,默认分配64字节空间 + * @param size + */ + explicit CharBuff(int64_t size = 64); + + CharBuff(const unsigned char *ptr, int64_t start, int64_t size); + + virtual ~CharBuff(); + + /** + * 添加数据到缓冲区末尾 + * @param buff 数据 + * @param start 开始位置 + * @param size 长度 + */ + void put(const void *buff, int64_t start, int64_t size); + + unsigned char get(); + + void get(unsigned char *buff, int64_t size); + + /** + * 获取数据大小 + * @return + */ + [[nodiscard]] int64_t getSize() const; + + /** + * 获取缓冲区大小 + * @return + */ + [[nodiscard]] int64_t getBuffSize() const; + }; + +} // ling + +#endif //MEMORYPLUGKERNEL_CHARBUFF_H diff --git a/src/Client.h b/src/Client.h index c5d059b..e012325 100644 --- a/src/Client.h +++ b/src/Client.h @@ -8,7 +8,7 @@ #define TRANSMISSION_CLIENT_H namespace Transmission { - + /// 客户端类 class Client { }; diff --git a/src/Exception/PrepareDataException.h b/src/Exception/PrepareDataException.h new file mode 100644 index 0000000..7ae950f --- /dev/null +++ b/src/Exception/PrepareDataException.h @@ -0,0 +1,24 @@ +// 版权所有 (c) ling 保留所有权利。 +// 除非另行说明,否则仅允许在Transmission中使用此文件中的代码。 +// +// 由 ling 创建于 24-4-18. +// + +#ifndef TRANSMISSION_PREPAREDATAEXCEPTION_H +#define TRANSMISSION_PREPAREDATAEXCEPTION_H + +#include + +namespace Transmission { + /// 准备数据时发生异常 + class PrepareDataException : public std::runtime_error { + public: + explicit PrepareDataException(const std::string &e) : std::runtime_error(e) { + } + + explicit PrepareDataException(const char *e) : std::runtime_error(e) { + } + }; +} + +#endif //TRANSMISSION_PREPAREDATAEXCEPTION_H diff --git a/src/Transmit.cpp b/src/Transmit.cpp new file mode 100644 index 0000000..a0c358d --- /dev/null +++ b/src/Transmit.cpp @@ -0,0 +1,112 @@ +// 版权所有 (c) ling 保留所有权利。 +// 除非另行说明,否则仅允许在Transmission中使用此文件中的代码。 +// +// 由 ling 创建于 24-4-18. +// + +#include "Transmit.h" +#include +#include "Exception/PrepareDataException.h" +#include +#include + +#define DATA_STOP 0x20030507 + +namespace Transmission { + Transmit::Transmit(SOCKET fd, std::string ip) : ip(std::move(ip)), fd(fd) { + + } + + void Transmit::dataArrives(unsigned char *data, size_t size) { + std::unique_lock lock(dataMutex); + if (this->packSize < 0) { + if (charBuff.getSize() >= 4) { + //读取下一个数据包的长度 + this->packSize = + (charBuff.get() << 0) + (charBuff.get() << 8) + (charBuff.get() << 16) + (charBuff.get() << 24); + } else { + return; + } + } + //一个格式正确的数据包应该由如下部分构成: + // 头部:包体大小(int32_t) + // 包体:数据包正文段,长度不定 + // 尾部:MagicNumber(int32_t) + if (charBuff.getSize() < this->packSize + sizeof(int32_t)) { + //数据不够,等待填充数据 + return; + } + int32_t magicNumber = (charBuff.get() << 0) + (charBuff.get() << 8) + (charBuff.get() << 16) + (charBuff.get() << 24); + if (magicNumber != DATA_STOP) + throw PrepareDataException("魔数错误"); + + auto buff = std::shared_ptr(new unsigned char[this->packSize], [](const unsigned char *p) { + delete[] p; + }); + charBuff.get(buff.get(), this->packSize); + DataPackets dataPackets; + if (!dataPackets.ParseFromArray(buff.get(), this->packSize)) + throw PrepareDataException("反序列化出错"); + + switch (dataPackets.type()) { + //无压缩 + case CompressAlgorithm::NOT: { + auto pack = copyMem(dataPackets.data()); + packetReady(dataPackets.type(), pack, dataPackets.data().size()); + break; + } + //lzma压缩 + case CompressAlgorithm::LZMA: { + size_t un_size; + auto pack = unLzma(dataPackets.data(), un_size); + packetReady(dataPackets.type(), pack, un_size); + break; + } + default: + throw PrepareDataException("未知的压缩算法"); + } + + } + + std::shared_ptr Transmit::copyMem(const std::string &str) { + auto buff = std::shared_ptr(new unsigned char[str.size()], [](const unsigned char *p) { + delete[] p; + }); + auto mem = reinterpret_cast(str.data()); + memcpy(buff.get(), mem, str.size()); + return buff; + } + + std::shared_ptr Transmit::unLzma(const std::string &str, size_t &size) { + lzma_stream stream = LZMA_STREAM_INIT; + lzma_ret ret = lzma_easy_encoder(&stream, LZMA_PRESET_DEFAULT, LZMA_CHECK_CRC64); + if (ret != LZMA_OK) + throw PrepareDataException("LZMA初始化失败!"); + // 解压缩数据 + stream.next_in = reinterpret_cast(str.data()); + stream.avail_in = str.size(); + CharBuff buff((int64_t) lzma_stream_buffer_bound(str.size())); + size_t lastSize = 0; + do { + size_t bufferSize = lzma_stream_buffer_bound(str.size()); + auto ptr = std::shared_ptr(new unsigned char[bufferSize], [](const unsigned char *p) { + delete[] p; + }); + stream.next_out = reinterpret_cast(ptr.get()); + stream.avail_out = bufferSize; + ret = lzma_code(&stream, LZMA_FINISH); + lastSize = stream.total_out - lastSize; + buff.put(ptr.get(), 0, (int64_t) lastSize); + } while (ret == LZMA_BUF_ERROR); + + if (ret != LZMA_OK && ret != LZMA_STREAM_END) + throw PrepareDataException("LZMA解压缩失败"); + + auto ptr = std::shared_ptr(new unsigned char[buff.getSize()], [](const unsigned char *p) { + delete[] p; + }); + size = buff.getSize(); + buff.get(ptr.get(), buff.getSize()); + return ptr; + } +} // Transmission \ No newline at end of file diff --git a/src/Transmit.h b/src/Transmit.h new file mode 100644 index 0000000..915ea7e --- /dev/null +++ b/src/Transmit.h @@ -0,0 +1,47 @@ +// 版权所有 (c) ling 保留所有权利。 +// 除非另行说明,否则仅允许在Transmission中使用此文件中的代码。 +// +// 由 ling 创建于 24-4-18. +// + +#ifndef TRANSMISSION_TRANSMIT_H +#define TRANSMISSION_TRANSMIT_H + +#include +#include +#include +#include "CharBuff.h" + +namespace Transmission { + /// 传输层实现 + /// 传输过程使用lzma算法压缩 + class Transmit { + private: + const std::string ip; + const SOCKET fd; + CharBuff charBuff; + int32_t packSize = -1; + std::mutex dataMutex; + + protected: + + virtual std::shared_ptr copyMem(const std::string &str); + + /// 解压lzma数据包 + virtual std::shared_ptr unLzma(const std::string &str, size_t &size); + + public: + explicit Transmit(SOCKET fd, std::string ip); + + /// 收到来自网络的数据 + virtual void dataArrives(unsigned char *data, size_t size); + + /// 数据包就绪 + /// @param type 数据包类型 + /// @param data 解压缩后的数据包 + virtual void packetReady(int type, std::shared_ptr data, size_t size) = 0; + }; + +} // Transmission + +#endif //TRANSMISSION_TRANSMIT_H diff --git a/src/protobuf/transmission.proto b/src/protobuf/transmission.proto new file mode 100644 index 0000000..c8f9417 --- /dev/null +++ b/src/protobuf/transmission.proto @@ -0,0 +1,15 @@ +package Transmission; + +enum CompressAlgorithm { + NOT = 1; + LZMA = 2; +} + +//基础数据包 +message DataPackets { + //压缩算法 + required int64 algorithm = 1; + //数据包类型 + required int32 type = 2; + required bytes data = 3; +}