Source code for jaypore_ci.executors.docker

"""
A docker executor for Jaypore CI.
"""
from copy import deepcopy

import pendulum
import docker
from rich import print as rprint
from tqdm import tqdm

from jaypore_ci import clean
from jaypore_ci.interfaces import Executor, TriggerFailed, JobStatus
from jaypore_ci.logging import logger


class Docker(Executor):
    """
    Run jobs via docker. To communicate with docker we use the `Python docker
    sdk <https://docker-py.readthedocs.io/en/stable/client.html>`_.

    Using this executor will:

    - Create a separate network for each run
    - Run jobs as part of the network
    - Clean up all jobs when the pipeline exits.
    """

    def __init__(self):
        super().__init__()
        self.pipe_id = None
        self.pipeline = None
        self.docker = docker.from_env()
        self.client = docker.APIClient()
        self.__execution_order__ = []

    def logging(self):
        """
        Returns a logging instance that has executor specific information
        bound to it.
        """
        return logger.bind(pipe_id=self.pipe_id, network_name=self.get_net())

    def set_pipeline(self, pipeline):
        """
        Set the executor's pipeline to the given one.

        This will clean up the previous pipeline's network and jobs, then
        create a new network for the given pipeline.
        """
        if self.pipe_id is not None:
            self.delete_network()
            self.delete_all_jobs()
        self.pipe_id = pipeline.pipe_id
        self.pipeline = pipeline
        self.create_network()

    def teardown(self):
        """
        Clean up the pipeline's network and all of its jobs.
        """
        self.delete_network()
        self.delete_all_jobs()

    def setup(self):
        """
        Remove containers and networks left over from older runs.
        """
        self.delete_old_containers()

    def delete_old_containers(self):
        a_week_back = pendulum.now().subtract(days=7)
        pipe_ids_removed = set()
        for container in tqdm(
            self.docker.containers.list(filters={"status": "exited"}),
            desc="Removing jobs older than a week",
        ):
            if "jayporeci_" not in container.name:
                continue
            if "__job__" in container.name:
                # Job containers are named: jayporeci__job__<pipe_id>__<name>
                pipe_ids_removed.add(
                    container.name.split("__job__")[1].split("__", 1)[0]
                )
            finished_at = pendulum.parse(container.attrs["State"]["FinishedAt"])
            if finished_at <= a_week_back:
                container.remove(v=True)
        for network in tqdm(
            self.docker.networks.list(
                names=[self.get_net(pipe_id=pipe_id) for pipe_id in pipe_ids_removed]
            ),
            desc="Removing related networks",
        ):
            network.remove()

    def get_net(self, *, pipe_id=None):
        """
        Return a network name based on what the current pipeline is.
        """
        pipe_id = pipe_id if pipe_id is not None else self.pipe_id
        return f"jayporeci__net__{pipe_id}" if pipe_id is not None else None

    def create_network(self):
        """
        Create a docker network for the current pipeline.

        If the network cannot be created within 3 attempts, raise
        :class:`~jaypore_ci.interfaces.TriggerFailed`.
        """
        assert self.pipe_id is not None, "Cannot create network if pipe is not set"
        for _ in range(3):
            if len(self.docker.networks.list(names=[self.get_net()])) != 0:
                self.logging().info("Found network", network_name=self.get_net())
                return
            self.logging().info(
                "Create network",
                subprocess=self.docker.networks.create(
                    name=self.get_net(), driver="bridge"
                ),
            )
        raise TriggerFailed("Cannot create network")

    def delete_all_jobs(self):
        """
        Delete all jobs associated with the pipeline for this executor.
        It will stop any jobs that are still running.
        """
        assert self.pipe_id is not None, "Cannot delete jobs if pipe is not set"
        job = None
        for job in self.pipeline.jobs.values():
            if job.run_id is not None and not job.run_id.startswith("pyrun_"):
                container = self.docker.containers.get(job.run_id)
                container.stop(timeout=1)
                self.logging().info("Stop job:", run_id=job.run_id)
                job.check_job(with_update_report=False)
        if job is not None:
            job.check_job()
        self.logging().info("All jobs stopped")

    def delete_network(self):
        """
        Delete the network for this executor's pipeline.
        """
        assert self.pipe_id is not None, "Cannot delete network if pipe is not set"
        try:
            net = self.docker.networks.get(self.get_net())
            net.remove()
        except docker.errors.NotFound:
            self.logging().error("Delete network: Not found", netid=self.get_net())

    def get_job_name(self, job, tail=False):
        """
        Generate a clean job name slug.

        When ``tail`` is True, only the job's own slug is returned, without
        the ``jayporeci__job__<pipe_id>__`` prefix.
        """
        name = clean.name(job.name)
        if tail:
            return name
        return f"jayporeci__job__{self.pipe_id}__{name}"

    def run(self, job: "Job") -> str:
        """
        Run the given job and return a docker container ID.
        In case something goes wrong it will raise
        :class:`~jaypore_ci.interfaces.TriggerFailed`.
        """
        assert self.pipe_id is not None, "Cannot run job if pipe id is not set"
        ex_kwargs = deepcopy(job.executor_kwargs)
        env = job.get_env()
        env.update(ex_kwargs.pop("environment", {}))
        trigger = {
            "detach": True,
            "environment": env,
            # Mounting the host's docker socket / binary lets jobs talk to
            # the host docker daemon.
            "volumes": list(
                set(
                    [
                        "/var/run/docker.sock:/var/run/docker.sock",
                        "/usr/bin/docker:/usr/bin/docker:ro",
                        "/tmp/jayporeci__cidfiles:/jaypore_ci/cidfiles:ro",
                        f"/tmp/jayporeci__src__{self.pipeline.remote.sha}:/jaypore_ci/run",
                    ]
                    + (ex_kwargs.pop("volumes", []))
                )
            ),
            "name": self.get_job_name(job),
            "network": self.get_net(),
            "image": job.image,
            "command": job.command if not job.is_service else None,
        }
        # Any remaining executor kwargs are merged in, overriding the
        # defaults built above.
        for key, value in ex_kwargs.items():
            if key in trigger:
                self.logging().warning(
                    f"Overwriting existing value of `{key}` for job trigger.",
                    old_value=trigger[key],
                    new_value=value,
                )
            trigger[key] = value
        if not job.is_service:
            trigger["working_dir"] = "/jaypore_ci/run"
            assert job.command
        rprint(trigger)
        try:
            container = self.docker.containers.run(**trigger)
            self.__execution_order__.append(
                (self.get_job_name(job, tail=True), container.id, "Run")
            )
            return container.id
        except docker.errors.APIError as e:
            self.logging().exception(e)
            raise TriggerFailed(e) from e
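
    # For illustration (hypothetical values, not part of this module): a job
    # declared with ``executor_kwargs={"mem_limit": "512m"}`` has that key
    # merged into ``trigger`` above, so the container is started roughly as
    # if by:
    #
    #     docker.from_env().containers.run(
    #         image=job.image,
    #         command=job.command,
    #         detach=True,
    #         network=self.get_net(),
    #         mem_limit="512m",
    #     )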

    def get_status(self, run_id: str) -> JobStatus:
        """
        Given a run_id, return the status for that run.
        """
        inspect = self.client.inspect_container(run_id)
        status = JobStatus(
            is_running=inspect["State"]["Running"],
            exit_code=int(inspect["State"]["ExitCode"]),
            logs="",
            started_at=pendulum.parse(inspect["State"]["StartedAt"]),
            finished_at=pendulum.parse(inspect["State"]["FinishedAt"])
            if inspect["State"]["FinishedAt"] != "0001-01-01T00:00:00Z"
            else None,
        )
        # Attach the container's logs to the status object.
        self.logging().debug("Check status", status=status)
        logs = self.docker.containers.get(run_id).logs().decode()
        return status._replace(logs=logs)
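
    # For reference, the part of the ``docker inspect`` payload that
    # ``get_status`` reads looks roughly like this (illustrative values):
    #
    #     "State": {
    #         "Running": false,
    #         "ExitCode": 0,
    #         "StartedAt": "2023-01-01T10:00:00.000000000Z",
    #         "FinishedAt": "2023-01-01T10:00:05.000000000Z"
    #     }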

    def get_execution_order(self):
        """
        Return a mapping of job name to the order in which it was started.
        """
        return {name: i for i, (name, *_) in enumerate(self.__execution_order__)}
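

# ---------------------------------------------------------------------------
# A minimal usage sketch, illustrative only. The ``SimpleNamespace`` stand-ins
# below are hypothetical: they only mirror the attributes this executor
# actually reads. In real use, the jaypore_ci pipeline object drives these
# calls itself. Running this requires a local docker daemon.
if __name__ == "__main__":
    from types import SimpleNamespace

    job = SimpleNamespace(
        name="lint",
        image="python:3.11",
        command="python --version",
        is_service=False,
        executor_kwargs={},
        run_id=None,
        get_env=lambda: {},
        check_job=lambda **kwargs: None,
    )
    pipeline = SimpleNamespace(
        pipe_id="example123",
        jobs={"lint": job},
        remote=SimpleNamespace(sha="deadbeef"),
    )

    ex = Docker()
    ex.setup()                 # Prune week-old containers and networks.
    ex.set_pipeline(pipeline)  # Creates jayporeci__net__example123.
    job.run_id = ex.run(job)   # Starts jayporeci__job__example123__lint.
    print(ex.get_status(job.run_id))
    ex.teardown()              # Stops the job and removes the network.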