Jaypore CI

> Jaypore CI: Minimal, Offline, Local CI system.
Log | Files | Refs | README | LICENSE

runner.go (9742B)


      1 package jci
      2 
      3 import (
      4 	"bytes"
      5 	"database/sql"
      6 	"encoding/json"
      7 	"fmt"
      8 	"io"
      9 	"log"
     10 	"math/rand"
     11 	"net/http"
     12 	"os"
     13 	"strings"
     14 	"time"
     15 
     16 	_ "modernc.org/sqlite"
     17 )
     18 
     19 // Runner polls the jci server for jobs and dispatches them into Docker containers.
     20 //
     21 // Configuration env vars:
     22 //
     23 //	JCI_SERVER         base URL or host:port of the jci server, e.g. https://jci.example.com or jci_server:8080
     24 //	                   If no scheme is present, http:// is prepended automatically.
     25 //	JCI_RUNNER_SECRET  shared secret matching RUNNER_SECRET on the server
     26 //	JCI_RUNNER_DB      path to runner's SQLite DB (default /var/lib/jci/runner.db)
     27 //	JCI_BINARY         path to the git-jci binary to inject into containers (default /usr/local/bin/git-jci)
     28 func Runner(_ []string) error {
     29 	cfg, err := loadRunnerConfig()
     30 	if err != nil {
     31 		return err
     32 	}
     33 
     34 	db, err := openRunnerDB(cfg.dbPath)
     35 	if err != nil {
     36 		return fmt.Errorf("open runner db: %w", err)
     37 	}
     38 	defer db.Close()
     39 
     40 	runnerID, err := getOrCreateRunnerID(db)
     41 	if err != nil {
     42 		return fmt.Errorf("runner identity: %w", err)
     43 	}
     44 	log.Printf("jci runner starting (id=%s)", runnerID)
     45 	log.Printf("connecting to server %s", cfg.serverURL)
     46 
     47 	r := &runner{cfg: cfg, db: db, runnerID: runnerID, rt: newDockerRuntime()}
     48 	r.pollLoop()
     49 	return nil // pollLoop runs forever; return only on fatal errors
     50 }
     51 
     52 // ---------------------------------------------------------------------------
     53 // Config
     54 // ---------------------------------------------------------------------------
     55 
     56 type runnerConfig struct {
     57 	serverURL    string
     58 	runnerSecret string
     59 	dbPath       string
     60 	binaryPath   string
     61 }
     62 
     63 func loadRunnerConfig() (runnerConfig, error) {
     64 	cfg := runnerConfig{
     65 		dbPath:     "/var/lib/jci/runner.db",
     66 		binaryPath: "/usr/local/bin/git-jci",
     67 	}
     68 	cfg.serverURL = normalizeServerURL(os.Getenv("JCI_SERVER"))
     69 	cfg.runnerSecret = os.Getenv("JCI_RUNNER_SECRET")
     70 
     71 	if cfg.serverURL == "" || cfg.runnerSecret == "" {
     72 		return cfg, fmt.Errorf("missing required env vars: JCI_SERVER, JCI_RUNNER_SECRET")
     73 	}
     74 	if v := os.Getenv("JCI_RUNNER_DB"); v != "" {
     75 		cfg.dbPath = v
     76 	}
     77 	if v := os.Getenv("JCI_BINARY"); v != "" {
     78 		cfg.binaryPath = v
     79 	}
     80 	return cfg, nil
     81 }
     82 
     83 // ---------------------------------------------------------------------------
     84 // Runner SQLite identity store
     85 // ---------------------------------------------------------------------------
     86 
     87 func openRunnerDB(path string) (*sql.DB, error) {
     88 	if err := os.MkdirAll(parentDir(path), 0755); err != nil {
     89 		return nil, err
     90 	}
     91 	db, err := sql.Open("sqlite", path)
     92 	if err != nil {
     93 		return nil, err
     94 	}
     95 	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS identity (runner_id TEXT PRIMARY KEY)`)
     96 	return db, err
     97 }
     98 
     99 func getOrCreateRunnerID(db *sql.DB) (string, error) {
    100 	var id string
    101 	err := db.QueryRow(`SELECT runner_id FROM identity LIMIT 1`).Scan(&id)
    102 	if err == nil {
    103 		return id, nil
    104 	}
    105 	if err != sql.ErrNoRows {
    106 		return "", err
    107 	}
    108 	id = randomID()
    109 	_, err = db.Exec(`INSERT INTO identity (runner_id) VALUES (?)`, id)
    110 	return id, err
    111 }
    112 
    113 // ---------------------------------------------------------------------------
    114 // Runner
    115 // ---------------------------------------------------------------------------
    116 
    117 type runner struct {
    118 	cfg      runnerConfig
    119 	db       *sql.DB
    120 	runnerID string
    121 	rt       ContainerRuntime
    122 }
    123 
    124 // pollLoop runs forever: poll the server every 5s + 0–2s jitter.
    125 // Container reaping runs in a separate goroutine on a 60s interval.
    126 func (r *runner) pollLoop() {
    127 	const reapInterval = 60 * time.Second
    128 	go func() {
    129 		for {
    130 			time.Sleep(reapInterval)
    131 			r.reapStaleContainers()
    132 		}
    133 	}()
    134 
    135 	for {
    136 		jitter := time.Duration(rand.Intn(2000)) * time.Millisecond
    137 		time.Sleep(5*time.Second + jitter)
    138 
    139 		retryAfter, job, err := r.poll()
    140 		if err != nil {
    141 			log.Printf("poll error: %v", err)
    142 			continue
    143 		}
    144 		if retryAfter > 0 {
    145 			log.Printf("at capacity; server asked to back off %s", retryAfter)
    146 			time.Sleep(retryAfter)
    147 			continue
    148 		}
    149 		if job == nil {
    150 			log.Printf("poll: no jobs available")
    151 			continue
    152 		}
    153 		go r.dispatch(job) // non-blocking; runner returns to poll immediately
    154 	}
    155 }
    156 
    157 type serverJob struct {
    158 	JobID             string `json:"job_id"`
    159 	CloneURL          string `json:"clone_url"`
    160 	CommitSHA         string `json:"commit_sha"`
    161 	RepoOwner         string `json:"repo_owner"`
    162 	RepoName          string `json:"repo_name"`
    163 	TimeoutSeconds    int    `json:"timeout_seconds"`
    164 	Image             string `json:"image"`
    165 	Script            string `json:"script"`              // base64-encoded shell script
    166 	AllowDockerSocket bool   `json:"allow_docker_socket"` // mount /var/run/docker.sock into job container
    167 }
    168 
    169 // poll contacts the server. Returns (retryAfter, job, err).
    170 // retryAfter > 0 means 429 was received; job may be nil when there is no work.
    171 func (r *runner) poll() (time.Duration, *serverJob, error) {
    172 	log.Printf("polling %s/poll", r.cfg.serverURL)
    173 	body, _ := json.Marshal(map[string]string{
    174 		"runner_id": r.runnerID,
    175 		"secret":    r.cfg.runnerSecret,
    176 	})
    177 	resp, err := http.Post(r.cfg.serverURL+"/poll", "application/json", bytes.NewReader(body))
    178 	if err != nil {
    179 		return 0, nil, err
    180 	}
    181 	defer resp.Body.Close()
    182 
    183 	log.Printf("poll response: %d", resp.StatusCode)
    184 	if resp.StatusCode == http.StatusTooManyRequests {
    185 		secs := 5
    186 		if v := resp.Header.Get("Retry-After"); v != "" {
    187 			fmt.Sscanf(v, "%d", &secs)
    188 		}
    189 		return time.Duration(secs) * time.Second, nil, nil
    190 	}
    191 	if resp.StatusCode != http.StatusOK {
    192 		data, _ := io.ReadAll(resp.Body)
    193 		log.Printf("poll: unexpected status %d: %s", resp.StatusCode, data)
    194 		return 0, nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, data)
    195 	}
    196 
    197 	var result struct {
    198 		Job *serverJob `json:"job"`
    199 	}
    200 	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
    201 		return 0, nil, fmt.Errorf("decode poll response: %w", err)
    202 	}
    203 	if result.Job != nil {
    204 		log.Printf("poll: received job %s (%s/%s@%.12s)", result.Job.JobID, result.Job.RepoOwner, result.Job.RepoName, result.Job.CommitSHA)
    205 	}
    206 	return 0, result.Job, nil
    207 }
    208 
    209 // dispatch starts a detached Docker container to run the job.
    210 func (r *runner) dispatch(job *serverJob) {
    211 	log.Printf("received job %s: %s/%s@%.12s", job.JobID, job.RepoOwner, job.RepoName, job.CommitSHA)
    212 	log.Printf("job %s: launching container", job.JobID)
    213 
    214 	containerID, err := r.startJobContainer(job)
    215 	if err != nil {
    216 		log.Printf("job %s: failed to start container: %v", job.JobID, err)
    217 		return
    218 	}
    219 	log.Printf("job %s: container %s started — waiting for completion (will be reaped after timeout)", job.JobID, containerID[:12])
    220 
    221 	// Container reaping is handled by the periodic reaper; nothing more to do here.
    222 }
    223 
    224 // startJobContainer launches a detached container and returns its ID.
    225 func (r *runner) startJobContainer(job *serverJob) (string, error) {
    226 	// The script is delivered as a base64-encoded env var (JCI_SCRIPT).
    227 	// The container decodes and pipes it into sh — no script content ever
    228 	// touches a shell on the runner host.
    229 	timeoutLabel := fmt.Sprintf("%dm", job.TimeoutSeconds/60)
    230 
    231 	binds := []string{r.cfg.binaryPath + ":/usr/local/bin/git-jci:ro"}
    232 	if job.AllowDockerSocket {
    233 		binds = append(binds, "/var/run/docker.sock:/var/run/docker.sock")
    234 	}
    235 
    236 	spec := ContainerSpec{
    237 		Image:   job.Image,
    238 		Command: []string{"/bin/sh", "-c", `echo "$JCI_SCRIPT" | base64 -d | /bin/sh`},
    239 		Env: []string{
    240 			"JCI_CLONE_URL=" + job.CloneURL,
    241 			"JCI_COMMIT=" + job.CommitSHA,
    242 			"JCI_SCRIPT=" + job.Script,
    243 		},
    244 		Binds: binds,
    245 		Labels: map[string]string{
    246 			"jci-job":     "y",
    247 			"jci-job-id":  job.JobID,
    248 			"jci-timeout": timeoutLabel,
    249 		},
    250 	}
    251 
    252 	// Log without credentials (clone URL contains token).
    253 	logSpec := spec
    254 	logSpec.Env = []string{
    255 		"JCI_CLONE_URL=" + maskURL(job.CloneURL),
    256 		"JCI_COMMIT=" + job.CommitSHA,
    257 		"JCI_SCRIPT=(base64, " + fmt.Sprintf("%d bytes", len(job.Script)) + ")",
    258 	}
    259 	log.Printf("job %s: %s", job.JobID, logContainerCmd(logSpec))
    260 
    261 	return r.rt.StartContainer(spec)
    262 }
    263 
    264 // reapStaleContainers kills containers labelled jci-job=y whose running time
    265 // exceeds the duration stored in their jci-timeout label.
    266 func (r *runner) reapStaleContainers() {
    267 	containers, err := r.rt.ListContainers("jci-job=y")
    268 	if err != nil {
    269 		log.Printf("reaper: list containers: %v", err)
    270 		return
    271 	}
    272 	log.Printf("reaper: found %d jci container(s)", len(containers))
    273 	for _, c := range containers {
    274 		label := c.Labels["jci-timeout"]
    275 		if label == "" {
    276 			continue
    277 		}
    278 		timeout, err := time.ParseDuration(label)
    279 		if err != nil {
    280 			log.Printf("reaper: container %s has invalid jci-timeout label %q: %v", c.ID[:12], label, err)
    281 			continue
    282 		}
    283 		startedAt, err := r.rt.InspectStartedAt(c.ID)
    284 		if err != nil {
    285 			log.Printf("reaper: inspect %s: %v", c.ID[:12], err)
    286 			continue
    287 		}
    288 		if time.Since(startedAt) > timeout {
    289 			log.Printf("reaper: killing stale container %s (timeout=%s, running since %s)", c.ID[:12], timeout, startedAt)
    290 			if err := r.rt.RemoveContainer(c.ID); err != nil {
    291 				log.Printf("reaper: remove %s: %v", c.ID[:12], err)
    292 			}
    293 		}
    294 	}
    295 }
    296 
    297 // normalizeServerURL ensures the URL has an http:// or https:// scheme.
    298 // This allows JCI_SERVER to be set as just a host:port (e.g. "jci_server:8080")
    299 // without triggering the "first path segment cannot contain colon" parse error.
    300 func normalizeServerURL(s string) string {
    301 	if s == "" {
    302 		return s
    303 	}
    304 	if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
    305 		return "http://" + s
    306 	}
    307 	return s
    308 }
    309 
    310 // maskURL replaces the userinfo (credentials) in a URL with ***.
    311 // e.g. https://token:x@github.com/org/repo → https://***@github.com/org/repo
    312 func maskURL(raw string) string {
    313 	if i := strings.Index(raw, "@"); i != -1 {
    314 		if j := strings.Index(raw, "://"); j != -1 && j < i {
    315 			return raw[:j+3] + "***" + raw[i:]
    316 		}
    317 	}
    318 	return raw
    319 }
    320