Compare commits
12 Commits
2025022201
...
2025061002
Author | SHA1 | Date | |
---|---|---|---|
80560f7408 | |||
147659b0da | |||
2a0bd28958 | |||
835045346d | |||
85968bb5cf | |||
e14bcb205b | |||
83dfcba4ae | |||
504a809c16 | |||
f9ff71f62a | |||
629c095bbe | |||
9b34fd29c0 | |||
3cfa6c6116 |
125
.clinerules
Normal file
125
.clinerules
Normal file
@ -0,0 +1,125 @@
|
||||
# Project
|
||||
|
||||
## User Language
|
||||
|
||||
Simplified Chinese
|
||||
|
||||
## Development
|
||||
|
||||
- TDD
|
||||
|
||||
# Cline's Memory Bank
|
||||
|
||||
I am Cline, an expert software engineer with a unique characteristic: my memory resets completely between sessions. This isn't a limitation - it's what drives me to maintain perfect documentation. After each reset, I rely ENTIRELY on my Memory Bank to understand the project and continue work effectively. I MUST read ALL memory bank files at the start of EVERY task - this is not optional.
|
||||
|
||||
## Memory Bank Structure
|
||||
|
||||
The Memory Bank consists of core files and optional context files, all in Markdown format. Files build upon each other in a clear hierarchy:
|
||||
|
||||
flowchart TD
|
||||
PB[projectbrief.md] --> PC[productContext.md]
|
||||
PB --> SP[systemPatterns.md]
|
||||
PB --> TC[techContext.md]
|
||||
|
||||
PC --> AC[activeContext.md]
|
||||
SP --> AC
|
||||
TC --> AC
|
||||
|
||||
AC --> P[progress.md]
|
||||
|
||||
### Core Files (Required)
|
||||
1. `projectbrief.md`
|
||||
- Foundation document that shapes all other files
|
||||
- Created at project start if it doesn't exist
|
||||
- Defines core requirements and goals
|
||||
- Source of truth for project scope
|
||||
|
||||
2. `productContext.md`
|
||||
- Why this project exists
|
||||
- Problems it solves
|
||||
- How it should work
|
||||
- User experience goals
|
||||
|
||||
3. `activeContext.md`
|
||||
- Current work focus
|
||||
- Recent changes
|
||||
- Next steps
|
||||
- Active decisions and considerations
|
||||
- Important patterns and preferences
|
||||
- Learnings and project insights
|
||||
|
||||
4. `systemPatterns.md`
|
||||
- System architecture
|
||||
- Key technical decisions
|
||||
- Design patterns in use
|
||||
- Component relationships
|
||||
- Critical implementation paths
|
||||
|
||||
5. `techContext.md`
|
||||
- Technologies used
|
||||
- Development setup
|
||||
- Technical constraints
|
||||
- Dependencies
|
||||
- Tool usage patterns
|
||||
|
||||
6. `progress.md`
|
||||
- What works
|
||||
- What's left to build
|
||||
- Current status
|
||||
- Known issues
|
||||
- Evolution of project decisions
|
||||
|
||||
### Additional Context
|
||||
Create additional files/folders within memory-bank/ when they help organize:
|
||||
- Complex feature documentation
|
||||
- Integration specifications
|
||||
- API documentation
|
||||
- Testing strategies
|
||||
- Deployment procedures
|
||||
|
||||
## Core Workflows
|
||||
|
||||
### Plan Mode
|
||||
flowchart TD
|
||||
Start[Start] --> ReadFiles[Read Memory Bank]
|
||||
ReadFiles --> CheckFiles{Files Complete?}
|
||||
|
||||
CheckFiles -->|No| Plan[Create Plan]
|
||||
Plan --> Document[Document in Chat]
|
||||
|
||||
CheckFiles -->|Yes| Verify[Verify Context]
|
||||
Verify --> Strategy[Develop Strategy]
|
||||
Strategy --> Present[Present Approach]
|
||||
|
||||
### Act Mode
|
||||
flowchart TD
|
||||
Start[Start] --> Context[Check Memory Bank]
|
||||
Context --> Update[Update Documentation]
|
||||
Update --> Execute[Execute Task]
|
||||
Execute --> Document[Document Changes]
|
||||
|
||||
## Documentation Updates
|
||||
|
||||
Memory Bank updates occur when:
|
||||
1. Discovering new project patterns
|
||||
2. After implementing significant changes
|
||||
3. When user requests with **update memory bank** (MUST review ALL files)
|
||||
4. When context needs clarification
|
||||
|
||||
flowchart TD
|
||||
Start[Update Process]
|
||||
|
||||
subgraph Process
|
||||
P1[Review ALL Files]
|
||||
P2[Document Current State]
|
||||
P3[Clarify Next Steps]
|
||||
P4[Document Insights & Patterns]
|
||||
|
||||
P1 --> P2 --> P3 --> P4
|
||||
end
|
||||
|
||||
Start --> Process
|
||||
|
||||
Note: When triggered by **update memory bank**, I MUST review every memory bank file, even if some don't require updates. Focus particularly on activeContext.md and progress.md as they track current state.
|
||||
|
||||
REMEMBER: After every memory reset, I begin completely fresh. The Memory Bank is my only link to previous work. It must be maintained with precision and clarity, as my effectiveness depends entirely on its accuracy.
|
@ -3,9 +3,6 @@ name: build container
|
||||
run-name: build container on ${{ gitea.actor }}
|
||||
on: [push]
|
||||
|
||||
env:
|
||||
GOPROXY: ${{ vars.GOPROXY }}
|
||||
|
||||
jobs:
|
||||
build-container:
|
||||
runs-on: docker
|
||||
@ -24,7 +21,7 @@ jobs:
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v4
|
||||
- name: Build container
|
||||
run: REGISTRY=${{ github.server_url}}; REGISTRY=${REGISTRY##https://}; REGISTRY=${REGISTRY##http://}; podman build -t $REGISTRY/${{ github.repository }} .
|
||||
run: REGISTRY=${{ github.server_url}}; REGISTRY=${REGISTRY##https://}; REGISTRY=${REGISTRY##http://}; podman build --build-arg GOPROXY=${{ vars.GOPROXY }} -t $REGISTRY/${{ github.repository }} .
|
||||
- name: Login to Container Registry
|
||||
run: echo "${{ secrets.ACTION_PACKAGE_WRITE_TOKEN }}" | podman login ${{ github.server_url }} -u ${{ github.repository_owner }} --password-stdin
|
||||
- name: Push Container Image
|
||||
|
5
.gitignore
vendored
5
.gitignore
vendored
@ -1,4 +1,7 @@
|
||||
.vscode
|
||||
|
||||
data
|
||||
__debug*
|
||||
__debug*
|
||||
|
||||
.env
|
||||
.envrc
|
10
Dockerfile
10
Dockerfile
@ -1,10 +1,12 @@
|
||||
FROM docker.io/library/golang:1.23-alpine
|
||||
ARG GOPROXY
|
||||
FROM docker.io/library/golang:1.24-alpine
|
||||
ARG GOPROXY=direct
|
||||
ARG TAGS=""
|
||||
ARG ALPINE_MIRROR=https://mirrors.ustc.edu.cn
|
||||
WORKDIR /src
|
||||
COPY go.mod go.sum ./
|
||||
RUN apk add git && env GOPROXY=${GOPROXY:-direct} go mod download
|
||||
RUN sed -i "s,https://dl-cdn.alpinelinux.org,${ALPINE_MIRROR}," /etc/apk/repositories; apk add git && env GOPROXY=${GOPROXY:-direct} go mod download
|
||||
ADD . /src
|
||||
RUN go build -o /cache-proxy ./cmd/proxy
|
||||
RUN go build -o /cache-proxy -tags "${TAGS}" ./cmd/proxy
|
||||
|
||||
FROM docker.io/library/alpine:3.21 AS runtime
|
||||
COPY --from=0 /cache-proxy /bin/cache-proxy
|
||||
|
17
cmd/proxy/debug.go
Normal file
17
cmd/proxy/debug.go
Normal file
@ -0,0 +1,17 @@
|
||||
//go:build pprof
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.BoolVar(&pprofEnabled, "pprof", true, "")
|
||||
|
||||
if v, ok := os.LookupEnv("SENTRY_DSN"); ok {
|
||||
sentrydsn = v
|
||||
}
|
||||
flag.StringVar(&sentrydsn, "sentry", sentrydsn, "sentry dsn to report errors")
|
||||
}
|
@ -4,13 +4,14 @@ import (
|
||||
"flag"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
cacheproxy "git.jeffthecoder.xyz/guochao/cache-proxy"
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware"
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerlog"
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerctx"
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/httplog"
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/recover"
|
||||
|
||||
@ -22,6 +23,7 @@ var (
|
||||
configFilePath = "config.yaml"
|
||||
logLevel = "info"
|
||||
sentrydsn = ""
|
||||
pprofEnabled = false
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -31,12 +33,8 @@ func init() {
|
||||
if v, ok := os.LookupEnv("LOG_LEVEL"); ok {
|
||||
logLevel = v
|
||||
}
|
||||
if v, ok := os.LookupEnv("SENTRY_DSN"); ok {
|
||||
sentrydsn = v
|
||||
}
|
||||
flag.StringVar(&configFilePath, "config", configFilePath, "path to config file")
|
||||
flag.StringVar(&logLevel, "log-level", logLevel, "log level. (trace, debug, info, warn, error)")
|
||||
flag.StringVar(&sentrydsn, "sentry", sentrydsn, "sentry dsn to report errors")
|
||||
}
|
||||
|
||||
func configFromFile(path string) (*cacheproxy.Config, error) {
|
||||
@ -57,9 +55,9 @@ func configFromFile(path string) (*cacheproxy.Config, error) {
|
||||
Local: &cacheproxy.LocalStorage{
|
||||
Path: "./data",
|
||||
TemporaryFilePattern: "temp.*",
|
||||
},
|
||||
Accel: cacheproxy.Accel{
|
||||
ResponseWithHeaders: []string{"X-Sendfile", "X-Accel-Redirect"},
|
||||
Accel: cacheproxy.Accel{
|
||||
RespondWithHeaders: []string{"X-Sendfile", "X-Accel-Redirect"},
|
||||
},
|
||||
},
|
||||
},
|
||||
Misc: cacheproxy.MiscConfig{
|
||||
@ -124,10 +122,18 @@ func main() {
|
||||
|
||||
mux.HandleFunc("GET /", server.HandleRequestWithCache)
|
||||
|
||||
if pprofEnabled {
|
||||
mux.HandleFunc("GET /debug/pprof/", pprof.Index)
|
||||
mux.HandleFunc("GET /debug/pprof/cmdline", pprof.Cmdline)
|
||||
mux.HandleFunc("GET /debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("GET /debug/pprof/symbol", pprof.Symbol)
|
||||
mux.HandleFunc("GET /debug/pprof/trace", pprof.Trace)
|
||||
}
|
||||
|
||||
slog.With("addr", ":8881").Info("serving app")
|
||||
if err := http.ListenAndServe(":8881", middleware.Use(mux,
|
||||
recover.Recover(),
|
||||
handlerlog.FindHandler(mux),
|
||||
handlerctx.FindHandler(mux),
|
||||
httplog.Log(httplog.Config{LogStart: true, LogFinish: true}),
|
||||
)); err != nil {
|
||||
slog.With("error", err).Error("failed to start server")
|
||||
|
20
compose.release.yaml
Normal file
20
compose.release.yaml
Normal file
@ -0,0 +1,20 @@
|
||||
services:
|
||||
cache-proxy:
|
||||
image: ${REPOSITORY:-git.jeffthecoder.xyz}/${OWNER:-public}/cache-proxy:${VERSION:-latest}
|
||||
build:
|
||||
args:
|
||||
GOPROXY: ${GOPROXY:-direct}
|
||||
ALPINE_MIRROR: ${ALPINE_MIRROR:-https://mirrors.ustc.edu.cn}
|
||||
|
||||
restart: always
|
||||
|
||||
volumes:
|
||||
- ./config.yaml:/config.yaml:ro
|
||||
- ./data:/data
|
||||
ports:
|
||||
- 8881:8881
|
||||
environment:
|
||||
- LOG_LEVEL=info
|
||||
env_file:
|
||||
- path: .env
|
||||
required: false
|
13
compose.yaml
13
compose.yaml
@ -1,11 +1,12 @@
|
||||
services:
|
||||
cache-proxy:
|
||||
image: 100.64.0.2:3000/guochao/cache-proxy
|
||||
image: ${REPOSITORY:-git.jeffthecoder.xyz}/${OWNER:-public}/cache-proxy:${VERSION:-latest}
|
||||
build:
|
||||
args:
|
||||
GOPROXY: ${GOPROXY:-direct}
|
||||
ALPINE_MIRROR: ${ALPINE_MIRROR:-https://mirrors.ustc.edu.cn}
|
||||
TAGS: pprof
|
||||
|
||||
pull_policy: always
|
||||
restart: always
|
||||
|
||||
volumes:
|
||||
@ -14,8 +15,8 @@ services:
|
||||
ports:
|
||||
- 8881:8881
|
||||
environment:
|
||||
- HTTPS_PROXY=http://100.64.0.23:7890
|
||||
- HTTP_PROXY=http://100.64.0.23:7890
|
||||
- ALL_PROXY=http://100.64.0.23:7890
|
||||
- SENTRY_DSN=http://3b7336602c77427d96318074cd86ee04@100.64.0.2:8000/2
|
||||
- SENTRY_DSN=${SENTRY_DSN}
|
||||
- LOG_LEVEL=debug
|
||||
env_file:
|
||||
- path: .env
|
||||
required: false
|
28
config.go
28
config.go
@ -10,43 +10,55 @@ type UpstreamPriorityGroup struct {
|
||||
Priority int `yaml:"priority"`
|
||||
}
|
||||
|
||||
type UpstreamMatch struct {
|
||||
type PathTransformation struct {
|
||||
Match string `yaml:"match"`
|
||||
Replace string `yaml:"replace"`
|
||||
}
|
||||
|
||||
type HeaderChecker struct {
|
||||
Name string `yaml:"name"`
|
||||
Match *string `yaml:"match"`
|
||||
}
|
||||
|
||||
type Checker struct {
|
||||
StatusCodes []int `yaml:"status-codes"`
|
||||
Headers []HeaderChecker `yaml:"headers"`
|
||||
}
|
||||
|
||||
type Upstream struct {
|
||||
Server string `yaml:"server"`
|
||||
Match UpstreamMatch `yaml:"match"`
|
||||
Path PathTransformation `yaml:"path"`
|
||||
Checkers []Checker `yaml:"checkers"`
|
||||
AllowedRedirect *string `yaml:"allowed-redirect"`
|
||||
PriorityGroups []UpstreamPriorityGroup `yaml:"priority-groups"`
|
||||
}
|
||||
|
||||
func (upstream Upstream) GetPath(orig string) (string, bool, error) {
|
||||
if upstream.Match.Match == "" || upstream.Match.Replace == "" {
|
||||
if upstream.Path.Match == "" || upstream.Path.Replace == "" {
|
||||
return orig, true, nil
|
||||
}
|
||||
matcher, err := regexp.Compile(upstream.Match.Match)
|
||||
matcher, err := regexp.Compile(upstream.Path.Match)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
return matcher.ReplaceAllString(orig, upstream.Match.Replace), matcher.MatchString(orig), nil
|
||||
return matcher.ReplaceAllString(orig, upstream.Path.Replace), matcher.MatchString(orig), nil
|
||||
}
|
||||
|
||||
type LocalStorage struct {
|
||||
Path string `yaml:"path"`
|
||||
TemporaryFilePattern string `yaml:"temporary-file-pattern"`
|
||||
|
||||
Accel Accel `yaml:"accel"`
|
||||
}
|
||||
|
||||
type Accel struct {
|
||||
EnableByHeader string `yaml:"enable-by-header"`
|
||||
ResponseWithHeaders []string `yaml:"response-with-headers"`
|
||||
EnableByHeader string `yaml:"enable-by-header"`
|
||||
RespondWithHeaders []string `yaml:"respond-with-headers"`
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
Type string `yaml:"type"`
|
||||
Local *LocalStorage `yaml:"local"`
|
||||
Accel Accel `yaml:"accel"`
|
||||
}
|
||||
|
||||
type CachePolicyOnPath struct {
|
||||
|
103
config.yaml
103
config.yaml
@ -1,108 +1,120 @@
|
||||
upstream:
|
||||
- server: https://mirrors.aliyun.com
|
||||
match:
|
||||
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
|
||||
path:
|
||||
match: /(debian|ubuntu|ubuntu-ports|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu|pypi)/(.*)
|
||||
replace: '/$1/$2'
|
||||
- server: https://mirrors.tencent.com
|
||||
match:
|
||||
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
|
||||
path:
|
||||
match: /(debian|ubuntu|ubuntu-ports|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|gnu)/(.*)
|
||||
replace: '/$1/$2'
|
||||
- server: https://mirrors.ustc.edu.cn
|
||||
match:
|
||||
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|elrepo|remi|rpmfusion|tailscale|gnu|rust-static)/(.*)
|
||||
path:
|
||||
match: /(debian|ubuntu|ubuntu-releases|alpine|archlinux|kali|manjaro|msys2|almalinux|rocky|centos|centos-stream|centos-vault|fedora|epel|elrepo|remi|rpmfusion|tailscale|gnu|rust-static|pypi)/(.*)
|
||||
replace: '/$1/$2'
|
||||
checkers:
|
||||
headers:
|
||||
- name: content-type
|
||||
match: application/octet-stream
|
||||
- server: https://packages.microsoft.com/repos/code
|
||||
match:
|
||||
path:
|
||||
match: /microsoft-code(.*)
|
||||
replace: '$1'
|
||||
- server: https://archive.ubuntu.com
|
||||
match:
|
||||
path:
|
||||
match: /ubuntu/(.*)
|
||||
replace: /$1
|
||||
- server: https://ports.ubuntu.com
|
||||
path:
|
||||
match: /ubuntu-ports/(.*)
|
||||
replace: /$1
|
||||
# priority-groups:
|
||||
# - match: /dists/.*
|
||||
# priority: 10
|
||||
- server: https://releases.ubuntu.com/
|
||||
match:
|
||||
path:
|
||||
match: /ubuntu-releases/(.*)
|
||||
replace: /$1
|
||||
- server: https://apt.releases.hashicorp.com
|
||||
match:
|
||||
path:
|
||||
match: /hashicorp-deb/(.*)
|
||||
replace: /$1
|
||||
- server: https://rpm.releases.hashicorp.com
|
||||
match:
|
||||
path:
|
||||
match: /hashicorp-rpm/(.*)
|
||||
replace: /$1
|
||||
- server: https://dl.google.com/linux/chrome
|
||||
match:
|
||||
path:
|
||||
match: /google-chrome/(.*)
|
||||
replace: /$1
|
||||
- server: https://pkgs.tailscale.com/stable
|
||||
match:
|
||||
path:
|
||||
match: /tailscale/(.*)
|
||||
replace: /$1
|
||||
- server: http://update.cs2c.com.cn:8080
|
||||
match:
|
||||
path:
|
||||
match: /kylinlinux/(.*)
|
||||
replace: /$1
|
||||
- server: https://cdn.jsdelivr.net
|
||||
match:
|
||||
path:
|
||||
match: /jsdelivr/(.*)
|
||||
replace: /$1
|
||||
- server: https://dl-cdn.alpinelinux.org
|
||||
match:
|
||||
path:
|
||||
match: /alpine/(.*)
|
||||
replace: /$1
|
||||
- server: https://repo.almalinux.org
|
||||
match:
|
||||
path:
|
||||
match: /almalinux/(.*)
|
||||
replace: /almalinux/$1
|
||||
- server: https://dl.rockylinux.org/pub/rocky
|
||||
match:
|
||||
path:
|
||||
match: /rocky/(.*)
|
||||
replace: /$1
|
||||
- server: https://dl.fedoraproject.org/pub/epel
|
||||
match:
|
||||
path:
|
||||
match: /epel/(.*)
|
||||
replace: /$1
|
||||
- server: https://deb.debian.org
|
||||
match:
|
||||
path:
|
||||
match: /(debian|debian-security)/(.*)
|
||||
replace: /$1/$2
|
||||
# priority-groups:
|
||||
# - match: /dists/.*
|
||||
# priority: 10
|
||||
- server: https://static.rust-lang.org
|
||||
match:
|
||||
path:
|
||||
match: /rust-static/(.*)
|
||||
replace: /$1
|
||||
- server: https://storage.flutter-io.cn
|
||||
match:
|
||||
path:
|
||||
match: /flutter-storage/(.*)
|
||||
replace: /$1
|
||||
- server: https://cdn-mirror.chaotic.cx/chaotic-aur
|
||||
match:
|
||||
path:
|
||||
match: /chaotic-aur/(.*)
|
||||
replace: /$1
|
||||
allowed-redirect: "^https?://cf-builds.garudalinux.org/.*/chaotic-aur/.*$"
|
||||
- server: http://download.zfsonlinux.org
|
||||
path:
|
||||
match: /openzfs/(.*)
|
||||
replace: /$1
|
||||
|
||||
misc:
|
||||
first-chunk-bytes: 31457280 # 1024*1024*30, all upstreams with 200 response will wait for the first chunks to select an upstream by bandwidth, instead of by latency. defaults to 50M if unset
|
||||
# chunk-bytes: 1048576 # 1024*1024
|
||||
first-chunk-bytes: 31457280 ## 1024*1024*30, all upstreams with 200 response will wait for the first chunks to select an upstream by bandwidth, instead of by latency. defaults to 50M if unset
|
||||
# chunk-bytes: 1048576 ## 1024*1024
|
||||
|
||||
cache:
|
||||
timeout: 1h
|
||||
policies:
|
||||
- match: '.*\.(rpm|deb|apk|iso)$' # rpm/deb/apk/iso won't change, create only
|
||||
- match: '.*\.(rpm|deb|apk|iso)$' ## rpm/deb/apk/iso won't change, create only
|
||||
refresh-after: never
|
||||
- match: '^/.*-security/.*' # mostly ubuntu/debian security
|
||||
- match: '^/.*-security/.*' ## mostly ubuntu/debian security
|
||||
refresh-after: 1h
|
||||
- match: '^/archlinux-localaur/.*' # archlinux local repo
|
||||
- match: '^/archlinux-localaur/.*' ## archlinux local repo
|
||||
refresh-after: never
|
||||
- match: '^/(archlinux.*|chaotic-aur)/*.tar.*' # archlinux packages
|
||||
- match: '^/(archlinux.*|chaotic-aur)/*.tar.*' ## archlinux packages
|
||||
refresh-after: never
|
||||
- match: '/chaotic-aur/.*\.db$' # archlinux chaotic-aur database
|
||||
- match: '/chaotic-aur/.*\.db$' ## archlinux chaotic-aur database
|
||||
refresh-after: 24h
|
||||
- match: '/centos/7'
|
||||
refresh-after: never
|
||||
@ -110,20 +122,23 @@ cache:
|
||||
refresh-after: never
|
||||
|
||||
storage:
|
||||
type: local # ignored
|
||||
type: local ## ignored for now
|
||||
local:
|
||||
path: ./data # defaults to ./data
|
||||
path: ./data ## defaults to ./data
|
||||
|
||||
accel:
|
||||
# example nginx config:
|
||||
## location /i {
|
||||
## internal;
|
||||
## alias /path/to/data;
|
||||
## }
|
||||
## location / {
|
||||
## proxy_pass 127.0.0.1:8881;
|
||||
## proxy_set_header X-Accel-Path /i;
|
||||
## }
|
||||
##
|
||||
accel:
|
||||
## example nginx config:
|
||||
## location /i {
|
||||
## internal;
|
||||
## alias /path/to/data;
|
||||
## }
|
||||
## location / {
|
||||
## proxy_pass 127.0.0.1:8881;
|
||||
## proxy_set_header X-Accel-Path /i;
|
||||
## }
|
||||
##
|
||||
## then cache-proxy will respond to the backend with a header indicating sendfile(join(X-Accel-Path, relpath))
|
||||
# enable-by-header: "X-Accel-Path"
|
||||
|
||||
# enable-by-header: "X-Accel-Path"
|
||||
## respond with different headers to
|
||||
# respond-with-headers: [X-Sendfile, X-Accel-Redirect]
|
||||
|
33
memory-bank/activeContext.md
Normal file
33
memory-bank/activeContext.md
Normal file
@ -0,0 +1,33 @@
|
||||
# 当前工作重点
|
||||
|
||||
当前没有正在进行的紧急任务。项目处于稳定状态,可以根据 `progress.md` 中的待办事项列表来规划接下来的工作。
|
||||
|
||||
## 近期变更
|
||||
|
||||
- **实现流式请求的阶段二优化 (基于临时文件)**:
|
||||
- **问题**: `StreamObject` 使用内存中的 `bytes.Buffer` 来缓存下载内容,在处理大文件时会导致内存占用过高。
|
||||
- **解决方案**: 将 `StreamObject` 的 `Buffer` 替换为 `*os.File`,将下载的字节流直接写入临时文件。
|
||||
- **实现细节**:
|
||||
- `startOrJoinStream` (生产者) 现在创建临时文件并将下载内容写入其中。下载成功后,该临时文件会被重命名为最终的缓存文件。
|
||||
- `serveRangedRequest` (消费者) 为了解决文件句柄的生命周期竞态问题,采用了 `syscall.Dup()` 来复制生产者的文件描述符。每个消费者都使用自己独立的、复制出来的文件句柄来读取数据,从而与生产者的文件句柄生命周期解耦。
|
||||
- **结果**: 解决了大文件的内存占用问题,并通过了所有测试,包括一个专门为验证并发安全性和竞态条件而设计的新测试。
|
||||
|
||||
## 后续步骤
|
||||
|
||||
下一个合乎逻辑的步骤是处理 `progress.md` 中列出的待办事项,例如:
|
||||
|
||||
1. **功能增强**:
|
||||
- **多后端支持**: 增加对 S3、Redis 等其他存储后端的支持。
|
||||
- **高级负载均衡**: 实现更复杂的负载均衡策略。
|
||||
- **监控**: 集成 Prometheus 指标。
|
||||
2. **代码重构**:
|
||||
- 对 `server.go` 中的复杂函数进行重构,提高可读性。
|
||||
|
||||
在开始新任务之前,需要与您确认下一个工作重点。
|
||||
|
||||
## 重要模式与偏好
|
||||
|
||||
- **代码风格**: 遵循 Go 社区的最佳实践,代码清晰、简洁,并有适当的注释。
|
||||
- **并发编程**: 倾向于使用 Go 的原生并发原语(goroutine 和 channel)来解决并发问题。
|
||||
- **配置驱动**: 核心逻辑应与配置分离,使得项目可以通过修改配置文件来适应不同的使用场景,而无需修改代码。
|
||||
- **文档驱动**: 所有重要的设计决策、架构变更和功能实现都应在 Memory Bank 中有相应的记录。
|
26
memory-bank/productContext.md
Normal file
26
memory-bank/productContext.md
Normal file
@ -0,0 +1,26 @@
|
||||
# 产品背景
|
||||
|
||||
在许多场景下,用户需要从软件镜像站、文件服务器或其他资源站点下载文件。然而,由于网络波动、服务器负载或地理位置等原因,单个服务器的下载速度和稳定性可能无法得到保证。
|
||||
|
||||
## 解决的问题
|
||||
|
||||
`cache-proxy` 旨在解决以下问题:
|
||||
|
||||
1. **单点故障**: 当用户依赖的唯一镜像服务器宕机或无法访问时,下载会中断。
|
||||
2. **网络速度不佳**: 用户可能没有连接到最快的可用服务器,导致下载时间过长。
|
||||
3. **重复下载**: 多个用户或同一个用户多次下载相同的文件时,会产生不必要的网络流量,增加了服务器的负载。
|
||||
|
||||
## 工作原理
|
||||
|
||||
`cache-proxy` 通过在用户和多个上游服务器之间引入一个代理层来解决这些问题。
|
||||
|
||||
- 当收到一个文件请求时,它会并发地向所有配置的上游服务器请求该文件。
|
||||
- 它会选择第一个返回成功响应的上游服务器,并将数据流式传输给用户。
|
||||
- 同时,它会将文件缓存到本地磁盘。
|
||||
- 对于后续的相同文件请求,如果缓存未过期,它将直接从本地提供文件,极大地提高了响应速度并减少了网络流量。
|
||||
|
||||
## 用户体验目标
|
||||
|
||||
- **无缝集成**: 用户只需将下载地址指向 `cache-proxy` 即可,无需关心后端的复杂性。
|
||||
- **更快的下载**: 通过竞争机制选择最快的源,用户总能获得最佳的下载体验。
|
||||
- **更高的可用性**: 即使部分上游服务器出现问题,只要有一个可用,服务就不会中断。
|
44
memory-bank/progress.md
Normal file
44
memory-bank/progress.md
Normal file
@ -0,0 +1,44 @@
|
||||
# 项目进展
|
||||
|
||||
这是一个已完成的 `cache-proxy` 项目的初始状态。核心功能已经实现并可以工作。
|
||||
|
||||
## 已完成的功能
|
||||
|
||||
- **核心代理逻辑**:
|
||||
- 从 `config.yaml` 加载配置。
|
||||
- 启动 HTTP 服务器并监听请求。
|
||||
- 根据请求路径检查本地缓存。
|
||||
- **并发上游请求**:
|
||||
- 能够并发地向上游服务器发起请求。
|
||||
- 能够正确地选择最快响应的服务器。
|
||||
- 能够在选择一个服务器后取消其他请求。
|
||||
- **缓存管理**:
|
||||
- 能够将下载的文件缓存到本地磁盘。
|
||||
- 支持基于时间的缓存刷新策略。
|
||||
- 支持通过 `If-Modified-Since` 请求头来减少不必要的数据传输。
|
||||
- **并发请求处理**:
|
||||
- 能够正确处理对同一文件的多个并发请求,确保只下载一次。
|
||||
- **加速下载**:
|
||||
- 支持通过 `X-Sendfile` / `X-Accel-Redirect` 头将文件发送委托给前端服务器(如 Nginx)。
|
||||
- **全面的测试覆盖**:
|
||||
- 完成了 `server_test.go` 的实现,为所有核心功能提供了单元测试和集成测试。
|
||||
- 测试覆盖了正常流程、边缘情况(如超时、上游失败)和安全(如路径穿越)等方面。
|
||||
- 对测试代码和注释进行了审查,确保其准确性和一致性。
|
||||
- 所有测试均已通过,验证了现有代码的健壮性。
|
||||
|
||||
## 待办事项
|
||||
|
||||
- **功能增强**:
|
||||
- **实现流式范围请求 (Done)**: 重构了 Ranged Request 处理流程,实现了边下载边响应。
|
||||
- **阶段一 (已完成)**: 使用 `sync.Cond` 和内存 `bytes.Buffer` 实现。
|
||||
- **性能优化 (已完成)**: 移除了 `StreamObject` 清理逻辑中的固定延迟,改为依赖 Go 的 GC 机制,显著提升了并发请求下的测试性能。
|
||||
- **阶段二 (已完成)**: 使用临时文件和 `syscall.Dup()` 系统调用优化,解决了大文件内存占用问题。
|
||||
- 目前只支持本地文件存储,未来可以考虑增加对其他存储后端(如 S3、Redis)的支持。
|
||||
- 增加更复杂的负载均衡策略,而不仅仅是“选择最快”。
|
||||
- 增加更详细的监控和指标(如 Prometheus metrics)。
|
||||
- **代码优化**:
|
||||
- 在完成流式请求功能后,对 `server.go` 中的一些复杂函数(如 `streamOnline`)进行重构,以提高可读性和可维护性。
|
||||
|
||||
## 已知问题
|
||||
|
||||
- 在当前阶段,尚未发现明显的 bug。代码结构清晰,逻辑完整。
|
10
memory-bank/projectbrief.md
Normal file
10
memory-bank/projectbrief.md
Normal file
@ -0,0 +1,10 @@
|
||||
# 项目简介:Cache-Proxy
|
||||
|
||||
这是一个使用 Go 语言编写的高性能缓存代理服务器。其核心功能是接收客户端的请求,并将其转发到多个配置好的上游(Upstream)镜像服务器。它会并发地向上游服务器发起请求,并选择最快返回响应的服务器,将其结果返回给客户端,同时将结果缓存到本地,以便后续请求能够更快地得到响应。
|
||||
|
||||
## 核心需求
|
||||
|
||||
- **性能**: 必须能够快速地处理并发请求,并有效地利用缓存。
|
||||
- **可靠性**: 当某个上游服务器不可用时,能够自动切换到其他可用的服务器。
|
||||
- **可配置性**: 用户可以通过配置文件轻松地添加、删除和配置上游服务器、缓存策略以及其他行为。
|
||||
- **透明性**: 对客户端来说,代理应该是透明的,客户端只需像访问普通服务器一样访问代理即可。
|
62
memory-bank/systemPatterns.md
Normal file
62
memory-bank/systemPatterns.md
Normal file
@ -0,0 +1,62 @@
|
||||
# 系统架构与设计模式
|
||||
|
||||
`cache-proxy` 的系统设计遵循了几个关键的模式和原则,以实现其高性能和高可用性的目标。
|
||||
|
||||
## 核心架构
|
||||
|
||||
系统可以分为三个主要部分:
|
||||
|
||||
1. **HTTP 服务器层**: 负责接收客户端请求,并使用中间件进行日志记录、错误恢复等通用处理。
|
||||
2. **缓存处理层**: 检查请求的文件是否存在于本地缓存中,并根据缓存策略决定是直接提供缓存文件还是向上游请求。
|
||||
3. **上游选择与下载层**: 这是系统的核心,负责并发地从多个上游服务器获取数据,并管理下载过程。
|
||||
|
||||
## 关键设计模式
|
||||
|
||||
### 1. 竞争式请求 (Racing Requests)
|
||||
|
||||
这是实现“选择最快”功能的核心模式。
|
||||
|
||||
- `fastesUpstream` 函数为每个上游服务器创建一个 goroutine。
|
||||
- 所有 goroutine 并发地向上游服务器发送请求。
|
||||
- 使用 `sync.Once` 来确保只有一个 goroutine 能够“胜出”并成为最终的数据源。
|
||||
- 一旦有 goroutine 胜出,它会调用 `context.CancelFunc` 来通知所有其他 goroutine 停止工作,从而避免不必要的资源消耗。
|
||||
|
||||
### 2. 生产者-消费者模式 (Producer-Consumer)
|
||||
|
||||
在文件下载和流式响应(Ranged Request)中,使用了生产者-消费者模式。
|
||||
|
||||
- **生产者 (`startOrJoinStream`)**:
|
||||
- 为每个首次请求的文件启动一个 goroutine。
|
||||
- 负责从最快的上游服务器下载文件内容。
|
||||
- 将内容**直接写入一个临时文件** (`*os.File`) 中,而不是写入内存缓冲区。
|
||||
- 通过 `sync.Cond` 广播下载进度(已写入的字节数 `Offset`)。
|
||||
- 下载成功后,将临时文件重命名为最终的缓存文件。
|
||||
- **消费者 (`serveRangedRequest`)**:
|
||||
- 当收到一个范围请求时,它会找到或等待对应的 `StreamObject`。
|
||||
- 为了安全地并发读取正在被生产者写入的文件,消费者会使用 `syscall.Dup()` **复制临时文件的文件描述符**。
|
||||
- 每个消费者都通过自己独立的、复制出来的文件句柄 (`*os.File`) 读取所需范围的数据,这避免了与生产者或其他消费者发生文件句柄状态的冲突。
|
||||
- 消费者根据生产者的 `Offset` 进度和 `sync.Cond` 信号来等待其请求范围的数据变为可用。
|
||||
|
||||
### 3. 并发访问控制与对象生命周期管理
|
||||
|
||||
为了处理多个客户端同时请求同一个文件的情况,系统使用了 `sync.Mutex` 和一个 `map[string]*StreamObject`。
|
||||
|
||||
- 当第一个对某文件的请求(我们称之为“消费者”)到达时,它会获得一个锁,并创建一个 `StreamObject` 来代表这个正在进行的下载任务,然后启动一个“生产者”goroutine 来执行下载。
|
||||
- 后续对同一文件的请求会发现 `StreamObject` 已存在于 map 中,它们不会再次启动下载,而是会共享这个对象。
|
||||
|
||||
**`StreamObject` 生命周期管理 (基于 GC)**
|
||||
|
||||
我们采用了一种简洁且高效的、依赖 Go 语言垃圾回收(GC)的模式来管理 `StreamObject` 的生命周期:
|
||||
|
||||
1. **生产者负责移除**: 下载 goroutine(生产者)在完成其任务(无论成功或失败)后,其唯一的职责就是将 `StreamObject` 从全局的 `map[string]*StreamObject` 中移除。
|
||||
2. **消费者持有引用**: 与此同时,所有正在处理该文件请求的 HTTP Handler(消费者)仍然持有对该 `StreamObject` 的引用。
|
||||
3. **GC 自动回收**: 因为消费者们还持有引用,Go 的 GC 不会回收这个对象。只有当最后一个消费者处理完请求、其函数栈帧销毁后,对 `StreamObject` 的最后一个引用才会消失。此时,GC 会在下一次运行时自动回收该对象的内存。
|
||||
|
||||
这个模式避免了复杂的引用计数或定时器,代码更简洁,并且从根本上解决了之前因固定延迟导致的性能问题。
|
||||
|
||||
### 4. 中间件 (Middleware)
|
||||
|
||||
项目使用了 Go 的标准 `http.Handler` 接口和中间件模式来构建请求处理链。
|
||||
|
||||
- `pkgs/middleware` 目录中定义了可重用的中间件,如 `httplog` 和 `recover`。
|
||||
- 这种模式使得在不修改核心业务逻辑的情况下,可以轻松地添加或删除日志、认证、错误处理等功能。
|
26
memory-bank/techContext.md
Normal file
26
memory-bank/techContext.md
Normal file
@ -0,0 +1,26 @@
|
||||
# 技术栈与开发环境
|
||||
|
||||
## 核心技术
|
||||
|
||||
- **语言**: Go (Golang)
|
||||
- **主要依赖**:
|
||||
- `gopkg.in/yaml.v3`: 用于解析 `config.yaml` 配置文件。
|
||||
- `log/slog`: Go 1.21+ 内置的结构化日志库。
|
||||
- `net/http`: 用于构建 HTTP 服务器和客户端。
|
||||
- `github.com/getsentry/sentry-go`: (可选)用于错误追踪。
|
||||
|
||||
## 开发与构建
|
||||
|
||||
- **依赖管理**: 使用 Go Modules (`go.mod`, `go.sum`) 进行依赖管理。
|
||||
- **配置文件**: 项目的行为由 `config.yaml` 文件驱动。
|
||||
- **容器化**:
|
||||
- `Dockerfile`: 用于构建项目的 Docker 镜像。
|
||||
- `compose.yaml`: 用于在开发环境中启动服务。
|
||||
- `compose.release.yaml`: 用于在生产环境中部署服务。
|
||||
|
||||
## 关键技术点
|
||||
|
||||
- **并发模型**: 大量使用 goroutine 和 channel 来实现高并发。特别是 `fastesUpstream` 函数中的并发请求模式。
|
||||
- **流式处理**: 通过 `io.Reader` 和 `io.Writer` 接口,数据以流的形式被处理,从未完全加载到内存中,这对于处理大文件至关重要。
|
||||
- **错误处理**: 在 Go 的标准错误处理之上,项目在某些关键路径(如 `fastesUpstream`)中使用了 `context` 包来处理超时和取消操作。
|
||||
- **结构化日志**: 使用 `slog` 库,可以输出结构化的、易于机器解析的日志,方便调试和监控。
|
@ -1,4 +1,4 @@
|
||||
package handlerlog
|
||||
package handlerctx
|
||||
|
||||
import (
|
||||
"context"
|
@ -6,7 +6,7 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerlog"
|
||||
"git.jeffthecoder.xyz/guochao/cache-proxy/pkgs/middleware/handlerctx"
|
||||
)
|
||||
|
||||
type ctxKey int
|
||||
@ -43,7 +43,7 @@ func Log(config Config) func(http.Handler) http.Handler {
|
||||
"path", r.URL.Path,
|
||||
}
|
||||
|
||||
if pattern, handlerFound := handlerlog.Pattern(r); handlerFound {
|
||||
if pattern, handlerFound := handlerctx.Pattern(r); handlerFound {
|
||||
args = append(args, "handler", pattern)
|
||||
}
|
||||
|
||||
|
727
server.go
727
server.go
@ -1,20 +1,19 @@
|
||||
package cacheproxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -26,12 +25,17 @@ const (
|
||||
|
||||
var zeroTime time.Time
|
||||
|
||||
var preclosedChan = make(chan struct{})
|
||||
|
||||
func init() {
|
||||
close(preclosedChan)
|
||||
}
|
||||
|
||||
var (
|
||||
httpClient = http.Client{
|
||||
// check allowed redirect
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
lastRequest := via[len(via)-1]
|
||||
if allowedRedirect, ok := lastRequest.Context().Value(reqCtxAllowedRedirect).(string); ok {
|
||||
if allowedRedirect, ok := req.Context().Value(reqCtxAllowedRedirect).(string); ok {
|
||||
if matched, err := regexp.MatchString(allowedRedirect, req.URL.String()); err != nil {
|
||||
return err
|
||||
} else if !matched {
|
||||
@ -45,44 +49,16 @@ var (
|
||||
)
|
||||
|
||||
type StreamObject struct {
|
||||
Headers http.Header
|
||||
Buffer *bytes.Buffer
|
||||
Offset int
|
||||
Headers http.Header
|
||||
TempFile *os.File // The temporary file holding the download content.
|
||||
Offset int64 // The number of bytes written to TempFile.
|
||||
Done bool
|
||||
Error error
|
||||
|
||||
ctx context.Context
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
mu *sync.Mutex
|
||||
cond *sync.Cond
|
||||
|
||||
func (memoryObject *StreamObject) StreamTo(w io.Writer, wg *sync.WaitGroup) error {
|
||||
defer wg.Done()
|
||||
offset := 0
|
||||
if w == nil {
|
||||
w = io.Discard
|
||||
}
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-memoryObject.ctx.Done():
|
||||
break OUTER
|
||||
default:
|
||||
}
|
||||
|
||||
newOffset := memoryObject.Offset
|
||||
if newOffset == offset {
|
||||
time.Sleep(time.Millisecond)
|
||||
continue
|
||||
}
|
||||
bytes := memoryObject.Buffer.Bytes()[offset:newOffset]
|
||||
written, err := w.Write(bytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
offset += written
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
_, err := w.Write(memoryObject.Buffer.Bytes()[offset:])
|
||||
return err
|
||||
fileWrittenCh chan struct{} // Closed when the file is fully written and renamed.
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
@ -101,13 +77,8 @@ func NewServer(config Config) *Server {
|
||||
}
|
||||
}
|
||||
|
||||
type Chunk struct {
|
||||
buffer []byte
|
||||
error error
|
||||
}
|
||||
|
||||
func (server *Server) serveFile(w http.ResponseWriter, r *http.Request, path string) {
|
||||
if location := r.Header.Get(server.Storage.Accel.EnableByHeader); server.Storage.Accel.EnableByHeader != "" && location != "" {
|
||||
if location := r.Header.Get(server.Storage.Local.Accel.EnableByHeader); server.Storage.Local.Accel.EnableByHeader != "" && location != "" {
|
||||
relPath, err := filepath.Rel(server.Storage.Local.Path, path)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
@ -115,7 +86,7 @@ func (server *Server) serveFile(w http.ResponseWriter, r *http.Request, path str
|
||||
}
|
||||
accelPath := filepath.Join(location, relPath)
|
||||
|
||||
for _, headerKey := range server.Storage.Accel.ResponseWithHeaders {
|
||||
for _, headerKey := range server.Storage.Local.Accel.RespondWithHeaders {
|
||||
w.Header().Set(headerKey, accelPath)
|
||||
}
|
||||
|
||||
@ -143,26 +114,29 @@ func (server *Server) HandleRequestWithCache(w http.ResponseWriter, r *http.Requ
|
||||
slog.With("status", localStatus, "mtime", mtime, "error", err, "key", fullpath).Debug("local status checked")
|
||||
if os.IsPermission(err) {
|
||||
http.Error(w, err.Error(), http.StatusForbidden)
|
||||
return
|
||||
} else if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
} else if localStatus != localNotExists {
|
||||
if localStatus == localExistsButNeedHead {
|
||||
if ranged {
|
||||
server.streamOnline(nil, r, mtime, fullpath)
|
||||
server.serveFile(w, r, fullpath)
|
||||
} else {
|
||||
server.streamOnline(w, r, mtime, fullpath)
|
||||
}
|
||||
} else {
|
||||
server.serveFile(w, r, fullpath)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if localStatus == localExists {
|
||||
server.serveFile(w, r, fullpath)
|
||||
return
|
||||
}
|
||||
|
||||
// localExistsButNeedHead or localNotExists
|
||||
// Both need to go online.
|
||||
if ranged {
|
||||
server.serveRangedRequest(w, r, fullpath, mtime)
|
||||
} else {
|
||||
if ranged {
|
||||
server.streamOnline(nil, r, mtime, fullpath)
|
||||
server.serveFile(w, r, fullpath)
|
||||
} else {
|
||||
server.streamOnline(w, r, mtime, fullpath)
|
||||
// For full requests, we wait for the download to complete and then serve the file.
|
||||
// This maintains the original behavior for now.
|
||||
ch := server.startOrJoinStream(r, mtime, fullpath)
|
||||
if ch != nil {
|
||||
<-ch
|
||||
}
|
||||
server.serveFile(w, r, fullpath)
|
||||
}
|
||||
}
|
||||
|
||||
@ -210,229 +184,348 @@ func (server *Server) checkLocal(w http.ResponseWriter, _ *http.Request, key str
|
||||
return localNotExists, zeroTime, nil
|
||||
}
|
||||
|
||||
func (server *Server) streamOnline(w http.ResponseWriter, r *http.Request, mtime time.Time, key string) {
|
||||
func (server *Server) serveRangedRequest(w http.ResponseWriter, r *http.Request, key string, mtime time.Time) {
|
||||
server.lu.Lock()
|
||||
memoryObject, exists := server.o[r.URL.Path]
|
||||
locked := false
|
||||
defer func() {
|
||||
if locked {
|
||||
server.lu.Unlock()
|
||||
locked = false
|
||||
}
|
||||
}()
|
||||
if !exists {
|
||||
server.lu.Unlock()
|
||||
// This is the first request for this file, so we start the download.
|
||||
server.startOrJoinStream(r, mtime, key)
|
||||
|
||||
// Re-acquire the lock to get the newly created stream object.
|
||||
server.lu.Lock()
|
||||
locked = true
|
||||
|
||||
memoryObject, exists = server.o[r.URL.Path]
|
||||
}
|
||||
if exists {
|
||||
if locked {
|
||||
memoryObject = server.o[r.URL.Path]
|
||||
if memoryObject == nil {
|
||||
server.lu.Unlock()
|
||||
locked = false
|
||||
// This can happen if the upstream fails very quickly.
|
||||
http.Error(w, "Failed to start download stream", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
server.lu.Unlock()
|
||||
|
||||
if w != nil {
|
||||
memoryObject.wg.Add(1)
|
||||
for k := range memoryObject.Headers {
|
||||
v := memoryObject.Headers.Get(k)
|
||||
w.Header().Set(k, v)
|
||||
}
|
||||
// At this point, we have a memoryObject, and a producer goroutine is downloading the file.
|
||||
// Now we implement the consumer logic.
|
||||
|
||||
if err := memoryObject.StreamTo(w, memoryObject.wg); err != nil {
|
||||
slog.With("error", err).Warn("failed to stream response with existing memory object")
|
||||
}
|
||||
// Duplicate the file descriptor from the producer's temp file. This creates a new,
|
||||
// independent file descriptor that points to the same underlying file description.
|
||||
// This is more efficient and robust than opening the file by path.
|
||||
fd, err := syscall.Dup(int(memoryObject.TempFile.Fd()))
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to duplicate file descriptor", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
// We create a new *os.File from the duplicated descriptor. The consumer is now
|
||||
// responsible for closing this new file.
|
||||
consumerFile := os.NewFile(uintptr(fd), memoryObject.TempFile.Name())
|
||||
if consumerFile == nil {
|
||||
syscall.Close(fd) // Clean up if NewFile fails
|
||||
http.Error(w, "Failed to create file from duplicated descriptor", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer consumerFile.Close()
|
||||
|
||||
rangeHeader := r.Header.Get("Range")
|
||||
if rangeHeader == "" {
|
||||
// This should not happen if called from HandleRequestWithCache, but as a safeguard:
|
||||
http.Error(w, "Range header is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse the Range header. We only support a single range like "bytes=start-end".
|
||||
var start, end int64
|
||||
parts := strings.Split(strings.TrimPrefix(rangeHeader, "bytes="), "-")
|
||||
if len(parts) != 2 {
|
||||
http.Error(w, "Invalid Range header", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
start, err = strconv.ParseInt(parts[0], 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "Invalid Range header", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if parts[1] != "" {
|
||||
end, err = strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "Invalid Range header", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
slog.With("mtime", mtime).Debug("checking fastest upstream")
|
||||
selectedIdx, response, chunks, err := server.fastesUpstream(r, mtime)
|
||||
if chunks == nil && mtime != zeroTime {
|
||||
slog.With("upstreamIdx", selectedIdx, "key", key).Debug("not modified. using local version")
|
||||
if w != nil {
|
||||
server.serveFile(w, r, key)
|
||||
// An empty end means "to the end of the file". We don't know the full size yet.
|
||||
// We'll have to handle this dynamically.
|
||||
end = -1 // Sentinel value
|
||||
}
|
||||
|
||||
memoryObject.mu.Lock()
|
||||
defer memoryObject.mu.Unlock()
|
||||
|
||||
// Wait until we have the headers.
|
||||
for memoryObject.Headers == nil && !memoryObject.Done {
|
||||
memoryObject.cond.Wait()
|
||||
}
|
||||
|
||||
if memoryObject.Error != nil {
|
||||
http.Error(w, memoryObject.Error.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
contentLengthStr := memoryObject.Headers.Get("Content-Length")
|
||||
totalSize, _ := strconv.ParseInt(contentLengthStr, 10, 64)
|
||||
if end == -1 || end >= totalSize {
|
||||
end = totalSize - 1
|
||||
}
|
||||
|
||||
if start >= totalSize || start > end {
|
||||
http.Error(w, "Range not satisfiable", http.StatusRequestedRangeNotSatisfiable)
|
||||
return
|
||||
}
|
||||
|
||||
var bytesSent int64
|
||||
bytesToSend := end - start + 1
|
||||
headersWritten := false
|
||||
|
||||
for bytesSent < bytesToSend && memoryObject.Error == nil {
|
||||
// Calculate what we need.
|
||||
neededStart := start + bytesSent
|
||||
|
||||
// Wait for the data to be available.
|
||||
for memoryObject.Offset <= neededStart && !memoryObject.Done {
|
||||
memoryObject.cond.Wait()
|
||||
}
|
||||
|
||||
// Check for error AFTER waiting. This is the critical fix.
|
||||
if memoryObject.Error != nil {
|
||||
// If headers haven't been written, we can send a 500 error.
|
||||
// If they have, it's too late, the connection will just be closed.
|
||||
if !headersWritten {
|
||||
http.Error(w, memoryObject.Error.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
return // Use return instead of break to exit immediately.
|
||||
}
|
||||
|
||||
// If we are here, we have some data to send. Write headers if we haven't already.
|
||||
if !headersWritten {
|
||||
w.Header().Set("Content-Range", "bytes "+strconv.FormatInt(start, 10)+"-"+strconv.FormatInt(end, 10)+"/"+contentLengthStr)
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10))
|
||||
w.Header().Set("Accept-Ranges", "bytes")
|
||||
w.WriteHeader(http.StatusPartialContent)
|
||||
headersWritten = true
|
||||
}
|
||||
|
||||
// Data is available, read from the temporary file.
|
||||
// We calculate how much we can read in this iteration.
|
||||
readNow := memoryObject.Offset - neededStart
|
||||
if readNow <= 0 {
|
||||
// This can happen if we woke up but the data isn't what we need.
|
||||
// The loop will continue to wait.
|
||||
continue
|
||||
}
|
||||
|
||||
// Don't read more than the client requested in total.
|
||||
remainingToSend := bytesToSend - bytesSent
|
||||
if readNow > remainingToSend {
|
||||
readNow = remainingToSend
|
||||
}
|
||||
|
||||
// Read the chunk from the file at the correct offset.
|
||||
buffer := make([]byte, readNow)
|
||||
bytesRead, err := consumerFile.ReadAt(buffer, neededStart)
|
||||
if err != nil && err != io.EOF {
|
||||
if !headersWritten {
|
||||
http.Error(w, "Error reading from cache stream", http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
slog.With("error", err).Warn("failed to select fastest upstream")
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if selectedIdx == -1 || response == nil || chunks == nil {
|
||||
slog.Debug("no upstream is selected")
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if response.StatusCode == http.StatusNotModified {
|
||||
slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
|
||||
os.Chtimes(key, zeroTime, time.Now())
|
||||
server.serveFile(w, r, key)
|
||||
return
|
||||
}
|
||||
|
||||
slog.With(
|
||||
"upstreamIdx", selectedIdx,
|
||||
).Debug("found fastest upstream")
|
||||
|
||||
buffer := &bytes.Buffer{}
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
defer cancel()
|
||||
|
||||
memoryObject = &StreamObject{
|
||||
Headers: response.Header,
|
||||
Buffer: buffer,
|
||||
|
||||
ctx: ctx,
|
||||
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
|
||||
server.o[r.URL.Path] = memoryObject
|
||||
server.lu.Unlock()
|
||||
locked = false
|
||||
|
||||
err = nil
|
||||
|
||||
if w != nil {
|
||||
memoryObject.wg.Add(1)
|
||||
|
||||
for k := range memoryObject.Headers {
|
||||
v := memoryObject.Headers.Get(k)
|
||||
w.Header().Set(k, v)
|
||||
}
|
||||
|
||||
go memoryObject.StreamTo(w, memoryObject.wg)
|
||||
}
|
||||
|
||||
for chunk := range chunks {
|
||||
if chunk.error != nil {
|
||||
err = chunk.error
|
||||
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
slog.With("error", err).Warn("failed to read from upstream")
|
||||
}
|
||||
}
|
||||
if chunk.buffer == nil {
|
||||
break
|
||||
}
|
||||
n, _ := buffer.Write(chunk.buffer)
|
||||
memoryObject.Offset += n
|
||||
}
|
||||
cancel()
|
||||
|
||||
memoryObject.wg.Wait()
|
||||
|
||||
if response.ContentLength > 0 {
|
||||
if memoryObject.Offset == int(response.ContentLength) && err != nil {
|
||||
if !(errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) {
|
||||
slog.With("read-length", memoryObject.Offset, "content-length", response.ContentLength, "error", err, "upstreamIdx", selectedIdx).Debug("something happened during download. but response body is read as whole. so error is reset to nil")
|
||||
}
|
||||
err = nil
|
||||
}
|
||||
} else if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger := slog.With("upstreamIdx", selectedIdx)
|
||||
logger.Error("something happened during download. will not cache this response. setting lingering to reset the connection.")
|
||||
hijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
logger.Warn("response writer is not a hijacker. failed to set lingering")
|
||||
return
|
||||
}
|
||||
conn, _, err := hijacker.Hijack()
|
||||
if bytesRead > 0 {
|
||||
n, err := w.Write(buffer[:bytesRead])
|
||||
if err != nil {
|
||||
logger.With("error", err).Warn("hijack failed. failed to set lingering")
|
||||
// Client closed connection, just return.
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
tcpConn, ok := conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
logger.With("error", err).Warn("connection is not a *net.TCPConn. failed to set lingering")
|
||||
return
|
||||
}
|
||||
if err := tcpConn.SetLinger(0); err != nil {
|
||||
logger.With("error", err).Warn("failed to set lingering")
|
||||
return
|
||||
}
|
||||
logger.Debug("connection set to linger. it will be reset once the conn.Close is called")
|
||||
bytesSent += int64(n)
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
server.lu.Lock()
|
||||
defer server.lu.Unlock()
|
||||
|
||||
delete(server.o, r.URL.Path)
|
||||
slog.Debug("memory object released")
|
||||
}()
|
||||
|
||||
if err == nil {
|
||||
slog.Debug("preparing to release memory object")
|
||||
mtime := zeroTime
|
||||
lastModifiedHeader := response.Header.Get("Last-Modified")
|
||||
if lastModified, err := time.Parse(time.RFC1123, lastModifiedHeader); err != nil {
|
||||
slog.With(
|
||||
"error", err,
|
||||
"value", lastModifiedHeader,
|
||||
"url", response.Request.URL,
|
||||
).Debug("failed to parse last modified header value. set modified time to now")
|
||||
} else {
|
||||
slog.With(
|
||||
"header", lastModifiedHeader,
|
||||
"value", lastModified,
|
||||
"url", response.Request.URL,
|
||||
).Debug("found modified time")
|
||||
mtime = lastModified
|
||||
}
|
||||
if err := os.MkdirAll(server.Storage.Local.Path, 0755); err != nil {
|
||||
slog.With("error", err).Warn("failed to create local storage path")
|
||||
}
|
||||
|
||||
if server.Config.Storage.Local.TemporaryFilePattern == "" {
|
||||
if err := os.WriteFile(key, buffer.Bytes(), 0644); err != nil {
|
||||
slog.With("error", err).Warn("failed to write file")
|
||||
os.Remove(key)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
fp, err := os.CreateTemp(server.Storage.Local.Path, server.Storage.Local.TemporaryFilePattern)
|
||||
if err != nil {
|
||||
slog.With(
|
||||
"key", key,
|
||||
"path", server.Storage.Local.Path,
|
||||
"pattern", server.Storage.Local.TemporaryFilePattern,
|
||||
"error", err,
|
||||
).Warn("failed to create template file")
|
||||
return
|
||||
}
|
||||
|
||||
name := fp.Name()
|
||||
|
||||
if _, err := fp.Write(buffer.Bytes()); err != nil {
|
||||
fp.Close()
|
||||
os.Remove(name)
|
||||
|
||||
slog.With("error", err).Warn("failed to write into template file")
|
||||
} else if err := fp.Close(); err != nil {
|
||||
os.Remove(name)
|
||||
|
||||
slog.With("error", err).Warn("failed to close template file")
|
||||
} else {
|
||||
os.Chtimes(name, zeroTime, mtime)
|
||||
dirname := filepath.Dir(key)
|
||||
os.MkdirAll(dirname, 0755)
|
||||
os.Remove(key)
|
||||
os.Rename(name, key)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if memoryObject.Done && bytesSent >= bytesToSend {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, resultCh chan Chunk, resultErr error) {
|
||||
// startOrJoinStream ensures a download stream is active for the given key.
|
||||
// If a stream already exists, it returns the channel that signals completion.
|
||||
// If not, it starts a new download producer and returns its completion channel.
|
||||
func (server *Server) startOrJoinStream(r *http.Request, mtime time.Time, key string) <-chan struct{} {
|
||||
server.lu.Lock()
|
||||
memoryObject, exists := server.o[r.URL.Path]
|
||||
if exists {
|
||||
// A download is already in progress. Return its completion channel.
|
||||
server.lu.Unlock()
|
||||
return memoryObject.fileWrittenCh
|
||||
}
|
||||
|
||||
// No active stream, create a new one.
|
||||
if err := os.MkdirAll(filepath.Dir(key), 0755); err != nil {
|
||||
slog.With("error", err).Warn("failed to create local storage path for temp file")
|
||||
server.lu.Unlock()
|
||||
return preclosedChan // Return a closed channel to prevent blocking
|
||||
}
|
||||
|
||||
tempFilePattern := server.Storage.Local.TemporaryFilePattern
|
||||
if tempFilePattern == "" {
|
||||
tempFilePattern = "cache-proxy-*"
|
||||
}
|
||||
tempFile, err := os.CreateTemp(filepath.Dir(key), tempFilePattern)
|
||||
if err != nil {
|
||||
slog.With("error", err).Warn("failed to create temporary file")
|
||||
server.lu.Unlock()
|
||||
return preclosedChan
|
||||
}
|
||||
|
||||
mu := &sync.Mutex{}
|
||||
fileWrittenCh := make(chan struct{})
|
||||
memoryObject = &StreamObject{
|
||||
TempFile: tempFile,
|
||||
mu: mu,
|
||||
cond: sync.NewCond(mu),
|
||||
fileWrittenCh: fileWrittenCh,
|
||||
}
|
||||
server.o[r.URL.Path] = memoryObject
|
||||
server.lu.Unlock()
|
||||
|
||||
// This is the producer goroutine
|
||||
go func(mo *StreamObject) {
|
||||
var err error
|
||||
downloadSucceeded := false
|
||||
tempFileName := mo.TempFile.Name()
|
||||
|
||||
defer func() {
|
||||
// On completion (or error), update the stream object,
|
||||
// wake up all consumers, and then clean up.
|
||||
mo.mu.Lock()
|
||||
mo.Done = true
|
||||
mo.Error = err
|
||||
mo.cond.Broadcast()
|
||||
mo.mu.Unlock()
|
||||
|
||||
// Close the temp file handle.
|
||||
mo.TempFile.Close()
|
||||
|
||||
// If download failed, remove the temp file.
|
||||
if !downloadSucceeded {
|
||||
os.Remove(tempFileName)
|
||||
}
|
||||
|
||||
// The producer's job is done. Remove the object from the central map.
|
||||
// Any existing consumers still hold a reference to the object,
|
||||
// so it won't be garbage collected until they are done.
|
||||
server.lu.Lock()
|
||||
delete(server.o, r.URL.Path)
|
||||
server.lu.Unlock()
|
||||
slog.Debug("memory object released by producer")
|
||||
close(mo.fileWrittenCh)
|
||||
}()
|
||||
|
||||
slog.With("mtime", mtime).Debug("checking fastest upstream")
|
||||
selectedIdx, response, firstChunk, upstreamErr := server.fastestUpstream(r, mtime)
|
||||
if upstreamErr != nil {
|
||||
if !errors.Is(upstreamErr, io.EOF) && !errors.Is(upstreamErr, io.ErrUnexpectedEOF) {
|
||||
slog.With("error", upstreamErr).Warn("failed to select fastest upstream")
|
||||
err = upstreamErr
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if selectedIdx == -1 || response == nil {
|
||||
slog.Debug("no upstream is selected")
|
||||
err = errors.New("no suitable upstream found")
|
||||
return
|
||||
}
|
||||
|
||||
if response.StatusCode == http.StatusNotModified {
|
||||
slog.With("upstreamIdx", selectedIdx).Debug("not modified. using local version")
|
||||
os.Chtimes(key, zeroTime, time.Now())
|
||||
// In this case, we don't have a new file, so we just exit.
|
||||
// The temp file will be cleaned up by the defer.
|
||||
return
|
||||
}
|
||||
|
||||
defer response.Body.Close()
|
||||
|
||||
slog.With("upstreamIdx", selectedIdx).Debug("found fastest upstream")
|
||||
|
||||
mo.mu.Lock()
|
||||
mo.Headers = response.Header
|
||||
mo.cond.Broadcast() // Broadcast headers availability
|
||||
mo.mu.Unlock()
|
||||
|
||||
// Write the first chunk that we already downloaded.
|
||||
if len(firstChunk) > 0 {
|
||||
written, writeErr := mo.TempFile.Write(firstChunk)
|
||||
if writeErr != nil {
|
||||
err = writeErr
|
||||
return
|
||||
}
|
||||
mo.mu.Lock()
|
||||
mo.Offset += int64(written)
|
||||
mo.cond.Broadcast()
|
||||
mo.mu.Unlock()
|
||||
}
|
||||
|
||||
// Download the rest of the file in chunks
|
||||
buffer := make([]byte, server.Misc.ChunkBytes)
|
||||
for {
|
||||
n, readErr := response.Body.Read(buffer)
|
||||
if n > 0 {
|
||||
written, writeErr := mo.TempFile.Write(buffer[:n])
|
||||
if writeErr != nil {
|
||||
err = writeErr
|
||||
break
|
||||
}
|
||||
mo.mu.Lock()
|
||||
mo.Offset += int64(written)
|
||||
mo.cond.Broadcast()
|
||||
mo.mu.Unlock()
|
||||
}
|
||||
if readErr != nil {
|
||||
if readErr != io.EOF {
|
||||
err = readErr
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// After download, if no critical error, rename the temp file to its final destination.
|
||||
if err == nil {
|
||||
// Set modification time
|
||||
mtime := zeroTime
|
||||
lastModifiedHeader := response.Header.Get("Last-Modified")
|
||||
if lastModified, lmErr := time.Parse(time.RFC1123, lastModifiedHeader); lmErr == nil {
|
||||
mtime = lastModified
|
||||
}
|
||||
// Close file before Chtimes and Rename
|
||||
mo.TempFile.Close()
|
||||
|
||||
os.Chtimes(tempFileName, zeroTime, mtime)
|
||||
|
||||
// Rename the file
|
||||
if renameErr := os.Rename(tempFileName, key); renameErr != nil {
|
||||
slog.With("error", renameErr, "from", tempFileName, "to", key).Warn("failed to rename temp file")
|
||||
err = renameErr
|
||||
os.Remove(tempFileName) // Attempt to clean up if rename fails
|
||||
} else {
|
||||
downloadSucceeded = true
|
||||
}
|
||||
} else {
|
||||
logger := slog.With("upstreamIdx", selectedIdx)
|
||||
logger.Error("something happened during download. will not cache this response.", "error", err)
|
||||
}
|
||||
}(memoryObject)
|
||||
|
||||
return fileWrittenCh
|
||||
}
|
||||
|
||||
func (server *Server) fastestUpstream(r *http.Request, lastModified time.Time) (resultIdx int, resultResponse *http.Response, firstChunk []byte, resultErr error) {
|
||||
returnLock := &sync.Mutex{}
|
||||
upstreams := len(server.Upstreams)
|
||||
cancelFuncs := make([]func(), upstreams)
|
||||
@ -443,6 +536,8 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
|
||||
resultIdx = -1
|
||||
|
||||
var resultFirstChunk []byte
|
||||
|
||||
defer close(updateCh)
|
||||
defer close(notModifiedCh)
|
||||
defer func() {
|
||||
@ -457,7 +552,8 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
groups := make(map[int][]int)
|
||||
for upstreamIdx, upstream := range server.Upstreams {
|
||||
if _, matched, err := upstream.GetPath(r.URL.Path); err != nil {
|
||||
return -1, nil, nil, err
|
||||
resultErr = err
|
||||
return
|
||||
} else if !matched {
|
||||
continue
|
||||
}
|
||||
@ -465,7 +561,8 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
priority := 0
|
||||
for _, priorityGroup := range upstream.PriorityGroups {
|
||||
if matched, err := regexp.MatchString(priorityGroup.Match, r.URL.Path); err != nil {
|
||||
return -1, nil, nil, err
|
||||
resultErr = err
|
||||
return
|
||||
} else if matched {
|
||||
priority = priorityGroup.Priority
|
||||
break
|
||||
@ -501,7 +598,7 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
response, ch, err := server.tryUpstream(ctx, idx, priority, r, lastModified)
|
||||
response, chunk, err := server.tryUpstream(ctx, idx, priority, r, lastModified)
|
||||
if err == context.Canceled { // others returned
|
||||
logger.Debug("context canceled")
|
||||
return
|
||||
@ -521,13 +618,16 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
}
|
||||
locked := returnLock.TryLock()
|
||||
if !locked {
|
||||
if response != nil {
|
||||
response.Body.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
defer returnLock.Unlock()
|
||||
|
||||
if response.StatusCode == http.StatusNotModified {
|
||||
notModifiedOnce.Do(func() {
|
||||
resultResponse, resultCh, resultErr = response, ch, err
|
||||
resultResponse, resultErr = response, err
|
||||
notModifiedCh <- idx
|
||||
})
|
||||
logger.Debug("voted not modified")
|
||||
@ -535,7 +635,7 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
}
|
||||
|
||||
updateOnce.Do(func() {
|
||||
resultResponse, resultCh, resultErr = response, ch, err
|
||||
resultResponse, resultFirstChunk, resultErr = response, chunk, err
|
||||
updateCh <- idx
|
||||
|
||||
for cancelIdx, cancel := range cancelFuncs {
|
||||
@ -555,6 +655,7 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
select {
|
||||
case idx := <-updateCh:
|
||||
resultIdx = idx
|
||||
firstChunk = resultFirstChunk
|
||||
logger.With("upstreamIdx", resultIdx).Debug("upstream selected")
|
||||
return
|
||||
default:
|
||||
@ -569,10 +670,10 @@ func (server *Server) fastesUpstream(r *http.Request, lastModified time.Time) (r
|
||||
}
|
||||
}
|
||||
|
||||
return -1, nil, nil, nil
|
||||
return
|
||||
}
|
||||
|
||||
func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, chunks chan Chunk, err error) {
|
||||
func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int, r *http.Request, lastModified time.Time) (response *http.Response, firstChunk []byte, err error) {
|
||||
upstream := server.Upstreams[upstreamIdx]
|
||||
|
||||
newpath, matched, err := upstream.GetPath(r.URL.Path)
|
||||
@ -618,23 +719,55 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
shouldCloseBody := true
|
||||
defer func() {
|
||||
if shouldCloseBody && response != nil {
|
||||
response.Body.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if response.StatusCode == http.StatusNotModified {
|
||||
return response, nil, nil
|
||||
}
|
||||
if response.StatusCode >= 400 && response.StatusCode < 500 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
if response.StatusCode < 200 || response.StatusCode >= 500 {
|
||||
logger.With(
|
||||
"url", newurl,
|
||||
"status", response.StatusCode,
|
||||
).Warn("unexpected status")
|
||||
return response, nil, fmt.Errorf("unexpected status(url=%v): %v: %v", newurl, response.StatusCode, response)
|
||||
|
||||
responseCheckers := upstream.Checkers
|
||||
if len(responseCheckers) == 0 {
|
||||
responseCheckers = append(responseCheckers, Checker{})
|
||||
}
|
||||
|
||||
var currentOffset int64
|
||||
for _, checker := range responseCheckers {
|
||||
if len(checker.StatusCodes) == 0 {
|
||||
checker.StatusCodes = append(checker.StatusCodes, http.StatusOK)
|
||||
}
|
||||
|
||||
ch := make(chan Chunk, 1024)
|
||||
if !slices.Contains(checker.StatusCodes, response.StatusCode) {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
for _, headerChecker := range checker.Headers {
|
||||
if headerChecker.Match == nil {
|
||||
// check header exists
|
||||
if _, ok := response.Header[headerChecker.Name]; !ok {
|
||||
logger.Debug("missing header", "header", headerChecker.Name)
|
||||
return nil, nil, nil
|
||||
}
|
||||
} else {
|
||||
// check header match
|
||||
value := response.Header.Get(headerChecker.Name)
|
||||
if matched, err := regexp.MatchString(*headerChecker.Match, value); err != nil {
|
||||
return nil, nil, err
|
||||
} else if !matched {
|
||||
logger.Debug("invalid header value",
|
||||
"header", headerChecker.Name,
|
||||
"value", value,
|
||||
"matcher", *headerChecker.Match,
|
||||
)
|
||||
|
||||
return nil, nil, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buffer := make([]byte, server.Misc.FirstChunkBytes)
|
||||
n, err := io.ReadAtLeast(response.Body, buffer, len(buffer))
|
||||
@ -643,28 +776,12 @@ func (server *Server) tryUpstream(ctx context.Context, upstreamIdx, priority int
|
||||
if n == 0 {
|
||||
return response, nil, err
|
||||
}
|
||||
}
|
||||
ch <- Chunk{buffer: buffer[:n]}
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
|
||||
for {
|
||||
buffer := make([]byte, server.Misc.ChunkBytes)
|
||||
n, err := io.ReadAtLeast(response.Body, buffer, len(buffer))
|
||||
if n > 0 {
|
||||
ch <- Chunk{buffer: buffer[:n]}
|
||||
currentOffset += int64(n)
|
||||
}
|
||||
if response.ContentLength > 0 && currentOffset == response.ContentLength && err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
ch <- Chunk{error: err}
|
||||
return
|
||||
}
|
||||
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
|
||||
err = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return response, ch, nil
|
||||
shouldCloseBody = false
|
||||
return response, buffer[:n], err
|
||||
}
|
||||
|
1715
server_test.go
Normal file
1715
server_test.go
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user