add priority option in case some tool has inconsistent issue

This commit is contained in:
guochao 2025-02-06 11:05:18 +08:00
parent f3d5eb9fbd
commit 557a60f5de
3 changed files with 127 additions and 74 deletions

View File

@ -5,6 +5,11 @@ import (
"time" "time"
) )
type UpstreamPriorityGroup struct {
Match string `yaml:"match"`
Priority int `yaml:"priority"`
}
type UpstreamMatch struct { type UpstreamMatch struct {
Match string `yaml:"match"` Match string `yaml:"match"`
Replace string `yaml:"replace"` Replace string `yaml:"replace"`
@ -14,6 +19,7 @@ type Upstream struct {
Server string `yaml:"server"` Server string `yaml:"server"`
Match UpstreamMatch `yaml:"match"` Match UpstreamMatch `yaml:"match"`
AllowedRedirect *string `yaml:"allowed-redirect"` AllowedRedirect *string `yaml:"allowed-redirect"`
PriorityGroups []UpstreamPriorityGroup `yaml:"priority-groups"`
} }
func (upstream Upstream) GetPath(orig string) (string, bool, error) { func (upstream Upstream) GetPath(orig string) (string, bool, error) {

View File

@ -19,6 +19,9 @@ upstream:
match: match:
match: /ubuntu/(.*) match: /ubuntu/(.*)
replace: /$1 replace: /$1
# priority-groups:
# - match: /dists/.*
# priority: 10
- server: https://releases.ubuntu.com/ - server: https://releases.ubuntu.com/
match: match:
match: /ubuntu-releases/(.*) match: /ubuntu-releases/(.*)
@ -51,10 +54,6 @@ upstream:
match: match:
match: /alpine/(.*) match: /alpine/(.*)
replace: /$1 replace: /$1
- server: https://deb.debian.org/debian
match:
match: /debian/(.*)
replace: /$1
- server: https://repo.almalinux.org - server: https://repo.almalinux.org
match: match:
match: /almalinux/(.*) match: /almalinux/(.*)
@ -71,6 +70,9 @@ upstream:
match: match:
match: /(debian|debian-security)/(.*) match: /(debian|debian-security)/(.*)
replace: /$1/$2 replace: /$1/$2
# priority-groups:
# - match: /dists/.*
# priority: 10
- server: https://static.rust-lang.org - server: https://static.rust-lang.org
match: match:
match: /rust-static/(.*) match: /rust-static/(.*)

View File

@ -80,12 +80,6 @@ OUTER:
offset += written offset += written
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
slog.With(
"start", offset,
"end", memoryObject.Buffer.Len(),
"n", memoryObject.Buffer.Len()-offset,
).Debug("remain bytes")
_, err := w.Write(memoryObject.Buffer.Bytes()[offset:]) _, err := w.Write(memoryObject.Buffer.Bytes()[offset:])
return err return err
} }
@ -312,8 +306,10 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
for chunk := range chunks { for chunk := range chunks {
if chunk.error != nil { if chunk.error != nil {
err = chunk.error err = chunk.error
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
slog.With("error", err).Warn("failed to read from upstream") slog.With("error", err).Warn("failed to read from upstream")
} }
}
if chunk.buffer == nil { if chunk.buffer == nil {
break break
} }
@ -423,9 +419,6 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
resultIdx = -1 resultIdx = -1
wg := &sync.WaitGroup{}
wg.Add(len(server.Upstreams))
defer close(updateCh) defer close(updateCh)
defer close(notModifiedCh) defer close(notModifiedCh)
defer func() { defer func() {
@ -436,16 +429,55 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
cancel() cancel()
} }
}() }()
for idx := range server.Upstreams {
groups := make(map[int][]int)
for upstreamIdx, upstream := range server.Upstreams {
if _, matched, err := upstream.GetPath(r.URL.Path); err != nil {
return -1, nil, nil, err
} else if !matched {
continue
}
priority := 0
for _, priorityGroup := range upstream.PriorityGroups {
if matched, err := regexp.MatchString(priorityGroup.Match, r.URL.Path); err != nil {
return -1, nil, nil, err
} else if matched {
priority = priorityGroup.Priority
break
}
}
groups[priority] = append(groups[priority], upstreamIdx)
}
priorities := make([]int, 0, len(groups))
for priority := range groups {
priorities = append(priorities, priority)
}
slices.Sort(priorities)
slices.Reverse(priorities)
for _, priority := range priorities {
upstreams := groups[priority]
wg := &sync.WaitGroup{}
wg.Add(len(upstreams))
logger := slog.With()
if priority != 0 {
logger = logger.With("priority", priority)
}
for _, idx := range upstreams {
idx := idx idx := idx
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancelFuncs[idx] = cancel cancelFuncs[idx] = cancel
logger := slog.With("upstreamIdx", idx) logger := logger.With("upstreamIdx", idx)
go func() { go func() {
defer wg.Done() defer wg.Done()
response, ch, err := server.tryUpstream(ctx, idx, r, lastModified) response, ch, err := server.tryUpstream(ctx, idx, priority, r, lastModified)
if err == context.Canceled { // others returned if err == context.Canceled { // others returned
logger.Debug("context canceled") logger.Debug("context canceled")
return return
@ -481,6 +513,13 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
updateOnce.Do(func() { updateOnce.Do(func() {
resultResponse, resultCh, resultErr = response, ch, err resultResponse, resultCh, resultErr = response, ch, err
updateCh <- idx updateCh <- idx
for cancelIdx, cancel := range cancelFuncs {
if cancelIdx == resultIdx || cancel == nil {
continue
}
cancel()
}
}) })
logger.Debug("voted update") logger.Debug("voted update")
@ -492,21 +531,24 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
select { select {
case idx := <-updateCh: case idx := <-updateCh:
resultIdx = idx resultIdx = idx
slog.With("upstreamIdx", resultIdx).Debug("upstream selected") logger.With("upstreamIdx", resultIdx).Debug("upstream selected")
return
default: default:
select { select {
case idx := <-notModifiedCh: case idx := <-notModifiedCh:
resultIdx = idx resultIdx = idx
slog.With("upstreamIdx", resultIdx).Debug("all upstream not modified") logger.With("upstreamIdx", resultIdx).Debug("all upstream not modified")
default:
slog.Debug("no valid upstream found")
}
}
return return
default:
logger.Debug("no valid upstream found")
}
}
} }
func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http.Request, lastModified time.Time) (response *http.Response, chunks chan Chunk, err error) { return -1, nil, nil, nil
}
func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, chunks chan Chunk, err error) {
upstream := server.Upstreams[upstreamIdx] upstream := server.Upstreams[upstreamIdx]
newpath, matched, err := upstream.GetPath(r.URL.Path) newpath, matched, err := upstream.GetPath(r.URL.Path)
@ -517,6 +559,9 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http.
return nil, nil, nil return nil, nil, nil
} }
logger := slog.With("upstreamIdx", upstreamIdx, "server", upstream.Server, "path", newpath) logger := slog.With("upstreamIdx", upstreamIdx, "server", upstream.Server, "path", newpath)
if priority != 0 {
logger = logger.With("priority", priority)
}
logger.With( logger.With(
"matched", matched, "matched", matched,