test and fix bugs found by cline
All checks were successful
build container / build-container (push) Successful in 28m57s
run go test / test (push) Successful in 25m34s

This commit is contained in:
2025-06-10 14:22:43 +08:00
parent 2a0bd28958
commit 147659b0da
4 changed files with 1350 additions and 31 deletions

View File

@ -25,12 +25,17 @@ const (
var zeroTime time.Time
var preclosedChan = make(chan struct{})
func init() {
close(preclosedChan)
}
var (
httpClient = http.Client{
// check allowed redirect
CheckRedirect: func(req *http.Request, via []*http.Request) error {
lastRequest := via[len(via)-1]
if allowedRedirect, ok := lastRequest.Context().Value(reqCtxAllowedRedirect).(string); ok {
if allowedRedirect, ok := req.Context().Value(reqCtxAllowedRedirect).(string); ok {
if matched, err := regexp.MatchString(allowedRedirect, req.URL.String()); err != nil {
return err
} else if !matched {
@ -147,7 +152,7 @@ func (server *Server) HandleRequestWithCache(w http.ResponseWriter, r *http.Requ
} else if localStatus != localNotExists {
if localStatus == localExistsButNeedHead {
if ranged {
server.streamOnline(nil, r, mtime, fullpath)
<-server.streamOnline(nil, r, mtime, fullpath)
server.serveFile(w, r, fullpath)
} else {
server.streamOnline(w, r, mtime, fullpath)
@ -157,7 +162,7 @@ func (server *Server) HandleRequestWithCache(w http.ResponseWriter, r *http.Requ
}
} else {
if ranged {
server.streamOnline(nil, r, mtime, fullpath)
<-server.streamOnline(nil, r, mtime, fullpath)
server.serveFile(w, r, fullpath)
} else {
server.streamOnline(w, r, mtime, fullpath)
@ -209,7 +214,7 @@ func (server *Server) checkLocal(w http.ResponseWriter, _ *http.Request, key str
return localNotExists, zeroTime, nil
}
func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime time.Time, key string) {
func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime time.Time, key string) <-chan struct{} {
memoryObject, exists := server.o[r.URL.Path]
locked := false
defer func() {
@ -241,32 +246,39 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
slog.With("error", err).Warn("failed to stream response with existing memory object")
}
}
return preclosedChan
} else {
slog.With("mtime", mtime).Debug("checking fastest upstream")
selectedIdx, response, chunks, err := server.fastesUpstream(r, mtime)
selectedIdx, response, chunks, err := server.fastestUpstream(r, mtime)
if chunks == nil && mtime != zeroTime {
slog.With("upstreamIdx", selectedIdx, "key", key).Debug("not modified. using local version")
if w != nil {
server.serveFile(w, r, key)
}
return
return preclosedChan
}
if err != nil {
slog.With("error", err).Warn("failed to select fastest upstream")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
if w != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return preclosedChan
}
if selectedIdx == -1 || response == nil || chunks == nil {
slog.Debug("no upstream is selected")
http.NotFound(w, r)
return
if w != nil {
http.NotFound(w, r)
}
return preclosedChan
}
if response.StatusCode == http.StatusNotModified {
slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
os.Chtimes(key, zeroTime, time.Now())
server.serveFile(w, r, key)
return
if w != nil {
server.serveFile(w, r, key)
}
return preclosedChan
}
slog.With(
@ -337,26 +349,28 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
hijacker, ok := w.(http.Hijacker)
if !ok {
logger.Warn("response writer is not a hijacker. failed to set lingering")
return
return preclosedChan
}
conn, _, err := hijacker.Hijack()
if err != nil {
logger.With("error", err).Warn("hijack failed. failed to set lingering")
return
return preclosedChan
}
defer conn.Close()
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
logger.With("error", err).Warn("connection is not a *net.TCPConn. failed to set lingering")
return
return preclosedChan
}
if err := tcpConn.SetLinger(0); err != nil {
logger.With("error", err).Warn("failed to set lingering")
return
return preclosedChan
}
logger.Debug("connection set to linger. it will be reset once the conn.Close is called")
}
fileWrittenCh := make(chan struct{})
go func() {
defer func() {
server.lu.Lock()
@ -364,6 +378,7 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
delete(server.o, r.URL.Path)
slog.Debug("memory object released")
close(fileWrittenCh)
}()
if err == nil {
@ -428,10 +443,11 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
}
}()
return fileWrittenCh
}
}
func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, resultCh chan Chunk, resultErr error) {
func (server *Server) fastestUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, resultCh chan Chunk, resultErr error) {
returnLock := &sync.Mutex{}
upstreams := len(server.Upstreams)
cancelFuncs := make([]func(), upstreams)
@ -619,7 +635,7 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int
}
streaming := false
defer func() {
if !streaming {
if !streaming && response != nil {
response.Body.Close()
}
}()
@ -684,6 +700,7 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int
streaming = true
go func() {
defer close(ch)
defer response.Body.Close()
for {
buffer := make([]byte, server.Misc.ChunkBytes)