From 068a4b5b7c6dcb90104a82c8c1981394c9eac3c7 Mon Sep 17 00:00:00 2001 From: guochao Date: Thu, 19 Dec 2024 10:58:46 +0800 Subject: [PATCH] fix memory object release and io.ReadAtLeast --- cmd/proxy/main.go | 85 ++++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 4a35ba9..ffbe5cf 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -368,6 +368,8 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime server.lu.Unlock() locked = false + err = nil + if w != nil { memoryObject.wg.Add(1) @@ -379,7 +381,6 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime go memoryObject.StreamTo(w, memoryObject.wg) } - err = nil for chunk := range chunks { if chunk.error != nil { err = chunk.error @@ -390,7 +391,6 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime } n, _ := buffer.Write(chunk.buffer) memoryObject.Offset += n - } cancel() @@ -398,47 +398,47 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime if err != nil { logrus.WithError(err).WithField("upstreamIdx", selectedIdx).Error("something happened during download. will not cache this response") - return } - go func() { - logrus.Trace("preparing to release memory object") - mtime := zeroTime - lastModifiedHeader := response.Header.Get("Last-Modified") - if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "value": lastModifiedHeader, - "url": response.Request.URL, - }).Trace("failed to parse last modified header value") - } else { - mtime = lastModified - } - if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil { - logrus.Warn(err) - } - fp, err := os.CreateTemp(server.Storage.Local.Path, "temp.*") - name := fp.Name() - if err != nil { - logrus.WithFields(logrus.Fields{ - "key": key, - "path": server.Storage.Local.Path, - "pattern": "temp.*", - }).WithError(err).Warn("ftime.Time{}ailed to create template file") - } else if _, err := fp.Write(buffer.Bytes()); err != nil { - fp.Close() - os.Remove(name) + if err == nil { + logrus.Trace("preparing to release memory object") + mtime := zeroTime + lastModifiedHeader := response.Header.Get("Last-Modified") + if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "value": lastModifiedHeader, + "url": response.Request.URL, + }).Trace("failed to parse last modified header value") + } else { + mtime = lastModified + } + if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil { + logrus.Warn(err) + } + fp, err := os.CreateTemp(server.Storage.Local.Path, "temp.*") + name := fp.Name() + if err != nil { + logrus.WithFields(logrus.Fields{ + "key": key, + "path": server.Storage.Local.Path, + "pattern": "temp.*", + }).WithError(err).Warn("ftime.Time{}ailed to create template file") + } else if _, err := fp.Write(buffer.Bytes()); err != nil { + fp.Close() + os.Remove(name) - logrus.WithError(err).Warn("failed to write into template file") - } else if err := fp.Close(); err != nil { - os.Remove(name) + logrus.WithError(err).Warn("failed to write into template file") + } else if err := fp.Close(); err != nil { + os.Remove(name) - logrus.WithError(err).Warn("failed to close template file") - } else { - os.Chtimes(name, zeroTime, mtime) - dirname := filepath.Dir(key) - os.MkdirAll(dirname, 0755) - os.Remove(key) - os.Rename(name, key) + logrus.WithError(err).Warn("failed to close template file") + } else { + os.Chtimes(name, zeroTime, mtime) + dirname := filepath.Dir(key) + os.MkdirAll(dirname, 0755) + os.Remove(key) + os.Rename(name, key) + } } server.lu.Lock() @@ -447,6 +447,7 @@ func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime delete(server.o, r.URL.Path) logrus.Trace("memory object released") }() + } } @@ -583,6 +584,8 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http. return response, nil, fmt.Errorf("unexpected status(url=%v): %v: %v", newurl, response.StatusCode, response) } + var currentOffset int64 + ch := make(chan Chunk, 1024) buffer := make([]byte, server.Misc.FirstChunkBytes) @@ -605,14 +608,14 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx int, r *http. n, err := io.ReadAtLeast(response.Body, buffer, len(buffer)) if n > 0 { ch <- Chunk{buffer: buffer[:n]} + currentOffset += int64(n) } - if err == io.EOF { + if response.ContentLength > 0 && currentOffset == response.ContentLength && err == io.EOF || err == io.ErrUnexpectedEOF { logger.Trace("done") return } if err != nil { ch <- Chunk{error: err} - logger.WithError(err).Trace("failed") return } }