diff --git a/web/service/inbound.go b/web/service/inbound.go index 02311cdb..b116514d 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -20,10 +20,7 @@ type InboundService struct { } const ( - // Keep query variables below SQLite's classic 999 limit. - safeSQLVariablesPerQuery = 900 - // Save in small chunks so row-column placeholders stay under SQL var limits. - safeSaveBatchSize = 50 + safeBatchSize = 500 ) func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) { @@ -796,6 +793,11 @@ func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic return nil } +type newExpiryTime struct { + Email string + NewExpiryTime int64 +} + func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) { if len(traffics) == 0 { // Empty onlineUsers @@ -812,14 +814,14 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr emails = append(emails, traffic.Email) } dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics)) - for start := 0; start < len(emails); start += safeSQLVariablesPerQuery { - end := start + safeSQLVariablesPerQuery + for i := 0; i < len(emails); i += safeBatchSize { + end := i + safeBatchSize if end > len(emails) { end = len(emails) } - batchClientTraffics := make([]*xray.ClientTraffic, 0, end-start) - err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[start:end]).Find(&batchClientTraffics).Error + batchClientTraffics := make([]*xray.ClientTraffic, 0, end-i) + err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[i:end]).Find(&batchClientTraffics).Error if err != nil { return err } @@ -831,7 +833,20 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr return nil } - dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics) + inboundExpiryTimeMap := make(map[int][]newExpiryTime, 0) + for index, t := range dbClientTraffics { + if t.ExpiryTime < 0 { + newClientExpiryTime := (time.Now().Unix() * 1000) - int64(t.ExpiryTime) + newExpiryTime := newExpiryTime{ + Email: t.Email, + NewExpiryTime: newClientExpiryTime, + } + inboundExpiryTimeMap[t.InboundId] = append(inboundExpiryTimeMap[t.InboundId], newExpiryTime) + dbClientTraffics[index].ExpiryTime = newClientExpiryTime + } + } + + err = s.adjustTraffics(tx, inboundExpiryTimeMap) if err != nil { return err } @@ -856,13 +871,13 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr p.SetOnlineClients(onlineClients) } - for start := 0; start < len(dbClientTraffics); start += safeSaveBatchSize { - end := start + safeSaveBatchSize + for i := 0; i < len(dbClientTraffics); i += safeBatchSize { + end := i + safeBatchSize if end > len(dbClientTraffics) { end = len(dbClientTraffics) } - err = tx.Save(dbClientTraffics[start:end]).Error + err = tx.Save(dbClientTraffics[i:end]).Error if err != nil { logger.Warning("AddClientTraffic update data ", err) return nil @@ -872,79 +887,70 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr return nil } -func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) { - inboundIds := make([]int, 0, len(dbClientTraffics)) - seenInboundIds := make(map[int]struct{}, len(dbClientTraffics)) - for _, dbClientTraffic := range dbClientTraffics { - if dbClientTraffic.ExpiryTime < 0 { - if _, seen := seenInboundIds[dbClientTraffic.InboundId]; seen { - continue - } - inboundIds = append(inboundIds, dbClientTraffic.InboundId) - seenInboundIds[dbClientTraffic.InboundId] = struct{}{} - } +func (s *InboundService) adjustTraffics(tx *gorm.DB, inboundExpiryTimeMap map[int][]newExpiryTime) error { + if len(inboundExpiryTimeMap) == 0 { + return nil } - if len(inboundIds) > 0 { - inbounds := make([]*model.Inbound, 0, len(inboundIds)) - for start := 0; start < len(inboundIds); start += safeSQLVariablesPerQuery { - end := start + safeSQLVariablesPerQuery - if end > len(inboundIds) { - end = len(inboundIds) - } - - batchInbounds := make([]*model.Inbound, 0, end-start) - err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[start:end]).Find(&batchInbounds).Error - if err != nil { - return nil, err - } - inbounds = append(inbounds, batchInbounds...) + inboundIds := make([]int, 0) + for inId := range inboundExpiryTimeMap { + inboundIds = append(inboundIds, inId) + } + inbounds := make([]*model.Inbound, 0, len(inboundIds)) + for i := 0; i < len(inboundIds); i += safeBatchSize { + end := i + safeBatchSize + if end > len(inboundIds) { + end = len(inboundIds) } - for inbound_index := range inbounds { - settings := map[string]interface{}{} - json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings) - clients, ok := settings["clients"].([]interface{}) - if ok { - var newClients []interface{} - for client_index := range clients { - c := clients[client_index].(map[string]interface{}) - for traffic_index := range dbClientTraffics { - if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email { - oldExpiryTime := c["expiryTime"].(float64) - newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime) - c["expiryTime"] = newExpiryTime - dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime - break - } + + batchInbounds := make([]*model.Inbound, 0, end-i) + err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[i:end]).Find(&batchInbounds).Error + if err != nil { + return err + } + inbounds = append(inbounds, batchInbounds...) + } + for inbound_index := range inbounds { + settings := map[string]interface{}{} + json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings) + clients, ok := settings["clients"].([]interface{}) + inbEmails := inboundExpiryTimeMap[inbounds[inbound_index].Id] + if ok { + var newClients []interface{} + for client_index := range clients { + c := clients[client_index].(map[string]interface{}) + for index := range inbEmails { + if c["email"] == inbEmails[index].Email { + c["expiryTime"] = inbEmails[index].NewExpiryTime + break } - newClients = append(newClients, interface{}(c)) } - settings["clients"] = newClients - modifiedSettings, err := json.MarshalIndent(settings, "", " ") - if err != nil { - return nil, err - } - - inbounds[inbound_index].Settings = string(modifiedSettings) + newClients = append(newClients, interface{}(c)) } - } - - for start := 0; start < len(inbounds); start += safeSaveBatchSize { - end := start + safeSaveBatchSize - if end > len(inbounds) { - end = len(inbounds) - } - - err := tx.Save(inbounds[start:end]).Error + settings["clients"] = newClients + modifiedSettings, err := json.MarshalIndent(settings, "", " ") if err != nil { - logger.Warning("AddClientTraffic update inbounds ", err) - logger.Error(inbounds[start:end]) - break + return err } + + inbounds[inbound_index].Settings = string(modifiedSettings) } } - return dbClientTraffics, nil + for i := 0; i < len(inbounds); i += safeBatchSize { + end := i + safeBatchSize + if end > len(inbounds) { + end = len(inbounds) + } + + err := tx.Save(inbounds[i:end]).Error + if err != nil { + logger.Warning("AddClientTraffic update inbounds ", err) + break + } + } + + return nil } func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {