"""Utilities for mapping transcript-relative coordinates to genomic coordinates
and aggregating per-site statistics across a transcriptome.
This module provides:
* `TranscriptMapper`: A vectorized mapper that converts 0-based, half-open
transcript coordinates into genomic coordinates for both '+' and '-' strands,
returning `np.nan` for out-of-bounds inputs.
* `worker`: A multiprocessing worker that applies the mapper per transcript and
aggregates metrics.
* `parse_refflat`: A helper to parse RefFlat/RefGene/GenePred annotations into a
normalized DataFrame.
* `load_split_data`: A helper that splits grouped input data into balanced shards
for parallel processing.
* `pileup_genomic`: A high-level function that produces a per-(chrom, strand, pos)
pileup with derived scores.
All exon intervals are assumed to be 0-based, half-open `[start, end)`, sorted by
genomic start in ascending order.
"""
import gc
import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm
[docs]
class TranscriptMapper:
"""Vectorized mapper from transcript coordinates to genomic coordinates.
This class maps 0-based, half-open transcript offsets into genomic 0-based
positions using exon interval metadata. It supports both '+' and '-' strands
and returns `np.nan` for any input coordinate that falls outside the valid
transcript range `[0, total_len)` or is non-finite.
Attributes:
starts (numpy.ndarray): 1D array of exon start genomic coordinates (0-based, inclusive),
sorted ascending.
ends (numpy.ndarray): 1D array of exon end genomic coordinates (0-based, exclusive),
same shape as `starts`.
lengths (numpy.ndarray): Exon lengths, computed as `ends - starts`.
total_len (int): Total transcript length, i.e., `lengths.sum()`.
strand (str): Strand symbol, either `'+'` or `'-'`.
cumsum (numpy.ndarray): Precomputed cumulative exon lengths. For `'+'`,
`cumsum[i]` is the total length before exon `i`; for `'-'`, it is
computed over reversed exons to enable reverse mapping.
Raises:
ValueError: If `strand` is not `'+'` or `'-'`.
"""
def __init__(self, exon_starts: np.ndarray, exon_ends: np.ndarray, strand: str):
"""Initialize the mapper with exon intervals and strand.
Args:
exon_starts (numpy.ndarray): 1D array of exon starts (0-based, inclusive),
sorted ascending by genomic coordinate.
exon_ends (numpy.ndarray): 1D array of exon ends (0-based, exclusive),
same shape as `exon_starts`. Each `end` must be strictly greater
than its corresponding `start`.
strand (str): `'+'` or `'-'`.
Raises:
ValueError: If `strand` is not `'+'` or `'-'`.
"""
self.starts = exon_starts
self.ends = exon_ends
self.lengths = self.ends - self.starts
self.total_len = int(self.lengths.sum())
self.strand = strand
# Precompute cumulative sums
if self.strand == "+":
self.cumsum = np.concatenate(([0], np.cumsum(self.lengths)))
elif self.strand == "-":
self.cumsum = np.concatenate(([0], np.cumsum(self.lengths[::-1])))
else:
raise ValueError("strand must be '+' or '-'.")
[docs]
def map(self, coords: np.ndarray) -> np.ndarray:
"""Map transcript offsets to genomic positions (vectorized).
Input coordinates are treated as 0-based offsets into the concatenated
transcript exonic sequence with a valid range `[0, total_len)`. Values
outside this range or non-finite values (NaN/Inf) yield `np.nan` in the
output. The result is always `float64` to accommodate NaNs.
For `'+'` strand: genomic position = `starts[idx] + offset`.
For `'-'` strand: genomic position = `ends[idx] - 1 - offset`.
Args:
coords (numpy.ndarray): Array-like of transcript offsets to map. May be any
shape; the returned array will match this shape.
Returns:
numpy.ndarray: Array of genomic positions (dtype `float64`) with `np.nan`
for out-of-bounds or non-finite inputs. Positions are 0-based.
Notes:
* Assumes exon intervals are half-open `[start, end)` and sorted by
genomic start ascending. It should be validated in parse_refflat().
* No exceptions are raised for invalid `coords`; they are marked as NaN.
"""
out = np.full(coords.shape, np.nan, dtype=np.float64)
# Valid coords in [0, total_len)
# also guard against inf/NaN in input by requiring finite
valid = np.isfinite(coords) & (coords >= 0) & (coords < self.total_len)
if not np.any(valid):
return out
coords = coords[valid].astype(np.int64, copy=False)
if self.strand == "+":
idx = np.searchsorted(self.cumsum, coords, side="right") - 1
offsets = coords - self.cumsum[idx]
out[valid] = (self.starts[idx] + offsets).astype(np.float64, copy=False)
else:
r_idx = np.searchsorted(self.cumsum, coords, side="right") - 1
offsets = coords - self.cumsum[r_idx]
idx = self.starts.size - r_idx - 1
out[valid] = (self.ends[idx] - offsets - 1).astype(np.float64, copy=False)
return out
[docs]
def worker(df_list, refflat_df, collect_list):
"""Multiprocessing worker that maps transcript to genomic positions and aggregates metrics.
Iterates over per-transcript DataFrames, maps the `ref_pos` transcript offsets to
genomic positions using `TranscriptMapper`, drops rows with NaN genomic positions,
and aggregates metrics per (chrom, strand, pos). Results are appended to a shared
`multiprocessing.Manager().list()` for collection by the parent process.
Args:
df_list (list): List of DataFrames, each corresponding to a single
transcript. Each DataFrame is expected to contain:
- `transcript_id` (str): All rows share the same ID.
- `ref_pos` (int): Transcript-relative offsets (0-based).
- Metric columns used for aggregation:
`kl_div_neg`, `kl_div_pos`, `count_all`, `count_pos`, `logsum_1_p_pos`.
refflat_df (pandas.DataFrame): Annotation DataFrame indexed by `transcript_id`.
Must provide columns: `exonStarts` (numpy.ndarray), `exonEnds` (numpy.ndarray),
`strand` ('+'|'-'), and `chrom` (str).
collect_list (list): Shared list where the worker
appends its aggregated result DataFrame.
Returns:
None: Results are appended to `collect_list` as a `pandas.DataFrame` with columns:
`chrom`, `strand`, `pos`, `kl_div_neg`, `kl_div_pos`, `count_all`,
`count_pos`, `logsum_1_p_pos`.
Notes:
* Transcripts missing in `refflat_df` or with invalid mapping are skipped.
* Any unexpected exception within a transcript block is caught and skipped,
allowing the worker to continue processing subsequent transcripts.
* A tqdm progress bar is displayed with `leave=False`.
"""
local_collect = []
for transcript_df in tqdm(df_list, desc="Converting to genomic coordinates", leave=False):
if len(transcript_df) == 0:
continue
transcript_id = transcript_df["transcript_id"].iloc[0]
try:
refflat_row = refflat_df.loc[transcript_id]
except KeyError:
continue
exon_starts = np.asarray(refflat_row["exonStarts"], dtype=np.int64)
exon_ends = np.asarray(refflat_row["exonEnds"], dtype=np.int64)
strand = refflat_row["strand"]
chrom = refflat_row["chrom"]
mapper = TranscriptMapper(exon_starts=exon_starts, exon_ends=exon_ends, strand=strand)
mapped_df = transcript_df.copy()
mapped_df["chrom"] = chrom
mapped_df["strand"] = strand
mapped_df["pos"] = mapper.map(mapped_df["ref_pos"].to_numpy())
mapped_df = mapped_df.dropna(subset=["pos"])
if len(mapped_df) == 0:
continue
mapped_df["pos"] = mapped_df["pos"].astype(np.int64)
local_collect.append(mapped_df)
if local_collect:
df = pd.concat(local_collect, ignore_index=True)
df = df.groupby(["chrom", "strand", "pos"], as_index=False).agg(
{
"kl_div_neg": "sum",
"kl_div_pos": "sum",
"count_all": "sum",
"count_pos": "sum",
"logsum_1_p_pos": "sum",
}
)
collect_list.append(df)
return None
[docs]
def load_split_data(data_df, cpu):
"""Group by transcript and split into balanced shards for parallel processing.
The input is grouped by `transcript_id` (renamed from `ref_names`), sorted by
descending group size, then distributed across up to `cpu` shards using a
zig-zag assignment (`0..cpu-1..0`) to balance large and small groups.
Args:
data_df (pandas.DataFrame): Input DataFrame containing at least:
- `ref_names` (str): Transcript identifier per row; will be renamed to
`transcript_id`.
- Other columns required downstream (e.g., `ref_pos`, metrics).
cpu (int): Maximum number of shards (typically the number of worker processes).
Returns:
list: A list of length `min(cpu, n_groups)` where each element
is a list of per-transcript DataFrames to be handled by one worker.
Notes:
* Groups are sorted by size to improve load balancing.
* The zig-zag distribution helps avoid piling all large groups onto early shards.
"""
if cpu <= 0:
raise ValueError("cpu must be positive")
data_df = data_df.rename({"ref_names": "transcript_id"}, axis=1)
grouped = sorted(data_df.groupby("transcript_id"), key=lambda x: len(x[1]), reverse=True)
if len(grouped) == 0:
return []
n_shards = min(cpu, len(grouped))
df_list_split = [[] for _ in range(n_shards)]
cycle = max(1, n_shards)
for idx, (_, df) in enumerate(grouped):
split_idx = idx % (2 * cycle)
if split_idx >= cycle:
split_idx = 2 * cycle - split_idx - 1
df_list_split[split_idx].append(df)
return df_list_split
def _parse_exon_array(value):
if isinstance(value, np.ndarray):
return value.astype(np.int64, copy=False)
text = str(value).strip()
if text.endswith(","):
text = text[:-1]
if text == "":
return np.array([], dtype=np.int64)
return np.fromstring(text, sep=",", dtype=np.int64)
[docs]
def parse_refflat(refflat_path):
"""
Parse RefFlat/RefGene/GenePred annotation into a normalized DataFrame.
This function reads a tab-delimited annotation file and normalizes it into a
common schema with the following columns:
`transcript_id`, `chrom`, `strand`, `txStart`, `txEnd`,
`cdsStart`, `cdsEnd`, `exonCount`, `exonStarts`, `exonEnds`.
It accepts three formats based on column count:
* 11 columns (RefFlat): drops the first column.
* 15 columns (RefGene): keeps the first 10 columns.
* 10 columns (GenePred): uses as-is.
Exon start/end lists are expected as comma-separated strings with a trailing comma
(UCSC style) and are converted to `numpy.ndarray` of `int`. Invalid transcripts are
filtered out based on interval consistency and ordering. The resulting DataFrame is
indexed by `transcript_id`.
Args:
refflat_path (str): Path to the annotation file.
Returns:
pandas.DataFrame: Normalized and validated annotation indexed by `transcript_id`.
Columns:
- `chrom` (str)
- `strand` (str; '+' or '-')
- `txStart`, `txEnd`, `cdsStart`, `cdsEnd` (int)
- `exonCount` (int)
- `exonStarts`, `exonEnds` (numpy.ndarray of int; 0-based, half-open)
Raises:
ValueError: If the file has an unexpected number of columns or if no valid
transcripts remain after validation.
Notes:
* Validation ensures: `txEnd > txStart`, `cdsEnd >= cdsStart`, `exonCount > 0`,
`len(exonStarts) == len(exonEnds) == exonCount`, strictly positive exon lengths,
and non-decreasing starts/ends across exons.
"""
col_list = [
"transcript_id",
"chrom",
"strand",
"txStart",
"txEnd",
"cdsStart",
"cdsEnd",
"exonCount",
"exonStarts",
"exonEnds",
]
refflat_df = pd.read_csv(refflat_path, sep="\t", header=None)
n_cols = refflat_df.shape[1]
if n_cols == 11:
refflat_df = refflat_df.iloc[:, 1:]
elif n_cols == 15:
refflat_df = refflat_df.iloc[:, :10]
elif n_cols != 10:
raise ValueError(
"Invalid annotation file format. Expected 10 (GenePred), 11 (RefFlat), or 15 (RefGene) columns."
)
refflat_df.columns = col_list
refflat_df[["txStart", "txEnd", "cdsStart", "cdsEnd", "exonCount"]] = refflat_df[
["txStart", "txEnd", "cdsStart", "cdsEnd", "exonCount"]
].astype(np.int64)
refflat_df = refflat_df[refflat_df["txEnd"] > refflat_df["txStart"]]
refflat_df = refflat_df[refflat_df["cdsEnd"] >= refflat_df["cdsStart"]]
refflat_df = refflat_df[refflat_df["exonCount"] > 0]
refflat_df["exonStarts"] = refflat_df["exonStarts"].apply(_parse_exon_array)
refflat_df["exonEnds"] = refflat_df["exonEnds"].apply(_parse_exon_array)
refflat_df = refflat_df[refflat_df["exonStarts"].apply(len) == refflat_df["exonCount"]]
refflat_df = refflat_df[refflat_df["exonEnds"].apply(len) == refflat_df["exonCount"]]
refflat_df = refflat_df[refflat_df.apply(lambda x: np.all(x["exonEnds"] > x["exonStarts"]), axis=1)]
refflat_df = refflat_df[refflat_df.apply(lambda x: np.all(x["exonStarts"][1:] >= x["exonStarts"][:-1]), axis=1)]
refflat_df = refflat_df[refflat_df.apply(lambda x: np.all(x["exonEnds"][1:] >= x["exonEnds"][:-1]), axis=1)]
refflat_df.drop_duplicates(subset="transcript_id", keep="first", inplace=True, ignore_index=True)
if len(refflat_df) == 0:
raise ValueError("No valid transcripts found in the annotation file.")
refflat_df.set_index("transcript_id", inplace=True)
return refflat_df
[docs]
def pileup_genomic(args, input_df):
def pileup_genomic(args, input_df):
"""
Aggregate per-genomic-position metrics using multiprocessing.
Spawns up to `args.thread` worker processes to convert transcript-relative
positions to genomic coordinates and aggregate metrics across all input rows.
Requires an annotation file path at `args.annot`.
The final output is a DataFrame aggregated by `(chrom, strand, pos)` with
derived columns:
* `stoichiometry` = `kl_div_pos` / (`kl_div_neg` + `kl_div_pos`)
* `modscore` = a modification prediction score.
Args:
args (argparse.Namespace): Must contain:
- `annot` (str or path-like): Path to RefFlat/RefGene/GenePred file.
- `thread` (int): Number of worker processes to spawn.
input_df (pandas.DataFrame): Input rows containing at least:
- `ref_names` (str): Transcript ID; will be renamed to `transcript_id`.
- `ref_pos` (int): Transcript-relative position (0-based).
- `kl_div_neg`, `kl_div_pos`, `count_all`, `count_pos`, `logsum_1_p_pos`:
Metric columns to be summed.
Returns:
pandas.DataFrame: Aggregated DataFrame with columns:
`chrom`, `strand`, `pos`, `modscore`, `stoichiometry`,
`count_all`, `count_pos`.
Raises:
ValueError: If no valid genomic positions are produced (e.g., due to
mismatched or invalid annotations).
Notes:
* Uses a `multiprocessing.Manager().list()` to collect per-process results.
* `load_split_data` is used to balance workload across processes.
* The output `pos` is 0-based genomic coordinate (float in intermediate steps,
but will be integral where valid).
"""
if len(input_df) == 0:
raise ValueError("input_df is empty. No transcript pileup entries were provided.")
n_proc = getattr(args, "thread", None)
if n_proc is None:
n_proc = max(1, int(0.95 * mp.cpu_count()))
if n_proc <= 0:
raise ValueError("args.thread must be a positive integer.")
refflat_df = parse_refflat(args.annot)
df_list_split = load_split_data(input_df, n_proc)
if len(df_list_split) == 0:
raise ValueError("No transcript groups were available for genomic aggregation.")
man = mp.Manager()
collect_list = man.list()
proc_list = []
for df_list in df_list_split:
proc = mp.Process(target=worker, args=(df_list, refflat_df, collect_list))
proc.start()
proc_list.append(proc)
for proc in proc_list:
proc.join()
if proc.exitcode != 0:
raise RuntimeError(f"Genomic pileup worker exited with code {proc.exitcode}.")
collect_list = list(collect_list)
man.shutdown()
if len(collect_list) == 0:
raise ValueError("No valid genomic position found. Please verify the annotation file.")
df = pd.concat(collect_list, ignore_index=True)
gc.collect()
df = df.groupby(["chrom", "strand", "pos"], as_index=False).agg(
{
"kl_div_neg": "sum",
"kl_div_pos": "sum",
"count_all": "sum",
"logsum_1_p_pos": "sum",
"count_pos": "sum",
}
)
epsilon = getattr(args, "epsilon", 1e-30)
digitization = 1000
df["stoichiometry"] = df["kl_div_pos"] / (df["kl_div_neg"] + df["kl_div_pos"] + epsilon)
df["modscore"] = 1 - np.power(
10, df["logsum_1_p_pos"] / df["count_all"] * (1 + np.power(10, 2 * (df["stoichiometry"] - 1)))
)
df["modscore"] = np.digitize(df["modscore"], np.linspace(0, 1, digitization + 1), right=True) / digitization
df["stoichiometry"] = df["stoichiometry"] * (
(np.log10(1 - args.threshold) * df["stoichiometry"]) > (df["logsum_1_p_pos"] / df["count_all"])
)
df = df[["chrom", "strand", "pos", "modscore", "stoichiometry", "count_all", "count_pos"]]
df["pos"] = df["pos"].astype(np.int64)
return df