首次提交

This commit is contained in:
2025-11-19 20:03:11 +08:00
commit 5cb9c66653
5 changed files with 2282 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/target
.idea

1906
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

20
Cargo.toml Normal file
View File

@@ -0,0 +1,20 @@
[package]
# NOTE(review): crate names are conventionally snake_case ("log_collection");
# "LogCollection" triggers rustc's non-snake-case crate-name warning.
# Confirm no external references before renaming.
name = "LogCollection"
version = "0.1.0"
edition = "2024"
[dependencies]
bytes = "1.10.1"
crossbeam = "0.8.4"
reqwest = { version = "0.12.24", features = ["json"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
spin = "0.10.0"
tokio = { version = "1.48.0", features = ["full", "tracing"] }
tracing = "0.1.41"
tracing-appender = "0.2.3"
tracing-futures = "0.2.5"
tracing-subscriber = { version = "0.3.20", features = ["json", "env-filter", "fmt"] }
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

116
src/lib.rs Normal file
View File

@@ -0,0 +1,116 @@
use serde::{Deserialize, Serialize};
mod zinc_search_gather;
use std::io;
use std::path::Path;
use tracing::Level;
use tracing_appender::{non_blocking, rolling};
use tracing_subscriber::filter::{FilterFn, Filtered};
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::fmt::format::{Format, Json, JsonFields};
use tracing_subscriber::{EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt};
static mut MODULE_NAME: &'static str = "";
macro_rules! make_write {
($write:expr) => {
fmt::layer()
.json()
.with_writer($write)
.with_current_span(true)
.flatten_event(true)
.with_target(true)
.with_file(false)
.with_line_number(false)
.with_thread_ids(false)
.with_thread_names(false)
.with_filter(make_filter())
};
}
fn make_filter() -> FilterFn {
FilterFn::new(|metadata| {
let name = unsafe { MODULE_NAME };
let target = metadata.target();
target.starts_with(name)
|| target.starts_with("database")
|| target.starts_with("proxy_pool")
|| target.starts_with("rpc_api")
|| target.starts_with("utils")
|| metadata.level() <= &Level::WARN // WARN 和 ERROR 永远允许
})
}
pub async fn init_logging(
service_name: &str,
module_name: &'static str,
log_dir: impl AsRef<Path>,
config: Option<LayerConfig>,
) {
unsafe {
MODULE_NAME = module_name;
}
let gather = if let Some(config) = config {
Some(zinc_search_gather::start_zinc_search_gather_service(config).await)
} else {
None
};
// 创建每天轮换的日志文件
let file_appender = rolling::daily(log_dir, "app.log");
let (non_blocking_file, _guard) = non_blocking(file_appender);
// 同时输出到 stdout
let stdout = std::io::stdout;
let (non_blocking_stdout, _stdout_guard) = non_blocking(stdout());
// 配置环境变量过滤器,默认 info 级别
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
if cfg!(debug_assertions) {
EnvFilter::new("trace")
} else {
EnvFilter::new("info")
}
});
// 构建订阅器
let builder = tracing_subscriber::registry()
.with(env_filter)
//文件输出层 json) 格式
.with(make_write!(non_blocking_file))
// stdout 输出层JSON格式)
.with(
fmt::layer()
.compact()
.with_writer(non_blocking_stdout)
.with_file(false)
.with_line_number(false)
.with_filter(make_filter()),
);
if let Some(gather) = gather {
builder.with(make_write!(gather)).init();
} else {
builder.init();
}
// 记录启动信息
tracing::info!(
service = service_name,
version = env!("CARGO_PKG_VERSION"),
"Service started"
);
// 保持 guard 存活
std::mem::forget(_guard);
std::mem::forget(_stdout_guard);
}
/// Connection settings for the optional ZincSearch log-shipping layer.
///
/// NOTE(review): all fields are private and no constructor is exposed, so
/// outside this crate a value can only be produced through `Deserialize`
/// (e.g. loaded from a config file) — confirm that is intentional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerConfig {
    // Base URL of the ZincSearch server, e.g. "http://host:4080";
    // presumably without a trailing slash, since paths are appended
    // as "{host}/api/..." — TODO confirm.
    zinc_search_host: String,
    // Basic-auth credentials for the ZincSearch API.
    username: String,
    password: String,
    // Value written into each shipped log document's "hostname" field.
    node_name: String,
    // Target ZincSearch index; its existence is checked at startup.
    index_name: String,
}

238
src/zinc_search_gather.rs Normal file
View File

@@ -0,0 +1,238 @@
// 版权所有 (c) ling 保留所有权利。
// 除非另行说明否则仅允许在FindPowerAutoServer中使用此文件中的代码。
//
// 由 ling 创建于 2025/11/12.
#![allow(non_snake_case)]
use crate::LayerConfig;
use bytes::{BufMut, BytesMut};
use crossbeam::queue::SegQueue;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::error::Error;
use std::io::{self, Write};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::select;
use tokio::sync::Notify;
use tokio::time::sleep;
use tracing::error;
use tracing_subscriber::fmt::MakeWriter;
// Shared log queue. NOTE(review): `SegQueue` is already a lock-free MPMC
// queue; the spin mutex exists only so the service loop can `mem::swap` the
// whole queue out atomically. Rust convention would name a type alias in
// UpperCamelCase (`Buffer`), but renaming would touch every use site.
type BUFFER = Arc<spin::Mutex<SegQueue<String>>>;
/// 初始化并启动服务
/// # Panic! 当重复调用时
pub async fn start_zinc_search_gather_service(config: LayerConfig) -> ZincSearchGatherWrite {
if let Err(err) = test_zinc_search_connection(&config).await {
panic!("{:?}", err);
}
let buffer = BUFFER::default();
let notify = Arc::new(Notify::new());
#[cfg(tokio_unstable)]
tokio::task::Builder::new()
.name("日志总线")
.spawn(zinc_search_gather_service(
config,
buffer.clone(),
notify.clone(),
))
.expect("启动日志总线失败");
#[cfg(not(tokio_unstable))]
tokio::spawn(zinc_search_gather_service(
config,
buffer.clone(),
notify.clone(),
));
let write = ZincSearchGatherWrite::new(buffer, notify);
write
}
/// Background task: drains the shared queue once per second (or sooner when
/// notified by a writer) and ships each batch to ZincSearch. Never returns.
async fn zinc_search_gather_service(
    config: LayerConfig,
    active_buffer: BUFFER,
    notify: Arc<Notify>,
) -> ! {
    // Spare queue for double-buffering: it is swapped with the active queue
    // so producers are blocked only for the duration of the swap.
    let mut drained = SegQueue::new();
    let client = Client::builder()
        .timeout(Duration::from_secs(5))
        .pool_max_idle_per_host(5)
        .build()
        .unwrap();
    loop {
        // Wake on the 1-second tick or when a writer signals a full buffer.
        select! {
            _ = sleep(Duration::from_secs(1)) => {}
            _ = notify.notified() => {}
        }
        // Swap buffers under the spin lock; producers now fill the empty one.
        std::mem::swap(&mut *active_buffer.lock(), &mut drained);
        // `drained` now holds the pending log lines — push them out.
        // NOTE(review): logging the failure via `error!` re-enters this same
        // pipeline (one extra event per failed batch) — confirm acceptable.
        if let Err(err) = send_logs_to_zinc_search_bulk(&client, &config, &mut drained).await {
            error!("提交日志失败:{:?}", err)
        }
    }
}
/// Outcome of one bulk submission attempt.
#[derive(Debug)]
struct SubmitStats {
    // Log lines sent in a batch that received a 2xx bulk response.
    success: usize,
    // Log lines in a batch rejected with a non-2xx bulk response.
    failed: usize,
}
/// Drain `log_lines` and submit them to ZincSearch's `_bulk` endpoint as
/// NDJSON. Lines that fail JSON parsing are silently dropped; on a non-2xx
/// response the whole batch is counted as failed and is NOT re-queued.
async fn send_logs_to_zinc_search_bulk(
    client: &Client,
    config: &LayerConfig,
    log_lines: &mut SegQueue<String>,
) -> Result<SubmitStats, Box<dyn std::error::Error>> {
    let url = format!("{}/api/_bulk", config.zinc_search_host);
    // The bulk format interleaves one action line with one document line,
    // so reserve two slots per pending entry.
    let mut payload = Vec::with_capacity(log_lines.len() * 2);
    let mut accepted = 0;
    // The action line is identical for every document; serialize it once.
    let action = serde_json::to_string(&serde_json::json!({
        "index": {"_index": config.index_name}
    }))?;
    while let Some(raw) = log_lines.pop() {
        let Some(doc) = process_log_line(&raw, &config.node_name) else {
            continue; // unparsable line — dropped
        };
        payload.push(action.clone());
        payload.push(serde_json::to_string(&doc)?);
        accepted += 1;
    }
    // Nothing to send this round.
    if accepted == 0 {
        return Ok(SubmitStats {
            success: 0,
            failed: 0,
        });
    }
    // NDJSON bodies must end with a trailing newline.
    let body = payload.join("\n") + "\n";
    let response = client
        .post(&url)
        .basic_auth(&config.username, Some(&config.password))
        .header("Content-Type", "application/x-ndjson")
        .body(body)
        .send()
        .await?;
    let status = response.status();
    if status.is_success() {
        Ok(SubmitStats {
            success: accepted,
            failed: 0,
        })
    } else {
        let text = response.text().await?;
        // NOTE(review): eprintln! rather than tracing — possibly deliberate,
        // since a tracing event here would feed back into this pipeline.
        eprintln!("❌ Bulk API 失败: {} - {}", status, text);
        Ok(SubmitStats {
            success: 0,
            failed: accepted,
        })
    }
}
/// 处理单条日志:解析 JSON 并添加 hostname
fn process_log_line(line: &str, hostname: &str) -> Option<Value> {
// 解析 JSON
let mut json = serde_json::from_str::<Value>(line).ok()?;
// 添加 hostname 字段
if let Some(obj) = json.as_object_mut() {
obj.insert("hostname".to_string(), Value::String(hostname.to_string()));
Some(json)
} else {
None
}
}
/// 检查配置文件以及zinc search状态
/// Validate the configuration by querying ZincSearch and checking that the
/// target index exists.
///
/// # Errors
/// Returns an error when the HTTP request fails, the server responds with a
/// non-2xx status, or `config.index_name` is not among the returned names.
async fn test_zinc_search_connection(
    config: &LayerConfig,
) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()?;
    // NOTE(review): the literal path segment "index_name" looks like a
    // placeholder that was never substituted — verify this is really the
    // intended ZincSearch endpoint for listing indices.
    let url = format!("{}/api/index_name", config.zinc_search_host);
    let response = client
        .get(&url)
        .basic_auth(&config.username, Some(&config.password))
        .send()
        .await?;
    if !response.status().is_success() {
        return Err(format!("ZincSearch连接失败{}", response.status()).into());
    }
    // The endpoint is expected to return a plain JSON array of index names.
    let indices: Vec<String> = response.json().await?;
    if indices.contains(&config.index_name) {
        Ok(())
    } else {
        Err(format!("索引 {} 不存在", config.index_name).into())
    }
}
/// Handle installed into a tracing layer; each `make_writer` call produces a
/// lightweight writer sharing the same queue and wake-up signal.
pub struct ZincSearchGatherWrite {
    buffer: BUFFER,
    notify: Arc<Notify>,
}
impl ZincSearchGatherWrite {
    /// Bundle the shared queue and notifier into a `MakeWriter` handle.
    fn new(buffer: BUFFER, notify: Arc<Notify>) -> Self {
        ZincSearchGatherWrite { buffer, notify }
    }
}
impl<'a> MakeWriter<'a> for ZincSearchGatherWrite {
    type Writer = ZincSearchWrite;
    fn make_writer(&'a self) -> Self::Writer {
        // Arc::clone makes the cheap refcount bump explicit.
        ZincSearchWrite::new(Arc::clone(&self.buffer), Arc::clone(&self.notify))
    }
}
/// Per-event writer handed out by [`ZincSearchGatherWrite`]; pushes each
/// formatted log line onto the shared queue.
pub struct ZincSearchWrite {
    buffer: BUFFER,
    notify: Arc<Notify>,
}
impl ZincSearchWrite {
    /// Wrap clones of the shared queue and notifier.
    fn new(buffer: BUFFER, notify: Arc<Notify>) -> Self {
        ZincSearchWrite { buffer, notify }
    }
}
impl Write for ZincSearchWrite {
    /// Queue one formatted event as an owned String (lossy UTF-8 decode).
    /// Never fails; always reports the full buffer as consumed.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let line = String::from_utf8_lossy(buf).into_owned();
        self.buffer.lock().push(line);
        Ok(buf.len())
    }
    /// Wake the background sender only once a meaningful batch (>128 lines)
    /// has accumulated; smaller batches go out on the 1-second tick instead.
    fn flush(&mut self) -> io::Result<()> {
        let queue = self.buffer.lock();
        if queue.len() > 128 {
            self.notify.notify_waiters();
        }
        Ok(())
    }
}