From bcea56283f7f2d696be9d3628e74fc844af04d42 Mon Sep 17 00:00:00 2001 From: shayan775 Date: Tue, 24 Feb 2026 19:17:02 +0330 Subject: [PATCH 1/2] The too many SQL variables path is now batched. Changed code: Added safe batching limits in inbound.go: safeSQLVariablesPerQuery = 900 safeSaveBatchSize = 50 Fixed addClientTraffic in inbound.go: email IN (...) lookup is now chunked (line 815). tx.Save(dbClientTraffics) is now chunked (line 859). Added nil guard for p.SetOnlineClients(...) (line 855). Hardened adjustTraffics in inbound.go: deduplicates inbound IDs before querying (line 877). chunked inbound IN query (line 890). chunked inbound Save (line 932). Added regression test: inbound_add_client_traffic_test.go Verifies 4,000 client traffic updates succeed and totals are correct. Validation run: go test ./web/service -run TestAddClientTrafficHandlesLargeBatch -count=1 passed go test ./web/service -count=1 passed --- web/service/inbound.go | 79 +++++++++++++++---- .../inbound_add_client_traffic_test.go | 74 +++++++++++++++++ 2 files changed, 138 insertions(+), 15 deletions(-) create mode 100644 web/service/inbound_add_client_traffic_test.go diff --git a/web/service/inbound.go b/web/service/inbound.go index b00473e2..02311cdb 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -19,6 +19,13 @@ type InboundService struct { xrayApi xray.XrayAPI } +const ( + // Keep query variables below SQLite's classic 999 limit. + safeSQLVariablesPerQuery = 900 + // Save in small chunks so row-column placeholders stay under SQL var limits. + safeSaveBatchSize = 50 +) + func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) { db := database.GetDB() var inbounds []*model.Inbound @@ -805,9 +812,18 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr emails = append(emails, traffic.Email) } dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics)) - err = tx.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&dbClientTraffics).Error - if err != nil { - return err + for start := 0; start < len(emails); start += safeSQLVariablesPerQuery { + end := start + safeSQLVariablesPerQuery + if end > len(emails) { + end = len(emails) + } + + batchClientTraffics := make([]*xray.ClientTraffic, 0, end-start) + err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[start:end]).Find(&batchClientTraffics).Error + if err != nil { + return err + } + dbClientTraffics = append(dbClientTraffics, batchClientTraffics...) } // Avoid empty slice error @@ -836,11 +852,21 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr } // Set onlineUsers - p.SetOnlineClients(onlineClients) + if p != nil { + p.SetOnlineClients(onlineClients) + } - err = tx.Save(dbClientTraffics).Error - if err != nil { - logger.Warning("AddClientTraffic update data ", err) + for start := 0; start < len(dbClientTraffics); start += safeSaveBatchSize { + end := start + safeSaveBatchSize + if end > len(dbClientTraffics) { + end = len(dbClientTraffics) + } + + err = tx.Save(dbClientTraffics[start:end]).Error + if err != nil { + logger.Warning("AddClientTraffic update data ", err) + return nil + } } return nil @@ -848,17 +874,31 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) { inboundIds := make([]int, 0, len(dbClientTraffics)) + seenInboundIds := make(map[int]struct{}, len(dbClientTraffics)) for _, dbClientTraffic := range dbClientTraffics { if dbClientTraffic.ExpiryTime < 0 { + if _, seen := seenInboundIds[dbClientTraffic.InboundId]; seen { + continue + } inboundIds = append(inboundIds, dbClientTraffic.InboundId) + seenInboundIds[dbClientTraffic.InboundId] = struct{}{} } } if len(inboundIds) > 0 { - var inbounds []*model.Inbound - err := tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error - if err != nil { - return nil, err + inbounds := make([]*model.Inbound, 0, len(inboundIds)) + for start := 0; start < len(inboundIds); start += safeSQLVariablesPerQuery { + end := start + safeSQLVariablesPerQuery + if end > len(inboundIds) { + end = len(inboundIds) + } + + batchInbounds := make([]*model.Inbound, 0, end-start) + err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[start:end]).Find(&batchInbounds).Error + if err != nil { + return nil, err + } + inbounds = append(inbounds, batchInbounds...) } for inbound_index := range inbounds { settings := map[string]interface{}{} @@ -888,10 +928,19 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl inbounds[inbound_index].Settings = string(modifiedSettings) } } - err = tx.Save(inbounds).Error - if err != nil { - logger.Warning("AddClientTraffic update inbounds ", err) - logger.Error(inbounds) + + for start := 0; start < len(inbounds); start += safeSaveBatchSize { + end := start + safeSaveBatchSize + if end > len(inbounds) { + end = len(inbounds) + } + + err := tx.Save(inbounds[start:end]).Error + if err != nil { + logger.Warning("AddClientTraffic update inbounds ", err) + logger.Error(inbounds[start:end]) + break + } } } diff --git a/web/service/inbound_add_client_traffic_test.go b/web/service/inbound_add_client_traffic_test.go new file mode 100644 index 00000000..dbfc7000 --- /dev/null +++ b/web/service/inbound_add_client_traffic_test.go @@ -0,0 +1,74 @@ +package service + +import ( + "fmt" + "testing" + + "github.com/alireza0/x-ui/xray" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestAddClientTrafficHandlesLargeBatch(t *testing.T) { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("open db: %v", err) + } + + if err := db.AutoMigrate(&xray.ClientTraffic{}); err != nil { + t.Fatalf("auto migrate: %v", err) + } + + const clientCount = 4000 + + seed := make([]*xray.ClientTraffic, 0, clientCount) + updates := make([]*xray.ClientTraffic, 0, clientCount) + for i := 0; i < clientCount; i++ { + email := fmt.Sprintf("user-%05d@example.com", i) + seed = append(seed, &xray.ClientTraffic{ + InboundId: 1, + Enable: true, + Email: email, + }) + updates = append(updates, &xray.ClientTraffic{ + Email: email, + Up: 1, + Down: 2, + }) + } + + if err := db.CreateInBatches(seed, 100).Error; err != nil { + t.Fatalf("seed traffic: %v", err) + } + + tx := db.Begin() + if tx.Error != nil { + t.Fatalf("begin tx: %v", tx.Error) + } + + s := &InboundService{} + if err := s.addClientTraffic(tx, updates); err != nil { + tx.Rollback() + t.Fatalf("add client traffic: %v", err) + } + if err := tx.Commit().Error; err != nil { + t.Fatalf("commit tx: %v", err) + } + + var totalUp int64 + if err := db.Model(&xray.ClientTraffic{}).Select("COALESCE(SUM(up), 0)").Scan(&totalUp).Error; err != nil { + t.Fatalf("sum up: %v", err) + } + + var totalDown int64 + if err := db.Model(&xray.ClientTraffic{}).Select("COALESCE(SUM(down), 0)").Scan(&totalDown).Error; err != nil { + t.Fatalf("sum down: %v", err) + } + + if totalUp != clientCount { + t.Fatalf("unexpected total up: got %d want %d", totalUp, clientCount) + } + if totalDown != 2*clientCount { + t.Fatalf("unexpected total down: got %d want %d", totalDown, 2*clientCount) + } +} From 95afd3006a97063905f539b0d1f110e0c6b82410 Mon Sep 17 00:00:00 2001 From: Alireza Ahmadi Date: Fri, 27 Feb 2026 00:30:50 +0100 Subject: [PATCH 2/2] improve expiration delay start --- web/service/inbound.go | 154 +++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 74 deletions(-) diff --git a/web/service/inbound.go b/web/service/inbound.go index 02311cdb..b116514d 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -20,10 +20,7 @@ type InboundService struct { } const ( - // Keep query variables below SQLite's classic 999 limit. - safeSQLVariablesPerQuery = 900 - // Save in small chunks so row-column placeholders stay under SQL var limits. - safeSaveBatchSize = 50 + safeBatchSize = 500 ) func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) { @@ -796,6 +793,11 @@ func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic return nil } +type newExpiryTime struct { + Email string + NewExpiryTime int64 +} + func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) { if len(traffics) == 0 { // Empty onlineUsers @@ -812,14 +814,14 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr emails = append(emails, traffic.Email) } dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics)) - for start := 0; start < len(emails); start += safeSQLVariablesPerQuery { - end := start + safeSQLVariablesPerQuery + for i := 0; i < len(emails); i += safeBatchSize { + end := i + safeBatchSize if end > len(emails) { end = len(emails) } - batchClientTraffics := make([]*xray.ClientTraffic, 0, end-start) - err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[start:end]).Find(&batchClientTraffics).Error + batchClientTraffics := make([]*xray.ClientTraffic, 0, end-i) + err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[i:end]).Find(&batchClientTraffics).Error if err != nil { return err } @@ -831,7 +833,20 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr return nil } - dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics) + inboundExpiryTimeMap := make(map[int][]newExpiryTime, 0) + for index, t := range dbClientTraffics { + if t.ExpiryTime < 0 { + newClientExpiryTime := (time.Now().Unix() * 1000) - int64(t.ExpiryTime) + newExpiryTime := newExpiryTime{ + Email: t.Email, + NewExpiryTime: newClientExpiryTime, + } + inboundExpiryTimeMap[t.InboundId] = append(inboundExpiryTimeMap[t.InboundId], newExpiryTime) + dbClientTraffics[index].ExpiryTime = newClientExpiryTime + } + } + + err = s.adjustTraffics(tx, inboundExpiryTimeMap) if err != nil { return err } @@ -856,13 +871,13 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr p.SetOnlineClients(onlineClients) } - for start := 0; start < len(dbClientTraffics); start += safeSaveBatchSize { - end := start + safeSaveBatchSize + for i := 0; i < len(dbClientTraffics); i += safeBatchSize { + end := i + safeBatchSize if end > len(dbClientTraffics) { end = len(dbClientTraffics) } - err = tx.Save(dbClientTraffics[start:end]).Error + err = tx.Save(dbClientTraffics[i:end]).Error if err != nil { logger.Warning("AddClientTraffic update data ", err) return nil @@ -872,79 +887,70 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr return nil } -func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) { - inboundIds := make([]int, 0, len(dbClientTraffics)) - seenInboundIds := make(map[int]struct{}, len(dbClientTraffics)) - for _, dbClientTraffic := range dbClientTraffics { - if dbClientTraffic.ExpiryTime < 0 { - if _, seen := seenInboundIds[dbClientTraffic.InboundId]; seen { - continue - } - inboundIds = append(inboundIds, dbClientTraffic.InboundId) - seenInboundIds[dbClientTraffic.InboundId] = struct{}{} - } +func (s *InboundService) adjustTraffics(tx *gorm.DB, inboundExpiryTimeMap map[int][]newExpiryTime) error { + if len(inboundExpiryTimeMap) == 0 { + return nil } - if len(inboundIds) > 0 { - inbounds := make([]*model.Inbound, 0, len(inboundIds)) - for start := 0; start < len(inboundIds); start += safeSQLVariablesPerQuery { - end := start + safeSQLVariablesPerQuery - if end > len(inboundIds) { - end = len(inboundIds) - } - - batchInbounds := make([]*model.Inbound, 0, end-start) - err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[start:end]).Find(&batchInbounds).Error - if err != nil { - return nil, err - } - inbounds = append(inbounds, batchInbounds...) + inboundIds := make([]int, 0) + for inId := range inboundExpiryTimeMap { + inboundIds = append(inboundIds, inId) + } + inbounds := make([]*model.Inbound, 0, len(inboundIds)) + for i := 0; i < len(inboundIds); i += safeBatchSize { + end := i + safeBatchSize + if end > len(inboundIds) { + end = len(inboundIds) } - for inbound_index := range inbounds { - settings := map[string]interface{}{} - json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings) - clients, ok := settings["clients"].([]interface{}) - if ok { - var newClients []interface{} - for client_index := range clients { - c := clients[client_index].(map[string]interface{}) - for traffic_index := range dbClientTraffics { - if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email { - oldExpiryTime := c["expiryTime"].(float64) - newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime) - c["expiryTime"] = newExpiryTime - dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime - break - } + + batchInbounds := make([]*model.Inbound, 0, end-i) + err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[i:end]).Find(&batchInbounds).Error + if err != nil { + return err + } + inbounds = append(inbounds, batchInbounds...) + } + for inbound_index := range inbounds { + settings := map[string]interface{}{} + json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings) + clients, ok := settings["clients"].([]interface{}) + inbEmails := inboundExpiryTimeMap[inbounds[inbound_index].Id] + if ok { + var newClients []interface{} + for client_index := range clients { + c := clients[client_index].(map[string]interface{}) + for index := range inbEmails { + if c["email"] == inbEmails[index].Email { + c["expiryTime"] = inbEmails[index].NewExpiryTime + break } - newClients = append(newClients, interface{}(c)) } - settings["clients"] = newClients - modifiedSettings, err := json.MarshalIndent(settings, "", " ") - if err != nil { - return nil, err - } - - inbounds[inbound_index].Settings = string(modifiedSettings) + newClients = append(newClients, interface{}(c)) } - } - - for start := 0; start < len(inbounds); start += safeSaveBatchSize { - end := start + safeSaveBatchSize - if end > len(inbounds) { - end = len(inbounds) - } - - err := tx.Save(inbounds[start:end]).Error + settings["clients"] = newClients + modifiedSettings, err := json.MarshalIndent(settings, "", " ") if err != nil { - logger.Warning("AddClientTraffic update inbounds ", err) - logger.Error(inbounds[start:end]) - break + return err } + + inbounds[inbound_index].Settings = string(modifiedSettings) } } - return dbClientTraffics, nil + for i := 0; i < len(inbounds); i += safeBatchSize { + end := i + safeBatchSize + if end > len(inbounds) { + end = len(inbounds) + } + + err := tx.Save(inbounds[i:end]).Error + if err != nil { + logger.Warning("AddClientTraffic update inbounds ", err) + break + } + } + + return nil } func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {