Coverage for jaypore_ci/executors/docker.py: 83%

108 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-30 09:04 +0000

1""" 

2A docker executor for Jaypore CI. 

3""" 

4from copy import deepcopy 

5 

6import pendulum 

7import docker 

8from rich import print as rprint 

9from tqdm import tqdm 

10 

11from jaypore_ci import clean 

12from jaypore_ci.interfaces import Executor, TriggerFailed, JobStatus 

13from jaypore_ci.logging import logger 

14 

15 

class Docker(Executor):
    """
    Run jobs via docker. To communicate with docker we use the `Python docker
    sdk <https://docker-py.readthedocs.io/en/stable/client.html>`_.

    Using this executor will:
        - Create a separate network for each run
        - Run jobs as part of the network
        - Clean up all jobs when the pipeline exits.
    """

    def __init__(self):
        super().__init__()
        self.pipe_id = None  # Set once a pipeline is attached via set_pipeline()
        self.pipeline = None
        self.docker = docker.from_env()  # High-level SDK client
        self.client = docker.APIClient()  # Low-level client, used for inspect
        self.__execution_order__ = []  # (job_name, container_id, action) tuples

    def logging(self):
        """
        Returns a logging instance that has executor specific
        information bound to it.
        """
        return logger.bind(pipe_id=self.pipe_id, network_name=self.get_net())

    def set_pipeline(self, pipeline):
        """
        Set executor's pipeline to the given one.

        This will clean up old networks and create new ones.
        """
        if self.pipe_id is not None:
            # Re-attaching to a new pipeline: tear down the previous run first.
            self.delete_network()
            self.delete_all_jobs()
        self.pipe_id = pipeline.pipe_id
        self.pipeline = pipeline
        self.create_network()

    def teardown(self):
        """
        Remove this pipeline's network and stop all of its jobs.
        """
        self.delete_network()
        self.delete_all_jobs()

    def setup(self):
        """
        Prepare the executor by pruning containers/networks left over
        from runs older than a week.
        """
        self.delete_old_containers()

    def delete_old_containers(self):
        """
        Remove exited JayporeCI containers that finished more than a week
        ago, then remove the networks of the pipelines those jobs belonged to.
        """
        a_week_back = pendulum.now().subtract(days=7)
        pipe_ids_removed = set()
        for container in tqdm(
            self.docker.containers.list(filters={"status": "exited"}),
            desc="Removing jobs older than a week",
        ):
            if "jayporeci_" not in container.name:
                continue
            if "__job__" in container.name:
                # Job container names look like:
                #   jayporeci__job__<pipe_id>__<job_name>
                pipe_ids_removed.add(
                    container.name.split("__job__")[1].split("__", 1)[0]
                )
            finished_at = pendulum.parse(container.attrs["State"]["FinishedAt"])
            if finished_at <= a_week_back:
                container.remove(v=True)  # v=True also removes anonymous volumes
        for network in tqdm(
            self.docker.networks.list(
                names=[self.get_net(pipe_id=pipe_id) for pipe_id in pipe_ids_removed]
            ),
            desc="Removing related networks",
        ):
            network.remove()

    def get_net(self, *, pipe_id=None):
        """
        Return a network name based on what the current pipeline is.

        Returns None when no pipe_id is given and no pipeline is attached.
        """
        pipe_id = pipe_id if pipe_id is not None else self.pipe_id
        return f"jayporeci__net__{pipe_id}" if pipe_id is not None else None

    def create_network(self):
        """
        Will create a docker network.

        If it fails to do so in 3 attempts it will raise an
        exception and fail.
        """
        assert self.pipe_id is not None, "Cannot create network if pipe is not set"
        for _ in range(3):
            if len(self.docker.networks.list(names=[self.get_net()])) != 0:
                self.logging().info("Found network", network_name=self.get_net())
                return
            self.logging().info(
                "Create network",
                subprocess=self.docker.networks.create(
                    name=self.get_net(), driver="bridge"
                ),
            )
        raise TriggerFailed("Cannot create network")

    def delete_all_jobs(self):
        """
        Deletes all jobs associated with the pipeline for this
        executor.

        It will stop any jobs that are still running. Cleanup is
        best-effort: a job whose container is already gone does not
        abort cleanup of the remaining jobs.
        """
        assert self.pipe_id is not None, "Cannot delete jobs if pipe is not set"
        job = None
        for job in self.pipeline.jobs.values():
            # run_ids starting with "pyrun_" are not docker containers.
            if job.run_id is not None and not job.run_id.startswith("pyrun_"):
                try:
                    container = self.docker.containers.get(job.run_id)
                    container.stop(timeout=1)
                    self.logging().info("Stop job:", run_id=job.run_id)
                except docker.errors.NotFound:
                    # Container already removed; keep cleaning the rest.
                    self.logging().error(
                        "Stop job: Not found", run_id=job.run_id
                    )
                job.check_job(with_update_report=False)
        if job is not None:
            # Update the report once, after all jobs have been stopped.
            job.check_job()
        self.logging().info("All jobs stopped")

    def delete_network(self):
        """
        Delete the network for this executor.
        """
        assert self.pipe_id is not None, "Cannot delete network if pipe is not set"
        try:
            net = self.docker.networks.get(self.get_net())
            net.remove()
        except docker.errors.NotFound:
            self.logging().error("Delete network: Not found", netid=self.get_net())

    def get_job_name(self, job, tail=False):
        """
        Generates a clean job name slug.

        With tail=True only the job's own slug is returned, without the
        "jayporeci__job__<pipe_id>__" prefix.
        """
        name = clean.name(job.name)
        if tail:
            return name
        return f"jayporeci__job__{self.pipe_id}__{name}"

    def run(self, job: "Job") -> str:
        """
        Run the given job and return a docker container ID.
        In case something goes wrong it will raise TriggerFailed
        """
        assert self.pipe_id is not None, "Cannot run job if pipe id is not set"
        ex_kwargs = deepcopy(job.executor_kwargs)
        env = job.get_env()
        env.update(ex_kwargs.pop("environment", {}))
        trigger = {
            "detach": True,
            "environment": env,
            "volumes": list(
                set(
                    [
                        "/var/run/docker.sock:/var/run/docker.sock",
                        "/usr/bin/docker:/usr/bin/docker:ro",
                        "/tmp/jayporeci__cidfiles:/jaypore_ci/cidfiles:ro",
                        f"/tmp/jayporeci__src__{self.pipeline.remote.sha}:/jaypore_ci/run",
                    ]
                    + (ex_kwargs.pop("volumes", []))
                )
            ),
            "name": self.get_job_name(job),
            "network": self.get_net(),
            "image": job.image,
            "command": job.command if not job.is_service else None,
        }
        # Any remaining executor kwargs override the defaults built above.
        for key, value in ex_kwargs.items():
            if key in trigger:
                self.logging().warning(
                    f"Overwriting existing value of `{key}` for job trigger.",
                    old_value=trigger[key],
                    new_value=value,
                )
            trigger[key] = value
        if not job.is_service:
            # Services keep their image's working dir and have no command.
            trigger["working_dir"] = "/jaypore_ci/run"
            assert job.command
        # NOTE(review): replaced a bare `rprint(trigger)` debug print — the
        # trigger includes the job environment, which may hold secrets, so it
        # is routed through the bound logger at debug level instead.
        self.logging().debug("Run job", trigger=trigger)
        try:
            container = self.docker.containers.run(**trigger)
            self.__execution_order__.append(
                (self.get_job_name(job, tail=True), container.id, "Run")
            )
            return container.id
        except docker.errors.APIError as e:
            self.logging().exception(e)
            raise TriggerFailed(e) from e

    def get_status(self, run_id: str) -> JobStatus:
        """
        Given a run_id, it will get the status for that run.
        """
        inspect = self.client.inspect_container(run_id)
        status = JobStatus(
            is_running=inspect["State"]["Running"],
            exit_code=int(inspect["State"]["ExitCode"]),
            logs="",
            started_at=pendulum.parse(inspect["State"]["StartedAt"]),
            # Docker reports the zero-value timestamp while still running.
            finished_at=pendulum.parse(inspect["State"]["FinishedAt"])
            if inspect["State"]["FinishedAt"] != "0001-01-01T00:00:00Z"
            else None,
        )
        # --- logs
        self.logging().debug("Check status", status=status)
        logs = self.docker.containers.get(run_id).logs().decode()
        return status._replace(logs=logs)

    def get_execution_order(self):
        """
        Return a mapping of job name -> index in the order jobs were run.
        """
        return {name: i for i, (name, *_) in enumerate(self.__execution_order__)}