This commit is contained in:
guochao 2024-12-18 14:17:59 +08:00
parent 2701cf802d
commit 055d113c80

View File

@ -88,7 +88,6 @@ OUTER:
for { for {
select { select {
case <-memoryObject.ctx.Done(): case <-memoryObject.ctx.Done():
logrus.Info("ctx done")
break OUTER break OUTER
default: default:
} }
@ -98,14 +97,11 @@ OUTER:
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
continue continue
} }
logrus.WithFields(logrus.Fields{"start": offset, "end": newOffset}).Info("writing")
bytes := memoryObject.Buffer.Bytes()[offset:newOffset] bytes := memoryObject.Buffer.Bytes()[offset:newOffset]
written, err := w.Write(bytes) written, err := w.Write(bytes)
if err != nil { if err != nil {
logrus.WithError(err).Info("write failed")
return err return err
} }
logrus.WithFields(logrus.Fields{"n": written}).Info("written")
offset += written offset += written
} }
@ -213,11 +209,10 @@ const (
func (server *Server) checkLocal(w http.ResponseWriter, _ *http.Request, key string) (exists localStatus, mtime time.Time, err error) { func (server *Server) checkLocal(w http.ResponseWriter, _ *http.Request, key string) (exists localStatus, mtime time.Time, err error) {
if stat, err := os.Stat(key); err == nil { if stat, err := os.Stat(key); err == nil {
logrus.Println(stat.ModTime(), stat.ModTime().Add(server.Cache.Timeout), time.Now())
if mtime := stat.ModTime(); mtime.Add(server.Cache.Timeout).Before(time.Now()) { if mtime := stat.ModTime(); mtime.Add(server.Cache.Timeout).Before(time.Now()) {
return localExistsButNeedHead, mtime.In(time.UTC), nil return localExistsButNeedHead, mtime.In(time.UTC), nil
} }
return localExists, zeroTime, nil return localExists, mtime.In(time.UTC), nil
} else if os.IsPermission(err) { } else if os.IsPermission(err) {
http.Error(w, err.Error(), http.StatusForbidden) http.Error(w, err.Error(), http.StatusForbidden)
} else if !os.IsNotExist(err) { } else if !os.IsNotExist(err) {
@ -255,7 +250,7 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
} }
if err := memoryObject.StreamTo(w, memoryObject.wg); err != nil { if err := memoryObject.StreamTo(w, memoryObject.wg); err != nil {
logrus.WithError(err).Warn("failed to stream response") logrus.WithError(err).Warn("failed to stream response with existing memory object")
} }
} }
} else { } else {
@ -317,8 +312,10 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
go memoryObject.StreamTo(w, memoryObject.wg) go memoryObject.StreamTo(w, memoryObject.wg)
} }
err = nil
for chunk := range chunks { for chunk := range chunks {
if chunk.error != nil { if chunk.error != nil {
err = chunk.error
logrus.WithError(err).Warn("failed to read from upstream") logrus.WithError(err).Warn("failed to read from upstream")
} }
if chunk.buffer == nil { if chunk.buffer == nil {
@ -332,6 +329,11 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
memoryObject.wg.Wait() memoryObject.wg.Wait()
if err != nil {
logrus.WithError(err).WithField("upstreamIdx", selectedIdx).Error("something happened during download. will not cache this response")
return
}
go func() { go func() {
logrus.Trace("preparing to release memory object") logrus.Trace("preparing to release memory object")
mtime := zeroTime mtime := zeroTime
@ -408,7 +410,9 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
} }
if err != nil { if err != nil {
logger.WithError(err).Warn("upstream has error") if err != context.Canceled && err != context.DeadlineExceeded {
logger.WithError(err).Warn("upstream has error")
}
return return
} }
if response == nil { if response == nil {
@ -424,12 +428,12 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
resultResponse, resultCh, resultErr = response, ch, err resultResponse, resultCh, resultErr = response, ch, err
selectedCh <- idx selectedCh <- idx
for idx, cancel := range cancelFuncs { for cancelIdx, cancel := range cancelFuncs {
if resultIdx == idx { if cancelIdx == idx {
logrus.WithField("upstreamIdx", idx).Trace("selected thus not canceled") logrus.WithField("upstreamIdx", cancelIdx).Trace("selected thus not canceled")
continue continue
} }
logrus.WithField("upstreamIdx", idx).Trace("not selected and thus canceled") logrus.WithField("upstreamIdx", cancelIdx).Trace("not selected and thus canceled")
cancel() cancel()
} }