diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index d7b365e451..7f20e6510e 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -300,7 +300,10 @@ def close(self): msg = "Connection to %s was closed" % self.endpoint if self.last_error: msg += ": %s" % (self.last_error,) - self.error_all_requests(ConnectionShutdown(msg)) + shutdown_exc = ConnectionShutdown(msg) + self.error_all_requests(shutdown_exc) + if not self.connected_event.is_set(): + self.last_error = shutdown_exc self.connected_event.set() def handle_write(self, watcher, revents, errno=None): @@ -332,6 +335,8 @@ def handle_write(self, watcher, revents, errno=None): with self._deque_lock: self.deque.appendleft(next_msg) else: + if self.is_closed or self.is_defunct: + return self.defunct(err) return else: @@ -365,12 +370,16 @@ def handle_read(self, watcher, revents, errno=None): if not self._iobuf.tell(): return else: + if self.is_closed or self.is_defunct: + return self.defunct(err) return elif err.args[0] in NONBLOCKING: if not self._iobuf.tell(): return else: + if self.is_closed or self.is_defunct: + return self.defunct(err) return @@ -378,6 +387,8 @@ def handle_read(self, watcher, revents, errno=None): self.process_io_buffer() else: log.debug("Connection %s closed by server", self) + self.last_error = ConnectionShutdown( + "Connection to %s was closed by server" % self.endpoint) self.close() def push(self, data):