diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 210c2d4e2b..048dbd1352 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -29,7 +29,7 @@ on: - .github/dependabot.yml - .github/pull_request_template.md - "*.md" - - .github/workflows/docs-* - + - .github/workflows/docs-* workflow_dispatch: jobs: diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 1208edb9d2..bf62f5df48 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -725,12 +725,34 @@ def _warning_are_issued_when_auth(self, auth_provider): assert auth_warning >= 4 assert auth_warning == mock_handler.get_message_count("debug", "Got ReadyMessage on new connection") + def _wait_for_all_shard_connections(self, cluster, timeout=30): + """Wait until all shard-aware connections are fully established.""" + from cassandra.pool import HostConnection + deadline = time.time() + timeout + while time.time() < deadline: + all_connected = True + for holder in cluster.get_connection_holders(): + if not isinstance(holder, HostConnection): + continue + if holder.host.sharding_info and len(holder._connections) < holder.host.sharding_info.shards_count: + all_connected = False + break + if all_connected: + return + time.sleep(0.1) + raise RuntimeError("Timed out waiting for all shard connections to be established") + def test_idle_heartbeat(self): interval = 2 cluster = TestCluster(idle_heartbeat_interval=interval, monitor_reporting_enabled=False) session = cluster.connect(wait_for_all_pools=True) + # wait_for_all_pools only waits for the first connection per host; + # shard-aware connections to remaining shards are opened in background. + # Wait for them to stabilize so they don't get replaced during the test. + self._wait_for_all_shard_connections(cluster) + # This test relies on impl details of connection req id management to see if heartbeats # are being sent. May need update if impl is changed connection_request_ids = {} @@ -756,14 +778,19 @@ def test_idle_heartbeat(self): # assert idle status assert all(c.is_idle for c in connections) - # send messages on all connections - statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * len(cluster.metadata.all_hosts()) + # send enough messages to ensure all connections are used + # (with shard-aware routing, each query only hits one shard per host, + # so we need more queries than just len(hosts) to cover all connections) + num_connections = len([c for c in connections if not c.is_control_connection]) + statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * max(num_connections * 2, len(cluster.metadata.all_hosts())) results = execute_concurrent(session, statements_and_params) for success, result in results: assert success - # assert not idle status - assert not any(c.is_idle if not c.is_control_connection else False for c in connections) + # assert at least some non-control connections are no longer idle + # (shard-aware routing may not distribute queries to every connection) + non_idle = [c for c in connections if not c.is_control_connection and not c.is_idle] + assert len(non_idle) > 0 # holders include session pools and cc holders = cluster.get_connection_holders()