Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/azure-cli-core/azure/cli/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ def get(self, args):
return None

# Get the top-level command, like `network` in `network vnet create -h`
top_command = args[0]
# Normalize top-level command for index lookup so mixed-case commands hit key
top_command = args[0].lower()
index = self.INDEX[self._COMMAND_INDEX]
# Check the command index for (command: [module]) mapping, like
# "network": ["azure.cli.command_modules.natgateway", "azure.cli.command_modules.network", "azext_firewall"]
Expand Down
14 changes: 5 additions & 9 deletions src/azure-cli-core/azure/cli/core/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,14 +512,11 @@ def execute(self, args):
EVENT_INVOKER_FILTER_RESULT)
from azure.cli.core.commands.events import (
EVENT_INVOKER_PRE_CMD_TBL_TRUNCATE, EVENT_INVOKER_PRE_LOAD_ARGUMENTS, EVENT_INVOKER_POST_LOAD_ARGUMENTS)
from azure.cli.core.util import roughly_parse_command_with_casing

# TODO: Can't simply be invoked as an event because args are transformed
command_preserve_casing = roughly_parse_command_with_casing(args)
args = _pre_command_table_create(self.cli_ctx, args)

if self._should_show_cached_help(args):
result = self._try_show_cached_help(command_preserve_casing, args)
result = self._try_show_cached_help(args)
if result:
return result

Expand Down Expand Up @@ -593,7 +590,7 @@ def execute(self, args):
logger.debug("Failed to cache help data: %s", ex)

# TODO: No event in base with which to target
telemetry.set_command_details('az', command_preserve_casing=command_preserve_casing)
telemetry.set_command_details('az')
telemetry.set_success(summary='welcome')
return CommandResultItem(None, exit_code=0)

Expand Down Expand Up @@ -648,8 +645,7 @@ def execute(self, args):
pass
telemetry.set_command_details(self.cli_ctx.data['command'], self.data['output'],
self.cli_ctx.data['safe_params'],
extension_name=extension_name, extension_version=extension_version,
command_preserve_casing=command_preserve_casing)
extension_name=extension_name, extension_version=extension_version)
if extension_name:
self.data['command_extension_name'] = extension_name
self.cli_ctx.logging.log_cmd_metadata_extension_info(extension_name, extension_version)
Expand Down Expand Up @@ -740,7 +736,7 @@ def _should_show_cached_help(self, args):
self._is_top_level_help_request(args) and
not self.cli_ctx.data.get('completer_active'))

def _try_show_cached_help(self, command_preserve_casing, args):
def _try_show_cached_help(self, args):
"""Try to show cached help for top-level help request.

Returns CommandResultItem if cached help was shown, None otherwise.
Expand All @@ -752,7 +748,7 @@ def _try_show_cached_help(self, command_preserve_casing, args):
if help_index:
# Display cached help using the help system
self.help.show_cached_help(help_index, args)
telemetry.set_command_details('az', command_preserve_casing=command_preserve_casing, parameters=['--help'])
telemetry.set_command_details('az', parameters=['--help'])
telemetry.set_success(summary='show help')
return CommandResultItem(None, exit_code=0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ def _check_value_in_extensions(cli_ctx, parser, args, no_prompt): # pylint: dis
# extension is already installed and return if yes as the error is not caused by extension not installed.
from azure.cli.core.extension import get_extension, ExtensionNotInstalledException
from azure.cli.core.extension._resolve import resolve_from_index, NoExtensionCandidatesError
from azure.cli.core.util import roughly_parse_command_with_casing
extension_allow_preview = _get_extension_allow_preview_install_config(cli_ctx)
try:
ext = get_extension(ext_name)
Expand All @@ -209,8 +208,7 @@ def _check_value_in_extensions(cli_ctx, parser, args, no_prompt): # pylint: dis

telemetry.set_command_details(command_str,
parameters=AzCliCommandInvoker._extract_parameter_names(args), # pylint: disable=protected-access
extension_name=ext_name,
command_preserve_casing=roughly_parse_command_with_casing(args))
extension_name=ext_name)
run_after_extension_installed = _get_extension_run_after_dynamic_install_config(cli_ctx)
prompt_info = ""
if no_prompt:
Expand Down
6 changes: 1 addition & 5 deletions src/azure-cli-core/azure/cli/core/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def __init__(self, correlation_id=None, application=None):
self.feedback = None
self.extension_management_detail = None
self.raw_command = None
self.command_preserve_casing = None
self.is_cmd_idx_rebuild_triggered = False
self.show_survey_message = False
self.region_input = None
Expand Down Expand Up @@ -209,8 +208,6 @@ def _get_azure_cli_properties(self):
set_custom_properties(result, 'InvokeTimeElapsed', str(self.invoke_time_elapsed))
set_custom_properties(result, 'OutputType', self.output_type)
set_custom_properties(result, 'RawCommand', self.raw_command)
set_custom_properties(result, 'CommandPreserveCasing',
self.command_preserve_casing or '')
set_custom_properties(result, 'IsCmdIdxRebuildTriggered', str(self.is_cmd_idx_rebuild_triggered))
set_custom_properties(result, 'Params', ','.join(self.parameters or []))
set_custom_properties(result, 'PythonVersion', platform.python_version())
Expand Down Expand Up @@ -448,13 +445,12 @@ def set_command_index_rebuild_triggered(is_cmd_idx_rebuild_triggered=False):

@decorators.suppress_all_exceptions()
def set_command_details(command, output_type=None, parameters=None, extension_name=None,
extension_version=None, command_preserve_casing=None):
extension_version=None):
_session.command = command
_session.output_type = output_type
_session.parameters = parameters
_session.extension_name = extension_name
_session.extension_version = extension_version
_session.command_preserve_casing = command_preserve_casing


@decorators.suppress_all_exceptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ def update_and_check_index():
cmd_tbl = loader.load_command_table(["hello", "mod-only"])
self.assertEqual(['hello mod-only', 'hello overridden', 'hello ext-only'], list(cmd_tbl.keys()))

# Test mixed-case top-level command still uses command index
_set_index(self.expected_command_index)
cmd_tbl = loader.load_command_table(["HELLO", "mod-only"])
self.assertEqual(['hello mod-only', 'hello overridden', 'hello ext-only'], list(cmd_tbl.keys()))

# Full scenario test 1: Installing an extension 'azext_hello1' that extends 'hello' group
outdated_command_index = {'hello': ['azure.cli.command_modules.hello'],
'extra': ['azure.cli.command_modules.extra']}
Expand Down
38 changes: 0 additions & 38 deletions src/azure-cli-core/azure/cli/core/tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,41 +80,3 @@ def test_show_version_sets_telemetry_params(self, mock_get_version):
self.assertEqual(session.parameters, ["--version"])
self.assertIsNone(session.raw_command)

@mock.patch('azure.cli.core.util.get_az_version_string')
def test_command_preserve_casing_telemetry(self, mock_get_version):
"""Test telemetry captures command preserve casing during actual command invocation."""
from azure.cli.core import telemetry
from azure.cli.core.mock import DummyCli
from knack.completion import ARGCOMPLETE_ENV_NAME

mock_get_version.return_value = ("azure-cli 2.80.0", ["core", "extension1"])

test_cases = [
(["version"], "version"),
(["VERSION"], "VERSION"),
(["vm", "list"], "vm list"),
(["Vm", "List"], "Vm List"),
]

for command_args, expected_casing in test_cases:
with self.subTest(command_args=command_args):
cli = DummyCli()
telemetry.set_application(cli, ARGCOMPLETE_ENV_NAME)
telemetry.start()

try:
cli.invoke(command_args)
except SystemExit:
pass
except Exception:
pass

# Verify the telemetry session preserves casing
session = telemetry._session
self.assertEqual(session.command_preserve_casing, expected_casing)

azure_cli_props = session._get_azure_cli_properties()

self.assertIn('Context.Default.AzureCLI.CommandPreserveCasing', azure_cli_props)
self.assertEqual(azure_cli_props['Context.Default.AzureCLI.CommandPreserveCasing'],
expected_casing)
54 changes: 1 addition & 53 deletions src/azure-cli-core/azure/cli/core/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
(get_file_json, truncate_text, shell_safe_json_parse, b64_to_hex, hash_string, random_string,
open_page_in_browser, can_launch_browser, handle_exception, ConfiguredDefaultSetter, send_raw_request,
should_disable_connection_verify, parse_proxy_resource_id, get_az_user_agent, get_az_rest_user_agent,
_get_parent_proc_name, is_wsl, run_cmd, run_az_cmd, roughly_parse_command, roughly_parse_command_with_casing)
_get_parent_proc_name, is_wsl, run_cmd, run_az_cmd, roughly_parse_command)
from azure.cli.core.mock import DummyCli


Expand Down Expand Up @@ -638,57 +638,5 @@ def test_roughly_parse_command(self):
self.assertEqual(roughly_parse_command(['--help']), '') # Starts with flag
self.assertEqual(roughly_parse_command(['-h']), '') # Starts with short flag

def test_roughly_parse_command_with_casing(self):
"""Test roughly_parse_command_with_casing function that preserves original casing"""
# Basic command parsing with case preservation
self.assertEqual(roughly_parse_command_with_casing(['az', 'vm', 'create']), 'az vm create')
self.assertEqual(roughly_parse_command_with_casing(['account', 'show']), 'account show')
self.assertEqual(roughly_parse_command_with_casing(['network', 'vnet', 'list']), 'network vnet list')

# Test case preservation - should keep original casing
self.assertEqual(roughly_parse_command_with_casing(['az', 'VM', 'CREATE']), 'az VM CREATE')
self.assertEqual(roughly_parse_command_with_casing(['Account', 'Show']), 'Account Show')
self.assertEqual(roughly_parse_command_with_casing(['Az', 'Network', 'Vnet', 'List']), 'Az Network Vnet List')

# Test with flags - should stop at first flag and not include sensitive flag values
self.assertEqual(roughly_parse_command_with_casing(['az', 'vm', 'create', '--name', 'secretVM']), 'az vm create')
self.assertEqual(roughly_parse_command_with_casing(['az', 'VM', 'Create', '--name', 'superSecretVM']), 'az VM Create')
self.assertEqual(roughly_parse_command_with_casing(['az', 'storage', 'account', 'create', '--name', 'mystorageaccount']), 'az storage account create')
self.assertEqual(roughly_parse_command_with_casing(['az', 'keyvault', 'create', '--resource-group', 'myRG', '--name', 'myVault']), 'az keyvault create')

# Test with short flags
self.assertEqual(roughly_parse_command_with_casing(['az', 'VM', 'list', '-g', 'myResourceGroup']), 'az VM list')
self.assertEqual(roughly_parse_command_with_casing(['az', 'Group', 'create', '-n', 'myGroup', '-l', 'eastus']), 'az Group create')

# Test mixed case scenarios that might reveal user typing patterns
self.assertEqual(roughly_parse_command_with_casing(['Az', 'Vm', 'Create']), 'Az Vm Create')
self.assertEqual(roughly_parse_command_with_casing(['AZ', 'STORAGE', 'BLOB', 'LIST']), 'AZ STORAGE BLOB LIST')

# Edge cases
self.assertEqual(roughly_parse_command_with_casing([]), '')
self.assertEqual(roughly_parse_command_with_casing(['az']), 'az')
self.assertEqual(roughly_parse_command_with_casing(['Az']), 'Az')
self.assertEqual(roughly_parse_command_with_casing(['--help']), '') # Starts with flag
self.assertEqual(roughly_parse_command_with_casing(['-h']), '') # Starts with short flag

# Security test - ensure no sensitive information leaks after flags
test_cases_with_secrets = [
(['az', 'vm', 'create', '--admin-password', 'SuperSecret123!'], 'az vm create'),
(['az', 'sql', 'server', 'create', '--admin-user', 'admin', '--admin-password', 'VerySecret!'], 'az sql server create'),
(['az', 'storage', 'account', 'create', '--name', 'storageacct', '--access-tier', 'Hot'], 'az storage account create'),
(['Az', 'KeyVault', 'Secret', 'Set', '--vault-name', 'myVault', '--name', 'secretName', '--value', 'topSecret'], 'Az KeyVault Secret Set')
]

for args, expected in test_cases_with_secrets:
with self.subTest(args=args):
result = roughly_parse_command_with_casing(args)
self.assertEqual(result, expected)
# Ensure no sensitive values made it through
self.assertNotIn('SuperSecret123!', result)
self.assertNotIn('VerySecret!', result)
self.assertNotIn('topSecret', result)
self.assertNotIn('storageacct', result) # Even non-secret values after flags should not appear


if __name__ == '__main__':
unittest.main()
13 changes: 0 additions & 13 deletions src/azure-cli-core/azure/cli/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,19 +1317,6 @@ def roughly_parse_command(args):
return ' '.join(nouns).lower()


def roughly_parse_command_with_casing(args):
# Roughly parse the command part: <az VM create> --name vm1
# Similar to knack.invocation.CommandInvoker._rudimentary_get_command, but preserves original casing
# and we don't need to bother with positional args
nouns = []
for arg in args:
if arg and arg[0] != '-':
nouns.append(arg)
else:
break
return ' '.join(nouns)


def is_guid(guid):
import uuid
try:
Expand Down