From 0fc9070e0bfa9e950005620119f0be75ac322df9 Mon Sep 17 00:00:00 2001 From: rnetser Date: Fri, 13 Feb 2026 16:10:05 +0200 Subject: [PATCH 1/7] feat: Add privileged_client support to VirtualMachineInstance Introduce explicit privileged_client parameter across VirtualMachineInstance methods to support proper RBAC-aware client usage. New public methods replacing deprecated properties: - get_virt_launcher_pod, get_virt_handler_pod, get_node, get_xml_dict Modified methods with privileged_client parameter: - pause, unpause, reset, get_xml, execute_virsh_command - get_dommemstat, get_domstate, wait_until_running, api_request New private helpers: - _get_pod_user_uid, _is_pod_root - _get_hypervisor_connection_uri, _build_virsh_cmd - _resolve_privileged_client, _get_subresource_api_url Warning strategy: - FutureWarning when privileged_client is omitted - DeprecationWarning on old property-based access --- ocp_resources/virtual_machine_instance.py | 413 ++++++++++++++++++---- 1 file changed, 349 insertions(+), 64 deletions(-) diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index 0aa172c292..79880b35d1 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -1,11 +1,13 @@ # Generated using https://github.com/RedHatQE/openshift-python-wrapper/blob/main/scripts/resource/README.md +import json import shlex from typing import Any from warnings import warn import xmltodict +from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.exceptions import ResourceNotFoundError from timeout_sampler import TimeoutExpiredError, TimeoutSampler @@ -13,7 +15,14 @@ from ocp_resources.node import Node from ocp_resources.pod import Pod from ocp_resources.resource import NamespacedResource -from ocp_resources.utils.constants import PROTOCOL_ERROR_EXCEPTION_DICT, TIMEOUT_4MINUTES, TIMEOUT_5SEC, TIMEOUT_30SEC +from ocp_resources.utils.constants import ( + PROTOCOL_ERROR_EXCEPTION_DICT, + TIMEOUT_1SEC, + TIMEOUT_4MINUTES, + TIMEOUT_5SEC, + TIMEOUT_10SEC, + TIMEOUT_30SEC, +) class VirtualMachineInstance(NamespacedResource): @@ -171,7 +180,6 @@ def __init__( self.volumes = volumes def to_dict(self) -> None: - super().to_dict() if not self.kind_dict and not self.yaml_file: @@ -245,48 +253,139 @@ def to_dict(self) -> None: # End of generated code - @property - def _subresource_api_url(self): + @staticmethod + def _resolve_privileged_client( + privileged_client: DynamicClient | None, + fallback_client: DynamicClient, + method_name: str, + ) -> DynamicClient: + """Resolve privileged_client with FutureWarning if not provided. + + Args: + privileged_client: Client with elevated privileges, or None. + fallback_client: Client to use if privileged_client is None. + method_name: Name of the calling method (for the warning message). + + Returns: + The resolved client (privileged_client if given, else fallback_client). + """ + if privileged_client is None: + warn( + f"'{method_name}': 'privileged_client' will become a required argument in the next major future version. " + "Pass an admin/privileged client explicitly.", + category=FutureWarning, + stacklevel=3, + ) + return fallback_client + return privileged_client + + def _get_subresource_api_url(self, client: DynamicClient | None = None) -> str: + _client = client or self.client return ( - f"{self.client.configuration.host}/" + f"{_client.configuration.host}/" f"apis/subresources.kubevirt.io/{self.api.api_version}/" f"namespaces/{self.namespace}/virtualmachineinstances/{self.name}" ) + @property + def _subresource_api_url(self) -> str: + return self._get_subresource_api_url() + def api_request( - self, method: str, action: str, url: str = "", retry_params: dict[str, int] | None = None, **params: Any + self, + method: str, + action: str, + url: str = "", + retry_params: dict[str, int] | None = None, + privileged_client: DynamicClient | None = None, + **params: Any, ) -> dict[str, Any]: default_vmi_api_request_retry_params: dict[str, int] = {"timeout": TIMEOUT_30SEC, "sleep_time": TIMEOUT_5SEC} + _retry_params = retry_params or default_vmi_api_request_retry_params + + if privileged_client is not None: + _url = f"{url or self._get_subresource_api_url(client=privileged_client)}/{action}" + api_request_params: dict[str, Any] = { + "url": _url, + "method": method, + "headers": privileged_client.client.configuration.api_key, + } + response = self.retry_cluster_exceptions( + func=privileged_client.client.request, + timeout=_retry_params.get("timeout", TIMEOUT_10SEC), + sleep_time=_retry_params.get("sleep_time", TIMEOUT_1SEC), + **api_request_params, + **params, + ) + try: + return json.loads(response.data) + except json.decoder.JSONDecodeError: + return response.data + return super().api_request( method=method, action=action, url=url or self._subresource_api_url, - retry_params=retry_params or default_vmi_api_request_retry_params, + retry_params=_retry_params, **params, ) - def pause(self, timeout=TIMEOUT_4MINUTES, wait=False): - self.api_request(method="PUT", action="pause") + def pause( + self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None + ) -> None: + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="pause", + ) + self.api_request(method="PUT", action="pause", privileged_client=_client) if wait: - return self.wait_for_pause_status(pause=True, timeout=timeout) + self.wait_for_pause_status(pause=True, timeout=timeout) - def unpause(self, timeout=TIMEOUT_4MINUTES, wait=False): - self.api_request(method="PUT", action="unpause") + def unpause( + self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None + ) -> None: + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="unpause", + ) + self.api_request(method="PUT", action="unpause", privileged_client=_client) if wait: - return self.wait_for_pause_status(pause=False, timeout=timeout) + self.wait_for_pause_status(pause=False, timeout=timeout) - def reset(self) -> dict[str, Any]: - return self.api_request(method="PUT", action="reset") + def reset(self, privileged_client: DynamicClient | None = None) -> dict[str, Any]: + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="reset", + ) + return self.api_request(method="PUT", action="reset", privileged_client=_client) @property def interfaces(self): return self.instance.status.interfaces - @property - def virt_launcher_pod(self): + def get_virt_launcher_pod(self, privileged_client: DynamicClient | None = None) -> Pod: + """Get the virt-launcher pod for this VMI. + + Args: + privileged_client: Client with elevated privileges for pod listing. + + Returns: + Pod: The virt-launcher pod. + + Raises: + ResourceNotFoundError: If no virt-launcher pod is found. + """ + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="get_virt_launcher_pod", + ) pods = list( Pod.get( - client=self.client, + client=_client, namespace=self.namespace, label_selector=f"kubevirt.io=virt-launcher,kubevirt.io/created-by={self.instance.metadata.uid}", ) @@ -296,19 +395,41 @@ def virt_launcher_pod(self): migration_state = self.instance.status.migrationState if migration_state: - # After VM migration there are two pods, one in Completed status and one in Running status. - # We need to return the Pod that is not in Completed status. for pod in pods: if migration_state.targetPod == pod.name: return pod - else: - return pods[0] + + return pods[0] @property - def virt_handler_pod(self): + def virt_launcher_pod(self) -> Pod: + warn( + "'virt_launcher_pod' property is deprecated, use 'get_virt_launcher_pod(privileged_client=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_virt_launcher_pod(privileged_client=self.client) + + def get_virt_handler_pod(self, privileged_client: DynamicClient | None = None) -> Pod: + """Get the virt-handler pod running on the same node as this VMI. + + Args: + privileged_client: Client with elevated privileges for pod listing. + + Returns: + Pod: The virt-handler pod. + + Raises: + ResourceNotFoundError: If no matching virt-handler pod is found. + """ + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="get_virt_handler_pod", + ) pods = list( Pod.get( - client=self.client, + client=_client, label_selector="kubevirt.io=virt-handler", ) ) @@ -318,7 +439,22 @@ def virt_handler_pod(self): raise ResourceNotFoundError - def wait_until_running(self, timeout=TIMEOUT_4MINUTES, logs=True, stop_status=None): + @property + def virt_handler_pod(self) -> Pod: + warn( + "'virt_handler_pod' property is deprecated, use 'get_virt_handler_pod(privileged_client=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_virt_handler_pod(privileged_client=self.client) + + def wait_until_running( + self, + timeout: int = TIMEOUT_4MINUTES, + logs: bool = True, + stop_status: str | None = None, + privileged_client: DynamicClient | None = None, + ) -> None: """ Wait until VMI is running @@ -326,6 +462,7 @@ def wait_until_running(self, timeout=TIMEOUT_4MINUTES, logs=True, stop_status=No timeout (int): Time to wait for VMI. logs (bool): True to extract logs from the VMI pod and from the VMI. stop_status (str): Status which should stop the wait and failed. + privileged_client: Client with elevated privileges for error-path pod access. Raises: TimeoutExpiredError: If VMI failed to run. @@ -336,10 +473,10 @@ def wait_until_running(self, timeout=TIMEOUT_4MINUTES, logs=True, stop_status=No if not logs: raise try: - virt_pod = self.virt_launcher_pod + virt_pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) self.logger.error(f"Status of virt-launcher pod {virt_pod.name}: {virt_pod.status}") - self.logger.debug(f"{virt_pod.name} *****LOGS*****") - self.logger.debug(virt_pod.log(container="compute")) + self.logger.error(f"{virt_pod.name} *****LOGS*****") + self.logger.error(virt_pod.log(container="compute")) except ResourceNotFoundError as virt_pod_ex: self.logger.error(virt_pod_ex) raise sampler_ex from virt_pod_ex @@ -400,55 +537,159 @@ def wait_for_vmi_condition_pause_status(self, pause, timeout=TIMEOUT_4MINUTES): if not (pause and sample.get("reason")): return + def get_node(self, privileged_client: DynamicClient | None = None) -> Node: + """Get the node where this VMI is running. + + Args: + privileged_client: Client with elevated privileges for node access. + + Returns: + Node: The node running this VMI. + """ + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="get_node", + ) + return Node( + client=_client, + name=self.instance.status.nodeName, + ) + @property - def node(self): + def node(self) -> Node: """ Get the node name where the VM is running Returns: Node: Node """ - return Node( - client=self.client, - name=self.instance.status.nodeName, + warn( + "'node' property is deprecated, use 'get_node(privileged_client=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_node(privileged_client=self.client) + + @staticmethod + def _get_pod_user_uid(pod: Pod) -> int: + """Get the runAsUser UID from a pod's security context. + + Args: + pod: The pod to inspect. + + Returns: + int: The runAsUser UID value. + """ + return pod.instance.spec.securityContext.runAsUser + + @staticmethod + def _is_pod_root(pod: Pod) -> bool: + """Check if a pod runs as root. + + Args: + pod: The pod to inspect. + + Returns: + bool: True if the pod runs as root (UID 0). + """ + return not bool(VirtualMachineInstance._get_pod_user_uid(pod=pod)) + + @staticmethod + def _get_hypervisor_connection_uri(pod: Pod) -> str: + """Get the hypervisor connection URI for a pod. + + Args: + pod: The virt-launcher pod. + + Returns: + str: The hypervisor connection URI string. + """ + if VirtualMachineInstance._is_pod_root(pod=pod): + return "" + + virtqemud_socket = "virtqemud" + socket = ( + virtqemud_socket + if virtqemud_socket in pod.execute(command=["ls", "/var/run/libvirt/"], container="compute") + else "libvirt" ) + return f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" + + def _build_virsh_cmd(self, action: str, hypervisor_uri: str) -> list[str]: + """Build a virsh command list. - def virsh_cmd(self, action): - return shlex.split( - f"virsh {self.virt_launcher_pod_hypervisor_connection_uri} {action} {self.namespace}_{self.name}" + Args: + action: The virsh action to perform (e.g., 'dumpxml', 'domstate'). + hypervisor_uri: The hypervisor connection URI. + + Returns: + list[str]: The command as a list of strings. + """ + return shlex.split(f"virsh {hypervisor_uri} {action} {self.namespace}_{self.name}") + + def virsh_cmd(self, action: str) -> list[str]: + warn( + "'virsh_cmd' method is deprecated, use 'execute_virsh_command(command=..., privileged_client=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + hypervisor_uri = self._get_hypervisor_connection_uri( + pod=self.get_virt_launcher_pod(privileged_client=self.client) ) + return self._build_virsh_cmd(action=action, hypervisor_uri=hypervisor_uri) - def get_xml(self): + def get_xml(self, privileged_client: DynamicClient | None = None) -> str: """ Get virtual machine instance XML + Args: + privileged_client: Client with elevated privileges. + Returns: xml_output(string): VMI XML in the multi-line string """ - return self.execute_virsh_command(command="dumpxml") + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="get_xml", + ) + return self.execute_virsh_command(command="dumpxml", privileged_client=_client) @property - def virt_launcher_pod_user_uid(self): + def virt_launcher_pod_user_uid(self) -> int: """ Get Virt Launcher Pod User UID value Returns: Int: Virt Launcher Pod UID value """ - return self.virt_launcher_pod.instance.spec.securityContext.runAsUser + warn( + "'virt_launcher_pod_user_uid' property is deprecated, " + "use 'VirtualMachineInstance._get_pod_user_uid(pod=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self._get_pod_user_uid(pod=self.get_virt_launcher_pod(privileged_client=self.client)) @property - def is_virt_launcher_pod_root(self): + def is_virt_launcher_pod_root(self) -> bool: """ Check if Virt Launcher Pod is Root Returns: Bool: True if Virt Launcher Pod is Root. """ - return not bool(self.virt_launcher_pod_user_uid) + warn( + "'is_virt_launcher_pod_root' property is deprecated, " + "use 'VirtualMachineInstance._is_pod_root(pod=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self._is_pod_root(pod=self.get_virt_launcher_pod(privileged_client=self.client)) @property - def virt_launcher_pod_hypervisor_connection_uri(self): + def virt_launcher_pod_hypervisor_connection_uri(self) -> str: """ Get Virt Launcher Pod Hypervisor Connection URI @@ -458,26 +699,24 @@ def virt_launcher_pod_hypervisor_connection_uri(self): Returns: String: Hypervisor Connection URI """ - if self.is_virt_launcher_pod_root: - hypervisor_connection_uri = "" - else: - virtqemud_socket = "virtqemud" - socket = ( - virtqemud_socket - if virtqemud_socket - in self.virt_launcher_pod.execute(command=["ls", "/var/run/libvirt/"], container="compute") - else "libvirt" - ) - hypervisor_connection_uri = f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" - return hypervisor_connection_uri + warn( + "'virt_launcher_pod_hypervisor_connection_uri' property is deprecated, " + "use 'VirtualMachineInstance._get_hypervisor_connection_uri(pod=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self._get_hypervisor_connection_uri(pod=self.get_virt_launcher_pod(privileged_client=self.client)) - def get_domstate(self): + def get_domstate(self, privileged_client: DynamicClient | None = None) -> str: """ Get virtual machine instance Status. Current workaround, as VM/VMI shows no status/phase == Paused yet. Bug: https://bugzilla.redhat.com/show_bug.cgi?id=1805178 + Args: + privileged_client: Client with elevated privileges. + Returns: String: VMI Status as string """ @@ -486,17 +725,27 @@ def get_domstate(self): category=DeprecationWarning, stacklevel=2, ) - return self.execute_virsh_command(command="domstate") + # Intentionally bypass _resolve_privileged_client: get_domstate is already deprecated + # (DeprecationWarning above), so we silently fall back to self.client to avoid double-warning. + return self.execute_virsh_command(command="domstate", privileged_client=privileged_client or self.client) - def get_dommemstat(self): + def get_dommemstat(self, privileged_client: DynamicClient | None = None) -> str: """ Get virtual machine domain memory stats link: https://libvirt.org/manpages/virsh.html#dommemstat + Args: + privileged_client: Client with elevated privileges. + Returns: String: VMI domain memory stats as string """ - return self.execute_virsh_command(command="dommemstat") + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="get_dommemstat", + ) + return self.execute_virsh_command(command="dommemstat", privileged_client=_client) def get_vmi_active_condition(self): """A VMI may have multiple conditions; the active one it the one with @@ -508,11 +757,31 @@ def get_vmi_active_condition(self): if condition["lastTransitionTime"] } + def get_xml_dict(self, privileged_client: DynamicClient | None = None) -> dict[str, Any]: + """Get virtual machine instance XML as dict. + + Args: + privileged_client: Client with elevated privileges. + + Returns: + dict: Parsed XML of the VMI. + """ + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="get_xml_dict", + ) + return xmltodict.parse(xml_input=self.get_xml(privileged_client=_client), process_namespaces=True) + @property - def xml_dict(self): + def xml_dict(self) -> dict[str, Any]: """Get virtual machine instance XML as dict""" - - return xmltodict.parse(xml_input=self.get_xml(), process_namespaces=True) + warn( + "'xml_dict' property is deprecated, use 'get_xml_dict(privileged_client=...)' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_xml_dict(privileged_client=self.client) @property def guest_os_info(self): @@ -537,8 +806,24 @@ def interface_ip(self, interface): iface_ip = [iface["ipAddress"] for iface in self.interfaces if iface["interfaceName"] == interface] return iface_ip[0] if iface_ip else None - def execute_virsh_command(self, command): - return self.virt_launcher_pod.execute( - command=self.virsh_cmd(action=command), + def execute_virsh_command(self, command: str, privileged_client: DynamicClient | None = None) -> str: + """Execute a virsh command in the virt-launcher pod. + + Args: + command: The virsh command to execute (e.g., 'dumpxml', 'domstate'). + privileged_client: Client with elevated privileges. + + Returns: + str: Command output. + """ + _client = self._resolve_privileged_client( + privileged_client=privileged_client, + fallback_client=self.client, + method_name="execute_virsh_command", + ) + pod = self.get_virt_launcher_pod(privileged_client=_client) + hypervisor_uri = self._get_hypervisor_connection_uri(pod=pod) + return pod.execute( + command=self._build_virsh_cmd(action=command, hypervisor_uri=hypervisor_uri), container="compute", ) From 2c54e7a7974633c571d85d3411f2600f3a5cb5af Mon Sep 17 00:00:00 2001 From: rnetser Date: Mon, 16 Feb 2026 09:16:18 +0200 Subject: [PATCH 2/7] fix: Address CodeRabbit review comments for privileged_client support - Pass original privileged_client (not resolved _client) to api_request in pause/unpause/reset - Handle unset runAsUser in _get_pod_user_uid (return int | None) and _is_pod_root (explicit uid == 0 check) - Use self.logger.exception() for error-path diagnostics in wait_until_running --- ocp_resources/virtual_machine_instance.py | 26 +++++++++++++---------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index 79880b35d1..36c55ae5ab 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -333,34 +333,34 @@ def api_request( def pause( self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None ) -> None: - _client = self._resolve_privileged_client( + self._resolve_privileged_client( privileged_client=privileged_client, fallback_client=self.client, method_name="pause", ) - self.api_request(method="PUT", action="pause", privileged_client=_client) + self.api_request(method="PUT", action="pause", privileged_client=privileged_client) if wait: self.wait_for_pause_status(pause=True, timeout=timeout) def unpause( self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None ) -> None: - _client = self._resolve_privileged_client( + self._resolve_privileged_client( privileged_client=privileged_client, fallback_client=self.client, method_name="unpause", ) - self.api_request(method="PUT", action="unpause", privileged_client=_client) + self.api_request(method="PUT", action="unpause", privileged_client=privileged_client) if wait: self.wait_for_pause_status(pause=False, timeout=timeout) def reset(self, privileged_client: DynamicClient | None = None) -> dict[str, Any]: - _client = self._resolve_privileged_client( + self._resolve_privileged_client( privileged_client=privileged_client, fallback_client=self.client, method_name="reset", ) - return self.api_request(method="PUT", action="reset", privileged_client=_client) + return self.api_request(method="PUT", action="reset", privileged_client=privileged_client) @property def interfaces(self): @@ -572,16 +572,19 @@ def node(self) -> Node: return self.get_node(privileged_client=self.client) @staticmethod - def _get_pod_user_uid(pod: Pod) -> int: + def _get_pod_user_uid(pod: Pod) -> int | None: """Get the runAsUser UID from a pod's security context. Args: pod: The pod to inspect. Returns: - int: The runAsUser UID value. + int | None: The runAsUser UID value, or None if not set. """ - return pod.instance.spec.securityContext.runAsUser + security_context = pod.instance.spec.get("securityContext", {}) + if security_context: + return security_context.get("runAsUser") + return None @staticmethod def _is_pod_root(pod: Pod) -> bool: @@ -593,7 +596,8 @@ def _is_pod_root(pod: Pod) -> bool: Returns: bool: True if the pod runs as root (UID 0). """ - return not bool(VirtualMachineInstance._get_pod_user_uid(pod=pod)) + uid = VirtualMachineInstance._get_pod_user_uid(pod=pod) + return uid == 0 @staticmethod def _get_hypervisor_connection_uri(pod: Pod) -> str: @@ -657,7 +661,7 @@ def get_xml(self, privileged_client: DynamicClient | None = None) -> str: return self.execute_virsh_command(command="dumpxml", privileged_client=_client) @property - def virt_launcher_pod_user_uid(self) -> int: + def virt_launcher_pod_user_uid(self) -> int | None: """ Get Virt Launcher Pod User UID value From 8b82c7c52e52eb438202d9e20517795a923f0419 Mon Sep 17 00:00:00 2001 From: rnetser Date: Mon, 16 Feb 2026 12:43:59 +0200 Subject: [PATCH 3/7] refactor: Simplify privileged_client implementation - Remove _resolve_privileged_client helper and FutureWarning mechanism - Use simple 'privileged_client or self.client' fallback pattern - Deprecated properties emit DeprecationWarning only - Fix _is_pod_root to treat unset runAsUser as root (Kubernetes default) --- ocp_resources/resource.py | 11 +- ocp_resources/virtual_machine.py | 10 +- ocp_resources/virtual_machine_instance.py | 204 +++++----------------- 3 files changed, 58 insertions(+), 167 deletions(-) diff --git a/ocp_resources/resource.py b/ocp_resources/resource.py index 79c44a0c2e..bcc517d7e6 100644 --- a/ocp_resources/resource.py +++ b/ocp_resources/resource.py @@ -1313,7 +1313,13 @@ def wait_for_condition( return def api_request( - self, method: str, action: str, url: str, retry_params: dict[str, int] | None = None, **params: Any + self, + method: str, + action: str, + url: str, + retry_params: dict[str, int] | None = None, + client: DynamicClient | None = None, + **params: Any, ) -> dict[str, Any]: """ Handle API requests to resource. @@ -1323,12 +1329,13 @@ def api_request( action (str): Action to perform (stop/start/guestosinfo etc.). url (str): URL of resource. retry_params (dict): dict of timeout and sleep_time values for retrying the api request call + client (DynamicClient): Optional client to use for the request. Defaults to self.client. Returns: data(dict): response data """ - client: DynamicClient = self.client + client = client or self.client api_request_params = { "url": f"{url}/{action}", "method": method, diff --git a/ocp_resources/virtual_machine.py b/ocp_resources/virtual_machine.py index dce4bf96bf..eb224e6341 100644 --- a/ocp_resources/virtual_machine.py +++ b/ocp_resources/virtual_machine.py @@ -1,5 +1,6 @@ from typing import Any +from kubernetes.dynamic import DynamicClient from timeout_sampler import TimeoutSampler from ocp_resources.resource import NamespacedResource @@ -73,7 +74,13 @@ def _subresource_api_url(self): ) def api_request( - self, method: str, action: str, url: str = "", retry_params: dict[str, int] | None = None, **params: Any + self, + method: str, + action: str, + url: str = "", + retry_params: dict[str, int] | None = None, + client: DynamicClient | None = None, + **params: Any, ) -> dict[str, Any]: default_vm_api_request_retry_params: dict[str, int] = {"timeout": TIMEOUT_30SEC, "sleep_time": TIMEOUT_5SEC} return super().api_request( @@ -81,6 +88,7 @@ def api_request( action=action, url=url or self._subresource_api_url, retry_params=retry_params or default_vm_api_request_retry_params, + client=client, **params, ) diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index 36c55ae5ab..959e1723b1 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -1,7 +1,6 @@ # Generated using https://github.com/RedHatQE/openshift-python-wrapper/blob/main/scripts/resource/README.md -import json import shlex from typing import Any from warnings import warn @@ -17,10 +16,8 @@ from ocp_resources.resource import NamespacedResource from ocp_resources.utils.constants import ( PROTOCOL_ERROR_EXCEPTION_DICT, - TIMEOUT_1SEC, TIMEOUT_4MINUTES, TIMEOUT_5SEC, - TIMEOUT_10SEC, TIMEOUT_30SEC, ) @@ -180,6 +177,7 @@ def __init__( self.volumes = volumes def to_dict(self) -> None: + super().to_dict() if not self.kind_dict and not self.yaml_file: @@ -253,113 +251,49 @@ def to_dict(self) -> None: # End of generated code - @staticmethod - def _resolve_privileged_client( - privileged_client: DynamicClient | None, - fallback_client: DynamicClient, - method_name: str, - ) -> DynamicClient: - """Resolve privileged_client with FutureWarning if not provided. - - Args: - privileged_client: Client with elevated privileges, or None. - fallback_client: Client to use if privileged_client is None. - method_name: Name of the calling method (for the warning message). - - Returns: - The resolved client (privileged_client if given, else fallback_client). - """ - if privileged_client is None: - warn( - f"'{method_name}': 'privileged_client' will become a required argument in the next major future version. " - "Pass an admin/privileged client explicitly.", - category=FutureWarning, - stacklevel=3, - ) - return fallback_client - return privileged_client - - def _get_subresource_api_url(self, client: DynamicClient | None = None) -> str: - _client = client or self.client + @property + def _subresource_api_url(self): return ( - f"{_client.configuration.host}/" + f"{self.client.configuration.host}/" f"apis/subresources.kubevirt.io/{self.api.api_version}/" f"namespaces/{self.namespace}/virtualmachineinstances/{self.name}" ) - @property - def _subresource_api_url(self) -> str: - return self._get_subresource_api_url() - def api_request( self, method: str, action: str, url: str = "", retry_params: dict[str, int] | None = None, + client: DynamicClient | None = None, privileged_client: DynamicClient | None = None, **params: Any, ) -> dict[str, Any]: default_vmi_api_request_retry_params: dict[str, int] = {"timeout": TIMEOUT_30SEC, "sleep_time": TIMEOUT_5SEC} - _retry_params = retry_params or default_vmi_api_request_retry_params - - if privileged_client is not None: - _url = f"{url or self._get_subresource_api_url(client=privileged_client)}/{action}" - api_request_params: dict[str, Any] = { - "url": _url, - "method": method, - "headers": privileged_client.client.configuration.api_key, - } - response = self.retry_cluster_exceptions( - func=privileged_client.client.request, - timeout=_retry_params.get("timeout", TIMEOUT_10SEC), - sleep_time=_retry_params.get("sleep_time", TIMEOUT_1SEC), - **api_request_params, - **params, - ) - try: - return json.loads(response.data) - except json.decoder.JSONDecodeError: - return response.data - return super().api_request( method=method, action=action, url=url or self._subresource_api_url, - retry_params=_retry_params, + retry_params=retry_params or default_vmi_api_request_retry_params, + client=privileged_client or client, **params, ) def pause( self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None ) -> None: - self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="pause", - ) - self.api_request(method="PUT", action="pause", privileged_client=privileged_client) + self.api_request(method="PUT", action="pause", privileged_client=privileged_client or self.client) if wait: self.wait_for_pause_status(pause=True, timeout=timeout) def unpause( self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None ) -> None: - self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="unpause", - ) - self.api_request(method="PUT", action="unpause", privileged_client=privileged_client) + self.api_request(method="PUT", action="unpause", privileged_client=privileged_client or self.client) if wait: self.wait_for_pause_status(pause=False, timeout=timeout) def reset(self, privileged_client: DynamicClient | None = None) -> dict[str, Any]: - self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="reset", - ) return self.api_request(method="PUT", action="reset", privileged_client=privileged_client) @property @@ -378,14 +312,9 @@ def get_virt_launcher_pod(self, privileged_client: DynamicClient | None = None) Raises: ResourceNotFoundError: If no virt-launcher pod is found. """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="get_virt_launcher_pod", - ) pods = list( Pod.get( - client=_client, + client=privileged_client or self.client, namespace=self.namespace, label_selector=f"kubevirt.io=virt-launcher,kubevirt.io/created-by={self.instance.metadata.uid}", ) @@ -404,7 +333,7 @@ def get_virt_launcher_pod(self, privileged_client: DynamicClient | None = None) @property def virt_launcher_pod(self) -> Pod: warn( - "'virt_launcher_pod' property is deprecated, use 'get_virt_launcher_pod(privileged_client=...)' instead.", + "'virt_launcher_pod' property is deprecated, use 'get_virt_launcher_pod' instead.", category=DeprecationWarning, stacklevel=2, ) @@ -422,14 +351,9 @@ def get_virt_handler_pod(self, privileged_client: DynamicClient | None = None) - Raises: ResourceNotFoundError: If no matching virt-handler pod is found. """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="get_virt_handler_pod", - ) pods = list( Pod.get( - client=_client, + client=privileged_client or self.client, label_selector="kubevirt.io=virt-handler", ) ) @@ -442,7 +366,7 @@ def get_virt_handler_pod(self, privileged_client: DynamicClient | None = None) - @property def virt_handler_pod(self) -> Pod: warn( - "'virt_handler_pod' property is deprecated, use 'get_virt_handler_pod(privileged_client=...)' instead.", + "'virt_handler_pod' property is deprecated, use 'get_virt_handler_pod' instead.", category=DeprecationWarning, stacklevel=2, ) @@ -546,13 +470,8 @@ def get_node(self, privileged_client: DynamicClient | None = None) -> Node: Returns: Node: The node running this VMI. """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="get_node", - ) return Node( - client=_client, + client=privileged_client or self.client, name=self.instance.status.nodeName, ) @@ -565,14 +484,14 @@ def node(self) -> Node: Node: Node """ warn( - "'node' property is deprecated, use 'get_node(privileged_client=...)' instead.", + "'node' property is deprecated, use 'get_node' instead.", category=DeprecationWarning, stacklevel=2, ) return self.get_node(privileged_client=self.client) @staticmethod - def _get_pod_user_uid(pod: Pod) -> int | None: + def get_pod_user_uid(pod: Pod) -> int | None: """Get the runAsUser UID from a pod's security context. Args: @@ -587,20 +506,20 @@ def _get_pod_user_uid(pod: Pod) -> int | None: return None @staticmethod - def _is_pod_root(pod: Pod) -> bool: + def is_pod_root(pod: Pod) -> bool: """Check if a pod runs as root. Args: pod: The pod to inspect. Returns: - bool: True if the pod runs as root (UID 0). + bool: True if the pod runs as root (UID 0 or unset). """ - uid = VirtualMachineInstance._get_pod_user_uid(pod=pod) - return uid == 0 + uid = VirtualMachineInstance.get_pod_user_uid(pod=pod) + return uid is None or uid == 0 @staticmethod - def _get_hypervisor_connection_uri(pod: Pod) -> str: + def get_hypervisor_connection_uri(pod: Pod) -> str: """Get the hypervisor connection URI for a pod. Args: @@ -609,7 +528,7 @@ def _get_hypervisor_connection_uri(pod: Pod) -> str: Returns: str: The hypervisor connection URI string. """ - if VirtualMachineInstance._is_pod_root(pod=pod): + if VirtualMachineInstance.is_pod_root(pod=pod): return "" virtqemud_socket = "virtqemud" @@ -620,7 +539,7 @@ def _get_hypervisor_connection_uri(pod: Pod) -> str: ) return f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" - def _build_virsh_cmd(self, action: str, hypervisor_uri: str) -> list[str]: + def build_virsh_cmd(self, action: str, hypervisor_uri: str) -> list[str]: """Build a virsh command list. Args: @@ -634,14 +553,14 @@ def _build_virsh_cmd(self, action: str, hypervisor_uri: str) -> list[str]: def virsh_cmd(self, action: str) -> list[str]: warn( - "'virsh_cmd' method is deprecated, use 'execute_virsh_command(command=..., privileged_client=...)' instead.", + "'virsh_cmd' method is deprecated, use 'execute_virsh_command' instead.", category=DeprecationWarning, stacklevel=2, ) - hypervisor_uri = self._get_hypervisor_connection_uri( + hypervisor_uri = self.get_hypervisor_connection_uri( pod=self.get_virt_launcher_pod(privileged_client=self.client) ) - return self._build_virsh_cmd(action=action, hypervisor_uri=hypervisor_uri) + return self.build_virsh_cmd(action=action, hypervisor_uri=hypervisor_uri) def get_xml(self, privileged_client: DynamicClient | None = None) -> str: """ @@ -653,63 +572,35 @@ def get_xml(self, privileged_client: DynamicClient | None = None) -> str: Returns: xml_output(string): VMI XML in the multi-line string """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="get_xml", - ) - return self.execute_virsh_command(command="dumpxml", privileged_client=_client) + return self.execute_virsh_command(command="dumpxml", privileged_client=privileged_client or self.client) @property def virt_launcher_pod_user_uid(self) -> int | None: - """ - Get Virt Launcher Pod User UID value - - Returns: - Int: Virt Launcher Pod UID value - """ warn( - "'virt_launcher_pod_user_uid' property is deprecated, " - "use 'VirtualMachineInstance._get_pod_user_uid(pod=...)' instead.", + "'virt_launcher_pod_user_uid' property is deprecated, use 'get_pod_user_uid' instead.", category=DeprecationWarning, stacklevel=2, ) - return self._get_pod_user_uid(pod=self.get_virt_launcher_pod(privileged_client=self.client)) + return self.get_pod_user_uid(pod=self.get_virt_launcher_pod(privileged_client=self.client)) @property def is_virt_launcher_pod_root(self) -> bool: - """ - Check if Virt Launcher Pod is Root - - Returns: - Bool: True if Virt Launcher Pod is Root. - """ warn( - "'is_virt_launcher_pod_root' property is deprecated, " - "use 'VirtualMachineInstance._is_pod_root(pod=...)' instead.", + "'is_virt_launcher_pod_root' property is deprecated, use 'is_pod_root(pod=...)' instead.", category=DeprecationWarning, stacklevel=2, ) - return self._is_pod_root(pod=self.get_virt_launcher_pod(privileged_client=self.client)) + return self.is_pod_root(pod=self.get_virt_launcher_pod(privileged_client=self.client)) @property def virt_launcher_pod_hypervisor_connection_uri(self) -> str: - """ - Get Virt Launcher Pod Hypervisor Connection URI - - Required to connect to Hypervisor for - Non-Root Virt-Launcher Pod. - - Returns: - String: Hypervisor Connection URI - """ warn( "'virt_launcher_pod_hypervisor_connection_uri' property is deprecated, " - "use 'VirtualMachineInstance._get_hypervisor_connection_uri(pod=...)' instead.", + "use 'get_hypervisor_connection_uri(pod=...)' instead.", category=DeprecationWarning, stacklevel=2, ) - return self._get_hypervisor_connection_uri(pod=self.get_virt_launcher_pod(privileged_client=self.client)) + return self.get_hypervisor_connection_uri(pod=self.get_virt_launcher_pod(privileged_client=self.client)) def get_domstate(self, privileged_client: DynamicClient | None = None) -> str: """ @@ -729,8 +620,6 @@ def get_domstate(self, privileged_client: DynamicClient | None = None) -> str: category=DeprecationWarning, stacklevel=2, ) - # Intentionally bypass _resolve_privileged_client: get_domstate is already deprecated - # (DeprecationWarning above), so we silently fall back to self.client to avoid double-warning. return self.execute_virsh_command(command="domstate", privileged_client=privileged_client or self.client) def get_dommemstat(self, privileged_client: DynamicClient | None = None) -> str: @@ -744,12 +633,7 @@ def get_dommemstat(self, privileged_client: DynamicClient | None = None) -> str: Returns: String: VMI domain memory stats as string """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="get_dommemstat", - ) - return self.execute_virsh_command(command="dommemstat", privileged_client=_client) + return self.execute_virsh_command(command="dommemstat", privileged_client=privileged_client or self.client) def get_vmi_active_condition(self): """A VMI may have multiple conditions; the active one it the one with @@ -770,18 +654,15 @@ def get_xml_dict(self, privileged_client: DynamicClient | None = None) -> dict[s Returns: dict: Parsed XML of the VMI. """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="get_xml_dict", + return xmltodict.parse( + xml_input=self.get_xml(privileged_client=privileged_client or self.client), process_namespaces=True ) - return xmltodict.parse(xml_input=self.get_xml(privileged_client=_client), process_namespaces=True) @property def xml_dict(self) -> dict[str, Any]: """Get virtual machine instance XML as dict""" warn( - "'xml_dict' property is deprecated, use 'get_xml_dict(privileged_client=...)' instead.", + "'xml_dict' property is deprecated, use 'get_xml_dict' instead.", category=DeprecationWarning, stacklevel=2, ) @@ -820,14 +701,9 @@ def execute_virsh_command(self, command: str, privileged_client: DynamicClient | Returns: str: Command output. """ - _client = self._resolve_privileged_client( - privileged_client=privileged_client, - fallback_client=self.client, - method_name="execute_virsh_command", - ) - pod = self.get_virt_launcher_pod(privileged_client=_client) - hypervisor_uri = self._get_hypervisor_connection_uri(pod=pod) + pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) + hypervisor_uri = self.get_hypervisor_connection_uri(pod=pod) return pod.execute( - command=self._build_virsh_cmd(action=command, hypervisor_uri=hypervisor_uri), + command=self.build_virsh_cmd(action=command, hypervisor_uri=hypervisor_uri), container="compute", ) From 0b258164d28c3e7e0a83b22d5f90b8d8e06aeec7 Mon Sep 17 00:00:00 2001 From: rnetser Date: Mon, 16 Feb 2026 15:44:25 +0200 Subject: [PATCH 4/7] udpate clients --- ocp_resources/virtual_machine_instance.py | 43 ++++++++++------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index 959e1723b1..db199edec8 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -266,7 +266,6 @@ def api_request( url: str = "", retry_params: dict[str, int] | None = None, client: DynamicClient | None = None, - privileged_client: DynamicClient | None = None, **params: Any, ) -> dict[str, Any]: default_vmi_api_request_retry_params: dict[str, int] = {"timeout": TIMEOUT_30SEC, "sleep_time": TIMEOUT_5SEC} @@ -275,26 +274,26 @@ def api_request( action=action, url=url or self._subresource_api_url, retry_params=retry_params or default_vmi_api_request_retry_params, - client=privileged_client or client, + client=client or self.client, **params, ) def pause( self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None ) -> None: - self.api_request(method="PUT", action="pause", privileged_client=privileged_client or self.client) + self.api_request(method="PUT", action="pause", client=privileged_client or self.client) if wait: self.wait_for_pause_status(pause=True, timeout=timeout) def unpause( self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None ) -> None: - self.api_request(method="PUT", action="unpause", privileged_client=privileged_client or self.client) + self.api_request(method="PUT", action="unpause", client=privileged_client or self.client) if wait: self.wait_for_pause_status(pause=False, timeout=timeout) - def reset(self, privileged_client: DynamicClient | None = None) -> dict[str, Any]: - return self.api_request(method="PUT", action="reset", privileged_client=privileged_client) + def reset(self, client: DynamicClient | None = None) -> dict[str, Any]: + return self.api_request(method="PUT", action="reset", client=client or self.client) @property def interfaces(self): @@ -324,6 +323,8 @@ def get_virt_launcher_pod(self, privileged_client: DynamicClient | None = None) migration_state = self.instance.status.migrationState if migration_state: + # After VM migration there are two pods, one in Completed status and one in Running status. + # We need to return the Pod that is not in Completed status. for pod in pods: if migration_state.targetPod == pod.name: return pod @@ -399,8 +400,8 @@ def wait_until_running( try: virt_pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) self.logger.error(f"Status of virt-launcher pod {virt_pod.name}: {virt_pod.status}") - self.logger.error(f"{virt_pod.name} *****LOGS*****") - self.logger.error(virt_pod.log(container="compute")) + self.logger.debug(f"{virt_pod.name} *****LOGS*****") + self.logger.debug(virt_pod.log(container="compute")) except ResourceNotFoundError as virt_pod_ex: self.logger.error(virt_pod_ex) raise sampler_ex from virt_pod_ex @@ -539,29 +540,21 @@ def get_hypervisor_connection_uri(pod: Pod) -> str: ) return f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" - def build_virsh_cmd(self, action: str, hypervisor_uri: str) -> list[str]: - """Build a virsh command list. + def virsh_cmd(self, action: str, privileged_client: DynamicClient | None = None) -> list[str]: + """Build a virsh command list for the given action. Args: action: The virsh action to perform (e.g., 'dumpxml', 'domstate'). - hypervisor_uri: The hypervisor connection URI. + privileged_client: Client with elevated privileges. Returns: list[str]: The command as a list of strings. """ + _client = privileged_client or self.client + pod = self.get_virt_launcher_pod(privileged_client=_client) + hypervisor_uri = self.get_hypervisor_connection_uri(pod=pod) return shlex.split(f"virsh {hypervisor_uri} {action} {self.namespace}_{self.name}") - def virsh_cmd(self, action: str) -> list[str]: - warn( - "'virsh_cmd' method is deprecated, use 'execute_virsh_command' instead.", - category=DeprecationWarning, - stacklevel=2, - ) - hypervisor_uri = self.get_hypervisor_connection_uri( - pod=self.get_virt_launcher_pod(privileged_client=self.client) - ) - return self.build_virsh_cmd(action=action, hypervisor_uri=hypervisor_uri) - def get_xml(self, privileged_client: DynamicClient | None = None) -> str: """ Get virtual machine instance XML @@ -701,9 +694,9 @@ def execute_virsh_command(self, command: str, privileged_client: DynamicClient | Returns: str: Command output. """ - pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) - hypervisor_uri = self.get_hypervisor_connection_uri(pod=pod) + _client = privileged_client or self.client + pod = self.get_virt_launcher_pod(privileged_client=_client) return pod.execute( - command=self.build_virsh_cmd(action=command, hypervisor_uri=hypervisor_uri), + command=self.virsh_cmd(action=command, privileged_client=_client), container="compute", ) From cd9d51bcd063ec17b8c9a83f249e316987ad2b55 Mon Sep 17 00:00:00 2001 From: rnetser Date: Mon, 16 Feb 2026 16:12:50 +0200 Subject: [PATCH 5/7] remvoe used once --- ocp_resources/resource.py | 4 +-- ocp_resources/virtual_machine.py | 10 +----- ocp_resources/virtual_machine_instance.py | 44 ++++++++++------------- 3 files changed, 21 insertions(+), 37 deletions(-) diff --git a/ocp_resources/resource.py b/ocp_resources/resource.py index bcc517d7e6..68ea0bea1d 100644 --- a/ocp_resources/resource.py +++ b/ocp_resources/resource.py @@ -1318,7 +1318,6 @@ def api_request( action: str, url: str, retry_params: dict[str, int] | None = None, - client: DynamicClient | None = None, **params: Any, ) -> dict[str, Any]: """ @@ -1329,13 +1328,12 @@ def api_request( action (str): Action to perform (stop/start/guestosinfo etc.). url (str): URL of resource. retry_params (dict): dict of timeout and sleep_time values for retrying the api request call - client (DynamicClient): Optional client to use for the request. Defaults to self.client. Returns: data(dict): response data """ - client = client or self.client + client: DynamicClient = self.client api_request_params = { "url": f"{url}/{action}", "method": method, diff --git a/ocp_resources/virtual_machine.py b/ocp_resources/virtual_machine.py index eb224e6341..dce4bf96bf 100644 --- a/ocp_resources/virtual_machine.py +++ b/ocp_resources/virtual_machine.py @@ -1,6 +1,5 @@ from typing import Any -from kubernetes.dynamic import DynamicClient from timeout_sampler import TimeoutSampler from ocp_resources.resource import NamespacedResource @@ -74,13 +73,7 @@ def _subresource_api_url(self): ) def api_request( - self, - method: str, - action: str, - url: str = "", - retry_params: dict[str, int] | None = None, - client: DynamicClient | None = None, - **params: Any, + self, method: str, action: str, url: str = "", retry_params: dict[str, int] | None = None, **params: Any ) -> dict[str, Any]: default_vm_api_request_retry_params: dict[str, int] = {"timeout": TIMEOUT_30SEC, "sleep_time": TIMEOUT_5SEC} return super().api_request( @@ -88,7 +81,6 @@ def api_request( action=action, url=url or self._subresource_api_url, retry_params=retry_params or default_vm_api_request_retry_params, - client=client, **params, ) diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index db199edec8..f0b2461f86 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -260,13 +260,7 @@ def _subresource_api_url(self): ) def api_request( - self, - method: str, - action: str, - url: str = "", - retry_params: dict[str, int] | None = None, - client: DynamicClient | None = None, - **params: Any, + self, method: str, action: str, url: str = "", retry_params: dict[str, int] | None = None, **params: Any ) -> dict[str, Any]: default_vmi_api_request_retry_params: dict[str, int] = {"timeout": TIMEOUT_30SEC, "sleep_time": TIMEOUT_5SEC} return super().api_request( @@ -274,26 +268,21 @@ def api_request( action=action, url=url or self._subresource_api_url, retry_params=retry_params or default_vmi_api_request_retry_params, - client=client or self.client, **params, ) - def pause( - self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None - ) -> None: - self.api_request(method="PUT", action="pause", client=privileged_client or self.client) + def pause(self, timeout=TIMEOUT_4MINUTES, wait=False): + self.api_request(method="PUT", action="pause") if wait: self.wait_for_pause_status(pause=True, timeout=timeout) - def unpause( - self, timeout: int = TIMEOUT_4MINUTES, wait: bool = False, privileged_client: DynamicClient | None = None - ) -> None: - self.api_request(method="PUT", action="unpause", client=privileged_client or self.client) + def unpause(self, timeout=TIMEOUT_4MINUTES, wait=False): + self.api_request(method="PUT", action="unpause") if wait: self.wait_for_pause_status(pause=False, timeout=timeout) - def reset(self, client: DynamicClient | None = None) -> dict[str, Any]: - return self.api_request(method="PUT", action="reset", client=client or self.client) + def reset(self) -> dict[str, Any]: + return self.api_request(method="PUT", action="reset") @property def interfaces(self): @@ -362,7 +351,7 @@ def get_virt_handler_pod(self, privileged_client: DynamicClient | None = None) - if pod.instance["spec"]["nodeName"] == self.instance.status.nodeName: return pod - raise ResourceNotFoundError + raise ResourceNotFoundError(f"virt-handler pod not found on node {self.instance.status.nodeName}") @property def virt_handler_pod(self) -> Pod: @@ -517,6 +506,7 @@ def is_pod_root(pod: Pod) -> bool: bool: True if the pod runs as root (UID 0 or unset). """ uid = VirtualMachineInstance.get_pod_user_uid(pod=pod) + # In KubeVirt, virt-launcher pods without explicit runAsUser run as root (UID 0). return uid is None or uid == 0 @staticmethod @@ -540,18 +530,22 @@ def get_hypervisor_connection_uri(pod: Pod) -> str: ) return f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" - def virsh_cmd(self, action: str, privileged_client: DynamicClient | None = None) -> list[str]: + def virsh_cmd( + self, action: str, privileged_client: DynamicClient | None = None, pod: Pod | None = None + ) -> list[str]: """Build a virsh command list for the given action. Args: action: The virsh action to perform (e.g., 'dumpxml', 'domstate'). privileged_client: Client with elevated privileges. + pod: Optional virt-launcher pod (avoids re-fetching if already available). Returns: list[str]: The command as a list of strings. """ - _client = privileged_client or self.client - pod = self.get_virt_launcher_pod(privileged_client=_client) + if pod is None: + _client = privileged_client or self.client + pod = self.get_virt_launcher_pod(privileged_client=_client) hypervisor_uri = self.get_hypervisor_connection_uri(pod=pod) return shlex.split(f"virsh {hypervisor_uri} {action} {self.namespace}_{self.name}") @@ -579,7 +573,7 @@ def virt_launcher_pod_user_uid(self) -> int | None: @property def is_virt_launcher_pod_root(self) -> bool: warn( - "'is_virt_launcher_pod_root' property is deprecated, use 'is_pod_root(pod=...)' instead.", + "'is_virt_launcher_pod_root' property is deprecated, use 'is_pod_root' instead.", category=DeprecationWarning, stacklevel=2, ) @@ -589,7 +583,7 @@ def is_virt_launcher_pod_root(self) -> bool: def virt_launcher_pod_hypervisor_connection_uri(self) -> str: warn( "'virt_launcher_pod_hypervisor_connection_uri' property is deprecated, " - "use 'get_hypervisor_connection_uri(pod=...)' instead.", + "use 'get_hypervisor_connection_uri' instead.", category=DeprecationWarning, stacklevel=2, ) @@ -697,6 +691,6 @@ def execute_virsh_command(self, command: str, privileged_client: DynamicClient | _client = privileged_client or self.client pod = self.get_virt_launcher_pod(privileged_client=_client) return pod.execute( - command=self.virsh_cmd(action=command, privileged_client=_client), + command=self.virsh_cmd(action=command, pod=pod), container="compute", ) From 7cb86eaeaa3c17b84f988d0d9a20ce058578e67c Mon Sep 17 00:00:00 2001 From: rnetser Date: Mon, 16 Feb 2026 17:37:34 +0200 Subject: [PATCH 6/7] update virsh_cmd --- ocp_resources/virtual_machine_instance.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index f0b2461f86..aafcc9f9f2 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -543,9 +543,10 @@ def virsh_cmd( Returns: list[str]: The command as a list of strings. """ + # For backward compatibility if pod is None: - _client = privileged_client or self.client - pod = self.get_virt_launcher_pod(privileged_client=_client) + pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) + hypervisor_uri = self.get_hypervisor_connection_uri(pod=pod) return shlex.split(f"virsh {hypervisor_uri} {action} {self.namespace}_{self.name}") @@ -688,8 +689,7 @@ def execute_virsh_command(self, command: str, privileged_client: DynamicClient | Returns: str: Command output. """ - _client = privileged_client or self.client - pod = self.get_virt_launcher_pod(privileged_client=_client) + pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) return pod.execute( command=self.virsh_cmd(action=command, pod=pod), container="compute", From 47ff81f14e29afab17aa16480529d4919c591609 Mon Sep 17 00:00:00 2001 From: rnetser Date: Tue, 17 Feb 2026 14:41:48 +0200 Subject: [PATCH 7/7] restore ocp_resources/resource.py - not chnage needed --- ocp_resources/resource.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/ocp_resources/resource.py b/ocp_resources/resource.py index 68ea0bea1d..79c44a0c2e 100644 --- a/ocp_resources/resource.py +++ b/ocp_resources/resource.py @@ -1313,12 +1313,7 @@ def wait_for_condition( return def api_request( - self, - method: str, - action: str, - url: str, - retry_params: dict[str, int] | None = None, - **params: Any, + self, method: str, action: str, url: str, retry_params: dict[str, int] | None = None, **params: Any ) -> dict[str, Any]: """ Handle API requests to resource.