Coverage for jaypore_ci/jci.py: 85% (229 statements)

coverage.py v7.2.2, created at 2023-03-30 09:04 +0000

1""" 

2The code submodule for Jaypore CI. 

3""" 

4import time 

5import os 

6from itertools import product 

7from collections import defaultdict 

8from typing import List, Union, Callable 

9from contextlib import contextmanager 

10 

11import structlog 

12import pendulum 

13 

14from jaypore_ci.exceptions import BadConfig 

15from jaypore_ci.config import const 

16from jaypore_ci.changelog import version_map 

17from jaypore_ci import remotes, executors, reporters, repos, clean 

18from jaypore_ci.interfaces import ( 

19 Remote, 

20 Executor, 

21 Reporter, 

22 TriggerFailed, 

23 Status, 

24 Repo, 

25) 

26from jaypore_ci.logging import logger 

27 

28TZ = "UTC" 

29 

30__all__ = ["Pipeline", "Job"] 

31 

32 

33# All of these statuses are considered "finished" statuses 

34FIN_STATUSES = (Status.FAILED, Status.PASSED, Status.TIMEOUT, Status.SKIPPED) 

35PREFIX = "JAYPORE_" 

36 

37# Check if we need to upgrade Jaypore CI 

38def ensure_version_is_correct() -> None: 

39 """ 

40 Ensure that the version of Jaypore CI that is running, the code inside 

41 cicd.py, and pre-push.sh are at compatible versions. 

42 

43 If versions do not match then this function will print out instructions on 

44 what to do in order to upgrade. 

45 

46 Downgrades are not allowed, you need to re-install that specific version. 

47 """ 

48 if ( 48 ↛ 53line 48 didn't jump to line 53

49 const.expected_version is not None 

50 and const.version is not None 

51 and const.expected_version != const.version 

52 ): 

53 print("Expected : ", const.expected_version) 

54 print("Got : ", const.version) 

55 if const.version > const.expected_version: 

56 print( 

57 "Your current version is higher than the expected one. Please " 

58 "re-install Jaypore CI in this repo as downgrades are not " 

59 "supported." 

60 ) 

61 if const.version < const.expected_version: 

62 print("--- Upgrade Instructions ---") 

63 for version in sorted(version_map.keys()): 

64 if version < const.version or version > const.expected_version: 

65 continue 

66 for line in version_map[version]["instructions"]: 

67 print(line) 

68 print("--- -------------------- ---") 

69 raise BadConfig( 

70 "Version mismatch between arjoonn/jci:<tag> docker container and pre-push.sh script" 

71 ) 
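# An illustrative sketch only: judging from the loop above, ``version_map``
# (defined in jaypore_ci.changelog) is assumed to map comparable version keys
# to dicts holding upgrade notes, roughly:
#
#     version_map = {
#         SomeVersion("0.1.0"): {"instructions": ["Update pre-push.sh"]},
#         SomeVersion("0.2.0"): {"instructions": ["Re-install the git hook"]},
#     }
#
# ``SomeVersion`` is a stand-in name; the real key type is whatever the
# changelog module uses, it only needs to support sorting and comparison.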

class Job:  # pylint: disable=too-many-instance-attributes
    """
    This is the fundamental building block for running jobs.
    Each job goes through a lifecycle defined by
    :class:`~jaypore_ci.interfaces.Status`.

    A job is run by an :class:`~jaypore_ci.interfaces.Executor` as part of a
    :class:`~jaypore_ci.jci.Pipeline`.

    It is never created manually. The correct way to create a job is to use
    :meth:`~jaypore_ci.jci.Pipeline.job`.

    :param name: The name for the job. Names must be unique across jobs and
        stages.
    :param command: The command that we need to run for the job. It can be
        set to `None` when `is_service` is True.
    :param is_service: Is this job a service or not? Service jobs are assumed
        to be :class:`~jaypore_ci.interfaces.Status.PASSED` as long as they
        start. They are shut down when the entire pipeline has finished
        executing.
    :param pipeline: The pipeline this job is associated with.
    :param status: The :class:`~jaypore_ci.interfaces.Status` of this job.
    :param image: What docker image to use for this job.
    :param timeout: Defines how long a job is allowed to run before being
        killed and marked as :class:`~jaypore_ci.interfaces.Status.FAILED`.
    :param env: A dictionary of environment variables to pass to the docker
        run command.
    :param children: Defines which jobs depend on this job's output status.
    :param parents: Defines which jobs need to pass before this job can be
        run.
    :param stage: What stage the job belongs to. This stage name must exist
        so that we can assign jobs to it.
    :param executor_kwargs: A dictionary of keyword arguments that the
        executor can use when running a job. Different executors may use this
        in different ways; for example, with the
        :class:`~jaypore_ci.executors.docker.Docker` executor this may be
        used to run jobs with `--add-host or --device
        <https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run>`_.
    """

    def __init__(
        self,
        name: str,
        command: Union[str, Callable],
        pipeline: "Pipeline",
        *,
        status: str = None,
        children: List["Job"] = None,
        parents: List["Job"] = None,
        is_service: bool = False,
        stage: str = None,
        # --- executor kwargs
        image: str = None,
        timeout: int = None,
        env: dict = None,
        executor_kwargs: dict = None,
    ):
        self.name = name
        self.command = command
        self.image = image
        self.status = status
        self.run_state = None
        self.timeout = timeout
        self.pipeline = pipeline
        self.env = env
        self.children = children if children is not None else []
        self.parents = parents if parents is not None else []
        self.is_service = is_service
        self.stage = stage
        self.executor_kwargs = executor_kwargs if executor_kwargs is not None else {}
        # --- run information
        self.logs = defaultdict(list)
        self.job_id = id(self)
        self.run_id = None
        self.run_start = None
        self.last_check = None

    def logging(self):
        """
        Returns a logging instance that has job specific information bound to
        it.
        """
        return self.pipeline.logging().bind(
            job_id=self.job_id,
            job_name=self.name,
            run_id=self.run_id,
        )

    def update_report(self) -> str:
        """
        Update the status report. Usually called when a job changes some of
        its internal state, like when logs are updated or when status has
        changed.
        """
        self.logging().debug("Update report")
        status = {
            Status.PENDING: "pending",
            Status.RUNNING: "pending",
            Status.FAILED: "failure",
            Status.PASSED: "success",
            Status.TIMEOUT: "warning",
            Status.SKIPPED: "warning",
        }[self.pipeline.get_status()]
        report = self.pipeline.reporter.render(self.pipeline)
        with open("/jaypore_ci/run/jaypore_ci.status.txt", "w", encoding="utf-8") as fl:
            fl.write(report)
        self.pipeline.remote.publish(report, status)
        return report

    def trigger(self):
        """
        Trigger the job via the pipeline's executor.
        This will immediately return and will not wait for the job to finish.

        It is also idempotent. Calling this multiple times will only trigger
        the job once.
        """
        if self.status == Status.PENDING:
            self.run_start = pendulum.now(TZ)
            self.logging().info("Trigger called")
            self.status = Status.RUNNING
            if isinstance(self.command, str):  # coverage: 200 ↛ 213, the condition on line 200 was never false
                try:
                    self.run_id = self.pipeline.executor.run(self)
                    self.logging().info("Trigger done")
                except TriggerFailed as e:
                    self.logging().error(
                        "Trigger failed",
                        error=e,
                        job_name=self.name,
                    )
                    self.status = Status.FAILED
        else:
            self.logging().info("Trigger called but job already running")
        self.check_job()
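    # A sketch of the idempotency described above (hypothetical job object):
    #
    #     job.trigger()  # PENDING -> RUNNING, executor.run() is called once
    #     job.trigger()  # already RUNNING, only re-checks status via check_job()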

    def check_job(self, *, with_update_report=True):
        """
        This will check the status of the job.
        If `with_update_report` is False, it will not push an update to the
        remote.
        """
        if isinstance(self.command, str) and self.run_id is not None:
            self.logging().debug("Checking job run")
            self.run_state = self.pipeline.executor.get_status(self.run_id)
            self.last_check = pendulum.now(TZ)
            self.logging().debug(
                "Job run status found",
                is_running=self.run_state.is_running,
                exit_code=self.run_state.exit_code,
            )
            if self.run_state.is_running:
                self.status = Status.RUNNING if not self.is_service else Status.PASSED
            else:
                self.status = (
                    Status.PASSED if self.run_state.exit_code == 0 else Status.FAILED
                )
            self.logs["stdout"] = reporters.clean_logs(self.run_state.logs)
            if with_update_report:
                self.update_report()

    def is_complete(self) -> bool:
        """
        Is this job complete? It could have passed, failed, etc.
        We no longer need to check for updates in a complete job.
        """
        return self.status in FIN_STATUSES

    def get_env(self):
        """
        Gets the environment variables for a given job.
        Order of precedence for setting values is:

        1. Pipeline
        2. Stage
        3. Job
        """
        env = {
            k[len(PREFIX) :]: v for k, v in os.environ.items() if k.startswith(PREFIX)
        }
        env.update(self.pipeline.pipe_kwargs.get("env", {}))
        env.update(self.env)  # Includes env specified in stage kwargs AND job kwargs
        return env
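    # A sketch of the precedence above (all names invented): with
    # JAYPORE_TOKEN=abc in the environment, env={"TOKEN": "pipe"} on the
    # pipeline, and env={"TOKEN": "job"} on the job, get_env() returns
    # {"TOKEN": "job"}, since each successive env.update() call overwrites
    # earlier values.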

class Pipeline:  # pylint: disable=too-many-instance-attributes
    """
    A pipeline acts as a controlling/organizing mechanism for multiple jobs.

    :param repo: Provides information about the codebase.
    :param reporter: Provides reports based on the state of the pipeline.
    :param remote: Allows us to publish reports to somewhere like
        gitea/email.
    :param executor: Runs the specified jobs.
    :param poll_interval: Defines how frequently (in seconds) to check the
        pipeline status and publish a report.
    """

    # We need a way to avoid actually running the examples. Something like a
    # "dry-run" option so that only the building of the config is done and
    # it's never actually run. It might be a good idea to make this an actual
    # config variable but I'm not sure if we should do that or not.
    __run_on_exit__ = True

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        repo: Repo = None,
        remote: Remote = None,
        executor: Executor = None,
        reporter: Reporter = None,
        poll_interval: int = 10,
        **kwargs,
    ) -> "Pipeline":
        self.jobs = {}
        self.services = []
        self.should_pass_called = set()
        self.repo = repo if repo is not None else repos.Git.from_env()
        self.remote = (
            remote
            if remote is not None
            else remotes.gitea.Gitea.from_env(repo=self.repo)
        )
        self.executor = executor if executor is not None else executors.docker.Docker()
        self.reporter = reporter if reporter is not None else reporters.text.Text()
        self.poll_interval = poll_interval
        self.stages = ["Pipeline"]
        self.__pipe_id__ = None
        self.executor.set_pipeline(self)
        # ---
        kwargs["image"] = kwargs.get("image", "arjoonn/jci")
        kwargs["timeout"] = kwargs.get("timeout", 15 * 60)
        kwargs["env"] = kwargs.get("env", {})
        kwargs["stage"] = "Pipeline"
        self.pipe_kwargs = kwargs
        self.stage_kwargs = None

    @property
    def pipe_id(self):
        if self.__pipe_id__ is None:  # coverage: 316 ↛ 318, the condition on line 316 was never false
            self.__pipe_id__ = self.__get_pipe_id__()
        return self.__pipe_id__

    def __get_pipe_id__(self):
        """
        This is mainly here so that during testing we can override this and
        provide a different way to get the pipe id.
        """
        with open(f"/jaypore_ci/cidfiles/{self.repo.sha}", "r", encoding="utf-8") as fl:
            return fl.read().strip()

    def logging(self):
        """
        Return a logger with information about the current pipeline bound to
        it.
        """
        return logger.bind(
            **{
                **structlog.get_context(self.remote.logging()),
                **structlog.get_context(self.executor.logging()),
                "pipe_id": id(self),
            }
        )

    def __enter__(self):
        ensure_version_is_correct()
        self.executor.setup()
        self.remote.setup()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if Pipeline.__run_on_exit__:
            self.run()
            self.executor.teardown()
            self.remote.teardown()
        return False

    def get_status(self) -> Status:
        """
        Calculates a pipeline's status based on the status of its jobs.
        """
        for job in self.jobs.values():
            if job.status == Status.RUNNING:
                return Status.RUNNING
        service = None
        for service in self.services:  # coverage: 362 ↛ 363, the loop on line 362 never started
            service.check_job(with_update_report=False)
        if service is not None:  # coverage: 364 ↛ 365, the condition on line 364 was never true
            service.check_job(with_update_report=False)
        has_pending = False
        for job in self.jobs.values():
            job.check_job(with_update_report=False)
            if not job.is_complete():
                has_pending = True
            else:
                if job.status != Status.PASSED:  # coverage: 372 ↛ 373, the condition on line 372 was never true
                    return Status.FAILED
        return Status.PENDING if has_pending else Status.PASSED

    def get_status_dot(self) -> str:
        """
        Gets the status dot for the pipeline.
        """
        if self.get_status() == Status.PASSED:
            return "🟢"
        if self.get_status() == Status.FAILED:  # coverage: 382 ↛ 383, the condition on line 382 was never true
            return "🔴"
        if self.get_status() == Status.SKIPPED:  # coverage: 384 ↛ 385, the condition on line 384 was never true
            return "🔵"
        return "🟡"

    def job(
        self,
        name: str,
        command: str,
        *,
        depends_on: List[str] = None,
        **kwargs,
    ) -> Job:
        """
        Creates a :class:`~jaypore_ci.jci.Job` instance based on the
        pipeline/stage that it is being defined in. See
        :class:`~jaypore_ci.jci.Job` for details on what parameters can be
        passed to the job.
        """
        depends_on = [] if depends_on is None else depends_on
        depends_on = [depends_on] if isinstance(depends_on, str) else depends_on
        name = clean.name(name)
        assert name, "Name should have some value after it is cleaned"
        assert name not in self.jobs, f"{name} already defined"
        assert name not in self.stages, "Stage name cannot match a job's name"
        kwargs, job_kwargs = dict(self.pipe_kwargs), kwargs
        kwargs.update(self.stage_kwargs if self.stage_kwargs is not None else {})
        kwargs.update(job_kwargs)
        if not kwargs.get("is_service"):
            assert command, f"Command: {command}"
        job = Job(
            name=name if name is not None else " ",
            command=command,
            status=Status.PENDING,
            pipeline=self,
            children=[],
            parents=depends_on,
            **kwargs,
        )
        for parent_name in depends_on:
            assert (
                parent_name in self.jobs
            ), f"Parent job has to be defined before a child. Cannot find {parent_name}"
            parent = self.jobs[parent_name]
            assert parent.stage == job.stage, "Cannot have dependencies across stages"
        self.jobs[name] = job
        if kwargs.get("is_service"):
            self.services.append(job)
        return job
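    # Illustrative only (names invented): parents must be declared before
    # children, dependencies cannot cross stages, and kwargs layer as
    # pipeline < stage < job:
    #
    #     p.job("Build", "make build")
    #     p.job("Test", "make test", depends_on=["Build"], timeout=5 * 60)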

    @classmethod
    def env_matrix(cls, **kwargs):
        """
        Return a cartesian product of all the provided kwargs.
        """
        keys = list(sorted(kwargs.keys()))
        for values in product(*[kwargs[key] for key in keys]):
            yield dict(list(zip(keys, values)))
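    # For example (values invented), since keys are sorted before the
    # product is taken:
    #
    #     list(Pipeline.env_matrix(PY=["3.10", "3.11"], DB=["pg"]))
    #     # -> [{"DB": "pg", "PY": "3.10"}, {"DB": "pg", "PY": "3.11"}]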

    def __ensure_duplex__(self):
        for name, job in self.jobs.items():
            for parent_name in job.parents:
                parent = self.jobs[parent_name]
                parent.children = list(sorted(set(parent.children).union(set([name]))))

    def run(self):
        """
        Run the pipeline. This is always called automatically when the
        context of the pipeline declaration finishes, so unless you are doing
        something fancy you don't need to call this manually.
        """
        self.__ensure_duplex__()
        # Run stages one by one
        job = None
        for stage in self.stages:
            # --- Trigger starting jobs
            jobs = {name: job for name, job in self.jobs.items() if job.stage == stage}
            for name in {job.name for job in jobs.values() if not job.parents}:
                jobs[name].trigger()
            # --- monitor and ensure all jobs run
            while not all(job.is_complete() for job in jobs.values()):
                for job in jobs.values():
                    job.check_job(with_update_report=False)
                    if not job.is_complete():
                        # If all dependencies are met: trigger
                        if len(job.parents) == 0 or all(
                            jobs[parent_name].is_complete()
                            and jobs[parent_name].status == Status.PASSED
                            for parent_name in job.parents
                        ):
                            job.trigger()
                        elif any(  # coverage: 474 ↛ 479, the condition on line 474 was never true
                            jobs[parent_name].is_complete()
                            and jobs[parent_name].status != Status.PASSED
                            for parent_name in job.parents
                        ):
                            job.status = Status.SKIPPED
                    job.check_job()
                time.sleep(self.poll_interval)
            # --- has this stage passed?
            if not all(  # coverage: 483 ↛ 487, the condition on line 483 was never true
                job.is_complete() and job.status == Status.PASSED
                for job in jobs.values()
            ):
                self.logging().error("Stage failed")
                job.update_report()
                break
        self.logging().info("Pipeline passed")
        if job is not None:
            report = job.update_report()
            self.logging().info("Report:", report=report)
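    # A sketch of the scheduling above for a single stage (names invented):
    # given Test depends_on Build, run() triggers Build first (it has no
    # parents), polls every `poll_interval` seconds, triggers Test only once
    # Build is complete and PASSED, and marks Test SKIPPED if Build finishes
    # in any other state.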

    @contextmanager
    def stage(self, name, **kwargs):
        """
        A stage in a pipeline.

        Any kwargs passed to this stage are supplied to jobs created within
        this stage.
        """
        name = clean.name(name)
        assert name, "Name should have some value after it is cleaned"
        assert name not in self.jobs, "Stage name cannot match a job's name"
        assert name not in self.stages, "Stage names cannot be re-used"
        self.stages.append(name)
        kwargs["stage"] = name
        self.stage_kwargs = kwargs
        yield  # -------------------------
        self.stage_kwargs = None
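    # Illustrative usage (a sketch; names invented): kwargs passed to the
    # stage become defaults for every job declared inside it.
    #
    #     with p.stage("Test", timeout=10 * 60):
    #         p.job("Unit", "pytest tests/unit")
    #         p.job("Lint", "black --check .")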