From 8543c1f45b8a17dbacb560d99960f4c1a714c68c Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Sat, 14 Feb 2026 07:57:35 -0400 Subject: [PATCH] Fix LibevConnection close() race causing EBADF errors (#614) LibevConnection.close() closes the socket immediately while watchers are stopped asynchronously in the next event loop iteration via _loop_will_run(). This creates a race window where handle_read() or handle_write() can operate on a closed socket fd, producing EBADF errors that surface as ConnectionShutdown. - Add is_closed/is_defunct guards in handle_read() and handle_write() error paths to silently exit during shutdown instead of calling defunct() with EBADF - Set last_error in close() when connected_event is not yet set to prevent factory() from returning a dead connection - Set last_error on server-initiated close (EOF) in handle_read() before calling close() --- cassandra/io/libevreactor.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index d7b365e451..7f20e6510e 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -300,7 +300,10 @@ def close(self): msg = "Connection to %s was closed" % self.endpoint if self.last_error: msg += ": %s" % (self.last_error,) - self.error_all_requests(ConnectionShutdown(msg)) + shutdown_exc = ConnectionShutdown(msg) + self.error_all_requests(shutdown_exc) + if not self.connected_event.is_set(): + self.last_error = shutdown_exc self.connected_event.set() def handle_write(self, watcher, revents, errno=None): @@ -332,6 +335,8 @@ def handle_write(self, watcher, revents, errno=None): with self._deque_lock: self.deque.appendleft(next_msg) else: + if self.is_closed or self.is_defunct: + return self.defunct(err) return else: @@ -365,12 +370,16 @@ def handle_read(self, watcher, revents, errno=None): if not self._iobuf.tell(): return else: + if self.is_closed or self.is_defunct: + return self.defunct(err) return elif err.args[0] in NONBLOCKING: if not self._iobuf.tell(): return else: + if self.is_closed or self.is_defunct: + return self.defunct(err) return @@ -378,6 +387,8 @@ def handle_read(self, watcher, revents, errno=None): self.process_io_buffer() else: log.debug("Connection %s closed by server", self) + self.last_error = ConnectionShutdown( + "Connection to %s was closed by server" % self.endpoint) self.close() def push(self, data):