Jaypore CI

> Jaypore CI: Minimal, Offline, Local CI system.
Log | Files | Refs | README | LICENSE

server.go (22496B)


      1 package jci
      2 
      3 import (
      4 	"bytes"
      5 	"crypto/hmac"
      6 	"crypto/rand"
      7 	"crypto/sha256"
      8 	"database/sql"
      9 	"encoding/base64"
     10 	"encoding/hex"
     11 	"encoding/json"
     12 	_ "embed"
     13 	"flag"
     14 	"fmt"
     15 	"io"
     16 	"log"
     17 	"net/http"
     18 	"os"
     19 	"strconv"
     20 	"strings"
     21 	"time"
     22 
     23 	_ "modernc.org/sqlite"
     24 )
     25 
// defaultRunnerScript is the shell script sent to runners when JCI_RUNNER_SCRIPT
// is not set (loadServerConfig falls back to it). It is compiled into the binary
// from default_runner_script.sh.
//
//go:embed default_runner_script.sh
var defaultRunnerScript []byte
     28 
     29 // Server runs the coordination server: accepts Gitea webhooks and serves runner poll requests.
     30 //
     31 // Configuration env vars:
     32 //
     33 //	GITEA_HOST                        e.g. gitea.example.com
     34 //	GITEA_USER                        service account username
     35 //	GITEA_TOKEN                       token with read/write access to the target repos
     36 //	GITEA_WEBHOOK_SECRET              HMAC-SHA256 secret shared with Gitea
     37 //	JCI_RUNNER_SECRET                  shared secret that runners authenticate with
     38 //	JCI_MAX_JOBS                      max concurrent jobs per runner (default 4)
     39 //	JCI_JOB_TIMEOUT                   server-wide job timeout (default 60m)
     40 //	JCI_LISTEN                        listen address (default :8080)
     41 //	JCI_DB                            SQLite DB path (default /var/lib/jci/server.db)
     42 //	JCI_RUNNER_SCRIPT                 path to a shell script to send to runners (default: embedded default_runner_script.sh)
     43 //	JCI_RUNNER_IMAGE                  container image runners should use (default: python:3.11)
     44 //	JCI_JOB_ALLOW_DOCKER_FOR_REPOS    comma-separated list of owner/repo that get the Docker socket mounted (e.g. myorg/myrepo,myorg/other)
     45 //
     46 // Flags (override env vars):
     47 //
     48 //	--ip    bind IP  (default "" = all interfaces)
     49 //	--port  bind port (default 8080)
     50 func Server(args []string) error {
     51 	fs := flag.NewFlagSet("server", flag.ContinueOnError)
     52 	flagIP := fs.String("ip", "", "IP address to listen on (default: all interfaces)")
     53 	flagPort := fs.String("port", "", "port to listen on (default: 8080)")
     54 	if err := fs.Parse(args); err != nil {
     55 		return err
     56 	}
     57 
     58 	cfg, err := loadServerConfig()
     59 	if err != nil {
     60 		return err
     61 	}
     62 
     63 	// Flags override env vars
     64 	if *flagIP != "" || *flagPort != "" {
     65 		ip := *flagIP
     66 		port := *flagPort
     67 		if port == "" {
     68 			port = "8080"
     69 		}
     70 		cfg.listen = ip + ":" + port
     71 	}
     72 
     73 	db, err := openServerDB(cfg.dbPath)
     74 	if err != nil {
     75 		return fmt.Errorf("open db: %w", err)
     76 	}
     77 	defer db.Close()
     78 
     79 	log.Printf("verifying Gitea token against %s …", cfg.giteaHost)
     80 	if err := verifyGiteaToken(cfg); err != nil {
     81 		return fmt.Errorf("Gitea token check failed: %w", err)
     82 	}
     83 	log.Printf("Gitea token OK")
     84 
     85 	srv := &coordinationServer{cfg: cfg, db: db}
     86 
     87 	mux := http.NewServeMux()
     88 	mux.HandleFunc("/webhook", srv.handleWebhook)
     89 	mux.HandleFunc("/poll", srv.handlePoll)
     90 
     91 	log.Printf("jci server listening on %s", cfg.listen)
     92 	return http.ListenAndServe(cfg.listen, mux)
     93 }
     94 
     95 // ---------------------------------------------------------------------------
     96 // Config
     97 // ---------------------------------------------------------------------------
     98 
     99 type serverConfig struct {
    100 	giteaHost            string
    101 	giteaUser            string
    102 	giteaToken           string
    103 	webhookSecret        string
    104 	runnerSecret         string
    105 	maxJobsPerRunner     int
    106 	jobTimeout           time.Duration
    107 	listen               string
    108 	dbPath               string
    109 	runnerScript         string // base64-encoded script to send to runners
    110 	runnerImage          string // container image runners should use
    111 	allowDockerForRepos  map[string]bool // set of "owner/repo" keys
    112 }
    113 
    114 func loadServerConfig() (serverConfig, error) {
    115 	cfg := serverConfig{
    116 		maxJobsPerRunner:    4,
    117 		jobTimeout:          60 * time.Minute,
    118 		listen:              ":8080",
    119 		dbPath:              "/var/lib/jci/server.db",
    120 		runnerImage:         "python:3.11",
    121 		allowDockerForRepos: map[string]bool{},
    122 	}
    123 	cfg.giteaHost = os.Getenv("GITEA_HOST")
    124 	cfg.giteaUser = os.Getenv("GITEA_USER")
    125 	cfg.giteaToken = os.Getenv("GITEA_TOKEN")
    126 	cfg.webhookSecret = os.Getenv("GITEA_WEBHOOK_SECRET")
    127 	cfg.runnerSecret = os.Getenv("JCI_RUNNER_SECRET")
    128 
    129 	missing := []string{}
    130 	for _, pair := range [][2]string{
    131 		{"GITEA_HOST", cfg.giteaHost}, {"GITEA_USER", cfg.giteaUser},
    132 		{"GITEA_TOKEN", cfg.giteaToken}, {"GITEA_WEBHOOK_SECRET", cfg.webhookSecret},
    133 		{"JCI_RUNNER_SECRET", cfg.runnerSecret},
    134 	} {
    135 		if pair[1] == "" {
    136 			missing = append(missing, pair[0])
    137 		}
    138 	}
    139 	if len(missing) > 0 {
    140 		return cfg, fmt.Errorf("missing required env vars: %s", strings.Join(missing, ", "))
    141 	}
    142 
    143 	if v := os.Getenv("JCI_MAX_JOBS"); v != "" {
    144 		n, err := strconv.Atoi(v)
    145 		if err != nil {
    146 			return cfg, fmt.Errorf("JCI_MAX_JOBS must be an integer: %w", err)
    147 		}
    148 		cfg.maxJobsPerRunner = n
    149 	}
    150 	if v := os.Getenv("JCI_JOB_TIMEOUT"); v != "" {
    151 		d, err := time.ParseDuration(v)
    152 		if err != nil {
    153 			return cfg, fmt.Errorf("JCI_JOB_TIMEOUT: %w", err)
    154 		}
    155 		cfg.jobTimeout = d
    156 	}
    157 	if v := os.Getenv("JCI_LISTEN"); v != "" {
    158 		cfg.listen = v
    159 	}
    160 	if v := os.Getenv("JCI_DB"); v != "" {
    161 		cfg.dbPath = v
    162 	}
    163 	if v := os.Getenv("JCI_RUNNER_IMAGE"); v != "" {
    164 		cfg.runnerImage = v
    165 	}
    166 
    167 	// Load runner script: use file from JCI_RUNNER_SCRIPT if set, else embedded default.
    168 	scriptBytes := defaultRunnerScript
    169 	if v := os.Getenv("JCI_RUNNER_SCRIPT"); v != "" {
    170 		b, err := os.ReadFile(v)
    171 		if err != nil {
    172 			return cfg, fmt.Errorf("JCI_RUNNER_SCRIPT: %w", err)
    173 		}
    174 		scriptBytes = b
    175 	}
    176 	cfg.runnerScript = base64.StdEncoding.EncodeToString(scriptBytes)
    177 
    178 	// Parse allow-docker repos: "owner/repo,owner2/repo2" → set.
    179 	if v := os.Getenv("JCI_JOB_ALLOW_DOCKER_FOR_REPOS"); v != "" {
    180 		for _, entry := range strings.Split(v, ",") {
    181 			entry = strings.TrimSpace(entry)
    182 			if entry != "" {
    183 				cfg.allowDockerForRepos[entry] = true
    184 			}
    185 		}
    186 	}
    187 
    188 	return cfg, nil
    189 }
    190 
    191 // ---------------------------------------------------------------------------
    192 // SQLite schema
    193 // ---------------------------------------------------------------------------
    194 
    195 func openServerDB(path string) (*sql.DB, error) {
    196 	if err := os.MkdirAll(parentDir(path), 0755); err != nil {
    197 		return nil, err
    198 	}
    199 	db, err := sql.Open("sqlite", path)
    200 	if err != nil {
    201 		return nil, err
    202 	}
    203 	_, err = db.Exec(`
    204 		CREATE TABLE IF NOT EXISTS runners (
    205 			runner_id  TEXT PRIMARY KEY,
    206 			last_seen  DATETIME NOT NULL
    207 		);
    208 		CREATE TABLE IF NOT EXISTS jobs (
    209 			job_id             TEXT PRIMARY KEY,
    210 			repo_owner         TEXT NOT NULL,
    211 			repo_name          TEXT NOT NULL,
    212 			commit_sha         TEXT NOT NULL,
    213 			runner_id          TEXT,
    214 			assigned_at        DATETIME,
    215 			expires_at         DATETIME,
    216 			gitea_token        TEXT NOT NULL DEFAULT '',
    217 			status_cache       TEXT,
    218 			cache_until        DATETIME,
    219 			allow_docker_socket INTEGER NOT NULL DEFAULT 0
    220 		);
    221 	`)
    222 	if err != nil {
    223 		return db, err
    224 	}
    225 	// Migrate existing DB: add allow_docker_socket column if absent.
    226 	_, err = db.Exec(`ALTER TABLE jobs ADD COLUMN allow_docker_socket INTEGER NOT NULL DEFAULT 0`)
    227 	if err != nil && !strings.Contains(err.Error(), "duplicate column") {
    228 		return db, fmt.Errorf("migrate jobs table: %w", err)
    229 	}
    230 	return db, nil
    231 }
    232 
    233 // ---------------------------------------------------------------------------
    234 // Server
    235 // ---------------------------------------------------------------------------
    236 
    237 type coordinationServer struct {
    238 	cfg serverConfig
    239 	db  *sql.DB
    240 }
    241 
    242 // ---------------------------------------------------------------------------
    243 // Webhook handler
    244 // ---------------------------------------------------------------------------
    245 
    246 func (s *coordinationServer) handleWebhook(w http.ResponseWriter, r *http.Request) {
    247 	log.Printf("webhook: incoming request from %s", r.RemoteAddr)
    248 	if r.Method != http.MethodPost {
    249 		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
    250 		return
    251 	}
    252 	body, err := io.ReadAll(r.Body)
    253 	if err != nil {
    254 		http.Error(w, "read error", http.StatusBadRequest)
    255 		return
    256 	}
    257 
    258 	sig := r.Header.Get("X-Gitea-Signature")
    259 	if !checkHMAC(body, sig, s.cfg.webhookSecret) {
    260 		log.Printf("webhook: HMAC check failed from %s (sig=%q)", r.RemoteAddr, sig)
    261 		http.Error(w, "invalid signature", http.StatusBadRequest)
    262 		return
    263 	}
    264 
    265 	var payload struct {
    266 		After string `json:"after"`
    267 		Repo  struct {
    268 			Owner struct{ Login string } `json:"owner"`
    269 			Name  string                 `json:"name"`
    270 		} `json:"repository"`
    271 	}
    272 	if err := json.Unmarshal(body, &payload); err != nil || payload.After == "" {
    273 		log.Printf("webhook: ignoring non-push event from %s (no 'after' field)", r.RemoteAddr)
    274 		w.WriteHeader(http.StatusOK) // not a push event; ignore
    275 		return
    276 	}
    277 
    278 	owner, name, commit := payload.Repo.Owner.Login, payload.Repo.Name, payload.After
    279 
    280 	// Deduplicate on commit SHA
    281 	var count int
    282 	_ = s.db.QueryRow(`SELECT COUNT(*) FROM jobs WHERE commit_sha = ?`, commit).Scan(&count)
    283 	if count > 0 {
    284 		log.Printf("webhook: duplicate commit %s/%s@%.12s — already queued, ignoring", owner, name, commit)
    285 		w.WriteHeader(http.StatusOK)
    286 		return
    287 	}
    288 
    289 	allowDocker := 0
    290 	if s.cfg.allowDockerForRepos[owner+"/"+name] {
    291 		allowDocker = 1
    292 	}
    293 
    294 	_, err = s.db.Exec(
    295 		`INSERT INTO jobs (job_id, repo_owner, repo_name, commit_sha, status_cache, allow_docker_socket) VALUES (?,?,?,?,'pending',?)`,
    296 		randomID(), owner, name, commit, allowDocker,
    297 	)
    298 	if err != nil {
    299 		log.Printf("webhook: insert job: %v", err)
    300 		http.Error(w, "db error", http.StatusInternalServerError)
    301 		return
    302 	}
    303 	log.Printf("webhook: queued %s/%s@%.12s", owner, name, commit)
    304 	w.WriteHeader(http.StatusOK)
    305 }
    306 
    307 // ---------------------------------------------------------------------------
    308 // Poll handler
    309 // ---------------------------------------------------------------------------
    310 
    311 func (s *coordinationServer) handlePoll(w http.ResponseWriter, r *http.Request) {
    312 	if r.Method != http.MethodPost {
    313 		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
    314 		return
    315 	}
    316 	var req struct {
    317 		RunnerID string `json:"runner_id"`
    318 		Secret   string `json:"secret"`
    319 	}
    320 	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
    321 		http.Error(w, "bad request", http.StatusBadRequest)
    322 		return
    323 	}
    324 	if req.Secret != s.cfg.runnerSecret {
    325 		http.Error(w, "forbidden", http.StatusForbidden)
    326 		return
    327 	}
    328 
    329 	// Auto-register / heartbeat
    330 	var knownRunner int
    331 	_ = s.db.QueryRow(`SELECT COUNT(*) FROM runners WHERE runner_id = ?`, req.RunnerID).Scan(&knownRunner)
    332 	_, _ = s.db.Exec(
    333 		`INSERT INTO runners (runner_id, last_seen) VALUES (?,?)
    334 		 ON CONFLICT(runner_id) DO UPDATE SET last_seen=excluded.last_seen`,
    335 		req.RunnerID, time.Now().UTC().Format(time.RFC3339),
    336 	)
    337 	if knownRunner == 0 {
    338 		log.Printf("new worker registered: %s", req.RunnerID)
    339 	} else {
    340 		log.Printf("worker heartbeat: %s", req.RunnerID)
    341 	}
    342 
    343 	// Clean expired jobs; finalise completed jobs for this runner
    344 	s.reapExpiredJobs()
    345 	s.syncCompletedJobsForRunner(req.RunnerID)
    346 
    347 	// Capacity check
    348 	var activeCount int
    349 	_ = s.db.QueryRow(`SELECT COUNT(*) FROM jobs WHERE runner_id = ?`, req.RunnerID).Scan(&activeCount)
    350 	if activeCount >= s.cfg.maxJobsPerRunner {
    351 		log.Printf("worker %s at capacity (%d/%d jobs); asking it to back off", req.RunnerID, activeCount, s.cfg.maxJobsPerRunner)
    352 		w.Header().Set("Retry-After", "5")
    353 		w.WriteHeader(http.StatusTooManyRequests)
    354 		return
    355 	}
    356 
    357 	// Pick one unassigned job using round-robin across repos:
    358 	// choose the repo/owner pair with the fewest currently-assigned jobs
    359 	// (across all runners), then take its oldest (lowest rowid) unassigned job.
    360 	// This prevents any single repo from monopolising the queue.
    361 	var jobID, repoOwner, repoName, commitSHA string
    362 	var allowDockerSocket int
    363 	err := s.db.QueryRow(`
    364 		SELECT j.job_id, j.repo_owner, j.repo_name, j.commit_sha, j.allow_docker_socket
    365 		FROM jobs j
    366 		WHERE j.runner_id IS NULL
    367 		ORDER BY (
    368 			SELECT COUNT(*) FROM jobs a
    369 			WHERE a.runner_id IS NOT NULL
    370 			  AND a.repo_owner = j.repo_owner
    371 			  AND a.repo_name  = j.repo_name
    372 		) ASC, j.rowid ASC
    373 		LIMIT 1
    374 	`).Scan(&jobID, &repoOwner, &repoName, &commitSHA, &allowDockerSocket)
    375 	if err == sql.ErrNoRows {
    376 		log.Printf("worker %s polled: no pending jobs", req.RunnerID)
    377 		w.Header().Set("Content-Type", "application/json")
    378 		json.NewEncoder(w).Encode(map[string]any{"job": nil})
    379 		return
    380 	} else if err != nil {
    381 		http.Error(w, "db error", http.StatusInternalServerError)
    382 		return
    383 	}
    384 
    385 	now := time.Now().UTC()
    386 	expiresAt := now.Add(s.cfg.jobTimeout)
    387 	_, _ = s.db.Exec(
    388 		`UPDATE jobs SET runner_id=?, assigned_at=?, expires_at=?, status_cache='running' WHERE job_id=?`,
    389 		req.RunnerID, now.Format(time.RFC3339), expiresAt.Format(time.RFC3339), jobID,
    390 	)
    391 
    392 	s.setGiteaCommitStatus(repoOwner, repoName, commitSHA, "running", "jci: job assigned")
    393 	log.Printf("job %s assigned to worker %s (%s/%s@%.12s) allowDocker=%v", jobID, req.RunnerID, repoOwner, repoName, commitSHA, allowDockerSocket == 1)
    394 
    395 	cloneURL := fmt.Sprintf("https://%s:%s@%s/%s/%s", s.cfg.giteaUser, s.cfg.giteaToken, s.cfg.giteaHost, repoOwner, repoName)
    396 	w.Header().Set("Content-Type", "application/json")
    397 	json.NewEncoder(w).Encode(map[string]any{
    398 		"job": map[string]any{
    399 			"job_id":              jobID,
    400 			"clone_url":           cloneURL,
    401 			"commit_sha":          commitSHA,
    402 			"repo_owner":          repoOwner,
    403 			"repo_name":           repoName,
    404 			"timeout_seconds":     int(s.cfg.jobTimeout.Seconds()),
    405 			"image":               s.cfg.runnerImage,
    406 			"script":              s.cfg.runnerScript,
    407 			"allow_docker_socket": allowDockerSocket == 1,
    408 		},
    409 	})
    410 }
    411 
    412 // ---------------------------------------------------------------------------
    413 // Gitea API helpers
    414 // ---------------------------------------------------------------------------
    415 
    416 // verifyGiteaToken confirms the token is valid by listing repos accessible to it.
    417 func verifyGiteaToken(cfg serverConfig) error {
    418 	url := fmt.Sprintf("https://%s/api/v1/repos/search?limit=1", cfg.giteaHost)
    419 	log.Printf("gitea: verifying token for host %s", cfg.giteaHost)
    420 	resp, err := giteaCall("GET", url, cfg.giteaToken, nil)
    421 	if err != nil {
    422 		return err
    423 	}
    424 	if resp.status != 200 {
    425 		return fmt.Errorf("GET %s returned %d — check GITEA_TOKEN", url, resp.status)
    426 	}
    427 	return nil
    428 }
    429 
    430 func (s *coordinationServer) setGiteaCommitStatus(owner, repo, commitSHA, state, description string) {
    431 	url := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/statuses/%s", s.cfg.giteaHost, owner, repo, commitSHA)
    432 	body, _ := json.Marshal(map[string]string{
    433 		"state":       state,
    434 		"description": description,
    435 		"context":     "jci",
    436 	})
    437 	resp, err := giteaCall("POST", url, s.cfg.giteaToken, body)
    438 	if err != nil {
    439 		log.Printf("set commit status %s/%s@%.12s → %s: error: %v", owner, repo, commitSHA, state, err)
    440 		return
    441 	}
    442 	log.Printf("set commit status %s/%s@%.12s → %s: HTTP %d: %s", owner, repo, commitSHA, state, resp.status, resp.body)
    443 }
    444 
    445 // checkJobStatusOnGitea reads status.txt from the latest refs/jci-runs/<commit>/* on Gitea.
    446 // Returns "ok", "err", "running", or "" (unknown/not found).
    447 func (s *coordinationServer) checkJobStatusOnGitea(owner, repo, commitSHA string) string {
    448 	// List matching refs
    449 	url := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/git/refs/jci-runs/%s", s.cfg.giteaHost, owner, repo, commitSHA)
    450 	resp, err := giteaCall("GET", url, s.cfg.giteaToken, nil)
    451 	if err != nil {
    452 		log.Printf("checkJobStatus %s/%s@%.12s: list refs error: %v", owner, repo, commitSHA, err)
    453 		return ""
    454 	}
    455 	if resp.status != 200 {
    456 		log.Printf("checkJobStatus %s/%s@%.12s: list refs HTTP %d", owner, repo, commitSHA, resp.status)
    457 		return ""
    458 	}
    459 	var refs []struct {
    460 		Object struct{ SHA string } `json:"object"`
    461 	}
    462 	if err := json.Unmarshal(resp.body, &refs); err != nil {
    463 		log.Printf("checkJobStatus %s/%s@%.12s: decode refs: %v", owner, repo, commitSHA, err)
    464 		return ""
    465 	}
    466 	if len(refs) == 0 {
    467 		log.Printf("checkJobStatus %s/%s@%.12s: no jci-runs refs found yet", owner, repo, commitSHA)
    468 		return ""
    469 	}
    470 	runCommitSHA := refs[len(refs)-1].Object.SHA
    471 
    472 	// Get tree
    473 	treeURL := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/git/trees/%s", s.cfg.giteaHost, owner, repo, runCommitSHA)
    474 	treeresp, err := giteaCall("GET", treeURL, s.cfg.giteaToken, nil)
    475 	if err != nil {
    476 		log.Printf("checkJobStatus %s/%s@%.12s: get tree error: %v", owner, repo, commitSHA, err)
    477 		return ""
    478 	}
    479 	if treeresp.status != 200 {
    480 		log.Printf("checkJobStatus %s/%s@%.12s: get tree HTTP %d", owner, repo, commitSHA, treeresp.status)
    481 		return ""
    482 	}
    483 	var tree struct {
    484 		Tree []struct {
    485 			Path string `json:"path"`
    486 			SHA  string `json:"sha"`
    487 			Type string `json:"type"`
    488 		} `json:"tree"`
    489 	}
    490 	if err := json.Unmarshal(treeresp.body, &tree); err != nil {
    491 		return ""
    492 	}
    493 	var blobSHA string
    494 	for _, e := range tree.Tree {
    495 		if e.Path == "status.txt" && e.Type == "blob" {
    496 			blobSHA = e.SHA
    497 			break
    498 		}
    499 	}
    500 	if blobSHA == "" {
    501 		log.Printf("checkJobStatus %s/%s@%.12s: status.txt not found in tree %s", owner, repo, commitSHA, runCommitSHA)
    502 		return ""
    503 	}
    504 
    505 	blobURL := fmt.Sprintf("https://%s/api/v1/repos/%s/%s/git/blobs/%s", s.cfg.giteaHost, owner, repo, blobSHA)
    506 	blobResp, err := giteaCall("GET", blobURL, s.cfg.giteaToken, nil)
    507 	if err != nil {
    508 		log.Printf("checkJobStatus %s/%s@%.12s: get blob error: %v", owner, repo, commitSHA, err)
    509 		return ""
    510 	}
    511 	if blobResp.status != 200 {
    512 		log.Printf("checkJobStatus %s/%s@%.12s: get blob HTTP %d", owner, repo, commitSHA, blobResp.status)
    513 		return ""
    514 	}
    515 	var blob struct {
    516 		Content  string `json:"content"`
    517 		Encoding string `json:"encoding"`
    518 	}
    519 	if err := json.Unmarshal(blobResp.body, &blob); err != nil {
    520 		return ""
    521 	}
    522 	content := blob.Content
    523 	if blob.Encoding == "base64" {
    524 		decoded, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(content, "\n", ""))
    525 		if err != nil {
    526 			return ""
    527 		}
    528 		content = string(decoded)
    529 	}
    530 	content = strings.TrimSpace(content)
    531 	if content == "ok" || content == "err" || content == "running" {
    532 		log.Printf("checkJobStatus %s/%s@%.12s: status.txt = %q", owner, repo, commitSHA, content)
    533 		return content
    534 	}
    535 	log.Printf("checkJobStatus %s/%s@%.12s: unrecognised status.txt content %q", owner, repo, commitSHA, content)
    536 	return ""
    537 }
    538 
    539 // ---------------------------------------------------------------------------
    540 // Background maintenance
    541 // ---------------------------------------------------------------------------
    542 
    543 func (s *coordinationServer) reapExpiredJobs() {
    544 	rows, err := s.db.Query(
    545 		`SELECT job_id, repo_owner, repo_name, commit_sha FROM jobs WHERE expires_at IS NOT NULL AND expires_at < ?`,
    546 		time.Now().UTC().Format(time.RFC3339),
    547 	)
    548 	if err != nil {
    549 		return
    550 	}
    551 	type expiredJob struct{ jobID, owner, repo, commit string }
    552 	var expired []expiredJob
    553 	for rows.Next() {
    554 		var j expiredJob
    555 		if rows.Scan(&j.jobID, &j.owner, &j.repo, &j.commit) == nil {
    556 			expired = append(expired, j)
    557 		}
    558 	}
    559 	rows.Close()
    560 
    561 	for _, j := range expired {
    562 		log.Printf("job %s timed out", j.jobID)
    563 		s.setGiteaCommitStatus(j.owner, j.repo, j.commit, "failure", "jci: job timed out")
    564 		_, _ = s.db.Exec(`DELETE FROM jobs WHERE job_id = ?`, j.jobID)
    565 	}
    566 }
    567 
    568 func (s *coordinationServer) syncCompletedJobsForRunner(runnerID string) {
    569 	rows, err := s.db.Query(
    570 		`SELECT job_id, repo_owner, repo_name, commit_sha, COALESCE(status_cache,''), COALESCE(cache_until,'')
    571 		 FROM jobs WHERE runner_id = ?`, runnerID,
    572 	)
    573 	if err != nil {
    574 		return
    575 	}
    576 	type jobRow struct {
    577 		jobID, owner, repo, commit, statusCache, cacheUntilStr string
    578 	}
    579 	var jobs []jobRow
    580 	for rows.Next() {
    581 		var j jobRow
    582 		if rows.Scan(&j.jobID, &j.owner, &j.repo, &j.commit, &j.statusCache, &j.cacheUntilStr) == nil {
    583 			jobs = append(jobs, j)
    584 		}
    585 	}
    586 	rows.Close()
    587 
    588 	for _, j := range jobs {
    589 		status := j.statusCache
    590 		cacheUntil, _ := time.Parse(time.RFC3339, j.cacheUntilStr)
    591 		if time.Now().UTC().After(cacheUntil) {
    592 			log.Printf("sync job %s (%s/%s@%.12s): cache expired, checking Gitea", j.jobID, j.owner, j.repo, j.commit)
    593 			if fresh := s.checkJobStatusOnGitea(j.owner, j.repo, j.commit); fresh != "" {
    594 				log.Printf("sync job %s: status %q → %q", j.jobID, status, fresh)
    595 				status = fresh
    596 			} else {
    597 				log.Printf("sync job %s: no fresh status from Gitea, keeping %q", j.jobID, status)
    598 			}
    599 			newCacheUntil := time.Now().UTC().Add(15 * time.Second).Format(time.RFC3339)
    600 			_, _ = s.db.Exec(`UPDATE jobs SET status_cache=?, cache_until=? WHERE job_id=?`, status, newCacheUntil, j.jobID)
    601 		} else {
    602 			log.Printf("sync job %s (%s/%s@%.12s): status cache still valid (%q until %s)", j.jobID, j.owner, j.repo, j.commit, status, j.cacheUntilStr)
    603 		}
    604 
    605 		if status == "ok" || status == "err" {
    606 			giteaState := "success"
    607 			if status == "err" {
    608 				giteaState = "failure"
    609 			}
    610 			s.setGiteaCommitStatus(j.owner, j.repo, j.commit, giteaState, "jci: job completed")
    611 			_, _ = s.db.Exec(`DELETE FROM jobs WHERE job_id = ?`, j.jobID)
    612 			log.Printf("job %s completed: %s", j.jobID, status)
    613 		}
    614 	}
    615 }
    616 
    617 // ---------------------------------------------------------------------------
    618 // Generic Gitea HTTP helper
    619 // ---------------------------------------------------------------------------
    620 
    621 type giteaResponse struct {
    622 	status int
    623 	body   []byte
    624 }
    625 
    626 func giteaCall(method, url, token string, body []byte) (giteaResponse, error) {
    627 	log.Printf("gitea: %s %s", method, url)
    628 	var bodyReader io.Reader
    629 	if body != nil {
    630 		bodyReader = bytes.NewReader(body)
    631 	}
    632 	req, err := http.NewRequest(method, url, bodyReader)
    633 	if err != nil {
    634 		return giteaResponse{}, err
    635 	}
    636 	req.Header.Set("Authorization", "token "+token)
    637 	req.Header.Set("Content-Type", "application/json")
    638 	req.Header.Set("Accept", "application/json")
    639 
    640 	client := &http.Client{Timeout: 15 * time.Second}
    641 	resp, err := client.Do(req)
    642 	if err != nil {
    643 		log.Printf("gitea: %s %s → error: %v", method, url, err)
    644 		return giteaResponse{}, err
    645 	}
    646 	defer resp.Body.Close()
    647 	data, _ := io.ReadAll(resp.Body)
    648 	log.Printf("gitea: %s %s → HTTP %d: %s", method, url, resp.StatusCode, data)
    649 	return giteaResponse{status: resp.StatusCode, body: data}, nil
    650 }
    651 
    652 // ---------------------------------------------------------------------------
    653 // Tiny utilities
    654 // ---------------------------------------------------------------------------
    655 
    656 func checkHMAC(body []byte, sig, secret string) bool {
    657 	mac := hmac.New(sha256.New, []byte(secret))
    658 	mac.Write(body)
    659 	expected := hex.EncodeToString(mac.Sum(nil))
    660 	return hmac.Equal([]byte(sig), []byte(expected))
    661 }
    662 
    663 func randomID() string {
    664 	b := make([]byte, 16)
    665 	rand.Read(b)
    666 	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
    667 }
    668 
    669 // parentDir returns the directory component of a file path without importing filepath
    670 // (avoids a collision with the filepath import in other files in the package).
    671 func parentDir(path string) string {
    672 	for i := len(path) - 1; i >= 0; i-- {
    673 		if path[i] == '/' || path[i] == '\\' {
    674 			return path[:i]
    675 		}
    676 	}
    677 	return "."
    678 }