Compare commits

...

13 Commits

12 changed files with 644 additions and 3 deletions

View File

@@ -1,6 +1,32 @@
cmake_minimum_required(VERSION 3.28)
cmake_minimum_required(VERSION 3.16)
project(Transmission)
set(CMAKE_CXX_STANDARD 20)
if(NOT DEFINED TARGET_SYSTEM_NAME)
set(TARGET_SYSTEM_NAME ${CMAKE_SYSTEM_NAME})
message("使用默认目标 : ${TARGET_SYSTEM_NAME}")
endif ()
if (${TARGET_SYSTEM_NAME} STREQUAL "Linux")
add_definitions(-DBUILD_LINUX=1)
message("以Linux为构建目标")
elseif (${TARGET_SYSTEM_NAME} STREQUAL "Windows")
add_definitions(-DBUILD_WINDOWS=1)
message("以Windows为构建目标")
elseif(${TARGET_SYSTEM_NAME} STREQUAL "Android")
add_definitions(-DBUILD_ANDROID=1)
message("以Android为构建目标")
else ()
message(FATAL_ERROR "未知的目标操作系统")
endif ()
add_subdirectory(src)
if (IS_TEST)
add_subdirectory(test)
message("包含Gtest目标")
endif ()
add_executable(exec main.cpp
)
target_link_libraries(exec Transmission)
add_executable(Transmission main.cpp)

38
src/CMakeLists.txt Normal file
View File

@@ -0,0 +1,38 @@
cmake_minimum_required(VERSION 3.16)
project(Transmission)
find_package(Protobuf REQUIRED)
set(CMAKE_CXX_STANDARD 20)
FILE(GLOB protofiles "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/*.proto")
message(STATUS ${protofiles})
PROTOBUF_GENERATE_CPP(PROTOSRCS PROTOHDRS ${protofiles})
add_library(proto STATIC ${PROTOSRCS} ${PROTOHDRS})
target_include_directories(proto PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
add_library(Transmission
Client.cpp
Transmit.cpp
CharBuff.cpp
Exception/PrepareDataException.h
)
target_include_directories(Transmission PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
if(NOT DEFINED PROTOBUF_LIB_PATH)
set(PROTOBUF_LIB_PATH "protobuf")
else ()
message("手动指定protobuf库为${PROTOBUF_LIB_PATH}")
endif ()
if(DEFINED PROTOBUF_INCLUDE_PATH)
message("手动指定protobuf头文件路径为${PROTOBUF_INCLUDE_PATH}")
target_include_directories(proto PUBLIC ${PROTOBUF_INCLUDE_PATH})
endif ()
if(NOT DEFINED LZMA_LIB_PATH)
set(LZMA_LIB_PATH "lzma")
else ()
message("手动指定LZMA库为${LZMA_LIB_PATH}")
endif ()
target_link_libraries(Transmission ${PROTOBUF_LIB_PATH} proto ${LZMA_LIB_PATH})

78
src/CharBuff.cpp Normal file
View File

@@ -0,0 +1,78 @@
#include <stdexcept>
#include <cstring>
#include <iostream>
#include "CharBuff.h"
namespace Transmission {
CharBuff::CharBuff(int64_t size) {
if (size <= 0)
throw std::runtime_error("缓冲区大小不能为0或者负数");
ptr = new unsigned char[size];
ptrSize = size;
}
CharBuff::~CharBuff() {
delete[] ptr;
}
void CharBuff::put(const void *buff, int64_t start, int64_t size) {
if ((this->ptrSize - this->flag) < size) {
//剩余容量不足以容纳新数据
if (this->ptrSize - (this->flag - this->startFlag) >= size) {
//丢弃缓冲区内已经被处理的数据后,可以继续添加数据。
//避免频繁的new delete造成不必要的性能消耗
memmove(this->ptr, this->ptr + this->startFlag, this->flag - this->startFlag);
this->flag -= this->startFlag;
this->startFlag = 0;
} else {
//缓冲区剩余容量无法继续添加数据,需要调整容量
//调整容量时,顺带丢弃已经被读取的内存数据
//调整后缓冲区大小
int64_t addSize = (this->ptrSize - this->startFlag) + (size * 2);
//不能缩小到低于256字节防止频繁调整容量
if (addSize < 256)
addSize = 256;
if (addSize > 1024 * 1024 * 8)
//出于安全考虑缓冲区大小最大不能超过8mb
throw std::bad_alloc();
auto *tmp = new unsigned char[addSize];
this->ptrSize = addSize;
memcpy(tmp, this->ptr + this->startFlag, this->flag - this->startFlag);
delete[] this->ptr;
this->flag -= this->startFlag;
this->startFlag = 0;
this->ptr = tmp;
}
}
memcpy(this->ptr + this->flag, (char *) (buff) + start, size);
this->flag += size;
}
unsigned char CharBuff::get() {
return *(ptr + startFlag++);
}
int64_t CharBuff::getSize() const {
return this->flag - this->startFlag;
}
int64_t CharBuff::getBuffSize() const {
return this->ptrSize;
}
CharBuff::CharBuff(const unsigned char *ptr, int64_t start, int64_t size) {
this->ptr = new unsigned char[size];
memcpy(this->ptr, ptr + start, size);
this->flag = size;
this->ptrSize = size;
}
void CharBuff::get(unsigned char *buff, int64_t size) {
if (this->startFlag + size > this->flag) {
size = this->flag - this->startFlag;
}
memcpy(buff, this->ptr + startFlag, size);
this->startFlag += size;
}
} // ling

56
src/CharBuff.h Normal file
View File

@@ -0,0 +1,56 @@
#ifndef MEMORYPLUGKERNEL_CHARBUFF_H
#define MEMORYPLUGKERNEL_CHARBUFF_H
#include <mutex>
namespace Transmission {
/**
* 一个轻量级的字节缓冲区,线程不安全
*/
class CharBuff {
private:
int64_t ptrSize;
unsigned char *ptr;
//指针位置
int64_t flag = 0;
int64_t startFlag = 0;
public:
/**
* 创建缓冲区默认分配64字节空间
* @param size
*/
explicit CharBuff(int64_t size = 64);
CharBuff(const unsigned char *ptr, int64_t start, int64_t size);
virtual ~CharBuff();
/**
* 添加数据到缓冲区末尾
* @param buff 数据
* @param start 开始位置
* @param size 长度
*/
void put(const void *buff, int64_t start, int64_t size);
unsigned char get();
void get(unsigned char *buff, int64_t size);
/**
* 获取数据大小
* @return
*/
[[nodiscard]] int64_t getSize() const;
/**
* 获取缓冲区大小
* @return
*/
[[nodiscard]] int64_t getBuffSize() const;
};
} // ling
#endif //MEMORYPLUGKERNEL_CHARBUFF_H

10
src/Client.cpp Normal file
View File

@@ -0,0 +1,10 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#include "Client.h"
namespace Transmission {
} // Transmission

18
src/Client.h Normal file
View File

@@ -0,0 +1,18 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#ifndef TRANSMISSION_CLIENT_H
#define TRANSMISSION_CLIENT_H
namespace Transmission {
/// 客户端类
class Client {
};
} // Transmission
#endif //TRANSMISSION_CLIENT_H

View File

@@ -0,0 +1,24 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#ifndef TRANSMISSION_PREPAREDATAEXCEPTION_H
#define TRANSMISSION_PREPAREDATAEXCEPTION_H
#include <stdexcept>
namespace Transmission {
/// 准备数据时发生异常
class PrepareDataException : public std::runtime_error {
public:
explicit PrepareDataException(const std::string &e) : std::runtime_error(e) {
}
explicit PrepareDataException(const char *e) : std::runtime_error(e) {
}
};
}
#endif //TRANSMISSION_PREPAREDATAEXCEPTION_H

199
src/Transmit.cpp Normal file
View File

@@ -0,0 +1,199 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#include "Transmit.h"
#include <lzma.h>
#include "Exception/PrepareDataException.h"
#include <utility>
#include <transmission.pb.h>
namespace Transmission {
const int32_t Transmit::DATA_STOP = 0x20030507;
int Transmit::compressSize = 64;
Transmit::Transmit(SOCKET fd, std::string ip) : ip(std::move(ip)), fd(fd) {
}
void Transmit::dataArrives(const unsigned char *data, size_t size) {
std::unique_lock<std::mutex> lock(dataMutex);
charBuff.put(data, 0, size);
if (this->packSize < 0) {
if (charBuff.getSize() >= 4) {
//读取下一个数据包的长度
this->packSize =
(charBuff.get() << 0) + (charBuff.get() << 8) + (charBuff.get() << 16) + (charBuff.get() << 24);
} else {
return;
}
}
//一个格式正确的数据包应该由如下部分构成:
// 头部:包体大小(int32_t)
// 包体:数据包正文段,长度不定
// 尾部MagicNumber(int32_t)
if (charBuff.getSize() < this->packSize + sizeof(int32_t)) {
//数据不够,等待填充数据
return;
}
auto buff = std::shared_ptr<unsigned char>(new unsigned char[this->packSize], [](const unsigned char *p) {
delete[] p;
});
charBuff.get(buff.get(), this->packSize);
int32_t magicNumber = (charBuff.get() << 0) + (charBuff.get() << 8) + (charBuff.get() << 16) + (charBuff.get() << 24);
if (magicNumber != DATA_STOP)
throw PrepareDataException("魔数错误");
DataPackets dataPackets;
if (!dataPackets.ParseFromArray(buff.get(), this->packSize))
throw PrepareDataException("反序列化出错");
this->packSize = -1;
switch (dataPackets.algorithm()) {
//无压缩
case CompressAlgorithm::NOT: {
auto pack = copyMem(dataPackets.data());
packetReady(dataPackets.type(), pack, dataPackets.data().size());
break;
}
//lzma压缩
case CompressAlgorithm::LZMA: {
size_t un_size;
auto pack = unLzma(dataPackets.data(), un_size);
packetReady(dataPackets.type(), pack, un_size);
break;
}
default:
throw PrepareDataException("未知的压缩算法");
}
}
std::shared_ptr<unsigned char> Transmit::copyMem(const std::string &str) {
auto buff = std::shared_ptr<unsigned char>(new unsigned char[str.size()], [](const unsigned char *p) {
delete[] p;
});
auto mem = reinterpret_cast<const unsigned char *>(str.data());
memcpy(buff.get(), mem, str.size());
return buff;
}
std::shared_ptr<unsigned char> Transmit::unLzma(const std::string &str, size_t &size) {
lzma_stream stream = LZMA_STREAM_INIT;
lzma_ret ret = lzma_stream_decoder(&stream, UINT64_MAX, LZMA_CONCATENATED);
if (ret != LZMA_OK)
throw PrepareDataException("LZMA初始化失败");
// 解压缩数据
stream.next_in = reinterpret_cast<const uint8_t *>(str.data());
stream.avail_in = str.size();
CharBuff buff;
const int defaultSize = 1024;
size_t lastSize = 0;
while (true) {
//size_t bufferSize = defaultSize;
auto ptr = std::shared_ptr<unsigned char>(new unsigned char[defaultSize], [](const unsigned char *p) {
delete[] p;
});
stream.next_out = reinterpret_cast<uint8_t *>(ptr.get());
stream.avail_out = defaultSize;
ret = lzma_code(&stream, LZMA_FINISH);
if (ret == LZMA_STREAM_END) {
lastSize = stream.total_out - lastSize;
buff.put(ptr.get(), 0, (int64_t) lastSize);
break;
}
if (ret != LZMA_OK)
throw PrepareDataException("LZMA解压缩失败");
lastSize = stream.total_out - lastSize;
buff.put(ptr.get(), 0, (int64_t) lastSize);
}
auto ptr = std::shared_ptr<unsigned char>(new unsigned char[buff.getSize()], [](const unsigned char *p) {
delete[] p;
});
size = buff.getSize();
buff.get(ptr.get(), buff.getSize());
return ptr;
}
void Transmit::sendData(const unsigned char *data, size_t size, int type) {
DataPackets dataPackets;
dataPackets.set_type(type);
std::string byteData(reinterpret_cast<const char *>(data), size);
dataPackets.set_data(byteData);
if (size < compressSize) {
//小于compressSize的数据包不压缩
pushData(dataPackets);
} else {
//大于64字节的数据使用lzma压缩后发送
pushLzmaData(dataPackets);
}
}
void Transmit::pushData(const unsigned char *data, int32_t size) const {
::send(this->fd, (const char *) &size, sizeof(int32_t), 0);
::send(this->fd, (const char *) data, size, 0);
::send(this->fd, (const char *) &DATA_STOP, sizeof(DATA_STOP), 0);
}
void Transmit::pushData(DataPackets &data) {
data.set_algorithm(CompressAlgorithm::NOT);
size_t pack_size = data.ByteSizeLong();
auto *temp = new unsigned char[pack_size];
data.SerializeToArray(temp, (int) pack_size);
pushData(temp, pack_size);
delete[] temp;
}
void Transmit::pushLzmaData(DataPackets &data) {
// 初始化压缩流
lzma_stream stream = LZMA_STREAM_INIT;
lzma_ret ret = lzma_easy_encoder(&stream, 6, LZMA_CHECK_CRC64); // 压缩级别为 6使用 CRC64 进行校验
if (ret != LZMA_OK)
throw PrepareDataException("LZMA初始化失败");
CharBuff buff;
size_t lastSize = 0;
stream.next_in = (const uint8_t *) data.data().data();
stream.avail_in = data.data().size();
//size_t maxCompressedSize = lzma_stream_buffer_bound(data.data().size());
const size_t maxCompressedSize = 1024;
while (true) {
auto ptr = std::shared_ptr<unsigned char>(new unsigned char[maxCompressedSize], [](const unsigned char *p) {
delete[] p;
});
stream.next_out = reinterpret_cast<uint8_t *>(ptr.get());
stream.avail_out = maxCompressedSize;
ret = lzma_code(&stream, LZMA_FINISH);
if (ret == LZMA_STREAM_END) {
lastSize = stream.total_out - lastSize;
buff.put(ptr.get(), 0, (int64_t) lastSize);
break;
}
if (ret != LZMA_OK)
throw PrepareDataException("LZMA压缩失败");
lastSize = stream.total_out - lastSize;
buff.put(ptr.get(), 0, (int64_t) lastSize);
}
auto ptr = std::shared_ptr<unsigned char>(new unsigned char[buff.getSize()], [](const unsigned char *p) {
delete[] p;
});
size_t size = buff.getSize();
buff.get(ptr.get(), buff.getSize());
data.set_algorithm(CompressAlgorithm::LZMA);
std::string byteData(reinterpret_cast<char *>(ptr.get()), size);
data.set_data(byteData);
size_t pack_size = data.ByteSizeLong();
auto *temp = new unsigned char[pack_size];
data.SerializeToArray(temp, (int) pack_size);
pushData(temp, pack_size);
delete[] temp;
}
} // Transmission

81
src/Transmit.h Normal file
View File

@@ -0,0 +1,81 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#ifndef TRANSMISSION_TRANSMIT_H
#define TRANSMISSION_TRANSMIT_H
#include <string>
#ifdef BUILD_LINUX
#include <pcap/socket.h>
#endif
#ifdef BUILD_WINDOWS
#include <winsock.h>
#include <io.h>
#include <ws2tcpip.h>
#endif
#ifdef BUILD_ANDROID
#include <sys/socket.h>
#define SOCKET int
#endif
#include <memory>
#include "CharBuff.h"
#include "transmission.pb.h"
namespace Transmission {
/// 传输层实现
/// 传输过程使用lzma算法压缩
class Transmit {
private:
const std::string ip;
const SOCKET fd;
CharBuff charBuff;
int32_t packSize = -1;
std::mutex dataMutex;
private:
/// 无压缩
void pushData(DataPackets &data);
/// lzma压缩后发送
void pushLzmaData(DataPackets &data);
/// 实际发送
virtual void pushData(const unsigned char *data, int32_t size) const;
protected:
virtual std::shared_ptr<unsigned char> copyMem(const std::string &str);
/// 解压lzma数据包
virtual std::shared_ptr<unsigned char> unLzma(const std::string &str, size_t &size);
public:
//最小压缩大小
static int compressSize;
static const int32_t DATA_STOP;
explicit Transmit(SOCKET fd, std::string ip);
/// 收到来自网络的数据
virtual void dataArrives(const unsigned char *data, size_t size);
/// 发送数据
virtual void sendData(const unsigned char *data, size_t size, int type);
/// 数据包就绪
/// @param type 数据包类型
/// @param data 解压缩后的数据包
virtual void packetReady(int type, std::shared_ptr<unsigned char> data, size_t size) = 0;
};
} // Transmission
#endif //TRANSMISSION_TRANSMIT_H

View File

@@ -0,0 +1,15 @@
package Transmission;
enum CompressAlgorithm {
NOT = 1;
LZMA = 2;
}
//基础数据包
message DataPackets {
//压缩算法
required int64 algorithm = 1;
//数据包类型
required int32 type = 2;
required bytes data = 3;
}

16
test/CMakeLists.txt Normal file
View File

@@ -0,0 +1,16 @@
cmake_minimum_required(VERSION 3.16)
project(Transmission)
set(CMAKE_CXX_STANDARD 20)
add_executable(Test
test_main.cpp
)
include_directories(../src)
enable_testing()
find_package(GTest REQUIRED)
target_link_libraries(Test PRIVATE GTest::GTest GTest::Main Transmission absl_log_internal_check_op absl_log_internal_message)
set(GTEST_LIB gtest gtest_main)
add_test(NAME Test COMMAND Test)

80
test/test_main.cpp Normal file
View File

@@ -0,0 +1,80 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在Transmission中使用此文件中的代码。
//
// 由 ling 创建于 24-4-18.
//
#include <gtest/gtest.h>
#include "Transmit.h"
static bool isExec = false;
static const char *shortStr = "Client Hello";
static const char *LongStr = "Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello";
static const char *LongLongStr = "Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello"
"Client HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient HelloClient Hello";
class Transmit : public Transmission::Transmit {
private:
SOCKET fd;
public:
explicit Transmit(SOCKET fd, std::string ip) : Transmission::Transmit(fd, ip) {
this->fd = fd;
}
void packetReady(int type, std::shared_ptr<unsigned char> data, size_t size) override {
if (type == 1)
ASSERT_TRUE(strcmp((char *) data.get(), shortStr) == 0);
else if (type == 2)
ASSERT_TRUE(strcmp((char *) data.get(), LongStr) == 0);
else if (type == 3) {
ASSERT_TRUE(strcmp((char *) data.get(), LongLongStr) == 0);
} else
ASSERT_TRUE(false);
isExec = true;
}
void pushData(const unsigned char *data, int32_t size) const override {
::write(this->fd, &size, sizeof(int32_t));
::write(this->fd, data, size);
::write(this->fd, &DATA_STOP, sizeof(DATA_STOP));
}
void read() {
int32_t size = 0;
::read(this->fd, &size, sizeof(size));
auto temp = new unsigned char[size];
::read(this->fd, temp, size);
int32_t stop = 0;
::read(this->fd, &stop, sizeof(stop));
dataArrives((unsigned char *) &size, sizeof(size));
dataArrives(temp, size);
dataArrives((unsigned char *) &stop, sizeof(stop));
delete[] temp;
}
};
TEST(Transmit测试, 1) {
int fds[2];
pipe(fds);
int read = fds[0];
int write = fds[1];
Transmit transmitRead(read, "");
Transmit transmitWrite(write, "");
transmitWrite.sendData((unsigned char *) shortStr, strlen(shortStr) + 1, 1);
transmitRead.read();
ASSERT_TRUE(isExec);
isExec = false;
transmitWrite.sendData((unsigned char *) LongStr, strlen(LongStr) + 1, 2);
transmitRead.read();
ASSERT_TRUE(isExec);
isExec = false;
transmitWrite.sendData((unsigned char *) LongLongStr, strlen(LongLongStr) + 1, 3);
transmitRead.read();
ASSERT_TRUE(isExec);
}