From aa51b9fabac07d75251aca2e13890c87057070c9 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Mon, 28 Apr 2025 10:28:45 +0800 Subject: [PATCH] chore: replace using internal batch package to x/sync/errgroup In the original batch implementation, the Go() method will always start a new goroutine and then wait for the concurrency limit, which is unnecessary for the current code. x/sync/errgroup will block Go() until the concurrency limit is met, which can effectively reduce memory usage. In addition, the original batch always saves the return value of Go(), but it is not used in the current code, which will also waste a lot of memory space in high concurrency scenarios. --- adapter/provider/healthcheck.go | 13 +++++++------ component/updater/update_geo.go | 30 +++++++----------------------- 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/adapter/provider/healthcheck.go b/adapter/provider/healthcheck.go index 8737ff96..2bddd8e7 100644 --- a/adapter/provider/healthcheck.go +++ b/adapter/provider/healthcheck.go @@ -7,13 +7,13 @@ import ( "time" "github.com/metacubex/mihomo/common/atomic" - "github.com/metacubex/mihomo/common/batch" "github.com/metacubex/mihomo/common/singledo" "github.com/metacubex/mihomo/common/utils" C "github.com/metacubex/mihomo/constant" "github.com/metacubex/mihomo/log" "github.com/dlclark/regexp2" + "golang.org/x/sync/errgroup" ) type HealthCheckOption struct { @@ -147,7 +147,8 @@ func (hc *HealthCheck) check() { _, _, _ = hc.singleDo.Do(func() (struct{}, error) { id := utils.NewUUIDV4().String() log.Debugln("Start New Health Checking {%s}", id) - b, _ := batch.New[bool](hc.ctx, batch.WithConcurrencyNum[bool](10)) + b := new(errgroup.Group) + b.SetLimit(10) // execute default health check option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus} @@ -159,13 +160,13 @@ func (hc *HealthCheck) check() { hc.execute(b, url, id, option) } } - b.Wait() + _ = b.Wait() log.Debugln("Finish A Health Checking {%s}", id) return struct{}{}, nil }) } -func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *extraOption) { +func (hc *HealthCheck) execute(b *errgroup.Group, url, uid string, option *extraOption) { url = strings.TrimSpace(url) if len(url) == 0 { log.Debugln("Health Check has been skipped due to testUrl is empty, {%s}", uid) @@ -195,13 +196,13 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex } p := proxy - b.Go(p.Name(), func() (bool, error) { + b.Go(func() error { ctx, cancel := context.WithTimeout(hc.ctx, hc.timeout) defer cancel() log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid) _, _ = p.URLTest(ctx, url, expectedStatus) log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid) - return false, nil + return nil }) } } diff --git a/component/updater/update_geo.go b/component/updater/update_geo.go index 719a5215..0778087a 100644 --- a/component/updater/update_geo.go +++ b/component/updater/update_geo.go @@ -9,7 +9,6 @@ import ( "time" "github.com/metacubex/mihomo/common/atomic" - "github.com/metacubex/mihomo/common/batch" "github.com/metacubex/mihomo/common/utils" "github.com/metacubex/mihomo/component/geodata" _ "github.com/metacubex/mihomo/component/geodata/standard" @@ -19,6 +18,7 @@ import ( "github.com/metacubex/mihomo/log" "github.com/oschwald/maxminddb-golang" + "golang.org/x/sync/errgroup" ) var ( @@ -169,41 +169,25 @@ func UpdateGeoSite() (err error) { func updateGeoDatabases() error { defer runtime.GC() - b, _ := batch.New[interface{}](context.Background()) + b := errgroup.Group{} if geodata.GeoIpEnable() { if geodata.GeodataMode() { - b.Go("UpdateGeoIp", func() (_ interface{}, err error) { - err = UpdateGeoIp() - return - }) + b.Go(UpdateGeoIp) } else { - b.Go("UpdateMMDB", func() (_ interface{}, err error) { - err = UpdateMMDB() - return - }) + b.Go(UpdateMMDB) } } if geodata.ASNEnable() { - b.Go("UpdateASN", func() (_ interface{}, err error) { - err = UpdateASN() - return - }) + b.Go(UpdateASN) } if geodata.GeoSiteEnable() { - b.Go("UpdateGeoSite", func() (_ interface{}, err error) { - err = UpdateGeoSite() - return - }) + b.Go(UpdateGeoSite) } - if e := b.Wait(); e != nil { - return e.Err - } - - return nil + return b.Wait() } var ErrGetDatabaseUpdateSkip = errors.New("GEO database is updating, skip")