Source code for craw.coverage

###########################################################################
#                                                                         #
# This file is part of Counter RNAseq Window (craw) package.              #
#                                                                         #
#    Authors: Bertrand Neron                                              #
#    Copyright (c) 2017-2019  Institut Pasteur (Paris).                   #
#    see COPYRIGHT file for details.                                      #
#                                                                         #
#    craw is free software: you can redistribute it and/or modify         #
#    it under the terms of the GNU General Public License as published by #
#    the Free Software Foundation, either version 3 of the License, or    #
#    (at your option) any later version.                                  #
#                                                                         #
#    craw is distributed in the hope that it will be useful,              #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of       #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.                 #
#    See the GNU General Public License for more details.                 #
#                                                                         #
#    You should have received a copy of the GNU General Public License    #
#    along with craw (see COPYING file).                                  #
#    If not, see <http://www.gnu.org/licenses/>.                          #
#                                                                         #
###########################################################################

import logging
import numpy as np
import scipy.interpolate

from pysam import AlignmentFile

from .wig import Genome

_log = logging.getLogger(__name__)


[docs]def sum_coverage_maker(input_data, qual_thr=None): """ This function return a new function :func:`get_sum_coverage` :param input_data: the input either a samfile (see pysam library) or a genome build from a wig file (see wig module) :type input_data: :class:`craw.wig.Genome` or :class:`pysam.AlignmentFile` object :param qual_thr: The quality threshold if input data come from wig this parameter is not used, :type qual_thr: int :return: :func:`get_sum_coverage`, a function which compute the sum of coverage for a gene on each strand between position [start, stop[ This function take 3 parameters: - **annot_entry**: an entry of the annotation file. - **start**: The position to start to compute the coverage(coordinates are 0-based, start position is included). - **stop**: The position to stop to compute the coverage (coordinates are 0-based, stop position is excluded). and - **return**: a tuple with 2 tuple of float or int representing the coverage on strand forward and reverse. :rtype: function """ get_raw_coverage = get_raw_coverage_function(input_data) def get_sum_coverage(annot_entry, start, stop): if start < 0: # if start is negative # when start is compute from large window and reads map at the beginning of the reference # pysam crash see issue #10 # and numpy return empty silce # So we consider that negative positions (which doe not really exists) have coverage of 0 start = 0 covs = get_raw_coverage(input_data, annot_entry, start, stop, qual_thr) return tuple([(sum(cov),) for cov in covs]) return get_sum_coverage
[docs]def resized_coverage_maker(input_data, new_size, qual_thr=None): """ :param input_data: the input either a samfile (see pysam library) or a genome build from a wig file (see wig module) :type input_data: :class:`craw.wig.Genome` or :class:`pysam.AlignmentFile` object :param new_size: the number of values in the coverage vector. :type new_size: postive int :param qual_thr: The quality threshold if input data come from wig this parameter is not used, :type qual_thr: int :return: a function :func:`get_resized_coverage`, a function which compute the coverage for a gene on each strand between position [start, stop[ This function take 3 parameters: the coverage values are generate by linear interpolation from raw values between [start, stop[ using the scipy. see https://docs.scipy.org/doc/scipy-0.19.0/reference/generated/scipy.interpolate.interp1d.html - **annot_entry**: an entry of the annotation file. - **start**: The position to start to compute the coverage(coordinates are 0-based, start position is included). - **stop**: The position to stop to compute the coverage (coordinates are 0-based, stop position is excluded). and - **return**: a tuple with 2 tuple of float or int representing the coverage on strand forward and reverse. :rtype: function """ get_raw_coverage = get_raw_coverage_function(input_data) def get_resized_coverage(annot_entry, start, stop): """ :param annot_entry: an entry of the annotation file. :type annot_entry: :class:`annotation.Entry` object. :param start: The position to start to compute the coverage(coordinates are 0-based, start position is included). :type start: int :param stop: The position to stop to compute the coverage (coordinates are 0-based, stop position is excluded). :type stop: int :return: a new serie with new_size length :rtype: list of float instance. """ if start < 0: # if start is negative # when start is compute from large window and reads map at the beginning of the reference # pysam crash see issue #10 # and numpy return empty silce # So we ommit negative value generate new values using real positions start = 0 raw_forward_cov, raw_reverse_cov = get_raw_coverage(input_data, annot_entry, start, stop, qual_thr) coverages = [] serie_len = len(raw_forward_cov) for serie in raw_forward_cov, raw_reverse_cov: f = scipy.interpolate.interp1d(range(serie_len), serie) coverages.append(f(np.linspace(0, serie_len - 1, num=new_size, endpoint=True)).tolist()) return tuple(coverages) return get_resized_coverage
[docs]def padded_coverage_maker(input_data, max_left, max_right, qual_thr=None): """ :param input_data: the input either a samfile (see pysam library) or a genome build from a wig file (see wig module) :type input_data: :class:`wig.Genome` or :class:`pysam.AlignmentFile` object :param max_left: The highest number of base before the reference position to take in account. :type max_left: int :param max_right: The highest number of base after the reference position to take in account. :type max_right: int :param qual_thr: The quality threshold if input data come from wig this parameter is not used, :type qual_thr: int :return: a function :func:`get_padded_coverage`, a function which compute the coverage for a gene on each strand between position *[start, stop[* The coverage values are centered on the annot_entry.ref position, the matrix is padded by ``None`` value.:: [.......[ coverage ref.pos ] .....] [....[covergae ref.pos ] .....] [............[ cov ref.pos ]] This function take 3 parameters: - **annot_entry**: an entry of the annotation file. - **start**: The position to start to compute the coverage(coordinates are 0-based, start position is included). - **stop**: The position to stop to compute the coverage (coordinates are 0-based, stop position is excluded). and - **return**: a tuple with 2 tuple of float or int representing the coverage on strand forward and reverse. :rtype: function """ get_raw_coverage = get_raw_coverage_function(input_data) def get_padded_coverage(annot_entry, start, stop): """ :param annot_entry: an entry of the annotation file. :type annot_entry: :class:`annotation.Entry` object. :param start: The position to start to compute the coverage(coordinates are 0-based, start position is included). :type start: int :param stop: The position to stop to compute the coverage (coordinates are 0-based, stop position is excluded). :type stop: int :return: the coverage for forward and reverse strand padded with ``None``. :rtype: tuple of 2 list containing int or float (forward coverage, reverse coverage) """ real_start = start pad_neg_start = [] if start < 0: # if start is negative # when start is compute from large window and reads map at the beginning of the reference # pysam crash see issue #10 # so we ask coverage from 0 and pad with None value for negative positions start = 0 pad_neg_start = [None] * abs(real_start) raw_forward_cov, raw_reverse_cov = get_raw_coverage(input_data, annot_entry, start, stop, qual_thr) if annot_entry.strand == '+': # -1 because the ref must not be take in account in pad # start and stop are 0 based (see docstring) pad_left = [None] * (max_left - (annot_entry.ref - 1 - real_start)) # but stop is excluded in get_bam and included in annot_entry # so it (stop -1) - ( ref -1) => stop -1 pad_right = [None] * (max_right - (stop - annot_entry.ref)) pad_left += pad_neg_start else: pad_left = [None] * (max_left - (stop - annot_entry.ref)) pad_right = [None] * (max_right - (annot_entry.ref - 1 - real_start)) pad_right += pad_neg_start forward_cov = pad_left + raw_forward_cov + pad_right reverse_cov = pad_left + raw_reverse_cov + pad_right return forward_cov, reverse_cov return get_padded_coverage
[docs]def get_raw_coverage_function(input): """ :param input: the input either a samfile (see pysam library) or a genome build from a wig file (see wig module) :type input: :class:`wig.Genome` or :class:`pysam.calignmentfile.AlignmentFile` object :return: get_wig_coverage or get_bam_coverage according the type of input :rtype: function :raise RuntimeError: when input is not instance of :class:`pysam.calignmentfile.AlignmentFile` or :class:`wig.Genome` """ if isinstance(input, AlignmentFile): return get_raw_bam_coverage elif isinstance(input, Genome): return get_raw_wig_coverage else: raise RuntimeError("get_coverage support only 'wig.Genome' or " "'pysam.calignmentfile.AlignmentFile' as Input, not {}".format(input.__class__.__name__))
[docs]def get_raw_wig_coverage(genome, annot_entry, start, stop, qual_thr=None): """ :param genome: The genome which store all coverages. :type genome: :class:`craw.wig.Genome` object :param annot_entry: an entry of the annotation file :type annot_entry: :class:`annotation.Entry` object :param start: The position to start to compute the coverage(coordinates are 0-based, start position is included). :type start: int :param stop: The position to stop to compute the coverage (coordinates are 0-based, stop position is excluded). :type stop: int :param qual_thr: this parameter is not used, It's here to have the same api as get_bam_coverage. :type qual_thr: None :return: the coverage (all bases) :rtype: tuple of 2 list containing int or float """ chromosome = genome[annot_entry.chromosome] forward_cov, reverse_cov = chromosome[start:stop] if annot_entry.strand == '-': forward_cov.reverse() reverse_cov.reverse() return forward_cov, reverse_cov
[docs]def get_raw_bam_coverage(sam_file, annot_entry, start, stop, qual_thr=15): """ Compute the coverage for a region position by position on each strand :param sam_file: the samfile openend with pysam :type sam_file: :class:`pysam.AlignmentFile` object. :param annot_entry: an entry of the annotation file :type annot_entry: :class:`annotation.Entry` object :param start: The position to start to compute the coverage(coordinates are 0-based, start position is included). :type start: positive int :param stop: The position to start to compute the coverage (coordinates are 0-based, stop position is excluded). :type stop: positive int :param qual_thr: The quality threshold :type qual_thr: int :return: the coverage (all bases) :rtype: tuple of 2 list containing int """ def on_forward(al_seg): """ :param al_seg: a pysam aligned segment (the object used by pysam to represent an aligned read) :type al_seg: :class:`pysam.AlignedSegment` :return: True if read is mapped to forward strand :rtype: boolean """ return not al_seg.is_reverse def on_reverse(al_seg): """ :param al_seg: a pysam aligned segment (the object used by pysam to represent an aligned read) :type al_seg: :class:`pysam.AlignedSegment` :return: True if read is mapped to reverse strand. :rtype: boolean """ return al_seg.is_reverse def coverage_one_strand(sam_file, chromosome, start, stop, qual, strand): """ Compute the coverage for each position between start and stop on the chromosome on the strand. :param sam_file: the sam alignment to use :type sam_file: a :class:`pysam.AlignmentFile` object :param chromosome: the name of the chromosome :type chromosome: basestring :param start: The position to start to compute the coverage(coordinates are 0-based, start position is included). :type start: int :param stop:The position to start to compute the coverage (coordinates are 0-based, stop position is excluded). :type stop: int :param qual: The quality threshold. :type qual: int :param strand: the strand on which the read match :type strand: string :return: the coverage on forward then on reverse strand. The coverage is the sum of all kind bases mapped for each position :rtype: tuple of 2 list containing int """ call_back = on_forward if strand == '+' else on_reverse try: coverage = sam_file.count_coverage(chromosome, start=start, stop=stop, quality_threshold=qual, read_callback=call_back) except SystemError as err: import sys _log.critical("ERROR when call count_coverage with following arguments\n" "reference= {chromosome} \n" "start={start}\n" "end={stop}\n" "quality_threshold={qual}\n" "read_callback={call_back}".format(chromosome=chromosome, start=start, stop=stop, qual=qual, call_back=call_back) ) raise err coverage = [array.tolist() for array in coverage] window_cov = [] for cov_A, cov_T, cov_C, cov_G in zip(*coverage): window_cov.append(cov_A + cov_T + cov_C + cov_G) return window_cov forward_cov = coverage_one_strand(sam_file, annot_entry.chromosome, start, stop, qual_thr, '+' ) reverse_cov = coverage_one_strand(sam_file, annot_entry.chromosome, start, stop, qual_thr, '-' ) if annot_entry.strand == '-': forward_cov.reverse() reverse_cov.reverse() return forward_cov, reverse_cov