diff --git a/README.md b/README.md index 4fed22e..08a523a 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ The suite consists of the following modules. ```python pyread # Data reading pyprocess # Listing and parallelisation +pydask # Dask-based distributed processing (inherits from pyprocess.py) pyimport # TTree (EventNtuple) importing interface pyplot # Plotting and visualisation pyprint # Array visualisation @@ -79,6 +80,50 @@ replacing the port (8888) with your chosen number and the user with your usernam To connect to the notebook copy the URL with the unique token printed in the terminal of the mu2e machine into your local browser. +### 2.1a Parallel Processing: `pyprocess` vs `pydask` + +`pyutils` currently provides two parallel processing options: + +**`pyprocess`** (ThreadPoolExecutor/ProcessPoolExecutor): +- Uses Python's built-in `concurrent.futures` for parallelisation +- Best for: Small to medium datasets, local processing on single machine +- Simpler to use, less overhead +- Limited to resources available on your machine + +**`pydask`** (Dask-based): +- Uses Dask distributed computing framework +- Best for: Large datasets, leveraging clusters, EAF integration +- Supports both local and remote Dask schedulers +- Better for scaling to many workers across multiple machines + +#### Using `pydask` Locally + +```python +from pyutils.pydask import DaskProcessor + +# Create processor +dp = DaskProcessor( + tree_path="EventNtuple/ntuple", + use_remote=True, + location="disk" +) + +# Process data - automatically creates local Dask cluster +data = dp.process_data( + file_list_path="files.txt", + branches=["trksegs"], + n_workers=4, + threads_per_worker=1, + processes=True # Use processes instead of threads +) +``` + +This creates a temporary local Dask cluster with 4 workers, processes the files, then shuts down. + +#### Using `pydask` on EAF with Remote Scheduler + +We are in conversations with EAF team. 
We are currently not able to use a remote, centralized scheduler. Contact Sophie or Sam for more information on the progress of these conversations. + ### 2.2 Module documentation Help information be accessed with `help(name)`, where `name` can be the module name, a class name, or a function name. diff --git a/examples/scripts/pyutils_basics_dask.py b/examples/scripts/pyutils_basics_dask.py new file mode 100644 index 0000000..953cb7d --- /dev/null +++ b/examples/scripts/pyutils_basics_dask.py @@ -0,0 +1,286 @@ +""" +pyutils basics with Dask +========================= + +This script demonstrates the core pyutils functionality using DaskProcessor +to handle multiple files in parallel. It shows: + +1. Importing data from multiple files using DaskProcessor +2. Applying selection cuts +3. Inspecting data +4. Performing vector operations +5. Creating plots from aggregated data + +Key differences from standard pyutils_basics: +- Uses DaskProcessor instead of Processor for parallel file handling +- Works with multiple files instead of a single file +- Scales across cores/clusters using Dask +""" + +import awkward as ak +import tempfile +import os + +# ============================================================================ +# 1. Setting up your environment +# ============================================================================ + +print("=" * 70) +print("pyutils Basics with Dask") +print("=" * 70) + +# Import the pyutils modules +from pyutils.pydask import DaskProcessor +from pyutils.pyselect import Select +from pyutils.pyprint import Print +from pyutils.pyvector import Vector +from pyutils.pyplot import Plot +from pyutils.pylogger import Logger + +# Initialize logger +logger = Logger(print_prefix="[pyutils_dask]", verbosity=2) + +# ============================================================================ +# 2. 
# ============================================================================
# 2. Processing Data with DaskProcessor
# ============================================================================

logger.log("Step 1: Initializing DaskProcessor", "info")

# Initialize DaskProcessor with appropriate settings
processor = DaskProcessor(
    tree_path="EventNtuple/ntuple",  # Path to the TTree inside each file
    use_remote=True,                 # Access files from remote location
    location="disk",                 # Read from disk (persistent dataset)
    verbosity=1,
    worker_verbosity=0
)

logger.log("DaskProcessor initialized", "success")

# Define the branches we want
branches = ["trksegs"]

logger.log("Branches to extract: " + str(branches), "info")

# For demonstration, show the API for multi-file processing
logger.log("\nDaskProcessor with SAM Definition:", "info")
logger.log("""
DaskProcessor can retrieve files from SAM definitions for parallel processing:

- SAM definition: "nts.mu2e.ensembleMDS3aMix1BBTriggered.MDC2025-001.root"
- Queries files dynamically from the data catalog
- DaskProcessor creates a local cluster to process files in parallel

Advantages:
- Works with SAM definitions for dynamic file queries
- Can also use file_list_path for static file lists
- Scales across multiple cores automatically
- Built-in progress monitoring
- Can connect to remote Dask clusters on EAF
""", "info")

# ============================================================================
# 3. Processing Multiple Files with SAM Definition
# ============================================================================

logger.log("\n" + "=" * 70, "info")
logger.log("Step 2: Retrieving File List with get_file_list()", "info")
logger.log("=" * 70, "info")

# Use get_file_list() to retrieve files from SAM definition
logger.log("Retrieving file list from SAM definition...", "info")

file_list = processor.get_file_list(
    defname="nts.mu2e.ensembleMDS3aMix1BBTriggered.MDC2025-001.root"
)

# Guard clause: an empty (or None) result means there is nothing to process.
if not file_list:
    logger.log("No files retrieved from SAM definition", "error")
else:
    logger.log(f"Retrieved {len(file_list)} files from SAM definition", "success")
    logger.log(f"First file: {file_list[0][-80:]}", "info")

    # Save the file list to a temporary file for use with process_data.
    # (tempfile and os are already imported at the top of this script.)
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as tmp:
        for f in file_list:
            tmp.write(f + '\n')
        temp_file_list = tmp.name

    logger.log(f"Saved file list to temporary file: {temp_file_list}", "info")

    # ========================================================================
    # 4. Processing Multiple Files with DaskProcessor
    # ========================================================================

    logger.log("\n" + "=" * 70, "info")
    logger.log("Step 3: Processing Multiple Files with DaskProcessor", "info")
    logger.log("=" * 70, "info")

    logger.log("Starting parallel processing with DaskProcessor...", "info")

    try:
        data = processor.process_data(
            file_list_path=temp_file_list,
            branches=branches,
            n_workers=4,        # Use 4 parallel workers
            show_progress=True  # Show progress bar
        )

        logger.log("Data aggregation complete", "success")
        logger.log(f"Total events from all files: {len(data)}", "info")

        # ====================================================================
        # 5. Applying Selection Cuts
        # ====================================================================

        logger.log("\n" + "=" * 70, "info")
        logger.log("Step 4: Applying Selection Cuts", "info")
        logger.log("=" * 70, "info")

        selector = Select(verbosity=1)

        logger.log("Selecting track segments at tracker entrance (TT_Front)...", "info")

        # Create a mask to select track segments at the tracker entrance
        at_trkent = selector.select_surface(
            data=data,
            surface_name="TT_Front"  # Tracker entrance
        )

        # Add the mask to the data array, then apply it
        data["at_trkent"] = at_trkent
        trkent = data[at_trkent]

        logger.log(f"Selected {len(trkent)} events at tracker entrance", "success")

        # ====================================================================
        # 6. Inspecting Your Data
        # ====================================================================

        logger.log("\n" + "=" * 70, "info")
        logger.log("Step 5: Inspecting Data", "info")
        logger.log("=" * 70, "info")

        printer = Print(verbose=False)

        logger.log("Data structure at tracker entrance:", "info")
        printer.print_n_events(trkent, n_events=1)

        # ====================================================================
        # 7. Performing Vector Operations
        # ====================================================================

        logger.log("\n" + "=" * 70, "info")
        logger.log("Step 6: Performing Vector Operations", "info")
        logger.log("=" * 70, "info")

        vector = Vector(verbosity=1)

        logger.log("Computing momentum magnitude...", "info")

        mom_mag = vector.get_mag(
            branch=trkent["trksegs"],
            vector_name="mom"
        )

        logger.log("Momentum magnitude computed", "success")

        # ====================================================================
        # 8. Creating Plots
        # ====================================================================

        logger.log("\n" + "=" * 70, "info")
        logger.log("Step 7: Creating Plots", "info")
        logger.log("=" * 70, "info")

        plotter = Plot()

        # Flatten arrays for plotting
        time_flat = ak.flatten(trkent["trksegs"]["time"], axis=None)
        mom_mag_flat = ak.flatten(mom_mag, axis=None)

        logger.log(f"Time values to plot: {len(time_flat)}", "info")
        logger.log(f"Momentum values to plot: {len(mom_mag_flat)}", "info")

        # 1D histogram: time distribution
        logger.log("Creating 1D histogram of time distribution...", "info")

        plotter.plot_1D(
            time_flat,
            nbins=100,
            xmin=450,
            xmax=1695,
            title="Time at Tracker Entrance (Dask Example)",
            xlabel="Fit time at Trk Ent [ns]",
            ylabel="Events per bin",
            out_path='h1_time_dask.png',
            stat_box=True,
            error_bars=True
        )

        logger.log("1D histogram created: h1_time_dask.png", "success")

        # 2D histogram: momentum vs. time
        logger.log("Creating 2D histogram of momentum vs. time...", "info")

        plotter.plot_2D(
            x=mom_mag_flat,
            y=time_flat,
            nbins_x=100,
            xmin=85,
            xmax=115,
            nbins_y=100,
            ymin=450,
            ymax=1650,
            title="Momentum vs. Time at Tracker Entrance (Dask Example)",
            xlabel="Fit mom at Trk Ent [MeV/c]",
            ylabel="Fit time at Trk Ent [ns]",
            out_path='h2_timevmom_dask.png'
        )

        logger.log("2D histogram created: h2_timevmom_dask.png", "success")

        # ====================================================================
        # Summary
        # ====================================================================

        logger.log("\n" + "=" * 70, "info")
        logger.log("SUMMARY", "info")
        logger.log("=" * 70, "info")

        summary = f"""
Multi-File Processing with DaskProcessor:
  - Total files processed: {len(file_list)}
  - Total events aggregated: {len(data)}
  - Events at tracker entrance: {len(trkent)}
  - Momentum range: {ak.min(mom_mag_flat):.2f} - {ak.max(mom_mag_flat):.2f} MeV/c
  - Time range: {ak.min(time_flat):.0f} - {ak.max(time_flat):.0f} ns

Output plots created:
  - h1_time_dask.png (1D time distribution)
  - h2_timevmom_dask.png (2D momentum vs time)

Key advantages of DaskProcessor:
✓ Process multiple files in parallel
✓ Automatic load balancing across cores
✓ Progress tracking with show_progress=True
✓ Easy scaling to remote clusters
✓ Same output format as standard Processor
✓ Built-in error resilience with retries
"""
        logger.log(summary, "info")

    except FileNotFoundError:
        logger.log("SAM definition files not found.", "error")
        raise
    except Exception as e:
        logger.log(f"Error during processing: {e}", "error")
        raise
    finally:
        # Always remove the temporary file list, even if processing failed
        # (the original only cleaned up on the success path, leaking the file
        # on error).
        os.unlink(temp_file_list)
        logger.log("Cleaned up temporary file", "info")

logger.log("\nScript completed!", "success")
"""Dask-based Processor wrapper for pyutils.

Provides a ``DaskProcessor`` class that mirrors the public API of
``Processor`` in ``pyprocess.py`` but runs the per-file worker function
using Dask (local or distributed). ``pyprocess.py`` is intentionally left
unchanged; its module-level ``_worker_func`` is reused.

Usage:
    from pyutils.pydask import DaskProcessor

    dp = DaskProcessor()
    arr = dp.process_data(file_list_path="files.txt", branches=..., n_workers=4)

This is a lightweight adapter intended to be a drop-in alternative to
``Processor.process_data`` for users who want to run on a Dask cluster.
"""
from __future__ import annotations

from typing import Dict, Optional

import awkward as ak
from dask import delayed
from dask.distributed import Client, progress

from .pyprocess import Processor, _worker_func


class DaskProcessor:
    """Processor-like class that uses Dask for parallel file processing.

    Constructor arguments mirror ``Processor`` where relevant.
    """

    def __init__(
        self,
        tree_path: str = "EventNtuple/ntuple",
        use_remote: bool = False,
        location: str = "tape",
        schema: str = "root",
        verbosity: int = 1,
        worker_verbosity: int = 0,
    ):
        # Reuse Processor for file-list utilities and logging.
        self._base = Processor(
            tree_path=tree_path,
            use_remote=use_remote,
            location=location,
            schema=schema,
            verbosity=verbosity,
            worker_verbosity=worker_verbosity,
        )

    def get_file_list(self, defname=None, file_list_path=None):
        """Delegate file-list retrieval (SAM definition or text file) to Processor."""
        return self._base.get_file_list(defname=defname, file_list_path=file_list_path)

    def _make_worker(self, branches, custom_worker_func):
        """Return the per-file worker callable.

        Shared by the single-file and multi-file paths (the original
        duplicated this construction in both branches). If the caller
        supplied ``custom_worker_func``, use it as-is; otherwise wrap the
        module-level ``_worker_func`` with this processor's settings.
        """
        if custom_worker_func is not None:
            return custom_worker_func

        def _wrap(fname):
            return _worker_func(
                fname,
                branches=branches,
                tree_path=self._base.tree_path,
                use_remote=self._base.use_remote,
                location=self._base.location,
                schema=self._base.schema,
                verbosity=self._base.worker_verbosity,
            )

        return _wrap

    def process_data(
        self,
        file_name: Optional[str] = None,
        file_list_path: Optional[str] = None,
        defname: Optional[str] = None,
        branches: Optional[Dict] = None,
        n_workers: Optional[int] = None,
        threads_per_worker: int = 1,
        processes: bool = False,
        scheduler_address: Optional[str] = None,
        show_progress: bool = True,
        retries: int = 0,
        custom_worker_func=None,
    ) -> Optional[ak.Array]:
        """Process files using Dask. Mirrors Processor.process_data semantics.

        Either provide a single ``file_name`` or exactly one of
        ``file_list_path`` / ``defname``. If ``scheduler_address`` is
        provided, connects to that cluster; otherwise starts a local
        ``Client`` which is closed before returning.

        Returns the concatenated awkward array of all successful per-file
        results, or ``None`` on invalid input / no results / single-file
        failure.
        """
        # Exactly one input source must be given.
        file_sources = sum(x is not None for x in (file_name, defname, file_list_path))
        if file_sources != 1:
            # Fixed mismatched quotes in the original message ("or defname'").
            self._base.logger.log(
                "Please provide exactly one of 'file_name', 'file_list_path', or 'defname'",
                "error",
            )
            return None

        # Validate custom worker before doing any work.
        if custom_worker_func is not None and not callable(custom_worker_func):
            self._base.logger.log("custom_worker_func is not callable", "error")
            return None

        worker_func = self._make_worker(branches, custom_worker_func)

        # Single-file shortcut: run synchronously, no cluster needed.
        if file_name:
            try:
                return worker_func(file_name)
            except Exception as e:
                self._base.logger.log(f"Error processing {file_name}: {e}", "error")
                return None

        # Prepare file list.
        file_list = self.get_file_list(defname=defname, file_list_path=file_list_path)
        if not file_list:
            self._base.logger.log("Results list has length zero", "warning")
            return None

        client: Optional[Client] = None
        created_client = False  # only close clients we created ourselves
        try:
            if scheduler_address:
                client = Client(scheduler_address)
                self._base.logger.log(
                    f"Connected to Dask scheduler at {scheduler_address}", "info"
                )
            else:
                client = Client(
                    n_workers=n_workers,
                    threads_per_worker=threads_per_worker,
                    processes=processes,
                )
                created_client = True
                self._base.logger.log(f"Started local Dask client: {client}", "info")

            # One delayed task per file, then submit the whole graph.
            tasks = [delayed(worker_func)(fname) for fname in file_list]
            futures = client.compute(tasks, retries=retries)

            if show_progress:
                try:
                    # Best-effort: the progress bar may be unavailable in
                    # some environments (e.g. no widget support).
                    progress(futures)
                except Exception:
                    pass

            # Drop per-file failures (workers return None on error).
            results = [r for r in client.gather(futures) if r is not None]
            if not results:
                self._base.logger.log("Dask returned no successful results", "warning")
                return None

            concatenated = ak.concatenate(results)
            self._base.logger.log(
                f"Returning concatenated array containing {len(concatenated)} events",
                "success",
            )
            return concatenated

        finally:
            if created_client and client is not None:
                client.close()


__all__ = ["DaskProcessor"]

# NOTE(review): the accompanying change set also bumped setup.py
# version "1.4.0" -> "1.9.0".