From 790759dd368f163965ae62663717854858e715b0 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 20 Feb 2026 13:54:48 -0800 Subject: [PATCH 01/20] bind mqtt5 manual puback feature --- awscrt/mqtt5.py | 67 ++++++++++++++- source/module.c | 1 + source/mqtt5_client.c | 190 +++++++++++++++++++++++++++++++++++++++++- source/mqtt5_client.h | 1 + 4 files changed, 254 insertions(+), 5 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 7c5e4f31f..6768bdf6b 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -495,6 +495,27 @@ def _try_puback_reason_code(value): except Exception: return None +class ManualPubackResult(IntEnum): + """Result for a manually invoked PUBACK operation.""" + + SUCCESS = 0 + """The PUBACK was successfully sent.""" + + PUBACK_CANCELLED = 1 + """The PUBACK was cancelled and will not be sent.""" + + PUBACK_INVALID = 2 + """The PUBACK attempting to be sent is invalid.""" + + CRT_FAILURE = 3 + """The PUBACK failed to send due to a CRT failure.""" + + +def _try_manual_puback_result(value): + try: + return ManualPubackResult(value) + except Exception: + return None class SubackReasonCode(IntEnum): """Reason code inside SUBACK packet payloads. @@ -1140,6 +1161,15 @@ class PubackPacket: reason_string: str = None user_properties: 'Sequence[UserProperty]' = None +@dataclass +class InvokePubackCompletion: + """dataclass containing results of a manually invoked PUBACK + + Args: + puback_result (ManualPubackResult): Result of manually invoked PUBACK + """ + puback_result: ManualPubackResult = None + @dataclass class ConnectPacket: @@ -1228,8 +1258,10 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. + acquire_puback_control (Callable): Call this function to prevent automatic PUBACK and take manual control of this PUBLISH message's PUBACK. Returns an opaque handle object that can be passed to Client.invoke_puback(). """ publish_packet: PublishPacket = None + acquire_puback_control: Callable = None @dataclass @@ -1434,7 +1466,8 @@ def _on_publish( correlation_data, subscription_identifiers_tuples, content_type, - user_properties_tuples): + user_properties_tuples, + acquire_puback_control_fn): if self._on_publish_cb is None: return @@ -1468,9 +1501,13 @@ def _on_publish( publish_packet.content_type = content_type publish_packet.user_properties = _init_user_properties(user_properties_tuples) - self._on_publish_cb(PublishReceivedData(publish_packet=publish_packet)) + # Create PublishReceivedData with the manual control callback + publish_data = PublishReceivedData( + publish_packet=publish_packet, + acquire_puback_control=acquire_puback_control_fn + ) - return + self._on_publish_cb(publish_data) def _on_lifecycle_stopped(self): if self._on_lifecycle_stopped_cb: @@ -1957,6 +1994,30 @@ def get_stats(self): result = _awscrt.mqtt5_client_get_stats(self._binding) return OperationStatisticsData(result[0], result[1], result[2], result[3]) + def invoke_puback(self, puback_control_handle): + """Sends a PUBACK packet for the given puback control handle. + + Args: + puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. + + Returns: + A future with InvokePubackCompletion that completes when invoked PUBACK is sent or fails to send. A successfully sent PUBACK only confirms the requested PUBACK has been sent, not that the broker has received it and/or it hasn't re-sent the PUBLISH message being acknowledged. + """ + + future = Future() + + def invokePubackComplete(puback_result): + invokePubackCompletion = InvokePubackCompletion(puback_result=_try_manual_puback_result(puback_result)) + future.set_result(invokePubackCompletion) + + _awscrt.mqtt5_client_invoke_puback( + self._binding, + puback_control_handle, + invokePubackComplete + ) + + return future + def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None): from awscrt.mqtt import Connection diff --git a/source/module.c b/source/module.c index 84b5673d2..cab6af309 100644 --- a/source/module.c +++ b/source/module.c @@ -810,6 +810,7 @@ static PyMethodDef s_module_methods[] = { AWS_PY_METHOD_DEF(mqtt5_client_subscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_unsubscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt5_client_invoke_puback, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS), /* MQTT Request Response Client */ diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 243af6a0e..7e6c0dcea 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -218,6 +218,60 @@ static PyObject *s_aws_set_user_properties_to_PyObject( * Publish Handler ******************************************************************************/ +static const char *s_capsule_name_puback_control_handle = "aws_puback_control_handle"; + +struct puback_control_handle { + uint64_t control_id; +}; + +static void s_puback_control_handle_destructor(PyObject *capsule) { + struct puback_control_handle *handle = PyCapsule_GetPointer(capsule, s_capsule_name_puback_control_handle); + if (handle) { + aws_mem_release(aws_py_get_allocator(), handle); + } +} + +/* Callback context for manual PUBACK control */ +struct manual_puback_control_context { + struct aws_mqtt5_client *client; + struct aws_mqtt5_packet_publish_view *publish_packet; +}; + +static void s_manual_puback_control_context_destructor(PyObject *capsule) { + struct manual_puback_control_context *context = PyCapsule_GetPointer(capsule, "manual_puback_control_context"); + if (context) { + aws_mem_release(aws_py_get_allocator(), context); + } +} + +/* Function called from Python to set manual PUBACK control and return puback_control_id */ +PyObject *aws_py_mqtt5_client_acquire_puback(PyObject *self, PyObject *args) { + (void)args; + + struct manual_puback_control_context *context = PyCapsule_GetPointer(self, "manual_puback_control_context"); + if (!context || !context->publish_packet) { + PyErr_SetString(PyExc_ValueError, "Invalid manual PUBACK control context"); + return NULL; + } + + uint64_t puback_control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); + + /* Create handle struct */ + struct puback_control_handle *handle = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct puback_control_handle)); + + handle->control_id = puback_control_id; + + /* Wrap in capsule */ + PyObject *capsule = PyCapsule_New(handle, s_capsule_name_puback_control_handle, s_puback_control_handle_destructor); + if (!capsule) { + aws_mem_release(aws_py_get_allocator(), handle); + return NULL; + } + + return capsule; +} + static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *publish_packet, void *user_data) { if (!user_data) { @@ -234,10 +288,46 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu PyObject *result = NULL; PyObject *subscription_identifier_list = NULL; PyObject *user_properties_list = NULL; + PyObject *manual_control_callback = NULL; + PyObject *control_context_capsule = NULL; size_t subscription_identifier_count = publish_packet->subscription_identifier_count; size_t user_property_count = publish_packet->user_property_count; + /* Create manual PUBACK control context */ + struct manual_puback_control_context *control_context = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct manual_puback_control_context)); + if (!control_context) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + + /* Set up the context with both client and publish packet */ + control_context->client = client->native; + control_context->publish_packet = (struct aws_mqtt5_packet_publish_view *)publish_packet; + + control_context_capsule = + PyCapsule_New(control_context, "manual_puback_control_context", s_manual_puback_control_context_destructor); + if (!control_context_capsule) { + aws_mem_release(aws_py_get_allocator(), control_context); + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + + /* Method definition for the manual control callback */ + static PyMethodDef method_def = { + "acquire_puback_control", + aws_py_mqtt5_client_acquire_puback, + METH_NOARGS, + "Take manual control of PUBACK for this message"}; + + /* Create the manual control callback function */ + manual_control_callback = PyCFunction_New(&method_def, control_context_capsule); + if (!manual_control_callback) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + /* Create list of uint32_t subscription identifier tuples */ subscription_identifier_list = PyList_New(subscription_identifier_count); if (!subscription_identifier_list) { @@ -261,7 +351,7 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu result = PyObject_CallMethod( client->client_core, "_on_publish", - "(y#iOs#OiOIOHs#y#Os#O)", + "(y#iOs#OiOIOHs#y#Os#OO)", /* y */ publish_packet->payload.ptr, /* # */ publish_packet->payload.len, /* i */ (int)publish_packet->qos, @@ -284,7 +374,9 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu /* O */ subscription_identifier_count > 0 ? subscription_identifier_list : Py_None, /* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL, /* # */ publish_packet->content_type ? publish_packet->content_type->len : 0, - /* O */ user_property_count > 0 ? user_properties_list : Py_None); + /* O */ user_property_count > 0 ? user_properties_list : Py_None, + /* O */ manual_control_callback); + if (!result) { PyErr_WriteUnraisable(PyErr_Occurred()); } @@ -293,6 +385,8 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu Py_XDECREF(result); Py_XDECREF(subscription_identifier_list); Py_XDECREF(user_properties_list); + Py_XDECREF(manual_control_callback); + Py_XDECREF(control_context_capsule); PyGILState_Release(state); } @@ -1683,6 +1777,98 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { return NULL; } +/******************************************************************************* + * Invoke Puback + ******************************************************************************/ + +struct invoke_puback_complete_userdata { + PyObject *callback; +}; + +static void s_on_invoke_puback_complete_fn(enum aws_mqtt5_manual_puback_result puback_result, void *complete_ctx) { + struct invoke_puback_complete_userdata *metadata = complete_ctx; + assert(metadata); + + PyObject *result = NULL; + + PyGILState_STATE state; + if (aws_py_gilstate_ensure(&state)) { + return; /* Python has shut down. Nothing matters anymore, but don't crash */ + } + + result = PyObject_CallFunction(metadata->callback, "(i)", (int)puback_result); + + if (!result) { + PyErr_WriteUnraisable(PyErr_Occurred()); + } + + // cleanup + Py_XDECREF(metadata->callback); + Py_XDECREF(result); + + PyGILState_Release(state); + + aws_mem_release(aws_py_get_allocator(), metadata); +} + +PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { + (void)self; + bool success = false; + + PyObject *impl_capsule; + PyObject *puback_handle_capsule; + PyObject *manual_puback_callback_fn_py; + + if (!PyArg_ParseTuple( + args, + "OOO", + /* O */ &impl_capsule, + /* O */ &puback_handle_capsule, + /* O */ &manual_puback_callback_fn_py)) { + return NULL; + } + + struct mqtt5_client_binding *client = PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt5_client); + if (!client) { + return NULL; + } + + /* Extract handle from capsule */ + struct puback_control_handle *handle = + PyCapsule_GetPointer(puback_handle_capsule, s_capsule_name_puback_control_handle); + if (!handle) { + PyErr_SetString(PyExc_TypeError, "Invalid PUBACK control handle"); + return NULL; + } + + /* callback related must be cleaned up after this point */ + struct invoke_puback_complete_userdata *metadata = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct invoke_puback_complete_userdata)); + metadata->callback = manual_puback_callback_fn_py; + Py_INCREF(metadata->callback); + + struct aws_mqtt5_manual_puback_completion_options manual_puback_completion_options = { + .completion_callback = &s_on_invoke_puback_complete_fn, .completion_user_data = metadata}; + + if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, &manual_puback_completion_options)) { + PyErr_SetAwsLastError(); + goto manual_puback_failed; + } + + success = true; + goto done; + +manual_puback_failed: + Py_XDECREF(manual_puback_callback_fn_py); + aws_mem_release(aws_py_get_allocator(), metadata); + +done: + if (success) { + Py_RETURN_NONE; + } + return NULL; +} + /******************************************************************************* * Subscribe ******************************************************************************/ diff --git a/source/mqtt5_client.h b/source/mqtt5_client.h index 46c135f82..b9bc54f16 100644 --- a/source/mqtt5_client.h +++ b/source/mqtt5_client.h @@ -14,6 +14,7 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_subscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_unsubscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject *args); From b45f3beeacc73ad33c2eab2f610a49eb7284caa1 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Mon, 2 Mar 2026 13:23:09 -0800 Subject: [PATCH 02/20] remove completion options --- awscrt/mqtt5.py | 45 +-------------------------------------------- crt/aws-c-mqtt | 2 +- 2 files changed, 2 insertions(+), 45 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 6768bdf6b..d7f6f3aa2 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -495,27 +495,6 @@ def _try_puback_reason_code(value): except Exception: return None -class ManualPubackResult(IntEnum): - """Result for a manually invoked PUBACK operation.""" - - SUCCESS = 0 - """The PUBACK was successfully sent.""" - - PUBACK_CANCELLED = 1 - """The PUBACK was cancelled and will not be sent.""" - - PUBACK_INVALID = 2 - """The PUBACK attempting to be sent is invalid.""" - - CRT_FAILURE = 3 - """The PUBACK failed to send due to a CRT failure.""" - - -def _try_manual_puback_result(value): - try: - return ManualPubackResult(value) - except Exception: - return None class SubackReasonCode(IntEnum): """Reason code inside SUBACK packet payloads. @@ -1161,15 +1140,6 @@ class PubackPacket: reason_string: str = None user_properties: 'Sequence[UserProperty]' = None -@dataclass -class InvokePubackCompletion: - """dataclass containing results of a manually invoked PUBACK - - Args: - puback_result (ManualPubackResult): Result of manually invoked PUBACK - """ - puback_result: ManualPubackResult = None - @dataclass class ConnectPacket: @@ -1999,24 +1969,11 @@ def invoke_puback(self, puback_control_handle): Args: puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. - - Returns: - A future with InvokePubackCompletion that completes when invoked PUBACK is sent or fails to send. A successfully sent PUBACK only confirms the requested PUBACK has been sent, not that the broker has received it and/or it hasn't re-sent the PUBLISH message being acknowledged. """ - - future = Future() - - def invokePubackComplete(puback_result): - invokePubackCompletion = InvokePubackCompletion(puback_result=_try_manual_puback_result(puback_result)) - future.set_result(invokePubackCompletion) _awscrt.mqtt5_client_invoke_puback( self._binding, - puback_control_handle, - invokePubackComplete - ) - - return future + puback_control_handle) def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None): diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 1d512d927..37741c07a 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 1d512d92709f60b74e2cafa018e69a2e647f28e9 +Subproject commit 37741c07a23d35a700100a8fa6628127673ed012 From 1c380bf22d9a74b9c6323b1af1b150819311ae77 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Mon, 2 Mar 2026 13:26:52 -0800 Subject: [PATCH 03/20] remove completion options and lint --- awscrt/mqtt5.py | 4 +-- source/mqtt5_client.c | 59 ++++--------------------------------------- 2 files changed, 7 insertions(+), 56 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index d7f6f3aa2..012a63dc9 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1473,7 +1473,7 @@ def _on_publish( # Create PublishReceivedData with the manual control callback publish_data = PublishReceivedData( - publish_packet=publish_packet, + publish_packet=publish_packet, acquire_puback_control=acquire_puback_control_fn ) @@ -1970,7 +1970,7 @@ def invoke_puback(self, puback_control_handle): Args: puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. """ - + _awscrt.mqtt5_client_invoke_puback( self._binding, puback_control_handle) diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 7e6c0dcea..1700da6ba 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -1781,50 +1781,18 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { * Invoke Puback ******************************************************************************/ -struct invoke_puback_complete_userdata { - PyObject *callback; -}; - -static void s_on_invoke_puback_complete_fn(enum aws_mqtt5_manual_puback_result puback_result, void *complete_ctx) { - struct invoke_puback_complete_userdata *metadata = complete_ctx; - assert(metadata); - - PyObject *result = NULL; - - PyGILState_STATE state; - if (aws_py_gilstate_ensure(&state)) { - return; /* Python has shut down. Nothing matters anymore, but don't crash */ - } - - result = PyObject_CallFunction(metadata->callback, "(i)", (int)puback_result); - - if (!result) { - PyErr_WriteUnraisable(PyErr_Occurred()); - } - - // cleanup - Py_XDECREF(metadata->callback); - Py_XDECREF(result); - - PyGILState_Release(state); - - aws_mem_release(aws_py_get_allocator(), metadata); -} - PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { (void)self; - bool success = false; + bool success = true; PyObject *impl_capsule; PyObject *puback_handle_capsule; - PyObject *manual_puback_callback_fn_py; if (!PyArg_ParseTuple( args, - "OOO", + "OO", /* O */ &impl_capsule, - /* O */ &puback_handle_capsule, - /* O */ &manual_puback_callback_fn_py)) { + /* O */ &puback_handle_capsule)) { return NULL; } @@ -1841,28 +1809,11 @@ PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { return NULL; } - /* callback related must be cleaned up after this point */ - struct invoke_puback_complete_userdata *metadata = - aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct invoke_puback_complete_userdata)); - metadata->callback = manual_puback_callback_fn_py; - Py_INCREF(metadata->callback); - - struct aws_mqtt5_manual_puback_completion_options manual_puback_completion_options = { - .completion_callback = &s_on_invoke_puback_complete_fn, .completion_user_data = metadata}; - - if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, &manual_puback_completion_options)) { + if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, NULL)) { PyErr_SetAwsLastError(); - goto manual_puback_failed; + success = false; } - success = true; - goto done; - -manual_puback_failed: - Py_XDECREF(manual_puback_callback_fn_py); - aws_mem_release(aws_py_get_allocator(), metadata); - -done: if (success) { Py_RETURN_NONE; } From d872ccba38c6667e33ce81d39467ed49dd5ff866 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Mon, 9 Mar 2026 09:48:18 -0700 Subject: [PATCH 04/20] comments --- awscrt/mqtt5.py | 7 +++++-- source/mqtt5_client.c | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 012a63dc9..0e2993a20 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1228,7 +1228,7 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. - acquire_puback_control (Callable): Call this function to prevent automatic PUBACK and take manual control of this PUBLISH message's PUBACK. Returns an opaque handle object that can be passed to Client.invoke_puback(). + acquire_puback_control (Callable): Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the client from automatically sending a PUBACK. The returned handle can be passed to invoke_puback() to send the PUBACK to the broker. This method MUST be called within the message received callback. """ publish_packet: PublishPacket = None acquire_puback_control: Callable = None @@ -1965,7 +1965,10 @@ def get_stats(self): return OperationStatisticsData(result[0], result[1], result[2], result[3]) def invoke_puback(self, puback_control_handle): - """Sends a PUBACK packet for the given puback control handle. + """Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control. + To use manual PUBACK control, call acquire_puback_control() within the on_publish_callback_fn + callback to obtain an opaque handle object. Then call this method with the opaque handle object + to send the PUBACK. Args: puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 1700da6ba..d87a49858 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -220,6 +220,7 @@ static PyObject *s_aws_set_user_properties_to_PyObject( static const char *s_capsule_name_puback_control_handle = "aws_puback_control_handle"; +/* An opaque handle representing manual control over a QoS 1 PUBACK for a received PUBLISH packet. */ struct puback_control_handle { uint64_t control_id; }; From 7114645fec9709067d5281e47ffbd4bb88bfe72d Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Mon, 9 Mar 2026 09:50:17 -0700 Subject: [PATCH 05/20] lint --- awscrt/mqtt5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 0e2993a20..52381db94 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1966,7 +1966,7 @@ def get_stats(self): def invoke_puback(self, puback_control_handle): """Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control. - To use manual PUBACK control, call acquire_puback_control() within the on_publish_callback_fn + To use manual PUBACK control, call acquire_puback_control() within the on_publish_callback_fn callback to obtain an opaque handle object. Then call this method with the opaque handle object to send the PUBACK. From 26b55841ee0abe53c4d2313d46ba7050c392b341 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Mon, 9 Mar 2026 10:24:03 -0700 Subject: [PATCH 06/20] unify api language --- awscrt/mqtt5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 52381db94..544f42179 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1228,7 +1228,7 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. - acquire_puback_control (Callable): Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the client from automatically sending a PUBACK. The returned handle can be passed to invoke_puback() to send the PUBACK to the broker. This method MUST be called within the message received callback. + acquire_puback_control (Callable): Acquires manual control over the PUBACK for this QoS 1 PUBLISH message, preventing the client from automatically sending a PUBACK. The returned handle can be passed to invoke_puback() at a later time to send the PUBACK to the broker. This method MUST be called within the message received callback. If this method is not called, the client will automatically send a PUBACK for QoS 1 messages when the callback returns. """ publish_packet: PublishPacket = None acquire_puback_control: Callable = None From a406590bb604071db09b05636ace4fd9e541b296 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 13:34:27 -0700 Subject: [PATCH 07/20] latest submodules --- crt/aws-c-auth | 2 +- crt/aws-c-event-stream | 2 +- crt/aws-c-http | 2 +- crt/aws-c-io | 2 +- crt/aws-c-mqtt | 2 +- crt/aws-checksums | 2 +- crt/aws-lc | 2 +- crt/s2n | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crt/aws-c-auth b/crt/aws-c-auth index a4409b95d..fc4b87655 160000 --- a/crt/aws-c-auth +++ b/crt/aws-c-auth @@ -1 +1 @@ -Subproject commit a4409b95dad40a45b81e5fc9ff96f41386845e4f +Subproject commit fc4b87655e5cd3921f18d1859193c74af4102071 diff --git a/crt/aws-c-event-stream b/crt/aws-c-event-stream index f43a3d24a..c741f95e9 160000 --- a/crt/aws-c-event-stream +++ b/crt/aws-c-event-stream @@ -1 +1 @@ -Subproject commit f43a3d24a7c1f8b50f709ccb4fdf4c7fd2827fff +Subproject commit c741f95e9050a1a4bed4b3aa7543bd3e024f6e56 diff --git a/crt/aws-c-http b/crt/aws-c-http index acf313990..0d8e1a933 160000 --- a/crt/aws-c-http +++ b/crt/aws-c-http @@ -1 +1 @@ -Subproject commit acf31399077300c522315612dd2be09cfe48b5b8 +Subproject commit 0d8e1a933f46b8af984dfc8168ebcdf32748c184 diff --git a/crt/aws-c-io b/crt/aws-c-io index d5ad01cef..bfb0819d3 160000 --- a/crt/aws-c-io +++ b/crt/aws-c-io @@ -1 +1 @@ -Subproject commit d5ad01cef5d027f65c6f5b460972bae0a469779d +Subproject commit bfb0819d3906502483611ce832a5ec6b897c8421 diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 37741c07a..dc2fe7be8 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 37741c07a23d35a700100a8fa6628127673ed012 +Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b diff --git a/crt/aws-checksums b/crt/aws-checksums index c412c6360..1d5f2f1f3 160000 --- a/crt/aws-checksums +++ b/crt/aws-checksums @@ -1 +1 @@ -Subproject commit c412c636091501c2cd544d23664c8d14999e9dcc +Subproject commit 1d5f2f1f3e5d013aae8810878ceb5b3f6f258c4e diff --git a/crt/aws-lc b/crt/aws-lc index 728811eec..95187142b 160000 --- a/crt/aws-lc +++ b/crt/aws-lc @@ -1 +1 @@ -Subproject commit 728811eecec794802c78105be6dbfe9d79870ac7 +Subproject commit 95187142bb3e7b87bf3a18767ac6e47862c0ea35 diff --git a/crt/s2n b/crt/s2n index 3276a0876..f5e5e8303 160000 --- a/crt/s2n +++ b/crt/s2n @@ -1 +1 @@ -Subproject commit 3276a0876054e9efbeab4a42f34ef60b0bf58c91 +Subproject commit f5e5e83031be60691f22442373fb8371274fcd56 From a7d62c195140c6ac6d5da40e8243a486c0f08c1f Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 13:40:08 -0700 Subject: [PATCH 08/20] main submodules --- crt/aws-lc | 2 +- crt/s2n | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crt/aws-lc b/crt/aws-lc index 95187142b..37d86461a 160000 --- a/crt/aws-lc +++ b/crt/aws-lc @@ -1 +1 @@ -Subproject commit 95187142bb3e7b87bf3a18767ac6e47862c0ea35 +Subproject commit 37d86461a95782fd5d8b77873f9e1cb134ea2f95 diff --git a/crt/s2n b/crt/s2n index f5e5e8303..e82d62593 160000 --- a/crt/s2n +++ b/crt/s2n @@ -1 +1 @@ -Subproject commit f5e5e83031be60691f22442373fb8371274fcd56 +Subproject commit e82d625935141ea0eb73c9b54c29682a2ebcf0d4 From db9e100355d57f52b783413b07058556a37a9009 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 13:41:31 -0700 Subject: [PATCH 09/20] main aws-c-mqtt --- crt/aws-c-mqtt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index dc2fe7be8..41b6a7d6d 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b +Subproject commit 41b6a7d6d566a56eff69743df66c077d56a80c9d From 37418248d555d17a085728cf8f7d717d4d45af7b Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 20 Feb 2026 13:54:48 -0800 Subject: [PATCH 10/20] bind mqtt5 manual puback feature --- awscrt/mqtt5.py | 67 ++++++++++++++- source/module.c | 1 + source/mqtt5_client.c | 190 +++++++++++++++++++++++++++++++++++++++++- source/mqtt5_client.h | 1 + 4 files changed, 254 insertions(+), 5 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 7c5e4f31f..6768bdf6b 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -495,6 +495,27 @@ def _try_puback_reason_code(value): except Exception: return None +class ManualPubackResult(IntEnum): + """Result for a manually invoked PUBACK operation.""" + + SUCCESS = 0 + """The PUBACK was successfully sent.""" + + PUBACK_CANCELLED = 1 + """The PUBACK was cancelled and will not be sent.""" + + PUBACK_INVALID = 2 + """The PUBACK attempting to be sent is invalid.""" + + CRT_FAILURE = 3 + """The PUBACK failed to send due to a CRT failure.""" + + +def _try_manual_puback_result(value): + try: + return ManualPubackResult(value) + except Exception: + return None class SubackReasonCode(IntEnum): """Reason code inside SUBACK packet payloads. @@ -1140,6 +1161,15 @@ class PubackPacket: reason_string: str = None user_properties: 'Sequence[UserProperty]' = None +@dataclass +class InvokePubackCompletion: + """dataclass containing results of a manually invoked PUBACK + + Args: + puback_result (ManualPubackResult): Result of manually invoked PUBACK + """ + puback_result: ManualPubackResult = None + @dataclass class ConnectPacket: @@ -1228,8 +1258,10 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. + acquire_puback_control (Callable): Call this function to prevent automatic PUBACK and take manual control of this PUBLISH message's PUBACK. Returns an opaque handle object that can be passed to Client.invoke_puback(). """ publish_packet: PublishPacket = None + acquire_puback_control: Callable = None @dataclass @@ -1434,7 +1466,8 @@ def _on_publish( correlation_data, subscription_identifiers_tuples, content_type, - user_properties_tuples): + user_properties_tuples, + acquire_puback_control_fn): if self._on_publish_cb is None: return @@ -1468,9 +1501,13 @@ def _on_publish( publish_packet.content_type = content_type publish_packet.user_properties = _init_user_properties(user_properties_tuples) - self._on_publish_cb(PublishReceivedData(publish_packet=publish_packet)) + # Create PublishReceivedData with the manual control callback + publish_data = PublishReceivedData( + publish_packet=publish_packet, + acquire_puback_control=acquire_puback_control_fn + ) - return + self._on_publish_cb(publish_data) def _on_lifecycle_stopped(self): if self._on_lifecycle_stopped_cb: @@ -1957,6 +1994,30 @@ def get_stats(self): result = _awscrt.mqtt5_client_get_stats(self._binding) return OperationStatisticsData(result[0], result[1], result[2], result[3]) + def invoke_puback(self, puback_control_handle): + """Sends a PUBACK packet for the given puback control handle. + + Args: + puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. + + Returns: + A future with InvokePubackCompletion that completes when invoked PUBACK is sent or fails to send. A successfully sent PUBACK only confirms the requested PUBACK has been sent, not that the broker has received it and/or it hasn't re-sent the PUBLISH message being acknowledged. + """ + + future = Future() + + def invokePubackComplete(puback_result): + invokePubackCompletion = InvokePubackCompletion(puback_result=_try_manual_puback_result(puback_result)) + future.set_result(invokePubackCompletion) + + _awscrt.mqtt5_client_invoke_puback( + self._binding, + puback_control_handle, + invokePubackComplete + ) + + return future + def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None): from awscrt.mqtt import Connection diff --git a/source/module.c b/source/module.c index 0b752e03d..6e2452246 100644 --- a/source/module.c +++ b/source/module.c @@ -810,6 +810,7 @@ static PyMethodDef s_module_methods[] = { AWS_PY_METHOD_DEF(mqtt5_client_subscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_unsubscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt5_client_invoke_puback, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS), /* MQTT Request Response Client */ diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 243af6a0e..7e6c0dcea 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -218,6 +218,60 @@ static PyObject *s_aws_set_user_properties_to_PyObject( * Publish Handler ******************************************************************************/ +static const char *s_capsule_name_puback_control_handle = "aws_puback_control_handle"; + +struct puback_control_handle { + uint64_t control_id; +}; + +static void s_puback_control_handle_destructor(PyObject *capsule) { + struct puback_control_handle *handle = PyCapsule_GetPointer(capsule, s_capsule_name_puback_control_handle); + if (handle) { + aws_mem_release(aws_py_get_allocator(), handle); + } +} + +/* Callback context for manual PUBACK control */ +struct manual_puback_control_context { + struct aws_mqtt5_client *client; + struct aws_mqtt5_packet_publish_view *publish_packet; +}; + +static void s_manual_puback_control_context_destructor(PyObject *capsule) { + struct manual_puback_control_context *context = PyCapsule_GetPointer(capsule, "manual_puback_control_context"); + if (context) { + aws_mem_release(aws_py_get_allocator(), context); + } +} + +/* Function called from Python to set manual PUBACK control and return puback_control_id */ +PyObject *aws_py_mqtt5_client_acquire_puback(PyObject *self, PyObject *args) { + (void)args; + + struct manual_puback_control_context *context = PyCapsule_GetPointer(self, "manual_puback_control_context"); + if (!context || !context->publish_packet) { + PyErr_SetString(PyExc_ValueError, "Invalid manual PUBACK control context"); + return NULL; + } + + uint64_t puback_control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); + + /* Create handle struct */ + struct puback_control_handle *handle = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct puback_control_handle)); + + handle->control_id = puback_control_id; + + /* Wrap in capsule */ + PyObject *capsule = PyCapsule_New(handle, s_capsule_name_puback_control_handle, s_puback_control_handle_destructor); + if (!capsule) { + aws_mem_release(aws_py_get_allocator(), handle); + return NULL; + } + + return capsule; +} + static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *publish_packet, void *user_data) { if (!user_data) { @@ -234,10 +288,46 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu PyObject *result = NULL; PyObject *subscription_identifier_list = NULL; PyObject *user_properties_list = NULL; + PyObject *manual_control_callback = NULL; + PyObject *control_context_capsule = NULL; size_t subscription_identifier_count = publish_packet->subscription_identifier_count; size_t user_property_count = publish_packet->user_property_count; + /* Create manual PUBACK control context */ + struct manual_puback_control_context *control_context = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct manual_puback_control_context)); + if (!control_context) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + + /* Set up the context with both client and publish packet */ + control_context->client = client->native; + control_context->publish_packet = (struct aws_mqtt5_packet_publish_view *)publish_packet; + + control_context_capsule = + PyCapsule_New(control_context, "manual_puback_control_context", s_manual_puback_control_context_destructor); + if (!control_context_capsule) { + aws_mem_release(aws_py_get_allocator(), control_context); + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + + /* Method definition for the manual control callback */ + static PyMethodDef method_def = { + "acquire_puback_control", + aws_py_mqtt5_client_acquire_puback, + METH_NOARGS, + "Take manual control of PUBACK for this message"}; + + /* Create the manual control callback function */ + manual_control_callback = PyCFunction_New(&method_def, control_context_capsule); + if (!manual_control_callback) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + /* Create list of uint32_t subscription identifier tuples */ subscription_identifier_list = PyList_New(subscription_identifier_count); if (!subscription_identifier_list) { @@ -261,7 +351,7 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu result = PyObject_CallMethod( client->client_core, "_on_publish", - "(y#iOs#OiOIOHs#y#Os#O)", + "(y#iOs#OiOIOHs#y#Os#OO)", /* y */ publish_packet->payload.ptr, /* # */ publish_packet->payload.len, /* i */ (int)publish_packet->qos, @@ -284,7 +374,9 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu /* O */ subscription_identifier_count > 0 ? subscription_identifier_list : Py_None, /* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL, /* # */ publish_packet->content_type ? publish_packet->content_type->len : 0, - /* O */ user_property_count > 0 ? user_properties_list : Py_None); + /* O */ user_property_count > 0 ? user_properties_list : Py_None, + /* O */ manual_control_callback); + if (!result) { PyErr_WriteUnraisable(PyErr_Occurred()); } @@ -293,6 +385,8 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu Py_XDECREF(result); Py_XDECREF(subscription_identifier_list); Py_XDECREF(user_properties_list); + Py_XDECREF(manual_control_callback); + Py_XDECREF(control_context_capsule); PyGILState_Release(state); } @@ -1683,6 +1777,98 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { return NULL; } +/******************************************************************************* + * Invoke Puback + ******************************************************************************/ + +struct invoke_puback_complete_userdata { + PyObject *callback; +}; + +static void s_on_invoke_puback_complete_fn(enum aws_mqtt5_manual_puback_result puback_result, void *complete_ctx) { + struct invoke_puback_complete_userdata *metadata = complete_ctx; + assert(metadata); + + PyObject *result = NULL; + + PyGILState_STATE state; + if (aws_py_gilstate_ensure(&state)) { + return; /* Python has shut down. Nothing matters anymore, but don't crash */ + } + + result = PyObject_CallFunction(metadata->callback, "(i)", (int)puback_result); + + if (!result) { + PyErr_WriteUnraisable(PyErr_Occurred()); + } + + // cleanup + Py_XDECREF(metadata->callback); + Py_XDECREF(result); + + PyGILState_Release(state); + + aws_mem_release(aws_py_get_allocator(), metadata); +} + +PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { + (void)self; + bool success = false; + + PyObject *impl_capsule; + PyObject *puback_handle_capsule; + PyObject *manual_puback_callback_fn_py; + + if (!PyArg_ParseTuple( + args, + "OOO", + /* O */ &impl_capsule, + /* O */ &puback_handle_capsule, + /* O */ &manual_puback_callback_fn_py)) { + return NULL; + } + + struct mqtt5_client_binding *client = PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt5_client); + if (!client) { + return NULL; + } + + /* Extract handle from capsule */ + struct puback_control_handle *handle = + PyCapsule_GetPointer(puback_handle_capsule, s_capsule_name_puback_control_handle); + if (!handle) { + PyErr_SetString(PyExc_TypeError, "Invalid PUBACK control handle"); + return NULL; + } + + /* callback related must be cleaned up after this point */ + struct invoke_puback_complete_userdata *metadata = + aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct invoke_puback_complete_userdata)); + metadata->callback = manual_puback_callback_fn_py; + Py_INCREF(metadata->callback); + + struct aws_mqtt5_manual_puback_completion_options manual_puback_completion_options = { + .completion_callback = &s_on_invoke_puback_complete_fn, .completion_user_data = metadata}; + + if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, &manual_puback_completion_options)) { + PyErr_SetAwsLastError(); + goto manual_puback_failed; + } + + success = true; + goto done; + +manual_puback_failed: + Py_XDECREF(manual_puback_callback_fn_py); + aws_mem_release(aws_py_get_allocator(), metadata); + +done: + if (success) { + Py_RETURN_NONE; + } + return NULL; +} + /******************************************************************************* * Subscribe ******************************************************************************/ diff --git a/source/mqtt5_client.h b/source/mqtt5_client.h index 46c135f82..b9bc54f16 100644 --- a/source/mqtt5_client.h +++ b/source/mqtt5_client.h @@ -14,6 +14,7 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_subscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_unsubscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject *args); From 417d77ceb196ceacc77a4738fea78fecb053a8db Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 13:50:27 -0700 Subject: [PATCH 11/20] point to latest aws-c-mqtt --- crt/aws-c-mqtt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 41b6a7d6d..dc2fe7be8 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 41b6a7d6d566a56eff69743df66c077d56a80c9d +Subproject commit dc2fe7be81070f7c5095ad386d9a23c180a4276b From b7245db13b9a0529aecf5a8cd6748e045f91b23b Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 13:53:59 -0700 Subject: [PATCH 12/20] lint --- awscrt/mqtt5.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 6768bdf6b..8299369cb 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -495,6 +495,7 @@ def _try_puback_reason_code(value): except Exception: return None + class ManualPubackResult(IntEnum): """Result for a manually invoked PUBACK operation.""" @@ -517,6 +518,7 @@ def _try_manual_puback_result(value): except Exception: return None + class SubackReasonCode(IntEnum): """Reason code inside SUBACK packet payloads. @@ -1161,6 +1163,7 @@ class PubackPacket: reason_string: str = None user_properties: 'Sequence[UserProperty]' = None + @dataclass class InvokePubackCompletion: """dataclass containing results of a manually invoked PUBACK @@ -1503,7 +1506,7 @@ def _on_publish( # Create PublishReceivedData with the manual control callback publish_data = PublishReceivedData( - publish_packet=publish_packet, + publish_packet=publish_packet, acquire_puback_control=acquire_puback_control_fn ) @@ -1999,7 +2002,7 @@ def invoke_puback(self, puback_control_handle): Args: puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. - + Returns: A future with InvokePubackCompletion that completes when invoked PUBACK is sent or fails to send. A successfully sent PUBACK only confirms the requested PUBACK has been sent, not that the broker has received it and/or it hasn't re-sent the PUBLISH message being acknowledged. """ @@ -2009,7 +2012,7 @@ def invoke_puback(self, puback_control_handle): def invokePubackComplete(puback_result): invokePubackCompletion = InvokePubackCompletion(puback_result=_try_manual_puback_result(puback_result)) future.set_result(invokePubackCompletion) - + _awscrt.mqtt5_client_invoke_puback( self._binding, puback_control_handle, From 94daf2d5e01128e848fb55d879972a33da7f5d3c Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 14:03:56 -0700 Subject: [PATCH 13/20] restore removal of completion options --- source/mqtt5_client.c | 59 ++++--------------------------------------- 1 file changed, 5 insertions(+), 54 deletions(-) diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 7e6c0dcea..1700da6ba 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -1781,50 +1781,18 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { * Invoke Puback ******************************************************************************/ -struct invoke_puback_complete_userdata { - PyObject *callback; -}; - -static void s_on_invoke_puback_complete_fn(enum aws_mqtt5_manual_puback_result puback_result, void *complete_ctx) { - struct invoke_puback_complete_userdata *metadata = complete_ctx; - assert(metadata); - - PyObject *result = NULL; - - PyGILState_STATE state; - if (aws_py_gilstate_ensure(&state)) { - return; /* Python has shut down. Nothing matters anymore, but don't crash */ - } - - result = PyObject_CallFunction(metadata->callback, "(i)", (int)puback_result); - - if (!result) { - PyErr_WriteUnraisable(PyErr_Occurred()); - } - - // cleanup - Py_XDECREF(metadata->callback); - Py_XDECREF(result); - - PyGILState_Release(state); - - aws_mem_release(aws_py_get_allocator(), metadata); -} - PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { (void)self; - bool success = false; + bool success = true; PyObject *impl_capsule; PyObject *puback_handle_capsule; - PyObject *manual_puback_callback_fn_py; if (!PyArg_ParseTuple( args, - "OOO", + "OO", /* O */ &impl_capsule, - /* O */ &puback_handle_capsule, - /* O */ &manual_puback_callback_fn_py)) { + /* O */ &puback_handle_capsule)) { return NULL; } @@ -1841,28 +1809,11 @@ PyObject *aws_py_mqtt5_client_invoke_puback(PyObject *self, PyObject *args) { return NULL; } - /* callback related must be cleaned up after this point */ - struct invoke_puback_complete_userdata *metadata = - aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct invoke_puback_complete_userdata)); - metadata->callback = manual_puback_callback_fn_py; - Py_INCREF(metadata->callback); - - struct aws_mqtt5_manual_puback_completion_options manual_puback_completion_options = { - .completion_callback = &s_on_invoke_puback_complete_fn, .completion_user_data = metadata}; - - if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, &manual_puback_completion_options)) { + if (aws_mqtt5_client_invoke_puback(client->native, handle->control_id, NULL)) { PyErr_SetAwsLastError(); - goto manual_puback_failed; + success = false; } - success = true; - goto done; - -manual_puback_failed: - Py_XDECREF(manual_puback_callback_fn_py); - aws_mem_release(aws_py_get_allocator(), metadata); - -done: if (success) { Py_RETURN_NONE; } From 82526b2b249c1d30050fb8c35818c6d8802c2233 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 11 Mar 2026 14:14:16 -0700 Subject: [PATCH 14/20] more remnants of completion options --- awscrt/mqtt5.py | 43 +------------------------------------------ 1 file changed, 1 insertion(+), 42 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 8299369cb..3d9f1a125 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -496,29 +496,6 @@ def _try_puback_reason_code(value): return None -class ManualPubackResult(IntEnum): - """Result for a manually invoked PUBACK operation.""" - - SUCCESS = 0 - """The PUBACK was successfully sent.""" - - PUBACK_CANCELLED = 1 - """The PUBACK was cancelled and will not be sent.""" - - PUBACK_INVALID = 2 - """The PUBACK attempting to be sent is invalid.""" - - CRT_FAILURE = 3 - """The PUBACK failed to send due to a CRT failure.""" - - -def _try_manual_puback_result(value): - try: - return ManualPubackResult(value) - except Exception: - return None - - class SubackReasonCode(IntEnum): """Reason code inside SUBACK packet payloads. @@ -1164,16 +1141,6 @@ class PubackPacket: user_properties: 'Sequence[UserProperty]' = None -@dataclass -class InvokePubackCompletion: - """dataclass containing results of a manually invoked PUBACK - - Args: - puback_result (ManualPubackResult): Result of manually invoked PUBACK - """ - puback_result: ManualPubackResult = None - - @dataclass class ConnectPacket: """Data model of an `MQTT5 CONNECT `_ packet. @@ -2002,21 +1969,13 @@ def invoke_puback(self, puback_control_handle): Args: puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. - - Returns: - A future with InvokePubackCompletion that completes when invoked PUBACK is sent or fails to send. A successfully sent PUBACK only confirms the requested PUBACK has been sent, not that the broker has received it and/or it hasn't re-sent the PUBLISH message being acknowledged. """ future = Future() - def invokePubackComplete(puback_result): - invokePubackCompletion = InvokePubackCompletion(puback_result=_try_manual_puback_result(puback_result)) - future.set_result(invokePubackCompletion) - _awscrt.mqtt5_client_invoke_puback( self._binding, - puback_control_handle, - invokePubackComplete + puback_control_handle ) return future From 13ce0f76dd2b587498a05433fcf647d0bca73e97 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 10:07:43 -0700 Subject: [PATCH 15/20] update documentation and remove completion future from invoke puback --- awscrt/mqtt5.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 3d9f1a125..38a589589 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1228,7 +1228,14 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. - acquire_puback_control (Callable): Call this function to prevent automatic PUBACK and take manual control of this PUBLISH message's PUBACK. Returns an opaque handle object that can be passed to Client.invoke_puback(). + acquire_puback_control (Callable): Call this function within the on_publish_callback_fn callback to take manual + control of the PUBACK for this QoS 1 message, preventing the client from automatically sending a PUBACK. + Returns an opaque handle that can be passed to Client.invoke_puback() to send the PUBACK to the broker. + + Important: This function must be called within the on_publish_callback_fn callback. Calling it after the + callback returns will result in an error. This function may only be called once per received PUBLISH. + If this function is not called, the client will automatically send a PUBACK for QoS 1 messages when + the callback returns. Only relevant for QoS 1 messages. """ publish_packet: PublishPacket = None acquire_puback_control: Callable = None @@ -1965,21 +1972,21 @@ def get_stats(self): return OperationStatisticsData(result[0], result[1], result[2], result[3]) def invoke_puback(self, puback_control_handle): - """Sends a PUBACK packet for the given puback control handle. + """Sends a PUBACK packet for a QoS 1 PUBLISH that was previously acquired for manual control. + + To use manual PUBACK control, call acquire_puback_control() within the on_publish_callback_fn + callback to obtain a handle. Then call this method to send the PUBACK. Args: - puback_control_handle: An opaque handle obtained from acquire_puback_control(). This handle cannot be created manually and must come from the acquire_puback_control() Callable within PublishReceivedData. + puback_control_handle: An opaque handle obtained from acquire_puback_control() within + PublishReceivedData. This handle cannot be created manually. """ - future = Future() - _awscrt.mqtt5_client_invoke_puback( self._binding, puback_control_handle ) - return future - def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None): from awscrt.mqtt import Connection From 89cf2cb6613a15b4b5b73810f673f1b46567718f Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 10:11:48 -0700 Subject: [PATCH 16/20] more documentation changes --- awscrt/mqtt5.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 38a589589..4083b9728 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1980,6 +1980,9 @@ def invoke_puback(self, puback_control_handle): Args: puback_control_handle: An opaque handle obtained from acquire_puback_control() within PublishReceivedData. This handle cannot be created manually. + + Raises: + Exception: If the native client returns an error when invoking the PUBACK. """ _awscrt.mqtt5_client_invoke_puback( From 31b1ea28d0a866311649559df5eb07ed89da705f Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 11:13:03 -0700 Subject: [PATCH 17/20] prevent double use and use after we are out of the publish received callback --- awscrt/mqtt5.py | 15 +++++++++------ source/mqtt5_client.c | 35 ++++++++++++++++++++++++++++------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 4083b9728..91cc19f9d 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1228,14 +1228,17 @@ class PublishReceivedData: Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. - acquire_puback_control (Callable): Call this function within the on_publish_callback_fn callback to take manual - control of the PUBACK for this QoS 1 message, preventing the client from automatically sending a PUBACK. - Returns an opaque handle that can be passed to Client.invoke_puback() to send the PUBACK to the broker. + acquire_puback_control (Callable): For QoS 1 messages only: call this function within the + on_publish_callback_fn callback to take manual control of the PUBACK for this message, preventing + the client from automatically sending a PUBACK. Returns an opaque handle that can be passed to + Client.invoke_puback() to send the PUBACK to the broker. Important: This function must be called within the on_publish_callback_fn callback. Calling it after the - callback returns will result in an error. This function may only be called once per received PUBLISH. - If this function is not called, the client will automatically send a PUBACK for QoS 1 messages when - the callback returns. Only relevant for QoS 1 messages. + callback returns will raise a RuntimeError. This function may only be called once per received PUBLISH; + calling it a second time will also raise a RuntimeError. If this function is not called, the client will + automatically send a PUBACK for QoS 1 messages when the callback returns. + + For QoS 0 messages, this field is None. """ publish_packet: PublishPacket = None acquire_puback_control: Callable = None diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 1700da6ba..629fb5011 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -249,13 +249,26 @@ PyObject *aws_py_mqtt5_client_acquire_puback(PyObject *self, PyObject *args) { (void)args; struct manual_puback_control_context *context = PyCapsule_GetPointer(self, "manual_puback_control_context"); - if (!context || !context->publish_packet) { + if (!context) { PyErr_SetString(PyExc_ValueError, "Invalid manual PUBACK control context"); return NULL; } + /* If the publish_packet pointer has been zeroed out, the callback has already returned (post-callback call) + * or this function was already called once (double-call). Both are usage errors. */ + if (!context->publish_packet) { + PyErr_SetString( + PyExc_RuntimeError, + "acquire_puback_control() must be called within the on_publish_callback_fn callback " + "and may only be called once."); + return NULL; + } + uint64_t puback_control_id = aws_mqtt5_client_acquire_puback(context->client, context->publish_packet); + /* Zero out the publish_packet pointer to prevent double-calls. */ + context->publish_packet = NULL; + /* Create handle struct */ struct puback_control_handle *handle = aws_mem_calloc(aws_py_get_allocator(), 1, sizeof(struct puback_control_handle)); @@ -321,11 +334,15 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu METH_NOARGS, "Take manual control of PUBACK for this message"}; - /* Create the manual control callback function */ - manual_control_callback = PyCFunction_New(&method_def, control_context_capsule); - if (!manual_control_callback) { - PyErr_WriteUnraisable(PyErr_Occurred()); - goto cleanup; + /* Only create the manual control callback for QoS 1 messages. + * For QoS 0, acquire_puback_control is passed as None. + * acquirePubackControl is only set for QoS 1). */ + if (publish_packet->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) { + manual_control_callback = PyCFunction_New(&method_def, control_context_capsule); + if (!manual_control_callback) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } } /* Create list of uint32_t subscription identifier tuples */ @@ -375,12 +392,16 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu /* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL, /* # */ publish_packet->content_type ? publish_packet->content_type->len : 0, /* O */ user_property_count > 0 ? user_properties_list : Py_None, - /* O */ manual_control_callback); + /* O */ manual_control_callback ? manual_control_callback : Py_None); if (!result) { PyErr_WriteUnraisable(PyErr_Occurred()); } + /* Invalidate the publish_packet pointer now that the callback has returned. + * This prevents use-after-free if acquire_puback_control() is called after the callback. */ + control_context->publish_packet = NULL; + cleanup: Py_XDECREF(result); Py_XDECREF(subscription_identifier_list); From 6f41ace7c4ae3ef7761790f00a6124bacbe46a58 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 12 Mar 2026 13:04:01 -0700 Subject: [PATCH 18/20] manual puback tests --- test/test_mqtt5.py | 169 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 4d8f7d0e2..bd5078ea6 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1574,6 +1574,175 @@ def _test_qos1_happy_path(self): def test_qos1_happy_path(self): test_retry_wrapper(self._test_qos1_happy_path) + def _test_manual_puback_hold(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_ManualPuback_Python_" + client_id + payload = str(uuid.uuid4()) + payload_bytes = payload.encode("utf-8") + + PUBACK_HOLD_TIMEOUT = 60.0 + + future_first_delivery = Future() + future_redelivery = Future() + puback_handle_holder = [None] # mutable container so the closure can write to it + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + received_payload = publish_received_data.publish_packet.payload + if not future_first_delivery.done(): + # First delivery: acquire manual PUBACK control to hold the PUBACK + puback_handle_holder[0] = publish_received_data.acquire_puback_control() + future_first_delivery.set_result(received_payload) + elif not future_redelivery.done(): + # Second delivery: broker re-sent because no PUBACK was received + future_redelivery.set_result(received_payload) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe to the topic with QoS 1 + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future = client.subscribe(subscribe_packet=subscribe_packet) + suback_packet = subscribe_future.result(TIMEOUT) + self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + + # Publish a QoS 1 message with a unique UUID payload + publish_packet = mqtt5.PublishPacket( + payload=payload, + topic=topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future = client.publish(publish_packet=publish_packet) + publish_completion_data = publish_future.result(TIMEOUT) + self.assertIsInstance(publish_completion_data.puback, mqtt5.PubackPacket) + + # Wait for the first delivery and confirm PUBACK was held + first_payload = future_first_delivery.result(TIMEOUT) + self.assertEqual(first_payload, payload_bytes) + self.assertIsNotNone(puback_handle_holder[0], "acquire_puback_control() should have returned a handle") + + # Wait up to 60 seconds for the broker to re-deliver the message (no PUBACK was sent) + redelivered_payload = future_redelivery.result(PUBACK_HOLD_TIMEOUT) + self.assertEqual(redelivered_payload, payload_bytes, + "Re-delivered payload should match the original UUID payload") + + # Release the held PUBACK now that we've confirmed re-delivery + client.invoke_puback(puback_handle_holder[0]) + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_hold(self): + test_retry_wrapper(self._test_manual_puback_hold) + + def _test_manual_puback_invoke(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_ManualPuback_Python_" + client_id + payload = str(uuid.uuid4()) + payload_bytes = payload.encode("utf-8") + + NO_REDELIVERY_WAIT = 60.0 + + future_first_delivery = Future() + future_unexpected_redelivery = Future() + puback_handle_holder = [None] # mutable container so the closure can write to it + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + received_payload = publish_received_data.publish_packet.payload + if not future_first_delivery.done(): + # First delivery: acquire manual PUBACK control, then immediately invoke it + puback_handle_holder[0] = publish_received_data.acquire_puback_control() + future_first_delivery.set_result(received_payload) + elif received_payload == payload_bytes and not future_unexpected_redelivery.done(): + # A second delivery of the same payload means the broker re-sent — this should NOT happen + future_unexpected_redelivery.set_result(received_payload) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe to the topic with QoS 1 + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future = client.subscribe(subscribe_packet=subscribe_packet) + suback_packet = subscribe_future.result(TIMEOUT) + self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + + # Publish a QoS 1 message with a unique UUID payload + publish_packet = mqtt5.PublishPacket( + payload=payload, + topic=topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future = client.publish(publish_packet=publish_packet) + publish_completion_data = publish_future.result(TIMEOUT) + self.assertIsInstance(publish_completion_data.puback, mqtt5.PubackPacket) + + # Wait for the first delivery and confirm PUBACK handle was acquired + first_payload = future_first_delivery.result(TIMEOUT) + self.assertEqual(first_payload, payload_bytes) + self.assertIsNotNone(puback_handle_holder[0], "acquire_puback_control() should have returned a handle") + + # Immediately invoke the PUBACK using the acquired handle + client.invoke_puback(puback_handle_holder[0]) + + # Wait 60 seconds and confirm the broker does NOT re-deliver the message + # (because we sent the PUBACK via invoke_puback) + redelivered = future_unexpected_redelivery.done() or \ + (not future_unexpected_redelivery.done() and + self._wait_for_future_timeout(future_unexpected_redelivery, NO_REDELIVERY_WAIT)) + self.assertFalse(redelivered, + "Broker should NOT re-deliver the message after invoke_puback() was called") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def _wait_for_future_timeout(self, future, timeout_sec): + """Returns True if the future completed within timeout_sec, False if it timed out.""" + try: + future.result(timeout_sec) + return True + except Exception: + return False + + def test_manual_puback_invoke(self): + test_retry_wrapper(self._test_manual_puback_invoke) + # ============================================================== # RETAIN TEST CASES # ============================================================== From 63f662fae809c01674338d4b2a601bbed9a1a00f Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 13 Mar 2026 10:44:10 -0700 Subject: [PATCH 19/20] more manual puback tests --- test/test_mqtt5.py | 160 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index bd5078ea6..92a78a0d6 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1743,6 +1743,166 @@ def _wait_for_future_timeout(self, future, timeout_sec): def test_manual_puback_invoke(self): test_retry_wrapper(self._test_manual_puback_invoke) + def _test_manual_puback_acquire_double_call_raises(self): + """Verify that calling acquire_puback_control() twice on the same QoS 1 PUBLISH raises RuntimeError.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_result = Future() + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + try: + # First call should succeed + handle = publish_received_data.acquire_puback_control() + # Second call on the same message should raise RuntimeError + try: + publish_received_data.acquire_puback_control() + future_result.set_result("no_error") # Should not reach here + except RuntimeError: + future_result.set_result("double_call_raised") + except Exception as e: + future_result.set_result(f"unexpected_error: {e}") + # Release the handle we acquired + # (handle is valid, invoke it to clean up) + except Exception as e: + future_result.set_result(f"first_call_failed: {e}") + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)) + publish_future.result(TIMEOUT) + + result = future_result.result(TIMEOUT) + self.assertEqual(result, "double_call_raised", + f"Expected RuntimeError on double-call, got: {result}") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_acquire_double_call_raises(self): + test_retry_wrapper(self._test_manual_puback_acquire_double_call_raises) + + def _test_manual_puback_acquire_post_callback_raises(self): + """Verify that calling acquire_puback_control() after the callback has returned raises RuntimeError.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_callback_done = Future() + saved_acquire_fn_holder = [None] + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + # Save the callable but do NOT call it within the callback + saved_acquire_fn_holder[0] = publish_received_data.acquire_puback_control + future_callback_done.set_result(True) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)) + publish_future.result(TIMEOUT) + + # Wait for the callback to complete + future_callback_done.result(TIMEOUT) + + # Now call acquire_puback_control() after the callback has returned — should raise RuntimeError + acquire_fn = saved_acquire_fn_holder[0] + self.assertIsNotNone(acquire_fn, "acquire_puback_control should have been saved") + with self.assertRaises(RuntimeError): + acquire_fn() + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_acquire_post_callback_raises(self): + test_retry_wrapper(self._test_manual_puback_acquire_post_callback_raises) + + def _test_manual_puback_qos0_acquire_is_none(self): + """Verify that acquire_puback_control is None for QoS 0 messages.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_acquire_value = Future() + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + # For QoS 0, acquire_puback_control should be None + future_acquire_value.set_result(publish_received_data.acquire_puback_control) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0) + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + # Publish at QoS 0 — no PUBACK involved + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_MOST_ONCE)) + publish_future.result(TIMEOUT) + + acquire_value = future_acquire_value.result(TIMEOUT) + self.assertIsNone(acquire_value, + "acquire_puback_control should be None for QoS 0 messages") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_puback_qos0_acquire_is_none(self): + test_retry_wrapper(self._test_manual_puback_qos0_acquire_is_none) + # ============================================================== # RETAIN TEST CASES # ============================================================== From 56dda25a2802e4bf0e3f142354e32d70846ab6e1 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 13 Mar 2026 11:22:54 -0700 Subject: [PATCH 20/20] add more comments --- test/test_mqtt5.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 92a78a0d6..d2883b938 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1574,6 +1574,11 @@ def _test_qos1_happy_path(self): def test_qos1_happy_path(self): test_retry_wrapper(self._test_qos1_happy_path) + # ============================================================== + # MANUAL PUBACK TEST CASES + # ============================================================== + + # Manual PUBACK hold test: hold PUBACK and verify broker re-delivers the message def _test_manual_puback_hold(self): input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") @@ -1653,6 +1658,7 @@ def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): def test_manual_puback_hold(self): test_retry_wrapper(self._test_manual_puback_hold) + # Manual PUBACK invoke test: acquire and immediately invoke PUBACK, verify no re-delivery def _test_manual_puback_invoke(self): input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") @@ -1676,7 +1682,7 @@ def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): puback_handle_holder[0] = publish_received_data.acquire_puback_control() future_first_delivery.set_result(received_payload) elif received_payload == payload_bytes and not future_unexpected_redelivery.done(): - # A second delivery of the same payload means the broker re-sent — this should NOT happen + # A second delivery of the same payload means the broker re-sent, this should NOT happen future_unexpected_redelivery.set_result(received_payload) tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( @@ -1743,6 +1749,7 @@ def _wait_for_future_timeout(self, future, timeout_sec): def test_manual_puback_invoke(self): test_retry_wrapper(self._test_manual_puback_invoke) + # Manual PUBACK double-call test: calling acquirePubackControl() twice raises IllegalStateException def _test_manual_puback_acquire_double_call_raises(self): """Verify that calling acquire_puback_control() twice on the same QoS 1 PUBLISH raises RuntimeError.""" input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") @@ -1802,6 +1809,7 @@ def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): def test_manual_puback_acquire_double_call_raises(self): test_retry_wrapper(self._test_manual_puback_acquire_double_call_raises) + # Manual PUBACK post-callback test: calling acquire_puback_control() after callback returns raises RuntimeError def _test_manual_puback_acquire_post_callback_raises(self): """Verify that calling acquire_puback_control() after the callback has returned raises RuntimeError.""" input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") @@ -1843,7 +1851,7 @@ def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): # Wait for the callback to complete future_callback_done.result(TIMEOUT) - # Now call acquire_puback_control() after the callback has returned — should raise RuntimeError + # Now call acquire_puback_control() after the callback has returned, this should raise RuntimeError acquire_fn = saved_acquire_fn_holder[0] self.assertIsNotNone(acquire_fn, "acquire_puback_control should have been saved") with self.assertRaises(RuntimeError): @@ -1855,6 +1863,7 @@ def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): def test_manual_puback_acquire_post_callback_raises(self): test_retry_wrapper(self._test_manual_puback_acquire_post_callback_raises) + # Manual PUBACK QoS 0 test: acquirePubackControl() throws IllegalStateException for QoS 0 messages def _test_manual_puback_qos0_acquire_is_none(self): """Verify that acquire_puback_control is None for QoS 0 messages.""" input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") @@ -1888,7 +1897,7 @@ def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) subscribe_future.result(TIMEOUT) - # Publish at QoS 0 — no PUBACK involved + # Publish at QoS 0, there's no PUBACK involved publish_future = client.publish(mqtt5.PublishPacket( payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_MOST_ONCE)) publish_future.result(TIMEOUT)