feat(logic): 优化数据采集器启动流程并增加节点状态管理功能
- 移除配置参数依赖,改为直接调用Boot()函数 - 添加主机名自动获取和设置功能 - 新增SaveNodeStatus函数用于保存节点状态数据 - 在数据变化时同时保存节点状态信息 - 创建CollectorNode数据库模型用于存储节点状态
This commit is contained in:
@@ -8,28 +8,26 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.apinb.com/quant/collector/internal/conf"
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
var (
|
||||
COLLECTOR_URL = "http://localhost:5000/status"
|
||||
COLLECTION_INTERVAL = 30
|
||||
HOSTNAME = "localhost"
|
||||
)
|
||||
|
||||
func Boot(cfg *conf.Config) {
|
||||
func Boot() {
|
||||
log.Println("=== QMT数据采集器启动 ===")
|
||||
log.Printf("采集地址: %s", COLLECTOR_URL)
|
||||
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
|
||||
|
||||
if cfg == nil {
|
||||
log.Println("配置为空")
|
||||
return
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("获取主机名失败: %v", err))
|
||||
} else {
|
||||
HOSTNAME = host
|
||||
}
|
||||
|
||||
log.Printf("账户: %s", cfg.Account)
|
||||
log.Printf("账户ID: %s", cfg.AccountID)
|
||||
log.Printf("主机名: %s", cfg.Hostname)
|
||||
log.Printf("主机名称: %s", HOSTNAME)
|
||||
|
||||
// 创建采集器
|
||||
coll := NewCollector(COLLECTOR_URL)
|
||||
@@ -45,7 +43,7 @@ func Boot(cfg *conf.Config) {
|
||||
log.Printf("定时任务表达式: %s", cronSpec)
|
||||
|
||||
// 添加定时任务
|
||||
_, err := c.AddFunc(cronSpec, func() {
|
||||
_, err = c.AddFunc(cronSpec, func() {
|
||||
runCollection(coll)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -99,6 +97,12 @@ func runCollection(coll *Collector) {
|
||||
return
|
||||
}
|
||||
|
||||
// 保存节点状态(有则更新,没有则新增)
|
||||
if err := SaveNodeStatus(&status.Status); err != nil {
|
||||
log.Printf("保存节点状态失败: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 记录成功的日志
|
||||
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
|
||||
// log.Printf("保存采集日志失败: %v", err)
|
||||
|
||||
@@ -178,6 +178,54 @@ func SaveData(status *types.StatusData) error {
|
||||
|
||||
}
|
||||
|
||||
// SaveNodeStatus 保存节点状态数据(有则更新,没有则新增)
|
||||
func SaveNodeStatus(status *types.Config) error {
|
||||
// 验证必要字段
|
||||
if status.ConfigKey == "" {
|
||||
return fmt.Errorf("配置键不能为空")
|
||||
}
|
||||
|
||||
// 查询是否存在
|
||||
var existingNode models.CollectorNode
|
||||
err := impl.DBService.Where("config_key = ?", status.ConfigKey).First(&existingNode).Error
|
||||
|
||||
nodeRecord := models.CollectorNode{
|
||||
ConfigKey: status.ConfigKey,
|
||||
HomeName: status.HomeName,
|
||||
ProjectRoot: status.ProjectRoot,
|
||||
QmtStatus: status.QmtStatus,
|
||||
StartTime: status.StartTime,
|
||||
}
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 记录不存在,插入新记录
|
||||
if err := impl.DBService.Create(&nodeRecord).Error; err != nil {
|
||||
return fmt.Errorf("插入节点状态失败: %w", err)
|
||||
}
|
||||
log.Printf("新建节点状态: config_key=%s, home_name=%s, qmt_status=%s",
|
||||
status.ConfigKey, status.HomeName, status.QmtStatus)
|
||||
} else if err != nil {
|
||||
// 查询出错
|
||||
return fmt.Errorf("查询节点状态失败: %w", err)
|
||||
} else {
|
||||
// 记录存在,检查是否有变化
|
||||
if nodeRecord.HomeName != existingNode.HomeName ||
|
||||
nodeRecord.ProjectRoot != existingNode.ProjectRoot ||
|
||||
nodeRecord.QmtStatus != existingNode.QmtStatus ||
|
||||
nodeRecord.StartTime != existingNode.StartTime {
|
||||
// 有变化,更新现有记录
|
||||
nodeRecord.ID = existingNode.ID
|
||||
if err := impl.DBService.Save(&nodeRecord).Error; err != nil {
|
||||
return fmt.Errorf("更新节点状态失败: %w", err)
|
||||
}
|
||||
log.Printf("更新节点状态: config_key=%s, home_name=%s, qmt_status=%s",
|
||||
status.ConfigKey, status.HomeName, status.QmtStatus)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processOrderBook 处理订单簿逻辑
|
||||
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
|
||||
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
|
||||
|
||||
Reference in New Issue
Block a user