Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ on:
- .github/dependabot.yml
- .github/pull_request_template.md
- "*.md"
- .github/workflows/docs-* -
- .github/workflows/docs-*
workflow_dispatch:

jobs:
Expand Down
35 changes: 31 additions & 4 deletions tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,34 @@ def _warning_are_issued_when_auth(self, auth_provider):
assert auth_warning >= 4
assert auth_warning == mock_handler.get_message_count("debug", "Got ReadyMessage on new connection")

def _wait_for_all_shard_connections(self, cluster, timeout=30):
"""Wait until all shard-aware connections are fully established."""
from cassandra.pool import HostConnection
deadline = time.time() + timeout
while time.time() < deadline:
all_connected = True
for holder in cluster.get_connection_holders():
if not isinstance(holder, HostConnection):
continue
if holder.host.sharding_info and len(holder._connections) < holder.host.sharding_info.shards_count:
all_connected = False
break
if all_connected:
return
time.sleep(0.1)
raise RuntimeError("Timed out waiting for all shard connections to be established")

def test_idle_heartbeat(self):
interval = 2
cluster = TestCluster(idle_heartbeat_interval=interval,
monitor_reporting_enabled=False)
session = cluster.connect(wait_for_all_pools=True)

# wait_for_all_pools only waits for the first connection per host;
# shard-aware connections to remaining shards are opened in background.
# Wait for them to stabilize so they don't get replaced during the test.
self._wait_for_all_shard_connections(cluster)

# This test relies on impl details of connection req id management to see if heartbeats
# are being sent. May need update if impl is changed
connection_request_ids = {}
Expand All @@ -756,14 +778,19 @@ def test_idle_heartbeat(self):
# assert idle status
assert all(c.is_idle for c in connections)

# send messages on all connections
statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * len(cluster.metadata.all_hosts())
# send enough messages to ensure all connections are used
# (with shard-aware routing, each query only hits one shard per host,
# so we need more queries than just len(hosts) to cover all connections)
num_connections = len([c for c in connections if not c.is_control_connection])
statements_and_params = [("SELECT release_version FROM system.local WHERE key='local'", ())] * max(num_connections * 2, len(cluster.metadata.all_hosts()))
results = execute_concurrent(session, statements_and_params)
for success, result in results:
assert success

# assert not idle status
assert not any(c.is_idle if not c.is_control_connection else False for c in connections)
# assert at least some non-control connections are no longer idle
# (shard-aware routing may not distribute queries to every connection)
non_idle = [c for c in connections if not c.is_control_connection and not c.is_idle]
assert len(non_idle) > 0

# holders include session pools and cc
holders = cluster.get_connection_holders()
Expand Down
Loading