From 51aea0589e3d70b506df11d3039957c18fb9a308 Mon Sep 17 00:00:00 2001 From: Fuyao Date: Fri, 31 Oct 2025 16:34:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 11 ++++ go.sum | 8 +++ level.go | 10 ++++ logger.go | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ model.go | 10 ++++ 5 files changed, 208 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 level.go create mode 100644 logger.go create mode 100644 model.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c3ecf65 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module git.whblueocean.cn/blueocean-go/logger + +go 1.25.3 + +require gorm.io/gorm v1.31.0 + +require ( + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + golang.org/x/text v0.20.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..bc966ba --- /dev/null +++ b/go.sum @@ -0,0 +1,8 @@ +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +gorm.io/gorm v1.31.0 h1:0VlycGreVhK7RF/Bwt51Fk8v0xLiiiFdbGDPIZQ7mJY= +gorm.io/gorm v1.31.0/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= diff --git a/level.go b/level.go new file mode 100644 index 0000000..716cf74 --- /dev/null +++ b/level.go @@ -0,0 +1,10 @@ +package logger + +const ( + INFO = "INFO" // 信息 + WARN = "WARN" // 警告 + ERROR = "ERROR" // 错误 + DEBUG = "DEBUG" // 调试 + TRACE = "TRACE" // 追踪 + FATAL = "FATAL" // 致命 +) diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..e38a1a6 --- /dev/null +++ b/logger.go @@ -0,0 +1,169 @@ +package logger + +import ( + "log" + "sync" + "time" + + "gorm.io/gorm" +) + +type LogTask struct { + Timestamp int64 + Level string + Source string + Action string + Message string +} + +type LogWorkerPool struct { + db *gorm.DB + taskChan chan LogTask // 任务通道 + workerCount int // 工作协程数量 + wg sync.WaitGroup // 等待所有工作协程完成 + stopChan chan struct{} // 通知工作协程停止 + isClosing bool // 标记日志系统是否正在关闭 + batchSize int // 批量写入日志数量 + batchInterval time.Duration // 批量写入间隔时间 +} + +// 实际执行日志写入的协程 +func (p *LogWorkerPool) worker(id int) { + defer p.wg.Done() + + log.Printf("日志工作协程 #%d 启动。", id) + + batch := make([]Log, 0, p.batchSize) // 用于存储当前批次的日志 + ticker := time.NewTicker(p.batchInterval) // 定时器,用于触发超时写入 + defer ticker.Stop() + + for { + select { + case task, ok := <-p.taskChan: + if !ok { + p.flushBatch(batch) + return + } + + dbLog := Log{ + Timestamp: task.Timestamp, + Level: task.Level, + Source: task.Source, + Action: task.Action, + Message: task.Message, + } + batch = append(batch, dbLog) + + // 批次达到指定大小则立即写入 + if len(batch) >= p.batchSize { + p.flushBatch(batch) + batch = make([]Log, 0, p.batchSize) // 重置批次 + ticker.Reset(p.batchInterval) // 重置定时器 + } + + case <-ticker.C: + // 定时器触发 + if len(batch) > 0 { + p.flushBatch(batch) + batch = make([]Log, 0, p.batchSize) // 重置批次 + } + + case <-p.stopChan: // 关闭通道 + for { + select { + case task, ok := <-p.taskChan: + if !ok { // 通道已关闭且所有任务已处理 + p.flushBatch(batch) // 刷新剩余批次 + return + } + + dbLog := Log{ + Timestamp: task.Timestamp, + Level: task.Level, + Source: task.Source, + Action: task.Action, + Message: task.Message, + } + batch = append(batch, dbLog) + // 达到批次大小或通道为空时刷新 + if len(batch) >= p.batchSize || len(p.taskChan) == 0 { + p.flushBatch(batch) + batch = make([]Log, 0, p.batchSize) + } + default: + p.flushBatch(batch) + return + } + } + } + } +} + +// 日志批量写入 +func (p *LogWorkerPool) flushBatch(batch []Log) { + if len(batch) == 0 { + return + } + if result := p.db.CreateInBatches(batch, len(batch)); result.Error != nil { + log.Printf("工作协程批量写入数据库日志失败, 批次大小: %d", len(batch)) + } +} + +// 提交日志任务的公共接口 +// 此函数会阻塞,直到日志任务被通道接收 +func (p *LogWorkerPool) SubmitLog(level, source, action, message string) { + if p.isClosing { + log.Printf("日志系统正在关闭,无法提交日志: %s - %s", level, message) + return + } + + // 直接发送到通道,如果通道满则阻塞 + p.taskChan <- LogTask{ + Timestamp: time.Now().Unix(), + Level: level, + Source: source, + Action: action, + Message: message, + } +} + +// 关闭工作池 +func (p *LogWorkerPool) Close() { + p.isClosing = true // 标记正在关闭 + close(p.stopChan) // 关闭停止通道 + p.wg.Wait() // 等待所有工作协程完成其工作 + close(p.taskChan) // 关闭任务通道 +} + +func NewLogWorkerPool(db *gorm.DB, workerCount, queueSize, batchSize int, batchInterval time.Duration) *LogWorkerPool { + if workerCount <= 0 { + workerCount = 1 + } + if queueSize <= 0 { + queueSize = 1000 // 默认队列大小 + } + if batchSize <= 0 { + batchSize = 50 // 默认批量大小 + } + if batchInterval <= 0 { + batchInterval = 5 * time.Second // 默认批量间隔 + } + + pool := &LogWorkerPool{ + db: db, + taskChan: make(chan LogTask, queueSize), + workerCount: workerCount, + stopChan: make(chan struct{}), + isClosing: false, + batchSize: batchSize, + batchInterval: batchInterval, + } + + // 启动指定数量的工作协程 + for i := 0; i < workerCount; i++ { + pool.wg.Add(1) + go pool.worker(i + 1) + } + + return pool +} diff --git a/model.go b/model.go new file mode 100644 index 0000000..8a5c6b2 --- /dev/null +++ b/model.go @@ -0,0 +1,10 @@ +package logger + +type Log struct { + ID uint64 `gorm:"column:id;primaryKey;autoIncrement;comment:自增ID" json:"id"` + Timestamp int64 `gorm:"column:timestamp;type:BIGINT;comment:时间" json:"timestamp"` + Level string `gorm:"column:level;type:VARCHAR(50);comment:日志级别" json:"level"` + Source string `gorm:"column:source;type:VARCHAR(255);comment:来源" json:"source"` + Action string `gorm:"column:action;type:VARCHAR(255);comment:操作" json:"action"` + Message string `gorm:"column:message;type:VARCHAR(255);comment:消息" json:"message"` +} -- 2.49.1