Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions src/azure-cli-testsdk/azure/cli/testsdk/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import inspect
import unittest
import tempfile
import time
import random

from .scenario_tests import (IntegrationTestBase, ReplayableTest, SubscriptionRecordingProcessor,
LargeRequestBodyProcessor,
Expand Down Expand Up @@ -78,6 +80,129 @@ def is_empty(self): # pylint: disable=no-self-use
from azure.cli.testsdk.checkers import NoneCheck
return NoneCheck()

@staticmethod
def _is_provisioning_state_check(check):
"""Return True if *check* is a JMESPathCheck asserting provisioningState == 'Succeeded'."""
from azure.cli.testsdk.checkers import JMESPathCheck
if not isinstance(check, JMESPathCheck):
return False
query = getattr(check, '_query', '')
if not isinstance(query, str):
return False
query = query.lower()
is_provisioning_state = query == 'provisioningstate' or query.endswith('.provisioningstate')
return is_provisioning_state and getattr(check, '_expected_result', '') == 'Succeeded'

def _should_retry_for_provisioning_state(self, checks):
"""Check if any JMESPathCheck asserts provisioningState == 'Succeeded'."""
env_val = os.environ.get('AZURE_CLI_TEST_RETRY_PROVISIONING_CHECK', 'false').lower()
if not checks or env_val != 'true':
return False

checks_list = checks if isinstance(checks, list) else [checks]

for check in checks_list:
if self._is_provisioning_state_check(check):
return True
return False

def _cmd_with_retry(self, command, checks, cli_ctx):
"""Execute command with two-phase check validation.

Phase 1: Validate provisioningState == Succeeded, retrying with poll if needed.
Phase 2: Validate all remaining checks on the original result.

Uses etag to detect external modifications (e.g. Azure Policy).
"""
import jmespath
from azure.cli.testsdk.exceptions import JMESPathCheckAssertionError

max_retries = int(os.environ.get('AZURE_CLI_TEST_MAX_RETRIES', '10'))
base_delay = int(os.environ.get('AZURE_CLI_TEST_BASE_DELAY', '2'))
max_delay = int(os.environ.get('AZURE_CLI_TEST_MAX_DELAY', '60'))

# Split checks into Phase 1 (provisioningState) and Phase 2 (everything else)
checks_list = checks if isinstance(checks, list) else [checks]
ps_checks = []
other_checks = []
for c in checks_list:
if self._is_provisioning_state_check(c):
ps_checks.append(c)
else:
other_checks.append(c)

# Execute the original command once
result = execute(cli_ctx, command, expect_failure=False)

# Phase 1: Is the resource ready?
try:
result.assert_with_checks(ps_checks)
except JMESPathCheckAssertionError:

# Extract resource id and etag for polling
try:
json_value = result.get_output_in_json()
resource_id = jmespath.search('id', json_value)
original_etag = jmespath.search('etag', json_value) or \
jmespath.search('properties.etag', json_value)
except (KeyError, TypeError, ValueError, AttributeError):
resource_id = None
original_etag = None

if not resource_id:
raise

# Poll with generic ARM GET until provisioningState is terminal
poll_command = 'resource show --ids {}'.format(resource_id)
actual_state = None
current_etag = None
last_seen_etag = original_etag

logger.warning(
"provisioningState was not 'Succeeded' for resource '%s'. "
"Polling with '%s' (max %d retries)...",
resource_id, poll_command, max_retries)

for attempt in range(max_retries):
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
time.sleep(delay)

poll_result = execute(cli_ctx, poll_command, expect_failure=False)

try:
poll_json = poll_result.get_output_in_json()
actual_state = jmespath.search('properties.provisioningState', poll_json)
current_etag = jmespath.search('etag', poll_json)
except (KeyError, TypeError, ValueError, AttributeError):
actual_state = None
current_etag = None

if last_seen_etag and current_etag and current_etag != last_seen_etag:
logger.warning(
"ETag changed ('%s' -> '%s'): resource modified externally "
"(likely Azure Policy). Waiting for it to complete...",
last_seen_etag[:16], current_etag[:16])
last_seen_etag = current_etag

if actual_state == 'Succeeded':
break

if actual_state in ('Failed', 'Canceled'):
raise AssertionError(
"Resource '{}' reached terminal state '{}' after external modification.".format(
resource_id, actual_state))
else:
raise TimeoutError(
"Resource '{}' did not reach 'Succeeded' after {} retries. "
"Last state: '{}'. Original ETag: '{}', Current ETag: '{}'.".format(
resource_id, max_retries, actual_state, original_etag, current_etag))

# Phase 2: Validate the operation result
if other_checks:
result.assert_with_checks(other_checks)

return result


class ScenarioTest(ReplayableTest, CheckerMixin, unittest.TestCase):
def __init__(self, method_name, config_file=None, recording_name=None,
Expand Down Expand Up @@ -174,6 +299,9 @@ def create_guid(self):

def cmd(self, command, checks=None, expect_failure=False):
command = self._apply_kwargs(command)
# Only retry in live mode — playback recordings have a fixed HTTP sequence
if self.is_live and not expect_failure and self._should_retry_for_provisioning_state(checks):
return self._cmd_with_retry(command, checks, self.cli_ctx)
return execute(self.cli_ctx, command, expect_failure=expect_failure).assert_with_checks(checks)

def get_subscription_id(self):
Expand Down Expand Up @@ -230,6 +358,8 @@ def setUp(self):

def cmd(self, command, checks=None, expect_failure=False):
command = self._apply_kwargs(command)
if not expect_failure and self._should_retry_for_provisioning_state(checks):
return self._cmd_with_retry(command, checks, self.cli_ctx)
return execute(self.cli_ctx, command, expect_failure=expect_failure).assert_with_checks(checks)

def get_subscription_id(self):
Expand Down
Loading