Compare commits: 2025030301...master

6 commits: 80560f7408, 147659b0da, 2a0bd28958, 835045346d, 85968bb5cf, e14bcb205b
.clinerules (new file, 125 lines)
# Project

## User Language

Simplified Chinese

## Development

- TDD

# Cline's Memory Bank

I am Cline, an expert software engineer with a unique characteristic: my memory resets completely between sessions. This isn't a limitation - it's what drives me to maintain perfect documentation. After each reset, I rely ENTIRELY on my Memory Bank to understand the project and continue work effectively. I MUST read ALL memory bank files at the start of EVERY task - this is not optional.

## Memory Bank Structure

The Memory Bank consists of core files and optional context files, all in Markdown format. Files build upon each other in a clear hierarchy:

```mermaid
flowchart TD
    PB[projectbrief.md] --> PC[productContext.md]
    PB --> SP[systemPatterns.md]
    PB --> TC[techContext.md]

    PC --> AC[activeContext.md]
    SP --> AC
    TC --> AC

    AC --> P[progress.md]
```

### Core Files (Required)

1. `projectbrief.md`
   - Foundation document that shapes all other files
   - Created at project start if it doesn't exist
   - Defines core requirements and goals
   - Source of truth for project scope

2. `productContext.md`
   - Why this project exists
   - Problems it solves
   - How it should work
   - User experience goals

3. `activeContext.md`
   - Current work focus
   - Recent changes
   - Next steps
   - Active decisions and considerations
   - Important patterns and preferences
   - Learnings and project insights

4. `systemPatterns.md`
   - System architecture
   - Key technical decisions
   - Design patterns in use
   - Component relationships
   - Critical implementation paths

5. `techContext.md`
   - Technologies used
   - Development setup
   - Technical constraints
   - Dependencies
   - Tool usage patterns

6. `progress.md`
   - What works
   - What's left to build
   - Current status
   - Known issues
   - Evolution of project decisions

### Additional Context

Create additional files/folders within memory-bank/ when they help organize:

- Complex feature documentation
- Integration specifications
- API documentation
- Testing strategies
- Deployment procedures

## Core Workflows

### Plan Mode

```mermaid
flowchart TD
    Start[Start] --> ReadFiles[Read Memory Bank]
    ReadFiles --> CheckFiles{Files Complete?}

    CheckFiles -->|No| Plan[Create Plan]
    Plan --> Document[Document in Chat]

    CheckFiles -->|Yes| Verify[Verify Context]
    Verify --> Strategy[Develop Strategy]
    Strategy --> Present[Present Approach]
```

### Act Mode

```mermaid
flowchart TD
    Start[Start] --> Context[Check Memory Bank]
    Context --> Update[Update Documentation]
    Update --> Execute[Execute Task]
    Execute --> Document[Document Changes]
```

## Documentation Updates

Memory Bank updates occur when:

1. Discovering new project patterns
2. After implementing significant changes
3. When user requests with **update memory bank** (MUST review ALL files)
4. When context needs clarification

```mermaid
flowchart TD
    Start[Update Process]

    subgraph Process
        P1[Review ALL Files]
        P2[Document Current State]
        P3[Clarify Next Steps]
        P4[Document Insights & Patterns]

        P1 --> P2 --> P3 --> P4
    end

    Start --> Process
```

Note: When triggered by **update memory bank**, I MUST review every memory bank file, even if some don't require updates. Focus particularly on activeContext.md and progress.md as they track current state.

REMEMBER: After every memory reset, I begin completely fresh. The Memory Bank is my only link to previous work. It must be maintained with precision and clarity, as my effectiveness depends entirely on its accuracy.
```diff
@@ -11,7 +11,7 @@ import (
 	cacheproxy "git.jeffthecoder.xyz/guochao/cache-proxy"
 	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware"
-	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerlog"
+	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerctx"
 	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/httplog"
 	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/recover"

@@ -133,7 +133,7 @@ func main() {
 	slog.With("addr", ":8881").Info("serving app")
 	if err := http.ListenAndServe(":8881", middleware.Use(mux,
 		recover.Recover(),
-		handlerlog.FindHandler(mux),
+		handlerctx.FindHandler(mux),
 		httplog.Log(httplog.Config{LogStart: true, LogFinish: true}),
 	)); err != nil {
 		slog.With("error", err).Error("failed to start server")
```
config.go

```diff
@@ -10,27 +10,38 @@ type UpstreamPriorityGroup struct {
 	Priority int `yaml:"priority"`
 }

-type UpstreamMatch struct {
+type PathTransformation struct {
 	Match   string `yaml:"match"`
 	Replace string `yaml:"replace"`
 }

+type HeaderChecker struct {
+	Name  string  `yaml:"name"`
+	Match *string `yaml:"match"`
+}
+
+type Checker struct {
+	StatusCodes []int           `yaml:"status-codes"`
+	Headers     []HeaderChecker `yaml:"headers"`
+}
+
 type Upstream struct {
 	Server          string                  `yaml:"server"`
-	Match           UpstreamMatch           `yaml:"match"`
+	Path            PathTransformation      `yaml:"path"`
+	Checkers        []Checker               `yaml:"checkers"`
 	AllowedRedirect *string                 `yaml:"allowed-redirect"`
 	PriorityGroups  []UpstreamPriorityGroup `yaml:"priority-groups"`
 }

 func (upstream Upstream) GetPath(orig string) (string, bool, error) {
-	if upstream.Match.Match == "" || upstream.Match.Replace == "" {
+	if upstream.Path.Match == "" || upstream.Path.Replace == "" {
 		return orig, true, nil
 	}
-	matcher, err := regexp.Compile(upstream.Match.Match)
+	matcher, err := regexp.Compile(upstream.Path.Match)
 	if err != nil {
 		return "", false, err
 	}
-	return matcher.ReplaceAllString(orig, upstream.Match.Replace), matcher.MatchString(orig), nil
+	return matcher.ReplaceAllString(orig, upstream.Path.Replace), matcher.MatchString(orig), nil
 }

 type LocalStorage struct {
```
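To make the renamed type concrete, here is a minimal, self-contained sketch of how a `PathTransformation` rewrites an incoming path. It is not the project's code; it only mirrors the `GetPath` semantics shown in the diff above, with a hypothetical standalone `getPath` helper:

```go
package main

import (
	"fmt"
	"regexp"
)

// Mirrors the diff: Match is a regular expression, Replace its expansion.
type PathTransformation struct {
	Match   string
	Replace string
}

func getPath(t PathTransformation, orig string) (string, bool, error) {
	if t.Match == "" || t.Replace == "" {
		return orig, true, nil // no transformation configured: pass through
	}
	matcher, err := regexp.Compile(t.Match)
	if err != nil {
		return "", false, err
	}
	return matcher.ReplaceAllString(orig, t.Replace), matcher.MatchString(orig), nil
}

func main() {
	t := PathTransformation{Match: `/ubuntu/(.*)`, Replace: `/$1`}
	path, matched, err := getPath(t, "/ubuntu/dists/noble/Release")
	fmt.Println(path, matched, err) // /dists/noble/Release true <nil>
}
```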
config.yaml

```diff
@@ -1,91 +1,103 @@
 upstream:
   - server: https://mirrors.aliyun.com
-    match:
-      match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu|pypi)/(.*)
+    path:
+      match: /(debian|ubuntu|ubuntu-ports|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu|pypi)/(.*)
       replace: '/$1/$2'
   - server: https://mirrors.tencent.com
-    match:
-      match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
+    path:
+      match: /(debian|ubuntu|ubuntu-ports|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
       replace: '/$1/$2'
   - server: https://mirrors.ustc.edu.cn
-    match:
+    path:
       match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|elrepo|remi|rpmfusion|tailscale|gnu|rust-static|pypi)/(.*)
       replace: '/$1/$2'
+    checkers:
+      headers:
+        - name: content-type
+          match: application/octet-stream
   - server: https://packages.microsoft.com/repos/code
-    match:
+    path:
       match: /microsoft-code(.*)
       replace: '$1'
   - server: https://archive.ubuntu.com
-    match:
+    path:
       match: /ubuntu/(.*)
       replace: /$1
+  - server: https://ports.ubuntu.com
+    path:
+      match: /ubuntu-ports/(.*)
+      replace: /$1
 #    priority-groups:
 #      - match: /dists/.*
 #        priority: 10
   - server: https://releases.ubuntu.com/
-    match:
+    path:
       match: /ubuntu-releases/(.*)
       replace: /$1
   - server: https://apt.releases.hashicorp.com
-    match:
+    path:
       match: /hashicorp-deb/(.*)
       replace: /$1
   - server: https://rpm.releases.hashicorp.com
-    match:
+    path:
       match: /hashicorp-rpm/(.*)
       replace: /$1
   - server: https://dl.google.com/linux/chrome
-    match:
+    path:
       match: /google-chrome/(.*)
       replace: /$1
   - server: https://pkgs.tailscale.com/stable
-    match:
+    path:
       match: /tailscale/(.*)
       replace: /$1
   - server: http://update.cs2c.com.cn:8080
-    match:
+    path:
       match: /kylinlinux/(.*)
       replace: /$1
   - server: https://cdn.jsdelivr.net
-    match:
+    path:
       match: /jsdelivr/(.*)
       replace: /$1
   - server: https://dl-cdn.alpinelinux.org
-    match:
+    path:
       match: /alpine/(.*)
       replace: /$1
   - server: https://repo.almalinux.org
-    match:
+    path:
       match: /almalinux/(.*)
       replace: /almalinux/$1
   - server: https://dl.rockylinux.org/pub/rocky
-    match:
+    path:
       match: /rocky/(.*)
       replace: /$1
   - server: https://dl.fedoraproject.org/pub/epel
-    match:
+    path:
       match: /epel/(.*)
       replace: /$1
   - server: https://deb.debian.org
-    match:
+    path:
       match: /(debian|debian-security)/(.*)
       replace: /$1/$2
 #    priority-groups:
 #      - match: /dists/.*
 #        priority: 10
   - server: https://static.rust-lang.org
-    match:
+    path:
       match: /rust-static/(.*)
       replace: /$1
   - server: https://storage.flutter-io.cn
-    match:
+    path:
       match: /flutter-storage/(.*)
       replace: /$1
   - server: https://cdn-mirror.chaotic.cx/chaotic-aur
-    match:
+    path:
       match: /chaotic-aur/(.*)
       replace: /$1
     allowed-redirect: "^https?://cf-builds.garudalinux.org/.*/chaotic-aur/.*$"
+  - server: http://download.zfsonlinux.org
+    path:
+      match: /openzfs/(.*)
+      replace: /$1

 misc:
   first-chunk-bytes: 31457280 ## 1024*1024*30, all upstreams with 200 response will wait for the first chunks to select a upstream by bandwidth, instead of by latency. defaults to 50M
```
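A quick check of the `first-chunk-bytes` comment: 1024 × 1024 × 30 = 31,457,280, so the configured value is 30 MiB; per the comment, the larger 50M figure is the default used when the key is omitted.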
memory-bank/activeContext.md (new file, 33 lines)

# Current Work Focus

There is no urgent task in progress. The project is in a stable state, and upcoming work can be planned from the to-do list in `progress.md`.

## Recent Changes

- **Implemented phase-two optimization for streaming requests (temp-file based)**:
  - **Problem**: `StreamObject` used an in-memory `bytes.Buffer` to hold downloaded content, which caused excessive memory usage for large files.
  - **Solution**: Replaced the `Buffer` in `StreamObject` with an `*os.File`, writing the downloaded byte stream directly to a temporary file.
  - **Implementation details** (see the sketch after this file):
    - `startOrJoinStream` (the producer) now creates a temporary file and writes the download into it. On success, the temporary file is renamed to the final cache file.
    - `serveRangedRequest` (the consumer) uses `syscall.Dup()` to duplicate the producer's file descriptor, resolving a lifetime race on the file handle. Each consumer reads through its own duplicated handle, decoupling it from the lifetime of the producer's handle.
  - **Outcome**: Memory usage for large files is fixed and all tests pass, including a new test written specifically to verify concurrency safety and race conditions.

## Next Steps

The next logical step is to work through the to-do items listed in `progress.md`, for example:

1. **Feature enhancements**:
   - **Multiple storage backends**: add support for S3, Redis, and other backends.
   - **Advanced load balancing**: implement more sophisticated balancing strategies.
   - **Monitoring**: integrate Prometheus metrics.
2. **Refactoring**:
   - Refactor the complex functions in `server.go` to improve readability.

Before starting a new task, the next focus of work needs to be confirmed with you.

## Important Patterns and Preferences

- **Code style**: follow Go community best practices; keep code clear and concise, with appropriate comments.
- **Concurrency**: prefer Go's native concurrency primitives (goroutines and channels).
- **Configuration-driven**: core logic stays separate from configuration, so the project adapts to different scenarios by editing the config file rather than the code.
- **Documentation-driven**: every significant design decision, architecture change, and feature implementation is recorded in the Memory Bank.
memory-bank/productContext.md (new file, 26 lines)

# Product Context

In many scenarios, users need to download files from software mirrors, file servers, or other resource sites. Due to network jitter, server load, or geography, no single server can guarantee download speed and stability.

## Problems Solved

`cache-proxy` addresses the following problems:

1. **Single point of failure**: when the only mirror a user depends on goes down or becomes unreachable, downloads break.
2. **Poor network speed**: users may not be connected to the fastest available server, so downloads take longer than necessary.
3. **Redundant downloads**: when several users, or one user repeatedly, download the same file, the extra traffic adds avoidable load on the servers.

## How It Works

`cache-proxy` solves these problems by inserting a proxy layer between users and multiple upstream servers.

- When a file request arrives, it requests the file from all configured upstreams concurrently.
- It picks the first upstream to return a successful response and streams that data to the user.
- At the same time, it caches the file on local disk.
- Subsequent requests for the same file are served directly from the local cache (until it expires), which greatly improves response time and cuts network traffic.

## User Experience Goals

- **Seamless integration**: users simply point their download URLs at `cache-proxy`; the backend complexity stays hidden.
- **Faster downloads**: the racing mechanism always picks the fastest source, so users get the best available download experience.
- **Higher availability**: even if some upstreams fail, service continues as long as one is reachable.
memory-bank/progress.md (new file, 44 lines)

# Project Progress

This records the initial state of the completed `cache-proxy` project. The core functionality is implemented and working.

## Completed Features

- **Core proxy logic**:
  - Load configuration from `config.yaml`.
  - Start an HTTP server and listen for requests.
  - Check the local cache based on the request path.
- **Concurrent upstream requests**:
  - Issue requests to upstream servers concurrently.
  - Correctly select the fastest-responding server.
  - Cancel the remaining requests once a server is selected.
- **Cache management**:
  - Cache downloaded files on local disk.
  - Support time-based cache refresh.
  - Use the `If-Modified-Since` request header to avoid unnecessary transfers.
- **Concurrent request handling**:
  - Correctly handle multiple concurrent requests for the same file, downloading it only once.
- **Accelerated delivery**:
  - Delegate file serving to a frontend server (such as Nginx) via the `X-Sendfile` / `X-Accel-Redirect` headers (see the sketch after this file).
- **Comprehensive test coverage**:
  - `server_test.go` is complete, providing unit and integration tests for all core features.
  - Tests cover the happy path, edge cases (timeouts, upstream failures), and security (path traversal).
  - Test code and comments were reviewed for accuracy and consistency.
  - All tests pass, confirming the robustness of the existing code.

## To Do

- **Feature enhancements**:
  - **Streaming range requests (done)**: reworked ranged-request handling to respond while downloading.
    - **Phase one (done)**: implemented with `sync.Cond` and an in-memory `bytes.Buffer`.
    - **Performance optimization (done)**: removed the fixed delay from the `StreamObject` cleanup logic in favor of Go's GC, significantly speeding up tests with concurrent requests.
    - **Phase two (done)**: switched to a temporary file plus the `syscall.Dup()` system call, fixing memory usage for large files.
  - Only local file storage is supported today; other backends (such as S3 or Redis) could be added later.
  - Add more sophisticated load-balancing strategies beyond "pick the fastest".
  - Add more detailed monitoring and metrics (e.g. Prometheus metrics).
- **Code cleanup**:
  - Now that streaming is in place, refactor the complex functions in `server.go` (such as `streamOnline`) for readability and maintainability.

## Known Issues

- No obvious bugs found at this stage. The code structure is clear and the logic is complete.
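As an illustration of the `X-Sendfile` / `X-Accel-Redirect` delegation mentioned above, here is a minimal sketch (hypothetical handler and path layout, not the project's code) of handing a cached file off to an Nginx frontend:

```go
package main

import "net/http"

// serveViaAccel delegates the actual file transfer to a fronting Nginx:
// instead of streaming bytes itself, the Go handler replies with an
// X-Accel-Redirect header naming an internal location, and Nginx then
// serves the file from disk.
func serveViaAccel(w http.ResponseWriter, r *http.Request) {
	// "/protected/" would be an `internal` location in nginx.conf that
	// maps onto the local cache directory.
	w.Header().Set("X-Accel-Redirect", "/protected"+r.URL.Path)
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/", serveViaAccel)
	http.ListenAndServe(":8881", nil)
}
```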
memory-bank/projectbrief.md (new file, 10 lines)

# Project Brief: Cache-Proxy

A high-performance caching proxy server written in Go. Its core job is to accept client requests and forward them to multiple configured upstream mirror servers. It issues the upstream requests concurrently, picks the server that responds fastest, returns that result to the client, and caches it locally so later requests can be answered faster.

## Core Requirements

- **Performance**: must handle concurrent requests quickly and use the cache effectively.
- **Reliability**: when an upstream becomes unavailable, automatically fail over to another.
- **Configurability**: upstreams, cache policy, and other behavior can all be added, removed, and tuned through a configuration file.
- **Transparency**: the proxy is invisible to clients; they talk to it exactly as they would to an ordinary server.
memory-bank/systemPatterns.md (new file, 62 lines)

# System Architecture and Design Patterns

The design of `cache-proxy` follows a few key patterns and principles in pursuit of its performance and availability goals.

## Core Architecture

The system has three main layers:

1. **HTTP server layer**: receives client requests and applies middleware for logging, panic recovery, and other cross-cutting concerns.
2. **Cache handling layer**: checks whether the requested file exists in the local cache and, per the cache policy, decides between serving the cached file and going upstream.
3. **Upstream selection and download layer**: the heart of the system; fetches data concurrently from multiple upstreams and manages the download.

## Key Design Patterns

### 1. Racing Requests

This is the core pattern behind the "pick the fastest" feature (a sketch follows this list).

- The `fastesUpstream` function starts one goroutine per upstream server.
- All goroutines send their upstream requests concurrently.
- `sync.Once` guarantees that exactly one goroutine "wins" and becomes the final data source.
- As soon as one wins, it calls the `context.CancelFunc` to tell every other goroutine to stop, avoiding wasted work.
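A minimal sketch of the racing pattern described above (simplified toy code: real upstream requests, error handling, and priority groups are omitted, and the simulated latencies stand in for network calls):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// race starts one goroutine per upstream; the first to finish wins via
// sync.Once and cancels the rest through the shared context.
func race(upstreams []time.Duration) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var once sync.Once
	var wg sync.WaitGroup
	winner := -1

	for i, latency := range upstreams {
		wg.Add(1)
		go func(idx int, latency time.Duration) {
			defer wg.Done()
			select {
			case <-time.After(latency): // stand-in for the upstream request
				once.Do(func() {
					winner = idx
					cancel() // tell every other goroutine to stop
				})
			case <-ctx.Done(): // someone else already won
			}
		}(i, latency)
	}
	wg.Wait()
	return winner
}

func main() {
	fmt.Println(race([]time.Duration{
		30 * time.Millisecond, 10 * time.Millisecond, 20 * time.Millisecond,
	})) // 1: the 10ms upstream wins
}
```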
### 2. Producer-Consumer

File downloads and streaming responses (ranged requests) use a producer-consumer pattern (a condensed sketch follows this list).

- **Producer (`startOrJoinStream`)**:
  - Starts one goroutine per file on its first request.
  - Downloads the file content from the fastest upstream.
  - Writes the content **directly to a temporary file** (`*os.File`) instead of an in-memory buffer.
  - Broadcasts download progress (the number of bytes written, `Offset`) via `sync.Cond`.
  - On a successful download, renames the temporary file to the final cache file.
- **Consumer (`serveRangedRequest`)**:
  - On receiving a range request, finds or waits for the corresponding `StreamObject`.
  - To safely read a file that the producer is still writing, it **duplicates the temporary file's descriptor** with `syscall.Dup()`.
  - Each consumer reads its requested range through its own duplicated handle (`*os.File`), avoiding file-handle state conflicts with the producer or other consumers.
  - Consumers use the producer's `Offset` progress and the `sync.Cond` signal to wait until the data for their range becomes available.
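A condensed sketch of the `sync.Cond` handshake between producer and consumer (toy code under the same pattern; the real implementation writes to a temp file and duplicates its descriptor as described above, while this sketch uses an in-memory slice for brevity):

```go
package main

import (
	"fmt"
	"sync"
)

type stream struct {
	mu     sync.Mutex
	cond   *sync.Cond
	offset int // bytes written so far
	done   bool
	data   []byte
}

func main() {
	s := &stream{}
	s.cond = sync.NewCond(&s.mu)

	// Producer: append chunks, broadcast progress after every write.
	go func() {
		for _, chunk := range []string{"hello ", "ranged ", "world"} {
			s.mu.Lock()
			s.data = append(s.data, chunk...)
			s.offset = len(s.data)
			s.cond.Broadcast()
			s.mu.Unlock()
		}
		s.mu.Lock()
		s.done = true
		s.cond.Broadcast()
		s.mu.Unlock()
	}()

	// Consumer: wait until at least 12 bytes exist, then read its range.
	s.mu.Lock()
	for s.offset < 12 && !s.done {
		s.cond.Wait()
	}
	fmt.Println(string(s.data[6:12])) // "ranged"
	s.mu.Unlock()
}
```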
### 3. Concurrency Control and Object Lifecycle Management

To handle multiple clients requesting the same file at once, the system uses a `sync.Mutex` and a `map[string]*StreamObject`.

- When the first request for a file arrives, it takes the lock, creates a `StreamObject` representing the in-flight download, and starts a producer goroutine to perform it.
- Later requests for the same file find the `StreamObject` already in the map; they do not start another download but share the object.

**`StreamObject` lifecycle management (GC-based)**

We use a simple and efficient pattern that relies on Go's garbage collector to manage the `StreamObject` lifecycle (a sketch follows):

1. **The producer removes**: when the download goroutine (producer) finishes, successfully or not, its only cleanup duty is to delete the `StreamObject` from the global `map[string]*StreamObject`.
2. **Consumers hold references**: meanwhile, every HTTP handler (consumer) still serving that file keeps its reference to the `StreamObject`.
3. **GC reclaims automatically**: because consumers hold references, the GC will not collect the object. Only after the last consumer finishes and its stack frame is destroyed does the final reference disappear, and the GC reclaims the object on its next run.

This pattern avoids reference counting and timers, keeps the code simpler, and fundamentally fixes the performance problem previously caused by a fixed cleanup delay.
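The lifecycle rule above boils down to "the producer deletes the map entry, consumers keep the pointer". A minimal sketch (illustrative only; the key and struct are placeholders):

```go
package main

import (
	"fmt"
	"sync"
)

type StreamObject struct{ key string }

var (
	mu sync.Mutex
	o  = map[string]*StreamObject{}
)

func main() {
	// Producer registers the in-flight download.
	mu.Lock()
	obj := &StreamObject{key: "/ubuntu/Release"}
	o[obj.key] = obj
	mu.Unlock()

	// A consumer grabs its own reference while the entry exists.
	mu.Lock()
	consumerRef := o["/ubuntu/Release"]
	mu.Unlock()

	// Producer finishes: it only removes the map entry. No refcounts,
	// no timers; the object stays alive for as long as consumerRef does,
	// and the GC reclaims it after the last reference disappears.
	mu.Lock()
	delete(o, obj.key)
	mu.Unlock()

	fmt.Println(consumerRef.key) // still usable after the map deletion
}
```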
### 4. Middleware

The project builds its request-processing chain from Go's standard `http.Handler` interface and the middleware pattern (a sketch follows).

- The `pkgs/middleware` directory defines reusable middleware such as `httplog` and `recover`.
- This pattern makes it easy to add or remove logging, authentication, error handling, and similar features without touching the core business logic.
memory-bank/techContext.md (new file, 26 lines)

# Technology Stack and Development Environment

## Core Technologies

- **Language**: Go (Golang)
- **Main dependencies**:
  - `gopkg.in/yaml.v3`: parses the `config.yaml` configuration file.
  - `log/slog`: the structured logging library built into Go 1.21+.
  - `net/http`: the HTTP server and client.
  - `github.com/getsentry/sentry-go`: (optional) error tracking.

## Development and Build

- **Dependency management**: Go Modules (`go.mod`, `go.sum`).
- **Configuration**: project behavior is driven by the `config.yaml` file.
- **Containerization**:
  - `Dockerfile`: builds the project's Docker image.
  - `compose.yaml`: runs the service in development.
  - `compose.release.yaml`: deploys the service in production.

## Key Technical Points

- **Concurrency model**: goroutines and channels are used heavily for high concurrency, most notably the concurrent-request pattern in `fastesUpstream`.
- **Streaming**: data flows through `io.Reader` and `io.Writer` interfaces and is never loaded into memory in full, which is essential for large files.
- **Error handling**: beyond Go's standard error handling, critical paths (such as `fastesUpstream`) use the `context` package for timeouts and cancellation.
- **Structured logging**: `slog` produces structured, machine-parsable logs for debugging and monitoring.
```diff
@@ -1,4 +1,4 @@
-package handlerlog
+package handlerctx

 import (
 	"context"
@@ -6,7 +6,7 @@ import (
 	"net/http"
 	"time"

-	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerlog"
+	"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerctx"
 )

 type ctxKey int
@@ -43,7 +43,7 @@ func Log(config Config) func(http.Handler) http.Handler {
 			"path", r.URL.Path,
 		}

-		if pattern, handlerFound := handlerlog.Pattern(r); handlerFound {
+		if pattern, handlerFound := handlerctx.Pattern(r); handlerFound {
 			args = append(args, "handler", pattern)
 		}
```
server.go

```diff
@@ -1,20 +1,19 @@
 package cacheproxy

 import (
-	"bytes"
 	"context"
 	"errors"
-	"fmt"
 	"io"
 	"log/slog"
-	"net"
 	"net/http"
 	"os"
 	"path/filepath"
 	"regexp"
 	"slices"
+	"strconv"
 	"strings"
 	"sync"
+	"syscall"
 	"time"
 )
@@ -26,12 +25,17 @@ const (
 var zeroTime time.Time

+var preclosedChan = make(chan struct{})
+
+func init() {
+	close(preclosedChan)
+}
+
 var (
 	httpClient = http.Client{
 		// check allowed redirect
 		CheckRedirect: func(req *http.Request, via []*http.Request) error {
-			if allowedRedirect, ok := req.Context().Value(reqCtxAllowedRedirect).(string); ok {
+			lastRequest := via[len(via)-1]
+			if allowedRedirect, ok := lastRequest.Context().Value(reqCtxAllowedRedirect).(string); ok {
 				if matched, err := regexp.MatchString(allowedRedirect, req.URL.String()); err != nil {
 					return err
 				} else if !matched {
@@ -46,43 +50,15 @@
 type StreamObject struct {
 	Headers http.Header
-	Buffer  *bytes.Buffer
-	Offset  int
+	TempFile *os.File // The temporary file holding the download content.
+	Offset   int64    // The number of bytes written to TempFile.
+	Done     bool
+	Error    error

-	ctx context.Context
-	wg  *sync.WaitGroup
-}
+	mu   *sync.Mutex
+	cond *sync.Cond

-func (memoryObject *StreamObject) StreamTo(w io.Writer, wg *sync.WaitGroup) error {
-	defer wg.Done()
-	offset := 0
-	if w == nil {
-		w = io.Discard
-	}
-OUTER:
-	for {
-		select {
-		case <-memoryObject.ctx.Done():
-			break OUTER
-		default:
-		}
-
-		newOffset := memoryObject.Offset
-		if newOffset == offset {
-			time.Sleep(time.Millisecond)
-			continue
-		}
-		bytes := memoryObject.Buffer.Bytes()[offset:newOffset]
-		written, err := w.Write(bytes)
-		if err != nil {
-			return err
-		}
-
-		offset += written
-	}
-	time.Sleep(time.Millisecond)
-	_, err := w.Write(memoryObject.Buffer.Bytes()[offset:])
-	return err
+	fileWrittenCh chan struct{} // Closed when the file is fully written and renamed.
 }

 type Server struct {
@@ -101,11 +77,6 @@ func NewServer(config Config) *Server {
 	}
 }

-type Chunk struct {
-	buffer []byte
-	error  error
-}
-
 func (server *Server) serveFile(w http.ResponseWriter, r *http.Request, path string) {
 	if location := r.Header.Get(server.Storage.Local.Accel.EnableByHeader); server.Storage.Local.Accel.EnableByHeader != "" && location != "" {
 		relPath, err := filepath.Rel(server.Storage.Local.Path, path)
@@ -143,27 +114,30 @@ func (server *Server) HandleRequestWithCache
 	slog.With("status", localStatus, "mtime", mtime, "error", err, "key", fullpath).Debug("local status checked")
 	if os.IsPermission(err) {
 		http.Error(w, err.Error(), http.StatusForbidden)
+		return
 	} else if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
-	} else if localStatus != localNotExists {
-		if localStatus == localExistsButNeedHead {
-			if ranged {
-				server.streamOnline(nil, r, mtime, fullpath)
-				server.serveFile(w, r, fullpath)
-			} else {
-				server.streamOnline(w, r, mtime, fullpath)
-			}
-		} else {
-			server.serveFile(w, r, fullpath)
-		}
-	} else {
-		if ranged {
-			server.streamOnline(nil, r, mtime, fullpath)
-			server.serveFile(w, r, fullpath)
-		} else {
-			server.streamOnline(w, r, mtime, fullpath)
-		}
+		return
+	}
+
+	if localStatus == localExists {
+		server.serveFile(w, r, fullpath)
+		return
+	}
+
+	// localExistsButNeedHead or localNotExists
+	// Both need to go online.
+	if ranged {
+		server.serveRangedRequest(w, r, fullpath, mtime)
+	} else {
+		// For full requests, we wait for the download to complete and then serve the file.
+		// This maintains the original behavior for now.
+		ch := server.startOrJoinStream(r, mtime, fullpath)
+		if ch != nil {
+			<-ch
+		}
+		server.serveFile(w, r, fullpath)
 	}
 }

 type localStatus int
```
```diff
@@ -210,229 +184,348 @@ func (server *Server) checkLocal
 	return localNotExists, zeroTime, nil
 }

-func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime time.Time, key string) {
-	memoryObject, exists := server.o[r.URL.Path]
-	locked := false
-	defer func() {
-		if locked {
-			server.lu.Unlock()
-			locked = false
-		}
-	}()
-	if !exists {
-		server.lu.Lock()
-		locked = true
-		memoryObject, exists = server.o[r.URL.Path]
-	}
-	if exists {
-		if locked {
-			server.lu.Unlock()
-			locked = false
-		}
-
-		if w != nil {
-			memoryObject.wg.Add(1)
-			for k := range memoryObject.Headers {
-				v := memoryObject.Headers.Get(k)
-				w.Header().Set(k, v)
-			}
-			if err := memoryObject.StreamTo(w, memoryObject.wg); err != nil {
-				slog.With("error", err).Warn("failed to stream response with existing memory object")
-			}
-		}
-	} else {
-		slog.With("mtime", mtime).Debug("checking fastest upstream")
-		selectedIdx, response, chunks, err := server.fastesUpstream(r, mtime)
-		if chunks == nil && mtime != zeroTime {
-			slog.With("upstreamIdx", selectedIdx, "key", key).Debug("not modified. using local version")
-			if w != nil {
-				server.serveFile(w, r, key)
-			}
-			return
-		}
-		if err != nil {
-			slog.With("error", err).Warn("failed to select fastest upstream")
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-		if selectedIdx == -1 || response == nil || chunks == nil {
-			slog.Debug("no upstream is selected")
-			http.NotFound(w, r)
-			return
-		}
-		if response.StatusCode == http.StatusNotModified {
-			slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
-			os.Chtimes(key, zeroTime, time.Now())
-			server.serveFile(w, r, key)
-			return
-		}
-		slog.With(
-			"upstreamIdx", selectedIdx,
-		).Debug("found fastest upstream")
-		buffer := &bytes.Buffer{}
-		ctx, cancel := context.WithCancel(r.Context())
-		defer cancel()
-		memoryObject = &StreamObject{
-			Headers: response.Header,
-			Buffer:  buffer,
-
-			ctx: ctx,
-			wg:  &sync.WaitGroup{},
-		}
-		server.o[r.URL.Path] = memoryObject
-		server.lu.Unlock()
-		locked = false
-
-		err = nil
-
-		if w != nil {
-			memoryObject.wg.Add(1)
-			for k := range memoryObject.Headers {
-				v := memoryObject.Headers.Get(k)
-				w.Header().Set(k, v)
-			}
-
-			go memoryObject.StreamTo(w, memoryObject.wg)
-		}
-
-		for chunk := range chunks {
-			if chunk.error != nil {
-				err = chunk.error
-				if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
-					slog.With("error", err).Warn("failed to read from upstream")
-				}
-			}
-			if chunk.buffer == nil {
-				break
-			}
-			n, _ := buffer.Write(chunk.buffer)
-			memoryObject.Offset += n
-		}
-		cancel()
-
-		memoryObject.wg.Wait()
-
-		if response.ContentLength > 0 {
-			if memoryObject.Offset == int(response.ContentLength) && err != nil {
-				if !(errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) {
-					slog.With("read-length", memoryObject.Offset, "content-length", response.ContentLength, "error", err, "upstreamIdx", selectedIdx).Debug("something happened during download. but response body is read as whole. so error is reset to nil")
-				}
-				err = nil
-			}
-		} else if err == io.EOF {
-			err = nil
-		}
-
-		if err != nil {
-			logger := slog.With("upstreamIdx", selectedIdx)
-			logger.Error("something happened during download. will not cache this response. setting lingering to reset the connection.")
-			hijacker, ok := w.(http.Hijacker)
-			if !ok {
-				logger.Warn("response writer is not a hijacker. failed to set lingering")
-				return
-			}
-			conn, _, err := hijacker.Hijack()
-			if err != nil {
-				logger.With("error", err).Warn("hijack failed. failed to set lingering")
-				return
-			}
-			defer conn.Close()
-
-			tcpConn, ok := conn.(*net.TCPConn)
-			if !ok {
-				logger.With("error", err).Warn("connection is not a *net.TCPConn. failed to set lingering")
-				return
-			}
-			if err := tcpConn.SetLinger(0); err != nil {
-				logger.With("error", err).Warn("failed to set lingering")
-				return
-			}
-			logger.Debug("connection set to linger. it will be reset once the conn.Close is called")
-		}
-		go func() {
-			defer func() {
-				server.lu.Lock()
-				defer server.lu.Unlock()
-
-				delete(server.o, r.URL.Path)
-				slog.Debug("memory object released")
-			}()
-
-			if err == nil {
-				slog.Debug("preparing to release memory object")
-				mtime := zeroTime
-				lastModifiedHeader := response.Header.Get("Last-Modified")
-				if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil {
-					slog.With(
-						"error", err,
-						"value", lastModifiedHeader,
-						"url", response.Request.URL,
-					).Debug("failed to parse last modified header value. set modified time to now")
-				} else {
-					slog.With(
-						"header", lastModifiedHeader,
-						"value", lastModified,
-						"url", response.Request.URL,
-					).Debug("found modified time")
-					mtime = lastModified
-				}
-				if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil {
-					slog.With("error", err).Warn("failed to create local storage path")
-				}
-
-				if server.Config.Storage.Local.TemporaryFilePattern == "" {
-					if err := os.WriteFile(key, buffer.Bytes(), 0644); err != nil {
-						slog.With("error", err).Warn("failed to write file")
-						os.Remove(key)
-					}
-					return
-				}
-
-				fp, err := os.CreateTemp(server.Storage.Local.Path, server.Storage.Local.TemporaryFilePattern)
-				if err != nil {
-					slog.With(
-						"key", key,
-						"path", server.Storage.Local.Path,
-						"pattern", server.Storage.Local.TemporaryFilePattern,
-						"error", err,
-					).Warn("failed to create template file")
-					return
-				}
-
-				name := fp.Name()
-
-				if _, err := fp.Write(buffer.Bytes()); err != nil {
-					fp.Close()
-					os.Remove(name)
-
-					slog.With("error", err).Warn("failed to write into template file")
-				} else if err := fp.Close(); err != nil {
-					os.Remove(name)
-
-					slog.With("error", err).Warn("failed to close template file")
-				} else {
-					os.Chtimes(name, zeroTime, mtime)
-					dirname := filepath.Dir(key)
-					os.MkdirAll(dirname, 0755)
-					os.Remove(key)
-					os.Rename(name, key)
-				}
-			}
-		}()
-	}
-}
+func (server *Server) serveRangedRequest(w http.ResponseWriter, r *http.Request, key string, mtime time.Time) {
+	server.lu.Lock()
+	memoryObject, exists := server.o[r.URL.Path]
+	if !exists {
+		server.lu.Unlock()
+		// This is the first request for this file, so we start the download.
+		server.startOrJoinStream(r, mtime, key)
+
+		// Re-acquire the lock to get the newly created stream object.
+		server.lu.Lock()
+		memoryObject = server.o[r.URL.Path]
+		if memoryObject == nil {
+			server.lu.Unlock()
+			// This can happen if the upstream fails very quickly.
+			http.Error(w, "Failed to start download stream", http.StatusInternalServerError)
+			return
+		}
+	}
+	server.lu.Unlock()
+
+	// At this point, we have a memoryObject, and a producer goroutine is downloading the file.
+	// Now we implement the consumer logic.
+
+	// Duplicate the file descriptor from the producer's temp file. This creates a new,
+	// independent file descriptor that points to the same underlying file description.
+	// This is more efficient and robust than opening the file by path.
+	fd, err := syscall.Dup(int(memoryObject.TempFile.Fd()))
+	if err != nil {
+		http.Error(w, "Failed to duplicate file descriptor", http.StatusInternalServerError)
+		return
+	}
+	// We create a new *os.File from the duplicated descriptor. The consumer is now
+	// responsible for closing this new file.
+	consumerFile := os.NewFile(uintptr(fd), memoryObject.TempFile.Name())
+	if consumerFile == nil {
+		syscall.Close(fd) // Clean up if NewFile fails
+		http.Error(w, "Failed to create file from duplicated descriptor", http.StatusInternalServerError)
+		return
+	}
+	defer consumerFile.Close()
+
+	rangeHeader := r.Header.Get("Range")
+	if rangeHeader == "" {
+		// This should not happen if called from HandleRequestWithCache, but as a safeguard:
+		http.Error(w, "Range header is required", http.StatusBadRequest)
+		return
+	}
+
+	// Parse the Range header. We only support a single range like "bytes=start-end".
+	var start, end int64
+	parts := strings.Split(strings.TrimPrefix(rangeHeader, "bytes="), "-")
+	if len(parts) != 2 {
+		http.Error(w, "Invalid Range header", http.StatusBadRequest)
+		return
+	}
+	start, err = strconv.ParseInt(parts[0], 10, 64)
+	if err != nil {
+		http.Error(w, "Invalid Range header", http.StatusBadRequest)
+		return
+	}
+	if parts[1] != "" {
+		end, err = strconv.ParseInt(parts[1], 10, 64)
+		if err != nil {
+			http.Error(w, "Invalid Range header", http.StatusBadRequest)
+			return
+		}
+	} else {
+		// An empty end means "to the end of the file". We don't know the full size yet.
+		// We'll have to handle this dynamically.
+		end = -1 // Sentinel value
+	}
+
+	memoryObject.mu.Lock()
+	defer memoryObject.mu.Unlock()
+
+	// Wait until we have the headers.
+	for memoryObject.Headers == nil && !memoryObject.Done {
+		memoryObject.cond.Wait()
+	}
+
+	if memoryObject.Error != nil {
+		http.Error(w, memoryObject.Error.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	contentLengthStr := memoryObject.Headers.Get("Content-Length")
+	totalSize, _ := strconv.ParseInt(contentLengthStr, 10, 64)
+	if end == -1 || end >= totalSize {
+		end = totalSize - 1
+	}
+
+	if start >= totalSize || start > end {
+		http.Error(w, "Range not satisfiable", http.StatusRequestedRangeNotSatisfiable)
+		return
+	}
+
+	var bytesSent int64
+	bytesToSend := end - start + 1
+	headersWritten := false
+
+	for bytesSent < bytesToSend && memoryObject.Error == nil {
+		// Calculate what we need.
+		neededStart := start + bytesSent
+
+		// Wait for the data to be available.
+		for memoryObject.Offset <= neededStart && !memoryObject.Done {
+			memoryObject.cond.Wait()
+		}
+
+		// Check for error AFTER waiting. This is the critical fix.
+		if memoryObject.Error != nil {
+			// If headers haven't been written, we can send a 500 error.
+			// If they have, it's too late, the connection will just be closed.
+			if !headersWritten {
+				http.Error(w, memoryObject.Error.Error(), http.StatusInternalServerError)
+			}
+			return // Use return instead of break to exit immediately.
+		}
+
+		// If we are here, we have some data to send. Write headers if we haven't already.
+		if !headersWritten {
+			w.Header().Set("Content-Range", "bytes "+strconv.FormatInt(start, 10)+"-"+strconv.FormatInt(end, 10)+"/"+contentLengthStr)
+			w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10))
+			w.Header().Set("Accept-Ranges", "bytes")
+			w.WriteHeader(http.StatusPartialContent)
+			headersWritten = true
+		}
+
+		// Data is available, read from the temporary file.
+		// We calculate how much we can read in this iteration.
+		readNow := memoryObject.Offset - neededStart
+		if readNow <= 0 {
+			// This can happen if we woke up but the data isn't what we need.
+			// The loop will continue to wait.
+			continue
+		}
+
+		// Don't read more than the client requested in total.
+		remainingToSend := bytesToSend - bytesSent
+		if readNow > remainingToSend {
+			readNow = remainingToSend
+		}
+
+		// Read the chunk from the file at the correct offset.
+		buffer := make([]byte, readNow)
+		bytesRead, err := consumerFile.ReadAt(buffer, neededStart)
+		if err != nil && err != io.EOF {
+			if !headersWritten {
+				http.Error(w, "Error reading from cache stream", http.StatusInternalServerError)
+			}
+			return
+		}
+
+		if bytesRead > 0 {
+			n, err := w.Write(buffer[:bytesRead])
+			if err != nil {
+				// Client closed connection, just return.
+				return
+			}
+			bytesSent += int64(n)
+		}
+
+		if memoryObject.Done && bytesSent >= bytesToSend {
+			break
+		}
+	}
+}
+
+// startOrJoinStream ensures a download stream is active for the given key.
+// If a stream already exists, it returns the channel that signals completion.
+// If not, it starts a new download producer and returns its completion channel.
+func (server *Server) startOrJoinStream(r *http.Request, mtime time.Time, key string) <-chan struct{} {
+	server.lu.Lock()
+	memoryObject, exists := server.o[r.URL.Path]
+	if exists {
+		// A download is already in progress. Return its completion channel.
+		server.lu.Unlock()
+		return memoryObject.fileWrittenCh
+	}
+
+	// No active stream, create a new one.
+	if err := os.MkdirAll(filepath.Dir(key), 0755); err != nil {
+		slog.With("error", err).Warn("failed to create local storage path for temp file")
+		server.lu.Unlock()
+		return preclosedChan // Return a closed channel to prevent blocking
+	}
+
+	tempFilePattern := server.Storage.Local.TemporaryFilePattern
+	if tempFilePattern == "" {
+		tempFilePattern = "cache-proxy-*"
+	}
+	tempFile, err := os.CreateTemp(filepath.Dir(key), tempFilePattern)
+	if err != nil {
+		slog.With("error", err).Warn("failed to create temporary file")
+		server.lu.Unlock()
+		return preclosedChan
+	}
+
+	mu := &sync.Mutex{}
+	fileWrittenCh := make(chan struct{})
+	memoryObject = &StreamObject{
+		TempFile:      tempFile,
+		mu:            mu,
+		cond:          sync.NewCond(mu),
+		fileWrittenCh: fileWrittenCh,
+	}
+	server.o[r.URL.Path] = memoryObject
+	server.lu.Unlock()
+
+	// This is the producer goroutine
+	go func(mo *StreamObject) {
+		var err error
+		downloadSucceeded := false
+		tempFileName := mo.TempFile.Name()
+
+		defer func() {
+			// On completion (or error), update the stream object,
+			// wake up all consumers, and then clean up.
+			mo.mu.Lock()
+			mo.Done = true
+			mo.Error = err
+			mo.cond.Broadcast()
+			mo.mu.Unlock()
+
+			// Close the temp file handle.
+			mo.TempFile.Close()
+
+			// If download failed, remove the temp file.
+			if !downloadSucceeded {
+				os.Remove(tempFileName)
+			}
+
+			// The producer's job is done. Remove the object from the central map.
+			// Any existing consumers still hold a reference to the object,
+			// so it won't be garbage collected until they are done.
+			server.lu.Lock()
+			delete(server.o, r.URL.Path)
+			server.lu.Unlock()
+			slog.Debug("memory object released by producer")
+			close(mo.fileWrittenCh)
+		}()
+
+		slog.With("mtime", mtime).Debug("checking fastest upstream")
+		selectedIdx, response, firstChunk, upstreamErr := server.fastestUpstream(r, mtime)
+		if upstreamErr != nil {
+			if !errors.Is(upstreamErr, io.EOF) && !errors.Is(upstreamErr, io.ErrUnexpectedEOF) {
+				slog.With("error", upstreamErr).Warn("failed to select fastest upstream")
+				err = upstreamErr
+				return
+			}
+		}
+
+		if selectedIdx == -1 || response == nil {
+			slog.Debug("no upstream is selected")
+			err = errors.New("no suitable upstream found")
+			return
+		}
+
+		if response.StatusCode == http.StatusNotModified {
+			slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
+			os.Chtimes(key, zeroTime, time.Now())
+			// In this case, we don't have a new file, so we just exit.
+			// The temp file will be cleaned up by the defer.
+			return
+		}
+
+		defer response.Body.Close()
+
+		slog.With("upstreamIdx", selectedIdx).Debug("found fastest upstream")
+
+		mo.mu.Lock()
+		mo.Headers = response.Header
+		mo.cond.Broadcast() // Broadcast headers availability
+		mo.mu.Unlock()
+
+		// Write the first chunk that we already downloaded.
+		if len(firstChunk) > 0 {
+			written, writeErr := mo.TempFile.Write(firstChunk)
+			if writeErr != nil {
+				err = writeErr
+				return
+			}
+			mo.mu.Lock()
+			mo.Offset += int64(written)
+			mo.cond.Broadcast()
+			mo.mu.Unlock()
+		}
+
+		// Download the rest of the file in chunks
+		buffer := make([]byte, server.Misc.ChunkBytes)
+		for {
+			n, readErr := response.Body.Read(buffer)
+			if n > 0 {
+				written, writeErr := mo.TempFile.Write(buffer[:n])
+				if writeErr != nil {
+					err = writeErr
+					break
+				}
+				mo.mu.Lock()
+				mo.Offset += int64(written)
+				mo.cond.Broadcast()
+				mo.mu.Unlock()
+			}
+			if readErr != nil {
+				if readErr != io.EOF {
+					err = readErr
+				}
+				break
+			}
+		}
+
+		// After download, if no critical error, rename the temp file to its final destination.
+		if err == nil {
+			// Set modification time
+			mtime := zeroTime
+			lastModifiedHeader := response.Header.Get("Last-Modified")
+			if lastModified, lmErr := time.Parse(time.RFC1123, lastModifiedHeader); lmErr == nil {
+				mtime = lastModified
+			}
+			// Close file before Chtimes and Rename
+			mo.TempFile.Close()
+
+			os.Chtimes(tempFileName, zeroTime, mtime)
+
+			// Rename the file
+			if renameErr := os.Rename(tempFileName, key); renameErr != nil {
+				slog.With("error", renameErr, "from", tempFileName, "to", key).Warn("failed to rename temp file")
+				err = renameErr
+				os.Remove(tempFileName) // Attempt to clean up if rename fails
+			} else {
+				downloadSucceeded = true
+			}
+		} else {
+			logger := slog.With("upstreamIdx", selectedIdx)
+			logger.Error("something happened during download. will not cache this response.", "error", err)
+		}
+	}(memoryObject)
+
+	return fileWrittenCh
+}

-func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, resultCh chan Chunk, resultErr error) {
+func (server *Server) fastestUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, firstChunk []byte, resultErr error) {
 	returnLock := &sync.Mutex{}
 	upstreams := len(server.Upstreams)
 	cancelFuncs := make([]func(), upstreams)
```
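A note on the first-chunk selection used by `tryUpstream` in the hunks below: `io.ReadAtLeast(r, buf, min)` returns `io.ErrUnexpectedEOF` when the body ends after fewer than `min` bytes (and `io.EOF` when it ends immediately), which appears to be why the new code treats those two errors as a normal "file smaller than the first chunk" case rather than a failure. A minimal sketch of that standard-library behavior:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	buf := make([]byte, 16)
	// Body shorter than the requested minimum: n is still the byte count
	// actually read, and the error is io.ErrUnexpectedEOF, not a failure.
	n, err := io.ReadAtLeast(strings.NewReader("tiny body"), buf, len(buf))
	fmt.Println(n, err) // 9 unexpected EOF
}
```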
```diff
@@ -443,6 +536,8 @@ func (server *Server) fastesUpstream
 	resultIdx = -1

+	var resultFirstChunk []byte
+
 	defer close(updateCh)
 	defer close(notModifiedCh)
 	defer func() {
@@ -457,7 +552,8 @@ func (server *Server) fastesUpstream
 	groups := make(map[int][]int)
 	for upstreamIdx, upstream := range server.Upstreams {
 		if _, matched, err := upstream.GetPath(r.URL.Path); err != nil {
-			return -1, nil, nil, err
+			resultErr = err
+			return
 		} else if !matched {
 			continue
 		}
@@ -465,7 +561,8 @@ func (server *Server) fastesUpstream
 		priority := 0
 		for _, priorityGroup := range upstream.PriorityGroups {
 			if matched, err := regexp.MatchString(priorityGroup.Match, r.URL.Path); err != nil {
-				return -1, nil, nil, err
+				resultErr = err
+				return
 			} else if matched {
 				priority = priorityGroup.Priority
 				break
@@ -501,7 +598,7 @@ func (server *Server) fastesUpstream
 		go func() {
 			defer wg.Done()
-			response, ch, err := server.tryUpstream(ctx, idx, priority, r, lastModified)
+			response, chunk, err := server.tryUpstream(ctx, idx, priority, r, lastModified)
 			if err == context.Canceled { // others returned
 				logger.Debug("context canceled")
 				return
@@ -521,13 +618,16 @@ func (server *Server) fastesUpstream
 			}
 			locked := returnLock.TryLock()
 			if !locked {
+				if response != nil {
+					response.Body.Close()
+				}
 				return
 			}
 			defer returnLock.Unlock()

 			if response.StatusCode == http.StatusNotModified {
 				notModifiedOnce.Do(func() {
-					resultResponse, resultCh, resultErr = response, ch, err
+					resultResponse, resultErr = response, err
 					notModifiedCh <- idx
 				})
 				logger.Debug("voted not modified")
@@ -535,7 +635,7 @@ func (server *Server) fastesUpstream
 			}

 			updateOnce.Do(func() {
-				resultResponse, resultCh, resultErr = response, ch, err
+				resultResponse, resultFirstChunk, resultErr = response, chunk, err
 				updateCh <- idx

 				for cancelIdx, cancel := range cancelFuncs {
@@ -555,6 +655,7 @@ func (server *Server) fastesUpstream
 		select {
 		case idx := <-updateCh:
 			resultIdx = idx
+			firstChunk = resultFirstChunk
 			logger.With("upstreamIdx", resultIdx).Debug("upstream selected")
 			return
 		default:
@@ -569,10 +670,10 @@ func (server *Server) fastesUpstream
 		}
 	}

-	return -1, nil, nil, nil
+	return
 }

-func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, chunks chan Chunk, err error) {
+func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, firstChunk []byte, err error) {
 	upstream := server.Upstreams[upstreamIdx]

 	newpath, matched, err := upstream.GetPath(r.URL.Path)
@@ -618,23 +719,55 @@ func (server *Server) tryUpstream
 	if err != nil {
 		return nil, nil, err
 	}
+	shouldCloseBody := true
+	defer func() {
+		if shouldCloseBody && response != nil {
+			response.Body.Close()
+		}
+	}()

 	if response.StatusCode == http.StatusNotModified {
 		return response, nil, nil
 	}
-	if response.StatusCode >= 400 && response.StatusCode < 500 {
-		return nil, nil, nil
-	}
-	if response.StatusCode < 200 || response.StatusCode >= 500 {
-		logger.With(
-			"url", newurl,
-			"status", response.StatusCode,
-		).Warn("unexpected status")
-		return response, nil, fmt.Errorf("unexpected status(url=%v): %v: %v", newurl, response.StatusCode, response)
-	}
-
-	var currentOffset int64
-
-	ch := make(chan Chunk, 1024)
-
+
+	responseCheckers := upstream.Checkers
+	if len(responseCheckers) == 0 {
+		responseCheckers = append(responseCheckers, Checker{})
+	}
+
+	for _, checker := range responseCheckers {
+		if len(checker.StatusCodes) == 0 {
+			checker.StatusCodes = append(checker.StatusCodes, http.StatusOK)
+		}
+
+		if !slices.Contains(checker.StatusCodes, response.StatusCode) {
+			return nil, nil, err
+		}
+
+		for _, headerChecker := range checker.Headers {
+			if headerChecker.Match == nil {
+				// check header exists
+				if _, ok := response.Header[headerChecker.Name]; !ok {
+					logger.Debug("missing header", "header", headerChecker.Name)
+					return nil, nil, nil
+				}
+			} else {
+				// check header match
+				value := response.Header.Get(headerChecker.Name)
+				if matched, err := regexp.MatchString(*headerChecker.Match, value); err != nil {
+					return nil, nil, err
+				} else if !matched {
+					logger.Debug("invalid header value",
+						"header", headerChecker.Name,
+						"value", value,
+						"matcher", *headerChecker.Match,
+					)
+
+					return nil, nil, nil
+				}
+			}
+		}
+	}
+
 	buffer := make([]byte, server.Misc.FirstChunkBytes)
 	n, err := io.ReadAtLeast(response.Body, buffer, len(buffer))
@@ -643,28 +776,12 @@ func (server *Server) tryUpstream
 		if n == 0 {
 			return response, nil, err
 		}
 	}
-	ch <- Chunk{buffer: buffer[:n]}
-
-	go func() {
-		defer close(ch)
-		for {
-			buffer := make([]byte, server.Misc.ChunkBytes)
-			n, err := io.ReadAtLeast(response.Body, buffer, len(buffer))
-			if n > 0 {
-				ch <- Chunk{buffer: buffer[:n]}
-				currentOffset += int64(n)
-			}
-			if response.ContentLength > 0 && currentOffset == response.ContentLength && err == io.EOF || err == io.ErrUnexpectedEOF {
-				return
-			}
-			if err != nil {
-				ch <- Chunk{error: err}
-				return
-			}
-		}
-	}()
-
-	return response, ch, nil
+
+	if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
+		err = nil
+	}
+
+	shouldCloseBody = false
+	return response, buffer[:n], err
 }
```
server_test.go (new file, 1715 lines)

File diff suppressed because it is too large.