fix bug
This commit is contained in:
159
client_go/TIMESTAMP_MECHANISM.md
Normal file
159
client_go/TIMESTAMP_MECHANISM.md
Normal file
@@ -0,0 +1,159 @@
|
||||
# 共享内存时间戳机制说明
|
||||
|
||||
## 问题背景
|
||||
|
||||
原始实现中,Python 端每 30 秒更新一次共享内存数据,而 Go 客户端每秒读取一次。由于共享内存中的数据不会自动清除,Go 客户端每次读取的都是同一份旧数据,导致看起来像是每秒都收到了新行情。
|
||||
|
||||
## 解决方案:时间戳版本检测
|
||||
|
||||
### 核心思路
|
||||
|
||||
在共享内存头部添加**时间戳字段**,Go 客户端通过检测时间戳变化来判断数据是否真正更新。
|
||||
|
||||
### 共享内存头部结构(16字节)
|
||||
|
||||
```
|
||||
偏移量 | 字段名 | 类型 | 说明
|
||||
--------|---------------|--------|------------------
|
||||
0-3 | version | uint32 | 版本号(当前为1)
|
||||
4-7 | write_pos | uint32 | 写入位置指针
|
||||
8-11 | last_data_len | uint32 | 最后一条数据的长度
|
||||
12-15 | timestamp | uint32 | Unix 时间戳(秒级)
|
||||
```
|
||||
|
||||
### 工作流程
|
||||
|
||||
#### Python 端(生产者)
|
||||
|
||||
1. 每次调用 `publish_tick()` 时:
|
||||
- 序列化数据并写入共享内存
|
||||
- **更新头部时间戳为当前 Unix 时间戳**
|
||||
|
||||
2. 关键代码位置:`src/qmt/tick_push.py` 第 93-97 行
|
||||
|
||||
```python
|
||||
# 更新写入位置、最后数据长度和时间戳
|
||||
new_pos = current_pos + 4 + data_len
|
||||
timestamp = int(time.time()) # Unix 时间戳
|
||||
struct.pack_into('<I', buf, 4, new_pos)
|
||||
struct.pack_into('<I', buf, 8, data_len)
|
||||
struct.pack_into('<I', buf, 12, timestamp) # 更新时间戳
|
||||
```
|
||||
|
||||
#### Go 端(消费者)
|
||||
|
||||
1. `TickReader` 结构体新增字段:
|
||||
```go
|
||||
type TickReader struct {
|
||||
shmName string
|
||||
bufferSize int
|
||||
mappedFile windows.Handle
|
||||
view uintptr
|
||||
lastTimestamp uint32 // 上次读取的时间戳
|
||||
}
|
||||
```
|
||||
|
||||
2. `ReadLatestTick()` 方法增加时间戳检测:
|
||||
- 读取头部时间戳
|
||||
- **与上次读取的时间戳比较**
|
||||
- 如果相同,返回错误 "数据未更新"
|
||||
- 如果不同,继续读取数据并更新 `lastTimestamp`
|
||||
|
||||
3. 关键代码位置:`client_go/tick_reader.go` 第 105-152 行
|
||||
|
||||
```go
|
||||
// 解析时间戳
|
||||
currentTimestamp := binary.LittleEndian.Uint32(header[12:16])
|
||||
|
||||
// 检查数据是否更新
|
||||
if currentTimestamp == r.lastTimestamp {
|
||||
return nil, fmt.Errorf("数据未更新")
|
||||
}
|
||||
|
||||
// ... 读取数据 ...
|
||||
|
||||
// 更新最后时间戳
|
||||
r.lastTimestamp = currentTimestamp
|
||||
```
|
||||
|
||||
### 优势
|
||||
|
||||
1. **高频读取无浪费**:Go 客户端可以每秒甚至更频繁地读取,但只在数据真正更新时才处理
|
||||
2. **实时响应**:一旦 Python 端写入新数据,Go 端最多延迟 1 秒就能检测到
|
||||
3. **向后兼容**:只需修改头部最后一个字段(reserved → timestamp),不影响其他逻辑
|
||||
4. **简单高效**:无需额外的事件通知机制,纯轮询方式实现
|
||||
|
||||
### 测试验证
|
||||
|
||||
#### Python 端测试
|
||||
|
||||
运行测试脚本验证时间戳是否正确更新:
|
||||
|
||||
```bash
|
||||
cd d:\work\quant\qmt
|
||||
python test\test_timestamp.py
|
||||
```
|
||||
|
||||
预期输出:
|
||||
```
|
||||
============================================================
|
||||
测试共享内存时间戳机制
|
||||
============================================================
|
||||
|
||||
[1] 第一次发布数据...
|
||||
时间戳: 1775996840 (20:27:20)
|
||||
|
||||
[2] 等待 2 秒...
|
||||
|
||||
[3] 第二次发布数据(相同内容)...
|
||||
时间戳: 1775996842 (20:27:22)
|
||||
|
||||
✓ 时间戳已更新: 2 秒
|
||||
|
||||
[4] 再次等待 2 秒...
|
||||
|
||||
[5] 第三次发布数据(不同内容)...
|
||||
时间戳: 1775996844 (20:27:24)
|
||||
|
||||
✓ 时间戳继续更新: 2 秒
|
||||
|
||||
============================================================
|
||||
✓ 所有测试通过!
|
||||
============================================================
|
||||
```
|
||||
|
||||
#### Go 端测试
|
||||
|
||||
编译并运行 Go 客户端:
|
||||
|
||||
```bash
|
||||
cd d:\work\quant\qmt\client_go
|
||||
go build -o tick_reader.exe tick_reader.go
|
||||
.\tick_reader.exe
|
||||
```
|
||||
|
||||
预期行为:
|
||||
- Go 客户端每秒读取一次共享内存
|
||||
- 只有当 Python 端更新数据时(每 30 秒),才会打印 "接收到新行情"
|
||||
- 其余时间会跳过处理(因为时间戳未变化)
|
||||
|
||||
### 性能对比
|
||||
|
||||
| 方案 | 读取频率 | CPU 占用 | 响应延迟 | 实现复杂度 |
|
||||
|------|---------|---------|---------|-----------|
|
||||
| 方案1:调整读取间隔 | 30秒/次 | 低 | 最高 30 秒 | 简单 |
|
||||
| **方案2:时间戳检测** | **1秒/次** | **低** | **最高 1 秒** | **中等** |
|
||||
| 方案3:事件通知 | 实时 | 最低 | 毫秒级 | 复杂 |
|
||||
|
||||
### 注意事项
|
||||
|
||||
1. **时间戳精度**:当前使用秒级时间戳,如果需要更高精度可改为毫秒级(需要 8 字节存储)
|
||||
2. **时钟回拨**:理论上系统时钟回拨可能导致时间戳倒退,但在实际交易中几乎不可能发生
|
||||
3. **溢出问题**:uint32 时间戳可使用到 2106 年,短期内无需担心
|
||||
|
||||
### 未来优化方向
|
||||
|
||||
如果需要更低延迟的响应,可以考虑:
|
||||
1. 缩短 Python 端的 `DEFAULT_TICK_INTERVAL`(如改为 5 秒)
|
||||
2. 使用 Windows Event 对象实现真正的推送机制
|
||||
3. 采用环形缓冲区 + 序列号的无锁设计
|
||||
10
client_go/go.mod
Normal file
10
client_go/go.mod
Normal file
@@ -0,0 +1,10 @@
|
||||
module tick_reader
|
||||
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1
|
||||
golang.org/x/sys v0.15.0
|
||||
)
|
||||
|
||||
require github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
14
client_go/go.sum
Normal file
14
client_go/go.sum
Normal file
@@ -0,0 +1,14 @@
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
||||
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
|
||||
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
206
client_go/tick_reader.go
Normal file
206
client_go/tick_reader.go
Normal file
@@ -0,0 +1,206 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
// TickData 行情数据结构
|
||||
type TickData map[string]interface{}
|
||||
|
||||
// TickReader 共享内存读取器
|
||||
type TickReader struct {
|
||||
shmName string
|
||||
bufferSize int
|
||||
mappedFile windows.Handle
|
||||
view uintptr
|
||||
lastTimestamp uint32 // 上次读取的时间戳
|
||||
}
|
||||
|
||||
// NewTickReader 创建新的读取器
|
||||
func NewTickReader(shmName string, bufferSize int) (*TickReader, error) {
|
||||
if shmName == "" {
|
||||
shmName = "tick_ipc"
|
||||
}
|
||||
if bufferSize == 0 {
|
||||
bufferSize = 10 * 1024 * 1024 // 10MB
|
||||
}
|
||||
|
||||
reader := &TickReader{
|
||||
shmName: shmName,
|
||||
bufferSize: bufferSize,
|
||||
lastTimestamp: 0,
|
||||
}
|
||||
|
||||
// 打开共享内存
|
||||
err := reader.openSharedMemory()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("打开共享内存失败: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("✓ 成功连接到共享内存: %s (大小: %d bytes)", shmName, bufferSize)
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
// openSharedMemory 打开 Windows 共享内存
|
||||
func (r *TickReader) openSharedMemory() error {
|
||||
// 打开已存在的共享内存(Python 创建的)
|
||||
namePtr, err := windows.UTF16PtrFromString(r.shmName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("转换字符串失败: %w", err)
|
||||
}
|
||||
|
||||
// 使用 syscall 直接调用 Windows API
|
||||
kernel32 := windows.NewLazySystemDLL("kernel32.dll")
|
||||
openFileMappingProc := kernel32.NewProc("OpenFileMappingW")
|
||||
mapViewOfFileProc := kernel32.NewProc("MapViewOfFile")
|
||||
|
||||
// 调用 OpenFileMappingW
|
||||
handle, _, callErr := openFileMappingProc.Call(
|
||||
uintptr(windows.FILE_MAP_READ),
|
||||
0, // bInheritHandle
|
||||
uintptr(unsafe.Pointer(namePtr)),
|
||||
)
|
||||
if callErr != nil && callErr.Error() != "The operation completed successfully." {
|
||||
return fmt.Errorf("打开文件映射失败: %w", callErr)
|
||||
}
|
||||
if handle == 0 {
|
||||
return fmt.Errorf("打开文件映射失败: 返回空句柄")
|
||||
}
|
||||
mappingHandle := windows.Handle(handle)
|
||||
|
||||
// 映射视图
|
||||
view, _, callErr := mapViewOfFileProc.Call(
|
||||
uintptr(mappingHandle),
|
||||
uintptr(windows.FILE_MAP_READ),
|
||||
0,
|
||||
0,
|
||||
uintptr(r.bufferSize),
|
||||
)
|
||||
if callErr != nil && callErr.Error() != "The operation completed successfully." {
|
||||
windows.CloseHandle(mappingHandle)
|
||||
return fmt.Errorf("映射视图失败: %w", callErr)
|
||||
}
|
||||
if view == 0 {
|
||||
windows.CloseHandle(mappingHandle)
|
||||
return fmt.Errorf("映射视图失败: 返回空地址")
|
||||
}
|
||||
|
||||
r.mappedFile = mappingHandle
|
||||
r.view = view
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadLatestTick 读取最新的行情数据
|
||||
func (r *TickReader) ReadLatestTick() (TickData, error) {
|
||||
if r.view == 0 {
|
||||
return nil, fmt.Errorf("共享内存未初始化")
|
||||
}
|
||||
|
||||
// 读取头部信息(16字节)
|
||||
// 格式: [version(4)] [write_pos(4)] [last_data_len(4)] [timestamp(4)]
|
||||
header := unsafeByteSlice(r.view, 0, 16)
|
||||
|
||||
// 解析时间戳
|
||||
currentTimestamp := binary.LittleEndian.Uint32(header[12:16])
|
||||
|
||||
// 检查数据是否更新
|
||||
if currentTimestamp == r.lastTimestamp {
|
||||
return nil, fmt.Errorf("数据未更新")
|
||||
}
|
||||
|
||||
// 解析最后数据长度
|
||||
lastDataLen := binary.LittleEndian.Uint32(header[8:12])
|
||||
if lastDataLen == 0 {
|
||||
return nil, fmt.Errorf("无可用数据")
|
||||
}
|
||||
|
||||
// 解析写入位置
|
||||
writePos := binary.LittleEndian.Uint32(header[4:8])
|
||||
|
||||
// 计算数据起始位置
|
||||
var dataStart uint32
|
||||
if writePos >= lastDataLen+4 {
|
||||
dataStart = 16 + writePos - lastDataLen - 4
|
||||
} else {
|
||||
dataStart = 16
|
||||
}
|
||||
|
||||
// 读取数据长度
|
||||
dataLenBytes := unsafeByteSlice(r.view, int(dataStart), 4)
|
||||
dataLen := binary.LittleEndian.Uint32(dataLenBytes)
|
||||
|
||||
// 读取 msgpack 数据
|
||||
packedData := unsafeByteSlice(r.view, int(dataStart)+4, int(dataLen))
|
||||
|
||||
// 反序列化 msgpack
|
||||
var tickData TickData
|
||||
err := msgpack.Unmarshal(packedData, &tickData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("反序列化失败: %w", err)
|
||||
}
|
||||
|
||||
// 更新最后时间戳
|
||||
r.lastTimestamp = currentTimestamp
|
||||
|
||||
return tickData, nil
|
||||
}
|
||||
|
||||
// unsafeByteSlice 将内存地址转换为字节切片
|
||||
func unsafeByteSlice(base uintptr, offset int, length int) []byte {
|
||||
return (*[1 << 30]byte)(unsafe.Pointer(base + uintptr(offset)))[:length:length]
|
||||
}
|
||||
|
||||
// Close 关闭读取器
|
||||
func (r *TickReader) Close() {
|
||||
if r.view != 0 {
|
||||
windows.UnmapViewOfFile(r.view)
|
||||
r.view = 0
|
||||
}
|
||||
if r.mappedFile != 0 {
|
||||
windows.CloseHandle(r.mappedFile)
|
||||
r.mappedFile = 0
|
||||
}
|
||||
log.Println("✓ 共享内存连接已关闭")
|
||||
}
|
||||
|
||||
// StartContinuousRead 持续读取行情数据
|
||||
func (r *TickReader) StartContinuousRead(interval time.Duration, handler func(TickData)) {
|
||||
log.Printf("开始持续读取,间隔: %v", interval)
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
tick, err := r.ReadLatestTick()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if handler != nil {
|
||||
handler(tick)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
// 创建读取器
|
||||
reader, err := NewTickReader("tick_ipc", 0)
|
||||
if err != nil {
|
||||
log.Fatalf("初始化失败: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
// 示例2:持续读取(取消注释以启用)
|
||||
fmt.Println("\n=== 持续读取示例(按 Ctrl+C 停止)===")
|
||||
reader.StartContinuousRead(time.Second, func(tick TickData) {
|
||||
fmt.Printf("\n[%s] 接收到新行情:%d个标的\n", time.Now().Format("15:04:05"), len(tick))
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user