-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathoperator.py
More file actions
448 lines (386 loc) · 19.7 KB
/
operator.py
File metadata and controls
448 lines (386 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
import os
import kopf
import kubernetes
import base64
import github
from datetime import datetime, timezone
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
# Initialize the Kubernetes API client
# load_incluster_config() reads the service-account credentials mounted into
# the pod, so this operator is expected to run inside a cluster (it raises
# when run locally without that mount).
kubernetes.config.load_incluster_config()
# Shared CoreV1 client used by all classes and handlers in this module.
core_v1_api = kubernetes.client.CoreV1Api()
class GitHubKeyManager:
    """Manages GitHub deploy keys for a repository via the GitHub REST API.

    The GitHub token is read from the 'github-token' Kubernetes secret in
    the operator's own namespace (falling back to 'operators' when the
    namespace cannot be determined).
    """

    def __init__(self, logger):
        self.logger = logger
        self.github_token = self._get_github_token()
        self.github_client = github.Github(self.github_token)

    def _get_github_token(self):
        """Retrieve GitHub token from secret.

        Returns:
            str: the decoded GITHUB_TOKEN value.

        Raises:
            kopf.PermanentError: if the secret is missing, lacks the
                GITHUB_TOKEN key, or cannot be read/decoded.
        """
        current_namespace = "operators"  # Set default namespace
        try:
            self.logger.info("Running in-cluster, attempting to determine current namespace...")
            try:
                # The downward-facing service-account mount exposes the pod's
                # namespace as a plain text file.
                namespace_file = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
                self.logger.debug(f"Reading namespace from {namespace_file}")
                with open(namespace_file, "r") as f:
                    ns = f.read().strip()
                if ns:  # Only use the namespace if we got a non-empty value
                    current_namespace = ns
                    self.logger.info(f"Successfully determined current namespace: {current_namespace}")
                else:
                    self.logger.warning("Empty namespace found in service account token, using default 'operators'")
            except (FileNotFoundError, PermissionError) as e:
                self.logger.warning(
                    f"Could not read namespace from service account token ({str(e)}). "
                    "Falling back to default namespace 'operators'"
                )

            self.logger.info(f"Attempting to read 'github-token' secret from namespace '{current_namespace}'")
            try:
                secret = core_v1_api.read_namespaced_secret(
                    name='github-token',
                    namespace=current_namespace
                )
            except kubernetes.client.exceptions.ApiException as e:
                if e.status == 404:
                    self.logger.error(
                        f"Secret 'github-token' not found in namespace '{current_namespace}'. "
                        "To fix this:\n"
                        "1. Create a GitHub personal access token\n"
                        "2. Create a Kubernetes secret:\n"
                        "   kubectl create secret generic github-token \\\n"
                        "   --from-literal=GITHUB_TOKEN=your_token_here \\\n"
                        f"   -n {current_namespace}"
                    )
                else:
                    self.logger.error(f"API error while reading secret: {e}")
                raise kopf.PermanentError(f"Failed to get GitHub token: {e}")

            try:
                token = base64.b64decode(secret.data['GITHUB_TOKEN']).decode()
                # Deliberately avoid logging any part of the token value:
                # it is a credential and must not leak into logs.
                self.logger.info("Successfully retrieved GitHub token")
                return token
            except KeyError:
                self.logger.error(
                    "Secret 'github-token' exists but does not contain GITHUB_TOKEN key. "
                    "Please ensure the secret is created with the correct key:\n"
                    "kubectl create secret generic github-token \\\n"
                    " --from-literal=GITHUB_TOKEN=your_token_here \\\n"
                    f" -n {current_namespace}"
                )
                raise kopf.PermanentError("Secret exists but GITHUB_TOKEN key is missing")
            except Exception as e:
                self.logger.error(f"Error decoding GitHub token: {e}")
                raise kopf.PermanentError(f"Failed to decode GitHub token: {e}")
        except kopf.PermanentError:
            # Already logged and classified above; re-raise as-is instead of
            # re-wrapping it in another PermanentError below.
            raise
        except Exception as e:
            self.logger.error(
                "Unexpected error in _get_github_token. "
                f"Error: {str(e)}\n"
                "For troubleshooting:\n"
                "1. Check pod logs for more details\n"
                "2. Verify RBAC permissions allow reading secrets\n"
                "3. Confirm the github-token secret exists and is properly formatted\n"
                f"4. Verify the operator has access to namespace '{current_namespace}'"
            )
            raise kopf.PermanentError(f"Unexpected error getting GitHub token: {e}")

    def get_repository(self, repo_name):
        """Get GitHub repository instance for 'owner/name'.

        Raises:
            kopf.PermanentError: if the repository cannot be fetched.
        """
        try:
            repo = self.github_client.get_repo(repo_name)
            self.logger.info(f"Got repository {repo_name}")
            return repo
        except github.GithubException as e:
            raise kopf.PermanentError(f"Failed to get repository: {e}")

    def generate_ssh_key(self):
        """Generate a 4096-bit RSA SSH key pair.

        Returns:
            tuple[str, str]: (private key as PKCS8 PEM, public key in
            OpenSSH authorized_keys format), both as text.
        """
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=4096,
            backend=default_backend()
        )
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        public_key = private_key.public_key().public_bytes(
            encoding=serialization.Encoding.OpenSSH,
            format=serialization.PublicFormat.OpenSSH
        )
        return private_pem.decode(), public_key.decode()

    def verify_key_exists(self, repo, key_id):
        """Return True if deploy key `key_id` exists on `repo`, else False."""
        try:
            repo.get_key(key_id)
            self.logger.info(f"Verified deploy key {key_id} exists in GitHub")
            return True
        except github.GithubException as e:
            self.logger.error(f"Failed to verify deploy key {key_id}: {e}")
            return False

    def delete_key_by_id(self, repo, key_id):
        """Delete a specific GitHub deploy key by ID.

        Returns:
            bool: True on success or when the key was already gone
            (idempotent delete); False on any other API error.
        """
        try:
            key = repo.get_key(key_id)
            key.delete()
            self.logger.info(f"Successfully deleted deploy key {key_id}")
            return True
        except github.GithubException as e:
            if e.status == 404:
                # Treat "already deleted" as success so deletion is idempotent.
                self.logger.info(f"Deploy key {key_id} was already deleted")
                return True
            self.logger.error(f"Error deleting deploy key {key_id}: {e}")
            return False

    def delete_keys_by_title(self, repo, title):
        """Delete all GitHub deploy keys with a specific title (including operator-managed prefix).

        Returns:
            int: number of keys actually deleted.
        """
        keys = list(repo.get_keys())
        self.logger.info(f"Found {len(keys)} existing deploy keys")
        managed_title = f"k8s-operator:{title}"
        keys_deleted = 0
        for key in keys:
            # Match both the base title and the operator-managed title
            if key.title == title or key.title == managed_title:
                self.logger.info(f"Found deploy key with title '{key.title}' (id: {key.id}), deleting it")
                if self.delete_key_by_id(repo, key.id):
                    keys_deleted += 1
        return keys_deleted

    def create_key(self, repo, title, key, read_only=True):
        """Create a new GitHub deploy key with the operator-managed title prefix.

        Args:
            repo: GitHub repository object.
            title: base title; stored on GitHub as 'k8s-operator:<title>'.
            key: OpenSSH-format public key string.
            read_only: whether the deploy key is read-only. Defaults to
                True, matching the previously hard-coded behavior.
        """
        try:
            managed_title = f"k8s-operator:{title}"
            return repo.create_key(managed_title, key, read_only=read_only)
        except github.GithubException as e:
            self.logger.error(f"Error creating key: {str(e)}")
            raise

    def is_operator_managed_key(self, key_title):
        """Check if a key was created by this operator."""
        return key_title.startswith("k8s-operator:")

    def get_key_base_title(self, key_title):
        """Get the original title without the operator prefix."""
        if self.is_operator_managed_key(key_title):
            return key_title.split(":", 1)[1]
        return key_title
class KubernetesSecretManager:
    """Manages the Kubernetes secrets that hold generated SSH deploy-key material."""

    def __init__(self, logger):
        self.logger = logger

    def _is_owned_by(self, secret, owner_uid):
        """Return True when `secret` lists `owner_uid` among its owner references."""
        refs = secret.metadata.owner_references
        if not refs:
            return False
        return any(ref.uid == owner_uid for ref in refs)

    def create_or_update_secret(self, name, namespace, private_key, public_key, owner_reference):
        """Create or update the Kubernetes secret carrying the SSH key pair.

        Returns one of:
            'created'  - a new secret was created
            'updated'  - an existing, operator-owned secret was updated
            'conflict' - a secret with this name exists but is owned by
                         someone else (no action taken)
        """
        # Pre-populated known_hosts entry for github.com so clients consuming
        # this secret can verify the host without trust-on-first-use.
        known_hosts = "github.com ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEmKSENjQEezOmxkZMy7opKgwFB9nkt5YRrYMjNuG5N87uRgg6CLrbo5wAdT/y6v0mKV0U2w0WZ2YB/++Tpockg="
        plain_data = {
            'identity': private_key,
            'identity.pub': public_key,
            'known_hosts': known_hosts
        }
        encoded = {}
        for field, value in plain_data.items():
            encoded[field] = base64.b64encode(value.encode()).decode()

        try:
            # Look up any existing secret with this name.
            existing = core_v1_api.read_namespaced_secret(name=name, namespace=namespace)

            # Never clobber a secret we do not own.
            if not self._is_owned_by(existing, owner_reference.uid):
                self.logger.warning(
                    f"Secret {name} exists but is not owned by this GitHubDeployKey. "
                    f"Refusing to overwrite. Delete the existing secret manually if you want "
                    f"the operator to manage it."
                )
                return 'conflict'

            # Owned by this CR - safe to replace the payload in place.
            existing.data = encoded
            core_v1_api.replace_namespaced_secret(
                name=name,
                namespace=namespace,
                body=existing
            )
            self.logger.info(f"Updated existing secret {name}")
            return 'updated'
        except kubernetes.client.exceptions.ApiException as e:
            if e.status != 404:
                raise
            # Nothing there yet - create a fresh secret owned by the CR so
            # Kubernetes garbage-collects it when the CR is deleted.
            new_secret = kubernetes.client.V1Secret(
                metadata=kubernetes.client.V1ObjectMeta(
                    name=name,
                    owner_references=[owner_reference]
                ),
                type='Opaque',
                data=encoded
            )
            core_v1_api.create_namespaced_secret(namespace=namespace, body=new_secret)
            self.logger.info(f"Created new secret {name}")
            return 'created'

    def delete_secret_if_exists(self, name, namespace):
        """Delete the named secret; return True if deleted, False if absent."""
        try:
            core_v1_api.delete_namespaced_secret(name=name, namespace=namespace)
        except kubernetes.client.exceptions.ApiException as e:
            if e.status != 404:
                raise
            return False
        self.logger.info(f"Deleted existing secret {name}")
        return True
@kopf.on.create('github.com', 'v1alpha1', 'githubdeploykeys')
def create_deploy_key(spec, status, logger, patch, force=False, **kwargs):
    """Create a new deploy key. Called by kopf on CR creation and by reconcile when key is missing.

    Args:
        force: If True, proceed even if CR already has keyId (used by reconcile when the key is missing)

    Raises:
        kopf.PermanentError: on any unrecoverable failure; the GitHub key
            created during this call (if any) is cleaned up first.
    """
    # If called from on.create but CR already has a keyId, skip - let reconcile handle it
    # This prevents duplicate key creation when operator restarts and sees existing CRs
    # When called from reconcile (with force=True), always proceed
    if status and status.get('keyId') and not force:
        logger.info(f"CR already has keyId {status['keyId']}, skipping on.create - reconcile will handle it")
        return

    github_manager = GitHubKeyManager(logger)
    secret_manager = KubernetesSecretManager(logger)
    key = None  # Sentinel tracking the GitHub key created so far, for cleanup on failure.
    try:
        # Get repository
        repo = github_manager.get_repository(spec['repository'])

        # Handle existing keys
        title = spec.get('title', 'Kubernetes-managed deploy key')
        github_manager.delete_keys_by_title(repo, title)

        # Generate and create new key
        private_key, public_key = github_manager.generate_ssh_key()
        key = github_manager.create_key(repo, title, public_key)
        logger.info(f"Created new deploy key: {key.id}")
        if not github_manager.verify_key_exists(repo, key.id):
            raise kopf.PermanentError("Failed to verify deploy key")

        # Create secret BEFORE updating status (so status only reflects successful state)
        secret_name = f"{kwargs['meta']['name']}-private-key"
        owner_reference = kubernetes.client.V1OwnerReference(
            api_version=kwargs['body']['apiVersion'],
            kind=kwargs['body']['kind'],
            name=kwargs['body']['metadata']['name'],
            uid=kwargs['body']['metadata']['uid']
        )
        result = secret_manager.create_or_update_secret(
            secret_name,
            kwargs['meta']['namespace'],
            private_key,
            public_key,
            owner_reference
        )
        if result == 'conflict':
            # Secret exists but not owned by us - clean up the GitHub key we just created
            logger.warning(f"Secret {secret_name} owned by another resource. Cleaning up GitHub key.")
            github_manager.delete_key_by_id(repo, key.id)
            key = None  # Already cleaned up; prevent a second delete in the handler below.
            raise kopf.PermanentError(
                f"Secret {secret_name} already exists and is not owned by this GitHubDeployKey. "
                f"Delete the existing secret manually to allow the operator to manage it."
            )

        # Only update status after everything succeeded
        patch['status'] = {'keyId': key.id}
        logger.info(f"Successfully created deploy key {key.id} and secret {secret_name}")
    except Exception as e:
        logger.error(f"Error creating deploy key: {str(e)}")
        # Clean up the GitHub key if it was created before the failure.
        try:
            if key is not None:
                key.delete()
                logger.info(f"Cleaned up deploy key {key.id} after error")
        except Exception as cleanup_error:
            logger.error(f"Error during cleanup: {str(cleanup_error)}")
        # Preserve kopf's error classification (don't turn a TemporaryError
        # into a PermanentError or double-wrap a PermanentError), and chain
        # the original exception for debuggability.
        if isinstance(e, (kopf.PermanentError, kopf.TemporaryError)):
            raise
        raise kopf.PermanentError(str(e)) from e
@kopf.on.update('github.com', 'v1alpha1', 'githubdeploykeys')
def update_deploy_key(spec, status, logger, patch, old, **kwargs):
    """Recreate the deploy key when the CR's title or readOnly setting changed."""
    default_title = 'Kubernetes-managed deploy key'
    title_changed = old['spec'].get('title', default_title) != spec.get('title', default_title)
    read_only_changed = old['spec'].get('readOnly', True) != spec.get('readOnly', True)
    if not (title_changed or read_only_changed):
        logger.info("No relevant changes detected, skipping update")
        return
    logger.info("Detected changes in title or readOnly, recreating deploy key")
    create_deploy_key(spec, status, logger, patch, force=True, **kwargs)
@kopf.on.delete('github.com', 'v1alpha1', 'githubdeploykeys')
def delete_deploy_key(spec, meta, status, logger, **kwargs):
    """Remove the GitHub deploy key when the CR is deleted.

    The private-key secret is not removed here: it carries an owner
    reference to the CR, so Kubernetes garbage collection handles it.
    """
    github_manager = GitHubKeyManager(logger)
    try:
        repo = github_manager.get_repository(spec['repository'])
        key_id = status.get('keyId') if status else None
        if key_id:
            # Prefer the exact key ID recorded in the CR status.
            logger.info(f"Found key ID in status: {key_id}")
            if not github_manager.delete_key_by_id(repo, key_id):
                raise kopf.PermanentError(f"Failed to delete deploy key {key_id}")
        else:
            # No recorded ID - fall back to matching keys by title.
            logger.info("No key ID in status, trying to find key by title")
            title = spec.get('title', 'Kubernetes-managed deploy key')
            keys_deleted = github_manager.delete_keys_by_title(repo, title)
            logger.info(f"Deleted {keys_deleted} deploy key(s) with title '{title}'")
    except github.GithubException as e:
        if e.status != 404:  # Ignore if repo not found
            raise kopf.PermanentError(f"Failed to delete deploy key: {e}")
    logger.info(f"Secret {meta['name']}-private-key will be deleted by garbage collection")
@kopf.timer('github.com', 'v1alpha1', 'githubdeploykeys', interval=60.0, initial_delay=60.0)
def reconcile_deploy_key(spec, status, logger, patch, **kwargs):
    """Periodically reconcile the deploy key to ensure it exists.

    Runs every 60 seconds (after an initial 60s delay) and checks, in order:
    1. the CR has a keyId recorded in status (recreates the key if not),
    2. the GitHub deploy key still exists and carries the managed title,
    3. the private-key secret exists and is owned by this CR.
    Any failure is logged and retried on the next tick.
    """
    github_manager = GitHubKeyManager(logger)
    try:
        repo = github_manager.get_repository(spec['repository'])
        key_id = status.get('keyId') if status else None
        base_title = spec.get('title', 'Kubernetes-managed deploy key')
        managed_title = f"k8s-operator:{base_title}"
        # Note: We no longer delete "stale" keys here. This caused a race condition where
        # a newly created key (not yet in status) would be deleted as stale.
        # Key cleanup is handled by create_deploy_key via delete_keys_by_title.
        if not key_id:
            # Check if CR was recently created - if so, let on.create handle it
            # This prevents a race condition where both on.create and reconcile timer
            # fire simultaneously on new CRs before status is patched
            creation_time_str = kwargs['body']['metadata']['creationTimestamp']
            # creationTimestamp is RFC3339 with a 'Z' suffix; normalize to
            # '+00:00' so datetime.fromisoformat can parse it on older Pythons.
            creation_time = datetime.fromisoformat(creation_time_str.replace('Z', '+00:00'))
            age_seconds = (datetime.now(timezone.utc) - creation_time).total_seconds()
            # 90s > the 60s timer initial delay, so the first reconcile tick of
            # a brand-new CR always defers to the on.create handler.
            if age_seconds < 90:
                logger.info(f"CR created {age_seconds:.0f}s ago with no keyId - letting on.create handle it")
                return
            logger.info("No key ID in status, recreating deploy key")
            create_deploy_key(spec, status, logger, patch, force=True, **kwargs)
            return
        # Check if our key still exists
        try:
            key = repo.get_key(key_id)
            if key.title != managed_title:
                logger.info(f"Deploy key {key_id} exists but title has changed, recreating")
                # Delete old key before creating new one
                github_manager.delete_key_by_id(repo, key_id)
                create_deploy_key(spec, status, logger, patch, force=True, **kwargs)
            else:
                logger.info(f"Deploy key {key_id} exists and is correctly configured")
        except github.GithubException as e:
            if e.status == 404:
                logger.info(f"Deploy key {key_id} no longer exists, recreating")
                create_deploy_key(spec, status, logger, patch, force=True, **kwargs)
            else:
                # Non-404 API errors are logged but do not abort the secret check below.
                logger.error(f"Error checking deploy key {key_id}: {e}")
        # Verify secret exists and is owned by us
        secret_name = f"{kwargs['meta']['name']}-private-key"
        secret_manager = KubernetesSecretManager(logger)
        try:
            secret = core_v1_api.read_namespaced_secret(
                name=secret_name,
                namespace=kwargs['meta']['namespace']
            )
            # Check ownership
            if not secret_manager._is_owned_by(secret, kwargs['body']['metadata']['uid']):
                logger.warning(
                    f"Secret {secret_name} exists but is not owned by this GitHubDeployKey. "
                    f"Delete the existing secret manually to allow the operator to manage it."
                )
            else:
                logger.info(f"Secret {secret_name} exists and is correctly owned")
        except kubernetes.client.exceptions.ApiException as e:
            if e.status == 404:
                logger.info(f"Secret {secret_name} is missing, recreating deploy key")
                create_deploy_key(spec, status, logger, patch, force=True, **kwargs)
            else:
                logger.error(f"Error checking secret {secret_name}: {e}")
    except Exception as e:
        # Best-effort reconciliation: log and rely on the next timer tick to retry.
        logger.error(f"Error during reconciliation: {str(e)}")