diff --git a/ocp_resources/virtual_machine_instance.py b/ocp_resources/virtual_machine_instance.py index 0aa172c292..bc9555e50a 100644 --- a/ocp_resources/virtual_machine_instance.py +++ b/ocp_resources/virtual_machine_instance.py @@ -6,6 +6,7 @@ from warnings import warn import xmltodict +from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.exceptions import ResourceNotFoundError from timeout_sampler import TimeoutExpiredError, TimeoutSampler @@ -13,7 +14,12 @@ from ocp_resources.node import Node from ocp_resources.pod import Pod from ocp_resources.resource import NamespacedResource -from ocp_resources.utils.constants import PROTOCOL_ERROR_EXCEPTION_DICT, TIMEOUT_4MINUTES, TIMEOUT_5SEC, TIMEOUT_30SEC +from ocp_resources.utils.constants import ( + PROTOCOL_ERROR_EXCEPTION_DICT, + TIMEOUT_4MINUTES, + TIMEOUT_5SEC, + TIMEOUT_30SEC, +) class VirtualMachineInstance(NamespacedResource): @@ -268,12 +274,12 @@ def api_request( def pause(self, timeout=TIMEOUT_4MINUTES, wait=False): self.api_request(method="PUT", action="pause") if wait: - return self.wait_for_pause_status(pause=True, timeout=timeout) + self.wait_for_pause_status(pause=True, timeout=timeout) def unpause(self, timeout=TIMEOUT_4MINUTES, wait=False): self.api_request(method="PUT", action="unpause") if wait: - return self.wait_for_pause_status(pause=False, timeout=timeout) + self.wait_for_pause_status(pause=False, timeout=timeout) def reset(self) -> dict[str, Any]: return self.api_request(method="PUT", action="reset") @@ -282,11 +288,21 @@ def reset(self) -> dict[str, Any]: def interfaces(self): return self.instance.status.interfaces - @property - def virt_launcher_pod(self): + def get_virt_launcher_pod(self, privileged_client: DynamicClient | None = None) -> Pod: + """Get the virt-launcher pod for this VMI. + + Args: + privileged_client: Client with elevated privileges for pod listing. + + Returns: + Pod: The virt-launcher pod. + + Raises: + ResourceNotFoundError: If no virt-launcher pod is found. + """ pods = list( Pod.get( - client=self.client, + client=privileged_client or self.client, namespace=self.namespace, label_selector=f"kubevirt.io=virt-launcher,kubevirt.io/created-by={self.instance.metadata.uid}", ) @@ -301,14 +317,33 @@ def virt_launcher_pod(self): for pod in pods: if migration_state.targetPod == pod.name: return pod - else: - return pods[0] + + return pods[0] @property - def virt_handler_pod(self): + def virt_launcher_pod(self) -> Pod: + warn( + "'virt_launcher_pod' property is deprecated, use 'get_virt_launcher_pod' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_virt_launcher_pod(privileged_client=self.client) + + def get_virt_handler_pod(self, privileged_client: DynamicClient | None = None) -> Pod: + """Get the virt-handler pod running on the same node as this VMI. + + Args: + privileged_client: Client with elevated privileges for pod listing. + + Returns: + Pod: The virt-handler pod. + + Raises: + ResourceNotFoundError: If no matching virt-handler pod is found. + """ pods = list( Pod.get( - client=self.client, + client=privileged_client or self.client, label_selector="kubevirt.io=virt-handler", ) ) @@ -316,9 +351,24 @@ def virt_handler_pod(self): if pod.instance["spec"]["nodeName"] == self.instance.status.nodeName: return pod - raise ResourceNotFoundError + raise ResourceNotFoundError(f"virt-handler pod not found on node {self.instance.status.nodeName}") + + @property + def virt_handler_pod(self) -> Pod: + warn( + "'virt_handler_pod' property is deprecated, use 'get_virt_handler_pod' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_virt_handler_pod(privileged_client=self.client) - def wait_until_running(self, timeout=TIMEOUT_4MINUTES, logs=True, stop_status=None): + def wait_until_running( + self, + timeout: int = TIMEOUT_4MINUTES, + logs: bool = True, + stop_status: str | None = None, + privileged_client: DynamicClient | None = None, + ) -> None: """ Wait until VMI is running @@ -326,6 +376,7 @@ def wait_until_running(self, timeout=TIMEOUT_4MINUTES, logs=True, stop_status=No timeout (int): Time to wait for VMI. logs (bool): True to extract logs from the VMI pod and from the VMI. stop_status (str): Status which should stop the wait and failed. + privileged_client: Client with elevated privileges for error-path pod access. Raises: TimeoutExpiredError: If VMI failed to run. @@ -336,7 +387,7 @@ def wait_until_running(self, timeout=TIMEOUT_4MINUTES, logs=True, stop_status=No if not logs: raise try: - virt_pod = self.virt_launcher_pod + virt_pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) self.logger.error(f"Status of virt-launcher pod {virt_pod.name}: {virt_pod.status}") self.logger.debug(f"{virt_pod.name} *****LOGS*****") self.logger.debug(virt_pod.log(container="compute")) @@ -400,84 +451,157 @@ def wait_for_vmi_condition_pause_status(self, pause, timeout=TIMEOUT_4MINUTES): if not (pause and sample.get("reason")): return + def get_node(self, privileged_client: DynamicClient | None = None) -> Node: + """Get the node where this VMI is running. + + Args: + privileged_client: Client with elevated privileges for node access. + + Returns: + Node: The node running this VMI. + """ + return Node( + client=privileged_client or self.client, + name=self.instance.status.nodeName, + ) + @property - def node(self): + def node(self) -> Node: """ Get the node name where the VM is running Returns: Node: Node """ - return Node( - client=self.client, - name=self.instance.status.nodeName, + warn( + "'node' property is deprecated, use 'get_node' instead.", + category=DeprecationWarning, + stacklevel=2, ) + return self.get_node(privileged_client=self.client) - def virsh_cmd(self, action): - return shlex.split( - f"virsh {self.virt_launcher_pod_hypervisor_connection_uri} {action} {self.namespace}_{self.name}" - ) + @staticmethod + def get_pod_user_uid(pod: Pod) -> int | None: + """Get the runAsUser UID from a pod's security context. - def get_xml(self): - """ - Get virtual machine instance XML + Args: + pod: The pod to inspect. Returns: - xml_output(string): VMI XML in the multi-line string + int | None: The runAsUser UID value, or None if not set. """ - return self.execute_virsh_command(command="dumpxml") + security_context = pod.instance.spec.get("securityContext", {}) + if security_context: + return security_context.get("runAsUser") + return None - @property - def virt_launcher_pod_user_uid(self): - """ - Get Virt Launcher Pod User UID value + @staticmethod + def is_pod_root(pod: Pod) -> bool: + """Check if a pod runs as root. + + Args: + pod: The pod to inspect. Returns: - Int: Virt Launcher Pod UID value + bool: True if the pod runs as root (UID 0 or unset). """ - return self.virt_launcher_pod.instance.spec.securityContext.runAsUser + uid = VirtualMachineInstance.get_pod_user_uid(pod=pod) + # In KubeVirt, virt-launcher pods without explicit runAsUser run as root (UID 0). + return uid is None or uid == 0 - @property - def is_virt_launcher_pod_root(self): + @staticmethod + def get_hypervisor_connection_uri(pod: Pod) -> str: + """Get the hypervisor connection URI for a pod. + + Note: This method executes a remote command on the pod to detect the libvirt socket type. + + Args: + pod: The virt-launcher pod. + + Returns: + str: The hypervisor connection URI string. """ - Check if Virt Launcher Pod is Root + if VirtualMachineInstance.is_pod_root(pod=pod): + return "" + + virtqemud_socket = "virtqemud" + socket = ( + virtqemud_socket + if virtqemud_socket in pod.execute(command=["ls", "/var/run/libvirt/"], container="compute") + else "libvirt" + ) + return f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" + + def virsh_cmd( + self, action: str, privileged_client: DynamicClient | None = None, pod: Pod | None = None + ) -> list[str]: + """Build a virsh command list for the given action. + + Args: + action: The virsh action to perform (e.g., 'dumpxml', 'domstate'). + privileged_client: Client with elevated privileges. + pod: Optional virt-launcher pod (avoids re-fetching if already available). Returns: - Bool: True if Virt Launcher Pod is Root. + list[str]: The command as a list of strings. """ - return not bool(self.virt_launcher_pod_user_uid) + # For backward compatibility + if pod is None: + pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) - @property - def virt_launcher_pod_hypervisor_connection_uri(self): + hypervisor_uri = self.get_hypervisor_connection_uri(pod=pod) + return shlex.split(f"virsh {hypervisor_uri} {action} {self.namespace}_{self.name}") + + def get_xml(self, privileged_client: DynamicClient | None = None) -> str: """ - Get Virt Launcher Pod Hypervisor Connection URI + Get virtual machine instance XML - Required to connect to Hypervisor for - Non-Root Virt-Launcher Pod. + Args: + privileged_client: Client with elevated privileges. Returns: - String: Hypervisor Connection URI + xml_output(string): VMI XML in the multi-line string """ - if self.is_virt_launcher_pod_root: - hypervisor_connection_uri = "" - else: - virtqemud_socket = "virtqemud" - socket = ( - virtqemud_socket - if virtqemud_socket - in self.virt_launcher_pod.execute(command=["ls", "/var/run/libvirt/"], container="compute") - else "libvirt" - ) - hypervisor_connection_uri = f"-c qemu+unix:///session?socket=/var/run/libvirt/{socket}-sock" - return hypervisor_connection_uri + return self.execute_virsh_command(command="dumpxml", privileged_client=privileged_client) + + @property + def virt_launcher_pod_user_uid(self) -> int | None: + warn( + "'virt_launcher_pod_user_uid' property is deprecated, use 'get_pod_user_uid' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_pod_user_uid(pod=self.get_virt_launcher_pod(privileged_client=self.client)) + + @property + def is_virt_launcher_pod_root(self) -> bool: + warn( + "'is_virt_launcher_pod_root' property is deprecated, use 'is_pod_root' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.is_pod_root(pod=self.get_virt_launcher_pod(privileged_client=self.client)) - def get_domstate(self): + @property + def virt_launcher_pod_hypervisor_connection_uri(self) -> str: + warn( + "'virt_launcher_pod_hypervisor_connection_uri' property is deprecated, " + "use 'get_hypervisor_connection_uri' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_hypervisor_connection_uri(pod=self.get_virt_launcher_pod(privileged_client=self.client)) + + def get_domstate(self, privileged_client: DynamicClient | None = None) -> str: """ Get virtual machine instance Status. Current workaround, as VM/VMI shows no status/phase == Paused yet. Bug: https://bugzilla.redhat.com/show_bug.cgi?id=1805178 + Args: + privileged_client: Client with elevated privileges. + Returns: String: VMI Status as string """ @@ -486,17 +610,20 @@ def get_domstate(self): category=DeprecationWarning, stacklevel=2, ) - return self.execute_virsh_command(command="domstate") + return self.execute_virsh_command(command="domstate", privileged_client=privileged_client) - def get_dommemstat(self): + def get_dommemstat(self, privileged_client: DynamicClient | None = None) -> str: """ Get virtual machine domain memory stats link: https://libvirt.org/manpages/virsh.html#dommemstat + Args: + privileged_client: Client with elevated privileges. + Returns: String: VMI domain memory stats as string """ - return self.execute_virsh_command(command="dommemstat") + return self.execute_virsh_command(command="dommemstat", privileged_client=privileged_client) def get_vmi_active_condition(self): """A VMI may have multiple conditions; the active one it the one with @@ -508,11 +635,26 @@ def get_vmi_active_condition(self): if condition["lastTransitionTime"] } + def get_xml_dict(self, privileged_client: DynamicClient | None = None) -> dict[str, Any]: + """Get virtual machine instance XML as dict. + + Args: + privileged_client: Client with elevated privileges. + + Returns: + dict: Parsed XML of the VMI. + """ + return xmltodict.parse(xml_input=self.get_xml(privileged_client=privileged_client), process_namespaces=True) + @property - def xml_dict(self): + def xml_dict(self) -> dict[str, Any]: """Get virtual machine instance XML as dict""" - - return xmltodict.parse(xml_input=self.get_xml(), process_namespaces=True) + warn( + "'xml_dict' property is deprecated, use 'get_xml_dict' instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.get_xml_dict(privileged_client=self.client) @property def guest_os_info(self): @@ -537,8 +679,18 @@ def interface_ip(self, interface): iface_ip = [iface["ipAddress"] for iface in self.interfaces if iface["interfaceName"] == interface] return iface_ip[0] if iface_ip else None - def execute_virsh_command(self, command): - return self.virt_launcher_pod.execute( - command=self.virsh_cmd(action=command), + def execute_virsh_command(self, command: str, privileged_client: DynamicClient | None = None) -> str: + """Execute a virsh command in the virt-launcher pod. + + Args: + command: The virsh command to execute (e.g., 'dumpxml', 'domstate'). + privileged_client: Client with elevated privileges. + + Returns: + str: Command output. + """ + pod = self.get_virt_launcher_pod(privileged_client=privileged_client or self.client) + return pod.execute( + command=self.virsh_cmd(action=command, pod=pod), container="compute", )