From e3fc2aaaa089c8575cd0bfa2783d3cef27b284ee Mon Sep 17 00:00:00 2001
From: yxd92326
Date: Tue, 27 Jan 2026 12:13:31 +0000
Subject: [PATCH 1/3] Remove the old TUI force metadata code

---
 src/murfey/client/multigrid_control.py | 27 +-------------------------
 src/murfey/instrument_server/api.py    |  1 -
 2 files changed, 1 insertion(+), 27 deletions(-)

diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py
index 819bfbe8..903affd4 100644
--- a/src/murfey/client/multigrid_control.py
+++ b/src/murfey/client/multigrid_control.py
@@ -40,7 +40,6 @@ class MultigridController:
     finalising: bool = False
     dormant: bool = False
     multigrid_watcher_active: bool = True
-    processing_enabled: bool = True
     do_transfer: bool = True
     dummy_dc: bool = False
     force_mdoc_metadata: bool = True
@@ -86,8 +85,6 @@ def __post_init__(self):
                 for ds in val.values()
                 for s in ds
             ]
-        self._data_collection_form_complete = False
-        self._register_dc: bool | None = None
         self.rsync_processes = self.rsync_processes or {}
         self.analysers = self.analysers or {}
 
@@ -260,7 +257,6 @@ def _start_rsyncer_multigrid(
         self._start_rsyncer(
             source,
             destination,
-            force_metadata=self.processing_enabled,
             analyse=analyse,
             remove_files=remove_files,
             tag=tag,
@@ -324,7 +320,6 @@ def _start_rsyncer(
         source: Path,
         destination: str,
         visit_path: str = "",
-        force_metadata: bool = False,
         analyse: bool = True,
         remove_files: bool = False,
         tag: str = "",
@@ -455,12 +450,7 @@ def rsync_result(update: RSyncerUpdate):
                 force_mdoc_metadata=self.force_mdoc_metadata,
                 limited=limited,
             )
-            if force_metadata:
-                self.analysers[source].subscribe(
-                    partial(self._start_dc, from_form=True)
-                )
-            else:
-                self.analysers[source].subscribe(self._data_collection_form)
+            self.analysers[source].subscribe(partial(self._start_dc, from_form=True))
             self.analysers[source].start()
             if transfer:
                 self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
@@ -502,21 +492,6 @@ def _rsync_update_converter(p: Path) -> None:
             )
             self._environment.watchers[source].start()
 
-    def _data_collection_form(self, response: dict):
-        log.info("data collection form ready")
-        if self._data_collection_form_complete:
-            return
-        if self._register_dc and response.get("form"):
-            self._form_values = {k: str(v) for k, v in response.get("form", {}).items()}
-            log.info(
-                f"gain reference is set to {self._form_values.get('gain_ref')}, {self._environment.gain_ref}"
-            )
-            if self._form_values.get("gain_ref") in (None, "None"):
-                self._form_values["gain_ref"] = self._environment.gain_ref
-            self._data_collection_form_complete = True
-        elif self._register_dc is None:
-            self._data_collection_form_complete = True
-
     def _start_dc(self, metadata_json, from_form: bool = False):
         if self.dummy_dc:
             return
diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py
index 1b603a46..5811b3ee 100644
--- a/src/murfey/instrument_server/api.py
+++ b/src/murfey/instrument_server/api.py
@@ -174,7 +174,6 @@ def setup_multigrid_watcher(
             session_id,
             murfey_url=_get_murfey_url(),
             do_transfer=True,
-            processing_enabled=not watcher_spec.skip_existing_processing,
             _machine_config=machine_config,
             token=tokens.get(session_id, "token"),
             data_collection_parameters=data_collection_parameters.get(label, {}),

From 3b35bcd5545b11e88eaa906dc04d4ecbf8581d34 Mon Sep 17 00:00:00 2001
From: yxd92326
Date: Tue, 27 Jan 2026 12:20:02 +0000
Subject: [PATCH 2/3] Remove skip_existing_processing and the first-loop logic

---
 src/murfey/client/watchdir_multigrid.py | 26 ++++++---------------------
 src/murfey/instrument_server/api.py     |  1 -
 src/murfey/server/api/instrument.py     |  1 -
 src/murfey/util/instrument_models.py    |  1 -
 src/murfey/util/models.py               |  1 -
 5 files changed, 4 insertions(+), 26 deletions(-)

diff --git a/src/murfey/client/watchdir_multigrid.py b/src/murfey/client/watchdir_multigrid.py
index f1cc814f..dbaab049 100644
--- a/src/murfey/client/watchdir_multigrid.py
+++ b/src/murfey/client/watchdir_multigrid.py
@@ -17,7 +17,6 @@ def __init__(
         self,
         path: str | os.PathLike,
         machine_config: dict,
-        skip_existing_processing: bool = False,
     ):
         super().__init__()
         self._basepath = Path(path)
@@ -30,7 +29,6 @@
         )
         # Toggleable settings
         self._analyse = True
-        self._skip_existing_processing = skip_existing_processing
         self._stopping = False
 
     def start(self):
@@ -61,21 +59,14 @@ def _handle_metadata(self, directory: Path, extra_directory: str):
             )
             self._seen_dirs.append(directory)
 
-    def _handle_fractions(self, directory: Path, first_loop: bool):
+    def _handle_fractions(self, directory: Path):
         processing_started = False
         for d02 in directory.glob("Images-Disc*"):
             if d02 not in self._seen_dirs:
-                # If 'skip_existing_processing' is set, do not process for
-                # any data directories found on the first loop.
-                # This allows you to avoid triggering processing again if Murfey is restarted
                 self.notify(
                     d02,
                     remove_files=True,
-                    analyse=(
-                        not (first_loop and self._skip_existing_processing)
-                        if self._analyse
-                        else False
-                    ),
+                    analyse=self._analyse,
                     tag="fractions",
                 )
                 self._seen_dirs.append(d02)
@@ -88,17 +79,12 @@
         ):
             self.notify(
                 directory,
-                analyse=(
-                    not (first_loop and self._skip_existing_processing)
-                    if self._analyse
-                    else False
-                ),
+                analyse=self._analyse,
                 tag="fractions",
             )
             self._seen_dirs.append(directory)
 
     def _process(self):
-        first_loop = True
         while not self._stopping:
             for d in self._basepath.glob("*"):
                 if d.name in self._machine_config["create_directories"]:
@@ -133,7 +119,6 @@
                         self._handle_fractions(
                             sample.parent.parent.parent
                             / f"{sample.parent.name}_{sample.name}",
-                            first_loop,
                         )
 
                 else:
                     if d.name not in self._seen_dirs:
                         self._handle_metadata(
                             d, extra_directory=f"metadata_{d.name}"
                         )
-                    self._handle_fractions(d.parent.parent / d.name, first_loop)
-
-            if first_loop:
-                first_loop = False
+                    self._handle_fractions(d.parent.parent / d.name)
 
             time.sleep(15)
         self.notify(final=True)
diff --git a/src/murfey/instrument_server/api.py b/src/murfey/instrument_server/api.py
index 5811b3ee..cd968584 100644
--- a/src/murfey/instrument_server/api.py
+++ b/src/murfey/instrument_server/api.py
@@ -189,7 +189,6 @@ def setup_multigrid_watcher(
         watchers[session_id] = MultigridDirWatcher(
             watcher_spec.source,
             machine_config,
-            skip_existing_processing=watcher_spec.skip_existing_processing,
         )
         watchers[session_id].subscribe(
             partial(
diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py
index d71e412a..35f4cceb 100644
--- a/src/murfey/server/api/instrument.py
+++ b/src/murfey/server/api/instrument.py
@@ -154,7 +154,6 @@ async def setup_multigrid_watcher(
                 "visit": visit,
                 "label": visit,
                 "instrument_name": instrument_name,
-                "skip_existing_processing": watcher_spec.skip_existing_processing,
                 "destination_overrides": {
                     str(k): v for k, v in watcher_spec.destination_overrides.items()
                 },
diff --git a/src/murfey/util/instrument_models.py b/src/murfey/util/instrument_models.py
index 4b2b1ff9..ff1c16e1 100644
--- a/src/murfey/util/instrument_models.py
+++ b/src/murfey/util/instrument_models.py
@@ -10,7 +10,6 @@ class MultigridWatcherSpec(BaseModel):
     label: str
     visit: str
     instrument_name: str
-    skip_existing_processing: bool = False
     destination_overrides: Dict[Path, str] = {}
     rsync_restarts: List[str] = []
     visit_end_time: Optional[datetime] = None
diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py
index 004cb5f6..4d079dbd 100644
--- a/src/murfey/util/models.py
+++ b/src/murfey/util/models.py
@@ -203,7 +203,6 @@ class BatchPositionParameters(BaseModel):
 
 
 class MultigridWatcherSetup(BaseModel):
     source: Path
-    skip_existing_processing: bool = False
     destination_overrides: Dict[Path, str] = {}
     rsync_restarts: List[str] = []

From 809e5a30fd45453aa99f78bd99af50a3e5b976cb Mon Sep 17 00:00:00 2001
From: yxd92326
Date: Mon, 2 Feb 2026 16:55:48 +0000
Subject: [PATCH 3/3] Remove redundant form logic

---
 src/murfey/client/analyser.py          |  12 +--
 src/murfey/client/multigrid_control.py | 142 ++++++++++++-------------
 2 files changed, 68 insertions(+), 86 deletions(-)

diff --git a/src/murfey/client/analyser.py b/src/murfey/client/analyser.py
index 4142184f..ba4925cf 100644
--- a/src/murfey/client/analyser.py
+++ b/src/murfey/client/analyser.py
@@ -308,11 +308,7 @@ def _analyse(self):
                             dc_metadata["acquisition_software"] = (
                                 self._context._acquisition_software
                             )
-                        self.notify(
-                            {
-                                "form": dc_metadata,
-                            }
-                        )
+                        self.notify(dc_metadata)
 
                 # If a file with a CLEM context is identified, immediately post it
                 elif isinstance(self._context, CLEMContext):
@@ -366,11 +362,7 @@ def _analyse(self):
                             dc_metadata["acquisition_software"] = (
                                 self._context._acquisition_software
                             )
-                        self.notify(
-                            {
-                                "form": dc_metadata,
-                            }
-                        )
+                        self.notify(dc_metadata)
                 elif isinstance(
                     self._context,
                     (
diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py
index 903affd4..bfe4372a 100644
--- a/src/murfey/client/multigrid_control.py
+++ b/src/murfey/client/multigrid_control.py
@@ -450,7 +450,7 @@ def rsync_result(update: RSyncerUpdate):
                 force_mdoc_metadata=self.force_mdoc_metadata,
                 limited=limited,
             )
-            self.analysers[source].subscribe(partial(self._start_dc, from_form=True))
+            self.analysers[source].subscribe(self._start_dc)
             self.analysers[source].start()
             if transfer:
                 self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
@@ -492,17 +492,13 @@ def _rsync_update_converter(p: Path) -> None:
             )
             self._environment.watchers[source].start()
 
-    def _start_dc(self, metadata_json, from_form: bool = False):
+    def _start_dc(self, metadata_json):
         if self.dummy_dc:
             return
-        # for multigrid the analyser sends the message straight to _start_dc by-passing user input
-        # it is then necessary to extract the data from the message
-        if from_form:
-            metadata_json = metadata_json.get("form", {})
-            # Safely convert all entries into strings, but leave None as-is
-            metadata_json = {
-                k: str(v) if v is not None else None for k, v in metadata_json.items()
-            }
+        # Safely convert all entries into strings, but leave None as-is
+        metadata_json = {
+            k: str(v) if v is not None else None for k, v in metadata_json.items()
+        }
         self._environment.dose_per_frame = metadata_json.get("dose_per_frame")
         self._environment.gain_ref = metadata_json.get("gain_ref")
         self._environment.symmetry = metadata_json.get("symmetry")
@@ -576,82 +572,76 @@ def _start_dc(self, metadata_json, from_form: bool = False):
             environment=self._environment,
             token=self.token,
         )
-        if from_form:
-            data = {
-                "voltage": metadata_json["voltage"],
-                "pixel_size_on_image": metadata_json["pixel_size_on_image"],
-                "experiment_type": metadata_json["experiment_type"],
-                "image_size_x": metadata_json["image_size_x"],
-                "image_size_y": metadata_json["image_size_y"],
-                "file_extension": metadata_json["file_extension"],
-                "acquisition_software": metadata_json["acquisition_software"],
-                "image_directory": str(
-                    self._environment.default_destinations[source]
-                ),
-                "tag": str(source),
-                "source": str(source),
-                "magnification": metadata_json["magnification"],
-                "total_exposed_dose": metadata_json.get("total_exposed_dose"),
-                "c2aperture": metadata_json.get("c2aperture"),
-                "exposure_time": metadata_json.get("exposure_time"),
-                "slit_width": metadata_json.get("slit_width"),
-                "phase_plate": metadata_json.get("phase_plate", False),
-            }
-            capture_post(
-                base_url=str(self._environment.url.geturl()),
-                router_name="workflow.router",
-                function_name="start_dc",
-                token=self.token,
-                visit_name=self._environment.visit,
-                session_id=self.session_id,
-                data=data,
-            )
-            for recipe in (
-                "em-spa-preprocess",
-                "em-spa-extract",
-                "em-spa-class2d",
-                "em-spa-class3d",
-                "em-spa-refine",
-            ):
-                capture_post(
-                    base_url=str(self._environment.url.geturl()),
-                    router_name="workflow.router",
-                    function_name="register_proc",
-                    token=self.token,
-                    visit_name=self._environment.visit,
-                    session_id=self.session_id,
-                    data={
-                        "tag": str(source),
-                        "source": str(source),
-                        "recipe": recipe,
-                    },
-                )
-            log.info(f"Posting SPA processing parameters: {metadata_json}")
-            response = capture_post(
-                base_url=str(self._environment.url.geturl()),
-                router_name="workflow.spa_router",
-                function_name="register_spa_proc_params",
-                token=self.token,
-                session_id=self.session_id,
-                data={
-                    **{
-                        k: None if v == "None" else v
-                        for k, v in metadata_json.items()
-                    },
-                    "tag": str(source),
-                },
-            )
+        data = {
+            "voltage": metadata_json["voltage"],
+            "pixel_size_on_image": metadata_json["pixel_size_on_image"],
+            "experiment_type": metadata_json["experiment_type"],
+            "image_size_x": metadata_json["image_size_x"],
+            "image_size_y": metadata_json["image_size_y"],
+            "file_extension": metadata_json["file_extension"],
+            "acquisition_software": metadata_json["acquisition_software"],
+            "image_directory": str(self._environment.default_destinations[source]),
+            "tag": str(source),
+            "source": str(source),
+            "magnification": metadata_json["magnification"],
+            "total_exposed_dose": metadata_json.get("total_exposed_dose"),
+            "c2aperture": metadata_json.get("c2aperture"),
+            "exposure_time": metadata_json.get("exposure_time"),
+            "slit_width": metadata_json.get("slit_width"),
+            "phase_plate": metadata_json.get("phase_plate", False),
+        }
+        capture_post(
+            base_url=str(self._environment.url.geturl()),
+            router_name="workflow.router",
+            function_name="start_dc",
+            token=self.token,
+            visit_name=self._environment.visit,
+            session_id=self.session_id,
+            data=data,
+        )
+        for recipe in (
+            "em-spa-preprocess",
+            "em-spa-extract",
+            "em-spa-class2d",
+            "em-spa-class3d",
+            "em-spa-refine",
+        ):
+            capture_post(
+                base_url=str(self._environment.url.geturl()),
+                router_name="workflow.router",
+                function_name="register_proc",
+                token=self.token,
+                visit_name=self._environment.visit,
+                session_id=self.session_id,
+                data={
+                    "tag": str(source),
+                    "source": str(source),
+                    "recipe": recipe,
+                },
+            )
+        log.info(f"Posting SPA processing parameters: {metadata_json}")
+        response = capture_post(
+            base_url=str(self._environment.url.geturl()),
+            router_name="workflow.spa_router",
+            function_name="register_spa_proc_params",
+            token=self.token,
+            session_id=self.session_id,
+            data={
+                **{k: None if v == "None" else v for k, v in metadata_json.items()},
+                "tag": str(source),
+            },
+        )
         if response and not str(response.status_code).startswith("2"):
             log.warning(f"{response.reason}")
         capture_post(
             base_url=str(self._environment.url.geturl()),
             router_name="workflow.spa_router",
             function_name="flush_spa_processing",
             token=self.token,
             visit_name=self._environment.visit,
             session_id=self.session_id,
             data={"tag": str(source)},
         )
 
     def _increment_file_count(
         self, observed_files: List[Path], source: str, destination: str