diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a71b6b93..ff70667c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,7 +10,9 @@ Versioning `_. Unreleased_ ----------- -Nothing yet! +Fixed: + +- `Fix infinite loop when setting record's own value from on_update callback <../../pull/202>`_ 4.7.0_ - 2026-01-14 ------------------- diff --git a/softioc/device.py b/softioc/device.py index e468d46b..82130963 100644 --- a/softioc/device.py +++ b/softioc/device.py @@ -11,7 +11,7 @@ dbLoadDatabase, signal_processing_complete, recGblResetAlarms, - db_put_field, + db_put_field_process, db_get_field, ) from .device_core import DeviceSupportCore, RecordLookup @@ -108,7 +108,7 @@ def set_field(self, field, value): data = (c_char * 40)() data.value = str(value).encode() + b'\0' name = self._name + '.' + field - db_put_field(name, fields.DBF_STRING, addressof(data), 1) + db_put_field_process(name, fields.DBF_STRING, addressof(data), 1, True) class ProcessDeviceSupportIn(ProcessDeviceSupportCore): _link_ = 'INP' @@ -178,7 +178,6 @@ def __init__(self, name, **kargs): self.__validate = kargs.pop('validate', None) self.__always_update = kargs.pop('always_update', False) - self.__enable_write = True if 'initial_value' in kargs: value = self._value_to_epics(kargs.pop('initial_value')) @@ -239,8 +238,7 @@ def _process(self, record): return EPICS_OK python_value = self._epics_to_value(value) - if self.__enable_write and self.__validate and \ - not self.__validate(self, python_value): + if self.__validate and not self.__validate(self, python_value): # Asynchronous validation rejects value, so restore the last good # value. self._write_value(record, self._value[0]) @@ -249,7 +247,7 @@ def _process(self, record): # Value is good. Hang onto it, let users know the value has changed self._value = (value, severity, alarm) record.UDF = 0 - if self.__on_update and self.__enable_write: + if self.__on_update: record.PACT = self._blocking dispatcher( self.__on_update, @@ -287,9 +285,15 @@ def set(self, value, process=True, else: # The array parameter is used to keep the raw pointer alive dbf_code, length, data, array = self._value_to_dbr(value) - self.__enable_write = process - db_put_field(_record.NAME, dbf_code, data, length) - self.__enable_write = True + + # If we do process we instead do this inside _process, allowing + # validation to potentially refuse the update. + # However if we do not process, we must do this here to keep the + # Python and EPICS values in line + if not process: + self._value = (value, severity, alarm) + + db_put_field_process(_record.NAME, dbf_code, data, length, process) def get(self): return self._epics_to_value(self._value[0]) diff --git a/softioc/extension.c b/softioc/extension.c index bf8bfd9c..62f579ba 100644 --- a/softioc/extension.c +++ b/softioc/extension.c @@ -93,16 +93,18 @@ static PyObject *get_field_offsets(PyObject *self, PyObject *args) } -/* Updates PV field with integrated db lookup. Safer to do this in C as we need - * an intermediate copy of the dbAddr structure, which changes size between - * EPICS releases. */ -static PyObject *db_put_field(PyObject *self, PyObject *args) +/* This is our own re-implementation of EPICS's dbPutField function. + * We do this to allow us to control when dbProcess is called. We use the + * same logicical flow as the original function. */ +static PyObject *db_put_field_process(PyObject *self, PyObject *args) { const char *name; short dbrType; PyObject *buffer_ptr; long length; - if (!PyArg_ParseTuple(args, "shOl", &name, &dbrType, &buffer_ptr, &length)) + short process; + if (!PyArg_ParseTuple(args, "shOlh", + &name, &dbrType, &buffer_ptr, &length, &process)) return NULL; void *pbuffer = PyLong_AsVoidPtr(buffer_ptr); if (!pbuffer) @@ -113,6 +115,8 @@ static PyObject *db_put_field(PyObject *self, PyObject *args) return PyErr_Format( PyExc_RuntimeError, "dbNameToAddr failed for %s", name); + struct dbCommon *precord = dbAddr.precord; + long put_result; /* There are two important locks to consider at this point: The Global * Interpreter Lock (GIL) and the EPICS record lock. A deadlock is possible @@ -125,7 +129,18 @@ static PyObject *db_put_field(PyObject *self, PyObject *args) * EPICS call, to avoid potential deadlocks. * See https://github.com/DiamondLightSource/pythonSoftIOC/issues/119. */ Py_BEGIN_ALLOW_THREADS - put_result = dbPutField(&dbAddr, dbrType, pbuffer, length); + dbScanLock(precord); + put_result = dbPut(&dbAddr, dbrType, pbuffer, length); + + if (put_result == 0 && process) + { + if (precord->pact) + precord->rpro = TRUE; + else + dbProcess(precord); + } + + dbScanUnlock(precord); Py_END_ALLOW_THREADS if (put_result) return PyErr_Format( @@ -314,7 +329,7 @@ static struct PyMethodDef softioc_methods[] = { "Get a map of DBF names to values"}, {"get_field_offsets", get_field_offsets, METH_VARARGS, "Get offset, size and type for each record field"}, - {"db_put_field", db_put_field, METH_VARARGS, + {"db_put_field_process", db_put_field_process, METH_VARARGS, "Put a database field to a value"}, {"db_get_field", db_get_field, METH_VARARGS, "Get a database field's value"}, diff --git a/softioc/imports.py b/softioc/imports.py index a54fc8ee..c27dd177 100644 --- a/softioc/imports.py +++ b/softioc/imports.py @@ -19,9 +19,12 @@ def get_field_offsets(record_type): '''Return {field_name: (offset, size, field_type)}''' return _extension.get_field_offsets(record_type) -def db_put_field(name, dbr_type, pbuffer, length): - '''Put field where pbuffer is void* pointer. Returns None.''' - return _extension.db_put_field(name, dbr_type, pbuffer, length) +def db_put_field_process(name, dbr_type, pbuffer, length, process): + '''Put field where pbuffer is void* pointer, conditionally processing + the record. Returns None.''' + return _extension.db_put_field_process( + name, dbr_type, pbuffer, length, process + ) def db_get_field(name, dbr_type, pbuffer, length): '''Get field where pbuffer is void* pointer. Returns None.''' diff --git a/tests/test_records.py b/tests/test_records.py index 58176ca0..ad0af407 100644 --- a/tests/test_records.py +++ b/tests/test_records.py @@ -380,10 +380,12 @@ def out_records(self, request): def validate_always_pass(self, record, new_val): """Validation method that always allows changes""" + log("VALIDATE: Returning True") return True def validate_always_fail(self, record, new_val): """Validation method that always rejects changes""" + log("VALIDATE: Returning False") return False def validate_ioc_test_func( @@ -445,7 +447,6 @@ def validate_test_runner( # Wait for message that IOC has started select_and_recv(parent_conn, "R") - # Suppress potential spurious warnings _channel_cache.purge() @@ -682,6 +683,95 @@ def test_on_update_true_false(self, out_records): always_update is True and the put'ed value is always different""" self.on_update_runner(out_records, True, False) + def on_update_recursive_set_test_func( + self, device_name, conn + ): + log("CHILD: Child started") + + builder.SetDeviceName(device_name) + + async def on_update_func(new_val): + log("CHILD: on_update_func started") + record.set(0, process=False) + conn.send("C") # "Callback" + log("CHILD: on_update_func ended") + + record = builder.Action( + "ACTION", + on_update=on_update_func, + blocking=True, + initial_value=1 # A non-zero value, to check it changes + ) + + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + conn.send("R") # "Ready" + + log("CHILD: Sent R over Connection to Parent") + + # Keep process alive while main thread runs CAGET + if conn.poll(TIMEOUT): + val = conn.recv() + assert val == "D", "Did not receive expected Done character" + + log("CHILD: Received exit command, child exiting") + + async def test_on_update_recursive_set(self): + """Test that on_update functions correctly when the on_update + callback sets the value of the record again (with process=False). + See issue #201""" + + ctx = get_multiprocessing_context() + parent_conn, child_conn = ctx.Pipe() + + device_name = create_random_prefix() + + process = ctx.Process( + target=self.on_update_recursive_set_test_func, + args=(device_name, child_conn), + ) + + process.start() + + log("PARENT: Child started, waiting for R command") + + from aioca import caget, caput + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + log("PARENT: received R command") + + record = f"{device_name}:ACTION" + + val = await caget(record) + + assert val == 1, "ACTION record did not start with value 1" + + await caput(record, 1, wait=True) + + val = await caget(record) + + assert val == 0, "ACTION record did not return to zero value" + + # Expect one "C" + select_and_recv(parent_conn, "C") + + # ...But if we receive another we know there's a problem + if parent_conn.poll(5): # Shorter timeout to make this quicker + pytest.fail("Received unexpected second message") + + finally: + log("PARENT:Sending Done command to child") + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT) + log(f"PARENT: Join completed with exitcode {process.exitcode}") + if process.exitcode is None: + pytest.fail("Process did not terminate") + class TestBlocking: @@ -1463,3 +1553,92 @@ def test_set_alarm_severity_status(self, set_enum): _channel_cache.purge() parent_conn.send("D") # "Done" process.join(timeout=TIMEOUT) + + +class TestProcess: + """Tests related to processing - checking values are as expected + between the EPICS and Python layers. """ + + test_result_rec = "TestResult" + + + def process_test_function(self, device_name, conn, process): + builder.SetDeviceName(device_name) + + rec = builder.longOut("TEST", initial_value=5) + + # Record to indicate success/failure + bi = builder.boolIn(self.test_result_rec, ZNAM="FAILED", ONAM="SUCCESS") + + builder.LoadDatabase() + softioc.iocInit() + + # Prove value changes from .set call + rec.set(10, process=process) + + conn.send("R") # "Ready" + log("CHILD: Sent R over Connection to Parent") + + select_and_recv(conn, "R") + + val = rec.get() + log(f"CHILD: record value is {val}") + + # value should be that which was set by .set() + if val == 10: + bi.set(1) + else: + bi.set(0) + + # Keep process alive while main thread works. + while (True): + if conn.poll(TIMEOUT): + val = conn.recv() + if val == "D": # "Done" + break + + @requires_cothread + @pytest.mark.parametrize("process", [True, False]) + def test_set_alarm_severity_status(self, process): + """Test that set_alarm function allows setting severity and status""" + ctx = get_multiprocessing_context() + parent_conn, child_conn = ctx.Pipe() + + device_name = create_random_prefix() + + process = ctx.Process( + target=self.process_test_function, + args=(device_name, child_conn, process), + ) + + process.start() + + from cothread.catools import caget, _channel_cache + from cothread import Sleep + + try: + # Wait for message that IOC has started + select_and_recv(parent_conn, "R") + + # Suppress potential spurious warnings + _channel_cache.purge() + + record = device_name + ":TEST" + val = caget(record, timeout=TIMEOUT) + + assert val == 10 + + parent_conn.send("R") # "Ready" + + Sleep(0.5) # Give child time to process update + + result_record = device_name + f":{self.test_result_rec}" + val = caget(result_record) + + assert val == 1, "Success record indicates failure" + + finally: + # Suppress potential spurious warnings + _channel_cache.purge() + parent_conn.send("D") # "Done" + process.join(timeout=TIMEOUT)