fix memory object release and io.ReadAtLeast

This commit is contained in:
guochao 2024-12-19 10:58:46 +08:00
parent ec71986af2
commit 068a4b5b7c

View File

@ -368,6 +368,8 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
server.lu.Unlock() server.lu.Unlock()
locked = false locked = false
err = nil
if w != nil { if w != nil {
memoryObject.wg.Add(1) memoryObject.wg.Add(1)
@ -379,7 +381,6 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
go memoryObject.StreamTo(w, memoryObject.wg) go memoryObject.StreamTo(w, memoryObject.wg)
} }
err = nil
for chunk := range chunks { for chunk := range chunks {
if chunk.error != nil { if chunk.error != nil {
err = chunk.error err = chunk.error
@ -390,7 +391,6 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
} }
n, _ := buffer.Write(chunk.buffer) n, _ := buffer.Write(chunk.buffer)
memoryObject.Offset += n memoryObject.Offset += n
} }
cancel() cancel()
@ -398,47 +398,47 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
if err != nil { if err != nil {
logrus.WithError(err).WithField("upstreamIdx", selectedIdx).Error("something happened during download. will not cache this response") logrus.WithError(err).WithField("upstreamIdx", selectedIdx).Error("something happened during download. will not cache this response")
return
} }
go func() { go func() {
logrus.Trace("preparing to release memory object") if err == nil {
mtime := zeroTime logrus.Trace("preparing to release memory object")
lastModifiedHeader := response.Header.Get("Last-Modified") mtime := zeroTime
if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil { lastModifiedHeader := response.Header.Get("Last-Modified")
logrus.WithError(err).WithFields(logrus.Fields{ if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil {
"value": lastModifiedHeader, logrus.WithError(err).WithFields(logrus.Fields{
"url": response.Request.URL, "value": lastModifiedHeader,
}).Trace("failed to parse last modified header value") "url": response.Request.URL,
} else { }).Trace("failed to parse last modified header value")
mtime = lastModified } else {
} mtime = lastModified
if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil { }
logrus.Warn(err) if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil {
} logrus.Warn(err)
fp, err := os.CreateTemp(server.Storage.Local.Path, "temp.*") }
name := fp.Name() fp, err := os.CreateTemp(server.Storage.Local.Path, "temp.*")
if err != nil { name := fp.Name()
logrus.WithFields(logrus.Fields{ if err != nil {
"key": key, logrus.WithFields(logrus.Fields{
"path": server.Storage.Local.Path, "key": key,
"pattern": "temp.*", "path": server.Storage.Local.Path,
}).WithError(err).Warn("ftime.Time{}ailed to create template file") "pattern": "temp.*",
} else if _, err := fp.Write(buffer.Bytes()); err != nil { }).WithError(err).Warn("ftime.Time{}ailed to create template file")
fp.Close() } else if _, err := fp.Write(buffer.Bytes()); err != nil {
os.Remove(name) fp.Close()
os.Remove(name)
logrus.WithError(err).Warn("failed to write into template file") logrus.WithError(err).Warn("failed to write into template file")
} else if err := fp.Close(); err != nil { } else if err := fp.Close(); err != nil {
os.Remove(name) os.Remove(name)
logrus.WithError(err).Warn("failed to close template file") logrus.WithError(err).Warn("failed to close template file")
} else { } else {
os.Chtimes(name, zeroTime, mtime) os.Chtimes(name, zeroTime, mtime)
dirname := filepath.Dir(key) dirname := filepath.Dir(key)
os.MkdirAll(dirname, 0755) os.MkdirAll(dirname, 0755)
os.Remove(key) os.Remove(key)
os.Rename(name, key) os.Rename(name, key)
}
} }
server.lu.Lock() server.lu.Lock()
@ -447,6 +447,7 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime
delete(server.o, r.URL.Path) delete(server.o, r.URL.Path)
logrus.Trace("memory object released") logrus.Trace("memory object released")
}() }()
} }
} }
@ -583,6 +584,8 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http.
return response, nil, fmt.Errorf("unexpected status(url=%v): %v: %v", newurl, response.StatusCode, response) return response, nil, fmt.Errorf("unexpected status(url=%v): %v: %v", newurl, response.StatusCode, response)
} }
var currentOffset int64
ch := make(chan Chunk, 1024) ch := make(chan Chunk, 1024)
buffer := make([]byte, server.Misc.FirstChunkBytes) buffer := make([]byte, server.Misc.FirstChunkBytes)
@ -605,14 +608,14 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http.
n, err := io.ReadAtLeast(response.Body, buffer, len(buffer)) n, err := io.ReadAtLeast(response.Body, buffer, len(buffer))
if n > 0 { if n > 0 {
ch <- Chunk{buffer: buffer[:n]} ch <- Chunk{buffer: buffer[:n]}
currentOffset += int64(n)
} }
if err == io.EOF { if response.ContentLength > 0 && currentOffset == response.ContentLength && err == io.EOF || err == io.ErrUnexpectedEOF {
logger.Trace("done") logger.Trace("done")
return return
} }
if err != nil { if err != nil {
ch <- Chunk{error: err} ch <- Chunk{error: err}
logger.WithError(err).Trace("failed")
return return
} }
} }