mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-03-14 07:03:09 +00:00
revert group management (#2656)
* Revert "json post base path bug fixed (#2647)" This reverts commit04cf250a54. * Revert "Group Management of Subscription Clients" * Revert "fix getSubGroupClients for enable/disable and edit clients." * Revert "Enhance database initialization in db.go (#2645)" This reverts commit66fe84181b. * Revert "Add checkpoint handling in CloseDB function (#2646)" This reverts commit4dd40f6f19. * Revert "Improved database model migration and added indexing (#2655)" This reverts commitb922d986d6.
This commit is contained in:
@@ -1,178 +0,0 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"x-ui/database"
|
||||
"x-ui/database/model"
|
||||
"x-ui/logger"
|
||||
"x-ui/xray"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type SyncClientTrafficJob struct {
|
||||
subClientsCollection map[string][]string
|
||||
}
|
||||
|
||||
func NewClientTrafficSyncJob() *SyncClientTrafficJob {
|
||||
return new(SyncClientTrafficJob)
|
||||
}
|
||||
func (j *SyncClientTrafficJob) Run() {
|
||||
// Step 1: Group clients by SubID
|
||||
subClientsCollection := j.collectClientsGroupedBySubId()
|
||||
|
||||
// Step 2: Sync client traffics for each SubID group
|
||||
for subId, emails := range subClientsCollection {
|
||||
err := j.syncClientTraffics(map[string][]string{subId: emails})
|
||||
if err != nil {
|
||||
logger.Error("Failed to sync traffics for SubID ", subId, ": ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// collectClientsGroupedBySubId groups clients by their SubIDs
|
||||
func (j *SyncClientTrafficJob) collectClientsGroupedBySubId() map[string][]string {
|
||||
db := database.GetDB()
|
||||
result := make(map[string][]string)
|
||||
|
||||
// Fetch all inbounds
|
||||
var inbounds []*model.Inbound
|
||||
if err := db.Model(&model.Inbound{}).Find(&inbounds).Error; err != nil {
|
||||
logger.Error("Error fetching inbounds: ", err)
|
||||
return result // Return empty map on error
|
||||
}
|
||||
|
||||
// Process each inbound
|
||||
for _, inbound := range inbounds {
|
||||
if inbound.Settings == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
settingsMap, err := parseSettings(inbound.Settings, uint(inbound.Id))
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
clients, ok := settingsMap["clients"].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
processClients(clients, result)
|
||||
}
|
||||
|
||||
// Remove SubIDs with one or fewer emails
|
||||
filterSingleEmailSubIDs(result)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// parseSettings unmarshals the JSON settings and returns it as a map
|
||||
func parseSettings(settings string, inboundID uint) (map[string]interface{}, error) {
|
||||
if !json.Valid([]byte(settings)) {
|
||||
return nil, fmt.Errorf("Invalid JSON format in Settings for inbound ID %d", inboundID)
|
||||
}
|
||||
|
||||
var tempData map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(settings), &tempData); err != nil {
|
||||
return nil, fmt.Errorf("Error unmarshalling settings for inbound ID %d: %v", inboundID, err)
|
||||
}
|
||||
|
||||
return tempData, nil
|
||||
}
|
||||
|
||||
// processClients extracts SubID and email from the clients and populates the result map
|
||||
func processClients(clients []interface{}, result map[string][]string) {
|
||||
for _, client := range clients {
|
||||
clientMap, ok := client.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
subId, ok := clientMap["subId"].(string)
|
||||
if !ok || subId == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
email, ok := clientMap["email"].(string)
|
||||
if !ok || email == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
result[subId] = append(result[subId], email)
|
||||
}
|
||||
}
|
||||
|
||||
// filterSingleEmailSubIDs removes SubIDs with one or fewer emails from the result map
|
||||
func filterSingleEmailSubIDs(result map[string][]string) {
|
||||
for subId, emails := range result {
|
||||
if len(emails) <= 1 {
|
||||
delete(result, subId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncClientTraffics synchronizes traffic data for each SubID group
|
||||
func (j *SyncClientTrafficJob) syncClientTraffics(result map[string][]string) error {
|
||||
for subId, emails := range result {
|
||||
db := database.GetDB()
|
||||
|
||||
// Step 1: Calculate maxUp and maxDown (outside transaction)
|
||||
var maxUp, maxDown int64
|
||||
err := calculateMaxTraffic(db, emails, &maxUp, &maxDown)
|
||||
if err != nil {
|
||||
logger.Error("Failed to calculate max traffic for SubID ", subId, ": ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Step 2: Update traffic data with retry mechanism
|
||||
err = retryOperation(func() error {
|
||||
return updateTraffic(db, emails, maxUp, maxDown)
|
||||
}, 5, 100*time.Millisecond)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("Failed to update client traffics for SubID ", subId, ": ", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// calculateMaxTraffic calculates max up and down traffic for a group of emails
|
||||
func calculateMaxTraffic(db *gorm.DB, emails []string, maxUp, maxDown *int64) error {
|
||||
return db.Model(&xray.ClientTraffic{}).
|
||||
Where("email IN ?", emails).
|
||||
Select("MAX(up) AS max_up, MAX(down) AS max_down").
|
||||
Row().
|
||||
Scan(maxUp, maxDown)
|
||||
}
|
||||
|
||||
// updateTraffic updates the traffic data in the database within a transaction
|
||||
func updateTraffic(db *gorm.DB, emails []string, maxUp, maxDown int64) error {
|
||||
return db.Transaction(func(tx *gorm.DB) error {
|
||||
return tx.Model(&xray.ClientTraffic{}).
|
||||
Where("email IN ?", emails).
|
||||
Updates(map[string]interface{}{
|
||||
"up": maxUp,
|
||||
"down": maxDown,
|
||||
}).Error
|
||||
})
|
||||
}
|
||||
|
||||
// retryOperation retries an operation multiple times with a delay
|
||||
func retryOperation(operation func() error, maxRetries int, delay time.Duration) error {
|
||||
var err error
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
err = operation()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
logger.Info(fmt.Sprintf("Retry %d/%d failed: %v", i+1, maxRetries, err))
|
||||
time.Sleep(delay)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user