commit a7c14b0de54593b78ea482aa3e03542dc59d394a
parent 2c3ef9ed08f9f35a06c31cbb627f01c48deb434b
Author: arjoonn <arjoonn@noreply.localhost>
Date: Wed, 21 Dec 2022 19:03:51 +0000
Allow stages and specify job-level depends_on (!7)
Branch auto created by JayporeCI
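Jobs are now declared inside named stages, stages run in order, and a job can depend on other jobs in its own stage by name via depends_on (dependencies cannot cross stage boundaries). A minimal sketch of the new API, adapted from the examples below; job names and commands are illustrative:
```python
from jaypore_ci import jci

with jci.Pipeline() as p:
    with p.stage("Docker"):
        p.job("Build", "docker build -t myimage .")
    with p.stage("Checks"):
        p.job("Lint", "python3 -m pylint src/")
        # Runs only after "Lint" (same stage) has passed
        p.job("Test", "python3 -m pytest tests/", depends_on=["Lint"])
```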
<details>
<summary>JayporeCi: 🟢 cee220b7e5</summary>
```mermaid
flowchart TB
subgraph Pipeline
direction TB
end
subgraph Docker
direction TB
s_Docker(( )) -.-> Docker_0(Jci):::passed
s_Docker(( )) -.-> Docker_1(JciEnv):::passed
end
subgraph Checks
direction TB
s_Checks(( )) -.-> Checks_0(pylint):::passed
s_Checks(( )) -.-> Checks_1(pytest):::passed
s_Checks(( )) -.-> Checks_2(black):::passed
end
Pipeline ---> Docker
Docker ---> Checks
classDef pending fill:#aaa, color:black, stroke:black,stroke-width:2px,stroke-dasharray: 5 5;
classDef skipped fill:#aaa, color:black, stroke:black,stroke-width:2px;
classDef assigned fill:#ddd, color:black, stroke:black,stroke-width:2px;
classDef running fill:#bae1ff,color:black,stroke:black,stroke-width:2px,stroke-dasharray: 5 5;
classDef passed fill:#88d8b0, color:black, stroke:black;
classDef failed fill:#ff6f69, color:black, stroke:black;
classDef timeout fill:#ffda9e, color:black, stroke:black;
```
Co-authored-by: arjoonn sharma <arjoonn@midpathsoftware.com>
Reviewed-on: https://gitea.midpathsoftware.com/midpath/jaypore_ci/pulls/7
Diffstat:
7 files changed, 258 insertions(+), 346 deletions(-)
diff --git a/Dockerfile b/Dockerfile
@@ -1,12 +1,5 @@
-from python:3.11
-run python3 -m pip install --upgrade pip
-run python3 -m pip install poetry
+from python:3.11 as jcibase
workdir /app
-add pyproject.toml .
-add poetry.lock .
-run poetry export --with dev > req.txt
-run python3 -m pip install -r req.txt
-# Install docker
run apt-get update
run apt-get install ca-certificates curl gnupg lsb-release -y
run mkdir -p /etc/apt/keyrings
@@ -14,7 +7,17 @@ run curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor
run echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/debian $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null
run apt-get update
run apt-get install docker-ce docker-ce-cli containerd.io docker-compose-plugin tree -y
-# Add jaypore_ci to this image
+
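+# jcienv: adds pip, poetry, and the project dependencies on top of jcibase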
+from jcibase as jcienv
+run python3 -m pip install --upgrade pip
+run python3 -m pip install poetry
+add pyproject.toml .
+add poetry.lock .
+run poetry export --with dev > req.txt
+run python3 -m pip install -r req.txt
+env PYTHONPATH=/jaypore_ci/run/
+
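+# jci: final image with the jaypore_ci source and built package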
+from jcienv as jci
add jaypore_ci/ /app/jaypore_ci
run poetry build
run ls -alR dist
diff --git a/README.md b/README.md
@@ -27,12 +27,10 @@ curl https://raw.githubusercontent.com/theSage21/jaypore_ci/main/setup.sh | bash
```python
from jaypore_ci import jci
- with jci.Pipeline( image="arjoonn/jaypore_ci:latest", timeout=15 * 60) as p:
- p.in_parallel(
- p.job("python3 -m black --check .", name="Black"),
- p.job("python3 -m pylint jaypore_ci/ tests/", name="PyLint"),
- p.job("python3 -m pytest tests/", name="PyTest"),
- ).should_pass()
+ with jci.Pipeline() as p:
+ p.job("Black", "python3 -m black --check .")
+ p.job("Pylint", "python3 -m pylint jaypore_ci/ tests/")
+ p.job("PyTest", "python3 -m pytest tests/")
```
</summary>
</details>
@@ -42,12 +40,10 @@ curl https://raw.githubusercontent.com/theSage21/jaypore_ci/main/setup.sh | bash
```python
from jaypore_ci import jci
- with jci.Pipeline(image="scratch", timeout=15 * 60) as p:
- p.image = image = f'myproject_{p.remote.sha}'
- p.in_sequence(
- p.job(f"docker build -t {image} .", name="Docker image"),
- p.job("python3 -m pytest tests/", name="PyTest"),
- ).should_pass()
+ with jci.Pipeline() as p:
+ image = f"myproject_{p.remote.sha}"
+ p.job("Docker", f"docker build -t {image} .")
+ p.job("PyTest", "python3 -m pytest tests/", depends_on=["PyTest"])
```
</summary>
</details>
@@ -57,35 +53,33 @@ curl https://raw.githubusercontent.com/theSage21/jaypore_ci/main/setup.sh | bash
```python
from jaypore_ci import jci
- with jci.Pipeline(image="arjoonn/jaypore_ci:latest", timeout=15 * 60) as p:
- p.image = image = f"myproject_{p.remote.sha}"
-
- p.in_sequence(
- p.job(f"docker build -t {image} .", name="Docker image"),
+ with jci.Pipeline() as p:
+ image = f"myproject_{p.remote.sha}"
+ with p.stage("build"):
+ p.job("DockProd", f"docker build --target ProdEnv -t {image}_prod .")
+ p.job("DockDev", f"docker build --target DevEnv -t {image}_dev .")
+ with p.stage("checking"):
+ p.job("UnitTest", "python3 -m pytest -m unit tests/")
+ p.job("PyLint", "python3 -m pylint src/")
+ p.job("Black", "python3 -m black --check .")
+ p.job(
+ "IntegrationTest",
+ "python3 -m pytest -m integration tests/",
+ depends_on=["PyLint", "UnitTest"],
+ )
+ with p.stage("publish"):
+ p.job("TagProd", f"docker tag -t {image}_prod hub/{image}_prod:{p.remote.sha}")
+ p.job("TagDev", f"docker tag -t {image}_dev hub/{image}_dev:{p.remote.sha}")
p.job(
- f"docker tag -t {image} dockerhubaccount/{image}:{p.remote.sha}",
- name="Docker tag",
- ),
+ "PushProd",
+ f"docker push hub/{image}_prod:{p.remote.sha}",
+ depends_on=["TagProd"],
+ )
p.job(
- f"docker push dockerhubaccount/{image}:{p.remote.sha}", name="Docker push"
- ),
- p.in_parallel(
- p.job("python3 -m pytest tests/", name="PyTest"),
- p.job("python3 -m pylint src/", name="PyLint"),
- p.job("python3 -m black --check .", name="Black"),
- ),
- p.in_parallel(
- p.in_sequence(
- p.job("poetry build", name="pypi build"),
- p.job("poetry publish", name="pypi publish"),
- ),
- p.job("python3 -m create_release_notes", name="release notes"),
- p.job(
- "python3 -m send_emails_to_downstream_packagers_and_maintainers",
- name="Notify downstream",
- ),
- ),
- ).should_pass()
+ "PushDev",
+ f"docker push hub/{image}_dev:{p.remote.sha}",
+ depends_on=["TagDev"],
+ )
```
</summary>
</details>
@@ -95,18 +89,15 @@ curl https://raw.githubusercontent.com/theSage21/jaypore_ci/main/setup.sh | bash
```python
from jaypore_ci import jci
- with jci.Pipeline(image="arjoonn/jaypore_ci:latest", timeout=15 * 60) as p:
- jobs = [
- p.job("python3 -m pytest tests", name=f"Tests: {env}", env=env)
- for env in p.env_matrix(
- BROWSER=["firefox", "chromium", "webkit"],
- SCREENSIZE=["phone", "laptop", "extended"],
- ONLINE=["online", "offline"],
- )
- ]
+ with jci.Pipeline() as p:
# This will have 18 jobs
# one for each possible combination of BROWSER, SCREENSIZE, ONLINE
- p.in_parallel(*jobs).should_pass()
+ for env in p.env_matrix(
+ BROWSER=["firefox", "chromium", "webkit"],
+ SCREENSIZE=["phone", "laptop", "extended"],
+ ONLINE=["online", "offline"],
+ ):
+ p.job(f"Test: {env}", "python3 -m pytest tests", env=env)
```
</summary>
</details>
@@ -126,19 +117,15 @@ curl https://raw.githubusercontent.com/theSage21/jaypore_ci/main/setup.sh | bash
# Services immediately return with a PASSED status
# If they exit with a non-zero code they are marked as FAILED, otherwise
# they are assumed to be PASSED
- with jci.Pipeline(image="arjoonn/jaypore_ci:latest", timeout=15 * 60) as p:
- p.in_sequence(
- p.in_parallel(
- p.job(image='mysql', name='Mysql', is_service=True),
- p.job(image='redis', name='Redis', is_service=True),
- p.job("python3 -m src.run_api", name='Myrepo:Api', is_service=True),
- ),
- p.in_parallel(
- p.job("python3 -m pytest -m unit_tests tests", name="Testing:Unit"),
- p.job("python3 -m pytest -m integration_tests tests", name="Testing:Integration"),
- p.job("python3 -m pytest -m regression_tests tests", name="Testing:Regression"),
- )
- ).should_pass()
+ with jci.Pipeline() as p:
+ with p.stage("Services", is_service=True):
+ p.job("Mysql", None, image="mysql")
+ p.job("Redis", None, image="redis")
+ p.job("Api", "python3 -m src.run_api", image="python:3.11")
+ with p.stage("Testing"):
+ p.job("UnitTest", "python3 -m pytest -m unit_tests tests")
+ p.job("IntegrationTest", "python3 -m pytest -m integration_tests tests")
+ p.job("RegressionTest", "python3 -m pytest -m regression_tests tests")
```
</summary>
</details>
diff --git a/cicd/cicd.py b/cicd/cicd.py
@@ -1,18 +1,12 @@
from jaypore_ci import jci
-with jci.Pipeline(image="Will set later", timeout=15 * 60) as p:
- p.image = image = f"jaypore_image_{p.remote.sha}"
- p.in_sequence(
- p.job(
- f"docker build -t {image} .",
- image="arjoonn/jaypore_ci:latest",
- name="Docker build",
- ),
- p.in_parallel(
- p.job("pwd", name="Pwd"),
- p.job("tree", name="Tree"),
- p.job("python3 -m black --check .", name="Black"),
- p.job("python3 -m pylint jaypore_ci/ tests/", name="PyLint"),
- p.job("python3 -m pytest tests/", name="PyTest"),
- ),
- ).should_pass()
+
+with jci.Pipeline() as p:
+ jcienv = f"jcienv:{p.remote.sha}"
+ with p.stage("Docker"):
+ p.job("JciEnv", f"docker build --target jcienv -t {jcienv} .")
+ p.job("Jci", f"docker build --target jci -t jci:{p.remote.sha} .")
+ with p.stage("Checks"):
+ p.job("black", "python3 -m black --check .")
+ p.job("pylint", "python3 -m pylint jaypore_ci/ tests/")
+ p.job("pytest", "python3 -m pytest tests/")
diff --git a/cicd/pre-push.githook b/cicd/pre-push.githook
@@ -9,8 +9,8 @@ main() {
SHA=$(git rev-parse HEAD)
REPO_ROOT=$(git rev-parse --show-toplevel)
TOKEN=$(echo "url=$(git remote -v|grep push|awk '{print $2}')"|git credential fill|grep password|awk -F= '{print $2}')
- # We will mount the current dir into /jaypore/repo
- # Then we will copy things over to /jaypore/run
+ # We will mount the current dir into /jaypore_ci/repo
+ # Then we will copy things over to /jaypore_ci/run
# Then we will run git clean to remove anything that is not in git
# Then we call the actual cicd code
#
@@ -22,11 +22,11 @@ main() {
--name jaypore_ci_$SHA \
-e JAYPORE_GITEA_TOKEN \
-v /var/run/docker.sock:/var/run/docker.sock \
- -v $REPO_ROOT:/jaypore/repo:ro \
- -v /tmp/jaypore_$SHA:/jaypore/run \
- --workdir /jaypore/run \
- arjoonn/jaypore_ci:latest \
- bash -c 'cp -r /jaypore/repo/. /jaypore/run && cd /jaypore/run/ && git clean -fdx && python cicd/cicd.py'
+ -v $REPO_ROOT:/jaypore_ci/repo:ro \
+ -v /tmp/jaypore_$SHA:/jaypore_ci/run \
+ --workdir /jaypore_ci/run \
+ jcienv \
+ bash -c 'cp -r /jaypore_ci/repo/. /jaypore_ci/run && cd /jaypore_ci/run/ && git clean -fdx && python cicd/cicd.py'
echo '----------------------------------------------'
}
(main)
diff --git a/jaypore_ci/docker.py b/jaypore_ci/docker.py
@@ -67,7 +67,7 @@ class Docker(Executor):
def delete_all_jobs(self):
assert self.pipe_id is not None, "Cannot delete jobs if pipe is not set"
job = None
- for job in self.pipeline.jobs:
+ for job in self.pipeline.jobs.values():
if job.run_id is not None and not job.run_id.startswith("pyrun_"):
self.logging().info(
"Stop job:",
diff --git a/jaypore_ci/jci.py b/jaypore_ci/jci.py
@@ -4,6 +4,7 @@ from enum import Enum
from itertools import product
from collections import defaultdict, namedtuple
from typing import List, Union, Callable
+from contextlib import contextmanager
import structlog
import pendulum
@@ -55,6 +56,7 @@ class Job: # pylint: disable=too-many-instance-attributes
children: List["Job"] = None,
parents: List["Job"] = None,
is_service: bool = False,
+ stage: str = None,
):
self.name = name
self.command = command
@@ -66,6 +68,7 @@ class Job: # pylint: disable=too-many-instance-attributes
self.children = children if children is not None else []
self.parents = parents if parents is not None else []
self.is_service = is_service
+ self.stage = stage
# --- run information
self.logs = defaultdict(list)
self.job_id = id(self)
@@ -99,64 +102,6 @@ class Job: # pylint: disable=too-many-instance-attributes
}[self.pipeline.get_status()]
self.pipeline.remote.publish(self.pipeline.render_report(), status)
- def should_pass(self, *, is_internal_call=False):
- """
- This is the main thing. It allows you to run assertions on the job like:
- assert job.should_pass()
-
- This function will block until the status of the job is known.
- It will also trigger and monitor all jobs required to obtain the status
- for this job.
- """
- self.logging().info("Ok called")
- if not is_internal_call:
- self.pipeline.should_pass_called.add(self)
- self.trigger()
- if self.is_service:
- self.status = Status.PASSED
- self.logging().info("Service started successfully", status=self.status)
- else:
- self.monitor_until_completion()
- self.logging().info("Ok finished", status=self.status)
- self.update_report()
- return self.status == Status.PASSED
-
- def monitor_until_completion(self):
- while not self.is_complete():
- self.check_job()
- time.sleep(1)
- now = pendulum.now(TZ)
- if (now - self.run_start).in_seconds() > self.timeout:
- self.status = Status.TIMEOUT
- self.logging().error(
- "Timeout", seconds_elapsed=(now - self.run_start).in_seconds()
- )
- self.update_report()
- break
- self.check_job()
-
- def get_graph(self):
- """
- Given the current job, builds a graph of all jobs that are it's
- parents.
-
- Returns a set of nodes & edges.
- """
- nodes = set([self])
- edges = set()
- for parent in self.parents:
- edges.add((parent, self))
- if parent not in nodes:
- p_nodes, p_edges = parent.get_graph()
- nodes |= set(p_nodes)
- edges |= set(p_edges)
- nodes = list(sorted(nodes, key=lambda x: x.name))
- in_degree = {a: len([None for x, y in edges if y == a]) for a in nodes}
- edges = list(
- sorted(edges, key=lambda x: (in_degree[x[1]], x[0].name, x[1].name))
- )
- return nodes, edges
-
def trigger(self):
"""
Trigger the job via the pipeline's executor.
@@ -168,17 +113,14 @@ class Job: # pylint: disable=too-many-instance-attributes
self.status = Status.RUNNING
if isinstance(self.command, str):
self.run_id = self.pipeline.executor.run(self)
- else:
- self.run_id = f"pyrun_{self.job_id}"
- self.command(self)
self.logging().info("Trigger done")
else:
self.logging().info("Trigger called but job already running")
self.check_job()
def check_job(self, with_update_report=True):
- self.logging().debug("Checking job run")
if isinstance(self.command, str) and self.run_id is not None:
+ self.logging().debug("Checking job run")
is_running, exit_code, logs = self.pipeline.executor.get_status(self.run_id)
self.last_check = pendulum.now(TZ)
self.logging().debug(
@@ -204,7 +146,7 @@ class Job: # pylint: disable=too-many-instance-attributes
return self.status in FIN_STATUSES
def get_env(self):
- return {**self.pipeline.env, **self.env}
+ return {**self.pipeline.pipe_kwargs.get("env", {}), **self.env}
class Pipeline: # pylint: disable=too-many-instance-attributes
@@ -221,24 +163,25 @@ class Pipeline: # pylint: disable=too-many-instance-attributes
self,
remote: Remote = None,
executor: Executor = None,
- image: str = "python:3.11",
- timeout: int = 15 * 60,
- env: dict = None,
*,
graph_direction: str = "TB",
+ **kwargs,
) -> "Pipeline":
- self.image = image
- self.timeout = timeout
- self.env = {} if env is None else env
- self.jobs = []
+ self.jobs = {}
self.services = []
self.should_pass_called = set()
self.remote = remote if remote is not None else gitea.Gitea.from_env()
self.executor = executor if executor is not None else docker.Docker()
self.graph_direction = graph_direction
self.executor.set_pipeline(self)
+ self.stages = ["Pipeline"]
# ---
- self.seq_links = set()
+ kwargs["image"] = kwargs.get("image", "arjoonn/jaypore_ci:latest")
+ kwargs["timeout"] = kwargs.get("timeout", 15 * 60)
+ kwargs["env"] = kwargs.get("env", {})
+ kwargs["stage"] = "Pipeline"
+ self.pipe_kwargs = kwargs
+ self.stage_kwargs = None
def logging(self):
return logger.bind(
@@ -255,31 +198,39 @@ class Pipeline: # pylint: disable=too-many-instance-attributes
return self
def __exit__(self, exc_type, exc_value, traceback):
+ self.run()
self.executor.__exit__(exc_type, exc_value, traceback)
self.remote.__exit__(exc_type, exc_value, traceback)
return False
def get_status(self):
- pipe_status = Status.PENDING
- for job in self.jobs:
+ """
+ Calculates a pipeline's status
+ """
+ for job in self.jobs.values():
if job.status == Status.RUNNING:
- pipe_status = Status.RUNNING
- break
+ return Status.RUNNING
service = None
for service in self.services:
service.check_job(with_update_report=False)
if service is not None:
service.check_job()
- for job in self.should_pass_called:
+ has_pending = True
+ for job in self.jobs.values():
+ job.check_job(with_update_report=False)
if job.is_complete():
- pipe_status = job.status
- break
- return pipe_status
+ has_pending = False
+ if job.status != Status.PASSED:
+ return Status.FAILED
+ return Status.PENDING if has_pending else Status.PASSED
def get_status_dot(self):
+ """
+ Gets the status dot for the pipeline
+ """
if self.get_status() == Status.PASSED:
return "🟢"
- if self.get_status() in (Status.FAILED, Status.TIMEOUT):
+ if self.get_status() == Status.FAILED:
return "🔴"
if self.get_status() == Status.SKIPPED:
return "🔵"
@@ -290,55 +241,76 @@ class Pipeline: # pylint: disable=too-many-instance-attributes
<details>
<summary>JayporeCi: {self.get_status_dot()} {self.remote.sha[:10]}</summary>
-{self.render_graph()}
-{self.render_logs()}
+{self.__render_graph__()}
+{self.__render_logs__()}
</details>"""
- def render_graph(self) -> str:
- mermaid = ""
- for job in self.should_pass_called:
- nodes, edges = job.get_graph()
- mermaid += f"""
+ def __render_graph__(self) -> str:
+ """
+ Render a mermaid graph given the jobs in the pipeline.
+ """
+ st_map = {
+ Status.PENDING: "pending",
+ Status.RUNNING: "running",
+ Status.FAILED: "failed",
+ Status.PASSED: "passed",
+ Status.TIMEOUT: "timeout",
+ Status.SKIPPED: "skipped",
+ }
+ mermaid = f"""
```mermaid
-graph {self.graph_direction}
+flowchart {self.graph_direction}
"""
- ref = {n: f"N{i}" for i, n in enumerate(nodes)}
- st_map = {
- Status.PENDING: "pending",
- Status.RUNNING: "running",
- Status.FAILED: "failed",
- Status.PASSED: "passed",
- Status.TIMEOUT: "timeout",
- Status.SKIPPED: "skipped",
- }
-
- for i, (a, b) in enumerate(edges):
- arrow = "-.-" if i % 2 == 0 else "-..-"
- if (a, b) in self.seq_links:
- arrow = "====>"
+ for stage in self.stages:
+ mermaid += f"""
+ subgraph {stage}
+ direction {self.graph_direction}
+ """
+ nodes, edges = set(), set()
+ for job in self.jobs.values():
+ if job.stage != stage:
+ continue
+ nodes.add(job.name)
+ edges |= {(p, job.name) for p in job.parents}
+ ref = {n: f"{stage}_{i}" for i, n in enumerate(nodes)}
+ arrow = "-.->"
+ for n in nodes:
+ n = self.jobs[n]
+ mermaid += f"""
+ s_{stage}(( )) {arrow} {ref[n.name]}({n.name}):::{st_map[n.status]}"""
+ for (a, b) in edges:
+ a, b = self.jobs[a], self.jobs[b]
mermaid += f"""
- {ref[a]}({a.name}):::{st_map[a.status]} {arrow} {ref[b]}({b.name}):::{st_map[b.status]}"""
+ {ref[a.name]}({a.name}):::{st_map[a.status]} {arrow} {ref[b.name]}({b.name}):::{st_map[b.status]}"""
mermaid += """
-
-
-
- classDef pending fill:#aaa, color:black, stroke:black,stroke-width:2px,stroke-dasharray: 5 5;
- classDef skipped fill:#aaa, color:black, stroke:black,stroke-width:2px;
- classDef assigned fill:#ddd, color:black, stroke:black,stroke-width:2px;
- classDef running fill:#bae1ff,color:black,stroke:black,stroke-width:2px,stroke-dasharray: 5 5;
- classDef passed fill:#88d8b0, color:black, stroke:black;
- classDef failed fill:#ff6f69, color:black, stroke:black;
- classDef timeout fill:#ffda9e, color:black, stroke:black;
+ end
+ """
+ for s1, s2 in zip(self.stages, self.stages[1:]):
+ mermaid += f"""
+ {s1} ---> {s2}
+ """
+ mermaid += """
+
+ classDef pending fill:#aaa, color:black, stroke:black,stroke-width:2px,stroke-dasharray: 5 5;
+ classDef skipped fill:#aaa, color:black, stroke:black,stroke-width:2px;
+ classDef assigned fill:#ddd, color:black, stroke:black,stroke-width:2px;
+ classDef running fill:#bae1ff,color:black,stroke:black,stroke-width:2px,stroke-dasharray: 5 5;
+ classDef passed fill:#88d8b0, color:black, stroke:black;
+ classDef failed fill:#ff6f69, color:black, stroke:black;
+ classDef timeout fill:#ffda9e, color:black, stroke:black;
``` """
return mermaid
- def render_logs(self):
+ def __render_logs__(self):
+ """
+ Collect all pipeline logs and render into a single collapsible text.
+ """
all_logs = []
fake_job = namedtuple("fake_job", "name logs")(
"JayporeCi", {"stdout": jaypore_logs}
)
- for job in [fake_job] + self.jobs:
+ for job in [fake_job] + list(self.jobs.values()):
job_log = []
for logname, stream in job.logs.items():
job_log += [f"============== {logname} ============="]
@@ -358,146 +330,42 @@ graph {self.graph_direction}
def job(
self,
- *commands: List[str],
name: str,
- image: str = None,
- timeout: int = None,
- env: dict = None,
- is_service: bool = False,
+ command: str,
+ *,
+ depends_on: List[str] = None,
+ **kwargs,
) -> Job:
- if not is_service:
- assert commands
+ """
+ Define a job in this pipeline.
+ """
+ depends_on = [] if depends_on is None else depends_on
+ assert name not in self.jobs, f"Job names must be unique: {name}"
+ job_kwargs = dict(self.pipe_kwargs)
+ job_kwargs.update(self.stage_kwargs if self.stage_kwargs is not None else {})
+ job_kwargs.update(kwargs)  # job-level kwargs override stage and pipeline defaults
+ if not job_kwargs.get("is_service"):
+ assert command, "Only services may omit the command"
job = Job(
name=name if name is not None else " ",
- command="\n".join(commands),
+ command=command,
status=Status.PENDING,
- image=image if image is not None else self.image,
- timeout=timeout if timeout is not None else self.timeout,
pipeline=self,
- env=env if env is not None else {},
children=[],
- is_service=is_service,
+ parents=depends_on,
+ **job_kwargs,
)
- self.jobs.append(job)
- if is_service:
+ for parent_name in depends_on:
+ assert (
+ parent_name in self.jobs
+ ), f"Parent job has to be defined before a child. Cannot find {parent_name}"
+ parent = self.jobs[parent_name]
+ assert parent.stage == job.stage, "Cannot have dependencies across stages"
+ self.jobs[name] = job
+ if kwargs.get("is_service"):
self.services.append(job)
return job
- def in_parallel(self, *jobs, image=None, timeout=None, env=None):
- jobs = [job for job in jobs if job is not None]
- timeout = (
- max(
- job.timeout if job.timeout is not None else self.timeout for job in jobs
- )
- if timeout is None
- else timeout
- )
-
- def run_and_join(job_self):
- job_self.logs["stdout"].append("Starting parallel run")
- for job in jobs:
- job_self.logs["stdout"].append(f"Trigger job: {job.job_id} {job.name}")
- job.trigger()
- something_is_running = True
- while something_is_running:
- time.sleep(1)
- something_is_running = False
- for job in jobs:
- job.check_job(with_update_report=False)
- job_self.logs["stdout"].append(
- f"Checking: {job.job_id} {job.name} is_complete: {job.is_complete()}"
- )
- if not job.is_complete():
- something_is_running = True
- if (
- job.is_complete()
- and job.status != Status.PASSED
- and job_self.status == Status.RUNNING
- ):
- job_self.status = Status.FAILED
- msg = "Dependent job failed"
- job_self.logging().error(msg, failed_job_id=job.job_id)
- job_self.logs["stdout"].append(f"{msg}: {job.job_id}")
- job.check_job()
- if job_self.status == Status.RUNNING:
- job_self.status = Status.PASSED
- job_self.logs["stdout"].append("Ok")
-
- join = Job(
- name="+",
- command=run_and_join,
- status=Status.PENDING,
- image=self.image if image is None else image,
- pipeline=self,
- env={} if env is None else env,
- children=[],
- timeout=timeout,
- parents=list(jobs),
- )
- self.jobs.append(join)
- for job in jobs:
- job.children.append(join)
- return join
-
- def in_sequence(self, *jobs, image=None, env=None, timeout=None):
- jobs = [job for job in jobs if job is not None]
-
- def run_seq(job_self):
- job_self.logs["stdout"].append("Starting sequential run")
- for job in jobs:
- if job_self.status == Status.RUNNING:
- job_self.logs["stdout"].append(
- f"Running job: {job.job_id} {job.name}"
- )
- ok = job.should_pass(is_internal_call=True)
- if not ok:
- job_self.status = Status.FAILED
- job_self.logs["stdout"].append(
- f"Failed job: {job.job_id} {job.name}"
- )
- job_self.logging().error(
- "Dependent job failed", failed_job_id=job.job_id
- )
- elif job_self.status == Status.FAILED:
- job_self.logs["stdout"].append(
- f"Skipping job: {job.job_id} {job.name}"
- )
- job.status = Status.SKIPPED
- continue
- if job_self.status == Status.RUNNING:
- job_self.status = Status.PASSED
- job_self.logs["stdout"].append("Ok")
-
- last_job = None
- for job in jobs:
- if last_job is not None:
- last_job.children.append(job)
- job.parents.append(last_job)
- self.seq_links.add((last_job, job))
- last_job = job
- # final chain job
- timeout = (
- sum(
- job.timeout if job.timeout is not None else self.timeout for job in jobs
- )
- if timeout is None
- else timeout
- )
- join = Job(
- name="+",
- command=run_seq,
- status=Status.PENDING,
- image=self.image if image is None else image,
- pipeline=self,
- env={} if env is None else env,
- children=[],
- timeout=timeout,
- parents=[last_job],
- )
- self.jobs.append(join)
- last_job.children.append(join)
- return join
-
def env_matrix(self, **kwargs):
"""
Return a cartesian product of all the provided kwargs
@@ -505,3 +373,63 @@ graph {self.graph_direction}
keys = list(sorted(kwargs.keys()))
for values in product(*[kwargs[key] for key in keys]):
yield dict(list(zip(keys, values)))
+
+ def run(self):
+ """
+ Run the pipeline: stages execute one after another; within a stage,
+ a job runs once all of its depends_on parents have passed.
+ """
+ # Link jobs both ways: register each job as a child of each of its parents
+ for name, job in self.jobs.items():
+ for parent_name in job.parents:
+ parent = self.jobs[parent_name]
+ parent.children = list(sorted(set(parent.children).union(set([name]))))
+ # Run stages one by one
+ for stage in self.stages:
+ # --- Trigger starting jobs
+ jobs = {name: job for name, job in self.jobs.items() if job.stage == stage}
+ for name in {job.name for job in jobs.values() if not job.parents}:
+ jobs[name].trigger()
+ # --- monitor and ensure all jobs run
+ while not all(job.is_complete() for job in jobs.values()):
+ for job in jobs.values():
+ job.check_job(with_update_report=False)
+ if not job.is_complete():
+ # If all dependencies are met: trigger
+ if len(job.parents) == 0 or all(
+ jobs[parent_name].is_complete()
+ and jobs[parent_name].status == Status.PASSED
+ for parent_name in job.parents
+ ):
+ job.trigger()
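+ # A parent that finished without passing can never unblock this job: skip it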
+ elif any(
+ jobs[parent_name].is_complete()
+ and jobs[parent_name].status != Status.PASSED
+ for parent_name in job.parents
+ ):
+ job.status = Status.SKIPPED
+ job.check_job()
+ time.sleep(1)
+ # --- has this stage passed?
+ if not all(
+ job.is_complete() and job.status == Status.PASSED
+ for job in jobs.values()
+ ):
+ self.logging().error("Stage failed")
+ job.update_report()
+ break
+ self.logging().error("Pipeline passed")
+ job.update_report()
+
+ @contextmanager
+ def stage(self, name, **kwargs):
+ """
+ Any kwargs passed to this stage are supplied to jobs created
+ within this stage.
+ """
+ assert name not in self.jobs, "Stage name cannot match a job's name"
+ assert name not in self.stages, "Stage names cannot be re-used"
+ self.stages.append(name)
+ kwargs["stage"] = name
+ self.stage_kwargs = kwargs
+ yield # -------------------------
+ self.stage_kwargs = None
diff --git a/setup.sh b/setup.sh
@@ -5,7 +5,7 @@ set -o pipefail
main (){
REPO_ROOT=$(git rev-parse --show-toplevel)
LOCAL_HOOK=$(echo $REPO_ROOT/.git/hooks/pre-push)
- IMAGE='arjoonn/jaypore_ci:latest'
+ IMAGE='arjoonn/jci:latest'
echo "Working in repo: $REPO_ROOT"
mkdir $REPO_ROOT/.jaypore_ci || echo 'Moving on..'
cat > $REPO_ROOT/.jaypore_ci/cicd.py << EOF
@@ -36,8 +36,8 @@ main() {
SHA=\$(git rev-parse HEAD)
REPO_ROOT=\$(git rev-parse --show-toplevel)
TOKEN=\$(echo "url=\$(git remote -v|grep push|awk '{print \$2}')"|git credential fill|grep password|awk -F= '{print \$2}')
- # We will mount the current dir into /jaypore/repo
- # Then we will copy things over to /jaypore/run
+ # We will mount the current dir into /jaypore_ci/repo
+ # Then we will copy things over to /jaypore_ci/run
# Then we will run git clean to remove anything that is not in git
# Then we call the actual cicd code
#
@@ -49,11 +49,11 @@ main() {
--name jaypore_ci_\$SHA \\
-e JAYPORE_GITEA_TOKEN \\
-v /var/run/docker.sock:/var/run/docker.sock \\
- -v \$REPO_ROOT:/jaypore/repo:ro \\
- -v /tmp/jaypore_\$SHA:/jaypore/run \\
- --workdir /jaypore/run \\
+ -v \$REPO_ROOT:/jaypore_ci/repo:ro \\
+ -v /tmp/jaypore_\$SHA:/jaypore_ci/run \\
+ --workdir /jaypore_ci/run \\
$IMAGE \\
- bash -c 'cp -r /jaypore/repo/. /jaypore/run && cd /jaypore/run/ && git clean -fdx && python .jaypore_ci/cicd.py'
+ bash -c 'cp -r /jaypore_ci/repo/. /jaypore_ci/run && cd /jaypore_ci/run/ && git clean -fdx && python .jaypore_ci/cicd.py'
echo '----------------------------------------------'
}
(main)