fix bug
This commit is contained in:
@@ -32,6 +32,13 @@ QMT数据采集器是一个专门用于从QMT量化交易平台采集实时交
|
||||
- 完整的采集日志记录,包括成功/失败状态和数据变化情况
|
||||
- 详细的错误信息记录,便于问题排查和系统维护
|
||||
|
||||
### 6. 订单簿数据完整性保障
|
||||
- **启动时自动检查**:程序启动时自动检查所有账户的订单簿数据
|
||||
- **基于持仓校验**:根据最新持仓列表智能识别遗漏的买入记录
|
||||
- **智能补全机制**:自动检测并补全缺失的买入/卖出订单簿记录
|
||||
- **交易闭合验证**:确保每笔买入都有对应的卖出记录,形成完整交易周期
|
||||
- **FIFO匹配原则**:卖出时按买入时间先后顺序匹配(先进先出)
|
||||
|
||||
## 技术特性
|
||||
|
||||
- **高性能**:采用Go语言开发,具有出色的并发处理能力
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"git.apinb.com/quant/collector/internal/conf"
|
||||
"git.apinb.com/quant/collector/internal/impl"
|
||||
"git.apinb.com/quant/collector/internal/logic"
|
||||
)
|
||||
|
||||
func main() {
|
||||
impl.NewImpl()
|
||||
|
||||
logic.Boot()
|
||||
c := conf.Get()
|
||||
logic.Boot(c)
|
||||
}
|
||||
|
||||
29
internal/conf/config.go
Normal file
29
internal/conf/config.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package conf
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Hostname string
|
||||
Account string
|
||||
AccountID string
|
||||
}
|
||||
|
||||
func Get() *Config {
|
||||
host, _ := os.Hostname()
|
||||
host = strings.ToLower(host)
|
||||
|
||||
switch host {
|
||||
case "desktop-t5lh34v":
|
||||
return &Config{Hostname: host, Account: "david", AccountID: "86037237"}
|
||||
case "t8zznqs49f1ju7q":
|
||||
return &Config{Hostname: host, Account: "liao", AccountID: "8889399698"}
|
||||
case "dhzd0ojkpctafmm":
|
||||
return &Config{Hostname: host, Account: "zhang", AccountID: "8889616198"}
|
||||
case "f7tib45aqk4n10h":
|
||||
return &Config{Hostname: host, Account: "long", AccountID: "8886508526"}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
135
internal/logic/book.go
Normal file
135
internal/logic/book.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"git.apinb.com/bsm-sdk/core/utils"
|
||||
"git.apinb.com/quant/collector/internal/impl"
|
||||
"git.apinb.com/quant/collector/internal/models"
|
||||
"git.apinb.com/quant/collector/internal/types"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// CheckAndRepairOrderBooks 检查并补全订单簿数据
|
||||
func CheckAndRepairOrderBooks(accountID string) error {
|
||||
log.Println("开始检查订单簿数据完整性...")
|
||||
err := checkAccountOrderBooks(accountID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("检查账户 %s 的订单簿失败: %w", accountID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkAccountOrderBooks 检查单个账户的订单簿数据
|
||||
func checkAccountOrderBooks(accountID string) error {
|
||||
|
||||
// 获取该账户最新的持仓日期
|
||||
var latestYmd int
|
||||
err := impl.DBService.Model(&models.CollectorPosition{}).
|
||||
Where("account_id = ?", accountID).
|
||||
Select("MAX(ymd)").
|
||||
Scan(&latestYmd).Error
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询最新持仓日期失败: %w", err)
|
||||
}
|
||||
|
||||
if latestYmd == 0 {
|
||||
log.Printf("账户 %s 没有持仓记录", accountID)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("账户 %s 最新持仓日期: %d", accountID, latestYmd)
|
||||
|
||||
// 获取该账户最新日期的持仓列表
|
||||
var latestPositions []models.CollectorPosition
|
||||
err = impl.DBService.Where("account_id = ? AND ymd = ?", accountID, latestYmd).
|
||||
Find(&latestPositions).Error
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询持仓失败: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("账户 %s 有 %d 个持仓股票需要检查", accountID, len(latestPositions))
|
||||
|
||||
// 1. 检查股票持仓列表是否有未闭合的订单簿记录
|
||||
for _, position := range latestPositions {
|
||||
// 跳过无持仓的股票
|
||||
if position.Volume <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var existingOrderBooks models.OrderBook
|
||||
err := impl.DBService.Where("account_id = ? AND stock_code = ? AND is_closed = ?", accountID, position.Code, false).First(&existingOrderBooks).Error
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// 没有找到未闭合的订单簿记录,写入新的记录
|
||||
newOrderBook := models.OrderBook{
|
||||
AccountID: accountID,
|
||||
StockCode: position.Code,
|
||||
Ymd: latestYmd,
|
||||
BuyPrice: position.AvgPrice,
|
||||
BuyVolume: position.Volume,
|
||||
IsClosed: false,
|
||||
}
|
||||
if err := impl.DBService.Create(&newOrderBook).Error; err != nil {
|
||||
log.Printf("创建订单簿记录失败: %v", err)
|
||||
continue
|
||||
}
|
||||
log.Printf("新建买入订单簿: account=%s, stock=%s, volume=%d", accountID, position.Code, position.Volume)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 2. 检查所有未闭合的订单簿记录开仓成本价格与CollectorOrders对比,是否正确,不正确采用CollectorOrders中的价格更新
|
||||
var orderBooks []models.OrderBook
|
||||
err = impl.DBService.Where("account_id = ? AND is_closed = ?", accountID, false).Find(&orderBooks).Error
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询未闭合的订单簿记录失败: %w", err)
|
||||
}
|
||||
for _, orderBook := range orderBooks {
|
||||
newAvgPrice, err := GetOrderAvgPrice(orderBook.StockCode, accountID, orderBook.BuyVolume)
|
||||
if err != nil {
|
||||
log.Printf("获取订单簿 %s 的开仓成本价格失败: %v", orderBook.StockCode, err)
|
||||
continue
|
||||
}
|
||||
if newAvgPrice > orderBook.BuyPrice {
|
||||
log.Printf("订单簿 %s 的开仓成本价格不一致, 更新前: %v, 更新后: %v", orderBook.StockCode, orderBook.BuyPrice, newAvgPrice)
|
||||
orderBook.BuyPrice = newAvgPrice
|
||||
if err := impl.DBService.Save(&orderBook).Error; err != nil {
|
||||
log.Printf("更新订单簿 %s 的开仓成本价格失败: %v", orderBook.StockCode, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func GetOrderAvgPrice(code, account_id string, amount int) (float64, error) {
|
||||
if amount <= 0 {
|
||||
return 0, fmt.Errorf("数量必须大于0")
|
||||
}
|
||||
var orders []models.CollectorOrder
|
||||
impl.DBService.Model(&models.CollectorOrder{}).
|
||||
Where("stock_code = ? AND account_id = ? AND offset_flag = ?", code, account_id, types.FLAG_BUY).
|
||||
Order("id desc").
|
||||
Find(&orders)
|
||||
|
||||
var stepIdx = 0
|
||||
var currentAmount int
|
||||
for idx, order := range orders {
|
||||
currentAmount += order.TradedVolume
|
||||
if currentAmount == amount {
|
||||
stepIdx = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
var avgPrice float64
|
||||
newOrders := orders[0 : stepIdx+1]
|
||||
for _, order := range newOrders {
|
||||
avgPrice += order.TradedPrice
|
||||
}
|
||||
avgPrice = utils.FloatRound(avgPrice/float64(len(newOrders)), 2)
|
||||
|
||||
return avgPrice, nil
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.apinb.com/quant/collector/internal/conf"
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
@@ -16,15 +17,26 @@ var (
|
||||
COLLECTION_INTERVAL = 30
|
||||
)
|
||||
|
||||
func Boot() {
|
||||
func Boot(cfg *conf.Config) {
|
||||
log.Println("=== QMT数据采集器启动 ===")
|
||||
|
||||
log.Printf("采集地址: %s", COLLECTOR_URL)
|
||||
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
|
||||
|
||||
if cfg == nil {
|
||||
log.Println("配置为空")
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("账户: %s", cfg.Account)
|
||||
log.Printf("账户ID: %s", cfg.AccountID)
|
||||
log.Printf("主机名: %s", cfg.Hostname)
|
||||
|
||||
// 创建采集器
|
||||
coll := NewCollector(COLLECTOR_URL)
|
||||
|
||||
// 启动时检查并补全订单簿数据
|
||||
CheckAndRepairOrderBooks(cfg.AccountID)
|
||||
|
||||
// 创建cron调度器
|
||||
c := cron.New(cron.WithSeconds())
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package logic
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.apinb.com/quant/collector/internal/impl"
|
||||
@@ -182,24 +183,53 @@ func processOrderBook(accountID string, order types.Order, ymd int, now time.Tim
|
||||
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
|
||||
// 这里假设: OffsetFlag == 0 表示买入, OffsetFlag == 1 表示卖出
|
||||
|
||||
if order.OffsetFlag == 0 {
|
||||
// 买入订单 - 创建新的订单簿记录
|
||||
orderBook := models.OrderBook{
|
||||
AccountID: accountID,
|
||||
StockCode: order.StockCode,
|
||||
Ymd: ymd,
|
||||
BuyOrderID: order.OrderID,
|
||||
BuyPrice: order.TradedPrice,
|
||||
BuyVolume: order.TradedVolume,
|
||||
BuyTime: order.OrderTime,
|
||||
BuyCollectedAt: now,
|
||||
IsClosed: false,
|
||||
}
|
||||
if order.OffsetFlag == types.FLAG_BUY {
|
||||
// 先检查有无未闭合的订单簿记录
|
||||
var existingOrderBook models.OrderBook
|
||||
err := impl.DBService.Where(
|
||||
"account_id = ? AND stock_code = ? AND is_closed = ?",
|
||||
accountID, order.StockCode, false,
|
||||
).Order("buy_time DESC").First(&existingOrderBook).Error
|
||||
|
||||
if err := impl.DBService.Create(&orderBook).Error; err != nil {
|
||||
return fmt.Errorf("创建订单簿记录失败: %w", err)
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// 没有未闭合的订单簿记录,创建新的
|
||||
orderBook := models.OrderBook{
|
||||
AccountID: accountID,
|
||||
StockCode: order.StockCode,
|
||||
Ymd: ymd,
|
||||
BuyOrderID: fmt.Sprintf("%d", order.OrderID),
|
||||
BuyPrice: order.TradedPrice,
|
||||
BuyVolume: order.TradedVolume,
|
||||
BuyTime: order.OrderTime,
|
||||
BuyCollectedAt: now,
|
||||
IsClosed: false,
|
||||
}
|
||||
|
||||
if err := impl.DBService.Create(&orderBook).Error; err != nil {
|
||||
return fmt.Errorf("创建订单簿记录失败: %w", err)
|
||||
}
|
||||
log.Printf("新建订单簿: account=%s, stock=%s, orderID=%d, price=%.4f, volume=%d",
|
||||
accountID, order.StockCode, order.OrderID, order.TradedPrice, order.TradedVolume)
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("查询订单簿记录失败: %w", err)
|
||||
} else {
|
||||
// 存在未闭合的订单簿记录,更新数量和价格(加权平均)
|
||||
totalVolume := existingOrderBook.BuyVolume + order.TradedVolume
|
||||
totalAmount := existingOrderBook.BuyPrice*float64(existingOrderBook.BuyVolume) + order.TradedPrice*float64(order.TradedVolume)
|
||||
newAvgPrice := totalAmount / float64(totalVolume)
|
||||
|
||||
existingOrderBook.BuyVolume = totalVolume
|
||||
existingOrderBook.BuyPrice = newAvgPrice
|
||||
// 追加订单ID到 BuyOrderID(用逗号分隔)
|
||||
existingOrderBook.BuyOrderID = existingOrderBook.BuyOrderID + "," + fmt.Sprintf("%d", order.OrderID)
|
||||
|
||||
if err := impl.DBService.Save(&existingOrderBook).Error; err != nil {
|
||||
return fmt.Errorf("更新订单簿记录失败: %w", err)
|
||||
}
|
||||
log.Printf("更新订单簿: account=%s, stock=%s, 新订单ID=%d, 总数量=%d, 平均价格=%.4f",
|
||||
accountID, order.StockCode, order.OrderID, totalVolume, newAvgPrice)
|
||||
}
|
||||
} else if order.OffsetFlag == 1 {
|
||||
} else if order.OffsetFlag == types.FLAG_SELL {
|
||||
// 卖出订单 - 查找对应的买入记录并闭合
|
||||
var orderBook models.OrderBook
|
||||
// 查找同一账户、同一股票、未闭合的订单簿记录
|
||||
@@ -216,17 +246,37 @@ func processOrderBook(accountID string, order types.Order, ymd int, now time.Tim
|
||||
return fmt.Errorf("查询订单簿记录失败: %w", err)
|
||||
}
|
||||
|
||||
// 计算盈亏
|
||||
buyAmount := float64(orderBook.BuyVolume) * orderBook.BuyPrice
|
||||
sellAmount := float64(order.TradedVolume) * order.TradedPrice
|
||||
profit := sellAmount - buyAmount
|
||||
// 计算盈亏(支持部分卖出)
|
||||
sellVolume := order.TradedVolume
|
||||
sellPrice := order.TradedPrice
|
||||
buyPrice := orderBook.BuyPrice
|
||||
|
||||
// 如果卖出数量小于买入数量,按比例计算盈亏
|
||||
var profit float64
|
||||
var profitRate float64
|
||||
if buyAmount > 0 {
|
||||
profitRate = (profit / buyAmount) * 100
|
||||
|
||||
if sellVolume >= orderBook.BuyVolume {
|
||||
// 全部卖出或超额卖出
|
||||
buyAmount := float64(orderBook.BuyVolume) * buyPrice
|
||||
sellAmount := float64(sellVolume) * sellPrice
|
||||
profit = sellAmount - buyAmount
|
||||
if buyAmount > 0 {
|
||||
profitRate = (profit / buyAmount) * 100
|
||||
}
|
||||
} else {
|
||||
// 部分卖出,按比例计算
|
||||
sellRatio := float64(sellVolume) / float64(orderBook.BuyVolume)
|
||||
buyAmount := float64(orderBook.BuyVolume) * buyPrice * sellRatio
|
||||
sellAmount := float64(sellVolume) * sellPrice
|
||||
profit = sellAmount - buyAmount
|
||||
if buyAmount > 0 {
|
||||
profitRate = (profit / buyAmount) * 100
|
||||
}
|
||||
}
|
||||
|
||||
// 更新订单簿记录
|
||||
orderBook.SellOrderID = &order.OrderID
|
||||
sellOrderID := fmt.Sprintf("%d", order.OrderID)
|
||||
orderBook.SellOrderID = &sellOrderID
|
||||
orderBook.SellPrice = &order.TradedPrice
|
||||
orderBook.SellVolume = &order.TradedVolume
|
||||
orderBook.SellTime = &order.OrderTime
|
||||
@@ -238,6 +288,8 @@ func processOrderBook(accountID string, order types.Order, ymd int, now time.Tim
|
||||
if err := impl.DBService.Save(&orderBook).Error; err != nil {
|
||||
return fmt.Errorf("更新订单簿记录失败: %w", err)
|
||||
}
|
||||
log.Printf("闭合订单簿: account=%s, stock=%s, buyOrderID=%s, sellOrderID=%s, profit=%.2f",
|
||||
accountID, order.StockCode, orderBook.BuyOrderID, sellOrderID, profit)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -15,14 +15,14 @@ type OrderBook struct {
|
||||
Ymd int `json:"ymd" gorm:"not null;index;comment:采集日期(年月日数字格式,如20260407)"`
|
||||
|
||||
// 买入信息
|
||||
BuyOrderID int64 `json:"buy_order_id" gorm:"not null;index;comment:买入订单ID"`
|
||||
BuyOrderID string `json:"buy_order_id" gorm:"not null;index;comment:买入订单ID"`
|
||||
BuyPrice float64 `json:"buy_price" gorm:"type:decimal(10,4);not null;default:0;column:buy_price;comment:买入价格"`
|
||||
BuyVolume int `json:"buy_volume" gorm:"not null;default:0;column:buy_volume;comment:买入数量"`
|
||||
BuyTime int64 `json:"buy_time" gorm:"not null;column:buy_time;comment:买入时间戳"`
|
||||
BuyCollectedAt time.Time `json:"buy_collected_at" gorm:"not null;column:buy_collected_at;comment:买入数据采集时间"`
|
||||
|
||||
// 卖出信息 (初始为空,卖出时填充)
|
||||
SellOrderID *int64 `json:"sell_order_id" gorm:"index;comment:卖出订单ID"`
|
||||
SellOrderID *string `json:"sell_order_id" gorm:"index;comment:卖出订单ID"`
|
||||
SellPrice *float64 `json:"sell_price" gorm:"type:decimal(10,4);column:sell_price;comment:卖出价格"`
|
||||
SellVolume *int `json:"sell_volume" gorm:"column:sell_volume;comment:卖出数量"`
|
||||
SellTime *int64 `json:"sell_time" gorm:"column:sell_time;comment:卖出时间戳"`
|
||||
|
||||
6
internal/types/vars.go
Normal file
6
internal/types/vars.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package types
|
||||
|
||||
var (
|
||||
FLAG_BUY int = 48 // 买入,开仓
|
||||
FLAG_SELL int = 49 // 卖出,平仓
|
||||
)
|
||||
Reference in New Issue
Block a user