Compare commits

...

5 Commits

Author SHA1 Message Date
7583eceede optz 2026-04-24 18:26:01 +08:00
9274ef4689 optz 2026-04-24 09:49:33 +08:00
8bae04bc5e feat(logic): 优化数据采集器启动流程并增加节点状态管理功能
- 移除配置参数依赖,改为直接调用Boot()函数
- 添加主机名自动获取和设置功能
- 新增SaveNodeStatus函数用于保存节点状态数据
- 在数据变化时同时保存节点状态信息
- 创建CollectorNode数据库模型用于存储节点状态
2026-04-24 09:40:48 +08:00
b2bfcc5e5e fix config 2026-04-24 09:11:17 +08:00
820f3435d3 fix bug 2026-04-22 18:17:58 +08:00
9 changed files with 134 additions and 64 deletions

View File

@@ -1,13 +1,30 @@
package main package main
import ( import (
"git.apinb.com/quant/collector/internal/conf" "fmt"
"os"
"git.apinb.com/quant/collector/internal/impl" "git.apinb.com/quant/collector/internal/impl"
"git.apinb.com/quant/collector/internal/logic" "git.apinb.com/quant/collector/internal/logic"
"github.com/marcsauter/single"
) )
func main() { func main() {
// 互斥体名称Global\CollectorSingleInstance 确保会话全局唯一
s := single.New("CollectorSingleInstance")
// 尝试加锁,如果已有实例运行会返回错误
if err := s.CheckLock(); err != nil {
if err == single.ErrAlreadyRunning {
fmt.Println("collector.exe 已有实例正在运行,本次启动退出。")
os.Exit(0)
}
fmt.Fprintf(os.Stderr, "检查单实例失败: %v\n", err)
os.Exit(1)
}
// 进程退出时自动释放锁
defer s.TryUnlock()
impl.NewImpl() impl.NewImpl()
c := conf.Get() logic.Boot()
logic.Boot(c)
} }

1
go.mod
View File

@@ -47,6 +47,7 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect
github.com/marcsauter/single v0.0.0-20201009143647-9f8d81240be2
golang.org/x/crypto v0.48.0 // indirect golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.20.0 // indirect golang.org/x/sync v0.20.0 // indirect
golang.org/x/text v0.36.0 // indirect golang.org/x/text v0.36.0 // indirect

2
go.sum
View File

@@ -44,6 +44,8 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/marcsauter/single v0.0.0-20201009143647-9f8d81240be2 h1:TyUcIW0tpCQzV4Hpe9jF3p590EQFnMQV3sv6DhoxV6Q=
github.com/marcsauter/single v0.0.0-20201009143647-9f8d81240be2/go.mod h1:uUA07IN7rYmbr5YlZM5nDVLyoxiqqpprFlXBrjqI24A=
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s= github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=

View File

@@ -1,29 +0,0 @@
package conf
import (
"os"
"strings"
)
type Config struct {
Hostname string
Account string
AccountID string
}
func Get() *Config {
host, _ := os.Hostname()
host = strings.ToLower(host)
switch host {
case "desktop-t5lh34v":
return &Config{Hostname: host, Account: "david", AccountID: "86037237"}
case "t8zznqs49f1ju7q":
return &Config{Hostname: host, Account: "liao", AccountID: "8889399698"}
case "dhzd0ojkpctafmm":
return &Config{Hostname: host, Account: "zhang", AccountID: "8889616198"}
case "f7tib45aqk4n10h":
return &Config{Hostname: host, Account: "long", AccountID: "8886508526"}
}
return nil
}

View File

@@ -8,28 +8,26 @@ import (
"syscall" "syscall"
"time" "time"
"git.apinb.com/quant/collector/internal/conf"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
) )
var ( var (
COLLECTOR_URL = "http://localhost:5000/status" COLLECTOR_URL = "http://localhost:5000/status"
COLLECTION_INTERVAL = 30 COLLECTION_INTERVAL = 30
HOSTNAME = "localhost"
) )
func Boot(cfg *conf.Config) { func Boot() {
log.Println("=== QMT数据采集器启动 ===") log.Println("=== QMT数据采集器启动 ===")
log.Printf("采集地址: %s", COLLECTOR_URL) log.Printf("采集地址: %s", COLLECTOR_URL)
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL) log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
host, err := os.Hostname()
if cfg == nil { if err != nil {
log.Println("配置为空") panic(fmt.Errorf("获取主机名失败: %v", err))
return } else {
HOSTNAME = host
} }
log.Printf("主机名称: %s", HOSTNAME)
log.Printf("账户: %s", cfg.Account)
log.Printf("账户ID: %s", cfg.AccountID)
log.Printf("主机名: %s", cfg.Hostname)
// 创建采集器 // 创建采集器
coll := NewCollector(COLLECTOR_URL) coll := NewCollector(COLLECTOR_URL)
@@ -45,7 +43,7 @@ func Boot(cfg *conf.Config) {
log.Printf("定时任务表达式: %s", cronSpec) log.Printf("定时任务表达式: %s", cronSpec)
// 添加定时任务 // 添加定时任务
_, err := c.AddFunc(cronSpec, func() { _, err = c.AddFunc(cronSpec, func() {
runCollection(coll) runCollection(coll)
}) })
if err != nil { if err != nil {
@@ -94,11 +92,17 @@ func runCollection(coll *Collector) {
// 数据有变化,保存到数据库 // 数据有变化,保存到数据库
log.Println("数据已变化,开始存储到数据库...") log.Println("数据已变化,开始存储到数据库...")
if err := SaveData(status); err != nil { if err := SaveData(&status.Data); err != nil {
log.Printf("保存数据失败: %v", err) log.Printf("保存数据失败: %v", err)
return return
} }
// 保存节点状态(有则更新,没有则新增)
if err := SaveNodeStatus(&status.Status); err != nil {
log.Printf("保存节点状态失败: %v", err)
return
}
// 记录成功的日志 // 记录成功的日志
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil { // if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
// log.Printf("保存采集日志失败: %v", err) // log.Printf("保存采集日志失败: %v", err)

View File

@@ -13,9 +13,9 @@ import (
) )
// SaveStatus 保存完整状态数据(使用事务) // SaveStatus 保存完整状态数据(使用事务)
func SaveData(status *types.StatusData) error { func SaveData(data *types.Data) error {
// 验证必要字段 - AccountID是所有Upsert的共同条件 // 验证必要字段 - AccountID是所有Upsert的共同条件
if status.Data.Assets.AccountID == "" { if data.Assets.AccountID == "" {
return fmt.Errorf("账户ID不能为空") return fmt.Errorf("账户ID不能为空")
} }
@@ -25,16 +25,16 @@ func SaveData(status *types.StatusData) error {
// 保存资产快照 (先查询后更新/插入) // 保存资产快照 (先查询后更新/插入)
var existingAsset models.CollectorAssets var existingAsset models.CollectorAssets
err := impl.DBService.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error err := impl.DBService.Where("account_id = ? AND ymd = ?", data.Assets.AccountID, ymd).First(&existingAsset).Error
asset := models.CollectorAssets{ asset := models.CollectorAssets{
AccountID: status.Data.Assets.AccountID, AccountID: data.Assets.AccountID,
Ymd: ymd, Ymd: ymd,
Cash: status.Data.Assets.Cash, Cash: data.Assets.Cash,
FrozenCash: status.Data.Assets.FrozenCash, FrozenCash: data.Assets.FrozenCash,
MarketValue: status.Data.Assets.MarketValue, MarketValue: data.Assets.MarketValue,
Profit: status.Data.Assets.Profit, Profit: data.Assets.Profit,
TotalAsset: status.Data.Assets.TotalAsset, TotalAsset: data.Assets.TotalAsset,
CollectedAt: now, CollectedAt: now,
} }
@@ -57,8 +57,8 @@ func SaveData(status *types.StatusData) error {
} }
// 批量保存订单 (先查询后更新/插入) // 批量保存订单 (先查询后更新/插入)
if len(status.Data.Orders) > 0 { if len(data.Orders) > 0 {
for _, order := range status.Data.Orders { for _, order := range data.Orders {
// 验证必要条件: OrderID和StockCode // 验证必要条件: OrderID和StockCode
if order.OrderID == 0 { if order.OrderID == 0 {
continue continue
@@ -79,14 +79,14 @@ func SaveData(status *types.StatusData) error {
// 查询是否存在 // 查询是否存在
var cnt int64 var cnt int64
impl.DBService.Where("account_id = ? AND order_id = ? AND ymd = ?", status.Data.Assets.AccountID, order.OrderID, ymd).Count(&cnt) impl.DBService.Model(&models.CollectorOrder{}).Where("account_id = ? AND order_id = ? AND ymd = ?", data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
if cnt > 0 { if cnt > 0 {
continue continue
} }
orderRecord := models.CollectorOrder{ orderRecord := models.CollectorOrder{
OrderID: order.OrderID, OrderID: order.OrderID,
AccountID: status.Data.Assets.AccountID, AccountID: data.Assets.AccountID,
StockCode: order.StockCode, StockCode: order.StockCode,
Ymd: ymd, Ymd: ymd,
Price: order.Price, Price: order.Price,
@@ -116,8 +116,8 @@ func SaveData(status *types.StatusData) error {
} }
// 批量保存持仓 (先查询后更新/插入) // 批量保存持仓 (先查询后更新/插入)
if len(status.Data.Positions) > 0 { if len(data.Positions) > 0 {
for _, pos := range status.Data.Positions { for _, pos := range data.Positions {
// 验证必要条件: Code // 验证必要条件: Code
if pos.Code == "" { if pos.Code == "" {
continue continue
@@ -126,10 +126,10 @@ func SaveData(status *types.StatusData) error {
// 查询是否存在 // 查询是否存在
var existingPosition models.CollectorPosition var existingPosition models.CollectorPosition
err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?", err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?",
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
positionRecord := models.CollectorPosition{ positionRecord := models.CollectorPosition{
AccountID: status.Data.Assets.AccountID, AccountID: data.Assets.AccountID,
Code: pos.Code, Code: pos.Code,
Ymd: ymd, Ymd: ymd,
Volume: pos.Volume, Volume: pos.Volume,
@@ -178,6 +178,54 @@ func SaveData(status *types.StatusData) error {
} }
// SaveNodeStatus 保存节点状态数据(有则更新,没有则新增)
func SaveNodeStatus(status *types.Config) error {
// 验证必要字段
if status.ConfigKey == "" {
return fmt.Errorf("配置键不能为空")
}
// 查询是否存在
var existingNode models.CollectorNode
err := impl.DBService.Where("config_key = ?", status.ConfigKey).First(&existingNode).Error
nodeRecord := models.CollectorNode{
ConfigKey: status.ConfigKey,
HomeName: status.HomeName,
ProjectRoot: status.ProjectRoot,
QmtStatus: status.QmtStatus,
StartTime: status.StartTime,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := impl.DBService.Create(&nodeRecord).Error; err != nil {
return fmt.Errorf("插入节点状态失败: %w", err)
}
log.Printf("新建节点状态: config_key=%s, home_name=%s, qmt_status=%s",
status.ConfigKey, status.HomeName, status.QmtStatus)
} else if err != nil {
// 查询出错
return fmt.Errorf("查询节点状态失败: %w", err)
} else {
// 记录存在,检查是否有变化
if nodeRecord.HomeName != existingNode.HomeName ||
nodeRecord.ProjectRoot != existingNode.ProjectRoot ||
nodeRecord.QmtStatus != existingNode.QmtStatus ||
nodeRecord.StartTime != existingNode.StartTime {
// 有变化,更新现有记录
nodeRecord.ID = existingNode.ID
if err := impl.DBService.Save(&nodeRecord).Error; err != nil {
return fmt.Errorf("更新节点状态失败: %w", err)
}
log.Printf("更新节点状态: config_key=%s, home_name=%s, qmt_status=%s",
status.ConfigKey, status.HomeName, status.QmtStatus)
}
}
return nil
}
// processOrderBook 处理订单簿逻辑 // processOrderBook 处理订单簿逻辑
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error { func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认) // OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)

View File

@@ -0,0 +1,28 @@
package models
import (
"time"
"git.apinb.com/bsm-sdk/core/database"
)
// CollectorNode 节点状态数据库模型
type CollectorNode struct {
ID uint `json:"id" gorm:"primaryKey;comment:主键ID"`
ConfigKey string `json:"config_key" gorm:"type:varchar(100);not null;uniqueIndex;comment:配置键"`
HomeName string `json:"home_name" gorm:"type:varchar(100);not null;comment:主机名称"`
ProjectRoot string `json:"project_root" gorm:"type:varchar(500);not null;comment:项目根目录"`
QmtStatus string `json:"qmt_status" gorm:"type:varchar(50);not null;comment:QMT连接状态"`
StartTime int64 `json:"start_time" gorm:"not null;comment:启动时间戳(纳秒)"`
UpdatedAt time.Time `json:"updated_at" gorm:"autoUpdateTime;comment:记录更新时间"`
CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime;comment:记录创建时间"`
}
func init() {
database.AppendMigrate(&CollectorNode{})
}
// TableName 设置表名
func (CollectorNode) TableName() string {
return "collector_node"
}

View File

@@ -9,7 +9,7 @@ type StatusData struct {
// Data 数据部分 // Data 数据部分
type Data struct { type Data struct {
Assets Assets `json:"assets"` Assets Assets `json:"assets"`
Orders []Order `json:"order"` Orders []Order `json:"orders"`
Positions []Position `json:"positions"` Positions []Position `json:"positions"`
TickData map[string]Tick `json:"tick_data"` TickData map[string]Tick `json:"tick_data"`
} }

View File

@@ -1,3 +1,2 @@
#!/bin/bash #!/bin/bash
go build -ldflags="-s -w" -o D:\work\quant\qmt\bin\collector.exe ./cmd/main.go
go build -o D:\work\quant\qmt\bin\collector.exe ./cmd/main.go