Compare commits
4 Commits
3d89b59a67
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 9274ef4689 | |||
| 8bae04bc5e | |||
| b2bfcc5e5e | |||
| 820f3435d3 |
@@ -1,13 +1,11 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"git.apinb.com/quant/collector/internal/conf"
|
||||
"git.apinb.com/quant/collector/internal/impl"
|
||||
"git.apinb.com/quant/collector/internal/logic"
|
||||
)
|
||||
|
||||
func main() {
|
||||
impl.NewImpl()
|
||||
c := conf.Get()
|
||||
logic.Boot(c)
|
||||
logic.Boot()
|
||||
}
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
package conf
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Hostname string
|
||||
Account string
|
||||
AccountID string
|
||||
}
|
||||
|
||||
func Get() *Config {
|
||||
host, _ := os.Hostname()
|
||||
host = strings.ToLower(host)
|
||||
|
||||
switch host {
|
||||
case "desktop-t5lh34v":
|
||||
return &Config{Hostname: host, Account: "david", AccountID: "86037237"}
|
||||
case "t8zznqs49f1ju7q":
|
||||
return &Config{Hostname: host, Account: "liao", AccountID: "8889399698"}
|
||||
case "dhzd0ojkpctafmm":
|
||||
return &Config{Hostname: host, Account: "zhang", AccountID: "8889616198"}
|
||||
case "f7tib45aqk4n10h":
|
||||
return &Config{Hostname: host, Account: "long", AccountID: "8886508526"}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -8,28 +8,26 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.apinb.com/quant/collector/internal/conf"
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
var (
|
||||
COLLECTOR_URL = "http://localhost:5000/status"
|
||||
COLLECTION_INTERVAL = 30
|
||||
HOSTNAME = "localhost"
|
||||
)
|
||||
|
||||
func Boot(cfg *conf.Config) {
|
||||
func Boot() {
|
||||
log.Println("=== QMT数据采集器启动 ===")
|
||||
log.Printf("采集地址: %s", COLLECTOR_URL)
|
||||
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
|
||||
|
||||
if cfg == nil {
|
||||
log.Println("配置为空")
|
||||
return
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("获取主机名失败: %v", err))
|
||||
} else {
|
||||
HOSTNAME = host
|
||||
}
|
||||
|
||||
log.Printf("账户: %s", cfg.Account)
|
||||
log.Printf("账户ID: %s", cfg.AccountID)
|
||||
log.Printf("主机名: %s", cfg.Hostname)
|
||||
log.Printf("主机名称: %s", HOSTNAME)
|
||||
|
||||
// 创建采集器
|
||||
coll := NewCollector(COLLECTOR_URL)
|
||||
@@ -45,7 +43,7 @@ func Boot(cfg *conf.Config) {
|
||||
log.Printf("定时任务表达式: %s", cronSpec)
|
||||
|
||||
// 添加定时任务
|
||||
_, err := c.AddFunc(cronSpec, func() {
|
||||
_, err = c.AddFunc(cronSpec, func() {
|
||||
runCollection(coll)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -94,11 +92,17 @@ func runCollection(coll *Collector) {
|
||||
|
||||
// 数据有变化,保存到数据库
|
||||
log.Println("数据已变化,开始存储到数据库...")
|
||||
if err := SaveData(status); err != nil {
|
||||
if err := SaveData(&status.Data); err != nil {
|
||||
log.Printf("保存数据失败: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 保存节点状态(有则更新,没有则新增)
|
||||
if err := SaveNodeStatus(&status.Status); err != nil {
|
||||
log.Printf("保存节点状态失败: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 记录成功的日志
|
||||
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
|
||||
// log.Printf("保存采集日志失败: %v", err)
|
||||
|
||||
@@ -13,9 +13,9 @@ import (
|
||||
)
|
||||
|
||||
// SaveStatus 保存完整状态数据(使用事务)
|
||||
func SaveData(status *types.StatusData) error {
|
||||
func SaveData(data *types.Data) error {
|
||||
// 验证必要字段 - AccountID是所有Upsert的共同条件
|
||||
if status.Data.Assets.AccountID == "" {
|
||||
if data.Assets.AccountID == "" {
|
||||
return fmt.Errorf("账户ID不能为空")
|
||||
}
|
||||
|
||||
@@ -25,16 +25,16 @@ func SaveData(status *types.StatusData) error {
|
||||
|
||||
// 保存资产快照 (先查询后更新/插入)
|
||||
var existingAsset models.CollectorAssets
|
||||
err := impl.DBService.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
|
||||
err := impl.DBService.Where("account_id = ? AND ymd = ?", data.Assets.AccountID, ymd).First(&existingAsset).Error
|
||||
|
||||
asset := models.CollectorAssets{
|
||||
AccountID: status.Data.Assets.AccountID,
|
||||
AccountID: data.Assets.AccountID,
|
||||
Ymd: ymd,
|
||||
Cash: status.Data.Assets.Cash,
|
||||
FrozenCash: status.Data.Assets.FrozenCash,
|
||||
MarketValue: status.Data.Assets.MarketValue,
|
||||
Profit: status.Data.Assets.Profit,
|
||||
TotalAsset: status.Data.Assets.TotalAsset,
|
||||
Cash: data.Assets.Cash,
|
||||
FrozenCash: data.Assets.FrozenCash,
|
||||
MarketValue: data.Assets.MarketValue,
|
||||
Profit: data.Assets.Profit,
|
||||
TotalAsset: data.Assets.TotalAsset,
|
||||
CollectedAt: now,
|
||||
}
|
||||
|
||||
@@ -57,8 +57,8 @@ func SaveData(status *types.StatusData) error {
|
||||
}
|
||||
|
||||
// 批量保存订单 (先查询后更新/插入)
|
||||
if len(status.Data.Orders) > 0 {
|
||||
for _, order := range status.Data.Orders {
|
||||
if len(data.Orders) > 0 {
|
||||
for _, order := range data.Orders {
|
||||
// 验证必要条件: OrderID和StockCode
|
||||
if order.OrderID == 0 {
|
||||
continue
|
||||
@@ -79,14 +79,14 @@ func SaveData(status *types.StatusData) error {
|
||||
|
||||
// 查询是否存在
|
||||
var cnt int64
|
||||
impl.DBService.Where("account_id = ? AND order_id = ? AND ymd = ?", status.Data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
|
||||
impl.DBService.Model(&models.CollectorOrder{}).Where("account_id = ? AND order_id = ? AND ymd = ?", data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
|
||||
if cnt > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
orderRecord := models.CollectorOrder{
|
||||
OrderID: order.OrderID,
|
||||
AccountID: status.Data.Assets.AccountID,
|
||||
AccountID: data.Assets.AccountID,
|
||||
StockCode: order.StockCode,
|
||||
Ymd: ymd,
|
||||
Price: order.Price,
|
||||
@@ -116,8 +116,8 @@ func SaveData(status *types.StatusData) error {
|
||||
}
|
||||
|
||||
// 批量保存持仓 (先查询后更新/插入)
|
||||
if len(status.Data.Positions) > 0 {
|
||||
for _, pos := range status.Data.Positions {
|
||||
if len(data.Positions) > 0 {
|
||||
for _, pos := range data.Positions {
|
||||
// 验证必要条件: Code
|
||||
if pos.Code == "" {
|
||||
continue
|
||||
@@ -126,10 +126,10 @@ func SaveData(status *types.StatusData) error {
|
||||
// 查询是否存在
|
||||
var existingPosition models.CollectorPosition
|
||||
err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?",
|
||||
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
|
||||
data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
|
||||
|
||||
positionRecord := models.CollectorPosition{
|
||||
AccountID: status.Data.Assets.AccountID,
|
||||
AccountID: data.Assets.AccountID,
|
||||
Code: pos.Code,
|
||||
Ymd: ymd,
|
||||
Volume: pos.Volume,
|
||||
@@ -178,6 +178,54 @@ func SaveData(status *types.StatusData) error {
|
||||
|
||||
}
|
||||
|
||||
// SaveNodeStatus 保存节点状态数据(有则更新,没有则新增)
|
||||
func SaveNodeStatus(status *types.Config) error {
|
||||
// 验证必要字段
|
||||
if status.ConfigKey == "" {
|
||||
return fmt.Errorf("配置键不能为空")
|
||||
}
|
||||
|
||||
// 查询是否存在
|
||||
var existingNode models.CollectorNode
|
||||
err := impl.DBService.Where("config_key = ?", status.ConfigKey).First(&existingNode).Error
|
||||
|
||||
nodeRecord := models.CollectorNode{
|
||||
ConfigKey: status.ConfigKey,
|
||||
HomeName: status.HomeName,
|
||||
ProjectRoot: status.ProjectRoot,
|
||||
QmtStatus: status.QmtStatus,
|
||||
StartTime: status.StartTime,
|
||||
}
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 记录不存在,插入新记录
|
||||
if err := impl.DBService.Create(&nodeRecord).Error; err != nil {
|
||||
return fmt.Errorf("插入节点状态失败: %w", err)
|
||||
}
|
||||
log.Printf("新建节点状态: config_key=%s, home_name=%s, qmt_status=%s",
|
||||
status.ConfigKey, status.HomeName, status.QmtStatus)
|
||||
} else if err != nil {
|
||||
// 查询出错
|
||||
return fmt.Errorf("查询节点状态失败: %w", err)
|
||||
} else {
|
||||
// 记录存在,检查是否有变化
|
||||
if nodeRecord.HomeName != existingNode.HomeName ||
|
||||
nodeRecord.ProjectRoot != existingNode.ProjectRoot ||
|
||||
nodeRecord.QmtStatus != existingNode.QmtStatus ||
|
||||
nodeRecord.StartTime != existingNode.StartTime {
|
||||
// 有变化,更新现有记录
|
||||
nodeRecord.ID = existingNode.ID
|
||||
if err := impl.DBService.Save(&nodeRecord).Error; err != nil {
|
||||
return fmt.Errorf("更新节点状态失败: %w", err)
|
||||
}
|
||||
log.Printf("更新节点状态: config_key=%s, home_name=%s, qmt_status=%s",
|
||||
status.ConfigKey, status.HomeName, status.QmtStatus)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processOrderBook 处理订单簿逻辑
|
||||
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
|
||||
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
|
||||
|
||||
28
internal/models/collect_node.go
Normal file
28
internal/models/collect_node.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.apinb.com/bsm-sdk/core/database"
|
||||
)
|
||||
|
||||
// CollectorNode 节点状态数据库模型
|
||||
type CollectorNode struct {
|
||||
ID uint `json:"id" gorm:"primaryKey;comment:主键ID"`
|
||||
ConfigKey string `json:"config_key" gorm:"type:varchar(100);not null;uniqueIndex;comment:配置键"`
|
||||
HomeName string `json:"home_name" gorm:"type:varchar(100);not null;comment:主机名称"`
|
||||
ProjectRoot string `json:"project_root" gorm:"type:varchar(500);not null;comment:项目根目录"`
|
||||
QmtStatus string `json:"qmt_status" gorm:"type:varchar(50);not null;comment:QMT连接状态"`
|
||||
StartTime int64 `json:"start_time" gorm:"not null;comment:启动时间戳(纳秒)"`
|
||||
UpdatedAt time.Time `json:"updated_at" gorm:"autoUpdateTime;comment:记录更新时间"`
|
||||
CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime;comment:记录创建时间"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
database.AppendMigrate(&CollectorNode{})
|
||||
}
|
||||
|
||||
// TableName 设置表名
|
||||
func (CollectorNode) TableName() string {
|
||||
return "collector_node"
|
||||
}
|
||||
@@ -9,7 +9,7 @@ type StatusData struct {
|
||||
// Data 数据部分
|
||||
type Data struct {
|
||||
Assets Assets `json:"assets"`
|
||||
Orders []Order `json:"order"`
|
||||
Orders []Order `json:"orders"`
|
||||
Positions []Position `json:"positions"`
|
||||
TickData map[string]Tick `json:"tick_data"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user