diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 7c5e4f31f..91cc19f9d 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1228,8 +1228,20 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. + acquire_puback_control (Callable): For QoS 1 messages only: call this function within the + on_publish_callback_fn callback to take manual control of the PUBACK for this message, preventing + the client from automatically sending a PUBACK. Returns an opaque handle that can be passed to + Client.invoke_puback() to send the PUBACK to the broker. + + Important: This function must be called within the on_publish_callback_fn callback. Calling it after the + callback returns will raise a RuntimeError. This function may only be called once per received PUBLISH; + calling it a second time will also raise a RuntimeError. If this function is not called, the client will + automatically send a PUBACK for QoS 1 messages when the callback returns. + + For QoS 0 messages, this field is None. """ publish_packet: PublishPacket = None + acquire_puback_control: Callable = None @dataclass @@ -1434,7 +1446,8 @@ def _on_publish( correlation_data, subscription_identifiers_tuples, content_type, - user_properties_tuples): + user_properties_tuples, + acquire_puback_control_fn): if self._on_publish_cb is None: return @@ -1468,9 +1481,13 @@ def _on_publish( publish_packet.content_type = content_type publish_packet.user_properties = _init_user_properties(user_properties_tuples) - self._on_publish_cb(PublishReceivedData(publish_packet=publish_packet)) + # Create PublishReceivedData with the manual control callback + publish_data = PublishReceivedData( + publish_packet=publish_packet, + acquire_puback_control=acquire_puback_control_fn + ) - return + self._on_publish_cb(publish_data) def _on_lifecycle_stopped(self): if self._on_lifecycle_stopped_cb: @@ -1957,6 +1974,25 @@ def get_stats(self): result = _awscrt.mqtt5_client_get_stats(self._binding) return OperationStatisticsData(result[0], result[1], result[2], result[3]) + def invoke_puback(self, puback_control_handle): + """Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control. + + To use manual PUBACK control, call acquire_puback_control() within the on_publish_callback_fn + callback to obtain a handle. Then call this method to send the PUBACK. + + Args: + puback_control_handle: An opaque handle obtained from acquire_puback_control() within + PublishReceivedData. This handle cannot be created manually. + + Raises: + Exception: If the native client returns an error when invoking the PUBACK. + """ + + _awscrt.mqtt5_client_invoke_puback( + self._binding, + puback_control_handle + ) + def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None): from awscrt.mqtt import Connection diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 41b6a7d6d..dc2fe7be8 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 41b6a7d6d566a56eff69743df66c077d56a80c9d +Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b diff --git a/source/module.c b/source/module.c index 0b752e03d..6e2452246 100644 --- a/source/module.c +++ b/source/module.c @@ -810,6 +810,7 @@ static PyMethodDef s_module_methods[] = { AWS_PY_METHOD_DEF(mqtt5_client_subscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_unsubscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt5_client_invoke_puback, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS), /* MQTT Request Response Client */ diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 243af6a0e..629fb5011 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -218,6 +218,73 @@ static PyObject *s_aws_set_user_properties_to_PyObject( * Publish Handler ******************************************************************************/ +static const char *s_capsule_name_puback_control_handle = "aws_puback_control_handle"; + +struct puback_control_handle { + uint64_t control_id; +}; + +static void s_puback_control_handle_destructor(PyObject *capsule) { + struct puback_control_handle *handle = PyCapsule_GetPointer(capsule, s_capsule_name_puback_control_handle); + if (handle) { + aws_mem_release(aws_py_get_allocator(), handle); + } +} + +/* Callback context for manual PUBACK control */ +struct manual_puback_control_context { + struct aws_mqtt5_client *client; + struct aws_mqtt5_packet_publish_view *publish_packet; +}; + +static void s_manual_puback_control_context_destructor(PyObject *capsule) { + struct manual_puback_control_context *context = PyCapsule_GetPointer(capsule, "manual_puback_control_context"); + if (context) { + aws_mem_release(aws_py_get_allocator(), context); + } +} + +/* Function called from Python to set manual PUBACK control and return puback_control_id */ +PyObject *aws_py_mqtt5_client_acquire_puback(PyObject *self, PyObject *args) { + (void)args; + + struct manual_puback_control_context *context = PyCapsule_GetPointer(self, "manual_puback_control_context"); + if (!context) { + PyErr_SetString(PyExc_ValueError, "Invalid manual PUBACK control context"); + return NULL; + } + + /* If the publish_packet pointer has been zeroed out, the callback has already returned (post-callback call) + * or this function was already called once (double-call). Both are usage errors. */ + if (!context->publish_packet) { + PyErr_SetString( + PyExc_RuntimeError, + "acquire_puback_control() must be called within the on_publish_callback_fn callback " + "and may only be called once."); + return NULL; + } + + uint64_t puback_control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); + + /* Zero out the publish_packet pointer to prevent double-calls. */ + context->publish_packet = NULL; + + /* Create handle struct */ + struct puback_control_handle *handle = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct puback_control_handle)); + + handle->control_id = puback_control_id; + + /* Wrap in capsule */ + PyObject *capsule = PyCapsule_New(handle, s_capsule_name_puback_control_handle, s_puback_control_handle_destructor); + if (!capsule) { + aws_mem_release(aws_py_get_allocator(), handle); + return NULL; + } + + return capsule; +} + static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *publish_packet, void *user_data) { if (!user_data) { @@ -234,10 +301,50 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu PyObject *result = NULL; PyObject *subscription_identifier_list = NULL; PyObject *user_properties_list = NULL; + PyObject *manual_control_callback = NULL; + PyObject *control_context_capsule = NULL; size_t subscription_identifier_count = publish_packet->subscription_identifier_count; size_t user_property_count = publish_packet->user_property_count; + /* Create manual PUBACK control context */ + struct manual_puback_control_context *control_context = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct manual_puback_control_context)); + if (!control_context) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + + /* Set up the context with both client and publish packet */ + control_context->client = client->native; + control_context->publish_packet = (struct aws_mqtt5_packet_publish_view *)publish_packet; + + control_context_capsule = + PyCapsule_New(control_context, "manual_puback_control_context", s_manual_puback_control_context_destructor); + if (!control_context_capsule) { + aws_mem_release(aws_py_get_allocator(), control_context); + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + + /* Method definition for the manual control callback */ + static PyMethodDef method_def = { + "acquire_puback_control", + aws_py_mqtt5_client_acquire_puback, + METH_NOARGS, + "Take manual control of PUBACK for this message"}; + + /* Only create the manual control callback for QoS 1 messages. + * For QoS 0, acquire_puback_control is passed as None. + * acquirePubackControl is only set for QoS 1). */ + if (publish_packet->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) { + manual_control_callback = PyCFunction_New(&method_def, control_context_capsule); + if (!manual_control_callback) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + } + /* Create list of uint32_t subscription identifier tuples */ subscription_identifier_list = PyList_New(subscription_identifier_count); if (!subscription_identifier_list) { @@ -261,7 +368,7 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu result = PyObject_CallMethod( client->client_core, "_on_publish", - "(y#iOs#OiOIOHs#y#Os#O)", + "(y#iOs#OiOIOHs#y#Os#OO)", /* y */ publish_packet->payload.ptr, /* # */ publish_packet->payload.len, /* i */ (int)publish_packet->qos, @@ -284,15 +391,23 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu /* O */ subscription_identifier_count > 0 ? subscription_identifier_list : Py_None, /* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL, /* # */ publish_packet->content_type ? publish_packet->content_type->len : 0, - /* O */ user_property_count > 0 ? user_properties_list : Py_None); + /* O */ user_property_count > 0 ? user_properties_list : Py_None, + /* O */ manual_control_callback ? manual_control_callback : Py_None); + if (!result) { PyErr_WriteUnraisable(PyErr_Occurred()); } + /* Invalidate the publish_packet pointer now that the callback has returned. + * This prevents use-after-free if acquire_puback_control() is called after the callback. */ + control_context->publish_packet = NULL; + cleanup: Py_XDECREF(result); Py_XDECREF(subscription_identifier_list); Py_XDECREF(user_properties_list); + Py_XDECREF(manual_control_callback); + Py_XDECREF(control_context_capsule); PyGILState_Release(state); } @@ -1683,6 +1798,49 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { return NULL; } +/******************************************************************************* + * Invoke Puback + ******************************************************************************/ + +PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { + (void)self; + bool success = true; + + PyObject *impl_capsule; + PyObject *puback_handle_capsule; + + if (!PyArg_ParseTuple( + args, + "OO", + /* O */ &impl_capsule, + /* O */ &puback_handle_capsule)) { + return NULL; + } + + struct mqtt5_client_binding *client = PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt5_client); + if (!client) { + return NULL; + } + + /* Extract handle from capsule */ + struct puback_control_handle *handle = + PyCapsule_GetPointer(puback_handle_capsule, s_capsule_name_puback_control_handle); + if (!handle) { + PyErr_SetString(PyExc_TypeError, "Invalid PUBACK control handle"); + return NULL; + } + + if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, NULL)) { + PyErr_SetAwsLastError(); + success = false; + } + + if (success) { + Py_RETURN_NONE; + } + return NULL; +} + /******************************************************************************* * Subscribe ******************************************************************************/ diff --git a/source/mqtt5_client.h b/source/mqtt5_client.h index 46c135f82..b9bc54f16 100644 --- a/source/mqtt5_client.h +++ b/source/mqtt5_client.h @@ -14,6 +14,7 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_subscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_unsubscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject *args); diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 4d8f7d0e2..d2883b938 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1574,6 +1574,344 @@ def _test_qos1_happy_path(self): def test_qos1_happy_path(self): test_retry_wrapper(self._test_qos1_happy_path) + # ============================================================== + # MANUAL PUBACK TEST CASES + # ============================================================== + + # Manual PUBACK hold test: hold PUBACK and verify broker re-delivers the message + def _test_manual_puback_hold(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_ManualPuback_Python_" + client_id + payload = str(uuid.uuid4()) + payload_bytes = payload.encode("utf-8") + + PUBACK_HOLD_TIMEOUT = 60.0 + + future_first_delivery = Future() + future_redelivery = Future() + puback_handle_holder = [None] # mutable container so the closure can write to it + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + received_payload = publish_received_data.publish_packet.payload + if not future_first_delivery.done(): + # First delivery: acquire manual PUBACK control to hold the PUBACK + puback_handle_holder[0] = publish_received_data.acquire_puback_control() + future_first_delivery.set_result(received_payload) + elif not future_redelivery.done(): + # Second delivery: broker re-sent because no PUBACK was received + future_redelivery.set_result(received_payload) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe to the topic with QoS 1 + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future = client.subscribe(subscribe_packet=subscribe_packet) + suback_packet = subscribe_future.result(TIMEOUT) + self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + + # Publish a QoS 1 message with a unique UUID payload + publish_packet = mqtt5.PublishPacket( + payload=payload, + topic=topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future = client.publish(publish_packet=publish_packet) + publish_completion_data = publish_future.result(TIMEOUT) + self.assertIsInstance(publish_completion_data.puback, mqtt5.PubackPacket) + + # Wait for the first delivery and confirm PUBACK was held + first_payload = future_first_delivery.result(TIMEOUT) + self.assertEqual(first_payload, payload_bytes) + self.assertIsNotNone(puback_handle_holder[0], "acquire_puback_control() should have returned a handle") + + # Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent) + redelivered_payload = future_redelivery.result(PUBACK_HOLD_TIMEOUT) + self.assertEqual(redelivered_payload, payload_bytes, + "Re-delivered payload should match the original UUID payload") + + # Release the held PUBACK now that we've confirmed re-delivery + client.invoke_puback(puback_handle_holder[0]) + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_hold(self): + test_retry_wrapper(self._test_manual_puback_hold) + + # Manual PUBACK invoke test: acquire and immediately invoke PUBACK, verify no re-delivery + def _test_manual_puback_invoke(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_ManualPuback_Python_" + client_id + payload = str(uuid.uuid4()) + payload_bytes = payload.encode("utf-8") + + NO_REDELIVERY_WAIT = 60.0 + + future_first_delivery = Future() + future_unexpected_redelivery = Future() + puback_handle_holder = [None] # mutable container so the closure can write to it + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + received_payload = publish_received_data.publish_packet.payload + if not future_first_delivery.done(): + # First delivery: acquire manual PUBACK control, then immediately invoke it + puback_handle_holder[0] = publish_received_data.acquire_puback_control() + future_first_delivery.set_result(received_payload) + elif received_payload == payload_bytes and not future_unexpected_redelivery.done(): + # A second delivery of the same payload means the broker re-sent, this should NOT happen + future_unexpected_redelivery.set_result(received_payload) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe to the topic with QoS 1 + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future = client.subscribe(subscribe_packet=subscribe_packet) + suback_packet = subscribe_future.result(TIMEOUT) + self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + + # Publish a QoS 1 message with a unique UUID payload + publish_packet = mqtt5.PublishPacket( + payload=payload, + topic=topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future = client.publish(publish_packet=publish_packet) + publish_completion_data = publish_future.result(TIMEOUT) + self.assertIsInstance(publish_completion_data.puback, mqtt5.PubackPacket) + + # Wait for the first delivery and confirm PUBACK handle was acquired + first_payload = future_first_delivery.result(TIMEOUT) + self.assertEqual(first_payload, payload_bytes) + self.assertIsNotNone(puback_handle_holder[0], "acquire_puback_control() should have returned a handle") + + # Immediately invoke the PUBACK using the acquired handle + client.invoke_puback(puback_handle_holder[0]) + + # Wait 60 seconds and confirm the broker does NOT re-deliver the message + # (because we sent the PUBACK via invoke_puback) + redelivered = future_unexpected_redelivery.done() or \ + (not future_unexpected_redelivery.done() and + self._wait_for_future_timeout(future_unexpected_redelivery, NO_REDELIVERY_WAIT)) + self.assertFalse(redelivered, + "Broker should NOT re-deliver the message after invoke_puback() was called") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def _wait_for_future_timeout(self, future, timeout_sec): + """Returns True if the future completed within timeout_sec, False if it timed out.""" + try: + future.result(timeout_sec) + return True + except Exception: + return False + + def test_manual_puback_invoke(self): + test_retry_wrapper(self._test_manual_puback_invoke) + + # Manual PUBACK double-call test: calling acquirePubackControl() twice raises IllegalStateException + def _test_manual_puback_acquire_double_call_raises(self): + """Verify that calling acquire_puback_control() twice on the same QoS 1 PUBLISH raises RuntimeError.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_result = Future() + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + try: + # First call should succeed + handle = publish_received_data.acquire_puback_control() + # Second call on the same message should raise RuntimeError + try: + publish_received_data.acquire_puback_control() + future_result.set_result("no_error") # Should not reach here + except RuntimeError: + future_result.set_result("double_call_raised") + except Exception as e: + future_result.set_result(f"unexpected_error: {e}") + # Release the handle we acquired + # (handle is valid, invoke it to clean up) + except Exception as e: + future_result.set_result(f"first_call_failed: {e}") + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)) + publish_future.result(TIMEOUT) + + result = future_result.result(TIMEOUT) + self.assertEqual(result, "double_call_raised", + f"Expected RuntimeError on double-call, got: {result}") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_acquire_double_call_raises(self): + test_retry_wrapper(self._test_manual_puback_acquire_double_call_raises) + + # Manual PUBACK post-callback test: calling acquire_puback_control() after callback returns raises RuntimeError + def _test_manual_puback_acquire_post_callback_raises(self): + """Verify that calling acquire_puback_control() after the callback has returned raises RuntimeError.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_callback_done = Future() + saved_acquire_fn_holder = [None] + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + # Save the callable but do NOT call it within the callback + saved_acquire_fn_holder[0] = publish_received_data.acquire_puback_control + future_callback_done.set_result(True) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)) + publish_future.result(TIMEOUT) + + # Wait for the callback to complete + future_callback_done.result(TIMEOUT) + + # Now call acquire_puback_control() after the callback has returned, this should raise RuntimeError + acquire_fn = saved_acquire_fn_holder[0] + self.assertIsNotNone(acquire_fn, "acquire_puback_control should have been saved") + with self.assertRaises(RuntimeError): + acquire_fn() + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_acquire_post_callback_raises(self): + test_retry_wrapper(self._test_manual_puback_acquire_post_callback_raises) + + # Manual PUBACK QoS 0 test: acquirePubackControl() throws IllegalStateException for QoS 0 messages + def _test_manual_puback_qos0_acquire_is_none(self): + """Verify that acquire_puback_control is None for QoS 0 messages.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_acquire_value = Future() + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + # For QoS 0, acquire_puback_control should be None + future_acquire_value.set_result(publish_received_data.acquire_puback_control) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0) + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + # Publish at QoS 0, there's no PUBACK involved + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_MOST_ONCE)) + publish_future.result(TIMEOUT) + + acquire_value = future_acquire_value.result(TIMEOUT) + self.assertIsNone(acquire_value, + "acquire_puback_control should be None for QoS 0 messages") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_qos0_acquire_is_none(self): + test_retry_wrapper(self._test_manual_puback_qos0_acquire_is_none) + # ============================================================== # RETAIN TEST CASES # ==============================================================