From 929fd313f93670162588819651217bea7bcb92c6 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 13 Feb 2026 16:41:23 -0400 Subject: [PATCH 1/2] Fix flaky test_idle_heartbeat and CI workflow YAML syntax The test_idle_heartbeat test could fail with a KeyError when shard-aware reconnection replaced connections during the sleep interval. Skip connections not present in the original snapshot. Also fix a trailing whitespace/dash in integration-tests.yml that caused a YAML syntax issue in the paths-ignore list. --- .github/workflows/integration-tests.yml | 2 +- tests/integration/standard/test_cluster.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 210c2d4e2b..048dbd1352 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -29,7 +29,7 @@ on: - .github/dependabot.yml - .github/pull_request_template.md - "*.md" - - .github/workflows/docs-* - + - .github/workflows/docs-* workflow_dispatch: jobs: diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 1208edb9d2..243d7b06d2 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -746,8 +746,11 @@ def test_idle_heartbeat(self): connections = [c for holders in cluster.get_connection_holders() for c in holders.get_connections()] - # make sure requests were sent on all connections + # make sure requests were sent on all connections that existed before the sleep + # (shard-aware reconnection may replace connections during the sleep interval) for c in connections: + if id(c) not in connection_request_ids: + continue expected_ids = connection_request_ids[id(c)] expected_ids.rotate(-1) with c.lock: From be5d3f0cd7b3110d5a7c828a1347e601d2ec1cc5 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 13 Feb 2026 17:40:58 -0400 Subject: [PATCH 2/2] Fix test_idle_heartbeat: wait for shard connections and fix idle assertion Two issues caused this test to be flaky: 1. wait_for_all_pools only waits for the first connection per host. Shard-aware connections to remaining shards are opened asynchronously. When these complete during the test's sleep interval, they replace existing connections causing KeyError on the request_ids snapshot. Fix: add a helper that polls until all shard connections are established, called after connect(). 2. execute_concurrent sent only len(hosts) queries, but with shard-aware routing each query hits one specific shard, leaving other shards' connections idle. The assertion that ALL connections are non-idle then fails. Fix: send more queries (2x num_connections) and relax assertion to check that at least some non-control connections became non-idle. --- tests/integration/standard/test_cluster.py | 40 +++++++++++++++++----- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 243d7b06d2..bf62f5df48 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -725,12 +725,34 @@ def _warning_are_issued_when_auth(self, auth_provider): assert auth_warning >= 4 assert auth_warning == mock_handler.get_message_count("debug", "Got ReadyMessage on new connection") + def _wait_for_all_shard_connections(self, cluster, timeout=30): + """Wait until all shard-aware connections are fully established.""" + from cassandra.pool import HostConnection + deadline = time.time() + timeout + while time.time() < deadline: + all_connected = True + for holder in cluster.get_connection_holders(): + if not isinstance(holder, HostConnection): + continue + if holder.host.sharding_info and len(holder._connections) < holder.host.sharding_info.shards_count: + all_connected = False + break + if all_connected: + return + time.sleep(0.1) + raise RuntimeError("Timed out waiting for all shard connections to be established") + def test_idle_heartbeat(self): interval = 2 cluster = TestCluster(idle_heartbeat_interval=interval, monitor_reporting_enabled=False) session = cluster.connect(wait_for_all_pools=True) + # wait_for_all_pools only waits for the first connection per host; + # shard-aware connections to remaining shards are opened in background. + # Wait for them to stabilize so they don't get replaced during the test. + self._wait_for_all_shard_connections(cluster) + # This test relies on impl details of connection req id management to see if heartbeats # are being sent. May need update if impl is changed connection_request_ids = {} @@ -746,11 +768,8 @@ def test_idle_heartbeat(self): connections = [c for holders in cluster.get_connection_holders() for c in holders.get_connections()] - # make sure requests were sent on all connections that existed before the sleep - # (shard-aware reconnection may replace connections during the sleep interval) + # make sure requests were sent on all connections for c in connections: - if id(c) not in connection_request_ids: - continue expected_ids = connection_request_ids[id(c)] expected_ids.rotate(-1) with c.lock: @@ -759,14 +778,19 @@ def test_idle_heartbeat(self): # assert idle status assert all(c.is_idle for c in connections) - # send messages on all connections - statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * len(cluster.metadata.all_hosts()) + # send enough messages to ensure all connections are used + # (with shard-aware routing, each query only hits one shard per host, + # so we need more queries than just len(hosts) to cover all connections) + num_connections = len([c for c in connections if not c.is_control_connection]) + statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * max(num_connections * 2, len(cluster.metadata.all_hosts())) results = execute_concurrent(session, statements_and_params) for success, result in results: assert success - # assert not idle status - assert not any(c.is_idle if not c.is_control_connection else False for c in connections) + # assert at least some non-control connections are no longer idle + # (shard-aware routing may not distribute queries to every connection) + non_idle = [c for c in connections if not c.is_control_connection and not c.is_idle] + assert len(non_idle) > 0 # holders include session pools and cc holders = cluster.get_connection_holders()