From 86ea4d739e448cc60e42d18860c3082855519249 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 12 Feb 2026 13:48:54 -0400 Subject: [PATCH 1/5] Fix AsyncioConnection race conditions causing EBADF errors (#614) Fix four race conditions in AsyncioConnection that cause "[Errno 9] Bad file descriptor" errors during node restarts, especially with TLS: 1. close() now waits for _close() to complete when called from outside the event loop thread, eliminating the window where is_closed=True but the socket fd is still open. 2. handle_read() sets last_error on server EOF so factory() detects dead connections instead of returning them to callers. 3. push() rejects data when the connection is already closed, preventing writes from being enqueued to a shutting-down connection. 4. handle_write() treats BrokenPipeError/ConnectionResetError as clean peer disconnections instead of defuncting, and both I/O handlers skip defunct() if the connection is already shutting down. --- cassandra/io/asyncioreactor.py | 30 ++- tests/unit/io/test_asyncio_race_614.py | 308 +++++++++++++++++++++++++ 2 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 tests/unit/io/test_asyncio_race_614.py diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 66e1d7295c..16e466ec73 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -142,10 +142,20 @@ def close(self): # close from the loop thread to avoid races when removing file # descriptors - asyncio.run_coroutine_threadsafe( + future = asyncio.run_coroutine_threadsafe( self._close(), loop=self._loop ) + # When called from outside the event loop thread, wait for _close() + # to complete so the socket is actually closed when close() returns. + # This prevents a race window where is_closed=True but the socket fd + # is still open, which causes EBADF in pending I/O operations. + if threading.current_thread() != self._loop_thread: + try: + future.result(timeout=5) + except Exception: + pass + async def _close(self): log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) if self._write_watcher: @@ -168,6 +178,10 @@ async def _close(self): self.connected_event.set() def push(self, data): + if self.is_closed or self.is_defunct: + raise ConnectionShutdown( + "Connection to %s is already closed" % (self.endpoint,)) + buff_size = self.out_buffer_size if len(data) > buff_size: chunks = [] @@ -202,6 +216,15 @@ async def handle_write(self): if next_msg: await self._loop.sock_sendall(self._socket, next_msg) except socket.error as err: + # Peer disconnected — do a clean close instead of defunct + if isinstance(err, (BrokenPipeError, ConnectionResetError)): + log.debug("Connection %s closed by peer during write: %s", + self, err) + self.close() + return + # Connection is already shutting down, just exit + if self.is_closed or self.is_defunct: + return log.debug("Exception in send for %s: %s", self, err) self.defunct(err) return @@ -223,6 +246,9 @@ async def handle_read(self): await asyncio.sleep(0) continue except socket.error as err: + # Connection is already shutting down, just exit + if self.is_closed or self.is_defunct: + return log.debug("Exception during socket recv for %s: %s", self, err) self.defunct(err) @@ -234,5 +260,7 @@ async def handle_read(self): self.process_io_buffer() else: log.debug("Connection %s closed by server", self) + self.last_error = ConnectionShutdown( + "Connection to %s was closed by server" % self.endpoint) self.close() return diff --git a/tests/unit/io/test_asyncio_race_614.py b/tests/unit/io/test_asyncio_race_614.py new file mode 100644 index 0000000000..c51a79c4cd --- /dev/null +++ b/tests/unit/io/test_asyncio_race_614.py @@ -0,0 +1,308 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Reproduction tests for issue #614: + "[Errno 9] Bad file descriptor" race conditions in AsyncioConnection. + +These tests use real socket pairs to demonstrate race conditions where: + - close() schedules _close() asynchronously, creating a window where + is_closed=True but the socket fd is still open + - factory() can return a dead connection when the server closes during setup + - push() accepts data after close() has been called + - handle_write() gets EBADF/BrokenPipe when the server closes mid-write + +Each test asserts the CORRECT expected behavior. Tests FAIL on the current +codebase because the bugs exist, proving the race conditions are real. +""" + +import logging +import socket +import time +import unittest + +from cassandra.connection import DefaultEndPoint, ConnectionShutdown + +AsyncioConnection, ASYNCIO_AVAILABLE = None, False +try: + from cassandra.io.asyncioreactor import AsyncioConnection + ASYNCIO_AVAILABLE = True +except (ImportError, SyntaxError, AttributeError): + AsyncioConnection = None + ASYNCIO_AVAILABLE = False + +from tests import is_monkey_patched + +log = logging.getLogger(__name__) + +skip_me = is_monkey_patched() or not ASYNCIO_AVAILABLE + + +class SocketPairAsyncioConnection(AsyncioConnection): + """ + A subclass of AsyncioConnection that uses a pre-provided socket from + a socketpair instead of connecting to a real CQL server. + + This bypasses _connect_socket() and _send_options_message() so we can + test I/O race conditions with a real fd registered on the event loop. + """ + + _test_socket = None # Set before __init__ + + def _connect_socket(self): + """Use the pre-provided test socket instead of connecting.""" + self._socket = self._test_socket + + def _send_options_message(self): + """Skip the CQL handshake -- we don't have a real server.""" + pass + + +def _make_connection_with_socketpair(): + """ + Create a SocketPairAsyncioConnection backed by a real socketpair. + + Returns (connection, server_socket) where server_socket is the + other end of the pair that can be used to send data to / close + the connection. + """ + client_sock, server_sock = socket.socketpair() + client_sock.setblocking(False) + server_sock.setblocking(False) + + SocketPairAsyncioConnection._test_socket = client_sock + conn = SocketPairAsyncioConnection( + DefaultEndPoint("127.0.0.1"), + connect_timeout=5, + ) + return conn, server_sock + + +@unittest.skipIf(is_monkey_patched(), 'runtime is monkey patched for another reactor') +@unittest.skipUnless(ASYNCIO_AVAILABLE, "asyncio is not available for this runtime") +class TestAsyncioRaceConditions614(unittest.TestCase): + """ + Reproduction tests for race conditions in AsyncioConnection that + cause "[Errno 9] Bad file descriptor" errors (issue #614). + + These tests FAIL on the current codebase, proving the bugs exist. + """ + + @classmethod + def setUpClass(cls): + if skip_me: + return + AsyncioConnection.initialize_reactor() + + def test_close_leaves_socket_open_window(self): + """ + BUG: close() returns with is_closed=True but the socket fd is still + open, because _close() is scheduled asynchronously. + + This is the root cause of all EBADF errors in issue #614. Any I/O + operation (handle_read's sock_recv, handle_write's sock_sendall) that + is in-flight when _close() eventually runs will get EBADF. + + Expected correct behavior: after close() returns, the socket should + no longer be usable for I/O (either closed or detached from the loop). + """ + conn, server_sock = _make_connection_with_socketpair() + try: + # Give handle_read time to start awaiting sock_recv + time.sleep(0.1) + + # close() sets is_closed=True and schedules _close() asynchronously + conn.close() + + # BUG: is_closed is True but the socket is still open. + # _close() hasn't run yet because it was scheduled via + # run_coroutine_threadsafe and we're checking immediately. + self.assertTrue(conn.is_closed) + + # The socket fd should be closed after close() returns. + # On a correctly implemented close(), the socket would be + # synchronously closed or at least detached. + try: + fd = conn._socket.fileno() + socket_still_open = (fd != -1) + except OSError: + socket_still_open = False + + self.assertFalse( + socket_still_open, + "BUG: close() returned with is_closed=True but socket fd={} " + "is still open. This creates a window where pending I/O " + "operations (sock_recv, sock_sendall) can get EBADF when " + "_close() eventually closes the fd.".format(fd) + ) + + finally: + # Wait for _close() to finish before cleanup + time.sleep(0.5) + server_sock.close() + + def test_factory_returns_closed_connection_on_server_eof(self): + """ + BUG: When the server closes the connection during handshake, + factory() returns a closed (dead) connection instead of raising. + + The sequence: + 1. Server closes its end -> handle_read gets empty buffer (EOF) + 2. handle_read calls close() -> _close() sets connected_event + 3. factory() wakes up: connected_event is set, last_error is None + 4. factory() returns the connection -- but it's already closed! + + Expected correct behavior: factory() should detect that the connection + is closed and raise an exception instead of returning a dead connection. + """ + conn, server_sock = _make_connection_with_socketpair() + try: + # Give handle_read time to start awaiting on sock_recv + time.sleep(0.1) + + # Server closes its end -- handle_read sees EOF (empty recv) + server_sock.close() + + # Wait for handle_read to process EOF and call close() -> _close() + # _close() sets connected_event when not defunct + conn.connected_event.wait(timeout=2.0) + + # Simulate what factory() does after connected_event.wait(): + # if conn.last_error: + # raise conn.last_error + # elif not conn.connected_event.is_set(): + # raise OperationTimedOut(...) + # else: + # return conn <-- BUG: returns closed connection + factory_would_raise = False + if conn.last_error: + factory_would_raise = True + elif not conn.connected_event.is_set(): + factory_would_raise = True + + # factory() should raise when the connection is dead + self.assertTrue( + factory_would_raise, + "BUG: factory() would return a dead connection! " + "connected_event.is_set()={}, last_error={}, is_closed={}, " + "is_defunct={}. factory() has no check for is_closed before " + "returning, so it returns a connection that is already " + "closed.".format( + conn.connected_event.is_set(), conn.last_error, + conn.is_closed, conn.is_defunct + ) + ) + + finally: + if not server_sock._closed: + server_sock.close() + + def test_push_accepts_data_after_close(self): + """ + BUG: push() does not check is_closed, so data can be enqueued + to the write queue after close() has been called. + + This is part of the TOCTOU race in send_msg(): send_msg checks + is_closed, then calls push(). But even without the TOCTOU in + send_msg, push() itself never validates the connection state. + + Expected correct behavior: push() should raise ConnectionShutdown + (or similar) when called on a closed connection. + """ + conn, server_sock = _make_connection_with_socketpair() + try: + # Give handle_read time to start + time.sleep(0.1) + + # close() sets is_closed=True and schedules _close() + conn.close() + self.assertTrue(conn.is_closed) + + # BUG: push() succeeds even though is_closed=True. + # The data gets enqueued to _write_queue and may cause + # EBADF when handle_write tries to send it after _close() + # closes the socket. + push_succeeded = False + try: + conn.push(b'\x00' * 100) + push_succeeded = True + except (ConnectionShutdown, OSError): + push_succeeded = False + + self.assertFalse( + push_succeeded, + "BUG: push() accepted data after close() was called " + "(is_closed=True). This data will be enqueued to the " + "write queue and handle_write() will attempt to send it, " + "potentially getting EBADF when _close() closes the socket." + ) + + finally: + time.sleep(0.5) + server_sock.close() + + def test_handle_write_gets_broken_pipe_on_server_close(self): + """ + BUG: handle_write() gets BrokenPipeError when the server closes + while data is queued for writing, causing the connection to become + defunct instead of cleanly closing. + + The sequence: + 1. Push data into the write queue + 2. Close server end -> handle_read sees EOF, calls close() + 3. handle_write tries sock_sendall -> BrokenPipeError + 4. handle_write calls defunct() with the error + + Expected correct behavior: when the server closes, pending writes + should be cancelled cleanly (not cause defunct with EBADF/BrokenPipe). + The connection should end up closed but NOT defunct. + """ + conn, server_sock = _make_connection_with_socketpair() + try: + # Give handle_read time to start + time.sleep(0.1) + + # Push data so handle_write has work to do + for _ in range(10): + conn.push(b'\x00' * 65536) + + # Close server end -- triggers EOF in handle_read and + # BrokenPipe in handle_write + server_sock.close() + + # Wait for the chain reaction + time.sleep(1.0) + + self.assertTrue(conn.is_closed, + "Connection should be closed after server EOF") + + # BUG: The connection becomes defunct (with BrokenPipeError or + # EBADF) instead of closing cleanly. A clean close should cancel + # pending writes without defuncting. + self.assertFalse( + conn.is_defunct, + "BUG: Connection became defunct instead of closing cleanly. " + "handle_write() got an error after the server closed: {}. " + "Pending writes should be cancelled by _close() before they " + "can hit a broken socket.".format(conn.last_error) + ) + + finally: + if not server_sock._closed: + server_sock.close() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main() From 71f9ad1e10a680df835774f0a48b284274ef4e0c Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 12 Feb 2026 14:02:14 -0400 Subject: [PATCH 2/5] Fix _close() on Windows ProactorEventLoop ProactorEventLoop does not support remove_reader/remove_writer (raises NotImplementedError). Wrap these calls so the socket is always closed regardless, and use try/finally to ensure connected_event is always set even if cleanup fails. --- cassandra/io/asyncioreactor.py | 46 +++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 16e466ec73..fff8e01d4b 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -158,24 +158,34 @@ def close(self): async def _close(self): log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) - if self._write_watcher: - self._write_watcher.cancel() - if self._read_watcher: - self._read_watcher.cancel() - if self._socket: - self._loop.remove_writer(self._socket.fileno()) - self._loop.remove_reader(self._socket.fileno()) - self._socket.close() - - log.debug("Closed socket to %s" % (self.endpoint,)) - - if not self.is_defunct: - msg = "Connection to %s was closed" % self.endpoint - if self.last_error: - msg += ": %s" % (self.last_error,) - self.error_all_requests(ConnectionShutdown(msg)) - # don't leave in-progress operations hanging - self.connected_event.set() + try: + if self._write_watcher: + self._write_watcher.cancel() + if self._read_watcher: + self._read_watcher.cancel() + if self._socket: + # remove_reader/remove_writer are not supported on Windows + # ProactorEventLoop — ignore failures so the socket still + # gets closed. + try: + self._loop.remove_writer(self._socket.fileno()) + except (NotImplementedError, OSError): + pass + try: + self._loop.remove_reader(self._socket.fileno()) + except (NotImplementedError, OSError): + pass + self._socket.close() + + log.debug("Closed socket to %s" % (self.endpoint,)) + finally: + if not self.is_defunct: + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) + # don't leave in-progress operations hanging + self.connected_event.set() def push(self, data): if self.is_closed or self.is_defunct: From 45b1780960d640fb9be1726f4330f244cb86817f Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 12 Feb 2026 14:52:52 -0400 Subject: [PATCH 3/5] Fix handle_write peer-disconnect detection on Windows On Windows, ProactorEventLoop may raise plain OSError with winerror=10054 (WSAECONNRESET) or 10053 (WSAECONNABORTED) instead of ConnectionResetError. Use ConnectionError base class plus winerror check for robust cross-platform detection. --- cassandra/io/asyncioreactor.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index fff8e01d4b..2961205c50 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -226,8 +226,14 @@ async def handle_write(self): if next_msg: await self._loop.sock_sendall(self._socket, next_msg) except socket.error as err: - # Peer disconnected — do a clean close instead of defunct - if isinstance(err, (BrokenPipeError, ConnectionResetError)): + # Peer disconnected — do a clean close instead of defunct. + # Use ConnectionError (parent of BrokenPipeError, + # ConnectionResetError, ConnectionAbortedError) plus a + # winerror check for Windows IOCP which may raise plain + # OSError with winerror=10054 (WSAECONNRESET) or + # 10053 (WSAECONNABORTED). + if (isinstance(err, ConnectionError) + or getattr(err, 'winerror', None) in (10053, 10054)): log.debug("Connection %s closed by peer during write: %s", self, err) self.close() From 366064aac5bee06365f1e989ad2ed900705d7de3 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 12 Feb 2026 15:19:28 -0400 Subject: [PATCH 4/5] Fix macOS ENOTCONN detection in handle_write peer-disconnect check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit macOS raises OSError(57, "Socket is not connected") (ENOTCONN) when writing to a peer-disconnected socket. Unlike BrokenPipeError or ConnectionResetError, this is a plain OSError — not a ConnectionError subclass — so it was not caught by the isinstance(err, ConnectionError) check. Add errno-based detection for ENOTCONN, ESHUTDOWN, ECONNRESET, and ECONNABORTED to handle these platform-specific disconnect errors cleanly instead of defuncting the connection. --- cassandra/io/asyncioreactor.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 2961205c50..f069e6c35c 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -1,3 +1,4 @@ +import errno import threading from cassandra.connection import Connection, ConnectionShutdown @@ -232,8 +233,15 @@ async def handle_write(self): # winerror check for Windows IOCP which may raise plain # OSError with winerror=10054 (WSAECONNRESET) or # 10053 (WSAECONNABORTED). + # Also check errno for ENOTCONN/ESHUTDOWN/ECONNRESET + # which macOS raises as plain OSError (not ConnectionError). + _peer_errnos = ( + errno.ENOTCONN, errno.ESHUTDOWN, + errno.ECONNRESET, errno.ECONNABORTED, + ) if (isinstance(err, ConnectionError) - or getattr(err, 'winerror', None) in (10053, 10054)): + or getattr(err, 'winerror', None) in (10053, 10054) + or getattr(err, 'errno', None) in _peer_errnos): log.debug("Connection %s closed by peer during write: %s", self, err) self.close() From e4f0c6b5034777ce4dba28c2f4d327d646cb536f Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 12 Feb 2026 17:48:40 -0400 Subject: [PATCH 5/5] Address code quality review comments - Guard SocketPairAsyncioConnection definition with ASYNCIO_AVAILABLE check to avoid subclassing NoneType when asyncio is unavailable - Add debug logging in close() except block instead of bare pass - Add inline comments to except blocks in _close() for remove_reader/remove_writer --- cassandra/io/asyncioreactor.py | 8 ++++--- tests/unit/io/test_asyncio_race_614.py | 29 +++++++++++++------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index f069e6c35c..e1865f2a44 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -155,7 +155,9 @@ def close(self): try: future.result(timeout=5) except Exception: - pass + # Best-effort close: suppress errors during shutdown + log.debug("Error waiting for async close of %s", + self.endpoint, exc_info=True) async def _close(self): log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) @@ -171,11 +173,11 @@ async def _close(self): try: self._loop.remove_writer(self._socket.fileno()) except (NotImplementedError, OSError): - pass + pass # not supported on Windows ProactorEventLoop try: self._loop.remove_reader(self._socket.fileno()) except (NotImplementedError, OSError): - pass + pass # not supported on Windows ProactorEventLoop self._socket.close() log.debug("Closed socket to %s" % (self.endpoint,)) diff --git a/tests/unit/io/test_asyncio_race_614.py b/tests/unit/io/test_asyncio_race_614.py index c51a79c4cd..49b2d081a8 100644 --- a/tests/unit/io/test_asyncio_race_614.py +++ b/tests/unit/io/test_asyncio_race_614.py @@ -49,24 +49,25 @@ skip_me = is_monkey_patched() or not ASYNCIO_AVAILABLE -class SocketPairAsyncioConnection(AsyncioConnection): - """ - A subclass of AsyncioConnection that uses a pre-provided socket from - a socketpair instead of connecting to a real CQL server. +if ASYNCIO_AVAILABLE and AsyncioConnection is not None: + class SocketPairAsyncioConnection(AsyncioConnection): + """ + A subclass of AsyncioConnection that uses a pre-provided socket from + a socketpair instead of connecting to a real CQL server. - This bypasses _connect_socket() and _send_options_message() so we can - test I/O race conditions with a real fd registered on the event loop. - """ + This bypasses _connect_socket() and _send_options_message() so we can + test I/O race conditions with a real fd registered on the event loop. + """ - _test_socket = None # Set before __init__ + _test_socket = None # Set before __init__ - def _connect_socket(self): - """Use the pre-provided test socket instead of connecting.""" - self._socket = self._test_socket + def _connect_socket(self): + """Use the pre-provided test socket instead of connecting.""" + self._socket = self._test_socket - def _send_options_message(self): - """Skip the CQL handshake -- we don't have a real server.""" - pass + def _send_options_message(self): + """Skip the CQL handshake -- we don't have a real server.""" + pass def _make_connection_with_socketpair():