server.go (22496B)
1 package jci 2 3 import ( 4 "bytes" 5 "crypto/hmac" 6 "crypto/rand" 7 "crypto/sha256" 8 "database/sql" 9 "encoding/base64" 10 "encoding/hex" 11 "encoding/json" 12 _ "embed" 13 "flag" 14 "fmt" 15 "io" 16 "log" 17 "net/http" 18 "os" 19 "strconv" 20 "strings" 21 "time" 22 23 _ "modernc.org/sqlite" 24 ) 25 26 //go:embed default_runner_script.sh 27 var defaultRunnerScript []byte 28 29 // Server runs the coordination server: accepts Gitea webhooks and serves runner poll requests. 30 // 31 // Configuration env vars: 32 // 33 // GITEA_HOST e.g. gitea.example.com 34 // GITEA_USER service account username 35 // GITEA_TOKEN token with read/write access to the target repos 36 // GITEA_WEBHOOK_SECRET HMAC-SHA256 secret shared with Gitea 37 // JCI_RUNNER_SECRET shared secret that runners authenticate with 38 // JCI_MAX_JOBS max concurrent jobs per runner (default 4) 39 // JCI_JOB_TIMEOUT server-wide job timeout (default 60m) 40 // JCI_LISTEN listen address (default :8080) 41 // JCI_DB SQLite DB path (default /var/lib/jci/server.db) 42 // JCI_RUNNER_SCRIPT path to a shell script to send to runners (default: embedded default_runner_script.sh) 43 // JCI_RUNNER_IMAGE container image runners should use (default: python:3.11) 44 // JCI_JOB_ALLOW_DOCKER_FOR_REPOS comma-separated list of owner/repo that get the Docker socket mounted (e.g. myorg/myrepo,myorg/other) 45 // 46 // Flags (override env vars): 47 // 48 // --ip bind IP (default "" = all interfaces) 49 // --port bind port (default 8080) 50 func Server(args []string) error { 51 fs := flag.NewFlagSet("server", flag.ContinueOnError) 52 flagIP := fs.String("ip", "", "IP address to listen on (default: all interfaces)") 53 flagPort := fs.String("port", "", "port to listen on (default: 8080)") 54 if err := fs.Parse(args); err != nil { 55 return err 56 } 57 58 cfg, err := loadServerConfig() 59 if err != nil { 60 return err 61 } 62 63 // Flags override env vars 64 if *flagIP != "" || *flagPort != "" { 65 ip := *flagIP 66 port := *flagPort 67 if port == "" { 68 port = "8080" 69 } 70 cfg.listen = ip + ":" + port 71 } 72 73 db, err := openServerDB(cfg.dbPath) 74 if err != nil { 75 return fmt.Errorf("open db: %w", err) 76 } 77 defer db.Close() 78 79 log.Printf("verifying Gitea token against %s …", cfg.giteaHost) 80 if err := verifyGiteaToken(cfg); err != nil { 81 return fmt.Errorf("Gitea token check failed: %w", err) 82 } 83 log.Printf("Gitea token OK") 84 85 srv := &coordinationServer{cfg: cfg, db: db} 86 87 mux := http.NewServeMux() 88 mux.HandleFunc("/webhook", srv.handleWebhook) 89 mux.HandleFunc("/poll", srv.handlePoll) 90 91 log.Printf("jci server listening on %s", cfg.listen) 92 return http.ListenAndServe(cfg.listen, mux) 93 } 94 95 // --------------------------------------------------------------------------- 96 // Config 97 // --------------------------------------------------------------------------- 98 99 type serverConfig struct { 100 giteaHost string 101 giteaUser string 102 giteaToken string 103 webhookSecret string 104 runnerSecret string 105 maxJobsPerRunner int 106 jobTimeout time.Duration 107 listen string 108 dbPath string 109 runnerScript string // base64-encoded script to send to runners 110 runnerImage string // container image runners should use 111 allowDockerForRepos map[string]bool // set of "owner/repo" keys 112 } 113 114 func loadServerConfig() (serverConfig, error) { 115 cfg := serverConfig{ 116 maxJobsPerRunner: 4, 117 jobTimeout: 60 * time.Minute, 118 listen: ":8080", 119 dbPath: "/var/lib/jci/server.db", 120 runnerImage: "python:3.11", 121 allowDockerForRepos: map[string]bool{}, 122 } 123 cfg.giteaHost = os.Getenv("GITEA_HOST") 124 cfg.giteaUser = os.Getenv("GITEA_USER") 125 cfg.giteaToken = os.Getenv("GITEA_TOKEN") 126 cfg.webhookSecret = os.Getenv("GITEA_WEBHOOK_SECRET") 127 cfg.runnerSecret = os.Getenv("JCI_RUNNER_SECRET") 128 129 missing := []string{} 130 for _, pair := range [][2]string{ 131 {"GITEA_HOST", cfg.giteaHost}, {"GITEA_USER", cfg.giteaUser}, 132 {"GITEA_TOKEN", cfg.giteaToken}, {"GITEA_WEBHOOK_SECRET", cfg.webhookSecret}, 133 {"JCI_RUNNER_SECRET", cfg.runnerSecret}, 134 } { 135 if pair[1] == "" { 136 missing = append(missing, pair[0]) 137 } 138 } 139 if len(missing) > 0 { 140 return cfg, fmt.Errorf("missing required env vars: %s", strings.Join(missing, ", ")) 141 } 142 143 if v := os.Getenv("JCI_MAX_JOBS"); v != "" { 144 n, err := strconv.Atoi(v) 145 if err != nil { 146 return cfg, fmt.Errorf("JCI_MAX_JOBS must be an integer: %w", err) 147 } 148 cfg.maxJobsPerRunner = n 149 } 150 if v := os.Getenv("JCI_JOB_TIMEOUT"); v != "" { 151 d, err := time.ParseDuration(v) 152 if err != nil { 153 return cfg, fmt.Errorf("JCI_JOB_TIMEOUT: %w", err) 154 } 155 cfg.jobTimeout = d 156 } 157 if v := os.Getenv("JCI_LISTEN"); v != "" { 158 cfg.listen = v 159 } 160 if v := os.Getenv("JCI_DB"); v != "" { 161 cfg.dbPath = v 162 } 163 if v := os.Getenv("JCI_RUNNER_IMAGE"); v != "" { 164 cfg.runnerImage = v 165 } 166 167 // Load runner script: use file from JCI_RUNNER_SCRIPT if set, else embedded default. 168 scriptBytes := defaultRunnerScript 169 if v := os.Getenv("JCI_RUNNER_SCRIPT"); v != "" { 170 b, err := os.ReadFile(v) 171 if err != nil { 172 return cfg, fmt.Errorf("JCI_RUNNER_SCRIPT: %w", err) 173 } 174 scriptBytes = b 175 } 176 cfg.runnerScript = base64.StdEncoding.EncodeToString(scriptBytes) 177 178 // Parse allow-docker repos: "owner/repo,owner2/repo2" → set. 179 if v := os.Getenv("JCI_JOB_ALLOW_DOCKER_FOR_REPOS"); v != "" { 180 for _, entry := range strings.Split(v, ",") { 181 entry = strings.TrimSpace(entry) 182 if entry != "" { 183 cfg.allowDockerForRepos[entry] = true 184 } 185 } 186 } 187 188 return cfg, nil 189 } 190 191 // --------------------------------------------------------------------------- 192 // SQLite schema 193 // --------------------------------------------------------------------------- 194 195 func openServerDB(path string) (*sql.DB, error) { 196 if err := os.MkdirAll(parentDir(path), 0755); err != nil { 197 return nil, err 198 } 199 db, err := sql.Open("sqlite", path) 200 if err != nil { 201 return nil, err 202 } 203 _, err = db.Exec(` 204 CREATE TABLE IF NOT EXISTS runners ( 205 runner_id TEXT PRIMARY KEY, 206 last_seen DATETIME NOT NULL 207 ); 208 CREATE TABLE IF NOT EXISTS jobs ( 209 job_id TEXT PRIMARY KEY, 210 repo_owner TEXT NOT NULL, 211 repo_name TEXT NOT NULL, 212 commit_sha TEXT NOT NULL, 213 runner_id TEXT, 214 assigned_at DATETIME, 215 expires_at DATETIME, 216 gitea_token TEXT NOT NULL DEFAULT '', 217 status_cache TEXT, 218 cache_until DATETIME, 219 allow_docker_socket INTEGER NOT NULL DEFAULT 0 220 ); 221 `) 222 if err != nil { 223 return db, err 224 } 225 // Migrate existing DB: add allow_docker_socket column if absent. 226 _, err = db.Exec(`ALTER TABLE jobs ADD COLUMN allow_docker_socket INTEGER NOT NULL DEFAULT 0`) 227 if err != nil && !strings.Contains(err.Error(), "duplicate column") { 228 return db, fmt.Errorf("migrate jobs table: %w", err) 229 } 230 return db, nil 231 } 232 233 // --------------------------------------------------------------------------- 234 // Server 235 // --------------------------------------------------------------------------- 236 237 type coordinationServer struct { 238 cfg serverConfig 239 db *sql.DB 240 } 241 242 // --------------------------------------------------------------------------- 243 // Webhook handler 244 // --------------------------------------------------------------------------- 245 246 func (s *coordinationServer) handleWebhook(w http.ResponseWriter, r *http.Request) { 247 log.Printf("webhook: incoming request from %s", r.RemoteAddr) 248 if r.Method != http.MethodPost { 249 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 250 return 251 } 252 body, err := io.ReadAll(r.Body) 253 if err != nil { 254 http.Error(w, "read error", http.StatusBadRequest) 255 return 256 } 257 258 sig := r.Header.Get("X-Gitea-Signature") 259 if !checkHMAC(body, sig, s.cfg.webhookSecret) { 260 log.Printf("webhook: HMAC check failed from %s (sig=%q)", r.RemoteAddr, sig) 261 http.Error(w, "invalid signature", http.StatusBadRequest) 262 return 263 } 264 265 var payload struct { 266 After string `json:"after"` 267 Repo struct { 268 Owner struct{ Login string } `json:"owner"` 269 Name string `json:"name"` 270 } `json:"repository"` 271 } 272 if err := json.Unmarshal(body, &payload); err != nil || payload.After == "" { 273 log.Printf("webhook: ignoring non-push event from %s (no 'after' field)", r.RemoteAddr) 274 w.WriteHeader(http.StatusOK) // not a push event; ignore 275 return 276 } 277 278 owner, name, commit := payload.Repo.Owner.Login, payload.Repo.Name, payload.After 279 280 // Deduplicate on commit SHA 281 var count int 282 _ = s.db.QueryRow(`SELECT COUNT(*) FROM jobs WHERE commit_sha = ?`, commit).Scan(&count) 283 if count > 0 { 284 log.Printf("webhook: duplicate commit %s/%s@%.12s — already queued, ignoring", owner, name, commit) 285 w.WriteHeader(http.StatusOK) 286 return 287 } 288 289 allowDocker := 0 290 if s.cfg.allowDockerForRepos[owner+"/"+name] { 291 allowDocker = 1 292 } 293 294 _, err = s.db.Exec( 295 `INSERT INTO jobs (job_id, repo_owner, repo_name, commit_sha, status_cache, allow_docker_socket) VALUES (?,?,?,?,'pending',?)`, 296 randomID(), owner, name, commit, allowDocker, 297 ) 298 if err != nil { 299 log.Printf("webhook: insert job: %v", err) 300 http.Error(w, "db error", http.StatusInternalServerError) 301 return 302 } 303 log.Printf("webhook: queued %s/%s@%.12s", owner, name, commit) 304 w.WriteHeader(http.StatusOK) 305 } 306 307 // --------------------------------------------------------------------------- 308 // Poll handler 309 // --------------------------------------------------------------------------- 310 311 func (s *coordinationServer) handlePoll(w http.ResponseWriter, r *http.Request) { 312 if r.Method != http.MethodPost { 313 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 314 return 315 } 316 var req struct { 317 RunnerID string `json:"runner_id"` 318 Secret string `json:"secret"` 319 } 320 if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 321 http.Error(w, "bad request", http.StatusBadRequest) 322 return 323 } 324 if req.Secret != s.cfg.runnerSecret { 325 http.Error(w, "forbidden", http.StatusForbidden) 326 return 327 } 328 329 // Auto-register / heartbeat 330 var knownRunner int 331 _ = s.db.QueryRow(`SELECT COUNT(*) FROM runners WHERE runner_id = ?`, req.RunnerID).Scan(&knownRunner) 332 _, _ = s.db.Exec( 333 `INSERT INTO runners (runner_id, last_seen) VALUES (?,?) 334 ON CONFLICT(runner_id) DO UPDATE SET last_seen=excluded.last_seen`, 335 req.RunnerID, time.Now().UTC().Format(time.RFC3339), 336 ) 337 if knownRunner == 0 { 338 log.Printf("new worker registered: %s", req.RunnerID) 339 } else { 340 log.Printf("worker heartbeat: %s", req.RunnerID) 341 } 342 343 // Clean expired jobs; finalise completed jobs for this runner 344 s.reapExpiredJobs() 345 s.syncCompletedJobsForRunner(req.RunnerID) 346 347 // Capacity check 348 var activeCount int 349 _ = s.db.QueryRow(`SELECT COUNT(*) FROM jobs WHERE runner_id = ?`, req.RunnerID).Scan(&activeCount) 350 if activeCount >= s.cfg.maxJobsPerRunner { 351 log.Printf("worker %s at capacity (%d/%d jobs); asking it to back off", req.RunnerID, activeCount, s.cfg.maxJobsPerRunner) 352 w.Header().Set("Retry-After", "5") 353 w.WriteHeader(http.StatusTooManyRequests) 354 return 355 } 356 357 // Pick one unassigned job using round-robin across repos: 358 // choose the repo/owner pair with the fewest currently-assigned jobs 359 // (across all runners), then take its oldest (lowest rowid) unassigned job. 360 // This prevents any single repo from monopolising the queue. 361 var jobID, repoOwner, repoName, commitSHA string 362 var allowDockerSocket int 363 err := s.db.QueryRow(` 364 SELECT j.job_id, j.repo_owner, j.repo_name, j.commit_sha, j.allow_docker_socket 365 FROM jobs j 366 WHERE j.runner_id IS NULL 367 ORDER BY ( 368 SELECT COUNT(*) FROM jobs a 369 WHERE a.runner_id IS NOT NULL 370 AND a.repo_owner = j.repo_owner 371 AND a.repo_name = j.repo_name 372 ) ASC, j.rowid ASC 373 LIMIT 1 374 `).Scan(&jobID, &repoOwner, &repoName, &commitSHA, &allowDockerSocket) 375 if err == sql.ErrNoRows { 376 log.Printf("worker %s polled: no pending jobs", req.RunnerID) 377 w.Header().Set("Content-Type", "application/json") 378 json.NewEncoder(w).Encode(map[string]any{"job": nil}) 379 return 380 } else if err != nil { 381 http.Error(w, "db error", http.StatusInternalServerError) 382 return 383 } 384 385 now := time.Now().UTC() 386 expiresAt := now.Add(s.cfg.jobTimeout) 387 _, _ = s.db.Exec( 388 `UPDATE jobs SET runner_id=?, assigned_at=?, expires_at=?, status_cache='running' WHERE job_id=?`, 389 req.RunnerID, now.Format(time.RFC3339), expiresAt.Format(time.RFC3339), jobID, 390 ) 391 392 s.setGiteaCommitStatus(repoOwner, repoName, commitSHA, "running", "jci: job assigned") 393 log.Printf("job %s assigned to worker %s (%s/%s@%.12s) allowDocker=%v", jobID, req.RunnerID, repoOwner, repoName, commitSHA, allowDockerSocket == 1) 394 395 cloneURL := fmt.Sprintf("https://%s:%s@%s/%s/%s", s.cfg.giteaUser, s.cfg.giteaToken, s.cfg.giteaHost, repoOwner, repoName) 396 w.Header().Set("Content-Type", "application/json") 397 json.NewEncoder(w).Encode(map[string]any{ 398 "job": map[string]any{ 399 "job_id": jobID, 400 "clone_url": cloneURL, 401 "commit_sha": commitSHA, 402 "repo_owner": repoOwner, 403 "repo_name": repoName, 404 "timeout_seconds": int(s.cfg.jobTimeout.Seconds()), 405 "image": s.cfg.runnerImage, 406 "script": s.cfg.runnerScript, 407 "allow_docker_socket": allowDockerSocket == 1, 408 }, 409 }) 410 } 411 412 // --------------------------------------------------------------------------- 413 // Gitea API helpers 414 // --------------------------------------------------------------------------- 415 416 // verifyGiteaToken confirms the token is valid by listing repos accessible to it. 417 func verifyGiteaToken(cfg serverConfig) error { 418 url := fmt.Sprintf("https://%s/api/v1/repos/search?limit=1", cfg.giteaHost) 419 log.Printf("gitea: verifying token for host %s", cfg.giteaHost) 420 resp, err := giteaCall("GET", url, cfg.giteaToken, nil) 421 if err != nil { 422 return err 423 } 424 if resp.status != 200 { 425 return fmt.Errorf("GET %s returned %d — check GITEA_TOKEN", url, resp.status) 426 } 427 return nil 428 } 429 430 func (s *coordinationServer) setGiteaCommitStatus(owner, repo, commitSHA, state, description string) { 431 url := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/statuses/%s", s.cfg.giteaHost, owner, repo, commitSHA) 432 body, _ := json.Marshal(map[string]string{ 433 "state": state, 434 "description": description, 435 "context": "jci", 436 }) 437 resp, err := giteaCall("POST", url, s.cfg.giteaToken, body) 438 if err != nil { 439 log.Printf("set commit status %s/%s@%.12s → %s: error: %v", owner, repo, commitSHA, state, err) 440 return 441 } 442 log.Printf("set commit status %s/%s@%.12s → %s: HTTP %d: %s", owner, repo, commitSHA, state, resp.status, resp.body) 443 } 444 445 // checkJobStatusOnGitea reads status.txt from the latest refs/jci-runs/<commit>/* on Gitea. 446 // Returns "ok", "err", "running", or "" (unknown/not found). 447 func (s *coordinationServer) checkJobStatusOnGitea(owner, repo, commitSHA string) string { 448 // List matching refs 449 url := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/git/refs/jci-runs/%s", s.cfg.giteaHost, owner, repo, commitSHA) 450 resp, err := giteaCall("GET", url, s.cfg.giteaToken, nil) 451 if err != nil { 452 log.Printf("checkJobStatus %s/%s@%.12s: list refs error: %v", owner, repo, commitSHA, err) 453 return "" 454 } 455 if resp.status != 200 { 456 log.Printf("checkJobStatus %s/%s@%.12s: list refs HTTP %d", owner, repo, commitSHA, resp.status) 457 return "" 458 } 459 var refs []struct { 460 Object struct{ SHA string } `json:"object"` 461 } 462 if err := json.Unmarshal(resp.body, &refs); err != nil { 463 log.Printf("checkJobStatus %s/%s@%.12s: decode refs: %v", owner, repo, commitSHA, err) 464 return "" 465 } 466 if len(refs) == 0 { 467 log.Printf("checkJobStatus %s/%s@%.12s: no jci-runs refs found yet", owner, repo, commitSHA) 468 return "" 469 } 470 runCommitSHA := refs[len(refs)-1].Object.SHA 471 472 // Get tree 473 treeURL := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/git/trees/%s", s.cfg.giteaHost, owner, repo, runCommitSHA) 474 treeresp, err := giteaCall("GET", treeURL, s.cfg.giteaToken, nil) 475 if err != nil { 476 log.Printf("checkJobStatus %s/%s@%.12s: get tree error: %v", owner, repo, commitSHA, err) 477 return "" 478 } 479 if treeresp.status != 200 { 480 log.Printf("checkJobStatus %s/%s@%.12s: get tree HTTP %d", owner, repo, commitSHA, treeresp.status) 481 return "" 482 } 483 var tree struct { 484 Tree []struct { 485 Path string `json:"path"` 486 SHA string `json:"sha"` 487 Type string `json:"type"` 488 } `json:"tree"` 489 } 490 if err := json.Unmarshal(treeresp.body, &tree); err != nil { 491 return "" 492 } 493 var blobSHA string 494 for _, e := range tree.Tree { 495 if e.Path == "status.txt" && e.Type == "blob" { 496 blobSHA = e.SHA 497 break 498 } 499 } 500 if blobSHA == "" { 501 log.Printf("checkJobStatus %s/%s@%.12s: status.txt not found in tree %s", owner, repo, commitSHA, runCommitSHA) 502 return "" 503 } 504 505 blobURL := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/git/blobs/%s", s.cfg.giteaHost, owner, repo, blobSHA) 506 blobResp, err := giteaCall("GET", blobURL, s.cfg.giteaToken, nil) 507 if err != nil { 508 log.Printf("checkJobStatus %s/%s@%.12s: get blob error: %v", owner, repo, commitSHA, err) 509 return "" 510 } 511 if blobResp.status != 200 { 512 log.Printf("checkJobStatus %s/%s@%.12s: get blob HTTP %d", owner, repo, commitSHA, blobResp.status) 513 return "" 514 } 515 var blob struct { 516 Content string `json:"content"` 517 Encoding string `json:"encoding"` 518 } 519 if err := json.Unmarshal(blobResp.body, &blob); err != nil { 520 return "" 521 } 522 content := blob.Content 523 if blob.Encoding == "base64" { 524 decoded, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(content, "\n", "")) 525 if err != nil { 526 return "" 527 } 528 content = string(decoded) 529 } 530 content = strings.TrimSpace(content) 531 if content == "ok" || content == "err" || content == "running" { 532 log.Printf("checkJobStatus %s/%s@%.12s: status.txt = %q", owner, repo, commitSHA, content) 533 return content 534 } 535 log.Printf("checkJobStatus %s/%s@%.12s: unrecognised status.txt content %q", owner, repo, commitSHA, content) 536 return "" 537 } 538 539 // --------------------------------------------------------------------------- 540 // Background maintenance 541 // --------------------------------------------------------------------------- 542 543 func (s *coordinationServer) reapExpiredJobs() { 544 rows, err := s.db.Query( 545 `SELECT job_id, repo_owner, repo_name, commit_sha FROM jobs WHERE expires_at IS NOT NULL AND expires_at < ?`, 546 time.Now().UTC().Format(time.RFC3339), 547 ) 548 if err != nil { 549 return 550 } 551 type expiredJob struct{ jobID, owner, repo, commit string } 552 var expired []expiredJob 553 for rows.Next() { 554 var j expiredJob 555 if rows.Scan(&j.jobID, &j.owner, &j.repo, &j.commit) == nil { 556 expired = append(expired, j) 557 } 558 } 559 rows.Close() 560 561 for _, j := range expired { 562 log.Printf("job %s timed out", j.jobID) 563 s.setGiteaCommitStatus(j.owner, j.repo, j.commit, "failure", "jci: job timed out") 564 _, _ = s.db.Exec(`DELETE FROM jobs WHERE job_id = ?`, j.jobID) 565 } 566 } 567 568 func (s *coordinationServer) syncCompletedJobsForRunner(runnerID string) { 569 rows, err := s.db.Query( 570 `SELECT job_id, repo_owner, repo_name, commit_sha, COALESCE(status_cache,''), COALESCE(cache_until,'') 571 FROM jobs WHERE runner_id = ?`, runnerID, 572 ) 573 if err != nil { 574 return 575 } 576 type jobRow struct { 577 jobID, owner, repo, commit, statusCache, cacheUntilStr string 578 } 579 var jobs []jobRow 580 for rows.Next() { 581 var j jobRow 582 if rows.Scan(&j.jobID, &j.owner, &j.repo, &j.commit, &j.statusCache, &j.cacheUntilStr) == nil { 583 jobs = append(jobs, j) 584 } 585 } 586 rows.Close() 587 588 for _, j := range jobs { 589 status := j.statusCache 590 cacheUntil, _ := time.Parse(time.RFC3339, j.cacheUntilStr) 591 if time.Now().UTC().After(cacheUntil) { 592 log.Printf("sync job %s (%s/%s@%.12s): cache expired, checking Gitea", j.jobID, j.owner, j.repo, j.commit) 593 if fresh := s.checkJobStatusOnGitea(j.owner, j.repo, j.commit); fresh != "" { 594 log.Printf("sync job %s: status %q → %q", j.jobID, status, fresh) 595 status = fresh 596 } else { 597 log.Printf("sync job %s: no fresh status from Gitea, keeping %q", j.jobID, status) 598 } 599 newCacheUntil := time.Now().UTC().Add(15 * time.Second).Format(time.RFC3339) 600 _, _ = s.db.Exec(`UPDATE jobs SET status_cache=?, cache_until=? WHERE job_id=?`, status, newCacheUntil, j.jobID) 601 } else { 602 log.Printf("sync job %s (%s/%s@%.12s): status cache still valid (%q until %s)", j.jobID, j.owner, j.repo, j.commit, status, j.cacheUntilStr) 603 } 604 605 if status == "ok" || status == "err" { 606 giteaState := "success" 607 if status == "err" { 608 giteaState = "failure" 609 } 610 s.setGiteaCommitStatus(j.owner, j.repo, j.commit, giteaState, "jci: job completed") 611 _, _ = s.db.Exec(`DELETE FROM jobs WHERE job_id = ?`, j.jobID) 612 log.Printf("job %s completed: %s", j.jobID, status) 613 } 614 } 615 } 616 617 // --------------------------------------------------------------------------- 618 // Generic Gitea HTTP helper 619 // --------------------------------------------------------------------------- 620 621 type giteaResponse struct { 622 status int 623 body []byte 624 } 625 626 func giteaCall(method, url, token string, body []byte) (giteaResponse, error) { 627 log.Printf("gitea: %s %s", method, url) 628 var bodyReader io.Reader 629 if body != nil { 630 bodyReader = bytes.NewReader(body) 631 } 632 req, err := http.NewRequest(method, url, bodyReader) 633 if err != nil { 634 return giteaResponse{}, err 635 } 636 req.Header.Set("Authorization", "token "+token) 637 req.Header.Set("Content-Type", "application/json") 638 req.Header.Set("Accept", "application/json") 639 640 client := &http.Client{Timeout: 15 * time.Second} 641 resp, err := client.Do(req) 642 if err != nil { 643 log.Printf("gitea: %s %s → error: %v", method, url, err) 644 return giteaResponse{}, err 645 } 646 defer resp.Body.Close() 647 data, _ := io.ReadAll(resp.Body) 648 log.Printf("gitea: %s %s → HTTP %d: %s", method, url, resp.StatusCode, data) 649 return giteaResponse{status: resp.StatusCode, body: data}, nil 650 } 651 652 // --------------------------------------------------------------------------- 653 // Tiny utilities 654 // --------------------------------------------------------------------------- 655 656 func checkHMAC(body []byte, sig, secret string) bool { 657 mac := hmac.New(sha256.New, []byte(secret)) 658 mac.Write(body) 659 expected := hex.EncodeToString(mac.Sum(nil)) 660 return hmac.Equal([]byte(sig), []byte(expected)) 661 } 662 663 func randomID() string { 664 b := make([]byte, 16) 665 rand.Read(b) 666 return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) 667 } 668 669 // parentDir returns the directory component of a file path without importing filepath 670 // (avoids a collision with the filepath import in other files in the package). 671 func parentDir(path string) string { 672 for i := len(path) - 1; i >= 0; i-- { 673 if path[i] == '/' || path[i] == '\\' { 674 return path[:i] 675 } 676 } 677 return "." 678 }