diff --git a/src/azure-cli-core/azure/cli/core/__init__.py b/src/azure-cli-core/azure/cli/core/__init__.py index 5b348c393f0..35777f7264d 100644 --- a/src/azure-cli-core/azure/cli/core/__init__.py +++ b/src/azure-cli-core/azure/cli/core/__init__.py @@ -792,7 +792,8 @@ def get(self, args): return None # Get the top-level command, like `network` in `network vnet create -h` - top_command = args[0] + # Normalize top-level command for index lookup so mixed-case commands hit key + top_command = args[0].lower() index = self.INDEX[self._COMMAND_INDEX] # Check the command index for (command: [module]) mapping, like # "network": ["azure.cli.command_modules.natgateway", "azure.cli.command_modules.network", "azext_firewall"] diff --git a/src/azure-cli-core/azure/cli/core/commands/__init__.py b/src/azure-cli-core/azure/cli/core/commands/__init__.py index 0c094694f0a..9ab97f29b6c 100644 --- a/src/azure-cli-core/azure/cli/core/commands/__init__.py +++ b/src/azure-cli-core/azure/cli/core/commands/__init__.py @@ -512,14 +512,11 @@ def execute(self, args): EVENT_INVOKER_FILTER_RESULT) from azure.cli.core.commands.events import ( EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE, EVENT_INVOKER_PRE_LOAD_ARGUMENTS, EVENT_INVOKER_POST_LOAD_ARGUMENTS) - from azure.cli.core.util import roughly_parse_command_with_casing - # TODO: Can't simply be invoked as an event because args are transformed - command_preserve_casing = roughly_parse_command_with_casing(args) args = _pre_command_table_create(self.cli_ctx, args) if self._should_show_cached_help(args): - result = self._try_show_cached_help(command_preserve_casing, args) + result = self._try_show_cached_help(args) if result: return result @@ -593,7 +590,7 @@ def execute(self, args): logger.debug("Failed to cache help data: %s", ex) # TODO: No event in base with which to target - telemetry.set_command_details('az', command_preserve_casing=command_preserve_casing) + telemetry.set_command_details('az') telemetry.set_success(summary='welcome') return CommandResultItem(None, exit_code=0) @@ -648,8 +645,7 @@ def execute(self, args): pass telemetry.set_command_details(self.cli_ctx.data['command'], self.data['output'], self.cli_ctx.data['safe_params'], - extension_name=extension_name, extension_version=extension_version, - command_preserve_casing=command_preserve_casing) + extension_name=extension_name, extension_version=extension_version) if extension_name: self.data['command_extension_name'] = extension_name self.cli_ctx.logging.log_cmd_metadata_extension_info(extension_name, extension_version) @@ -740,7 +736,7 @@ def _should_show_cached_help(self, args): self._is_top_level_help_request(args) and not self.cli_ctx.data.get('completer_active')) - def _try_show_cached_help(self, command_preserve_casing, args): + def _try_show_cached_help(self, args): """Try to show cached help for top-level help request. Returns CommandResultItem if cached help was shown, None otherwise. @@ -752,7 +748,7 @@ def _try_show_cached_help(self, command_preserve_casing, args): if help_index: # Display cached help using the help system self.help.show_cached_help(help_index, args) - telemetry.set_command_details('az', command_preserve_casing=command_preserve_casing, parameters=['--help']) + telemetry.set_command_details('az', parameters=['--help']) telemetry.set_success(summary='show help') return CommandResultItem(None, exit_code=0) diff --git a/src/azure-cli-core/azure/cli/core/extension/dynamic_install.py b/src/azure-cli-core/azure/cli/core/extension/dynamic_install.py index 1f5c9e41a31..fc35e5d197c 100644 --- a/src/azure-cli-core/azure/cli/core/extension/dynamic_install.py +++ b/src/azure-cli-core/azure/cli/core/extension/dynamic_install.py @@ -194,7 +194,6 @@ def _check_value_in_extensions(cli_ctx, parser, args, no_prompt): # pylint: dis # extension is already installed and return if yes as the error is not caused by extension not installed. from azure.cli.core.extension import get_extension, ExtensionNotInstalledException from azure.cli.core.extension._resolve import resolve_from_index, NoExtensionCandidatesError - from azure.cli.core.util import roughly_parse_command_with_casing extension_allow_preview = _get_extension_allow_preview_install_config(cli_ctx) try: ext = get_extension(ext_name) @@ -209,8 +208,7 @@ def _check_value_in_extensions(cli_ctx, parser, args, no_prompt): # pylint: dis telemetry.set_command_details(command_str, parameters=AzCliCommandInvoker._extract_parameter_names(args), # pylint: disable=protected-access - extension_name=ext_name, - command_preserve_casing=roughly_parse_command_with_casing(args)) + extension_name=ext_name) run_after_extension_installed = _get_extension_run_after_dynamic_install_config(cli_ctx) prompt_info = "" if no_prompt: diff --git a/src/azure-cli-core/azure/cli/core/telemetry.py b/src/azure-cli-core/azure/cli/core/telemetry.py index 714bd751263..21d79ecef36 100644 --- a/src/azure-cli-core/azure/cli/core/telemetry.py +++ b/src/azure-cli-core/azure/cli/core/telemetry.py @@ -51,7 +51,6 @@ def __init__(self, correlation_id=None, application=None): self.feedback = None self.extension_management_detail = None self.raw_command = None - self.command_preserve_casing = None self.is_cmd_idx_rebuild_triggered = False self.show_survey_message = False self.region_input = None @@ -209,8 +208,6 @@ def _get_azure_cli_properties(self): set_custom_properties(result, 'InvokeTimeElapsed', str(self.invoke_time_elapsed)) set_custom_properties(result, 'OutputType', self.output_type) set_custom_properties(result, 'RawCommand', self.raw_command) - set_custom_properties(result, 'CommandPreserveCasing', - self.command_preserve_casing or '') set_custom_properties(result, 'IsCmdIdxRebuildTriggered', str(self.is_cmd_idx_rebuild_triggered)) set_custom_properties(result, 'Params', ','.join(self.parameters or [])) set_custom_properties(result, 'PythonVersion', platform.python_version()) @@ -448,13 +445,12 @@ def set_command_index_rebuild_triggered(is_cmd_idx_rebuild_triggered=False): @decorators.suppress_all_exceptions() def set_command_details(command, output_type=None, parameters=None, extension_name=None, - extension_version=None, command_preserve_casing=None): + extension_version=None): _session.command = command _session.output_type = output_type _session.parameters = parameters _session.extension_name = extension_name _session.extension_version = extension_version - _session.command_preserve_casing = command_preserve_casing @decorators.suppress_all_exceptions() diff --git a/src/azure-cli-core/azure/cli/core/tests/test_command_registration.py b/src/azure-cli-core/azure/cli/core/tests/test_command_registration.py index dbab0f61eb4..0e3ec03b70a 100644 --- a/src/azure-cli-core/azure/cli/core/tests/test_command_registration.py +++ b/src/azure-cli-core/azure/cli/core/tests/test_command_registration.py @@ -371,6 +371,11 @@ def update_and_check_index(): cmd_tbl = loader.load_command_table(["hello", "mod-only"]) self.assertEqual(['hello mod-only', 'hello overridden', 'hello ext-only'], list(cmd_tbl.keys())) + # Test mixed-case top-level command still uses command index + _set_index(self.expected_command_index) + cmd_tbl = loader.load_command_table(["HELLO", "mod-only"]) + self.assertEqual(['hello mod-only', 'hello overridden', 'hello ext-only'], list(cmd_tbl.keys())) + # Full scenario test 1: Installing an extension 'azext_hello1' that extends 'hello' group outdated_command_index = {'hello': ['azure.cli.command_modules.hello'], 'extra': ['azure.cli.command_modules.extra']} diff --git a/src/azure-cli-core/azure/cli/core/tests/test_telemetry.py b/src/azure-cli-core/azure/cli/core/tests/test_telemetry.py index 0077f2eeb28..4ff667b84c0 100644 --- a/src/azure-cli-core/azure/cli/core/tests/test_telemetry.py +++ b/src/azure-cli-core/azure/cli/core/tests/test_telemetry.py @@ -80,41 +80,3 @@ def test_show_version_sets_telemetry_params(self, mock_get_version): self.assertEqual(session.parameters, ["--version"]) self.assertIsNone(session.raw_command) - @mock.patch('azure.cli.core.util.get_az_version_string') - def test_command_preserve_casing_telemetry(self, mock_get_version): - """Test telemetry captures command preserve casing during actual command invocation.""" - from azure.cli.core import telemetry - from azure.cli.core.mock import DummyCli - from knack.completion import ARGCOMPLETE_ENV_NAME - - mock_get_version.return_value = ("azure-cli 2.80.0", ["core", "extension1"]) - - test_cases = [ - (["version"], "version"), - (["VERSION"], "VERSION"), - (["vm", "list"], "vm list"), - (["Vm", "List"], "Vm List"), - ] - - for command_args, expected_casing in test_cases: - with self.subTest(command_args=command_args): - cli = DummyCli() - telemetry.set_application(cli, ARGCOMPLETE_ENV_NAME) - telemetry.start() - - try: - cli.invoke(command_args) - except SystemExit: - pass - except Exception: - pass - - # Verify the telemetry session preserves casing - session = telemetry._session - self.assertEqual(session.command_preserve_casing, expected_casing) - - azure_cli_props = session._get_azure_cli_properties() - - self.assertIn('Context.Default.AzureCLI.CommandPreserveCasing', azure_cli_props) - self.assertEqual(azure_cli_props['Context.Default.AzureCLI.CommandPreserveCasing'], - expected_casing) diff --git a/src/azure-cli-core/azure/cli/core/tests/test_util.py b/src/azure-cli-core/azure/cli/core/tests/test_util.py index a21a1371cf8..37c5e21c91f 100644 --- a/src/azure-cli-core/azure/cli/core/tests/test_util.py +++ b/src/azure-cli-core/azure/cli/core/tests/test_util.py @@ -17,7 +17,7 @@ (get_file_json, truncate_text, shell_safe_json_parse, b64_to_hex, hash_string, random_string, open_page_in_browser, can_launch_browser, handle_exception, ConfiguredDefaultSetter, send_raw_request, should_disable_connection_verify, parse_proxy_resource_id, get_az_user_agent, get_az_rest_user_agent, - _get_parent_proc_name, is_wsl, run_cmd, run_az_cmd, roughly_parse_command, roughly_parse_command_with_casing) + _get_parent_proc_name, is_wsl, run_cmd, run_az_cmd, roughly_parse_command) from azure.cli.core.mock import DummyCli @@ -638,57 +638,5 @@ def test_roughly_parse_command(self): self.assertEqual(roughly_parse_command(['--help']), '') # Starts with flag self.assertEqual(roughly_parse_command(['-h']), '') # Starts with short flag - def test_roughly_parse_command_with_casing(self): - """Test roughly_parse_command_with_casing function that preserves original casing""" - # Basic command parsing with case preservation - self.assertEqual(roughly_parse_command_with_casing(['az', 'vm', 'create']), 'az vm create') - self.assertEqual(roughly_parse_command_with_casing(['account', 'show']), 'account show') - self.assertEqual(roughly_parse_command_with_casing(['network', 'vnet', 'list']), 'network vnet list') - - # Test case preservation - should keep original casing - self.assertEqual(roughly_parse_command_with_casing(['az', 'VM', 'CREATE']), 'az VM CREATE') - self.assertEqual(roughly_parse_command_with_casing(['Account', 'Show']), 'Account Show') - self.assertEqual(roughly_parse_command_with_casing(['Az', 'Network', 'Vnet', 'List']), 'Az Network Vnet List') - - # Test with flags - should stop at first flag and not include sensitive flag values - self.assertEqual(roughly_parse_command_with_casing(['az', 'vm', 'create', '--name', 'secretVM']), 'az vm create') - self.assertEqual(roughly_parse_command_with_casing(['az', 'VM', 'Create', '--name', 'superSecretVM']), 'az VM Create') - self.assertEqual(roughly_parse_command_with_casing(['az', 'storage', 'account', 'create', '--name', 'mystorageaccount']), 'az storage account create') - self.assertEqual(roughly_parse_command_with_casing(['az', 'keyvault', 'create', '--resource-group', 'myRG', '--name', 'myVault']), 'az keyvault create') - - # Test with short flags - self.assertEqual(roughly_parse_command_with_casing(['az', 'VM', 'list', '-g', 'myResourceGroup']), 'az VM list') - self.assertEqual(roughly_parse_command_with_casing(['az', 'Group', 'create', '-n', 'myGroup', '-l', 'eastus']), 'az Group create') - - # Test mixed case scenarios that might reveal user typing patterns - self.assertEqual(roughly_parse_command_with_casing(['Az', 'Vm', 'Create']), 'Az Vm Create') - self.assertEqual(roughly_parse_command_with_casing(['AZ', 'STORAGE', 'BLOB', 'LIST']), 'AZ STORAGE BLOB LIST') - - # Edge cases - self.assertEqual(roughly_parse_command_with_casing([]), '') - self.assertEqual(roughly_parse_command_with_casing(['az']), 'az') - self.assertEqual(roughly_parse_command_with_casing(['Az']), 'Az') - self.assertEqual(roughly_parse_command_with_casing(['--help']), '') # Starts with flag - self.assertEqual(roughly_parse_command_with_casing(['-h']), '') # Starts with short flag - - # Security test - ensure no sensitive information leaks after flags - test_cases_with_secrets = [ - (['az', 'vm', 'create', '--admin-password', 'SuperSecret123!'], 'az vm create'), - (['az', 'sql', 'server', 'create', '--admin-user', 'admin', '--admin-password', 'VerySecret!'], 'az sql server create'), - (['az', 'storage', 'account', 'create', '--name', 'storageacct', '--access-tier', 'Hot'], 'az storage account create'), - (['Az', 'KeyVault', 'Secret', 'Set', '--vault-name', 'myVault', '--name', 'secretName', '--value', 'topSecret'], 'Az KeyVault Secret Set') - ] - - for args, expected in test_cases_with_secrets: - with self.subTest(args=args): - result = roughly_parse_command_with_casing(args) - self.assertEqual(result, expected) - # Ensure no sensitive values made it through - self.assertNotIn('SuperSecret123!', result) - self.assertNotIn('VerySecret!', result) - self.assertNotIn('topSecret', result) - self.assertNotIn('storageacct', result) # Even non-secret values after flags should not appear - - if __name__ == '__main__': unittest.main() diff --git a/src/azure-cli-core/azure/cli/core/util.py b/src/azure-cli-core/azure/cli/core/util.py index c703a56cb37..ce64858bd09 100644 --- a/src/azure-cli-core/azure/cli/core/util.py +++ b/src/azure-cli-core/azure/cli/core/util.py @@ -1317,19 +1317,6 @@ def roughly_parse_command(args): return ' '.join(nouns).lower() -def roughly_parse_command_with_casing(args): - # Roughly parse the command part: --name vm1 - # Similar to knack.invocation.CommandInvoker._rudimentary_get_command, but preserves original casing - # and we don't need to bother with positional args - nouns = [] - for arg in args: - if arg and arg[0] != '-': - nouns.append(arg) - else: - break - return ' '.join(nouns) - - def is_guid(guid): import uuid try: