6 Commits

Author SHA1 Message Date
80560f7408 cline optimization on range request and memory usage
All checks were successful
build container / build-container (push) Successful in 29m33s
run go test / test (push) Successful in 26m15s
2025-06-10 17:44:44 +08:00
147659b0da test and fix bugs found by cline
All checks were successful
build container / build-container (push) Successful in 28m57s
run go test / test (push) Successful in 25m34s
2025-06-10 14:22:43 +08:00
2a0bd28958 use cline and prepare to test the project for further development
Some checks failed
build container / build-container (push) Has been cancelled
2025-06-10 14:14:32 +08:00
835045346d add header checker
Some checks failed
build container / build-container (push) Successful in 33m30s
run go test / test (push) Failing after 3s
2025-04-01 14:09:43 +08:00
85968bb5cf update matcher to path matcher 2025-04-01 11:12:34 +08:00
e14bcb205b rename handlerlog to handlerctx
All checks were successful
build container / build-container (push) Successful in 6m4s
run go test / test (push) Successful in 3m23s
2025-03-03 09:59:19 +08:00
14 changed files with 2516 additions and 335 deletions

125
.clinerules Normal file
@ -0,0 +1,125 @@
# Project
## User Language
Simplified Chinese
## Development
- TDD
# Cline's Memory Bank
I am Cline, an expert software engineer with a unique characteristic: my memory resets completely between sessions. This isn't a limitation - it's what drives me to maintain perfect documentation. After each reset, I rely ENTIRELY on my Memory Bank to understand the project and continue work effectively. I MUST read ALL memory bank files at the start of EVERY task - this is not optional.
## Memory Bank Structure
The Memory Bank consists of core files and optional context files, all in Markdown format. Files build upon each other in a clear hierarchy:
```mermaid
flowchart TD
    PB[projectbrief.md] --> PC[productContext.md]
    PB --> SP[systemPatterns.md]
    PB --> TC[techContext.md]
    PC --> AC[activeContext.md]
    SP --> AC
    TC --> AC
    AC --> P[progress.md]
```
### Core Files (Required)
1. `projectbrief.md`
- Foundation document that shapes all other files
- Created at project start if it doesn't exist
- Defines core requirements and goals
- Source of truth for project scope
2. `productContext.md`
- Why this project exists
- Problems it solves
- How it should work
- User experience goals
3. `activeContext.md`
- Current work focus
- Recent changes
- Next steps
- Active decisions and considerations
- Important patterns and preferences
- Learnings and project insights
4. `systemPatterns.md`
- System architecture
- Key technical decisions
- Design patterns in use
- Component relationships
- Critical implementation paths
5. `techContext.md`
- Technologies used
- Development setup
- Technical constraints
- Dependencies
- Tool usage patterns
6. `progress.md`
- What works
- What's left to build
- Current status
- Known issues
- Evolution of project decisions
### Additional Context
Create additional files/folders within memory-bank/ when they help organize:
- Complex feature documentation
- Integration specifications
- API documentation
- Testing strategies
- Deployment procedures
## Core Workflows
### Plan Mode
```mermaid
flowchart TD
    Start[Start] --> ReadFiles[Read Memory Bank]
    ReadFiles --> CheckFiles{Files Complete?}
    CheckFiles -->|No| Plan[Create Plan]
    Plan --> Document[Document in Chat]
    CheckFiles -->|Yes| Verify[Verify Context]
    Verify --> Strategy[Develop Strategy]
    Strategy --> Present[Present Approach]
```
### Act Mode
```mermaid
flowchart TD
    Start[Start] --> Context[Check Memory Bank]
    Context --> Update[Update Documentation]
    Update --> Execute[Execute Task]
    Execute --> Document[Document Changes]
```
## Documentation Updates
Memory Bank updates occur when:
1. Discovering new project patterns
2. After implementing significant changes
3. When user requests with **update memory bank** (MUST review ALL files)
4. When context needs clarification
```mermaid
flowchart TD
    Start[Update Process]

    subgraph Process
        P1[Review ALL Files]
        P2[Document Current State]
        P3[Clarify Next Steps]
        P4[Document Insights & Patterns]

        P1 --> P2 --> P3 --> P4
    end

    Start --> Process
```
Note: When triggered by **update memory bank**, I MUST review every memory bank file, even if some don't require updates. Focus particularly on activeContext.md and progress.md as they track current state.
REMEMBER: After every memory reset, I begin completely fresh. The Memory Bank is my only link to previous work. It must be maintained with precision and clarity, as my effectiveness depends entirely on its accuracy.

@ -11,7 +11,7 @@ import (
cacheproxy "git.jeffthecoder.xyz/guochao/cache-proxy"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerlog"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerctx"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/httplog"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/recover"
@ -133,7 +133,7 @@ func main() {
slog.With("addr", ":8881").Info("serving app")
if err := http.ListenAndServe(":8881", middleware.Use(mux,
recover.Recover(),
handlerlog.FindHandler(mux),
handlerctx.FindHandler(mux),
httplog.Log(httplog.Config{LogStart: true, LogFinish: true}),
)); err != nil {
slog.With("error", err).Error("failed to start server")

@ -10,27 +10,38 @@ type UpstreamPriorityGroup struct {
Priority int `yaml:"priority"`
}
type UpstreamMatch struct {
type PathTransformation struct {
Match string `yaml:"match"`
Replace string `yaml:"replace"`
}
type HeaderChecker struct {
Name string `yaml:"name"`
Match *string `yaml:"match"`
}
type Checker struct {
StatusCodes []int `yaml:"status-codes"`
Headers []HeaderChecker `yaml:"headers"`
}
type Upstream struct {
Server string `yaml:"server"`
Match UpstreamMatch `yaml:"match"`
Path PathTransformation `yaml:"path"`
Checkers []Checker `yaml:"checkers"`
AllowedRedirect *string `yaml:"allowed-redirect"`
PriorityGroups []UpstreamPriorityGroup `yaml:"priority-groups"`
}
func (upstream Upstream) GetPath(orig string) (string, bool, error) {
if upstream.Match.Match == "" || upstream.Match.Replace == "" {
if upstream.Path.Match == "" || upstream.Path.Replace == "" {
return orig, true, nil
}
matcher, err := regexp.Compile(upstream.Match.Match)
matcher, err := regexp.Compile(upstream.Path.Match)
if err != nil {
return "", false, err
}
return matcher.ReplaceAllString(orig, upstream.Match.Replace), matcher.MatchString(orig), nil
return matcher.ReplaceAllString(orig, upstream.Path.Replace), matcher.MatchString(orig), nil
}
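A quick sketch of how the renamed `Path` transformation behaves, following the same semantics as `GetPath` above (an empty match or replace passes the path through unchanged). `rewritePath` is an illustrative stand-in, not the project's actual function:

```go
package main

import (
	"fmt"
	"regexp"
)

// rewritePath mimics the behavior of Upstream.GetPath shown above: compile
// the configured match pattern, report whether the original path matched,
// and apply the replace template. Illustrative only.
func rewritePath(match, replace, orig string) (string, bool, error) {
	if match == "" || replace == "" {
		return orig, true, nil // no transformation configured: pass through
	}
	matcher, err := regexp.Compile(match)
	if err != nil {
		return "", false, err
	}
	return matcher.ReplaceAllString(orig, replace), matcher.MatchString(orig), nil
}

func main() {
	// Mirrors the ubuntu-ports entry in config.yaml.
	got, ok, err := rewritePath(`/ubuntu-ports/(.*)`, `/$1`, "/ubuntu-ports/dists/noble/Release")
	fmt.Println(got, ok, err) // /dists/noble/Release true <nil>
}
```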
type LocalStorage struct {

@ -1,91 +1,103 @@
upstream:
- server: https://mirrors.aliyun.com
match:
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu|pypi)/(.*)
path:
match: /(debian|ubuntu|ubuntu-ports|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu|pypi)/(.*)
replace: '/$1/$2'
- server: https://mirrors.tencent.com
match:
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
path:
match: /(debian|ubuntu|ubuntu-ports|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
replace: '/$1/$2'
- server: https://mirrors.ustc.edu.cn
match:
path:
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|elrepo|remi|rpmfusion|tailscale|gnu|rust-static|pypi)/(.*)
replace: '/$1/$2'
checkers:
headers:
- name: content-type
match: application/octet-stream
- server: https://packages.microsoft.com/repos/code
match:
path:
match: /microsoft-code(.*)
replace: '$1'
- server: https://archive.ubuntu.com
match:
path:
match: /ubuntu/(.*)
replace: /$1
- server: https://ports.ubuntu.com
path:
match: /ubuntu-ports/(.*)
replace: /$1
# priority-groups:
# - match: /dists/.*
# priority: 10
- server: https://releases.ubuntu.com/
match:
path:
match: /ubuntu-releases/(.*)
replace: /$1
- server: https://apt.releases.hashicorp.com
match:
path:
match: /hashicorp-deb/(.*)
replace: /$1
- server: https://rpm.releases.hashicorp.com
match:
path:
match: /hashicorp-rpm/(.*)
replace: /$1
- server: https://dl.google.com/linux/chrome
match:
path:
match: /google-chrome/(.*)
replace: /$1
- server: https://pkgs.tailscale.com/stable
match:
path:
match: /tailscale/(.*)
replace: /$1
- server: http://update.cs2c.com.cn:8080
match:
path:
match: /kylinlinux/(.*)
replace: /$1
- server: https://cdn.jsdelivr.net
match:
path:
match: /jsdelivr/(.*)
replace: /$1
- server: https://dl-cdn.alpinelinux.org
match:
path:
match: /alpine/(.*)
replace: /$1
- server: https://repo.almalinux.org
match:
path:
match: /almalinux/(.*)
replace: /almalinux/$1
- server: https://dl.rockylinux.org/pub/rocky
match:
path:
match: /rocky/(.*)
replace: /$1
- server: https://dl.fedoraproject.org/pub/epel
match:
path:
match: /epel/(.*)
replace: /$1
- server: https://deb.debian.org
match:
path:
match: /(debian|debian-security)/(.*)
replace: /$1/$2
# priority-groups:
# - match: /dists/.*
# priority: 10
- server: https://static.rust-lang.org
match:
path:
match: /rust-static/(.*)
replace: /$1
- server: https://storage.flutter-io.cn
match:
path:
match: /flutter-storage/(.*)
replace: /$1
- server: https://cdn-mirror.chaotic.cx/chaotic-aur
match:
path:
match: /chaotic-aur/(.*)
replace: /$1
allowed-redirect: "^https?://cf-builds.garudalinux.org/.*/chaotic-aur/.*$"
- server: http://download.zfsonlinux.org
path:
match: /openzfs/(.*)
replace: /$1
misc:
first-chunk-bytes: 31457280 # 1024*1024*30; upstreams that answer 200 wait for the first chunk, so an upstream is selected by bandwidth instead of by latency. defaults to 50M

@ -0,0 +1,33 @@
# Current Focus

There is no urgent task in progress. The project is in a stable state; upcoming work can be planned from the to-do list in `progress.md`.

## Recent Changes

- **Implemented phase two of the streaming-request optimization (temporary-file based)**:
  - **Problem**: `StreamObject` buffered downloaded content in an in-memory `bytes.Buffer`, which caused excessive memory usage for large files.
  - **Solution**: replaced the `Buffer` in `StreamObject` with an `*os.File`, writing the downloaded byte stream directly to a temporary file.
  - **Implementation details**:
    - `startOrJoinStream` (the producer) now creates a temporary file and writes the download into it. On success, the temporary file is renamed to the final cache file.
    - `serveRangedRequest` (the consumer) uses `syscall.Dup()` to duplicate the producer's file descriptor, solving the lifetime race on the file handle. Each consumer reads through its own independent, duplicated handle, decoupling it from the lifetime of the producer's handle.
  - **Result**: resolved the memory problem for large files; all tests pass, including a new test written specifically to verify concurrency safety and race conditions.

## Next Steps

The next logical step is to work through the to-do items in `progress.md`, for example:

1. **Feature enhancements**:
   - **Multiple storage backends**: add support for backends such as S3 or Redis.
   - **Advanced load balancing**: implement more sophisticated balancing strategies.
   - **Monitoring**: integrate Prometheus metrics.
2. **Refactoring**:
   - Refactor the complex functions in `server.go` to improve readability.

Confirm the next focus with the user before starting any new task.

## Important Patterns & Preferences

- **Code style**: follow Go community best practices; keep code clear and concise, with appropriate comments.
- **Concurrency**: prefer Go's native primitives (goroutines and channels) for concurrent problems.
- **Configuration-driven**: keep core logic separate from configuration, so the project adapts to new scenarios by editing the config file rather than the code.
- **Documentation-driven**: record every significant design decision, architecture change, and feature implementation in the Memory Bank.

@ -0,0 +1,26 @@
# Product Context

In many scenarios, users download files from software mirrors, file servers, or other resource sites. Because of network fluctuations, server load, or geography, the download speed and stability of any single server cannot be guaranteed.

## Problems Solved

`cache-proxy` is designed to solve the following problems:

1. **Single point of failure**: when the only mirror server a user relies on goes down or becomes unreachable, downloads break.
2. **Poor network speed**: users may not be connected to the fastest available server, leading to long download times.
3. **Redundant downloads**: when multiple users, or the same user repeatedly, download the same file, unnecessary traffic is generated and server load increases.

## How It Works

`cache-proxy` solves these problems by inserting a proxy layer between the user and multiple upstream servers.

- When a file request arrives, it requests the file from all configured upstream servers concurrently.
- It picks the first upstream that returns a successful response and streams its data to the user.
- At the same time, it caches the file to local disk.
- Subsequent requests for the same file are served directly from the local cache while it is still fresh, greatly improving response time and reducing network traffic.

## User Experience Goals

- **Seamless integration**: users only need to point their download URL at `cache-proxy`; the backend complexity stays hidden.
- **Faster downloads**: the racing mechanism picks the fastest source, so users always get the best download experience.
- **Higher availability**: even if some upstream servers fail, the service keeps running as long as one remains available.

44
memory-bank/progress.md Normal file
@ -0,0 +1,44 @@
# Project Progress

This is the initial state of the completed `cache-proxy` project. The core functionality is implemented and working.

## Completed Features

- **Core proxy logic**:
  - Load configuration from `config.yaml`.
  - Start the HTTP server and listen for requests.
  - Check the local cache based on the request path.
- **Concurrent upstream requests**:
  - Issue requests to upstream servers concurrently.
  - Correctly select the fastest-responding server.
  - Cancel the remaining requests once one server has been selected.
- **Cache management**:
  - Cache downloaded files to local disk.
  - Support a time-based cache refresh policy.
  - Use the `If-Modified-Since` request header to avoid unnecessary transfers.
- **Concurrent request handling**:
  - Correctly handle multiple concurrent requests for the same file, ensuring it is downloaded only once.
- **Accelerated delivery**:
  - Delegate file delivery to a frontend server (e.g. Nginx) via the `X-Sendfile` / `X-Accel-Redirect` headers.
- **Comprehensive test coverage**:
  - Completed `server_test.go`, providing unit and integration tests for all core functionality.
  - Tests cover the happy path, edge cases (timeouts, upstream failures), and security (path traversal).
  - Reviewed the test code and comments for accuracy and consistency.
  - All tests pass, confirming the robustness of the existing code.

## To Do

- **Feature enhancements**:
  - **Streaming ranged requests (done)**: refactored ranged-request handling to respond while still downloading.
    - **Phase one (done)**: implemented with `sync.Cond` and an in-memory `bytes.Buffer`.
    - **Performance optimization (done)**: removed the fixed delay from the `StreamObject` cleanup logic in favor of relying on Go's GC, significantly improving test performance under concurrent requests.
    - **Phase two (done)**: switched to temporary files and the `syscall.Dup()` system call, eliminating the memory overhead for large files.
  - Only local file storage is supported today; consider adding other storage backends (e.g. S3, Redis).
  - Add more sophisticated load-balancing strategies beyond "pick the fastest".
  - Add more detailed monitoring and metrics (e.g. Prometheus metrics).
- **Code cleanup**:
  - Now that the streaming work is complete, refactor the complex functions in `server.go` (such as `streamOnline`) for readability and maintainability.

## Known Issues

- No obvious bugs have been found at this stage; the code structure is clear and the logic complete.

@ -0,0 +1,10 @@
# Project Brief: Cache-Proxy

This is a high-performance caching proxy server written in Go. Its core function is to accept client requests and forward them to multiple configured upstream mirror servers. It issues the upstream requests concurrently, returns the fastest response to the client, and caches the result locally so that subsequent requests can be answered faster.

## Core Requirements

- **Performance**: must handle concurrent requests quickly and use the cache effectively.
- **Reliability**: when an upstream server becomes unavailable, automatically switch to another available one.
- **Configurability**: upstream servers, cache policy, and other behavior can easily be added, removed, and tuned via the configuration file.
- **Transparency**: the proxy is transparent to clients, which access it exactly as they would an ordinary server.

@ -0,0 +1,62 @@
# System Architecture & Design Patterns

The design of `cache-proxy` follows several key patterns and principles to meet its goals of high performance and high availability.

## Core Architecture

The system has three main parts:

1. **HTTP server layer**: receives client requests and applies middleware for logging, panic recovery, and other cross-cutting concerns.
2. **Cache handling layer**: checks whether the requested file exists in the local cache and, based on the cache policy, decides whether to serve the cached file directly or go upstream.
3. **Upstream selection & download layer**: the heart of the system; fetches data concurrently from multiple upstream servers and manages the download.

## Key Design Patterns

### 1. Racing Requests

This pattern is the core of the "pick the fastest" behavior.

- The `fastesUpstream` function creates one goroutine per upstream server.
- All goroutines send their requests concurrently.
- `sync.Once` guarantees that exactly one goroutine "wins" and becomes the final data source.
- The winner calls `context.CancelFunc` to tell all other goroutines to stop, avoiding unnecessary resource consumption.

### 2. Producer-Consumer

File downloads and streaming responses (ranged requests) use a producer-consumer pattern.

- **Producer (`startOrJoinStream`)**:
  - Starts one goroutine for the first request of each file.
  - Downloads the file body from the fastest upstream server.
  - Writes the content **directly into a temporary file** (`*os.File`) rather than an in-memory buffer.
  - Broadcasts download progress (the bytes written so far, `Offset`) via `sync.Cond`.
  - On success, renames the temporary file to the final cache file.
- **Consumer (`serveRangedRequest`)**:
  - On a ranged request, finds or waits for the corresponding `StreamObject`.
  - To safely read a file the producer is still writing, each consumer **duplicates the temp file's descriptor with `syscall.Dup()`**.
  - Each consumer reads its requested range through its own independent, duplicated handle (`*os.File`), avoiding file-handle state conflicts with the producer and other consumers.
  - Consumers wait on the producer's `Offset` progress and the `sync.Cond` signal until the data for their range becomes available.

### 3. Concurrency Control & Object Lifecycle

To handle multiple clients requesting the same file at once, the system uses a `sync.Mutex` together with a `map[string]*StreamObject`.

- The first request for a file takes the lock, creates a `StreamObject` representing the in-flight download, and starts a producer goroutine to perform it.
- Subsequent requests for the same file find the `StreamObject` already in the map; they share it instead of starting another download.

**`StreamObject` lifecycle management (GC-based)**

`StreamObject` lifetimes are managed by a simple, efficient pattern that relies on Go's garbage collector (GC):

1. **The producer removes the entry**: when the download goroutine (the producer) finishes, successfully or not, its only cleanup duty is to delete the `StreamObject` from the global `map[string]*StreamObject`.
2. **Consumers hold references**: meanwhile, every HTTP handler (consumer) still serving that file keeps its own reference to the `StreamObject`.
3. **GC reclaims automatically**: those references keep the object alive. Only after the last consumer finishes and its stack frame is gone does the final reference disappear; the GC then reclaims the object on its next run.

This pattern avoids reference counting and timers, keeps the code simple, and fundamentally eliminates the performance problem previously caused by a fixed cleanup delay.

### 4. Middleware

The request-handling chain is built from Go's standard `http.Handler` interface and the middleware pattern.

- Reusable middleware such as `httplog` and `recover` is defined under `pkgs/middleware`.
- This makes it easy to add or remove logging, authentication, error handling, and similar features without touching the core business logic.

@ -0,0 +1,26 @@
# Tech Stack & Development Environment

## Core Technologies

- **Language**: Go (Golang)
- **Main dependencies**:
  - `gopkg.in/yaml.v3`: parses the `config.yaml` configuration file.
  - `log/slog`: the structured logging library built into Go 1.21+.
  - `net/http`: the HTTP server and client.
  - `github.com/getsentry/sentry-go`: (optional) error tracking.

## Development & Build

- **Dependency management**: Go Modules (`go.mod`, `go.sum`).
- **Configuration**: the project's behavior is driven by the `config.yaml` file.
- **Containerization**:
  - `Dockerfile`: builds the project's Docker image.
  - `compose.yaml`: runs the service in the development environment.
  - `compose.release.yaml`: deploys the service in production.

## Key Technical Points

- **Concurrency model**: heavy use of goroutines and channels for high concurrency, notably the concurrent request pattern in `fastesUpstream`.
- **Streaming**: data flows through the `io.Reader` / `io.Writer` interfaces and is never loaded into memory in full, which is essential for large files.
- **Error handling**: on top of Go's standard error handling, critical paths (such as `fastesUpstream`) use the `context` package for timeouts and cancellation.
- **Structured logging**: the `slog` library produces structured, machine-parsable logs that ease debugging and monitoring.

@ -1,4 +1,4 @@
package handlerlog
package handlerctx
import (
"context"

@ -6,7 +6,7 @@ import (
"net/http"
"time"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerlog"
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerctx"
)
type ctxKey int
@ -43,7 +43,7 @@ func Log(config Config) func(http.Handler) http.Handler {
"path", r.URL.Path,
}
if pattern, handlerFound := handlerlog.Pattern(r); handlerFound {
if pattern, handlerFound := handlerctx.Pattern(r); handlerFound {
args = append(args, "handler", pattern)
}

723
server.go
@ -1,20 +1,19 @@
package cacheproxy
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"os"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
@ -26,12 +25,17 @@ const (
var zeroTime time.Time
var preclosedChan = make(chan struct{})
func init() {
close(preclosedChan)
}
var (
httpClient = http.Client{
// check allowed redirect
CheckRedirect: func(req *http.Request, via []*http.Request) error {
lastRequest := via[len(via)-1]
if allowedRedirect, ok := lastRequest.Context().Value(reqCtxAllowedRedirect).(string); ok {
if allowedRedirect, ok := req.Context().Value(reqCtxAllowedRedirect).(string); ok {
if matched, err := regexp.MatchString(allowedRedirect, req.URL.String()); err != nil {
return err
} else if !matched {
@ -45,44 +49,16 @@ var (
)
type StreamObject struct {
Headers http.Header
Buffer *bytes.Buffer
Offset int
Headers http.Header
TempFile *os.File // The temporary file holding the download content.
Offset int64 // The number of bytes written to TempFile.
Done bool
Error error
ctx context.Context
wg *sync.WaitGroup
}
mu *sync.Mutex
cond *sync.Cond
func (memoryObject *StreamObject) StreamTo(w io.Writer, wg *sync.WaitGroup) error {
defer wg.Done()
offset := 0
if w == nil {
w = io.Discard
}
OUTER:
for {
select {
case <-memoryObject.ctx.Done():
break OUTER
default:
}
newOffset := memoryObject.Offset
if newOffset == offset {
time.Sleep(time.Millisecond)
continue
}
bytes := memoryObject.Buffer.Bytes()[offset:newOffset]
written, err := w.Write(bytes)
if err != nil {
return err
}
offset += written
}
time.Sleep(time.Millisecond)
_, err := w.Write(memoryObject.Buffer.Bytes()[offset:])
return err
fileWrittenCh chan struct{} // Closed when the file is fully written and renamed.
}
type Server struct {
@ -101,11 +77,6 @@ func NewServer(config Config) *Server {
}
}
type Chunk struct {
buffer []byte
error error
}
func (server *Server) serveFile(w http.ResponseWriter, r *http.Request, path string) {
if location := r.Header.Get(server.Storage.Local.Accel.EnableByHeader); server.Storage.Local.Accel.EnableByHeader != "" && location != "" {
relPath, err := filepath.Rel(server.Storage.Local.Path, path)
@ -143,26 +114,29 @@ func (server *Server) HandleRequestWithCache(w http.ResponseWriter, r *http.Requ
slog.With("status", localStatus, "mtime", mtime, "error", err, "key", fullpath).Debug("local status checked")
if os.IsPermission(err) {
http.Error(w, err.Error(), http.StatusForbidden)
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else if localStatus != localNotExists {
if localStatus == localExistsButNeedHead {
if ranged {
server.streamOnline(nil, r, mtime, fullpath)
server.serveFile(w, r, fullpath)
} else {
server.streamOnline(w, r, mtime, fullpath)
}
} else {
server.serveFile(w, r, fullpath)
}
return
}
if localStatus == localExists {
server.serveFile(w, r, fullpath)
return
}
// localExistsButNeedHead or localNotExists
// Both need to go online.
if ranged {
server.serveRangedRequest(w, r, fullpath, mtime)
} else {
if ranged {
server.streamOnline(nil, r, mtime, fullpath)
server.serveFile(w, r, fullpath)
} else {
server.streamOnline(w, r, mtime, fullpath)
// For full requests, we wait for the download to complete and then serve the file.
// This maintains the original behavior for now.
ch := server.startOrJoinStream(r, mtime, fullpath)
if ch != nil {
<-ch
}
server.serveFile(w, r, fullpath)
}
}
@ -210,229 +184,348 @@ func (server *Server) checkLocal(w http.ResponseWriter, _ *http.Request, key str
return localNotExists, zeroTime, nil
}
func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime time.Time, key string) {
func (server *Server) serveRangedRequest(w http.ResponseWriter, r *http.Request, key string, mtime time.Time) {
server.lu.Lock()
memoryObject, exists := server.o[r.URL.Path]
locked := false
defer func() {
if locked {
server.lu.Unlock()
locked = false
}
}()
if !exists {
server.lu.Unlock()
// This is the first request for this file, so we start the download.
server.startOrJoinStream(r, mtime, key)
// Re-acquire the lock to get the newly created stream object.
server.lu.Lock()
locked = true
memoryObject, exists = server.o[r.URL.Path]
}
if exists {
if locked {
memoryObject = server.o[r.URL.Path]
if memoryObject == nil {
server.lu.Unlock()
locked = false
// This can happen if the upstream fails very quickly.
http.Error(w, "Failed to start download stream", http.StatusInternalServerError)
return
}
}
server.lu.Unlock()
if w != nil {
memoryObject.wg.Add(1)
for k := range memoryObject.Headers {
v := memoryObject.Headers.Get(k)
w.Header().Set(k, v)
}
// At this point, we have a memoryObject, and a producer goroutine is downloading the file.
// Now we implement the consumer logic.
if err := memoryObject.StreamTo(w, memoryObject.wg); err != nil {
slog.With("error", err).Warn("failed to stream response with existing memory object")
}
// Duplicate the file descriptor from the producer's temp file. This creates a new,
// independent file descriptor that points to the same underlying file description.
// This is more efficient and robust than opening the file by path.
fd, err := syscall.Dup(int(memoryObject.TempFile.Fd()))
if err != nil {
http.Error(w, "Failed to duplicate file descriptor", http.StatusInternalServerError)
return
}
// We create a new *os.File from the duplicated descriptor. The consumer is now
// responsible for closing this new file.
consumerFile := os.NewFile(uintptr(fd), memoryObject.TempFile.Name())
if consumerFile == nil {
syscall.Close(fd) // Clean up if NewFile fails
http.Error(w, "Failed to create file from duplicated descriptor", http.StatusInternalServerError)
return
}
defer consumerFile.Close()
rangeHeader := r.Header.Get("Range")
if rangeHeader == "" {
// This should not happen if called from HandleRequestWithCache, but as a safeguard:
http.Error(w, "Range header is required", http.StatusBadRequest)
return
}
// Parse the Range header. We only support a single range like "bytes=start-end".
var start, end int64
parts := strings.Split(strings.TrimPrefix(rangeHeader, "bytes="), "-")
if len(parts) != 2 {
http.Error(w, "Invalid Range header", http.StatusBadRequest)
return
}
start, err = strconv.ParseInt(parts[0], 10, 64)
if err != nil {
http.Error(w, "Invalid Range header", http.StatusBadRequest)
return
}
if parts[1] != "" {
end, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
http.Error(w, "Invalid Range header", http.StatusBadRequest)
return
}
} else {
slog.With("mtime", mtime).Debug("checking fastest upstream")
selectedIdx, response, chunks, err := server.fastesUpstream(r, mtime)
if chunks == nil && mtime != zeroTime {
slog.With("upstreamIdx", selectedIdx, "key", key).Debug("not modified. using local version")
if w != nil {
server.serveFile(w, r, key)
// An empty end means "to the end of the file". We don't know the full size yet.
// We'll have to handle this dynamically.
end = -1 // Sentinel value
}
memoryObject.mu.Lock()
defer memoryObject.mu.Unlock()
// Wait until we have the headers.
for memoryObject.Headers == nil && !memoryObject.Done {
memoryObject.cond.Wait()
}
if memoryObject.Error != nil {
http.Error(w, memoryObject.Error.Error(), http.StatusInternalServerError)
return
}
contentLengthStr := memoryObject.Headers.Get("Content-Length")
totalSize, _ := strconv.ParseInt(contentLengthStr, 10, 64)
if end == -1 || end >= totalSize {
end = totalSize - 1
}
if start >= totalSize || start > end {
http.Error(w, "Range not satisfiable", http.StatusRequestedRangeNotSatisfiable)
return
}
var bytesSent int64
bytesToSend := end - start + 1
headersWritten := false
for bytesSent < bytesToSend && memoryObject.Error == nil {
// Calculate what we need.
neededStart := start + bytesSent
// Wait for the data to be available.
for memoryObject.Offset <= neededStart && !memoryObject.Done {
memoryObject.cond.Wait()
}
// Check for error AFTER waiting. This is the critical fix.
if memoryObject.Error != nil {
// If headers haven't been written, we can send a 500 error.
// If they have, it's too late, the connection will just be closed.
if !headersWritten {
http.Error(w, memoryObject.Error.Error(), http.StatusInternalServerError)
}
return // Use return instead of break to exit immediately.
}
// If we are here, we have some data to send. Write headers if we haven't already.
if !headersWritten {
w.Header().Set("Content-Range", "bytes "+strconv.FormatInt(start, 10)+"-"+strconv.FormatInt(end, 10)+"/"+contentLengthStr)
w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusPartialContent)
headersWritten = true
}
// Data is available, read from the temporary file.
// We calculate how much we can read in this iteration.
readNow := memoryObject.Offset - neededStart
if readNow <= 0 {
// This can happen if we woke up but the data isn't what we need.
// The loop will continue to wait.
continue
}
// Don't read more than the client requested in total.
remainingToSend := bytesToSend - bytesSent
if readNow > remainingToSend {
readNow = remainingToSend
}
// Read the chunk from the file at the correct offset.
buffer := make([]byte, readNow)
bytesRead, err := consumerFile.ReadAt(buffer, neededStart)
if err != nil && err != io.EOF {
if !headersWritten {
http.Error(w, "Error reading from cache stream", http.StatusInternalServerError)
}
return
}
if err != nil {
slog.With("error", err).Warn("failed to select fastest upstream")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if selectedIdx == -1 || response == nil || chunks == nil {
slog.Debug("no upstream is selected")
http.NotFound(w, r)
return
}
if response.StatusCode == http.StatusNotModified {
slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
os.Chtimes(key, zeroTime, time.Now())
server.serveFile(w, r, key)
return
}
slog.With(
"upstreamIdx", selectedIdx,
).Debug("found fastest upstream")
buffer := &bytes.Buffer{}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
memoryObject = &StreamObject{
Headers: response.Header,
Buffer: buffer,
ctx: ctx,
wg: &sync.WaitGroup{},
}
server.o[r.URL.Path] = memoryObject
server.lu.Unlock()
locked = false
err = nil
if w != nil {
memoryObject.wg.Add(1)
for k := range memoryObject.Headers {
v := memoryObject.Headers.Get(k)
w.Header().Set(k, v)
}
go memoryObject.StreamTo(w, memoryObject.wg)
}
for chunk := range chunks {
if chunk.error != nil {
err = chunk.error
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
slog.With("error", err).Warn("failed to read from upstream")
}
}
if chunk.buffer == nil {
break
}
n, _ := buffer.Write(chunk.buffer)
memoryObject.Offset += n
}
cancel()
memoryObject.wg.Wait()
if response.ContentLength > 0 {
if memoryObject.Offset == int(response.ContentLength) && err != nil {
if !(errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) {
slog.With("read-length", memoryObject.Offset, "content-length", response.ContentLength, "error", err, "upstreamIdx", selectedIdx).Debug("something happened during download. but response body is read as whole. so error is reset to nil")
}
err = nil
}
} else if err == io.EOF {
err = nil
}
if err != nil {
logger := slog.With("upstreamIdx", selectedIdx)
logger.Error("something happened during download. will not cache this response. setting lingering to reset the connection.")
hijacker, ok := w.(http.Hijacker)
if !ok {
logger.Warn("response writer is not a hijacker. failed to set lingering")
return
}
conn, _, err := hijacker.Hijack()
if bytesRead > 0 {
n, err := w.Write(buffer[:bytesRead])
if err != nil {
logger.With("error", err).Warn("hijack failed. failed to set lingering")
// Client closed connection, just return.
return
}
defer conn.Close()
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
logger.With("error", err).Warn("connection is not a *net.TCPConn. failed to set lingering")
return
}
if err := tcpConn.SetLinger(0); err != nil {
logger.With("error", err).Warn("failed to set lingering")
return
}
logger.Debug("connection set to linger. it will be reset once the conn.Close is called")
bytesSent += int64(n)
}
go func() {
defer func() {
server.lu.Lock()
defer server.lu.Unlock()
delete(server.o, r.URL.Path)
slog.Debug("memory object released")
}()
if err == nil {
slog.Debug("preparing to release memory object")
mtime := zeroTime
lastModifiedHeader := response.Header.Get("Last-Modified")
if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil {
slog.With(
"error", err,
"value", lastModifiedHeader,
"url", response.Request.URL,
).Debug("failed to parse last modified header value. set modified time to now")
} else {
slog.With(
"header", lastModifiedHeader,
"value", lastModified,
"url", response.Request.URL,
).Debug("found modified time")
mtime = lastModified
}
if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil {
slog.With("error", err).Warn("failed to create local storage path")
}
if server.Config.Storage.Local.TemporaryFilePattern == "" {
if err := os.WriteFile(key, buffer.Bytes(), 0644); err != nil {
slog.With("error", err).Warn("failed to write file")
os.Remove(key)
}
return
}
fp, err := os.CreateTemp(server.Storage.Local.Path, server.Storage.Local.TemporaryFilePattern)
if err != nil {
slog.With(
"key", key,
"path", server.Storage.Local.Path,
"pattern", server.Storage.Local.TemporaryFilePattern,
"error", err,
).Warn("failed to create template file")
return
}
name := fp.Name()
if _, err := fp.Write(buffer.Bytes()); err != nil {
fp.Close()
os.Remove(name)
slog.With("error", err).Warn("failed to write into template file")
} else if err := fp.Close(); err != nil {
os.Remove(name)
slog.With("error", err).Warn("failed to close template file")
} else {
os.Chtimes(name, zeroTime, mtime)
dirname := filepath.Dir(key)
os.MkdirAll(dirname, 0755)
os.Remove(key)
os.Rename(name, key)
}
}
}()
if memoryObject.Done && bytesSent >= bytesToSend {
break
}
}
}
func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, resultCh chan Chunk, resultErr error) {
// startOrJoinStream ensures a download stream is active for the given key.
// If a stream already exists, it returns the channel that signals completion.
// If not, it starts a new download producer and returns its completion channel.
func (server *Server) startOrJoinStream(r *http.Request, mtime time.Time, key string) <-chan struct{} {
server.lu.Lock()
memoryObject, exists := server.o[r.URL.Path]
if exists {
// A download is already in progress. Return its completion channel.
server.lu.Unlock()
return memoryObject.fileWrittenCh
}
// No active stream, create a new one.
if err := os.MkdirAll(filepath.Dir(key), 0755); err != nil {
slog.With("error", err).Warn("failed to create local storage path for temp file")
server.lu.Unlock()
return preclosedChan // Return a closed channel to prevent blocking
}
tempFilePattern := server.Storage.Local.TemporaryFilePattern
if tempFilePattern == "" {
tempFilePattern = "cache-proxy-*"
}
tempFile, err := os.CreateTemp(filepath.Dir(key), tempFilePattern)
if err != nil {
slog.With("error", err).Warn("failed to create temporary file")
server.lu.Unlock()
return preclosedChan
}
mu := &sync.Mutex{}
fileWrittenCh := make(chan struct{})
memoryObject = &StreamObject{
TempFile: tempFile,
mu: mu,
cond: sync.NewCond(mu),
fileWrittenCh: fileWrittenCh,
}
server.o[r.URL.Path] = memoryObject
server.lu.Unlock()
// This is the producer goroutine
go func(mo *StreamObject) {
var err error
downloadSucceeded := false
tempFileName := mo.TempFile.Name()
defer func() {
// On completion (or error), update the stream object,
// wake up all consumers, and then clean up.
mo.mu.Lock()
mo.Done = true
mo.Error = err
mo.cond.Broadcast()
mo.mu.Unlock()
// Close the temp file handle.
mo.TempFile.Close()
// If download failed, remove the temp file.
if !downloadSucceeded {
os.Remove(tempFileName)
}
// The producer's job is done. Remove the object from the central map.
// Any existing consumers still hold a reference to the object,
// so it won't be garbage collected until they are done.
server.lu.Lock()
delete(server.o, r.URL.Path)
server.lu.Unlock()
slog.Debug("memory object released by producer")
close(mo.fileWrittenCh)
}()
slog.With("mtime", mtime).Debug("checking fastest upstream")
selectedIdx, response, firstChunk, upstreamErr := server.fastestUpstream(r, mtime)
if upstreamErr != nil {
if !errors.Is(upstreamErr, io.EOF) && !errors.Is(upstreamErr, io.ErrUnexpectedEOF) {
slog.With("error", upstreamErr).Warn("failed to select fastest upstream")
err = upstreamErr
return
}
}
if selectedIdx == -1 || response == nil {
slog.Debug("no upstream is selected")
err = errors.New("no suitable upstream found")
return
}
if response.StatusCode == http.StatusNotModified {
slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
os.Chtimes(key, zeroTime, time.Now())
// In this case, we don't have a new file, so we just exit.
// The temp file will be cleaned up by the defer.
return
}
defer response.Body.Close()
slog.With("upstreamIdx", selectedIdx).Debug("found fastest upstream")
mo.mu.Lock()
mo.Headers = response.Header
mo.cond.Broadcast() // Broadcast headers availability
mo.mu.Unlock()
// Write the first chunk that we already downloaded.
if len(firstChunk) > 0 {
written, writeErr := mo.TempFile.Write(firstChunk)
if writeErr != nil {
err = writeErr
return
}
mo.mu.Lock()
mo.Offset += int64(written)
mo.cond.Broadcast()
mo.mu.Unlock()
}
// Download the rest of the file in chunks
buffer := make([]byte, server.Misc.ChunkBytes)
for {
n, readErr := response.Body.Read(buffer)
if n > 0 {
written, writeErr := mo.TempFile.Write(buffer[:n])
if writeErr != nil {
err = writeErr
break
}
mo.mu.Lock()
mo.Offset += int64(written)
mo.cond.Broadcast()
mo.mu.Unlock()
}
if readErr != nil {
if readErr != io.EOF {
err = readErr
}
break
}
}
// After download, if no critical error, rename the temp file to its final destination.
if err == nil {
// Set modification time
mtime := zeroTime
lastModifiedHeader := response.Header.Get("Last-Modified")
if lastModified, lmErr := http.ParseTime(lastModifiedHeader); lmErr == nil {
mtime = lastModified
}
// Close file before Chtimes and Rename
mo.TempFile.Close()
os.Chtimes(tempFileName, zeroTime, mtime)
// Rename the file
if renameErr := os.Rename(tempFileName, key); renameErr != nil {
slog.With("error", renameErr, "from", tempFileName, "to", key).Warn("failed to rename temp file")
err = renameErr
os.Remove(tempFileName) // Attempt to clean up if rename fails
} else {
downloadSucceeded = true
}
} else {
logger := slog.With("upstreamIdx", selectedIdx)
logger.Error("download failed; response will not be cached", "error", err)
}
}(memoryObject)
return fileWrittenCh
}
func (server *Server) fastestUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, firstChunk []byte, resultErr error) {
returnLock := &sync.Mutex{}
upstreams := len(server.Upstreams)
cancelFuncs := make([]func(), upstreams)
@@ -443,6 +536,8 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
resultIdx = -1
var resultFirstChunk []byte
defer close(updateCh)
defer close(notModifiedCh)
defer func() {
@@ -457,7 +552,8 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
groups := make(map[int][]int)
for upstreamIdx, upstream := range server.Upstreams {
if _, matched, err := upstream.GetPath(r.URL.Path); err != nil {
resultErr = err
return
} else if !matched {
continue
}
@@ -465,7 +561,8 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
priority := 0
for _, priorityGroup := range upstream.PriorityGroups {
if matched, err := regexp.MatchString(priorityGroup.Match, r.URL.Path); err != nil {
resultErr = err
return
} else if matched {
priority = priorityGroup.Priority
break
@@ -501,7 +598,7 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
go func() {
defer wg.Done()
response, chunk, err := server.tryUpstream(ctx, idx, priority, r, lastModified)
if err == context.Canceled { // others returned
logger.Debug("context canceled")
return
@@ -521,13 +618,16 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
}
locked := returnLock.TryLock()
if !locked {
if response != nil {
response.Body.Close()
}
return
}
defer returnLock.Unlock()
if response.StatusCode == http.StatusNotModified {
notModifiedOnce.Do(func() {
resultResponse, resultErr = response, err
notModifiedCh <- idx
})
logger.Debug("voted not modified")
@@ -535,7 +635,7 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
}
updateOnce.Do(func() {
resultResponse, resultFirstChunk, resultErr = response, chunk, err
updateCh <- idx
for cancelIdx, cancel := range cancelFuncs {
@@ -555,6 +655,7 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
select {
case idx := <-updateCh:
resultIdx = idx
firstChunk = resultFirstChunk
logger.With("upstreamIdx", resultIdx).Debug("upstream selected")
return
default:
@@ -569,10 +670,10 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
}
}
return
}
func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, firstChunk []byte, err error) {
upstream := server.Upstreams[upstreamIdx]
newpath, matched, err := upstream.GetPath(r.URL.Path)
@@ -618,23 +719,55 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int
if err != nil {
return nil, nil, err
}
shouldCloseBody := true
defer func() {
if shouldCloseBody && response != nil {
response.Body.Close()
}
}()
if response.StatusCode == http.StatusNotModified {
return response, nil, nil
}
if response.StatusCode >= 400 && response.StatusCode < 500 {
return nil, nil, nil
}
if response.StatusCode < 200 || response.StatusCode >= 500 {
logger.With(
"url", newurl,
"status", response.StatusCode,
).Warn("unexpected status")
return response, nil, fmt.Errorf("unexpected status(url=%v): %v: %v", newurl, response.StatusCode, response)
}
responseCheckers := upstream.Checkers
if len(responseCheckers) == 0 {
responseCheckers = append(responseCheckers, Checker{})
}
var currentOffset int64
for _, checker := range responseCheckers {
if len(checker.StatusCodes) == 0 {
checker.StatusCodes = append(checker.StatusCodes, http.StatusOK)
}
if !slices.Contains(checker.StatusCodes, response.StatusCode) {
// status not accepted by the checker: skip this upstream without error
return nil, nil, nil
}
for _, headerChecker := range checker.Headers {
if headerChecker.Match == nil {
// check header exists
if _, ok := response.Header[headerChecker.Name]; !ok {
logger.Debug("missing header", "header", headerChecker.Name)
return nil, nil, nil
}
} else {
// check header match
value := response.Header.Get(headerChecker.Name)
if matched, err := regexp.MatchString(*headerChecker.Match, value); err != nil {
return nil, nil, err
} else if !matched {
logger.Debug("invalid header value",
"header", headerChecker.Name,
"value", value,
"matcher", *headerChecker.Match,
)
return nil, nil, nil
}
}
}
}
buffer := make([]byte, server.Misc.FirstChunkBytes)
n, err := io.ReadAtLeast(response.Body, buffer, len(buffer))
@@ -643,28 +776,12 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int
if n == 0 {
return response, nil, err
}
}
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
err = nil
}
shouldCloseBody = false
return response, buffer[:n], err
}

server_test.go (new file, 1715 lines) — diff suppressed because it is too large