From 604151fe9e5a9d353c1fabf2d45a41d147eef069 Mon Sep 17 00:00:00 2001 From: Nasif Bin Zafar Date: Mon, 9 Feb 2026 12:59:41 +0600 Subject: [PATCH 1/4] Convert go to link Action to async --- .../Web/Playwright/BuiltInFunctions.py | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index b13120e3..09be50dc 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -27,8 +27,8 @@ import re from pathlib import Path -from playwright.sync_api import ( - sync_playwright, +from playwright.async_api import ( + async_playwright, Page, Browser, BrowserContext, @@ -75,7 +75,7 @@ ######################### @logger -def Open_Browser(step_data): +async def Open_Browser(step_data): """ Launch a new browser instance with Playwright. @@ -104,7 +104,7 @@ def Open_Browser(step_data): # Parse parameters url = None browser_name = "chromium" - headless = True + headless = False viewport = default_viewport.copy() args = [] timeout = default_timeout @@ -168,7 +168,7 @@ def Open_Browser(step_data): # Launch Playwright CommonUtil.ExecLog(sModuleInfo, f"Launching Playwright with {browser_name} browser", 1) - playwright_instance = sync_playwright().start() + playwright_instance = await async_playwright().start() # Browser launch options launch_options = { @@ -183,20 +183,20 @@ def Open_Browser(step_data): # Select and launch browser if browser_name in ("chrome", "chromium"): - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) elif browser_name == "firefox": - browser = playwright_instance.firefox.launch(**launch_options) + browser = await playwright_instance.firefox.launch(**launch_options) elif browser_name in ("webkit", "safari"): - browser = playwright_instance.webkit.launch(**launch_options) + browser = await playwright_instance.webkit.launch(**launch_options) elif browser_name in ("edge", "msedge", "microsoft edge"): launch_options["channel"] = "msedge" - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) elif browser_name == "chrome-beta": launch_options["channel"] = "chrome-beta" - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) else: CommonUtil.ExecLog(sModuleInfo, f"Unknown browser '{browser_name}', using chromium", 2) - browser = playwright_instance.chromium.launch(**launch_options) + browser = await playwright_instance.chromium.launch(**launch_options) # Context options context_options = {"viewport": viewport} @@ -214,9 +214,9 @@ def Open_Browser(step_data): context_options["color_scheme"] = color_scheme # Create context and page - context = browser.new_context(**context_options) + context = await browser.new_context(**context_options) context.set_default_timeout(timeout) - current_page = context.new_page() + current_page = await context.new_page() current_page_id = page_id # Store in details @@ -229,7 +229,7 @@ def Open_Browser(step_data): # Navigate if URL provided if url: - current_page.goto(url, wait_until="domcontentloaded") + await current_page.goto(url, wait_until="domcontentloaded") CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) # Save to shared variables for compatibility @@ -246,7 +246,7 @@ def Open_Browser(step_data): @logger -def Go_To_Link(step_data): +async def Go_To_Link(step_data): """ Navigate to a URL. @@ -263,8 +263,11 @@ def Go_To_Link(step_data): try: if current_page is None: - CommonUtil.ExecLog(sModuleInfo, "No browser open. Use 'open browser' first.", 3) - return "zeuz_failed" + CommonUtil.ExecLog(sModuleInfo, "No browser open. Opening browser with default settings.", 2) + result = await Open_Browser(step_data) + if result == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Failed to open browser automatically", 3) + return "zeuz_failed" url = None wait_until = "domcontentloaded" @@ -275,11 +278,10 @@ def Go_To_Link(step_data): mid_l = mid.strip().lower() right_v = right.strip() - if mid_l == "input parameter": - if left_l in ("go to link", "url", "link"): + if left_l in ("go to link", "url", "link"): url = right_v elif mid_l == "optional parameter": - if left_l in ("wait until", "wait_until", "waituntil"): + if left_l in ("wait until", "wait_until", "waituntil", "wait time"): wait_until = right_v.lower() elif left_l == "timeout": timeout = int(float(right_v) * 1000) @@ -292,7 +294,7 @@ def Go_To_Link(step_data): if timeout: goto_options["timeout"] = timeout - current_page.goto(url, **goto_options) + await current_page.goto(url, **goto_options) CommonUtil.ExecLog(sModuleInfo, f"Navigated to: {url}", 1) return "passed" From adbbb534bcf33d26292a87507ba0b6b0fda801e7 Mon Sep 17 00:00:00 2001 From: Nasif Bin Zafar Date: Mon, 9 Feb 2026 13:00:29 +0600 Subject: [PATCH 2/4] Convert action handlers and `MainDriver` to async --- Drivers/Built_In_Driver.py | 4 +- .../Sequential_Actions/sequential_actions.py | 48 ++++++++++--------- Framework/MainDriverApi.py | 37 ++++++++------ node_cli.py | 2 +- 4 files changed, 51 insertions(+), 40 deletions(-) diff --git a/Drivers/Built_In_Driver.py b/Drivers/Built_In_Driver.py index dd2e894c..dde1b15f 100755 --- a/Drivers/Built_In_Driver.py +++ b/Drivers/Built_In_Driver.py @@ -9,14 +9,14 @@ from Framework.Built_In_Automation.Sequential_Actions import sequential_actions as sa -def sequential_actions( +async def sequential_actions( step_data, test_action_info, temp_q, debug_actions=None, ): try: - sTestStepReturnStatus = sa.Sequential_Actions( + sTestStepReturnStatus = await sa.Sequential_Actions( step_data, test_action_info, debug_actions, diff --git a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py index 4ff0044c..e590da74 100755 --- a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py +++ b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py @@ -279,7 +279,7 @@ def if_else_log_for_actions(left, next_level_step_data, statement="if"): return left + ".... condition matched\n" + "Running actions: " + log_actions -def If_else_action(step_data, data_set_no): +async def If_else_action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -526,7 +526,7 @@ def check_operators(): ) return "zeuz_failed" if data_set_index not in inner_skip: - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # Running inner_skip = list(set(inner_skip+skip)) @@ -559,7 +559,7 @@ def sanitize_deprecated_dataset(value): return value -def for_loop_action(step_data, data_set_no): +async def for_loop_action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -748,7 +748,7 @@ def for_loop_action(step_data, data_set_no): sr.Set_Shared_Variables(CommonUtil.dont_prettify_on_server[0], step_data, protected=True, pretty=False) sr.test_action_info = CommonUtil.all_action_info[step_index] return "zeuz_failed", outer_skip - result, skip = Run_Sequential_Actions([data_set_index]) + result, skip = await Run_Sequential_Actions([data_set_index]) inner_skip = list(set(inner_skip + skip)) outer_skip = list(set(outer_skip + inner_skip)) @@ -848,7 +848,7 @@ def for_loop_action(step_data, data_set_no): return CommonUtil.Exception_Handler(sys.exc_info()), [] -def While_Loop_Action(step_data, data_set_no): +async def While_Loop_Action(step_data, data_set_no): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME try: data_set = step_data[data_set_no] @@ -947,7 +947,7 @@ def While_Loop_Action(step_data, data_set_no): 3 ) return "zeuz_failed", outer_skip - result, skip = Run_Sequential_Actions( + result, skip = await Run_Sequential_Actions( [data_set_index] ) # new edit: full step data is passed. [step_data[data_set_index]]) # Recursively call this function until all called data sets are complete @@ -1049,7 +1049,7 @@ def ticker_linear_shape(seconds, callable, *args, **kwargs): seconds -= 1 -def Sequential_Actions( +async def Sequential_Actions( step_data, test_action_info, debug_actions=None, @@ -1068,13 +1068,13 @@ def Sequential_Actions( # sr.Set_Shared_Variables("test_action_info", test_action_info, protected=True, print_variable=False) sr.test_action_info = test_action_info - result, skip_for_loop = Run_Sequential_Actions([], debug_actions) + result, skip_for_loop = await Run_Sequential_Actions([], debug_actions) # empty list means run all, instead of step data we want to send the dataset no's of the step data to run write_browser_logs() return result -def Run_Sequential_Actions( +async def Run_Sequential_Actions( data_set_list=None, debug_actions=None ): # data_set_no will used in recursive conditional action call if data_set_list is None: @@ -1100,7 +1100,7 @@ def Run_Sequential_Actions( data_set_list.append(i) if len(data_set_list) == 0 and CommonUtil.debug_status and not sr.Test_Shared_Variables("selenium_driver") and ConfigModule.get_config_value("Inspector", "ai_plugin").strip().lower() in CommonUtil.affirmative_words: - return Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), [] + return await Action_Handler([["browser", "selenium action", "browser"]], ["browser", "selenium action", "browser"]), [] for dataset_cnt in data_set_list: # For each data set within step data data_set = step_data[dataset_cnt] # Save data set to variable @@ -1196,7 +1196,7 @@ def Run_Sequential_Actions( # If middle column = action, call action handler, but always return a pass elif "optional action" in action_name: - result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler + result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler if result == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Optional action failed. Returning pass anyway", 2) result = "passed" @@ -1204,7 +1204,7 @@ def Run_Sequential_Actions( # If middle column = conditional action, evaluate data set elif "conditional action" in action_name or "if else" in action_name: if action_name.lower().strip() == "windows conditional action": - result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt) + result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1215,7 +1215,7 @@ def Run_Sequential_Actions( elif action_name.lower().strip() != "conditional action" and action_name.lower().strip() != "if else": # old style conditional action - result, to_skip = Conditional_Action_Handler(step_data, dataset_cnt) + result, to_skip = await Conditional_Action_Handler(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1224,7 +1224,7 @@ def Run_Sequential_Actions( break else: - result, to_skip = If_else_action(step_data, dataset_cnt) + result, to_skip = await If_else_action(step_data, dataset_cnt) skip += to_skip skip_for_loop += to_skip if result in failed_tag_list: @@ -1235,7 +1235,7 @@ def Run_Sequential_Actions( # Simulate a while/for loop with the specified data sets elif "loop action" in action_name: if action_name.lower().strip() == "for loop action": - result, skip_for_loop = for_loop_action(step_data, dataset_cnt) + result, skip_for_loop = await for_loop_action(step_data, dataset_cnt) skip = list(set(skip + skip_for_loop)) if result in failed_tag_list: return "zeuz_failed", skip_for_loop @@ -1243,7 +1243,7 @@ def Run_Sequential_Actions( elif action_name.lower().strip() not in ("while loop action", "for loop action"): # old style loop action # CommonUtil.ExecLog(sModuleInfo,"Old style loop action found. This will not be supported in 2020, please replace them with new loop actions",2) - result, skip_for_loop = Loop_Action_Handler(data_set, row, dataset_cnt) + result, skip_for_loop = await Loop_Action_Handler(data_set, row, dataset_cnt) skip = skip_for_loop position_of_loop_action = dataset_cnt @@ -1273,7 +1273,7 @@ def Run_Sequential_Actions( return "zeuz_failed", skip_for_loop elif "loop" in action_name: if "while" in action_name.lower(): - result, skip_for_loop = While_Loop_Action(step_data, dataset_cnt) + result, skip_for_loop = await While_Loop_Action(step_data, dataset_cnt) skip = list(set(skip + skip_for_loop)) if result in failed_tag_list: return "zeuz_failed", skip_for_loop @@ -1343,7 +1343,7 @@ def Run_Sequential_Actions( # If middle column = action, call action handler elif "action" in action_name: # Must be last, since it's a single word that also exists in other action types - result = Action_Handler(data_set, row) # Pass data set, and action_name to action handler + result = await Action_Handler(data_set, row) # Pass data set, and action_name to action handler if row[0].lower().strip() in ("step exit", "testcase exit"): global step_exit_fail_called, step_exit_pass_called CommonUtil.ExecLog(sModuleInfo, f"{row[0].lower().strip()} Exit called. Stopping Test Step.", 1) @@ -1373,12 +1373,12 @@ def Run_Sequential_Actions( continue CommonUtil.ExecLog(sModuleInfo, "Action failed. Trying bypass #%d" % (i + 1), 1) - result = Action_Handler(bypass_data_set[i], bypass_row[i]) + result = await Action_Handler(bypass_data_set[i], bypass_row[i]) if result in failed_tag_list: # This also failed, so chances are first failure was real continue # Try the next bypass, if any else: # Bypass passed, which indicates there was something blocking the element in the first place CommonUtil.ExecLog(sModuleInfo, "Bypass passed. Retrying original action", 1) - result = Action_Handler(data_set, row) # Retry failed original data set + result = await Action_Handler(data_set, row) # Retry failed original data set if result in failed_tag_list: # Still a failure, give up return "zeuz_failed", skip_for_loop break # No need to process more bypasses @@ -1404,7 +1404,7 @@ def Run_Sequential_Actions( return CommonUtil.Exception_Handler(sys.exc_info()) -def Loop_Action_Handler(data, row, dataset_cnt): +async def Loop_Action_Handler(data, row, dataset_cnt): """ Performs a sub-set of the data set in a loop, similar to a for or while loop """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -1903,7 +1903,7 @@ def build_subset(new_step_data): return CommonUtil.Exception_Handler(sys.exc_info()) -def Conditional_Action_Handler(step_data, dataset_cnt): +async def Conditional_Action_Handler(step_data, dataset_cnt): """ Process conditional actions, called only by Sequential_Actions() """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -2296,7 +2296,7 @@ def compare_variable_names(set, dataset): CommonUtil.compare_action_varnames = {"left": "Left", "right": "Right"} -def Action_Handler(_data_set, action_row, _bypass_bug=True): +async def Action_Handler(_data_set, action_row, _bypass_bug=True): """ Finds the appropriate function for the requested action in the step data and executes it """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -2417,6 +2417,8 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True): elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]: time.sleep(CommonUtil.global_sleep[module]["_all_"]["pre"]) result = run_function(data_set) # Execute function, providing all rows in the data set + if inspect.iscoroutine(result): + result = await result if post_sleep: time.sleep(post_sleep) elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]: diff --git a/Framework/MainDriverApi.py b/Framework/MainDriverApi.py index f7734a5f..9e32bce3 100644 --- a/Framework/MainDriverApi.py +++ b/Framework/MainDriverApi.py @@ -281,7 +281,7 @@ def terminate_thread(thread): # To kill running thread raise SystemError("PyThreadState_SetAsyncExc failed") # call the function of a test step that is in its driver file -def call_driver_function_of_test_step( +async def call_driver_function_of_test_step( sModuleInfo, all_step_info, StepSeq, @@ -312,6 +312,7 @@ def call_driver_function_of_test_step( try: # importing functions from driver functionTocall = getattr(module_name, step_name) + print(functionTocall) except Exception as e: CommonUtil.Exception_Handler( sys.exc_info(), @@ -379,12 +380,20 @@ def call_driver_function_of_test_step( CommonUtil.Exception_Handler(sys.exc_info()) else: # run sequentially - sStepResult = functionTocall( - test_steps_data, - test_action_info, - simple_queue, - debug_actions, - ) + if inspect.iscoroutinefunction(functionTocall): + sStepResult = await functionTocall( + test_steps_data, + test_action_info, + simple_queue, + debug_actions, + ) + else: + sStepResult = functionTocall( + test_steps_data, + test_action_info, + simple_queue, + debug_actions, + ) except: CommonUtil.Exception_Handler(sys.exc_info()) # handle exceptions sStepResult = "zeuz_failed" @@ -411,7 +420,7 @@ def call_driver_function_of_test_step( # runs all test steps of a test case -def run_all_test_steps_in_a_test_case( +async def run_all_test_steps_in_a_test_case( testcase_info, test_case, sModuleInfo, @@ -606,7 +615,7 @@ def run_all_test_steps_in_a_test_case( CommonUtil.ExecLog(sModuleInfo, "STEP-%s is skipped" % StepSeq, 2) sStepResult = "skipped" else: - sStepResult = call_driver_function_of_test_step( + sStepResult = await call_driver_function_of_test_step( sModuleInfo, all_step_info, StepSeq, @@ -936,7 +945,7 @@ def check_test_skip(run_id, tc_num, skip_remaining=True) -> bool: return False -def run_test_case( +async def run_test_case( TestCaseID, sModuleInfo, run_id, @@ -994,7 +1003,7 @@ def run_test_case( if check_test_skip(run_id, tc_num): sTestStepResultList = ['SKIPPED' for i in range(len(testcase_info['steps']))] else: - sTestStepResultList = run_all_test_steps_in_a_test_case( + sTestStepResultList = await run_all_test_steps_in_a_test_case( testcase_info, test_case, sModuleInfo, @@ -1806,7 +1815,7 @@ def download_or_copy(attachment): # main function -def main(device_dict, all_run_id_info): +async def main(device_dict, all_run_id_info): try: # get module info sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME @@ -2093,7 +2102,7 @@ def kill(process): ) except Exception as e: CommonUtil.ExecLog(sModuleInfo, str(e), 3) - run_test_case( + await run_test_case( test_case_no, sModuleInfo, run_id, @@ -2111,7 +2120,7 @@ def kill(process): else: - run_test_case( + await run_test_case( test_case_no, sModuleInfo, run_id, diff --git a/node_cli.py b/node_cli.py index d4e1ce89..e8b13301 100755 --- a/node_cli.py +++ b/node_cli.py @@ -440,7 +440,7 @@ async def response_callback(response: str): # 3. Call MainDriver device_info = All_Device_Info.get_all_connected_device_info() await install_handler.cancel_run() - MainDriverApi.main( + await MainDriverApi.main( device_dict=device_info, all_run_id_info=node_json, ) From 60d97f5c72a60338d7a43eba9b46698c581c979a Mon Sep 17 00:00:00 2001 From: Nasif Bin Zafar Date: Mon, 9 Feb 2026 13:12:41 +0600 Subject: [PATCH 3/4] Make all Playwright web actions compatible with Playwright Async API --- .../Web/Playwright/BuiltInFunctions.py | 172 +++++++++--------- .../Web/Playwright/locator.py | 10 +- 2 files changed, 91 insertions(+), 91 deletions(-) diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index 09be50dc..17fa91d9 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -423,7 +423,7 @@ def Switch_Browser(step_data): ######################### @logger -def Click_Element(step_data): +async def Click_Element(step_data): """ Click an element. @@ -494,7 +494,7 @@ def Click_Element(step_data): right_click = True # Get element - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -516,14 +516,14 @@ def Click_Element(step_data): # Perform click if double_click: - locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) + await locator.dblclick(**{k: v for k, v in click_options.items() if k != "click_count"}) CommonUtil.ExecLog(sModuleInfo, "Double click performed", 1) elif right_click: click_options["button"] = "right" - locator.click(**click_options) + await locator.click(**click_options) CommonUtil.ExecLog(sModuleInfo, "Right click performed", 1) else: - locator.click(**click_options) + await locator.click(**click_options) CommonUtil.ExecLog(sModuleInfo, "Click performed", 1) return "passed" @@ -533,7 +533,7 @@ def Click_Element(step_data): @logger -def Double_Click_Element(step_data): +async def Double_Click_Element(step_data): """ Double-click an element. @@ -550,11 +550,11 @@ def Double_Click_Element(step_data): modified_step_data[i] = ("double click", mid, right) break - return Click_Element(modified_step_data) + return await Click_Element(modified_step_data) @logger -def Right_Click_Element(step_data): +async def Right_Click_Element(step_data): """ Right-click (context click) an element. @@ -569,11 +569,11 @@ def Right_Click_Element(step_data): modified_step_data[i] = ("right click", mid, right) break - return Click_Element(modified_step_data) + return await Click_Element(modified_step_data) @logger -def Hover_Over_Element(step_data): +async def Hover_Over_Element(step_data): """ Hover over an element. @@ -608,7 +608,7 @@ def Hover_Over_Element(step_data): elif left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -621,7 +621,7 @@ def Hover_Over_Element(step_data): if timeout: hover_options["timeout"] = timeout - locator.hover(**hover_options) + await locator.hover(**hover_options) CommonUtil.ExecLog(sModuleInfo, "Hover performed", 1) return "passed" @@ -636,7 +636,7 @@ def Hover_Over_Element(step_data): ######################### @logger -def Enter_Text_In_Text_Box(step_data): +async def Enter_Text_In_Text_Box(step_data): """ Enter text in a text field. @@ -685,7 +685,7 @@ def Enter_Text_In_Text_Box(step_data): elif left_l == "timeout": timeout = int(float(right.strip()) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -693,17 +693,17 @@ def Enter_Text_In_Text_Box(step_data): # Enter text based on options if use_js: # Use JavaScript to set value directly - locator.evaluate(f"el => {{ el.value = `{text_value}`; }}") + await locator.evaluate(f"el => {{ el.value = `{text_value}`; }}") # Trigger events - locator.dispatch_event("input") - locator.dispatch_event("change") + await locator.dispatch_event("input") + await locator.dispatch_event("change") CommonUtil.ExecLog(sModuleInfo, f"Text entered via JS: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) elif clear: # fill() clears and sets value - recommended approach fill_options = {} if timeout: fill_options["timeout"] = timeout - locator.fill(text_value, **fill_options) + await locator.fill(text_value, **fill_options) CommonUtil.ExecLog(sModuleInfo, f"Text filled: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) else: # type() appends to existing value @@ -712,7 +712,7 @@ def Enter_Text_In_Text_Box(step_data): type_options["delay"] = int(delay * 1000) if timeout: type_options["timeout"] = timeout - locator.type(text_value, **type_options) + await locator.type(text_value, **type_options) CommonUtil.ExecLog(sModuleInfo, f"Text typed: {text_value[:50]}{'...' if len(text_value) > 50 else ''}", 1) return "passed" @@ -722,7 +722,7 @@ def Enter_Text_In_Text_Box(step_data): @logger -def Keystroke_For_Element(step_data): +async def Keystroke_For_Element(step_data): """ Send keystrokes to an element or the page. @@ -829,18 +829,18 @@ def Keystroke_For_Element(step_data): key = key_map.get(key, keystroke_value) if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" for _ in range(key_count): - locator.press(key) + await locator.press(key) if delay > 0: time.sleep(delay) else: for _ in range(key_count): - current_page.keyboard.press(key) + await current_page.keyboard.press(key) if delay > 0: time.sleep(delay) @@ -852,13 +852,13 @@ def Keystroke_For_Element(step_data): type_options["delay"] = int(delay * 1000) if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.type(keystroke_value, **type_options) + await locator.type(keystroke_value, **type_options) else: - current_page.keyboard.type(keystroke_value, **type_options) + await current_page.keyboard.type(keystroke_value, **type_options) CommonUtil.ExecLog(sModuleInfo, f"Typed chars: {keystroke_value[:50]}{'...' if len(keystroke_value) > 50 else ''}", 1) @@ -875,7 +875,7 @@ def Keystroke_For_Element(step_data): ######################### @logger -def Validate_Text(step_data): +async def Validate_Text(step_data): """ Validate that an element contains expected text. @@ -927,13 +927,13 @@ def Validate_Text(step_data): if left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" # Get actual text - actual_text = locator.text_content() or "" + actual_text = await locator.text_content() or "" # Compare match = False @@ -964,7 +964,7 @@ def Validate_Text(step_data): @logger -def if_element_exists(step_data): +async def if_element_exists(step_data): """ Check if an element exists on the page. @@ -992,14 +992,14 @@ def if_element_exists(step_data): if mid_l == "optional parameter" and left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page, element_wait=timeout/1000) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, element_wait=timeout/1000) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element does not exist", 1) return "zeuz_failed" try: - count = locator.count() + count = await locator.count() if count > 0: CommonUtil.ExecLog(sModuleInfo, f"Element exists ({count} found)", 1) return "passed" @@ -1015,7 +1015,7 @@ def if_element_exists(step_data): @logger -def Save_Attribute(step_data): +async def Save_Attribute(step_data): """ Save an element's attribute value to a shared variable. @@ -1064,7 +1064,7 @@ def Save_Attribute(step_data): CommonUtil.ExecLog(sModuleInfo, "No save variable specified", 3) return "zeuz_failed" - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -1072,27 +1072,27 @@ def Save_Attribute(step_data): # Get attribute value based on name attr_lower = attribute_name.lower() if attr_lower == "text": - value = locator.text_content() + value = await locator.text_content() elif attr_lower == "innertext": - value = locator.inner_text() + value = await locator.inner_text() elif attr_lower == "innerhtml": - value = locator.inner_html() + value = await locator.inner_html() elif attr_lower == "outerhtml": - value = locator.evaluate("el => el.outerHTML") + value = await locator.evaluate("el => el.outerHTML") elif attr_lower == "value": - value = locator.input_value() + value = await locator.input_value() elif attr_lower == "checked": - value = locator.is_checked() + value = await locator.is_checked() elif attr_lower == "selected": - value = locator.evaluate("el => el.selected") + value = await locator.evaluate("el => el.selected") elif attr_lower == "visible": - value = locator.is_visible() + value = await locator.is_visible() elif attr_lower == "enabled": - value = locator.is_enabled() + value = await locator.is_enabled() elif attr_lower == "disabled": - value = locator.is_disabled() + value = await locator.is_disabled() else: - value = locator.get_attribute(attribute_name) + value = await locator.get_attribute(attribute_name) sr.Set_Shared_Variables(save_variable, value) CommonUtil.ExecLog(sModuleInfo, f"Saved '{attribute_name}' = '{value}' to '{save_variable}'", 1) @@ -1103,7 +1103,7 @@ def Save_Attribute(step_data): @logger -def get_element_info(step_data): +async def get_element_info(step_data): """ Get detailed information about an element. @@ -1126,23 +1126,23 @@ def get_element_info(step_data): if mid.strip().lower() == "save parameter": save_variable = left.strip() - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" # Gather element info info = { - "tag_name": locator.evaluate("el => el.tagName"), - "text": locator.text_content(), - "inner_html": locator.inner_html(), - "visible": locator.is_visible(), - "enabled": locator.is_enabled(), - "bounding_box": locator.bounding_box(), + "tag_name": await locator.evaluate("el => el.tagName"), + "text": await locator.text_content(), + "inner_html": await locator.inner_html(), + "visible": await locator.is_visible(), + "enabled": await locator.is_enabled(), + "bounding_box": await locator.bounding_box(), } # Get all attributes - attributes = locator.evaluate("""el => { + attributes = await locator.evaluate("""el => { const attrs = {}; for (const attr of el.attributes) { attrs[attr.name] = attr.value; @@ -1326,7 +1326,7 @@ def Scroll(step_data): @logger -def scroll_to_element(step_data): +async def scroll_to_element(step_data): """ Scroll an element into view. @@ -1358,15 +1358,15 @@ def scroll_to_element(step_data): elif left_l == "align to top": align_to_top = right_v.lower() in ("true", "yes", "1") - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" if use_js: - locator.evaluate(f"el => el.scrollIntoView({str(align_to_top).lower()})") + await locator.evaluate(f"el => el.scrollIntoView({str(align_to_top).lower()})") else: - locator.scroll_into_view_if_needed() + await locator.scroll_into_view_if_needed() CommonUtil.ExecLog(sModuleInfo, "Scrolled element into view", 1) return "passed" @@ -1382,7 +1382,7 @@ def scroll_to_element(step_data): ######################### @logger -def Select_Deselect(step_data): +async def Select_Deselect(step_data): """ Select or deselect options in a dropdown/select element. @@ -1438,7 +1438,7 @@ def Select_Deselect(step_data): CommonUtil.ExecLog(sModuleInfo, "No selection value provided", 3) return "zeuz_failed" - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -1453,7 +1453,7 @@ def Select_Deselect(step_data): if is_deselect: # Playwright doesn't have direct deselect, use JavaScript - locator.evaluate(f"""el => {{ + await locator.evaluate(f"""el => {{ for (const opt of el.options) {{ if (opt.{'value' if select_type == 'value' else 'text'} === '{select_value}') {{ opt.selected = false; @@ -1463,7 +1463,7 @@ def Select_Deselect(step_data): }}""") CommonUtil.ExecLog(sModuleInfo, f"Deselected: {select_value}", 1) else: - locator.select_option(**option) + await locator.select_option(**option) CommonUtil.ExecLog(sModuleInfo, f"Selected: {select_value}", 1) return "passed" @@ -1479,7 +1479,7 @@ def Select_Deselect(step_data): ######################### @logger -def check_uncheck(step_data): +async def check_uncheck(step_data): """ Check or uncheck a checkbox/radio button. @@ -1516,7 +1516,7 @@ def check_uncheck(step_data): if left_l == "use js": use_js = right_v in ("true", "yes", "1") - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" @@ -1526,10 +1526,10 @@ def check_uncheck(step_data): options["force"] = True if action == "check": - locator.check(**options) + await locator.check(**options) CommonUtil.ExecLog(sModuleInfo, "Checkbox checked", 1) else: - locator.uncheck(**options) + await locator.uncheck(**options) CommonUtil.ExecLog(sModuleInfo, "Checkbox unchecked", 1) return "passed" @@ -1938,7 +1938,7 @@ def handle_dialog(dialog): ######################### @logger -def drag_and_drop(step_data): +async def drag_and_drop(step_data): """ Drag and drop an element to a target. @@ -1971,19 +1971,19 @@ def drag_and_drop(step_data): target_params.append((left, mid, right)) # Get source element - source_locator = PlaywrightLocator.Get_Element(source_params, current_page) + source_locator = await PlaywrightLocator.Get_Element(source_params, current_page) if source_locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Source element not found", 3) return "zeuz_failed" # Get target element - target_locator = PlaywrightLocator.Get_Element(target_params, current_page) + target_locator = await PlaywrightLocator.Get_Element(target_params, current_page) if target_locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Target element not found", 3) return "zeuz_failed" # Perform drag and drop - source_locator.drag_to(target_locator) + await source_locator.drag_to(target_locator) CommonUtil.ExecLog(sModuleInfo, "Drag and drop completed", 1) return "passed" @@ -1998,7 +1998,7 @@ def drag_and_drop(step_data): ######################### @logger -def take_screenshot_playwright(step_data): +async def take_screenshot_playwright(step_data): """ Take a screenshot. @@ -2050,13 +2050,13 @@ def take_screenshot_playwright(step_data): # Take screenshot if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.screenshot(path=screenshot_path) + await locator.screenshot(path=screenshot_path) else: - current_page.screenshot(path=screenshot_path, full_page=full_page) + await current_page.screenshot(path=screenshot_path, full_page=full_page) if save_variable: sr.Set_Shared_Variables(save_variable, screenshot_path) @@ -2075,7 +2075,7 @@ def take_screenshot_playwright(step_data): ######################### @logger -def execute_javascript(step_data): +async def execute_javascript(step_data): """ Execute JavaScript code in the browser. @@ -2120,13 +2120,13 @@ def execute_javascript(step_data): # Execute JS if has_element: - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - result = locator.evaluate(js_code) + result = await locator.evaluate(js_code) else: - result = current_page.evaluate(js_code) + result = await current_page.evaluate(js_code) if save_variable: sr.Set_Shared_Variables(save_variable, result) @@ -2147,7 +2147,7 @@ def execute_javascript(step_data): ######################### @logger -def upload_file(step_data): +async def upload_file(step_data): """ Upload a file via file input. @@ -2190,12 +2190,12 @@ def upload_file(step_data): CommonUtil.ExecLog(sModuleInfo, f"File not found: {file_path}", 3) return "zeuz_failed" - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) return "zeuz_failed" - locator.set_input_files(file_path) + await locator.set_input_files(file_path) CommonUtil.ExecLog(sModuleInfo, f"File uploaded: {file_path}", 1) return "passed" @@ -2274,7 +2274,7 @@ def resize_window(step_data): ######################### @logger -def Wait_For_Element(step_data): +async def Wait_For_Element(step_data): """ Wait for an element to appear/disappear. @@ -2315,7 +2315,7 @@ def Wait_For_Element(step_data): if left_l == "timeout": timeout = int(float(right_v) * 1000) - locator = PlaywrightLocator.Get_Element(step_data, current_page, element_wait=0.1) + locator = await PlaywrightLocator.Get_Element(step_data, current_page, element_wait=0.1) if locator == "zeuz_failed": # For hidden/detached states, element not found is actually success @@ -2499,7 +2499,7 @@ def handle_route(route): ######################### @logger -def Extract_Table_Data(step_data): +async def Extract_Table_Data(step_data): """ Extract data from an HTML table. @@ -2534,13 +2534,13 @@ def Extract_Table_Data(step_data): elif left_l == "column": col_filter = right_v - locator = PlaywrightLocator.Get_Element(step_data, current_page) + locator = await PlaywrightLocator.Get_Element(step_data, current_page) if locator == "zeuz_failed": CommonUtil.ExecLog(sModuleInfo, "Table element not found", 3) return "zeuz_failed" # Extract table data using JavaScript - table_data = locator.evaluate("""table => { + table_data = await locator.evaluate("""table => { const data = []; const rows = table.querySelectorAll('tr'); rows.forEach(row => { diff --git a/Framework/Built_In_Automation/Web/Playwright/locator.py b/Framework/Built_In_Automation/Web/Playwright/locator.py index 5e7420d8..0546330d 100644 --- a/Framework/Built_In_Automation/Web/Playwright/locator.py +++ b/Framework/Built_In_Automation/Web/Playwright/locator.py @@ -25,7 +25,7 @@ MODULE_NAME = inspect.getmodulename(__file__) -def Get_Element(step_data, page, return_all=False, element_wait=None): +async def Get_Element(step_data, page, return_all=False, element_wait=None): """ Get element using Playwright's native Locator API. @@ -119,7 +119,7 @@ def Get_Element(step_data, page, return_all=False, element_wait=None): # Return all elements if requested if return_all: try: - elements = locator.all() + elements = await locator.all() CommonUtil.ExecLog(sModuleInfo, f"Found {len(elements)} elements", 1) return elements except Exception as e: @@ -128,7 +128,7 @@ def Get_Element(step_data, page, return_all=False, element_wait=None): # Check if element exists (with timeout) try: - count = locator.count() + count = await locator.count() if count == 0: CommonUtil.ExecLog(sModuleInfo, "No elements found matching locator", 3) return "zeuz_failed" @@ -471,7 +471,7 @@ def _extract_sr_index(mid_value): return 1 -def wait_for_element(step_data, page, state="visible", timeout=None): +async def wait_for_element(step_data, page, state="visible", timeout=None): """ Wait for element to reach a specific state. @@ -487,7 +487,7 @@ def wait_for_element(step_data, page, state="visible", timeout=None): sModuleInfo = "wait_for_element" try: - locator = Get_Element(step_data, page) + locator = await Get_Element(step_data, page) if locator == "zeuz_failed": return "zeuz_failed" From 8628ca102e64b36ee55c5b55c53ff1e825f905d8 Mon Sep 17 00:00:00 2001 From: Nasif Bin Zafar Date: Mon, 9 Feb 2026 13:23:25 +0600 Subject: [PATCH 4/4] Fix partial text match bug Partial match was not picking up action named "validate partial text" --- .../Built_In_Automation/Web/Playwright/BuiltInFunctions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index 17fa91d9..4fdc6843 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -921,6 +921,8 @@ async def Validate_Text(step_data): case_insensitive = True elif left_l.startswith("*"): partial_match = True + elif "partial" in left_l: + partial_match = True expected_text = right_v elif mid_l == "optional parameter":