From 557a60f5de49f6b94107f2ff7442f44cbb3e08a4 Mon Sep 17 00:00:00 2001 From: guochao Date: Thu, 6 Feb 2025 11:05:18 +0800 Subject: [PATCH] add priority option in case some tool has inconsistent issue --- config.go | 12 +++- config.yaml | 10 +-- server.go | 179 ++++++++++++++++++++++++++++++++-------------------- 3 files changed, 127 insertions(+), 74 deletions(-) diff --git a/config.go b/config.go index c8ec41b..c6092de 100644 --- a/config.go +++ b/config.go @@ -5,15 +5,21 @@ import ( "time" ) +type UpstreamPriorityGroup struct { + Match string `yaml:"match"` + Priority int `yaml:"priority"` +} + type UpstreamMatch struct { Match string `yaml:"match"` Replace string `yaml:"replace"` } type Upstream struct { - Server string `yaml:"server"` - Match UpstreamMatch `yaml:"match"` - AllowedRedirect *string `yaml:"allowed-redirect"` + Server string `yaml:"server"` + Match UpstreamMatch `yaml:"match"` + AllowedRedirect *string `yaml:"allowed-redirect"` + PriorityGroups []UpstreamPriorityGroup `yaml:"priority-groups"` } func (upstream Upstream) GetPath(orig string) (string, bool, error) { diff --git a/config.yaml b/config.yaml index 52e24b7..d31b6e6 100644 --- a/config.yaml +++ b/config.yaml @@ -19,6 +19,9 @@ upstream: match: match: /ubuntu/(.*) replace: /$1 + # priority-groups: + # - match: /dists/.* + # priority: 10 - server: https://releases.ubuntu.com/ match: match: /ubuntu-releases/(.*) @@ -51,10 +54,6 @@ upstream: match: match: /alpine/(.*) replace: /$1 -- server: https://deb.debian.org/debian - match: - match: /debian/(.*) - replace: /$1 - server: https://repo.almalinux.org match: match: /almalinux/(.*) @@ -71,6 +70,9 @@ upstream: match: match: /(debian|debian-security)/(.*) replace: /$1/$2 + # priority-groups: + # - match: /dists/.* + # priority: 10 - server: https://static.rust-lang.org match: match: /rust-static/(.*) diff --git a/server.go b/server.go index 86b2658..c8b3412 100644 --- a/server.go +++ b/server.go @@ -80,12 +80,6 @@ OUTER: offset += written } time.Sleep(time.Millisecond) - slog.With( - "start", offset, - "end", memoryObject.Buffer.Len(), - "n", memoryObject.Buffer.Len()-offset, - ).Debug("remain bytes") - _, err := w.Write(memoryObject.Buffer.Bytes()[offset:]) return err } @@ -312,7 +306,9 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime for chunk := range chunks { if chunk.error != nil { err = chunk.error - slog.With("error", err).Warn("failed to read from upstream") + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { + slog.With("error", err).Warn("failed to read from upstream") + } } if chunk.buffer == nil { break @@ -423,9 +419,6 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r resultIdx = -1 - wg := &sync.WaitGroup{} - wg.Add(len(server.Upstreams)) - defer close(updateCh) defer close(notModifiedCh) defer func() { @@ -436,77 +429,126 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r cancel() } }() - for idx := range server.Upstreams { - idx := idx - ctx, cancel := context.WithCancel(context.Background()) - cancelFuncs[idx] = cancel - logger := slog.With("upstreamIdx", idx) + groups := make(map[int][]int) + for upstreamIdx, upstream := range server.Upstreams { + if _, matched, err := upstream.GetPath(r.URL.Path); err != nil { + return -1, nil, nil, err + } else if !matched { + continue + } - go func() { - defer wg.Done() - response, ch, err := server.tryUpstream(ctx, idx, r, lastModified) - if err == context.Canceled { // others returned - logger.Debug("context canceled") - return + priority := 0 + for _, priorityGroup := range upstream.PriorityGroups { + if matched, err := regexp.MatchString(priorityGroup.Match, r.URL.Path); err != nil { + return -1, nil, nil, err + } else if matched { + priority = priorityGroup.Priority + break } - - if err != nil { - if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { - logger.With("error", err).Warn("upstream has error") - } - return - } - if response == nil { - return - } - if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNotModified { - return - } - locked := returnLock.TryLock() - if !locked { - return - } - defer returnLock.Unlock() - - if response.StatusCode == http.StatusNotModified { - notModifiedOnce.Do(func() { - resultResponse, resultCh, resultErr = response, ch, err - notModifiedCh <- idx - }) - logger.Debug("voted not modified") - return - } - - updateOnce.Do(func() { - resultResponse, resultCh, resultErr = response, ch, err - updateCh <- idx - }) - - logger.Debug("voted update") - }() + } + groups[priority] = append(groups[priority], upstreamIdx) } - wg.Wait() + priorities := make([]int, 0, len(groups)) + for priority := range groups { + priorities = append(priorities, priority) + } + slices.Sort(priorities) + slices.Reverse(priorities) + + for _, priority := range priorities { + upstreams := groups[priority] + + wg := &sync.WaitGroup{} + wg.Add(len(upstreams)) + + logger := slog.With() + if priority != 0 { + logger = logger.With("priority", priority) + } + + for _, idx := range upstreams { + idx := idx + ctx, cancel := context.WithCancel(context.Background()) + cancelFuncs[idx] = cancel + + logger := logger.With("upstreamIdx", idx) + + go func() { + defer wg.Done() + response, ch, err := server.tryUpstream(ctx, idx, priority, r, lastModified) + if err == context.Canceled { // others returned + logger.Debug("context canceled") + return + } + + if err != nil { + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logger.With("error", err).Warn("upstream has error") + } + return + } + if response == nil { + return + } + if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNotModified { + return + } + locked := returnLock.TryLock() + if !locked { + return + } + defer returnLock.Unlock() + + if response.StatusCode == http.StatusNotModified { + notModifiedOnce.Do(func() { + resultResponse, resultCh, resultErr = response, ch, err + notModifiedCh <- idx + }) + logger.Debug("voted not modified") + return + } + + updateOnce.Do(func() { + resultResponse, resultCh, resultErr = response, ch, err + updateCh <- idx + + for cancelIdx, cancel := range cancelFuncs { + if cancelIdx == resultIdx || cancel == nil { + continue + } + cancel() + } + }) + + logger.Debug("voted update") + }() + } + + wg.Wait() - select { - case idx := <-updateCh: - resultIdx = idx - slog.With("upstreamIdx", resultIdx).Debug("upstream selected") - default: select { - case idx := <-notModifiedCh: + case idx := <-updateCh: resultIdx = idx - slog.With("upstreamIdx", resultIdx).Debug("all upstream not modified") + logger.With("upstreamIdx", resultIdx).Debug("upstream selected") + return default: - slog.Debug("no valid upstream found") + select { + case idx := <-notModifiedCh: + resultIdx = idx + logger.With("upstreamIdx", resultIdx).Debug("all upstream not modified") + return + default: + logger.Debug("no valid upstream found") + } } } - return + return -1, nil, nil, nil } -func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http.Request, lastModified time.Time) (response *http.Response, chunks chan Chunk, err error) { +func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, chunks chan Chunk, err error) { upstream := server.Upstreams[upstreamIdx] newpath, matched, err := upstream.GetPath(r.URL.Path) @@ -517,6 +559,9 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http. return nil, nil, nil } logger := slog.With("upstreamIdx", upstreamIdx, "server", upstream.Server, "path", newpath) + if priority != 0 { + logger = logger.With("priority", priority) + } logger.With( "matched", matched,