improve expiration delay start

This commit is contained in:
Alireza Ahmadi
2026-02-27 00:30:50 +01:00
parent bcea56283f
commit 95afd3006a

View File

@@ -20,10 +20,7 @@ type InboundService struct {
} }
const ( const (
// Keep query variables below SQLite's classic 999 limit. safeBatchSize = 500
safeSQLVariablesPerQuery = 900
// Save in small chunks so row-column placeholders stay under SQL var limits.
safeSaveBatchSize = 50
) )
func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) { func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) {
@@ -796,6 +793,11 @@ func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic
return nil return nil
} }
type newExpiryTime struct {
Email string
NewExpiryTime int64
}
func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) { func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
if len(traffics) == 0 { if len(traffics) == 0 {
// Empty onlineUsers // Empty onlineUsers
@@ -812,14 +814,14 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
emails = append(emails, traffic.Email) emails = append(emails, traffic.Email)
} }
dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics)) dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
for start := 0; start < len(emails); start += safeSQLVariablesPerQuery { for i := 0; i < len(emails); i += safeBatchSize {
end := start + safeSQLVariablesPerQuery end := i + safeBatchSize
if end > len(emails) { if end > len(emails) {
end = len(emails) end = len(emails)
} }
batchClientTraffics := make([]*xray.ClientTraffic, 0, end-start) batchClientTraffics := make([]*xray.ClientTraffic, 0, end-i)
err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[start:end]).Find(&batchClientTraffics).Error err = tx.Model(xray.ClientTraffic{}).Where("email IN ?", emails[i:end]).Find(&batchClientTraffics).Error
if err != nil { if err != nil {
return err return err
} }
@@ -831,7 +833,20 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
return nil return nil
} }
dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics) inboundExpiryTimeMap := make(map[int][]newExpiryTime, 0)
for index, t := range dbClientTraffics {
if t.ExpiryTime < 0 {
newClientExpiryTime := (time.Now().Unix() * 1000) - int64(t.ExpiryTime)
newExpiryTime := newExpiryTime{
Email: t.Email,
NewExpiryTime: newClientExpiryTime,
}
inboundExpiryTimeMap[t.InboundId] = append(inboundExpiryTimeMap[t.InboundId], newExpiryTime)
dbClientTraffics[index].ExpiryTime = newClientExpiryTime
}
}
err = s.adjustTraffics(tx, inboundExpiryTimeMap)
if err != nil { if err != nil {
return err return err
} }
@@ -856,13 +871,13 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
p.SetOnlineClients(onlineClients) p.SetOnlineClients(onlineClients)
} }
for start := 0; start < len(dbClientTraffics); start += safeSaveBatchSize { for i := 0; i < len(dbClientTraffics); i += safeBatchSize {
end := start + safeSaveBatchSize end := i + safeBatchSize
if end > len(dbClientTraffics) { if end > len(dbClientTraffics) {
end = len(dbClientTraffics) end = len(dbClientTraffics)
} }
err = tx.Save(dbClientTraffics[start:end]).Error err = tx.Save(dbClientTraffics[i:end]).Error
if err != nil { if err != nil {
logger.Warning("AddClientTraffic update data ", err) logger.Warning("AddClientTraffic update data ", err)
return nil return nil
@@ -872,79 +887,70 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
return nil return nil
} }
func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) { func (s *InboundService) adjustTraffics(tx *gorm.DB, inboundExpiryTimeMap map[int][]newExpiryTime) error {
inboundIds := make([]int, 0, len(dbClientTraffics)) if len(inboundExpiryTimeMap) == 0 {
seenInboundIds := make(map[int]struct{}, len(dbClientTraffics)) return nil
for _, dbClientTraffic := range dbClientTraffics {
if dbClientTraffic.ExpiryTime < 0 {
if _, seen := seenInboundIds[dbClientTraffic.InboundId]; seen {
continue
}
inboundIds = append(inboundIds, dbClientTraffic.InboundId)
seenInboundIds[dbClientTraffic.InboundId] = struct{}{}
}
} }
if len(inboundIds) > 0 { inboundIds := make([]int, 0)
inbounds := make([]*model.Inbound, 0, len(inboundIds)) for inId := range inboundExpiryTimeMap {
for start := 0; start < len(inboundIds); start += safeSQLVariablesPerQuery { inboundIds = append(inboundIds, inId)
end := start + safeSQLVariablesPerQuery }
if end > len(inboundIds) { inbounds := make([]*model.Inbound, 0, len(inboundIds))
end = len(inboundIds) for i := 0; i < len(inboundIds); i += safeBatchSize {
} end := i + safeBatchSize
if end > len(inboundIds) {
batchInbounds := make([]*model.Inbound, 0, end-start) end = len(inboundIds)
err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[start:end]).Find(&batchInbounds).Error
if err != nil {
return nil, err
}
inbounds = append(inbounds, batchInbounds...)
} }
for inbound_index := range inbounds {
settings := map[string]interface{}{} batchInbounds := make([]*model.Inbound, 0, end-i)
json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings) err := tx.Model(model.Inbound{}).Where("id IN ?", inboundIds[i:end]).Find(&batchInbounds).Error
clients, ok := settings["clients"].([]interface{}) if err != nil {
if ok { return err
var newClients []interface{} }
for client_index := range clients { inbounds = append(inbounds, batchInbounds...)
c := clients[client_index].(map[string]interface{}) }
for traffic_index := range dbClientTraffics { for inbound_index := range inbounds {
if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email { settings := map[string]interface{}{}
oldExpiryTime := c["expiryTime"].(float64) json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime) clients, ok := settings["clients"].([]interface{})
c["expiryTime"] = newExpiryTime inbEmails := inboundExpiryTimeMap[inbounds[inbound_index].Id]
dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime if ok {
break var newClients []interface{}
} for client_index := range clients {
c := clients[client_index].(map[string]interface{})
for index := range inbEmails {
if c["email"] == inbEmails[index].Email {
c["expiryTime"] = inbEmails[index].NewExpiryTime
break
} }
newClients = append(newClients, interface{}(c))
} }
settings["clients"] = newClients newClients = append(newClients, interface{}(c))
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
if err != nil {
return nil, err
}
inbounds[inbound_index].Settings = string(modifiedSettings)
} }
} settings["clients"] = newClients
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
for start := 0; start < len(inbounds); start += safeSaveBatchSize {
end := start + safeSaveBatchSize
if end > len(inbounds) {
end = len(inbounds)
}
err := tx.Save(inbounds[start:end]).Error
if err != nil { if err != nil {
logger.Warning("AddClientTraffic update inbounds ", err) return err
logger.Error(inbounds[start:end])
break
} }
inbounds[inbound_index].Settings = string(modifiedSettings)
} }
} }
return dbClientTraffics, nil for i := 0; i < len(inbounds); i += safeBatchSize {
end := i + safeBatchSize
if end > len(inbounds) {
end = len(inbounds)
}
err := tx.Save(inbounds[i:end]).Error
if err != nil {
logger.Warning("AddClientTraffic update inbounds ", err)
break
}
}
return nil
} }
func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {