runner.go (9742B)
1 package jci 2 3 import ( 4 "bytes" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "math/rand" 11 "net/http" 12 "os" 13 "strings" 14 "time" 15 16 _ "modernc.org/sqlite" 17 ) 18 19 // Runner polls the jci server for jobs and dispatches them into Docker containers. 20 // 21 // Configuration env vars: 22 // 23 // JCI_SERVER base URL or host:port of the jci server, e.g. https://jci.example.com or jci_server:8080 24 // If no scheme is present, http:// is prepended automatically. 25 // JCI_RUNNER_SECRET shared secret matching RUNNER_SECRET on the server 26 // JCI_RUNNER_DB path to runner's SQLite DB (default /var/lib/jci/runner.db) 27 // JCI_BINARY path to the git-jci binary to inject into containers (default /usr/local/bin/git-jci) 28 func Runner(_ []string) error { 29 cfg, err := loadRunnerConfig() 30 if err != nil { 31 return err 32 } 33 34 db, err := openRunnerDB(cfg.dbPath) 35 if err != nil { 36 return fmt.Errorf("open runner db: %w", err) 37 } 38 defer db.Close() 39 40 runnerID, err := getOrCreateRunnerID(db) 41 if err != nil { 42 return fmt.Errorf("runner identity: %w", err) 43 } 44 log.Printf("jci runner starting (id=%s)", runnerID) 45 log.Printf("connecting to server %s", cfg.serverURL) 46 47 r := &runner{cfg: cfg, db: db, runnerID: runnerID, rt: newDockerRuntime()} 48 r.pollLoop() 49 return nil // pollLoop runs forever; return only on fatal errors 50 } 51 52 // --------------------------------------------------------------------------- 53 // Config 54 // --------------------------------------------------------------------------- 55 56 type runnerConfig struct { 57 serverURL string 58 runnerSecret string 59 dbPath string 60 binaryPath string 61 } 62 63 func loadRunnerConfig() (runnerConfig, error) { 64 cfg := runnerConfig{ 65 dbPath: "/var/lib/jci/runner.db", 66 binaryPath: "/usr/local/bin/git-jci", 67 } 68 cfg.serverURL = normalizeServerURL(os.Getenv("JCI_SERVER")) 69 cfg.runnerSecret = os.Getenv("JCI_RUNNER_SECRET") 70 71 if cfg.serverURL == "" || cfg.runnerSecret == "" { 72 return cfg, fmt.Errorf("missing required env vars: JCI_SERVER, JCI_RUNNER_SECRET") 73 } 74 if v := os.Getenv("JCI_RUNNER_DB"); v != "" { 75 cfg.dbPath = v 76 } 77 if v := os.Getenv("JCI_BINARY"); v != "" { 78 cfg.binaryPath = v 79 } 80 return cfg, nil 81 } 82 83 // --------------------------------------------------------------------------- 84 // Runner SQLite identity store 85 // --------------------------------------------------------------------------- 86 87 func openRunnerDB(path string) (*sql.DB, error) { 88 if err := os.MkdirAll(parentDir(path), 0755); err != nil { 89 return nil, err 90 } 91 db, err := sql.Open("sqlite", path) 92 if err != nil { 93 return nil, err 94 } 95 _, err = db.Exec(`CREATE TABLE IF NOT EXISTS identity (runner_id TEXT PRIMARY KEY)`) 96 return db, err 97 } 98 99 func getOrCreateRunnerID(db *sql.DB) (string, error) { 100 var id string 101 err := db.QueryRow(`SELECT runner_id FROM identity LIMIT 1`).Scan(&id) 102 if err == nil { 103 return id, nil 104 } 105 if err != sql.ErrNoRows { 106 return "", err 107 } 108 id = randomID() 109 _, err = db.Exec(`INSERT INTO identity (runner_id) VALUES (?)`, id) 110 return id, err 111 } 112 113 // --------------------------------------------------------------------------- 114 // Runner 115 // --------------------------------------------------------------------------- 116 117 type runner struct { 118 cfg runnerConfig 119 db *sql.DB 120 runnerID string 121 rt ContainerRuntime 122 } 123 124 // pollLoop runs forever: poll the server every 5s + 0–2s jitter. 125 // Container reaping runs in a separate goroutine on a 60s interval. 126 func (r *runner) pollLoop() { 127 const reapInterval = 60 * time.Second 128 go func() { 129 for { 130 time.Sleep(reapInterval) 131 r.reapStaleContainers() 132 } 133 }() 134 135 for { 136 jitter := time.Duration(rand.Intn(2000)) * time.Millisecond 137 time.Sleep(5*time.Second + jitter) 138 139 retryAfter, job, err := r.poll() 140 if err != nil { 141 log.Printf("poll error: %v", err) 142 continue 143 } 144 if retryAfter > 0 { 145 log.Printf("at capacity; server asked to back off %s", retryAfter) 146 time.Sleep(retryAfter) 147 continue 148 } 149 if job == nil { 150 log.Printf("poll: no jobs available") 151 continue 152 } 153 go r.dispatch(job) // non-blocking; runner returns to poll immediately 154 } 155 } 156 157 type serverJob struct { 158 JobID string `json:"job_id"` 159 CloneURL string `json:"clone_url"` 160 CommitSHA string `json:"commit_sha"` 161 RepoOwner string `json:"repo_owner"` 162 RepoName string `json:"repo_name"` 163 TimeoutSeconds int `json:"timeout_seconds"` 164 Image string `json:"image"` 165 Script string `json:"script"` // base64-encoded shell script 166 AllowDockerSocket bool `json:"allow_docker_socket"` // mount /var/run/docker.sock into job container 167 } 168 169 // poll contacts the server. Returns (retryAfter, job, err). 170 // retryAfter > 0 means 429 was received; job may be nil when there is no work. 171 func (r *runner) poll() (time.Duration, *serverJob, error) { 172 log.Printf("polling %s/poll", r.cfg.serverURL) 173 body, _ := json.Marshal(map[string]string{ 174 "runner_id": r.runnerID, 175 "secret": r.cfg.runnerSecret, 176 }) 177 resp, err := http.Post(r.cfg.serverURL+"/poll", "application/json", bytes.NewReader(body)) 178 if err != nil { 179 return 0, nil, err 180 } 181 defer resp.Body.Close() 182 183 log.Printf("poll response: %d", resp.StatusCode) 184 if resp.StatusCode == http.StatusTooManyRequests { 185 secs := 5 186 if v := resp.Header.Get("Retry-After"); v != "" { 187 fmt.Sscanf(v, "%d", &secs) 188 } 189 return time.Duration(secs) * time.Second, nil, nil 190 } 191 if resp.StatusCode != http.StatusOK { 192 data, _ := io.ReadAll(resp.Body) 193 log.Printf("poll: unexpected status %d: %s", resp.StatusCode, data) 194 return 0, nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, data) 195 } 196 197 var result struct { 198 Job *serverJob `json:"job"` 199 } 200 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 201 return 0, nil, fmt.Errorf("decode poll response: %w", err) 202 } 203 if result.Job != nil { 204 log.Printf("poll: received job %s (%s/%s@%.12s)", result.Job.JobID, result.Job.RepoOwner, result.Job.RepoName, result.Job.CommitSHA) 205 } 206 return 0, result.Job, nil 207 } 208 209 // dispatch starts a detached Docker container to run the job. 210 func (r *runner) dispatch(job *serverJob) { 211 log.Printf("received job %s: %s/%s@%.12s", job.JobID, job.RepoOwner, job.RepoName, job.CommitSHA) 212 log.Printf("job %s: launching container", job.JobID) 213 214 containerID, err := r.startJobContainer(job) 215 if err != nil { 216 log.Printf("job %s: failed to start container: %v", job.JobID, err) 217 return 218 } 219 log.Printf("job %s: container %s started — waiting for completion (will be reaped after timeout)", job.JobID, containerID[:12]) 220 221 // Container reaping is handled by the periodic reaper; nothing more to do here. 222 } 223 224 // startJobContainer launches a detached container and returns its ID. 225 func (r *runner) startJobContainer(job *serverJob) (string, error) { 226 // The script is delivered as a base64-encoded env var (JCI_SCRIPT). 227 // The container decodes and pipes it into sh — no script content ever 228 // touches a shell on the runner host. 229 timeoutLabel := fmt.Sprintf("%dm", job.TimeoutSeconds/60) 230 231 binds := []string{r.cfg.binaryPath + ":/usr/local/bin/git-jci:ro"} 232 if job.AllowDockerSocket { 233 binds = append(binds, "/var/run/docker.sock:/var/run/docker.sock") 234 } 235 236 spec := ContainerSpec{ 237 Image: job.Image, 238 Command: []string{"/bin/sh", "-c", `echo "$JCI_SCRIPT" | base64 -d | /bin/sh`}, 239 Env: []string{ 240 "JCI_CLONE_URL=" + job.CloneURL, 241 "JCI_COMMIT=" + job.CommitSHA, 242 "JCI_SCRIPT=" + job.Script, 243 }, 244 Binds: binds, 245 Labels: map[string]string{ 246 "jci-job": "y", 247 "jci-job-id": job.JobID, 248 "jci-timeout": timeoutLabel, 249 }, 250 } 251 252 // Log without credentials (clone URL contains token). 253 logSpec := spec 254 logSpec.Env = []string{ 255 "JCI_CLONE_URL=" + maskURL(job.CloneURL), 256 "JCI_COMMIT=" + job.CommitSHA, 257 "JCI_SCRIPT=(base64, " + fmt.Sprintf("%d bytes", len(job.Script)) + ")", 258 } 259 log.Printf("job %s: %s", job.JobID, logContainerCmd(logSpec)) 260 261 return r.rt.StartContainer(spec) 262 } 263 264 // reapStaleContainers kills containers labelled jci-job=y whose running time 265 // exceeds the duration stored in their jci-timeout label. 266 func (r *runner) reapStaleContainers() { 267 containers, err := r.rt.ListContainers("jci-job=y") 268 if err != nil { 269 log.Printf("reaper: list containers: %v", err) 270 return 271 } 272 log.Printf("reaper: found %d jci container(s)", len(containers)) 273 for _, c := range containers { 274 label := c.Labels["jci-timeout"] 275 if label == "" { 276 continue 277 } 278 timeout, err := time.ParseDuration(label) 279 if err != nil { 280 log.Printf("reaper: container %s has invalid jci-timeout label %q: %v", c.ID[:12], label, err) 281 continue 282 } 283 startedAt, err := r.rt.InspectStartedAt(c.ID) 284 if err != nil { 285 log.Printf("reaper: inspect %s: %v", c.ID[:12], err) 286 continue 287 } 288 if time.Since(startedAt) > timeout { 289 log.Printf("reaper: killing stale container %s (timeout=%s, running since %s)", c.ID[:12], timeout, startedAt) 290 if err := r.rt.RemoveContainer(c.ID); err != nil { 291 log.Printf("reaper: remove %s: %v", c.ID[:12], err) 292 } 293 } 294 } 295 } 296 297 // normalizeServerURL ensures the URL has an http:// or https:// scheme. 298 // This allows JCI_SERVER to be set as just a host:port (e.g. "jci_server:8080") 299 // without triggering the "first path segment cannot contain colon" parse error. 300 func normalizeServerURL(s string) string { 301 if s == "" { 302 return s 303 } 304 if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { 305 return "http://" + s 306 } 307 return s 308 } 309 310 // maskURL replaces the userinfo (credentials) in a URL with ***. 311 // e.g. https://token:x@github.com/org/repo → https://***@github.com/org/repo 312 func maskURL(raw string) string { 313 if i := strings.Index(raw, "@"); i != -1 { 314 if j := strings.Index(raw, "://"); j != -1 && j < i { 315 return raw[:j+3] + "***" + raw[i:] 316 } 317 } 318 return raw 319 } 320