170 lines
4.0 KiB
Go
170 lines
4.0 KiB
Go
package logger
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type LogTask struct {
|
|
Timestamp int64
|
|
Level string
|
|
Source string
|
|
Action string
|
|
Message string
|
|
}
|
|
|
|
type LogWorkerPool struct {
|
|
db *gorm.DB
|
|
taskChan chan LogTask // 任务通道
|
|
workerCount int // 工作协程数量
|
|
wg sync.WaitGroup // 等待所有工作协程完成
|
|
stopChan chan struct{} // 通知工作协程停止
|
|
isClosing bool // 标记日志系统是否正在关闭
|
|
batchSize int // 批量写入日志数量
|
|
batchInterval time.Duration // 批量写入间隔时间
|
|
}
|
|
|
|
// 实际执行日志写入的协程
|
|
func (p *LogWorkerPool) worker(id int) {
|
|
defer p.wg.Done()
|
|
|
|
log.Printf("日志工作协程 #%d 启动。", id)
|
|
|
|
batch := make([]Log, 0, p.batchSize) // 用于存储当前批次的日志
|
|
ticker := time.NewTicker(p.batchInterval) // 定时器,用于触发超时写入
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case task, ok := <-p.taskChan:
|
|
if !ok {
|
|
p.flushBatch(batch)
|
|
return
|
|
}
|
|
|
|
dbLog := Log{
|
|
Timestamp: task.Timestamp,
|
|
Level: task.Level,
|
|
Source: task.Source,
|
|
Action: task.Action,
|
|
Message: task.Message,
|
|
}
|
|
batch = append(batch, dbLog)
|
|
|
|
// 批次达到指定大小则立即写入
|
|
if len(batch) >= p.batchSize {
|
|
p.flushBatch(batch)
|
|
batch = make([]Log, 0, p.batchSize) // 重置批次
|
|
ticker.Reset(p.batchInterval) // 重置定时器
|
|
}
|
|
|
|
case <-ticker.C:
|
|
// 定时器触发
|
|
if len(batch) > 0 {
|
|
p.flushBatch(batch)
|
|
batch = make([]Log, 0, p.batchSize) // 重置批次
|
|
}
|
|
|
|
case <-p.stopChan: // 关闭通道
|
|
for {
|
|
select {
|
|
case task, ok := <-p.taskChan:
|
|
if !ok { // 通道已关闭且所有任务已处理
|
|
p.flushBatch(batch) // 刷新剩余批次
|
|
return
|
|
}
|
|
|
|
dbLog := Log{
|
|
Timestamp: task.Timestamp,
|
|
Level: task.Level,
|
|
Source: task.Source,
|
|
Action: task.Action,
|
|
Message: task.Message,
|
|
}
|
|
batch = append(batch, dbLog)
|
|
// 达到批次大小或通道为空时刷新
|
|
if len(batch) >= p.batchSize || len(p.taskChan) == 0 {
|
|
p.flushBatch(batch)
|
|
batch = make([]Log, 0, p.batchSize)
|
|
}
|
|
default:
|
|
p.flushBatch(batch)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 日志批量写入
|
|
func (p *LogWorkerPool) flushBatch(batch []Log) {
|
|
if len(batch) == 0 {
|
|
return
|
|
}
|
|
if result := p.db.CreateInBatches(batch, len(batch)); result.Error != nil {
|
|
log.Printf("工作协程批量写入数据库日志失败, 批次大小: %d", len(batch))
|
|
}
|
|
}
|
|
|
|
// 提交日志任务的公共接口
|
|
// 此函数会阻塞,直到日志任务被通道接收
|
|
func (p *LogWorkerPool) SubmitLog(level, source, action, message string) {
|
|
if p.isClosing {
|
|
log.Printf("日志系统正在关闭,无法提交日志: %s - %s", level, message)
|
|
return
|
|
}
|
|
|
|
// 直接发送到通道,如果通道满则阻塞
|
|
p.taskChan <- LogTask{
|
|
Timestamp: time.Now().Unix(),
|
|
Level: level,
|
|
Source: source,
|
|
Action: action,
|
|
Message: message,
|
|
}
|
|
}
|
|
|
|
// 关闭工作池
|
|
func (p *LogWorkerPool) Close() {
|
|
p.isClosing = true // 标记正在关闭
|
|
close(p.stopChan) // 关闭停止通道
|
|
p.wg.Wait() // 等待所有工作协程完成其工作
|
|
close(p.taskChan) // 关闭任务通道
|
|
}
|
|
|
|
func NewLogWorkerPool(db *gorm.DB, workerCount, queueSize, batchSize int, batchInterval time.Duration) *LogWorkerPool {
|
|
if workerCount <= 0 {
|
|
workerCount = 1
|
|
}
|
|
if queueSize <= 0 {
|
|
queueSize = 1000 // 默认队列大小
|
|
}
|
|
if batchSize <= 0 {
|
|
batchSize = 50 // 默认批量大小
|
|
}
|
|
if batchInterval <= 0 {
|
|
batchInterval = 5 * time.Second // 默认批量间隔
|
|
}
|
|
|
|
pool := &LogWorkerPool{
|
|
db: db,
|
|
taskChan: make(chan LogTask, queueSize),
|
|
workerCount: workerCount,
|
|
stopChan: make(chan struct{}),
|
|
isClosing: false,
|
|
batchSize: batchSize,
|
|
batchInterval: batchInterval,
|
|
}
|
|
|
|
// 启动指定数量的工作协程
|
|
for i := 0; i < workerCount; i++ {
|
|
pool.wg.Add(1)
|
|
go pool.worker(i + 1)
|
|
}
|
|
|
|
return pool
|
|
}
|