inference package
Submodules
inference.inference
DeepRM Inference Module
This program handles the inference process for DeepRM models, including loading the model, processing input data, and saving the output predictions.
- deeprm.inference.inference.add_arguments(parser)
Adds command-line arguments.
- Parameters:
parser (argparse.ArgumentParser) – Argument parser to which arguments will be added.
- Returns:
None
- deeprm.inference.inference.main(args)
Main function to run the inference pipeline.
- Parameters:
args (argparse.Namespace) – Parsed command-line arguments.
- Returns:
None
Notes
Parse command-line arguments.
Create necessary directories.
Run inference.
- deeprm.inference.inference.run_inference(args)
Runs the inference process.
- Parameters:
args (argparse.Namespace) – Parsed command-line arguments.
- Returns:
None
- deeprm.inference.inference.inference_worker(rank, args_dict)
Worker function for running inference on a single GPU.
- deeprm.inference.inference.to_device(data, device)
Transfers data to the specified GPU device using a non-blocking stream.
- Parameters:
data (dict) – Dictionary containing the data to be transferred.
device (torch.device) – The target GPU device.
- Returns:
A tuple containing the transferred data tensors (src_kmer, src_signal, src_seg_len, src_dwell_bq).
- Return type:
tuple
- deeprm.inference.inference.inference_loop(args_dict, rank, device, model, data_loader, start_index=0)
Runs the inference loop for the given model and data loader.
- Parameters:
args_dict (dict) – Dictionary of command-line arguments.
rank (int) – Rank of the current process.
gpu_id (int) – ID of the GPU to use.
model (torch.nn.Module) – The model to run inference on.
data_loader (torch.utils.data.DataLoader) – DataLoader for the dataset.
- Returns:
None
inference.inference_dataloader
Dataloader for Nanopore Dataset from NPZ Files
This module provides an iterator and dataset class for loading Nanopore data from NPZ files. It supports parallel reading of files and batching of data for efficient processing in PyTorch.
- class deeprm.inference.inference_dataloader.NanoporeDatasetIterator(file_paths, cb_len=21, kmer_len=5, sampling=6, sig_window=5, max_workers=4)
Bases: object
Iterator for loading Nanopore dataset from NPZ files.
- Parameters:
- class deeprm.inference.inference_dataloader.NanoporeDataset(*args, **kwargs)
Bases: IterableDataset
Iterable dataset for loading Nanopore data from NPZ files.
- Parameters:
data_path (str) – Path to the directory containing NPZ files.
rank (int) – Rank of the current process.
num_replicas (int) – Number of replicas.
seed (int) – Random seed.
num_files_read_once (int) – Number of files to read at once.
cb_len (int) – Context block length.
kmer_len (int) – K-mer length.
sampling (int) – Sampling rate.
sig_window (int) – Signal window size.
resume_from (int) – Number of files to skip from the start.
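As a rough illustration of how rank, num_replicas, seed, and resume_from could interact, the sketch below splits a list of NPZ paths across replicas. The helper name shard_files and the shuffle, skip, then stride order are assumptions made for this example; they are not taken from the NanoporeDataset implementation.

```python
import random


def shard_files(file_paths, rank, num_replicas, seed=0, resume_from=0):
    """Hypothetical helper: choose the NPZ files one replica would read."""
    # Sort first so every replica starts from the same ordering.
    ordered = sorted(file_paths)
    # A shared seed gives every replica the same shuffled order.
    random.Random(seed).shuffle(ordered)
    # Skip files that were already processed (assumed meaning of resume_from),
    # then take every num_replicas-th file starting at this replica's rank.
    remaining = ordered[resume_from:]
    return remaining[rank::num_replicas]


files = [f"chunk_{i:03d}.npz" for i in range(10)]
shards = [shard_files(files, r, num_replicas=3, seed=42) for r in range(3)]
```

Under these assumptions every file lands in exactly one shard, so replicas never read the same chunk twice.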
- class deeprm.inference.inference_dataloader.NanoporeDataLoader(*args, **kwargs)
Bases: DataLoader
DataLoader for loading Nanopore data.
- Parameters:
dataset (NanoporeDataset) – The dataset to load data from.
num_workers (int) – Number of worker processes.
pin_memory (bool) – Whether to pin memory.
drop_last (bool) – Whether to drop the last incomplete batch.
collate_fn (typing.Callable) – Function to collate data into batches.
prefetch_factor (int) – Number of batches to prefetch.
- deeprm.inference.inference_dataloader.load_dataset(data_path, rank, num_replicas, num_files_read_once=1, prefetch_factor=2, worker=16, cb_len=21, kmer_len=5, sampling=6, sig_window=5, resume_from=0)
Loads the Nanopore dataset using DataLoader.
- Parameters:
data_path (str) – Path to the directory containing NPZ files.
batch_size (int) – Batch size for loading data.
rank (int) – Rank of the current process.
num_replicas (int) – Number of replicas.
pad_to (int) – Padding length for sequences.
bq_clip (int) – Base quality clipping value.
num_files_read_once (int) – Number of files to read at once.
prefetch_factor (int) – Number of batches to prefetch.
worker (int) – Number of worker processes.
cb_len (int) – Context block length.
kmer_len (int) – K-mer length.
sampling (int) – Sampling rate.
sig_window (int) – Signal window size.
resume_from (int) – Number of files to skip from the start.
- Returns:
DataLoader for loading the dataset.
- Return type:
inference.inference_preprocess
Wrapper that uses a C++ preprocessing binary when available.
- deeprm.inference.inference_preprocess.add_arguments(parser)
Add CLI arguments, mirroring the Python preprocessing module.
- Parameters:
parser (ArgumentParser)
- Return type:
None
inference.inference_preprocess_python
DeepRM Inference Preprocessing Module
This script segments and normalizes raw signal data from POD5 files and corresponding BAM alignments. It extracts dwell times, context blocks, and signal windows, then tokenizes and saves them as compressed .npz chunks for downstream modeling.
Key steps:
1. Parse POD5 and BAM to collect signal and alignment metadata.
2. Compute dwell times and normalize signal windows.
3. Segment signals based on the move tags, then format them into fixed-length blocks.
4. Save processed tokens in chunks for model input.
- deeprm.inference.inference_preprocess_python.add_arguments(parser)
Adds command-line arguments.
- Parameters:
parser (argparse.ArgumentParser) – Argument parser to which arguments will be added.
- Returns:
None
- deeprm.inference.inference_preprocess_python.wait_for_processes(proc_list, stage_name)
Join a list of processes and raise if any subprocess failed.
- Parameters:
proc_list (list[multiprocessing.Process]) – child processes to join.
stage_name (str) – human-readable pipeline stage name.
- Returns:
None
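The join-then-check behaviour described for wait_for_processes can be sketched as below. This is a minimal illustration, not the module's code: the name wait_for_processes_sketch, the choice of RuntimeError, and the error message format are assumptions.

```python
def wait_for_processes_sketch(proc_list, stage_name):
    """Join every process, then raise if any exited with a non-zero code."""
    # Wait for all children first so none is left running when we raise.
    for proc in proc_list:
        proc.join()
    # multiprocessing.Process exposes the exit status as .exitcode after join().
    failed = [proc for proc in proc_list if proc.exitcode != 0]
    if failed:
        codes = ", ".join(str(proc.exitcode) for proc in failed)
        raise RuntimeError(
            f"{stage_name}: {len(failed)} subprocess(es) failed (exit codes: {codes})"
        )
```

Joining every child before inspecting exit codes means a failure in one stage is reported only after its sibling processes have finished, which keeps shutdown orderly.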
- deeprm.inference.inference_preprocess_python.main(args)
Run the full preprocessing pipeline with multiprocessing.
Steps:
1. Parse arguments and prepare output.
2. Spawn processes to parse BAM data.
3. Consolidate BAM DataFrame.
4. Spawn processes to segment and normalize POD5 signals.
5. Finalize and exit.
- Parameters:
args (argparse.Namespace) – Parsed command-line arguments.
- Returns:
None
- deeprm.inference.inference_preprocess_python.get_norm_factor()
Return default normalization factors for signal and dwell scaling.
- Returns:
Dictionary with keys ‘quantile_a’, ‘quantile_b’, ‘shift_mult’, ‘scale_mult’.
- Return type:
dict
- deeprm.inference.inference_preprocess_python.mean_phred(phred)
Calculate the mean Phred quality score from an array of scores.
- Parameters:
phred (numpy.ndarray) – Array of Phred scores.
- Returns:
Mean Phred quality score.
- Return type:
float
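Because Phred scores are logarithmic (Q = -10 log10 p), one common way to average them is to convert to error probabilities, take the mean, and convert back. The sketch below shows that approach; whether mean_phred does this or takes a plain arithmetic mean is not stated here, so treat the function mean_phred_sketch as an assumption.

```python
import math


def mean_phred_sketch(phred):
    """Average Phred scores through their error probabilities (assumed method)."""
    scores = list(phred)
    if not scores:
        raise ValueError("phred must contain at least one score")
    # Q = -10 * log10(p)  =>  p = 10 ** (-Q / 10)
    probs = [10 ** (-q / 10) for q in scores]
    mean_prob = sum(probs) / len(probs)
    # Convert the mean error probability back to the Phred scale.
    return -10 * math.log10(mean_prob)
```

With this definition a few low-quality bases pull the mean down more strongly than an arithmetic mean would: scores of 10 and 30 average to roughly 13 rather than 20.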
- deeprm.inference.inference_preprocess_python.segmented_signal_to_block(signal_segmented, segment_len_arr, kmer, sampling, sig_window, pad_to)
Convert segmented signal into a fixed-length block for given k-mer context.
- Parameters:
signal_segmented (list) – list of signal segments around each base (list of numpy.ndarray).
segment_len_arr (numpy.ndarray) – lengths of each segment in sampling units.
kmer (int) – length of k-mer context.
sampling (int) – samples per signal unit.
sig_window (int) – size of local signal window for padding calculation.
pad_to (int) – desired total length of output block in signal units.
- Returns:
concatenated, trimmed, and padded signal block, or None on failure.
- Return type:
numpy.ndarray or None
- deeprm.inference.inference_preprocess_python.create_segment_len_arr(segment_arr, sampling)
Compute segment-length array in sampling units for each sub-segment.
- Parameters:
- Returns:
integer length array per segment after downsampling.
- Return type:
numpy.ndarray
- deeprm.inference.inference_preprocess_python.move_to_dwell(move, quantile_a, quantile_b, shift_mult, scale_mult, sampling=6)
Transform raw move array into scaled dwell-time tokens.
- Parameters:
- Returns:
standardized dwell-time values, or None for invalid input.
- Return type:
numpy.ndarray or None
Notes
Convert boolean moves into positions and compute deltas.
Log-transform dwell durations.
Scale and shift based on quantiles and multipliers.
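The three steps in the notes above can be sketched with NumPy as follows. The first two steps follow the description directly; the quantile-based shift and scale formula in step 3 is an assumption chosen for illustration, as is the function name move_to_dwell_sketch, and the real implementation may differ.

```python
import numpy as np


def move_to_dwell_sketch(move, quantile_a, quantile_b, shift_mult, scale_mult, sampling=6):
    """Illustrative dwell-time transform; the step 3 formula is an assumption."""
    move = np.asarray(move).astype(bool)
    positions = np.flatnonzero(move)
    if positions.size == 0:
        return None  # no base transitions, so there is nothing to measure
    # Step 1: distance between consecutive moves, converted to raw samples.
    edges = np.append(positions, move.size)
    dwell = np.diff(edges) * sampling
    # Step 2: log-transform the dwell durations.
    log_dwell = np.log(dwell.astype(np.float64))
    # Step 3 (assumed formula): shift and scale derived from two quantiles.
    q_a, q_b = np.quantile(log_dwell, [quantile_a, quantile_b])
    shift = shift_mult * (q_a + q_b)
    scale = scale_mult * (q_b - q_a)
    if scale <= 0:
        return None  # degenerate spread, cannot standardize
    return (log_dwell - shift) / scale


# The quantile and multiplier values here are arbitrary example inputs.
tokens = move_to_dwell_sketch([1, 0, 0, 1, 0, 1, 1, 0], 0.2, 0.9, 0.5, 0.5, sampling=6)
```

In this example the move array marks bases at indices 0, 3, 5, and 6, giving dwell times of 18, 12, 6, and 12 samples before the log transform.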
- deeprm.inference.inference_preprocess_python.normalise_trim_segment_signal(signal, move, sp, ts, ns, quantile_a, quantile_b, shift_mult, scale_mult, sampling=6)
Normalize and segment raw signal based on trimming and dwell indices.
- Parameters:
signal (numpy.ndarray) – raw signal trace.
move (numpy.ndarray) – dwell-time tokens.
sp (int) – samples to skip at start.
ts (int) – trim start index.
ns (int) – trim end index (0 means till end).
quantile_a (float) – lower quantile threshold.
quantile_b (float) – upper quantile threshold.
shift_mult (float) – shift multiplier.
scale_mult (float) – scale multiplier.
sampling (int) – samples per signal unit.
- Returns:
list of per-base signal segments or None on error.
- Return type:
list or None
Notes
Trim start (sp) and segment window (ts:ns).
Flip signal for reverse processing.
Shift and scale signal by quantile multipliers.
Split by dwell move indices to segment per-base.
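The trimming and splitting steps from the notes above might look like the sketch below. It deliberately omits the flip and the quantile normalization, and it assumes that ts and ns index into the signal after the first sp samples are dropped and that the first move entry is set; the name trim_and_split_sketch is hypothetical.

```python
import numpy as np


def trim_and_split_sketch(signal, move, sp, ts, ns, sampling=6):
    """Trim a raw trace and cut it into per-base segments (illustrative only)."""
    signal = np.asarray(signal, dtype=np.float64)
    move = np.asarray(move).astype(bool)
    # Drop the first sp samples, then keep the ts:ns window (ns == 0 means "to the end").
    signal = signal[sp:]
    signal = signal[ts:(ns if ns != 0 else None)]
    # Each move entry covers `sampling` raw samples.
    needed = move.size * sampling
    positions = np.flatnonzero(move)
    if positions.size == 0 or positions[0] != 0 or signal.size < needed:
        return None  # inconsistent input, mirror the "None on error" contract
    # A new base starts at every set move entry after the first one.
    boundaries = positions[1:] * sampling
    return np.split(signal[:needed], boundaries)


segments = trim_and_split_sketch(np.arange(20), [1, 0, 1, 1], sp=2, ts=0, ns=0, sampling=3)
```

Here the move array [1, 0, 1, 1] with sampling=3 yields three segments of 6, 3, and 3 samples.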
- deeprm.inference.inference_preprocess_python.parse_pod5(pod5_path, read_ids)
Read signals and calibration from a POD5 file and return as DataFrame.
- Parameters:
- Returns:
DataFrame with columns [‘signal’, ‘offset’, ‘scale’], indexed by ‘read_id’.
- Return type:
pandas.DataFrame
- deeprm.inference.inference_preprocess_python.parse_bam(pid, n_procs, n_thread, bam_data, bam_path, bq_cutoff, boi, expected_sampling)
Extract move tags and alignment information from a BAM file in parallel.
- Parameters:
pid (int) – process ID for sharding.
n_procs (int) – total number of processes.
n_thread (int) – number of threads for BAM reading.
bam_data (list) – multiprocessing.Manager list to collect DataFrames.
bam_path (str) – path to the BAM file.
bq_cutoff (int) – minimum average base quality threshold.
boi (str) – base-of-interest for alignment extraction.
expected_sampling (int) – expected move-tag stride in samples.
- Returns:
None (appends DataFrame to bam_data).
- deeprm.inference.inference_preprocess_python.segment_normalize_signal(bam_df, pod5_paths, norm_factor, pid, token_output_path, cb_len=21, kmer_len=5, chunk_size=10000, max_token_len=200, sampling=6, dwell_shift=10, sig_window=5, process_once=1000, label_div=1000000000)
Segment and normalize signals per read, and save token chunks.
- Parameters:
bam_df (pandas.DataFrame) – alignment metadata indexed by parent_id.
pod5_paths (list) – list of POD5 file paths.
norm_factor (dict) – normalization parameters.
pid (int) – process ID for naming outputs.
token_output_path (str) – directory to write .npz chunks.
cb_len (int) – context block length.
kmer_len (int) – k-mer length for segmentation.
chunk_size (int) – number of tokens per output file.
max_token_len (int) – maximum length of token in signal units.
sampling (int) – samples per signal unit.
dwell_shift (int) – shift for dwell token alignment.
sig_window (int) – local signal window size.
process_once (int) – reads to process per batch.
label_div (int) – label division factor for unique ID generation.
- Returns:
None (writes .npz files).
- deeprm.inference.inference_preprocess_python.save_npz(save_path, df)
Serialize token DataFrame to a compressed .npz file.
- Parameters:
save_path (str) – Output .npz file path.
df (pandas.DataFrame) – DataFrame with token columns.
- Returns:
None
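A minimal sketch of writing token columns to a compressed .npz archive with numpy.savez_compressed is shown below. To stay free of a pandas dependency it takes a plain mapping of column name to values; the column names kmer and dwell and the function name save_npz_sketch are placeholders, not the module's actual schema.

```python
import os
import tempfile

import numpy as np


def save_npz_sketch(save_path, columns):
    """Write a mapping of column name -> array-like to a compressed .npz file."""
    # Each column becomes one named array inside the archive.
    arrays = {name: np.asarray(values) for name, values in columns.items()}
    np.savez_compressed(save_path, **arrays)


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "chunk.npz")
    save_npz_sketch(path, {"kmer": [[1, 2], [3, 4]], "dwell": [0.5, 1.5]})
    with np.load(path) as loaded:
        stored_names = sorted(loaded.files)
        stored_kmer = loaded["kmer"].tolist()
```

For a pandas DataFrame, the same call could be fed with a dictionary built from its columns, for example `{c: df[c].to_numpy() for c in df.columns}`.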
inference.pileup_deeprm
DeepRM Pileup (Post-Processing) Module
This script performs post-processing on DeepRM prediction files to generate a pileup. It reads .npz prediction arrays, groups statistics by label IDs, and computes metrics.
The two metrics calculated are:
1. modscore: A score reflecting the site-level modification probability (0-1 range).
2. stoichiometry: Estimated modification stoichiometry of the site (0-1 range).
Finally, it writes a .npz file containing the results.
- deeprm.inference.pileup_deeprm.add_arguments(parser)
Adds command-line arguments.
- Parameters:
parser (argparse.ArgumentParser) – Argument parser to which arguments will be added.
- Returns:
None
- deeprm.inference.pileup_deeprm.shm_put_array(name, arr)
Create a SharedMemory block named name and copy arr into it.
- deeprm.inference.pileup_deeprm.shm_view(meta)
Open a SharedMemory block and create a numpy view from its buffer.
The parent process only borrows these segments; workers remain the sole owners responsible for unlinking them after the parent acknowledges it is done reading. Immediately unregister the borrowed attachment from this process’s resource_tracker so interpreter shutdown does not try to unlink a segment that the creating worker already removed.
- deeprm.inference.pileup_deeprm.main(args)
Main function: spawns worker processes, aggregates results, computes final metrics, and writes output .npz file.
- Parameters:
args (argparse.Namespace) – Parsed command-line arguments.
- Returns:
Results are saved to a .npz file in the specified output directory.
- Return type:
None
- deeprm.inference.pileup_deeprm.grouped_sum(n_unique, idx, vals)
Sum values in ‘vals’ according to group indices ‘idx’.
- Parameters:
n_unique (int) – Number of unique groups.
idx (numpy.ndarray) – Integer indices mapping each element in ‘vals’ to a group.
vals (numpy.ndarray) – Values to sum per group.
- Returns:
Array of summed values of length n_unique.
- Return type:
numpy.ndarray
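A grouped sum with this signature can be expressed with numpy.bincount, as in the sketch below. The example also shows how per-label total and positive counts could be derived from it; the label and probability values are made up, and the use of `>=` against the 0.98 threshold is an assumption.

```python
import numpy as np


def grouped_sum_sketch(n_unique, idx, vals):
    """Sum vals per group index; groups with no members get 0."""
    return np.bincount(idx, weights=vals, minlength=n_unique)


# Made-up label IDs and per-read probabilities for illustration.
labels = np.array([7, 3, 7, 7, 3, 9])
probs = np.array([0.99, 0.10, 0.50, 0.995, 0.985, 0.20])

# Map arbitrary label IDs onto dense group indices 0..n_unique-1.
unique_labels, idx = np.unique(labels, return_inverse=True)
n_unique = len(unique_labels)

count_all = grouped_sum_sketch(n_unique, idx, np.ones_like(probs))
count_pos = grouped_sum_sketch(n_unique, idx, (probs >= 0.98).astype(float))
```

Here unique_labels is [3, 7, 9], count_all is [2, 3, 1], and count_pos is [1, 2, 0].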
- deeprm.inference.pileup_deeprm.worker(pid, file_paths, meta_q, ack_q, shard_q, bam_path, output_dir, label_div, slice_idx=None, threshold_pos=0.98, epsilon=1e-30, flip=False, make_modbam=True)
Worker function to process a subset of prediction files. Computes per-label statistics and writes a modBAM shard locally when requested.
- Parameters:
pid (int) – Process ID for indexing results.
file_paths (list) – List of .npz input file paths.
keys (list) – List of data keys to compute/store.
shared_dict (dict) – Shared structure for results.
slice (int, optional) – Column for 2D predictions. Defaults to None.
threshold_pos (float) – Threshold to count positive predictions.
epsilon (float) – Small constant for log and division safety.
flip (bool) – Whether to invert probabilities (1 - p).
- Returns:
Results are stored in shared_dict.
- Return type:
None
- deeprm.inference.pileup_deeprm.bed_formatter(ref_names, ref_pos, ref_strand, modscore, stoichiometry, count_all, count_pos, output_path)
Formats the results into a BED-like structure.
- Parameters:
ref_names (numpy.ndarray) – Array of reference names.
ref_pos (numpy.ndarray) – Array of reference positions.
ref_strand (numpy.ndarray) – Array of reference strands.
modscore (numpy.ndarray) – modscore values.
stoichiometry (numpy.ndarray) – stoichiometry values.
count_all (numpy.ndarray) – Total counts.
count_pos (numpy.ndarray) – Positive counts.
- Returns:
List of formatted strings for each entry.
- Return type:
list
inference.pileup_genomic
Utilities for mapping transcript-relative coordinates to genomic coordinates and aggregating per-site statistics across a transcriptome.
This module provides:
TranscriptMapper: A vectorized mapper that converts 0-based, half-open transcript coordinates into genomic coordinates for both ‘+’ and ‘-’ strands, returning np.nan for out-of-bounds inputs.
worker: A multiprocessing worker that applies the mapper per transcript and aggregates metrics.
parse_refflat: A helper to parse RefFlat/RefGene/GenePred annotations into a normalized DataFrame.
load_split_data: A helper that splits grouped input data into balanced shards for parallel processing.
pileup_genomic: A high-level function that produces a per-(chrom, strand, pos) pileup with derived scores.
All exon intervals are assumed to be 0-based, half-open [start, end), sorted by genomic start in ascending order.
- class deeprm.inference.pileup_genomic.TranscriptMapper(exon_starts, exon_ends, strand)
Bases: object
Vectorized mapper from transcript coordinates to genomic coordinates.
This class maps 0-based, half-open transcript offsets into genomic 0-based positions using exon interval metadata. It supports both ‘+’ and ‘-’ strands and returns np.nan for any input coordinate that falls outside the valid transcript range [0, total_len) or is non-finite.
- starts
1D array of exon start genomic coordinates (0-based, inclusive), sorted ascending.
- Type:
numpy.ndarray
- ends
1D array of exon end genomic coordinates (0-based, exclusive), same shape as starts.
- Type:
numpy.ndarray
- lengths
Exon lengths, computed as ends - starts.
- Type:
numpy.ndarray
- cumsum
Precomputed cumulative exon lengths. For ‘+’, cumsum[i] is the total length before exon i; for ‘-’, it is computed over reversed exons to enable reverse mapping.
- Type:
numpy.ndarray
- Raises:
ValueError – If strand is not ‘+’ or ‘-’.
- Parameters:
- map(coords)
Map transcript offsets to genomic positions (vectorized).
Input coordinates are treated as 0-based offsets into the concatenated transcript exonic sequence with a valid range [0, total_len). Values outside this range or non-finite values (NaN/Inf) yield np.nan in the output. The result is always float64 to accommodate NaNs.
For ‘+’ strand: genomic position = starts[idx] + offset.
For ‘-’ strand: genomic position = ends[idx] - 1 - offset.
- Parameters:
coords (numpy.ndarray) – Array-like of transcript offsets to map. May be any shape; the returned array will match this shape.
- Returns:
Array of genomic positions (dtype float64) with np.nan for out-of-bounds or non-finite inputs. Positions are 0-based.
- Return type:
numpy.ndarray
Notes
Assumes exon intervals are half-open [start, end) and sorted by genomic start ascending. It should be validated in parse_refflat().
No exceptions are raised for invalid coords; they are marked as NaN.
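The mapping rules above can be sketched in NumPy as follows. This is an illustrative re-implementation under the stated conventions (0-based, half-open exons sorted by genomic start), not the class's actual source; the name SimpleTranscriptMapper and the use of np.searchsorted are choices made for this example.

```python
import numpy as np


class SimpleTranscriptMapper:
    """Illustrative transcript-to-genome mapper (not the DeepRM class itself)."""

    def __init__(self, exon_starts, exon_ends, strand):
        if strand not in ("+", "-"):
            raise ValueError("strand must be '+' or '-'")
        self.starts = np.asarray(exon_starts, dtype=np.int64)
        self.ends = np.asarray(exon_ends, dtype=np.int64)
        self.strand = strand
        self.lengths = self.ends - self.starts
        # Exons in transcript order: as given for '+', reversed for '-'.
        ordered = self.lengths if strand == "+" else self.lengths[::-1]
        # cumsum[i] = transcript length before the i-th exon in transcript order.
        self.cumsum = np.concatenate(([0], np.cumsum(ordered)[:-1]))
        self.total_len = int(self.lengths.sum())

    def map(self, coords):
        coords = np.asarray(coords, dtype=np.float64)
        out = np.full(coords.shape, np.nan)
        valid = np.isfinite(coords) & (coords >= 0) & (coords < self.total_len)
        c = coords[valid].astype(np.int64)
        # Find the exon (in transcript order) that contains each offset.
        idx = np.searchsorted(self.cumsum, c, side="right") - 1
        offset = c - self.cumsum[idx]
        if self.strand == "+":
            out[valid] = self.starts[idx] + offset
        else:
            out[valid] = self.ends[::-1][idx] - 1 - offset
        return out


plus = SimpleTranscriptMapper([10, 30], [20, 35], "+")
minus = SimpleTranscriptMapper([10, 30], [20, 35], "-")
plus_pos = plus.map([0, 9, 10, 14, 15, -1])   # 10, 19, 30, 34, nan, nan
minus_pos = minus.map([0, 4, 5, 14])          # 34, 30, 19, 10
```

With exons [10, 20) and [30, 35) the transcript is 15 bases long, so offset 15 and offset -1 both fall outside the valid range and map to NaN.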
- deeprm.inference.pileup_genomic.worker(df_list, refflat_df, collect_list)
Multiprocessing worker that maps transcript to genomic positions and aggregates metrics.
Iterates over per-transcript DataFrames, maps the ref_pos transcript offsets to genomic positions using TranscriptMapper, drops rows with NaN genomic positions, and aggregates metrics per (chrom, strand, pos). Results are appended to a shared multiprocessing.Manager().list() for collection by the parent process.
- Parameters:
df_list (list) – List of DataFrames, each corresponding to a single transcript. Each DataFrame is expected to contain:
* transcript_id (str): All rows share the same ID.
* ref_pos (int): Transcript-relative offsets (0-based).
* Metric columns used for aggregation: kl_div_neg, kl_div_pos, count_all, count_pos, logsum_1_p_pos.
refflat_df (pandas.DataFrame) – Annotation DataFrame indexed by transcript_id. Must provide columns: exonStarts (numpy.ndarray), exonEnds (numpy.ndarray), strand (‘+’ | ‘-’), and chrom (str).
collect_list (list) – Shared list where the worker appends its aggregated result DataFrame.
- Returns:
Results are appended to collect_list as a pandas.DataFrame with columns: chrom, strand, pos, kl_div_neg, kl_div_pos, count_all, count_pos, logsum_1_p_pos.
- Return type:
None
Notes
Transcripts missing in refflat_df or with invalid mapping are skipped.
Any unexpected exception within a transcript block is caught and skipped, allowing the worker to continue processing subsequent transcripts.
A tqdm progress bar is displayed with leave=False.
- deeprm.inference.pileup_genomic.load_split_data(data_df, cpu)
Group by transcript and split into balanced shards for parallel processing.
The input is grouped by transcript_id (renamed from ref_names), sorted by descending group size, then distributed across up to cpu shards using a zig-zag assignment (0..cpu-1..0) to balance large and small groups.
- Parameters:
data_df (pandas.DataFrame) – Input DataFrame containing at least:
* ref_names (str): Transcript identifier per row; will be renamed to transcript_id.
* Other columns required downstream (e.g., ref_pos, metrics).
cpu (int) – Maximum number of shards (typically the number of worker processes).
- Returns:
A list of length min(cpu, n_groups) where each element is a list of per-transcript DataFrames to be handled by one worker.
- Return type:
list
Notes
Groups are sorted by size to improve load balancing.
The zig-zag distribution helps avoid piling all large groups onto early shards.
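The zig-zag assignment can be illustrated without pandas by treating each group as a plain list. The sketch below assumes the "snake" variant in which the end shards receive two consecutive groups at each turn (0, 1, 2, 2, 1, 0, ...); the source does not say which variant is used, and the name zigzag_shards is hypothetical.

```python
def zigzag_shards(groups, cpu):
    """Distribute groups over up to `cpu` shards in a zig-zag (snake) order."""
    if cpu < 1:
        raise ValueError("cpu must be at least 1")
    # Largest groups first, so they are spread out before small ones fill gaps.
    ordered = sorted(groups, key=len, reverse=True)
    n_shards = min(cpu, len(ordered))
    shards = [[] for _ in range(n_shards)]
    for i, group in enumerate(ordered):
        sweep, pos = divmod(i, n_shards)
        # Even sweeps run left to right, odd sweeps run right to left.
        shard_idx = pos if sweep % 2 == 0 else n_shards - 1 - pos
        shards[shard_idx].append(group)
    return shards


# Six groups with 1..6 rows each, split across three workers.
example_groups = [[0] * n for n in (3, 6, 1, 5, 2, 4)]
example_shards = zigzag_shards(example_groups, cpu=3)
shard_sizes = [sum(len(g) for g in shard) for shard in example_shards]
```

In this example each shard ends up with 7 rows (6+1, 5+2, 4+3), whereas a plain round-robin over the same sorted groups would give 9, 7, and 5.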
- deeprm.inference.pileup_genomic.parse_refflat(refflat_path)
Parse RefFlat/RefGene/GenePred annotation into a normalized DataFrame.
This function reads a tab-delimited annotation file and normalizes it into a common schema with the following columns: transcript_id, chrom, strand, txStart, txEnd, cdsStart, cdsEnd, exonCount, exonStarts, exonEnds.
It accepts three formats based on column count:
* 11 columns (RefFlat): drops the first column.
* 15 columns (RefGene): keeps the first 10 columns.
* 10 columns (GenePred): uses as-is.
Exon start/end lists are expected as comma-separated strings with a trailing comma (UCSC style) and are converted to numpy.ndarray of int. Invalid transcripts are filtered out based on interval consistency and ordering. The resulting DataFrame is indexed by transcript_id.
- Parameters:
refflat_path (str) – Path to the annotation file.
- Returns:
Normalized and validated annotation indexed by transcript_id. Columns:
* chrom (str)
* strand (str; ‘+’ or ‘-’)
* txStart, txEnd, cdsStart, cdsEnd (int)
* exonCount (int)
* exonStarts, exonEnds (numpy.ndarray of int; 0-based, half-open)
- Return type:
pandas.DataFrame
- Raises:
ValueError – If the file has an unexpected number of columns or if no valid transcripts remain after validation.
Notes
Validation ensures: txEnd > txStart, cdsEnd >= cdsStart, exonCount > 0, len(exonStarts) == len(exonEnds) == exonCount, strictly positive exon lengths, and non-decreasing starts/ends across exons.
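The exon-field parsing and the validation rules listed above can be sketched in plain Python as follows. The helper names parse_exon_field and is_valid_transcript are hypothetical, and the sketch works on one record at a time rather than on a DataFrame.

```python
def parse_exon_field(field):
    """Parse a UCSC-style list such as '10,30,' (trailing comma) into ints."""
    return [int(token) for token in field.split(",") if token.strip()]


def is_valid_transcript(tx_start, tx_end, cds_start, cds_end,
                        exon_count, exon_starts, exon_ends):
    """Apply the interval consistency and ordering checks to one record."""
    if not (tx_end > tx_start and cds_end >= cds_start and exon_count > 0):
        return False
    if not (len(exon_starts) == len(exon_ends) == exon_count):
        return False
    # Every exon must have strictly positive length.
    if any(end <= start for start, end in zip(exon_starts, exon_ends)):
        return False

    # Starts and ends must each be non-decreasing across exons.
    def non_decreasing(values):
        return all(a <= b for a, b in zip(values, values[1:]))

    return non_decreasing(exon_starts) and non_decreasing(exon_ends)


starts = parse_exon_field("10,30,")
ends = parse_exon_field("20,35,")
record_ok = is_valid_transcript(10, 35, 12, 33, 2, starts, ends)
```

Filtering with a predicate like this before building TranscriptMapper instances means the mapper can rely on sorted, non-empty exon intervals.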