From 8deffb1b7c1e7198e42a7d20583bf8bdbdac05c0 Mon Sep 17 00:00:00 2001 From: Hossin Asaadi Date: Fri, 11 Nov 2022 09:20:51 -0500 Subject: [PATCH] enable store each Email Traffic --- database/model/model.go | 7 +---- web/job/xray_traffic_job.go | 14 ++++++++++ web/service/inbound.go | 43 +++++++++++++++++++++++++++++ web/service/xray.go | 6 +++++ xray/process.go | 54 +++++++++++++++++++++++++++++++++++++ 5 files changed, 118 insertions(+), 6 deletions(-) diff --git a/database/model/model.go b/database/model/model.go index e392f0be..158e33ef 100644 --- a/database/model/model.go +++ b/database/model/model.go @@ -78,9 +78,4 @@ type Client struct { Security string `json:"security"` Total int64 `json:"total" form:"total"` ExpiryTime int64 `json:"expiryTime" form:"expiryTime"` -} -type ClientStats struct { - Email string `json:"email"` - Up int64 `json:"up" form:"up"` - Down int64 `json:"down" form:"down"` -} +} \ No newline at end of file diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 22e1bb7b..1d1012d2 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -27,4 +27,18 @@ func (j *XrayTrafficJob) Run() { if err != nil { logger.Warning("add traffic failed:", err) } + + + // get Client Traffic + + clientTraffics, err := j.xrayService.GetXrayClientTraffic() + if err != nil { + logger.Warning("get xray client traffic failed:", err) + return + } + err = j.inboundService.AddClientTraffic(clientTraffics) + if err != nil { + logger.Warning("add client traffic failed:", err) + } + } diff --git a/web/service/inbound.go b/web/service/inbound.go index d5a685a8..7baaf096 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -3,6 +3,7 @@ package service import ( "fmt" "time" + "encoding/json" "x-ui/database" "x-ui/database/model" "x-ui/util/common" @@ -166,6 +167,48 @@ func (s *InboundService) AddTraffic(traffics []*xray.Traffic) (err error) { } return } +func (s *InboundService) AddClientTraffic(traffics []*xray.ClientTraffic) (err error) { + if len(traffics) == 0 { + return nil + } + db := database.GetDB() + db = db.Model(model.Inbound{}) + tx := db.Begin() + defer func() { + if err != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() + for _, traffic := range traffics { + inbound := &model.Inbound{} + + err := tx.Where("settings like ?", "%" + traffic.Email + "%").First(inbound).Error + clientStats := map[string]*xray.ClientTraffic{} + json.Unmarshal([]byte(inbound.ClientStats), &clientStats) + + if _, ok := clientStats[traffic.Email]; ok { + clientStats[traffic.Email].Up = clientStats[traffic.Email].Up + traffic.Up + clientStats[traffic.Email].Down = clientStats[traffic.Email].Down + traffic.Down + }else{ + clientStats[traffic.Email] = traffic + } + jsonClientStats, err := json.Marshal(clientStats) + + // if clientStats[traffic.Email] + err = tx.Where("settings like ?", "%" + traffic.Email + "%"). + Update("client_stats", jsonClientStats). + Error + + + if err != nil { + return err + } + + } + return +} func (s *InboundService) DisableInvalidInbounds() (int64, error) { db := database.GetDB() diff --git a/web/service/xray.go b/web/service/xray.go index 7cfb909c..d33a7924 100644 --- a/web/service/xray.go +++ b/web/service/xray.go @@ -84,6 +84,12 @@ func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, error) { } return p.GetTraffic(true) } +func (s *XrayService) GetXrayClientTraffic() ([]*xray.ClientTraffic, error) { + if !s.IsXrayRunning() { + return nil, errors.New("xray is not running") + } + return p.GetClientTraffic(true) +} func (s *XrayService) RestartXray(isForce bool) error { lock.Lock() diff --git a/xray/process.go b/xray/process.go index 7d0d2d05..16d8a270 100644 --- a/xray/process.go +++ b/xray/process.go @@ -22,6 +22,7 @@ import ( ) var trafficRegex = regexp.MustCompile("(inbound|outbound)>>>([^>]+)>>>traffic>>>(downlink|uplink)") +var ClientTrafficRegex = regexp.MustCompile("(user)>>>([^>]+)>>>traffic>>>(downlink|uplink)") func GetBinaryName() string { return fmt.Sprintf("xray-%s-%s", runtime.GOOS, runtime.GOARCH) @@ -253,6 +254,9 @@ func (p *process) GetTraffic(reset bool) ([]*Traffic, error) { traffics := make([]*Traffic, 0) for _, stat := range resp.GetStat() { matchs := trafficRegex.FindStringSubmatch(stat.Name) + if len(matchs) < 3 { + continue + } isInbound := matchs[1] == "inbound" tag := matchs[2] isDown := matchs[3] == "downlink" @@ -277,3 +281,53 @@ func (p *process) GetTraffic(reset bool) ([]*Traffic, error) { return traffics, nil } +func (p *process) GetClientTraffic(reset bool) ([]*ClientTraffic, error) { + if p.apiPort == 0 { + return nil, common.NewError("xray api port wrong:", p.apiPort) + } + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%v", p.apiPort), grpc.WithInsecure()) + if err != nil { + return nil, err + } + defer conn.Close() + + client := statsservice.NewStatsServiceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + request := &statsservice.QueryStatsRequest{ + Reset_: reset, + } + resp, err := client.QueryStats(ctx, request) + if err != nil { + return nil, err + } + emailTrafficMap := map[string]*ClientTraffic{} + traffics := make([]*ClientTraffic, 0) + for _, stat := range resp.GetStat() { + matchs := ClientTrafficRegex.FindStringSubmatch(stat.Name) + if len(matchs) < 3 { + continue + } + isUser := matchs[1] == "user" + email := matchs[2] + isDown := matchs[3] == "downlink" + if ! isUser { + continue + } + traffic, ok := emailTrafficMap[email] + if !ok { + traffic = &ClientTraffic{ + Email: email, + } + emailTrafficMap[email] = traffic + traffics = append(traffics, traffic) + } + if isDown { + traffic.Down = stat.Value + } else { + traffic.Up = stat.Value + } + } + + return traffics, nil +}