package logger import ( "log" "sync" "time" "gorm.io/gorm" ) type LogTask struct { Timestamp int64 Level string Source string Action string Message string } type LogWorkerPool struct { db *gorm.DB taskChan chan LogTask // 任务通道 workerCount int // 工作协程数量 wg sync.WaitGroup // 等待所有工作协程完成 stopChan chan struct{} // 通知工作协程停止 isClosing bool // 标记日志系统是否正在关闭 batchSize int // 批量写入日志数量 batchInterval time.Duration // 批量写入间隔时间 } // 实际执行日志写入的协程 func (p *LogWorkerPool) worker(id int) { defer p.wg.Done() log.Printf("日志工作协程 #%d 启动。", id) batch := make([]Log, 0, p.batchSize) // 用于存储当前批次的日志 ticker := time.NewTicker(p.batchInterval) // 定时器,用于触发超时写入 defer ticker.Stop() for { select { case task, ok := <-p.taskChan: if !ok { p.flushBatch(batch) return } dbLog := Log{ Timestamp: task.Timestamp, Level: task.Level, Source: task.Source, Action: task.Action, Message: task.Message, } batch = append(batch, dbLog) // 批次达到指定大小则立即写入 if len(batch) >= p.batchSize { p.flushBatch(batch) batch = make([]Log, 0, p.batchSize) // 重置批次 ticker.Reset(p.batchInterval) // 重置定时器 } case <-ticker.C: // 定时器触发 if len(batch) > 0 { p.flushBatch(batch) batch = make([]Log, 0, p.batchSize) // 重置批次 } case <-p.stopChan: // 关闭通道 for { select { case task, ok := <-p.taskChan: if !ok { // 通道已关闭且所有任务已处理 p.flushBatch(batch) // 刷新剩余批次 return } dbLog := Log{ Timestamp: task.Timestamp, Level: task.Level, Source: task.Source, Action: task.Action, Message: task.Message, } batch = append(batch, dbLog) // 达到批次大小或通道为空时刷新 if len(batch) >= p.batchSize || len(p.taskChan) == 0 { p.flushBatch(batch) batch = make([]Log, 0, p.batchSize) } default: p.flushBatch(batch) return } } } } } // 日志批量写入 func (p *LogWorkerPool) flushBatch(batch []Log) { if len(batch) == 0 { return } if result := p.db.CreateInBatches(batch, len(batch)); result.Error != nil { log.Printf("工作协程批量写入数据库日志失败, 批次大小: %d", len(batch)) } } // 提交日志任务的公共接口 // 此函数会阻塞,直到日志任务被通道接收 func (p *LogWorkerPool) SubmitLog(level, source, action, message string) { if p.isClosing { log.Printf("日志系统正在关闭,无法提交日志: %s - %s", level, message) return } // 直接发送到通道,如果通道满则阻塞 p.taskChan <- LogTask{ Timestamp: time.Now().Unix(), Level: level, Source: source, Action: action, Message: message, } } // 关闭工作池 func (p *LogWorkerPool) Close() { p.isClosing = true // 标记正在关闭 close(p.stopChan) // 关闭停止通道 p.wg.Wait() // 等待所有工作协程完成其工作 close(p.taskChan) // 关闭任务通道 } func NewLogWorkerPool(db *gorm.DB, workerCount, queueSize, batchSize int, batchInterval time.Duration) *LogWorkerPool { if workerCount <= 0 { workerCount = 1 } if queueSize <= 0 { queueSize = 1000 // 默认队列大小 } if batchSize <= 0 { batchSize = 50 // 默认批量大小 } if batchInterval <= 0 { batchInterval = 5 * time.Second // 默认批量间隔 } pool := &LogWorkerPool{ db: db, taskChan: make(chan LogTask, queueSize), workerCount: workerCount, stopChan: make(chan struct{}), isClosing: false, batchSize: batchSize, batchInterval: batchInterval, } // 启动指定数量的工作协程 for i := 0; i < workerCount; i++ { pool.wg.Add(1) go pool.worker(i + 1) } return pool }