This commit is contained in:
zxr
2026-03-30 15:26:16 +08:00
parent 5939a3d62d
commit 279021bf86
25 changed files with 1794 additions and 0 deletions

View File

@@ -0,0 +1,64 @@
package ingest
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"git.apinb.com/ops/logs/internal/config"
)
// AlertReceiveBody 与 alert ReceiveRequest 对齐(含必填 raw_data
type AlertReceiveBody struct {
AlertName string `json:"alert_name"`
Summary string `json:"summary"`
Description string `json:"description"`
SeverityCode string `json:"severity_code"`
Value string `json:"value"`
Threshold string `json:"threshold"`
Labels map[string]string `json:"labels"`
Agent string `json:"agent"`
PolicyID uint `json:"policy_id"`
RawData json.RawMessage `json:"raw_data"`
}
func forwardAlert(body AlertReceiveBody) error {
cfg := config.Spec.AlertForward
if cfg == nil || !cfg.Enabled || cfg.BaseURL == "" {
return nil
}
if len(body.RawData) == 0 {
return fmt.Errorf("raw_data 不能为空")
}
if body.AlertName == "" {
body.AlertName = "日志告警"
}
if body.PolicyID == 0 && cfg.DefaultPolicyID > 0 {
body.PolicyID = cfg.DefaultPolicyID
}
raw, err := json.Marshal(body)
if err != nil {
return err
}
url := cfg.BaseURL + "/Alert/v1/alerts/receive"
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(raw))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if cfg.InternalKey != "" {
req.Header.Set("X-Internal-Key", cfg.InternalKey)
}
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("alert returned HTTP %d", resp.StatusCode)
}
return nil
}

426
internal/ingest/engine.go Normal file
View File

@@ -0,0 +1,426 @@
package ingest
import (
"encoding/json"
"fmt"
"net"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"git.apinb.com/ops/logs/internal/config"
"git.apinb.com/ops/logs/internal/impl"
"git.apinb.com/ops/logs/internal/models"
"github.com/gosnmp/gosnmp"
)
type Engine struct {
mu sync.RWMutex
trapDict []models.TrapDictionaryEntry
syslogRules []models.SyslogRule
trapRules []models.TrapRule
shields []models.TrapShield
}
var Global = &Engine{}
func (e *Engine) Refresh() error {
var dict []models.TrapDictionaryEntry
var syslog []models.SyslogRule
var trap []models.TrapRule
var shield []models.TrapShield
if err := impl.DBService.Where("enabled = ?", true).Find(&dict).Error; err != nil {
return err
}
sort.Slice(dict, func(i, j int) bool {
return len(dict[i].OIDPrefix) > len(dict[j].OIDPrefix)
})
if err := impl.DBService.Where("enabled = ?", true).Find(&syslog).Error; err != nil {
return err
}
sort.Slice(syslog, func(i, j int) bool { return syslog[i].Priority > syslog[j].Priority })
if err := impl.DBService.Where("enabled = ?", true).Find(&trap).Error; err != nil {
return err
}
sort.Slice(trap, func(i, j int) bool { return trap[i].Priority > trap[j].Priority })
if err := impl.DBService.Where("enabled = ?", true).Find(&shield).Error; err != nil {
return err
}
e.mu.Lock()
e.trapDict = dict
e.syslogRules = syslog
e.trapRules = trap
e.shields = shield
e.mu.Unlock()
return nil
}
func StartRefresher() {
interval := config.Spec.Ingest.RuleRefreshSecs
if interval <= 0 {
interval = 30
}
_ = Global.Refresh()
go func() {
t := time.NewTicker(time.Duration(interval) * time.Second)
defer t.Stop()
for range t.C {
_ = Global.Refresh()
}
}()
}
func normOID(s string) string {
s = strings.TrimSpace(s)
return strings.TrimPrefix(s, ".")
}
func (e *Engine) HandleSyslog(addr *net.UDPAddr, payload []byte) {
parsed := parseSyslogPayload(payload)
device := parsed.Hostname
if device == "" {
device = addr.IP.String()
}
detailObj := map[string]interface{}{
"priority": parsed.Priority,
"hostname": parsed.Hostname,
"tag": parsed.Tag,
"message": parsed.Message,
}
detailBytes, _ := json.Marshal(detailObj)
summary := formatSyslogSummary(parsed)
sev := syslogPriorityToSeverity(parsed.Priority)
ev := models.LogEvent{
SourceKind: "syslog",
RemoteAddr: addr.String(),
RawPayload: string(payload),
NormalizedSummary: summary,
NormalizedDetail: string(detailBytes),
DeviceName: device,
SeverityCode: sev,
}
e.mu.RLock()
rules := e.syslogRules
e.mu.RUnlock()
var matched *models.SyslogRule
for i := range rules {
if syslogRuleMatches(&rules[i], device, parsed.Message, parsed.RawLine) {
matched = &rules[i]
break
}
}
if err := impl.DBService.Create(&ev).Error; err != nil {
return
}
if matched == nil {
return
}
labels := map[string]string{
"source": "syslog",
"device": device,
"rule_id": strconv.FormatUint(uint64(matched.ID), 10),
"rule_name": matched.Name,
"remote_addr": addr.String(),
}
rawObj := map[string]interface{}{
"source": "syslog",
"received_at": time.Now().UTC().Format(time.RFC3339),
"source_ip": addr.IP.String(),
"rule_id": matched.ID,
"log_entry_id": ev.ID,
"raw_packet": string(payload),
"parsed": detailObj,
}
rawBytes, mErr := json.Marshal(rawObj)
if mErr != nil {
return
}
body := AlertReceiveBody{
AlertName: matched.AlertName,
Summary: summary,
Description: summary,
SeverityCode: firstNonEmpty(matched.SeverityCode, sev),
Value: parsed.Message,
Labels: labels,
Agent: "logs-syslog",
PolicyID: matched.PolicyID,
RawData: rawBytes,
}
if err := forwardAlert(body); err == nil {
_ = impl.DBService.Model(&ev).Update("alert_sent", true).Error
}
}
func syslogRuleMatches(rule *models.SyslogRule, device, message, rawLine string) bool {
if strings.TrimSpace(rule.DeviceNameContains) == "" && strings.TrimSpace(rule.KeywordRegex) == "" {
return false
}
deviceName := strings.ToLower(device)
contains := strings.ToLower(rule.DeviceNameContains)
if contains != "" && !strings.Contains(deviceName, contains) {
return false
}
if rule.KeywordRegex != "" {
re, err := regexp.Compile(rule.KeywordRegex)
if err != nil {
return false
}
if !re.MatchString(message) && !re.MatchString(rawLine) {
return false
}
}
return true
}
func trapShielded(e *Engine, addr *net.UDPAddr, trapOID string, pkt *gosnmp.SnmpPacket) bool {
ip := addr.IP
fp := varbindFingerprint(pkt)
now := time.Now()
e.mu.RLock()
shields := e.shields
e.mu.RUnlock()
for i := range shields {
s := &shields[i]
if !s.Enabled {
continue
}
if strings.TrimSpace(s.SourceIPCIDR) == "" {
continue
}
if !ipMatchesCIDR(ip, s.SourceIPCIDR) {
continue
}
if p := strings.TrimSpace(s.OIDPrefix); p != "" && !strings.HasPrefix(normOID(trapOID), normOID(p)) {
continue
}
if h := strings.TrimSpace(s.InterfaceHint); h != "" && !strings.Contains(fp, h) {
continue
}
if !inTimeWindows(now, s.TimeWindowsJSON) {
continue
}
return true
}
return false
}
func lookupTrapDict(e *Engine, trapOID string) *models.TrapDictionaryEntry {
t := normOID(trapOID)
e.mu.RLock()
dict := e.trapDict
e.mu.RUnlock()
for i := range dict {
if strings.HasPrefix(t, normOID(dict[i].OIDPrefix)) {
return &dict[i]
}
}
return nil
}
func (e *Engine) HandleTrap(addr *net.UDPAddr, pkt *gosnmp.SnmpPacket) {
trapOID := extractTrapOID(pkt)
if trapShielded(e, addr, trapOID, pkt) {
return
}
dict := lookupTrapDict(e, trapOID)
fp := varbindFingerprint(pkt)
vbJSON, _ := json.Marshal(trapVarbinds(pkt))
readable := buildTrapReadable(trapOID, dict, fp)
detailObj := map[string]interface{}{
"trap_oid": trapOID,
"varbinds": trapVarbinds(pkt),
"dict_title": "",
"dict_description": "",
"recovery": "",
}
sev := "warning"
if dict != nil {
detailObj["dict_title"] = dict.Title
detailObj["dict_description"] = dict.Description
detailObj["recovery"] = dict.RecoveryMessage
if dict.SeverityCode != "" {
sev = dict.SeverityCode
}
}
detailBytes, _ := json.Marshal(detailObj)
ev := models.LogEvent{
SourceKind: "snmp_trap",
RemoteAddr: addr.String(),
RawPayload: fp,
NormalizedSummary: readable,
NormalizedDetail: string(detailBytes),
DeviceName: addr.IP.String(),
SeverityCode: sev,
TrapOID: trapOID,
}
if err := impl.DBService.Create(&ev).Error; err != nil {
return
}
e.mu.RLock()
rules := e.trapRules
e.mu.RUnlock()
var matched *models.TrapRule
for i := range rules {
if trapRuleMatches(&rules[i], trapOID, fp) {
matched = &rules[i]
break
}
}
if matched == nil && dict != nil && strings.TrimSpace(dict.SeverityCode) != "" {
matched = &models.TrapRule{
AlertName: firstNonEmpty(dict.Title, "SNMP Trap"),
SeverityCode: dict.SeverityCode,
PolicyID: 0,
}
}
if matched == nil {
return
}
desc := readable
if dict != nil && dict.RecoveryMessage != "" {
desc = readable + "\n恢复建议: " + dict.RecoveryMessage
}
labels := map[string]string{
"source": "snmp_trap",
"trap_oid": trapOID,
"remote_addr": addr.String(),
}
if matched.ID != 0 {
labels["rule_id"] = strconv.FormatUint(uint64(matched.ID), 10)
labels["rule_name"] = matched.Name
}
resolved := map[string]interface{}{}
if dict != nil {
resolved["title"] = dict.Title
resolved["description"] = dict.Description
resolved["recovery"] = dict.RecoveryMessage
}
rawObj := map[string]interface{}{
"source": "snmp_trap",
"received_at": time.Now().UTC().Format(time.RFC3339),
"source_ip": addr.IP.String(),
"log_entry_id": ev.ID,
"trap_oid": trapOID,
"varbinds": trapVarbinds(pkt),
"resolved": resolved,
"pdu_summary": fp,
}
if matched.ID != 0 {
rawObj["rule_id"] = matched.ID
}
rawBytes, mErr := json.Marshal(rawObj)
if mErr != nil {
return
}
body := AlertReceiveBody{
AlertName: firstNonEmpty(matched.AlertName, "SNMP Trap"),
Summary: readable,
Description: desc,
SeverityCode: firstNonEmpty(matched.SeverityCode, sev),
Value: string(vbJSON),
Labels: labels,
Agent: "logs-trap",
PolicyID: matched.PolicyID,
RawData: rawBytes,
}
if err := forwardAlert(body); err == nil {
_ = impl.DBService.Model(&ev).Update("alert_sent", true).Error
}
}
func extractTrapOID(pkt *gosnmp.SnmpPacket) string {
const snmpTrapOID = "1.3.6.1.6.3.1.1.4.1.0"
for _, v := range pkt.Variables {
if v.Name == snmpTrapOID || strings.HasSuffix(v.Name, ".1.3.6.1.6.3.1.1.4.1.0") {
return oidToString(v.Value)
}
}
for _, v := range pkt.Variables {
if strings.Contains(v.Name, "1.3.6.1.6.3.1.1.4.1") {
return oidToString(v.Value)
}
}
return ""
}
func oidToString(val interface{}) string {
switch x := val.(type) {
case string:
return x
case []byte:
return string(x)
default:
return fmt.Sprintf("%v", x)
}
}
func trapVarbinds(pkt *gosnmp.SnmpPacket) []map[string]string {
out := make([]map[string]string, 0, len(pkt.Variables))
for _, v := range pkt.Variables {
out = append(out, map[string]string{
"oid": v.Name,
"type": fmt.Sprintf("%v", v.Type),
"value": fmtVarbindValue(v),
})
}
return out
}
func buildTrapReadable(trapOID string, dict *models.TrapDictionaryEntry, varbindSummary string) string {
if dict != nil && dict.Title != "" {
return dict.Title + " (" + trapOID + ")"
}
if trapOID != "" {
return "Trap " + trapOID
}
return truncate(varbindSummary, 256)
}
func trapRuleMatches(rule *models.TrapRule, trapOID, varbindFP string) bool {
hasOID := strings.TrimSpace(rule.OIDPrefix) != ""
hasRE := strings.TrimSpace(rule.VarbindMatchRegex) != ""
if !hasOID && !hasRE {
return false
}
if hasOID && !strings.HasPrefix(normOID(trapOID), normOID(rule.OIDPrefix)) {
return false
}
if rule.VarbindMatchRegex != "" {
re, err := regexp.Compile(rule.VarbindMatchRegex)
if err != nil {
return false
}
if !re.MatchString(varbindFP) {
return false
}
}
return true
}
func firstNonEmpty(a, b string) string {
if strings.TrimSpace(a) != "" {
return a
}
return b
}

114
internal/ingest/shield.go Normal file
View File

@@ -0,0 +1,114 @@
package ingest
import (
"encoding/json"
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/gosnmp/gosnmp"
)
type timeWindow struct {
Days []int `json:"days"`
Start string `json:"start"`
End string `json:"end"`
}
func ipMatchesCIDR(ip net.IP, cidr string) bool {
cidr = strings.TrimSpace(cidr)
if cidr == "" {
return false
}
if !strings.Contains(cidr, "/") {
p := net.ParseIP(cidr)
return p != nil && p.Equal(ip)
}
_, network, err := net.ParseCIDR(cidr)
if err != nil {
return false
}
return network.Contains(ip)
}
func inTimeWindows(now time.Time, jsonStr string) bool {
s := strings.TrimSpace(jsonStr)
if s == "" || s == "null" {
return true
}
var windows []timeWindow
if err := json.Unmarshal([]byte(s), &windows); err != nil || len(windows) == 0 {
return true
}
tod := now.Hour()*60 + now.Minute()
wd := int(now.Weekday())
for _, w := range windows {
if len(w.Days) > 0 {
ok := false
for _, d := range w.Days {
if d == wd {
ok = true
break
}
}
if !ok {
continue
}
}
start := parseHHMM(w.Start)
end := parseHHMM(w.End)
if start < 0 || end < 0 {
continue
}
if start <= end {
if tod >= start && tod <= end {
return true
}
} else {
if tod >= start || tod <= end {
return true
}
}
}
return false
}
func parseHHMM(s string) int {
s = strings.TrimSpace(s)
if s == "" {
return -1
}
parts := strings.Split(s, ":")
if len(parts) != 2 {
return -1
}
h, err1 := strconv.Atoi(parts[0])
m, err2 := strconv.Atoi(parts[1])
if err1 != nil || err2 != nil || h < 0 || h > 23 || m < 0 || m > 59 {
return -1
}
return h*60 + m
}
func varbindFingerprint(pkt *gosnmp.SnmpPacket) string {
var b strings.Builder
for _, v := range pkt.Variables {
b.WriteString(v.Name)
b.WriteByte('=')
b.WriteString(fmtVarbindValue(v))
b.WriteByte(';')
}
return b.String()
}
func fmtVarbindValue(v gosnmp.SnmpPDU) string {
switch v.Type {
case gosnmp.OctetString:
if bb, ok := v.Value.([]byte); ok {
return string(bb)
}
}
return fmt.Sprintf("%v", v.Value)
}

View File

@@ -0,0 +1,109 @@
package ingest
import (
"fmt"
"regexp"
"strconv"
"strings"
)
var rePri = regexp.MustCompile(`^<(\d{1,3})>`)
type ParsedSyslog struct {
Priority int
Hostname string
Tag string
Message string
RawLine string
}
func parseSyslogPayload(payload []byte) ParsedSyslog {
line := strings.TrimSpace(string(payload))
p := ParsedSyslog{RawLine: line, Message: line}
if line == "" {
return p
}
rest := line
if m := rePri.FindStringSubmatch(line); len(m) == 2 {
if pri, err := strconv.Atoi(m[1]); err == nil {
p.Priority = pri
}
rest = line[len(m[0]):]
}
rest = strings.TrimSpace(rest)
fields := strings.SplitN(rest, " ", 6)
if len(fields) >= 2 && len(fields[0]) == 1 && fields[0][0] >= '1' && fields[0][0] <= '9' {
if len(fields) >= 4 {
p.Hostname = fields[2]
if len(fields) >= 6 {
p.Message = fields[5]
} else if len(fields) == 5 {
p.Message = fields[4]
}
}
return p
}
tokens := strings.SplitN(rest, " ", 3)
if len(tokens) >= 2 {
if len(tokens) >= 3 && isMonthAbbr(tokens[0]) {
p.Hostname = tokens[2]
if idx := strings.Index(rest, ": "); idx > 0 {
p.Message = strings.TrimSpace(rest[idx+2:])
}
} else {
p.Hostname = tokens[1]
if len(tokens) >= 3 {
tagMsg := tokens[2]
if idx := strings.Index(tagMsg, ": "); idx > 0 {
p.Tag = tagMsg[:idx]
p.Message = strings.TrimSpace(tagMsg[idx+2:])
} else {
p.Message = tagMsg
}
}
}
}
return p
}
func isMonthAbbr(s string) bool {
if len(s) < 3 {
return false
}
mons := []string{"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}
for _, m := range mons {
if strings.HasPrefix(s, m) {
return true
}
}
return false
}
func syslogPriorityToSeverity(pri int) string {
sev := pri % 8
switch sev {
case 0, 1, 2:
return "critical"
case 3:
return "major"
case 4:
return "warning"
default:
return "info"
}
}
func formatSyslogSummary(p ParsedSyslog) string {
host := p.Hostname
if host == "" {
host = "unknown-host"
}
return fmt.Sprintf("%s: %s", host, truncate(p.Message, 512))
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "..."
}

View File

@@ -0,0 +1,32 @@
package ingest
import (
"encoding/json"
"testing"
)
func TestParseSyslogPayloadPri(t *testing.T) {
p := parseSyslogPayload([]byte("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8"))
if p.Priority != 34 {
t.Fatalf("priority=%d", p.Priority)
}
}
func TestForwardAlertBodyIncludesRawData(t *testing.T) {
raw := []byte(`{"source":"syslog","parsed":{}}`)
b := AlertReceiveBody{
AlertName: "a",
RawData: raw,
}
data, err := json.Marshal(b)
if err != nil {
t.Fatal(err)
}
var dec map[string]json.RawMessage
if err := json.Unmarshal(data, &dec); err != nil {
t.Fatal(err)
}
if string(dec["raw_data"]) != string(raw) {
t.Fatalf("raw_data %s", dec["raw_data"])
}
}

View File

@@ -0,0 +1,41 @@
package ingest
import (
"log"
"net"
"strings"
"git.apinb.com/ops/logs/internal/config"
)
func StartSyslogUDP() {
addr := strings.TrimSpace(config.Spec.Ingest.SyslogListenAddr)
if addr == "" {
return
}
go func() {
pc, err := net.ListenPacket("udp", addr)
if err != nil {
log.Printf("logs: syslog UDP listen %s: %v", addr, err)
return
}
defer pc.Close()
log.Printf("logs: syslog listening UDP %s", addr)
buf := make([]byte, 65536)
for {
n, remote, err := pc.ReadFrom(buf)
if err != nil {
log.Printf("logs: syslog read: %v", err)
continue
}
udpAddr, _ := remote.(*net.UDPAddr)
if udpAddr == nil {
continue
}
p := make([]byte, n)
copy(p, buf[:n])
a := *udpAddr
Global.HandleSyslog(&a, p)
}
}()
}

View File

@@ -0,0 +1,32 @@
package ingest
import (
"log"
"net"
"strings"
"git.apinb.com/ops/logs/internal/config"
"github.com/gosnmp/gosnmp"
)
func StartTrapUDP() {
addr := strings.TrimSpace(config.Spec.Ingest.TrapListenAddr)
if addr == "" {
return
}
go func() {
tl := gosnmp.NewTrapListener()
tl.OnNewTrap = func(pkt *gosnmp.SnmpPacket, u *net.UDPAddr) {
if u == nil || pkt == nil {
return
}
ua := *u
Global.HandleTrap(&ua, pkt)
}
tl.Params = gosnmp.Default
tl.Params.Logger = gosnmp.NewLogger(log.Default())
if err := tl.Listen(addr); err != nil {
log.Printf("logs: trap listener %s: %v", addr, err)
}
}()
}