Compare commits
7 Commits
592c8e31dd
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 7583eceede | |||
| 9274ef4689 | |||
| 8bae04bc5e | |||
| b2bfcc5e5e | |||
| 820f3435d3 | |||
| 3d89b59a67 | |||
| aa2c3fb84d |
@@ -32,6 +32,13 @@ QMT数据采集器是一个专门用于从QMT量化交易平台采集实时交
|
|||||||
- 完整的采集日志记录,包括成功/失败状态和数据变化情况
|
- 完整的采集日志记录,包括成功/失败状态和数据变化情况
|
||||||
- 详细的错误信息记录,便于问题排查和系统维护
|
- 详细的错误信息记录,便于问题排查和系统维护
|
||||||
|
|
||||||
|
### 6. 订单簿数据完整性保障
|
||||||
|
- **启动时自动检查**:程序启动时自动检查所有账户的订单簿数据
|
||||||
|
- **基于持仓校验**:根据最新持仓列表智能识别遗漏的买入记录
|
||||||
|
- **智能补全机制**:自动检测并补全缺失的买入/卖出订单簿记录
|
||||||
|
- **交易闭合验证**:确保每笔买入都有对应的卖出记录,形成完整交易周期
|
||||||
|
- **FIFO匹配原则**:卖出时按买入时间先后顺序匹配(先进先出)
|
||||||
|
|
||||||
## 技术特性
|
## 技术特性
|
||||||
|
|
||||||
- **高性能**:采用Go语言开发,具有出色的并发处理能力
|
- **高性能**:采用Go语言开发,具有出色的并发处理能力
|
||||||
|
|||||||
20
cmd/main.go
20
cmd/main.go
@@ -1,12 +1,30 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
"git.apinb.com/quant/collector/internal/impl"
|
"git.apinb.com/quant/collector/internal/impl"
|
||||||
"git.apinb.com/quant/collector/internal/logic"
|
"git.apinb.com/quant/collector/internal/logic"
|
||||||
|
"github.com/marcsauter/single"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
impl.NewImpl()
|
// 互斥体名称,Global\CollectorSingleInstance 确保会话全局唯一
|
||||||
|
s := single.New("CollectorSingleInstance")
|
||||||
|
|
||||||
|
// 尝试加锁,如果已有实例运行会返回错误
|
||||||
|
if err := s.CheckLock(); err != nil {
|
||||||
|
if err == single.ErrAlreadyRunning {
|
||||||
|
fmt.Println("collector.exe 已有实例正在运行,本次启动退出。")
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(os.Stderr, "检查单实例失败: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
// 进程退出时自动释放锁
|
||||||
|
defer s.TryUnlock()
|
||||||
|
|
||||||
|
impl.NewImpl()
|
||||||
logic.Boot()
|
logic.Boot()
|
||||||
}
|
}
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -47,6 +47,7 @@ require (
|
|||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
github.com/jinzhu/now v1.1.5 // indirect
|
||||||
|
github.com/marcsauter/single v0.0.0-20201009143647-9f8d81240be2
|
||||||
golang.org/x/crypto v0.48.0 // indirect
|
golang.org/x/crypto v0.48.0 // indirect
|
||||||
golang.org/x/sync v0.20.0 // indirect
|
golang.org/x/sync v0.20.0 // indirect
|
||||||
golang.org/x/text v0.36.0 // indirect
|
golang.org/x/text v0.36.0 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -44,6 +44,8 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
|||||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
|
github.com/marcsauter/single v0.0.0-20201009143647-9f8d81240be2 h1:TyUcIW0tpCQzV4Hpe9jF3p590EQFnMQV3sv6DhoxV6Q=
|
||||||
|
github.com/marcsauter/single v0.0.0-20201009143647-9f8d81240be2/go.mod h1:uUA07IN7rYmbr5YlZM5nDVLyoxiqqpprFlXBrjqI24A=
|
||||||
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
|
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
|
||||||
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
|
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
|
||||||
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
|
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
CacheUrl = "redis://null:Weidong2023~!@139.224.247.176:19379/0"
|
CacheUrl = "redis://null:Weidong2023~!@139.224.247.176:19379/0"
|
||||||
DBSources = []string{"host=139.224.247.176 user=postgres password=Stock0310~! dbname=stock_prod port=19432 sslmode=disable TimeZone=Asia/Shanghai"}
|
DBSources = []string{"host=139.224.247.176 user=postgres password=Stock0310~! dbname=stock_prod port=19432 sslmode=disable"}
|
||||||
RedisService *redis.RedisClient
|
RedisService *redis.RedisClient
|
||||||
DBService *gorm.DB
|
DBService *gorm.DB
|
||||||
)
|
)
|
||||||
|
|||||||
135
internal/logic/book.go
Normal file
135
internal/logic/book.go
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"git.apinb.com/bsm-sdk/core/utils"
|
||||||
|
"git.apinb.com/quant/collector/internal/impl"
|
||||||
|
"git.apinb.com/quant/collector/internal/models"
|
||||||
|
"git.apinb.com/quant/collector/internal/types"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CheckAndRepairOrderBooks 检查并补全订单簿数据
|
||||||
|
func CheckAndRepairOrderBooks(accountID string) error {
|
||||||
|
log.Println("开始检查订单簿数据完整性...")
|
||||||
|
err := checkAccountOrderBooks(accountID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("检查账户 %s 的订单簿失败: %w", accountID, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkAccountOrderBooks 检查单个账户的订单簿数据
|
||||||
|
func checkAccountOrderBooks(accountID string) error {
|
||||||
|
|
||||||
|
// 获取该账户最新的持仓日期
|
||||||
|
var latestYmd int
|
||||||
|
err := impl.DBService.Model(&models.CollectorPosition{}).
|
||||||
|
Where("account_id = ?", accountID).
|
||||||
|
Select("MAX(ymd)").
|
||||||
|
Scan(&latestYmd).Error
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("查询最新持仓日期失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if latestYmd == 0 {
|
||||||
|
log.Printf("账户 %s 没有持仓记录", accountID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("账户 %s 最新持仓日期: %d", accountID, latestYmd)
|
||||||
|
|
||||||
|
// 获取该账户最新日期的持仓列表
|
||||||
|
var latestPositions []models.CollectorPosition
|
||||||
|
err = impl.DBService.Where("account_id = ? AND ymd = ?", accountID, latestYmd).
|
||||||
|
Find(&latestPositions).Error
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("查询持仓失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("账户 %s 有 %d 个持仓股票需要检查", accountID, len(latestPositions))
|
||||||
|
|
||||||
|
// 1. 检查股票持仓列表是否有未闭合的订单簿记录
|
||||||
|
for _, position := range latestPositions {
|
||||||
|
// 跳过无持仓的股票
|
||||||
|
if position.Volume <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var existingOrderBooks models.OrderBook
|
||||||
|
err := impl.DBService.Where("account_id = ? AND stock_code = ? AND is_closed = ?", accountID, position.Code, false).First(&existingOrderBooks).Error
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
// 没有找到未闭合的订单簿记录,写入新的记录
|
||||||
|
newOrderBook := models.OrderBook{
|
||||||
|
AccountID: accountID,
|
||||||
|
StockCode: position.Code,
|
||||||
|
Ymd: latestYmd,
|
||||||
|
BuyPrice: position.AvgPrice,
|
||||||
|
BuyVolume: position.Volume,
|
||||||
|
IsClosed: false,
|
||||||
|
}
|
||||||
|
if err := impl.DBService.Create(&newOrderBook).Error; err != nil {
|
||||||
|
log.Printf("创建订单簿记录失败: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("新建买入订单簿: account=%s, stock=%s, volume=%d", accountID, position.Code, position.Volume)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 检查所有未闭合的订单簿记录开仓成本价格与CollectorOrders对比,是否正确,不正确采用CollectorOrders中的价格更新
|
||||||
|
var orderBooks []models.OrderBook
|
||||||
|
err = impl.DBService.Where("account_id = ? AND is_closed = ?", accountID, false).Find(&orderBooks).Error
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("查询未闭合的订单簿记录失败: %w", err)
|
||||||
|
}
|
||||||
|
for _, orderBook := range orderBooks {
|
||||||
|
newAvgPrice, err := GetOrderAvgPrice(orderBook.StockCode, accountID, orderBook.BuyVolume)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("获取订单簿 %s 的开仓成本价格失败: %v", orderBook.StockCode, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if newAvgPrice > orderBook.BuyPrice {
|
||||||
|
log.Printf("订单簿 %s 的开仓成本价格不一致, 更新前: %v, 更新后: %v", orderBook.StockCode, orderBook.BuyPrice, newAvgPrice)
|
||||||
|
orderBook.BuyPrice = newAvgPrice
|
||||||
|
if err := impl.DBService.Save(&orderBook).Error; err != nil {
|
||||||
|
log.Printf("更新订单簿 %s 的开仓成本价格失败: %v", orderBook.StockCode, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func GetOrderAvgPrice(code, account_id string, amount int) (float64, error) {
|
||||||
|
if amount <= 0 {
|
||||||
|
return 0, fmt.Errorf("数量必须大于0")
|
||||||
|
}
|
||||||
|
var orders []models.CollectorOrder
|
||||||
|
impl.DBService.Model(&models.CollectorOrder{}).
|
||||||
|
Where("stock_code = ? AND account_id = ? AND offset_flag = ?", code, account_id, types.FLAG_BUY).
|
||||||
|
Order("id desc").
|
||||||
|
Find(&orders)
|
||||||
|
|
||||||
|
var stepIdx = 0
|
||||||
|
var currentAmount int
|
||||||
|
for idx, order := range orders {
|
||||||
|
currentAmount += order.TradedVolume
|
||||||
|
if currentAmount == amount {
|
||||||
|
stepIdx = idx
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var avgPrice float64
|
||||||
|
newOrders := orders[0 : stepIdx+1]
|
||||||
|
for _, order := range newOrders {
|
||||||
|
avgPrice += order.TradedPrice
|
||||||
|
}
|
||||||
|
avgPrice = utils.FloatRound(avgPrice/float64(len(newOrders)), 2)
|
||||||
|
|
||||||
|
return avgPrice, nil
|
||||||
|
}
|
||||||
@@ -14,17 +14,27 @@ import (
|
|||||||
var (
|
var (
|
||||||
COLLECTOR_URL = "http://localhost:5000/status"
|
COLLECTOR_URL = "http://localhost:5000/status"
|
||||||
COLLECTION_INTERVAL = 30
|
COLLECTION_INTERVAL = 30
|
||||||
|
HOSTNAME = "localhost"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Boot() {
|
func Boot() {
|
||||||
log.Println("=== QMT数据采集器启动 ===")
|
log.Println("=== QMT数据采集器启动 ===")
|
||||||
|
|
||||||
log.Printf("采集地址: %s", COLLECTOR_URL)
|
log.Printf("采集地址: %s", COLLECTOR_URL)
|
||||||
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
|
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
|
||||||
|
host, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("获取主机名失败: %v", err))
|
||||||
|
} else {
|
||||||
|
HOSTNAME = host
|
||||||
|
}
|
||||||
|
log.Printf("主机名称: %s", HOSTNAME)
|
||||||
|
|
||||||
// 创建采集器
|
// 创建采集器
|
||||||
coll := NewCollector(COLLECTOR_URL)
|
coll := NewCollector(COLLECTOR_URL)
|
||||||
|
|
||||||
|
// 启动时检查并补全订单簿数据
|
||||||
|
// CheckAndRepairOrderBooks(cfg.AccountID)
|
||||||
|
|
||||||
// 创建cron调度器
|
// 创建cron调度器
|
||||||
c := cron.New(cron.WithSeconds())
|
c := cron.New(cron.WithSeconds())
|
||||||
|
|
||||||
@@ -33,7 +43,7 @@ func Boot() {
|
|||||||
log.Printf("定时任务表达式: %s", cronSpec)
|
log.Printf("定时任务表达式: %s", cronSpec)
|
||||||
|
|
||||||
// 添加定时任务
|
// 添加定时任务
|
||||||
_, err := c.AddFunc(cronSpec, func() {
|
_, err = c.AddFunc(cronSpec, func() {
|
||||||
runCollection(coll)
|
runCollection(coll)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -82,11 +92,17 @@ func runCollection(coll *Collector) {
|
|||||||
|
|
||||||
// 数据有变化,保存到数据库
|
// 数据有变化,保存到数据库
|
||||||
log.Println("数据已变化,开始存储到数据库...")
|
log.Println("数据已变化,开始存储到数据库...")
|
||||||
if err := SaveData(status); err != nil {
|
if err := SaveData(&status.Data); err != nil {
|
||||||
log.Printf("保存数据失败: %v", err)
|
log.Printf("保存数据失败: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 保存节点状态(有则更新,没有则新增)
|
||||||
|
if err := SaveNodeStatus(&status.Status); err != nil {
|
||||||
|
log.Printf("保存节点状态失败: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 记录成功的日志
|
// 记录成功的日志
|
||||||
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
|
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
|
||||||
// log.Printf("保存采集日志失败: %v", err)
|
// log.Printf("保存采集日志失败: %v", err)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package logic
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.apinb.com/quant/collector/internal/impl"
|
"git.apinb.com/quant/collector/internal/impl"
|
||||||
@@ -12,9 +13,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// SaveStatus 保存完整状态数据(使用事务)
|
// SaveStatus 保存完整状态数据(使用事务)
|
||||||
func SaveData(status *types.StatusData) error {
|
func SaveData(data *types.Data) error {
|
||||||
// 验证必要字段 - AccountID是所有Upsert的共同条件
|
// 验证必要字段 - AccountID是所有Upsert的共同条件
|
||||||
if status.Data.Assets.AccountID == "" {
|
if data.Assets.AccountID == "" {
|
||||||
return fmt.Errorf("账户ID不能为空")
|
return fmt.Errorf("账户ID不能为空")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -24,16 +25,16 @@ func SaveData(status *types.StatusData) error {
|
|||||||
|
|
||||||
// 保存资产快照 (先查询后更新/插入)
|
// 保存资产快照 (先查询后更新/插入)
|
||||||
var existingAsset models.CollectorAssets
|
var existingAsset models.CollectorAssets
|
||||||
err := impl.DBService.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
|
err := impl.DBService.Where("account_id = ? AND ymd = ?", data.Assets.AccountID, ymd).First(&existingAsset).Error
|
||||||
|
|
||||||
asset := models.CollectorAssets{
|
asset := models.CollectorAssets{
|
||||||
AccountID: status.Data.Assets.AccountID,
|
AccountID: data.Assets.AccountID,
|
||||||
Ymd: ymd,
|
Ymd: ymd,
|
||||||
Cash: status.Data.Assets.Cash,
|
Cash: data.Assets.Cash,
|
||||||
FrozenCash: status.Data.Assets.FrozenCash,
|
FrozenCash: data.Assets.FrozenCash,
|
||||||
MarketValue: status.Data.Assets.MarketValue,
|
MarketValue: data.Assets.MarketValue,
|
||||||
Profit: status.Data.Assets.Profit,
|
Profit: data.Assets.Profit,
|
||||||
TotalAsset: status.Data.Assets.TotalAsset,
|
TotalAsset: data.Assets.TotalAsset,
|
||||||
CollectedAt: now,
|
CollectedAt: now,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,8 +57,8 @@ func SaveData(status *types.StatusData) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 批量保存订单 (先查询后更新/插入)
|
// 批量保存订单 (先查询后更新/插入)
|
||||||
if len(status.Data.Orders) > 0 {
|
if len(data.Orders) > 0 {
|
||||||
for _, order := range status.Data.Orders {
|
for _, order := range data.Orders {
|
||||||
// 验证必要条件: OrderID和StockCode
|
// 验证必要条件: OrderID和StockCode
|
||||||
if order.OrderID == 0 {
|
if order.OrderID == 0 {
|
||||||
continue
|
continue
|
||||||
@@ -78,14 +79,14 @@ func SaveData(status *types.StatusData) error {
|
|||||||
|
|
||||||
// 查询是否存在
|
// 查询是否存在
|
||||||
var cnt int64
|
var cnt int64
|
||||||
impl.DBService.Where("account_id = ? AND order_id = ? AND ymd = ?", status.Data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
|
impl.DBService.Model(&models.CollectorOrder{}).Where("account_id = ? AND order_id = ? AND ymd = ?", data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
|
||||||
if cnt > 0 {
|
if cnt > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
orderRecord := models.CollectorOrder{
|
orderRecord := models.CollectorOrder{
|
||||||
OrderID: order.OrderID,
|
OrderID: order.OrderID,
|
||||||
AccountID: status.Data.Assets.AccountID,
|
AccountID: data.Assets.AccountID,
|
||||||
StockCode: order.StockCode,
|
StockCode: order.StockCode,
|
||||||
Ymd: ymd,
|
Ymd: ymd,
|
||||||
Price: order.Price,
|
Price: order.Price,
|
||||||
@@ -107,16 +108,16 @@ func SaveData(status *types.StatusData) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 处理订单簿逻辑
|
// 处理订单簿逻辑
|
||||||
if err := processOrderBook(status.Data.Assets.AccountID, order, ymd, now, open_price); err != nil {
|
// if err := processOrderBook(status.Data.Assets.AccountID, order, ymd, now, open_price); err != nil {
|
||||||
fmt.Printf("处理订单簿失败: %v\n", err)
|
// fmt.Printf("处理订单簿失败: %v\n", err)
|
||||||
// 不返回错误,避免影响主流程
|
// // 不返回错误,避免影响主流程
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 批量保存持仓 (先查询后更新/插入)
|
// 批量保存持仓 (先查询后更新/插入)
|
||||||
if len(status.Data.Positions) > 0 {
|
if len(data.Positions) > 0 {
|
||||||
for _, pos := range status.Data.Positions {
|
for _, pos := range data.Positions {
|
||||||
// 验证必要条件: Code
|
// 验证必要条件: Code
|
||||||
if pos.Code == "" {
|
if pos.Code == "" {
|
||||||
continue
|
continue
|
||||||
@@ -125,10 +126,10 @@ func SaveData(status *types.StatusData) error {
|
|||||||
// 查询是否存在
|
// 查询是否存在
|
||||||
var existingPosition models.CollectorPosition
|
var existingPosition models.CollectorPosition
|
||||||
err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?",
|
err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?",
|
||||||
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
|
data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
|
||||||
|
|
||||||
positionRecord := models.CollectorPosition{
|
positionRecord := models.CollectorPosition{
|
||||||
AccountID: status.Data.Assets.AccountID,
|
AccountID: data.Assets.AccountID,
|
||||||
Code: pos.Code,
|
Code: pos.Code,
|
||||||
Ymd: ymd,
|
Ymd: ymd,
|
||||||
Volume: pos.Volume,
|
Volume: pos.Volume,
|
||||||
@@ -177,29 +178,106 @@ func SaveData(status *types.StatusData) error {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SaveNodeStatus 保存节点状态数据(有则更新,没有则新增)
|
||||||
|
func SaveNodeStatus(status *types.Config) error {
|
||||||
|
// 验证必要字段
|
||||||
|
if status.ConfigKey == "" {
|
||||||
|
return fmt.Errorf("配置键不能为空")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 查询是否存在
|
||||||
|
var existingNode models.CollectorNode
|
||||||
|
err := impl.DBService.Where("config_key = ?", status.ConfigKey).First(&existingNode).Error
|
||||||
|
|
||||||
|
nodeRecord := models.CollectorNode{
|
||||||
|
ConfigKey: status.ConfigKey,
|
||||||
|
HomeName: status.HomeName,
|
||||||
|
ProjectRoot: status.ProjectRoot,
|
||||||
|
QmtStatus: status.QmtStatus,
|
||||||
|
StartTime: status.StartTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == gorm.ErrRecordNotFound {
|
||||||
|
// 记录不存在,插入新记录
|
||||||
|
if err := impl.DBService.Create(&nodeRecord).Error; err != nil {
|
||||||
|
return fmt.Errorf("插入节点状态失败: %w", err)
|
||||||
|
}
|
||||||
|
log.Printf("新建节点状态: config_key=%s, home_name=%s, qmt_status=%s",
|
||||||
|
status.ConfigKey, status.HomeName, status.QmtStatus)
|
||||||
|
} else if err != nil {
|
||||||
|
// 查询出错
|
||||||
|
return fmt.Errorf("查询节点状态失败: %w", err)
|
||||||
|
} else {
|
||||||
|
// 记录存在,检查是否有变化
|
||||||
|
if nodeRecord.HomeName != existingNode.HomeName ||
|
||||||
|
nodeRecord.ProjectRoot != existingNode.ProjectRoot ||
|
||||||
|
nodeRecord.QmtStatus != existingNode.QmtStatus ||
|
||||||
|
nodeRecord.StartTime != existingNode.StartTime {
|
||||||
|
// 有变化,更新现有记录
|
||||||
|
nodeRecord.ID = existingNode.ID
|
||||||
|
if err := impl.DBService.Save(&nodeRecord).Error; err != nil {
|
||||||
|
return fmt.Errorf("更新节点状态失败: %w", err)
|
||||||
|
}
|
||||||
|
log.Printf("更新节点状态: config_key=%s, home_name=%s, qmt_status=%s",
|
||||||
|
status.ConfigKey, status.HomeName, status.QmtStatus)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// processOrderBook 处理订单簿逻辑
|
// processOrderBook 处理订单簿逻辑
|
||||||
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
|
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
|
||||||
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
|
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
|
||||||
// 这里假设: OffsetFlag == 0 表示买入, OffsetFlag == 1 表示卖出
|
// 这里假设: OffsetFlag == 0 表示买入, OffsetFlag == 1 表示卖出
|
||||||
|
|
||||||
if order.OffsetFlag == 0 {
|
if order.OffsetFlag == types.FLAG_BUY {
|
||||||
// 买入订单 - 创建新的订单簿记录
|
// 先检查有无未闭合的订单簿记录
|
||||||
orderBook := models.OrderBook{
|
var existingOrderBook models.OrderBook
|
||||||
AccountID: accountID,
|
err := impl.DBService.Where(
|
||||||
StockCode: order.StockCode,
|
"account_id = ? AND stock_code = ? AND is_closed = ?",
|
||||||
Ymd: ymd,
|
accountID, order.StockCode, false,
|
||||||
BuyOrderID: order.OrderID,
|
).Order("buy_time DESC").First(&existingOrderBook).Error
|
||||||
BuyPrice: order.TradedPrice,
|
|
||||||
BuyVolume: order.TradedVolume,
|
|
||||||
BuyTime: order.OrderTime,
|
|
||||||
BuyCollectedAt: now,
|
|
||||||
IsClosed: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := impl.DBService.Create(&orderBook).Error; err != nil {
|
if err == gorm.ErrRecordNotFound {
|
||||||
return fmt.Errorf("创建订单簿记录失败: %w", err)
|
// 没有未闭合的订单簿记录,创建新的
|
||||||
|
orderBook := models.OrderBook{
|
||||||
|
AccountID: accountID,
|
||||||
|
StockCode: order.StockCode,
|
||||||
|
Ymd: ymd,
|
||||||
|
BuyOrderID: fmt.Sprintf("%d", order.OrderID),
|
||||||
|
BuyPrice: order.TradedPrice,
|
||||||
|
BuyVolume: order.TradedVolume,
|
||||||
|
BuyTime: order.OrderTime,
|
||||||
|
BuyCollectedAt: now,
|
||||||
|
IsClosed: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := impl.DBService.Create(&orderBook).Error; err != nil {
|
||||||
|
return fmt.Errorf("创建订单簿记录失败: %w", err)
|
||||||
|
}
|
||||||
|
log.Printf("新建订单簿: account=%s, stock=%s, orderID=%d, price=%.4f, volume=%d",
|
||||||
|
accountID, order.StockCode, order.OrderID, order.TradedPrice, order.TradedVolume)
|
||||||
|
} else if err != nil {
|
||||||
|
return fmt.Errorf("查询订单簿记录失败: %w", err)
|
||||||
|
} else {
|
||||||
|
// 存在未闭合的订单簿记录,更新数量和价格(加权平均)
|
||||||
|
totalVolume := existingOrderBook.BuyVolume + order.TradedVolume
|
||||||
|
totalAmount := existingOrderBook.BuyPrice*float64(existingOrderBook.BuyVolume) + order.TradedPrice*float64(order.TradedVolume)
|
||||||
|
newAvgPrice := totalAmount / float64(totalVolume)
|
||||||
|
|
||||||
|
existingOrderBook.BuyVolume = totalVolume
|
||||||
|
existingOrderBook.BuyPrice = newAvgPrice
|
||||||
|
// 追加订单ID到 BuyOrderID(用逗号分隔)
|
||||||
|
existingOrderBook.BuyOrderID = existingOrderBook.BuyOrderID + "," + fmt.Sprintf("%d", order.OrderID)
|
||||||
|
|
||||||
|
if err := impl.DBService.Save(&existingOrderBook).Error; err != nil {
|
||||||
|
return fmt.Errorf("更新订单簿记录失败: %w", err)
|
||||||
|
}
|
||||||
|
log.Printf("更新订单簿: account=%s, stock=%s, 新订单ID=%d, 总数量=%d, 平均价格=%.4f",
|
||||||
|
accountID, order.StockCode, order.OrderID, totalVolume, newAvgPrice)
|
||||||
}
|
}
|
||||||
} else if order.OffsetFlag == 1 {
|
} else if order.OffsetFlag == types.FLAG_SELL {
|
||||||
// 卖出订单 - 查找对应的买入记录并闭合
|
// 卖出订单 - 查找对应的买入记录并闭合
|
||||||
var orderBook models.OrderBook
|
var orderBook models.OrderBook
|
||||||
// 查找同一账户、同一股票、未闭合的订单簿记录
|
// 查找同一账户、同一股票、未闭合的订单簿记录
|
||||||
@@ -216,17 +294,37 @@ func processOrderBook(accountID string, order types.Order, ymd int, now time.Tim
|
|||||||
return fmt.Errorf("查询订单簿记录失败: %w", err)
|
return fmt.Errorf("查询订单簿记录失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 计算盈亏
|
// 计算盈亏(支持部分卖出)
|
||||||
buyAmount := float64(orderBook.BuyVolume) * orderBook.BuyPrice
|
sellVolume := order.TradedVolume
|
||||||
sellAmount := float64(order.TradedVolume) * order.TradedPrice
|
sellPrice := order.TradedPrice
|
||||||
profit := sellAmount - buyAmount
|
buyPrice := orderBook.BuyPrice
|
||||||
|
|
||||||
|
// 如果卖出数量小于买入数量,按比例计算盈亏
|
||||||
|
var profit float64
|
||||||
var profitRate float64
|
var profitRate float64
|
||||||
if buyAmount > 0 {
|
|
||||||
profitRate = (profit / buyAmount) * 100
|
if sellVolume >= orderBook.BuyVolume {
|
||||||
|
// 全部卖出或超额卖出
|
||||||
|
buyAmount := float64(orderBook.BuyVolume) * buyPrice
|
||||||
|
sellAmount := float64(sellVolume) * sellPrice
|
||||||
|
profit = sellAmount - buyAmount
|
||||||
|
if buyAmount > 0 {
|
||||||
|
profitRate = (profit / buyAmount) * 100
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 部分卖出,按比例计算
|
||||||
|
sellRatio := float64(sellVolume) / float64(orderBook.BuyVolume)
|
||||||
|
buyAmount := float64(orderBook.BuyVolume) * buyPrice * sellRatio
|
||||||
|
sellAmount := float64(sellVolume) * sellPrice
|
||||||
|
profit = sellAmount - buyAmount
|
||||||
|
if buyAmount > 0 {
|
||||||
|
profitRate = (profit / buyAmount) * 100
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新订单簿记录
|
// 更新订单簿记录
|
||||||
orderBook.SellOrderID = &order.OrderID
|
sellOrderID := fmt.Sprintf("%d", order.OrderID)
|
||||||
|
orderBook.SellOrderID = &sellOrderID
|
||||||
orderBook.SellPrice = &order.TradedPrice
|
orderBook.SellPrice = &order.TradedPrice
|
||||||
orderBook.SellVolume = &order.TradedVolume
|
orderBook.SellVolume = &order.TradedVolume
|
||||||
orderBook.SellTime = &order.OrderTime
|
orderBook.SellTime = &order.OrderTime
|
||||||
@@ -238,6 +336,8 @@ func processOrderBook(accountID string, order types.Order, ymd int, now time.Tim
|
|||||||
if err := impl.DBService.Save(&orderBook).Error; err != nil {
|
if err := impl.DBService.Save(&orderBook).Error; err != nil {
|
||||||
return fmt.Errorf("更新订单簿记录失败: %w", err)
|
return fmt.Errorf("更新订单簿记录失败: %w", err)
|
||||||
}
|
}
|
||||||
|
log.Printf("闭合订单簿: account=%s, stock=%s, buyOrderID=%s, sellOrderID=%s, profit=%.2f",
|
||||||
|
accountID, order.StockCode, orderBook.BuyOrderID, sellOrderID, profit)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
28
internal/models/collect_node.go
Normal file
28
internal/models/collect_node.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.apinb.com/bsm-sdk/core/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CollectorNode 节点状态数据库模型
|
||||||
|
type CollectorNode struct {
|
||||||
|
ID uint `json:"id" gorm:"primaryKey;comment:主键ID"`
|
||||||
|
ConfigKey string `json:"config_key" gorm:"type:varchar(100);not null;uniqueIndex;comment:配置键"`
|
||||||
|
HomeName string `json:"home_name" gorm:"type:varchar(100);not null;comment:主机名称"`
|
||||||
|
ProjectRoot string `json:"project_root" gorm:"type:varchar(500);not null;comment:项目根目录"`
|
||||||
|
QmtStatus string `json:"qmt_status" gorm:"type:varchar(50);not null;comment:QMT连接状态"`
|
||||||
|
StartTime int64 `json:"start_time" gorm:"not null;comment:启动时间戳(纳秒)"`
|
||||||
|
UpdatedAt time.Time `json:"updated_at" gorm:"autoUpdateTime;comment:记录更新时间"`
|
||||||
|
CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime;comment:记录创建时间"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
database.AppendMigrate(&CollectorNode{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TableName 设置表名
|
||||||
|
func (CollectorNode) TableName() string {
|
||||||
|
return "collector_node"
|
||||||
|
}
|
||||||
@@ -15,14 +15,14 @@ type OrderBook struct {
|
|||||||
Ymd int `json:"ymd" gorm:"not null;index;comment:采集日期(年月日数字格式,如20260407)"`
|
Ymd int `json:"ymd" gorm:"not null;index;comment:采集日期(年月日数字格式,如20260407)"`
|
||||||
|
|
||||||
// 买入信息
|
// 买入信息
|
||||||
BuyOrderID int64 `json:"buy_order_id" gorm:"not null;index;comment:买入订单ID"`
|
BuyOrderID string `json:"buy_order_id" gorm:"not null;index;comment:买入订单ID"`
|
||||||
BuyPrice float64 `json:"buy_price" gorm:"type:decimal(10,4);not null;default:0;column:buy_price;comment:买入价格"`
|
BuyPrice float64 `json:"buy_price" gorm:"type:decimal(10,4);not null;default:0;column:buy_price;comment:买入价格"`
|
||||||
BuyVolume int `json:"buy_volume" gorm:"not null;default:0;column:buy_volume;comment:买入数量"`
|
BuyVolume int `json:"buy_volume" gorm:"not null;default:0;column:buy_volume;comment:买入数量"`
|
||||||
BuyTime int64 `json:"buy_time" gorm:"not null;column:buy_time;comment:买入时间戳"`
|
BuyTime int64 `json:"buy_time" gorm:"not null;column:buy_time;comment:买入时间戳"`
|
||||||
BuyCollectedAt time.Time `json:"buy_collected_at" gorm:"not null;column:buy_collected_at;comment:买入数据采集时间"`
|
BuyCollectedAt time.Time `json:"buy_collected_at" gorm:"not null;column:buy_collected_at;comment:买入数据采集时间"`
|
||||||
|
|
||||||
// 卖出信息 (初始为空,卖出时填充)
|
// 卖出信息 (初始为空,卖出时填充)
|
||||||
SellOrderID *int64 `json:"sell_order_id" gorm:"index;comment:卖出订单ID"`
|
SellOrderID *string `json:"sell_order_id" gorm:"index;comment:卖出订单ID"`
|
||||||
SellPrice *float64 `json:"sell_price" gorm:"type:decimal(10,4);column:sell_price;comment:卖出价格"`
|
SellPrice *float64 `json:"sell_price" gorm:"type:decimal(10,4);column:sell_price;comment:卖出价格"`
|
||||||
SellVolume *int `json:"sell_volume" gorm:"column:sell_volume;comment:卖出数量"`
|
SellVolume *int `json:"sell_volume" gorm:"column:sell_volume;comment:卖出数量"`
|
||||||
SellTime *int64 `json:"sell_time" gorm:"column:sell_time;comment:卖出时间戳"`
|
SellTime *int64 `json:"sell_time" gorm:"column:sell_time;comment:卖出时间戳"`
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ type StatusData struct {
|
|||||||
// Data 数据部分
|
// Data 数据部分
|
||||||
type Data struct {
|
type Data struct {
|
||||||
Assets Assets `json:"assets"`
|
Assets Assets `json:"assets"`
|
||||||
Orders []Order `json:"order"`
|
Orders []Order `json:"orders"`
|
||||||
Positions []Position `json:"positions"`
|
Positions []Position `json:"positions"`
|
||||||
TickData map[string]Tick `json:"tick_data"`
|
TickData map[string]Tick `json:"tick_data"`
|
||||||
}
|
}
|
||||||
|
|||||||
6
internal/types/vars.go
Normal file
6
internal/types/vars.go
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
var (
|
||||||
|
FLAG_BUY int = 48 // 买入,开仓
|
||||||
|
FLAG_SELL int = 49 // 卖出,平仓
|
||||||
|
)
|
||||||
@@ -1,3 +1,2 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
go build -ldflags="-s -w" -o D:\work\quant\qmt\bin\collector.exe ./cmd/main.go
|
||||||
go build -o D:\work\quant\qmt\bin\collector.exe ./cmd/main.go
|
|
||||||
Reference in New Issue
Block a user