Module guidemaker.core

Core classes and functions for GuideMaker.

"""Core classes and functions for GuideMaker."""
import os
import re
import yaml
import logging
import gzip
import hashlib
import statistics
import nmslib
import regex
import gc
from typing import List, Dict, TypeVar, Generator
from itertools import product
from Bio import SeqIO
from Bio.SeqUtils import GC
from pybedtools import BedTool
from Bio import Seq
from copy import deepcopy
import pandas as pd
import numpy as np
import altair as alt
from guidemaker import doench_predict
from guidemaker import cfd_score_calculator

logger = logging.getLogger(__name__)
PandasDataFrame = TypeVar('pandas.core.frame.DataFrame')

pd.options.mode.chained_assignment = None

def is_gzip(filename: str):
    try:
        with open(filename, "rb") as f:
            logger.info("check if %s is gzipped" % filename)
            return f.read(2) == b'\x1f\x8b'
    except IOError as e:
        logger.error("Could not open the file %s to determine if it was gzipped" % filename)
        raise e
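
# For example (an illustrative call; assumes a local file "genome.gbk.gz" exists):
#   >>> is_gzip("genome.gbk.gz")
#   True
# The check reads only the two-byte gzip magic number b'\x1f\x8b'.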


class PamTarget:

    """
    A Class representing a Protospacer Adjacent Motif (PAM) and targets.

    The class includes all targets for a given PAM as a dataframe, PAM and target attributes,
    and methods to find target and control sequences.

    """

    def __init__(self, pam: str, pam_orientation: str, dtype: str) -> None:
        """
        PamTarget __init__

        Args:
            pam (str): A DNA string in ambiguous IUPAC format
            pam_orientation (str): [5prime | 3prime ]
                5prime means the order is 5'-[pam][target]-3'
                3prime means the order is 5'-[target][pam]-3'
            dtype (str): hamming or leven

        Returns:
            None
        """
        for letter in pam.upper():
            assert letter in ['A', 'C', 'G', 'T', 'M', 'R', 'W',
                'S', 'Y', 'K', 'V', 'H', 'D', 'B', 'X', 'N']
        assert pam_orientation in ["3prime", "5prime"]
        self.pam: str = pam.upper()
        self.pam_orientation: str = pam_orientation
        self.dtype: str = dtype

    def __str__(self) -> str:
        """
        String representation of the PamTarget object.

        Returns:
            str: A description of the PAM
        """
        return "A PAM object: {self.pam}".format(self=self)

    def find_targets(self, seq_record_iter: object, target_len: int) -> PandasDataFrame:
        """
        Find all targets on a sequence that match the PAM on both strands

        Args:
            seq_record_iter (object): A Biopython SeqRecord iterator from SeqIO.parse
            target_len (int): The length of the target sequence

        Returns:
            PandasDataFrame: A pandas dataframe of matching targets
        """

        def reverse_complement(seq: str) -> str:
            """
            Reverse complement of the PAM sequence

            Args:
                seq (str): A DNA string

            Returns:
                str: A reverse complement of DNA string
            """
            bpseq = Seq.Seq(seq)
            return str(bpseq.reverse_complement())

        def pam2re(pam: str) -> str:
            """
            Convert an IUPAC ambiguous PAM to a Regex expression

            Args:
                pam (str): A DNA string

            Returns:
                str: A Regex expression
            """
            # note: inside a character class the '|' is a literal, so classes
            # like [A|C] would also match '|'; use plain [AC] style classes
            dnaval = {'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
                      'M': '[AC]', 'R': '[AG]', 'W': '[AT]', 'S': '[CG]',
                      'Y': '[CT]', 'K': '[GT]', 'V': '[ACG]', 'H': '[ACT]',
                      'D': '[AGT]', 'B': '[CGT]', 'X': '[GATC]', 'N': '[GATC]'}
            return "".join([dnaval[base] for base in pam])

        #                5prime means the order is 5'-[pam][target]-3'
        #                3prime means the order is 5'-[target][pam]-3'

        def check_target(seq: str, target_len: int) -> bool:
            """
            Check targets for guidelength and DNA bases

            Args:
                seq (str): A DNA string
                target_len(int): Guide length

            Returns:
                bool: True or False
            """
            # ignore targets that contain bases other than A, T, C, G
            if len(seq) == target_len and all(letter in {'A', 'T', 'C', 'G'} for letter in seq):
                return True
            return False

        def run_for_5p(pam_pattern: str, dnaseq: str, target_len: int) -> Generator:
            """
            Search for guides with 5prime pam orientation in the forward strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = dnaseq[match_obj.end(): match_obj.end() + target_len]
                # 30-mer context: 3 nt upstream, the PAM, and the first 24 nt of the
                # target, i.e. 5'-[3 nt][PAM][24 nt of target]-3' (assumes a 3-nt PAM)
                target_seq30 = dnaseq[match_obj.start() - 3: match_obj.start() + 27]
                if check_target(target_seq, target_len):
                    exact_pam = match_obj.group(0)
                    start = match_obj.end()
                    stop = match_obj.end() + target_len
                    # 5prime =True, 3prime = False
                    pam_orientation = True
                    # forward =True, reverse = False
                    strand = True
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        def run_for_3p(pam_pattern, dnaseq, target_len) -> Generator:
            """
            Search for guides with 3prime pam orientation in the forward strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = dnaseq[match_obj.start() - target_len: match_obj.start()]
                # Doench-style 30-mer: 4 nt upstream, the target, the PAM, and 3 nt
                # downstream (assumes target_len=20 and a 3-nt PAM)
                target_seq30 = dnaseq[match_obj.end() - 27: match_obj.end() + 3]
                if check_target(target_seq, target_len):
                    exact_pam = match_obj.group(0)
                    start = match_obj.start() - target_len
                    stop = match_obj.start()
                    # 5prime =True, 3prime = False
                    pam_orientation = False
                    # forward =True, reverse = False
                    strand = True
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        def run_rev_5p(pam_pattern, dnaseq, target_len) -> Generator:
            """
            Search for guides with 5prime pam orientation in the reverse strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = reverse_complement(
                    dnaseq[match_obj.start() - target_len: match_obj.start()])
                target_seq30 = reverse_complement(
                    dnaseq[match_obj.end()-27:match_obj.end()+3])
                if check_target(target_seq, target_len):
                    exact_pam = reverse_complement(match_obj.group(0))
                    start = match_obj.start() - target_len
                    stop = match_obj.start()
                    # 5prime =True, 3prime = False
                    pam_orientation = True
                    # forward =True, reverse = False
                    strand = False
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        def run_rev_3p(pam_pattern, dnaseq, target_len) -> Generator:
            """
            Search for guides with 3prime pam orientation in the reverse strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = reverse_complement(
                    dnaseq[match_obj.end(): match_obj.end() + target_len])
                target_seq30 = reverse_complement(dnaseq[match_obj.start()-3:match_obj.start()+27])
                if check_target(target_seq, target_len):
                    exact_pam = reverse_complement(match_obj.group(0))
                    start = match_obj.end()
                    stop = match_obj.end() + target_len
                    # 5prime =True, 3prime = False
                    pam_orientation = False
                    # forward =True, reverse = False
                    strand = False
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        target_list = []
        for record in seq_record_iter:
            record_id = record.id
            seq = str(record.seq)
            if self.pam_orientation == "5prime":
                # forward
                for5p = pd.DataFrame(run_for_5p(pam2re(self.pam), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation", "target_seq30"])
                for5p["seqid"] = record_id
                # pandas converts any non-empty string to True, so strand and
                # pam_orientation are emitted as booleans by the generator functions above.
                # https://stackoverflow.com/questions/715417/converting-from-a-string-to-boolean-in-python/715455#715455
                for5p = for5p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(for5p)
                # reverse
                rev5p = pd.DataFrame(run_rev_5p(pam2re(reverse_complement(self.pam)), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
                rev5p["seqid"] = record_id
                rev5p = rev5p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(rev5p)
                # append per record and concat once at the end; see https://ravinpoudel.github.io/AppendVsConcat/
            elif self.pam_orientation == "3prime":
                # forward
                for3p = pd.DataFrame(run_for_3p(pam2re(self.pam), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
                for3p["seqid"] = record_id
                for3p = for3p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(for3p)
                # reverse
                rev3p = pd.DataFrame(run_rev_3p(pam2re(reverse_complement(self.pam)), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
                rev3p["seqid"] = record_id
                rev3p = rev3p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(rev3p)
            gc.collect()  # clear memory after each chromosome
        df_targets = pd.concat(target_list, ignore_index=True)
        df_targets = df_targets.assign(seedseq=np.nan, hasrestrictionsite=np.nan, isseedduplicated=np.nan)
        df_targets = df_targets.astype({"seedseq": 'str', "isseedduplicated": 'bool'})
        df_targets = df_targets.assign(dtype=self.dtype)
        df_targets = df_targets.astype({"dtype": 'category'})
        return df_targets
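
# A hedged usage sketch (hypothetical FASTA path; "NGG"/"3prime" mirrors SpCas9):
#   pam = PamTarget("NGG", "3prime", "hamming")
#   targets = pam.find_targets(SeqIO.parse("genome.fasta", "fasta"), target_len=20)
# The returned dataframe has columns: target, exact_pam, start, stop, strand,
# pam_orientation, target_seq30, seqid, seedseq, hasrestrictionsite,
# isseedduplicated, and dtype.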


class TargetProcessor:

    """
    A Class representing a set of guide RNA targets.

    The class includes all targets in a dataframe, methods to process targets, and a dict with edit distances for sequences.

    """

    def __init__(self, targets: PandasDataFrame, lsr: int, editdist: int = 2, knum: int = 2) -> None:
        """
        TargetProcessor __init__

        Args:
            targets (PandasDataFrame): Dataframe with output from class PamTarget
            lsr (int): Length of seed region
            editdist (int): Edit distance
            knum (int): Number of negative controls

        Returns:
            None
        """
        self.targets = targets  # pandas dataframe
        self.lsr: int = lsr  # length of seed region
        self.editdist: int = editdist
        self.knum: int = knum
        self.nmslib_index: object = None
        self.neighbors: dict = {}
        self.closest_neighbor_df: PandasDataFrame = None
        self.ncontrolsearched: int = None
        self.gc_percent: float = None
        self.genomesize: float = None
        self.pam_orientation: bool = targets['pam_orientation'].iat[0]

    def __str__(self) -> str:
        """
        String representation of the TargetProcessor object.

        Returns:
            str: A summary of the number of potential PAM targets
        """
        info = "TargetList: contains a set of {} potential PAM targets".format(len(self.targets))
        return info

    def __len__(self) -> int:
        """
        len __init__ to display length of self.targets

        Args:
            self.targets

        Return:
            (int): Length of the self.targets
        """
        return len(self.targets)

    def check_restriction_enzymes(self, restriction_enzyme_list: list = []) -> None:
        """
        Check for restriction enzyme sites and their reverse complements within the gRNA sequences

        Args:
            restriction_enzyme_list (list): A list of restriction enzyme recognition sequences (IUPAC)

        Returns:
            None
        """
        element_to_exclude = []
        for record in set(restriction_enzyme_list):
            for letter in record.upper():
                assert letter in ['A', 'C', 'G', 'T', 'M', 'R', 'W',
                    'S', 'Y', 'K', 'V', 'H', 'D', 'B', 'X', 'N']
            record_seq = Seq.Seq(record.upper())
            element_to_exclude.append(extend_ambiguous_dna(str(record_seq)))
            element_to_exclude.append(extend_ambiguous_dna(
                str(record_seq.reverse_complement())))  # reverse complement
        element_to_exclude = sum(element_to_exclude, [])  # flatten the list of lists into one list of restriction sites
        if len(element_to_exclude) > 0:
            self.targets['hasrestrictionsite'] = self.targets['target'].str.contains('|'.join(element_to_exclude))
        else:
            self.targets['hasrestrictionsite'] = False
    
    def _one_hot_encode(self, seq_list: List[object]) -> List[str]:
        """One hot encode Target DNA as a binary string representation for NMSLIB."""
        charmap = {'A': '1 0 0 0', 'C': '0 1 0 0', 'G': '0 0 1 0', 'T': '0 0 0 1'}

        def seq_to_bin(seq):
            charlist = [charmap[letter] for letter in seq]
            return " ".join(charlist)
        return list(map(seq_to_bin, seq_list))
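
    # For example, self._one_hot_encode(["AC"]) returns ['1 0 0 0 0 1 0 0'];
    # each base becomes four space-separated bits, so one base mismatch between
    # two guides yields a bit-Hamming distance of 2 (accounted for in get_neighbors).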

    def find_unique_near_pam(self) -> None:
        """
        Identify unique sequences in the target list

        The function filters the target list for targets that are unique in
        the region closest to the PAM. The region length is defined by
        self.lsr (the length of the seed region that needs to be unique).

        Args:
            None

        Returns:
            None
        """
        def _get_prox(tseq):  # get the PAM-proximal seed region of a target
            # pam_orientation: True = 5prime, False = 3prime
            if self.lsr == 0:
                return tseq
            if self.pam_orientation:  # 5prime: seed starts at the PAM-proximal end
                return tseq[0:self.lsr]
            return tseq[(len(tseq) - self.lsr):]  # 3prime: seed ends at the PAM-proximal end
        # https://stackoverflow.com/questions/12555323/adding-new-column-to-existing-dataframe-in-python-pandas
        self.targets = deepcopy(self.targets)
        self.targets.loc[:, 'seedseq'] = self.targets.loc[:, 'target'].apply(_get_prox)
        self.targets.loc[:, 'isseedduplicated'] = self.targets.loc[:, 'seedseq'].duplicated()
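
    # For example, with lsr=3 the guide "ATGCCT" gets seedseq "ATG" for a
    # 5prime PAM (the seed abuts the PAM at the target's start) and "CCT" for a
    # 3prime PAM (the seed abuts the PAM at the target's end).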

    def create_index(self, configpath: str, num_threads=2):
        """
        Create nmslib index

        Converts self.targets to binary one hot encoding and returns NMSLIB index

        Args:
            num_threads (int): cpu threads
            configpath (str): Path to config file which contains hyper parameters for NMSLIB

                M (int): Controls the number of bi-directional links created for each element
                during index construction. Higher values lead to better results at the expense
                of memory consumption. Typical values are 2-100, but for most datasets a
                range of 12-48 is suitable. Cannot be smaller than 2.

                efC (int): Size of the dynamic list used during construction. A larger value means
                   a better quality index, but increases build time. Should be an integer value
                   between 1 and the size of the dataset.

        Returns:
            None (but writes NMSLIB index to self)
        """
        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        M, efC, post = config['NMSLIB']['M'], config['NMSLIB']['efc'], config['NMSLIB']['post']

        # index all unique targets (duplicates removed); the index must contain every
        # target so that off-target hits are always found during the neighbor search
        notduplicated_targets = list(set(self.targets['target'].tolist()))
        index_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC, 'post': post}
        if self.targets['dtype'].iat[0] == "hamming":
            space = 'bit_hamming'
            bintargets = self._one_hot_encode(notduplicated_targets)
        else:
            space = 'leven'
            bintargets = notduplicated_targets
        index = nmslib.init(space=space,
                            dtype=nmslib.DistType.INT,
                            data_type=nmslib.DataType.OBJECT_AS_STRING,
                            method='hnsw')
        index.addDataPointBatch(bintargets)
        index.createIndex(index_params, print_progress=True)
        self.nmslib_index = index



    def get_neighbors(self, configpath, num_threads=2) -> None:
        """
        Get the knum nearest neighbors for each candidate target, keeping only
        targets whose closest non-identical neighbor is at least the edit
        distance threshold away.

        Writes a dictionary to self.neighbors:
        self.neighbors[seq] = {"target": seq, "neighbors": {"seqs": [s1, s2, ...], "dist": [d1, d2, ...]}}

        Args:
            configpath (str): Path to a parameter config file
            num_threads (int): Number of threads

        Returns:
            None
        """
        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        ef = config['NMSLIB']['ef']

        # The index contains all targets, so off-targets are always checked; for the
        # neighbor search itself, skip targets that are seed-duplicated or that
        # contain a restriction site.
        unique_targets = self.targets.loc[(self.targets['isseedduplicated'] == False) & (self.targets['hasrestrictionsite'] == False)]['target'].tolist()
        if self.targets['dtype'].iat[0] == "hamming":
            unique_bintargets = self._one_hot_encode(unique_targets)  # search unique seed one
        else:
            unique_bintargets = unique_targets

        self.nmslib_index.setQueryTimeParams({'efSearch': ef})
        results_list = self.nmslib_index.knnQueryBatch(unique_bintargets,
                                               k=self.knum, num_threads=num_threads)
        neighbor_dict = {}
        for i, entry in enumerate(results_list):
            queryseq = unique_targets[i]
            hitseqidx = entry[0].tolist()
            editdist = entry[1].tolist()
            if self.targets['dtype'].iat[0] == "hamming":
                # Check that the closest non-identical sequence meets the minimum
                # distance requirement. We multiply by 2 because each base is one hot
                # encoded, e.g. 1000 vs 0100 = 2 bit differences for one mismatch.
                if editdist[1] >= 2 * self.editdist:
                    neighbors = {"seqs": [self.targets['target'].values[x] for x in hitseqidx],
                                 "dist": [int(x / 2) for x in editdist]}
                    neighbor_dict[queryseq] = {"target": unique_targets[i],
                                               "neighbors": neighbors}
            else:
                if editdist[1] >= self.editdist:
                    neighbors = {"seqs": [self.targets['target'].values[x] for x in hitseqidx],
                                 "dist": [int(x) for x in editdist]}
                    neighbor_dict[queryseq] = {"target": unique_targets[i],
                                               "neighbors": neighbors}
        self.neighbors = neighbor_dict

    def export_bed(self) -> object:
        """
        Export the non-seed-duplicated targets in self.targets to a BED-format dataframe

        Args:
            None

        Returns:
            (obj): A Pandas DataFrame in BED format
        """
        # deepcopy so the original dataframe is not modified:
        # https://stackoverflow.com/questions/55745948/why-doesnt-deepcopy-of-a-pandas-dataframe-affect-memory-usage
        # select only guides that are not duplicated in the seedseq
        df = deepcopy(self.targets.loc[self.targets['isseedduplicated'] == False])
        df = df[["seqid", "start", "stop", "target", "strand"]]
        df.loc[:, 'strand'] = df.loc[:, 'strand'].apply(lambda x: '+' if x else '-')
        df.columns = ["chrom", "chromstart", "chromend", "name", "strand"]
        df.sort_values(by=['chrom', 'chromstart'], inplace=True)
        return df

    def get_control_seqs(self, seq_record_iter: object, configpath, length: int = 20, n: int = 10,
                         num_threads: int = 2) -> PandasDataFrame:
        """
        Create random sequences with a specified GC probability and find the sequences
        with the greatest distance to any sequence flanking a PAM site

        Args:
            seq_record_iter (Bio.SeqIO): An iterator of fastas
            configpath (str): Path to a parameter config file
            length (int): Length of the sequence, must match the index
            n (int): Number of sequences to return
            num_threads (int): Number of processor threads

        Returns:
            (tuple): (minimum distance, median distance, a pandas dataframe of control sequences)
        """

        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        MINIMUM_HMDIST = config['CONTROL']['MINIMUM_HMDIST']

        MAX_CONTROL_SEARCH_MULTIPLE = max(config['CONTROL']['CONTROL_SEARCH_MULTIPLE'])

        #  search_mult (int): search this times n sequences
        CONTROL_SEARCH_MULTIPLE = config['CONTROL']['CONTROL_SEARCH_MULTIPLE']

        # get GC percent
        totlen = 0
        gccnt = 0
        for record in seq_record_iter:
            gccnt += GC(record.seq) * len(record)
            totlen += len(record)
        gc = gccnt / (totlen * 100)
        self.gc_percent = gc * 100
        self.genomesize = totlen / (1024 * 1024)

        minimum_hmdist = 0
        sm_count = 0
        search_mult = 0

        try:
            # keep drawing larger pools until the minimum distance threshold is met
            # or the largest search multiple has been tried
            while minimum_hmdist < MINIMUM_HMDIST and search_mult != MAX_CONTROL_SEARCH_MULTIPLE:
                # generate random sequences
                seqs = []
                search_mult = CONTROL_SEARCH_MULTIPLE[sm_count]
                for i in range(n * search_mult):
                    seqs.append("".join(np.random.choice(a=["G", "C", "A", "T"], size=length,
                                                         replace=True, p=[gc / 2, gc / 2, (1 - gc) / 2, (1 - gc) / 2])))
                # one hot encode sequences
                binseq = []
                charmap = {'A': '1 0 0 0', 'C': '0 1 0 0', 'G': '0 0 1 0', 'T': '0 0 0 1'}
                for seq in seqs:
                    if self.targets['dtype'].iat[0] == "hamming":
                        charlist = [charmap[letter] for letter in seq]
                        binseq.append(" ".join(charlist))
                    else: # leven
                        binseq.append(seq)

                rand_seqs = self.nmslib_index.knnQueryBatch(binseq, k=2, num_threads=num_threads)
                distlist = []
                for i in rand_seqs:
                    distlist.append(i[1][0])
                zipped = list(zip(seqs, distlist))
                dist_seqs = sorted(zipped, reverse=True, key=lambda x: x[1])
                sort_seq = [item[0] for item in dist_seqs][0:n]

                # sort_dist: convert bit distances back to base mismatches for hamming;
                # Levenshtein distances are already in base units, so no division
                if self.targets['dtype'].iat[0] == "hamming":
                    sort_dist = [item[1] / 2 for item in dist_seqs][0:n]
                else:
                    sort_dist = [item[1] for item in dist_seqs][0:n]

                minimum_hmdist = int(min(sort_dist))
                sm_count += 1
        except IndexError as e:
            raise e

        total_ncontrolsearched = search_mult * n
        self.ncontrolsearched = total_ncontrolsearched
        randomdf = pd.DataFrame(data={"Sequences": sort_seq, "Hamming distance": sort_dist})

        def create_name(seq):
            return "Cont-" + hashlib.md5(seq.encode()).hexdigest()
        randomdf['name'] = randomdf["Sequences"].apply(create_name)
        randomdf = randomdf[["name", "Sequences", "Hamming distance"]]
        return (min(sort_dist),
                statistics.median(sort_dist),
                randomdf)
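
    # Note that get_control_seqs returns a 3-tuple rather than a dataframe alone;
    # a hedged usage sketch (hypothetical file paths):
    #   min_d, med_d, controls = tp.get_control_seqs(SeqIO.parse("genome.fasta", "fasta"),
    #                                                "config.yaml", length=20, n=10)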


class Annotation:

    """
    Annotation class for data and methods on targets and gene annotations.

    """

    def __init__(self, annotation_list: List[str], annotation_type: str, target_bed_df: object) -> None:
        """
        Annotation class for data and methods on targets and gene annotations

        Args:
            annotation_list (List[str]): A list of annotation files (GenBank or GFF/GTF) from a single genome
            annotation_type (str): "genbank" | "gff"
            target_bed_df (object): A pandas dataframe in Bed format with the
                locations of targets in the genome

        Returns:
            None
        """
        self.annotation_list: List[str] = annotation_list
        self.annotation_type = annotation_type
        self.target_bed_df: object = target_bed_df
        self.genbank_bed_df: object = None
        self.feature_dict: Dict = None
        self.nearby: object = None
        self.filtered_df: object = None
        self.qualifiers: object = None

    def check_annotation_type(self):
        """determine if the file provided by the GFF argument is a GFF or GTF file

            Args: None

            Returns (str): ["gff" | "gtf"]
        """
        def search(f):
            line1 = f.readline()
            gffmatch = re.search("gff-version", line1)
            if gffmatch is not None:
                return "gff"
            gtfmatch = re.search("gtf-version", line1)
            if gtfmatch is not None:
                return "gtf" 
        testfile = self.annotation_list[0]
        if is_gzip(testfile):
            with gzip.open(testfile, 'rt') as f:
                return search(f)
        else:
            with open(testfile, 'r') as f:
                return search(f)

    def get_annotation_features(self, feature_types: List[str] = None) -> None:
        """
        Parse annotation records into pandas DF/Bed format and dict format saving to self

        Args:
            feature_types (List[str]): a list of Genbank feature types to use

        Returns:
            None
        """
        if feature_types is None:
            feature_types = ["CDS"]
        feature_dict = {}
        pddict = dict(chrom=[], chromStart=[], chromEnd=[], name=[], strand=[])
        if self.annotation_type == "genbank":
            for gbfile in self.annotation_list:
                try:
                    if is_gzip(gbfile):
                        f = gzip.open(gbfile, mode='rt')
                    else:
                        f = open(gbfile, mode='r')
                except IOError as e:
                    logger.error("The genbank file %s could not be opened" % gbfile)
                    raise e
                genbank_file = SeqIO.parse(f, "genbank")
                for entry in genbank_file:
                    for record in entry.features:
                        if record.type in feature_types:
                            if record.strand in [1, -1, "+", "-"]:
                                pddict["strand"].append("-" if str(record.strand) in ['-1', '-' ] else "+")
                            featid = hashlib.md5(str(record).encode()).hexdigest()
                            pddict['chrom'].append(entry.id)
                            pddict["chromStart"].append(record.location.start.position)
                            pddict["chromEnd"].append(record.location.end.position)
                            pddict["name"].append(featid)
                            for qualifier_key, qualifier_val in record.qualifiers.items():
                                if not qualifier_key in feature_dict:
                                    feature_dict[qualifier_key] = {}
                                feature_dict[qualifier_key][featid] = qualifier_val
                f.close()  # close each annotation file inside the loop
            genbankbed = pd.DataFrame.from_dict(pddict)
            self.genbank_bed_df = genbankbed
            self.feature_dict = feature_dict
        elif self.annotation_type == "gff":
            anno_format = self.check_annotation_type()
            for gff in self.annotation_list:
                bedfile = BedTool(gff)
                for rec in bedfile:
                    if rec[2] in feature_types:
                        pddict["chrom"].append(rec[0])
                        pddict["chromStart"].append(rec[3])
                        pddict["chromEnd"].append(rec[4])
                        pddict["strand"].append(rec[6])
                        featid = hashlib.md5(str(rec).encode()).hexdigest()
                        pddict["name"].append(featid)
                        featlist = rec[8].split(';')
                        for feat in featlist:
                            if feat.isspace():
                                continue
                            if anno_format == 'gtf':
                                fl = re.search('^[^"]*', feat)
                                fv = re.search('"([^"]*)"', feat)
                                feat_key = fl.group(0).strip()
                                feat_val = fv.group(0).strip('"')
                            elif anno_format =='gff':
                                fl = feat.split('=')
                                feat_key = fl[0]
                                feat_val = fl[1]
                            if not feat_key in feature_dict:
                                feature_dict[feat_key] = {}
                            feature_dict[feat_key][featid] = feat_val
            genbankbed = pd.DataFrame.from_dict(pddict)
            self.genbank_bed_df = genbankbed
            self.feature_dict = feature_dict


    def _get_qualifiers(self, configpath, excluded: List[str] = None) -> object:
        """
        Create a dataframe with features and their qualifier values

        Create a dataframe with features and their qualifier values for
        all qualifiers present in more than the minimum proportion of
        features (except 'translation'). Add to self.qualifiers

        Args:
            configpath (str): Path to a config file containing MINIMUM_PROPORTION, a
                float between 0 and 1 giving the fraction of features a qualifier
                must be present in to be included in the dataframe
            excluded (List[str]): A list of genbank qualifiers to exclude, Default ["translation"]

        Returns:
            None
        """
        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        min_prop = config['MINIMUM_PROPORTION']

        if excluded is None:
            excluded = ["translation"]
        final_quals = []
        qual_df = pd.DataFrame(data={"Feature id": []})
        # keep qualifiers present in more than min_prop of all features; the
        # denominator must be the total feature count (dividing len(quals) by the
        # length of its own dict would always give a ratio of 1)
        total_features = len(self.genbank_bed_df)
        for featkey, quals in self.feature_dict.items():
            if len(quals) / total_features > min_prop:
                final_quals.append(featkey)
        for qualifier in final_quals:
            if qualifier not in excluded:
                featlist = []
                quallist = []
                for feat, qual in self.feature_dict[qualifier].items():
                    featlist.append(feat)
                    if isinstance(qual, list):
                        quallist.append(";".join([str(i) for i in qual]))
                    else:
                        quallist.append(qual)
                tempdf = pd.DataFrame({'Feature id': featlist, qualifier: quallist})
                qual_df = qual_df.merge(tempdf, how="outer", on="Feature id")
        self.qualifiers = qual_df

    def _get_nearby_features(self) -> None:
        """
        Find the nearest upstream and downstream features for each target sequence

        Args:
            None

        Returns:
            None

        Note:
            Writes a dataframe of nearby features to self.nearby
        """
        # Import Features and sort by chromosome and then by start position in ascending order
        featurebed = BedTool.from_dataframe(self.genbank_bed_df)
        featurebed = featurebed.sort()
        # import guide files and sort by chromosome and then by start position in ascending order
        mapbed = BedTool.from_dataframe(self.target_bed_df)
        mapbed = mapbed.sort()
        # get feature downstream of target sequence
        downstream = mapbed.closest(featurebed, d=True, fd=True, D="a", t="first")
        # get feature upstream of target sequence
        upstream = mapbed.closest(featurebed, d=True, id=True, D="a", t="first")
        headers = {0: "Accession", 1: "Guide start", 2: "Guide end", 3: "Guide sequence",
                   4: "Guide strand", 5: "Feature Accession", 6: "Feature start", 7: "Feature end", 8: "Feature id",
                   9: "Feature strand", 10: "Feature distance"}
        downstream: pd.DataFrame = downstream.to_dataframe(disable_auto_names=True, header=None)
        downstream['direction'] = 'downstream'
        upstream = upstream.to_dataframe(disable_auto_names=True, header=None)
        upstream['direction'] = 'upstream'
        # DataFrame.append was removed in pandas 2.0; concatenate instead
        nearby = pd.concat([upstream, downstream], ignore_index=True)
        self.nearby = nearby.rename(columns=headers)


    def _filter_features(self, before_feat: int = 100, after_feat: int = 200 ) -> None:
        """
        Merge targets with Feature list and filter for guides close enough to interact.

        Args:
            before_feat (int): The maximum distance before the start of a feature measured from closest point to guide
            after_feat (int): The maximum distance after the start codon (into the gene)

        Returns:
            None
        """
        # for guides in the same orientation as the feature (+/+ or -/-) select guides
        # that are within before_feat of the feature start
        filtered = [self.nearby.query(
            '`Guide strand` == `Feature strand` and 0 < `Feature distance` < @before_feat')]
        # for guides in the +/+ orientation select guides whose end is within after_feat of the feature start
        filtered.append(self.nearby.query('`Guide strand` == "+" and `Feature strand` == "+" \
                                             and `Feature distance` == 0 and \
                                             `Guide end` - `Feature start` < @after_feat'))
        # for guides in the -/- orientation select guides whose start is within after_feat of the feature end
        filtered.append(self.nearby.query('`Guide strand` == "-" and `Feature strand` == "-" \
                                                     and `Feature distance` == 0 \
                                                     and `Feature end` - `Guide start` < @after_feat'))
        # select guides where the feature is + and the guide is - and the guide is in front of the feature
        filtered.append(self.nearby.query('`Guide strand` == "-" and `Feature strand` == "+" and \
                                                     0 < `Feature start` - `Guide end` < @before_feat'))
        # select guides where the feature is - and the guide is + and the guide is in front of the feature
        filtered.append(self.nearby.query('`Guide strand` == "+" and `Feature strand` == "-" and \
                                                     0 < `Guide start` - `Feature end` < @before_feat'))
        # select guides where the feature is + and the guide is - and the guide end is within after_feat of the feature start
        filtered.append(self.nearby.query('`Guide strand` == "-" and `Feature strand` == "+" and \
                                                             0 < `Guide end` - `Feature start` < @after_feat'))
        # select guides where the feature is - and the guide is + and the guide start is within after_feat of the feature end
        filtered.append(self.nearby.query('`Guide strand` == "+" and `Feature strand` == "-" and \
                                                             0 < `Feature end` - `Guide start` < @after_feat'))
        # DataFrame.append was removed in pandas 2.0; concatenate once
        self.filtered_df = pd.concat(filtered, ignore_index=True)

    def _format_guide_table(self, targetprocessor_object) -> None:
        """
        Create the final guide table and write it to self.pretty_df

        Args:
            targetprocessor_object (TargetProcessor): An object holding targets and their neighbors

        Returns:
            None
        """
        def gc(seq):
            cnt = 0
            for letter in seq:
                if letter in ["G", "C"]:
                    cnt += 1
            return cnt / len(seq)

        def get_guide_hash(seq):
            return hashlib.md5(seq.encode()).hexdigest()

        def checklen30(seq):
            if len(seq) == 30:
                return True
            return False

        def get_off_target_score(seq):
            dlist = targetprocessor_object.neighbors[seq]["neighbors"]["dist"]
            s = [str(i) for i in dlist]
            return ";".join(s)

        def get_off_target_seqs(seq):
            slist = targetprocessor_object.neighbors[seq]["neighbors"]["seqs"]
            return ";".join(slist)
        pretty_df = deepcopy(self.filtered_df)  # anno class object
        # retrieve only guides that are present in the neighbors dict.
        pretty_df = pretty_df[pretty_df["Guide sequence"].isin(
            list(targetprocessor_object.neighbors.keys()))]
        pretty_df['GC'] = pretty_df['Guide sequence'].apply(gc)
        pretty_df['Guide name'] = pretty_df['Guide sequence'].apply(get_guide_hash)
        pretty_df['Target strand'] = np.where(
            pretty_df['Guide strand'] == pretty_df['Feature strand'], 'coding', 'non-coding')
        pretty_df['Similar guide distances'] = pretty_df['Guide sequence'].apply(
            get_off_target_score)
        pretty_df['Similar guides'] = pretty_df['Guide sequence'].apply(get_off_target_seqs)
        pretty_df = pd.merge(pretty_df, targetprocessor_object.targets, how="left",
         left_on=['Guide sequence', 'Guide start', 'Guide end', 'Accession'],
            right_on=['target', 'start', 'stop', 'seqid'])
        # rename exact_pam to PAM
        pretty_df = pretty_df.rename(columns={"exact_pam": "PAM"})

        pretty_df = pretty_df[['Guide name', 'Guide sequence', 'GC', 'dtype', 'Accession', 'Guide start', 'Guide end',
                    'Guide strand', 'PAM', 'Feature id',
                    'Feature start', 'Feature end', 'Feature strand',
                    'Feature distance', 'Similar guides', 'Similar guide distances','target_seq30']]
        pretty_df = pretty_df.merge(self.qualifiers, how="left", on="Feature id")
        pretty_df = pretty_df.sort_values(by=['Accession', 'Feature start'])
        # offset start positions by 1 to match the 1-based numbering used by other tools
        pretty_df['Guide start'] = pretty_df['Guide start'] + 1
        pretty_df['Feature start'] = pretty_df['Feature start'] + 1
        pretty_df = pretty_df.loc[pretty_df['target_seq30'].apply(checklen30)]
        self.pretty_df = pretty_df

    def _filterlocus(self, filter_by_locus:list = []) -> PandasDataFrame:
        """
        Create a guide table restricted to selected locus tags

        Args:
            filter_by_locus (list): Locus tags to keep; if empty, keep all

        Returns:
            (PandasDataFrame): A formatted pandas dataframe
        """

        df = deepcopy(self.pretty_df)  # anno class object
        if len(filter_by_locus) > 0:
            df = df[df['locus_tag'].isin(filter_by_locus)]
        return df

    def locuslen(self) -> int:
        """
        Count the number of locus tags in the GenBank file

        Args:
            None

        Returns:
            (int): Number of locus tags
        """
        # note: the expression `'locus_tag' or 'locus'` always evaluates to
        # 'locus_tag', so fall back to 'locus' explicitly when it is absent
        key = 'locus_tag' if 'locus_tag' in self.feature_dict else 'locus'
        locus_count = len(self.feature_dict[key].keys())
        return locus_count



class GuideMakerPlot:

    """
    A class with functions to plot guides over genome coordinates.

    """

    def __init__(self, prettydf: PandasDataFrame, outdir: str) -> None:
        """
        GuideMakerPlot class for visualizing the distribution of gRNAs, features, and loci.

        Args:
            prettydf (PandasDataFrame): Final output from GuideMaker
            outdir (str): Output Directory

        Returns:
            None
        """
        self.prettydf = prettydf
        self.accession = list(set(self.prettydf['Accession']))

        def _singleplot(df):
            """
            Returns a guidemaker plot describing PAM targets for a single accession

            Args:
                df (PandasDataFrame): Final output from GuideMaker for a single accession

            Returns:
                (alt.Chart): A layered Altair chart
            """
            source = df
            brush = alt.selection(type='interval', encodings=['x'])
            binNum = int(round(source['Feature end'].max() / 200, 0))
            display_info = source.columns.tolist()

            # Feature density
            densityF = alt.Chart(source).transform_density(
            'Feature start',
            as_=['Feature start', 'Feature Density'],
            extent=[1, source['Feature end'].max()],
            bandwidth=binNum,
            ).mark_area(color='black', opacity=0.6).encode(
            x=alt.X('Feature start', axis=alt.Axis(title='Genome Coordinates (bp)', tickCount=5)),
            y='Feature Density:Q',
            ).properties(height=50, width=500)

            # Guide density
            densityG = alt.Chart(source).transform_density(
            'Guide start',
            as_=['Guide start', 'Guide Density'],
            extent=[1, source['Feature end'].max()],
            bandwidth=binNum,
            ).mark_area(color='pink', opacity=0.6).encode(
            x=alt.X('Guide start', axis=alt.Axis(title='Genome Coordinates (bp)', tickCount=5)),
            y='Guide Density:Q',
            ).properties(height=50, width=500).add_selection(brush)

            # locus bar
            locus = alt.Chart(source).mark_bar(cornerRadiusTopLeft=3, cornerRadiusTopRight=3).encode(
            x='count(locus_tag):Q',
            y=alt.Y('locus_tag', axis=alt.Axis(title='Locus')),
            color='PAM:N',
            tooltip=display_info
            ).transform_filter(
            brush
            ).interactive().properties(height=500, width=500)
            guidemakerChart = (densityF & densityG & locus)
            return guidemakerChart

        for accession in self.accession:
            df = self.prettydf[self.prettydf['Accession'] == accession]
            accession_plot = _singleplot(df)
            plot_file_name = f"{outdir}/{accession}.html"
            accession_plot.save(plot_file_name)


def get_fastas(filelist, input_format="genbank", tempdir=None):
    """
    Write a FASTA file from one or more annotation files (may be gzipped)

    Args:
        filelist (list): Annotation files to process
        input_format (str): Biopython parser format, default "genbank"
        tempdir (str): Directory to write the FASTA file to

    Returns:
        (str): Path to the FASTA file
    """
    try:
        fastpath = os.path.join(tempdir, "forward.fasta")
        with open(fastpath, "w") as f1:
            for file in filelist:
                if is_gzip(file):
                    with gzip.open(file, 'rt') as f:
                        records = SeqIO.parse(f, input_format)
                        SeqIO.write(records, f1, "fasta")
                else:
                    with open(file, 'r') as f:
                        records = SeqIO.parse(f, input_format)
                        SeqIO.write(records, f1, "fasta")
        return fastpath
    except Exception as e:
        logger.exception("An error occurred in the input file %s" % file)
        raise e


def extend_ambiguous_dna(seq: str) -> List[str]:
    """
    Return list of all possible sequences given an ambiguous DNA input

    Args:
        seq(str): A DNA string

    Return:
        List[str]: A list of DNA string with expanded ambiguous DNA values
    """
    ambiguous_dna_values = {
        "A": "A",
        "C": "C",
        "G": "G",
        "T": "T",
        "M": "AC",
        "R": "AG",
        "W": "AT",
        "S": "CG",
        "Y": "CT",
        "K": "GT",
        "V": "ACG",
        "H": "ACT",
        "D": "AGT",
        "B": "CGT",
        "X": "GATC",
        "N": "GATC",
    }
    extend_list = []
    for i in product(*[ambiguous_dna_values[j] for j in seq]):
        extend_list.append("".join(i))
    return extend_list
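
# For instance, extend_ambiguous_dna("AR") returns ['AA', 'AG'] and
# extend_ambiguous_dna("NG") returns ['GG', 'AG', 'TG', 'CG']: itertools.product
# expands each ambiguity code independently.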


# add CFD off-target and Doench efficiency scores

def cfd_score(df):
    def cfd_calculator(knnstrlist, guide, mm_scores):
        # score each ';'-separated similar guide against the query guide
        knnlist = knnstrlist.split(';')
        cfd_list = []
        for item in knnlist:
            result = cfd_score_calculator.calc_cfd(guide, item, mm_scores=mm_scores)
            cfd_list.append(result)
        s = [str(i) for i in cfd_list]
        return s

    def get_max_cfd(cfdlist):
        # return the largest CFD value among the similar guides
        newlist = [float(x) for x in cfdlist]
        return max(newlist)
    mm_scores, _ = cfd_score_calculator.get_mm_pam_scores()
    df['CFD Similar Guides'] = df.apply(lambda x: cfd_calculator(x['Similar guides'], x['Guide sequence'], mm_scores=mm_scores), axis=1)
    # Add a column with max CFD score
    df['Max CFD'] = df['CFD Similar Guides'].apply(get_max_cfd)
    return df
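
# cfd_score expects the dataframe built by Annotation._format_guide_table, with
# columns 'Guide sequence' and 'Similar guides' (a ';'-joined string of similar
# guides); it adds a 'CFD Similar Guides' list column and a numeric 'Max CFD'.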



def get_doench_efficiency_score(df, pam_orientation, num_threads=1):
    checkset = {'AGG', 'CGG', 'TGG', 'GGG'}
    # require a 3prime NGG PAM; issubset allows genomes where not every NGG variant occurs
    if pam_orientation == "3prime" and set(df.PAM).issubset(checkset):
        doenchscore = doench_predict.predict(np.array(df.target_seq30), num_threads=num_threads)
        df["Efficiency"] = doenchscore
    else:
        logger.warning("NOTE: the Doench et al. 2016 efficiency score can only be used with an NGG PAM in the 3prime orientation. Check the PAM sequence and orientation.")
        df["Efficiency"] = "Not Available"
    return df.drop('target_seq30', axis=1)
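
A minimal end-to-end sketch of how these classes fit together (the file names,
config path, and parameter values below are illustrative, not package defaults):

from Bio import SeqIO
from guidemaker import core

pam = core.PamTarget("NGG", "3prime", "hamming")
targets = pam.find_targets(SeqIO.parse("genome.fasta", "fasta"), target_len=20)
tp = core.TargetProcessor(targets, lsr=10, editdist=2, knum=10)
tp.check_restriction_enzymes(["NGRT"])
tp.find_unique_near_pam()
tp.create_index(configpath="config.yaml")
tp.get_neighbors(configpath="config.yaml")
bed_df = tp.export_bed()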

Functions

def cfd_score(df)
def extend_ambiguous_dna(seq: str) ‑> List[str]

Return list of all possible sequences given an ambiguous DNA input

Args

seq(str): A DNA string

Return

List[str]: A list of DNA string with expanded ambiguous DNA values

def get_doench_efficiency_score(df, pam_orientation, num_threads=1)
def get_fastas(filelist, input_format='genbank', tempdir=None)

Write a FASTA file from one or more annotation files (may be gzipped)

Args

filelist : list
Annotation files to process
input_format : str
Biopython parser format, default "genbank"
tempdir : str
Directory to write the FASTA file to

Returns

(str): Path to the FASTA file

def is_gzip(filename: str)

Classes

class Annotation (annotation_list: List[str], annotation_type: str, target_bed_df: object)

Annotation class for data and methods on targets and gene annotations.

Args

annotation_list : List[str]
A list of annotation files (GenBank or GFF/GTF) from a single genome
annotation_type : str
"genbank" | "gff"
target_bed_df : object
A pandas dataframe in Bed format with the locations of targets in the genome

Returns

None

Expand source code
class Annotation:

    """
    Annotation class for data and methods on targets and gene annotations.

    """

    def __init__(self, annotation_list: List[str], annotation_type: str, target_bed_df: object) -> None:
        """
        Annotation class for data and methods on targets and gene annotations

        Args:
            annotation_list (List[str]): A list of genbank files from a single genome
            annotation_type (str): "genbank" | "gff"
            target_bed_df (object): A pandas dataframe in Bed format with the
                locations of targets in the genome

        Returns:
            None
        """
        self.annotation_list: List[str] = annotation_list
        self.annotation_type = annotation_type
        self.target_bed_df: object = target_bed_df
        self.genbank_bed_df: object = None
        self.feature_dict: Dict = None
        self.nearby: object = None
        self.filtered_df: object = None
        self.qualifiers: object = None

    def check_annotation_type(self):
        """determine if the file provided by the GFF argument is a GFF or GTF file

            Args: None

            Returns (str): ["gff" | "gtf"]
        """
        def search(f):
            line1 = f.readline()
            gffmatch = re.search("gff-version", line1)
            if gffmatch is not None:
                return "gff"
            gtfmatch = re.search("gtf-version", line1)
            if gtfmatch is not None:
                return "gtf" 
        testfile = self.annotation_list[0]
        if is_gzip(testfile):
            with gzip.open(testfile, 'rt') as f:
                return search(f)
        else:
            with open(testfile, 'r') as f:
                return search(f)

    def get_annotation_features(self, feature_types: List[str] = None) -> None:
        """
        Parse annotation records into pandas DF/Bed format and dict format saving to self

        Args:
            feature_types (List[str]): a list of Genbank feature types to use

        Returns:
            None
        """
        if feature_types is None:
            feature_types = ["CDS"]
        feature_dict = {}
        pddict = dict(chrom=[], chromStart=[], chromEnd=[], name=[], strand=[])
        if self.annotation_type == "genbank":
            for gbfile in self.annotation_list:
                try:
                    if is_gzip(gbfile):
                        f = gzip.open(gbfile, mode='rt')
                    else:
                        f = open(gbfile, mode='r')
                except IOError as e:
                    logger.error("The genbank file %s could not be opened" % gbfile)
                    raise e
                genbank_file = SeqIO.parse(f, "genbank")
                for entry in genbank_file:
                    for record in entry.features:
                        if record.type in feature_types:
                            if record.strand not in [1, -1, "+", "-"]:
                                continue  # skip features without a usable strand to keep the BED columns aligned
                            pddict["strand"].append("-" if str(record.strand) in ['-1', '-'] else "+")
                            featid = hashlib.md5(str(record).encode()).hexdigest()
                            pddict['chrom'].append(entry.id)
                            pddict["chromStart"].append(record.location.start.position)
                            pddict["chromEnd"].append(record.location.end.position)
                            pddict["name"].append(featid)
                            for qualifier_key, qualifier_val in record.qualifiers.items():
                                if not qualifier_key in feature_dict:
                                    feature_dict[qualifier_key] = {}
                                feature_dict[qualifier_key][featid] = qualifier_val
                f.close()
            genbankbed = pd.DataFrame.from_dict(pddict)
            self.genbank_bed_df = genbankbed
            self.feature_dict = feature_dict
        elif self.annotation_type == "gff":
            anno_format = self.check_annotation_type()
            for gff in self.annotation_list:
                bedfile = BedTool(gff)
                for rec in bedfile:
                    if rec[2] in feature_types:
                        pddict["chrom"].append(rec[0])
                        pddict["chromStart"].append(rec[3])
                        pddict["chromEnd"].append(rec[4])
                        pddict["strand"].append(rec[6])
                        featid = hashlib.md5(str(rec).encode()).hexdigest()
                        pddict["name"].append(featid)
                        featlist = rec[8].split(';')
                        for feat in featlist:
                            if feat.isspace():
                                continue
                            if anno_format == 'gtf':
                                fl = re.search('^[^"]*', feat)
                                fv = re.search('"([^"]*)"', feat)
                                feat_key = fl.group(0).strip()
                                feat_val = fv.group(0).strip('"')
                            elif anno_format == 'gff':
                                fl = feat.split('=', 1)
                                feat_key = fl[0]
                                feat_val = fl[1]
                            if not feat_key in feature_dict:
                                feature_dict[feat_key] = {}
                            feature_dict[feat_key][featid] = feat_val
            genbankbed = pd.DataFrame.from_dict(pddict)
            self.genbank_bed_df = genbankbed
            self.feature_dict = feature_dict


    def _get_qualifiers(self, configpath, excluded: List[str] = None) -> object:
        """
        Create a dataframe with features and their qualifier values

        Create a dataframe with features and their qualifier values for
        all qualifiers present above the minimum proportion of features
        (except 'translation'), and save it to self.qualifiers.

        Args:
            configpath (str): Path to the config file; its MINIMUM_PROPORTION value, a float
                between 0 and 1, is the fraction of features a qualifier must be present in
                to be included in the dataframe
            excluded (List[str]): A list of GenBank qualifiers to exclude, Default ["translation"]

        Returns:
            None
        """
        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        min_prop = config['MINIMUM_PROPORTION']

        if excluded is None:
            excluded = ["translation"]
        final_quals = []
        qual_df = pd.DataFrame(data={"Feature id": []})
        total_features = len(self.genbank_bed_df.index)
        for featkey, quals in self.feature_dict.items():
            # compare the proportion of features carrying this qualifier to the threshold
            if len(quals) / total_features > min_prop:
                final_quals.append(featkey)
        for qualifier in final_quals:
            if qualifier not in excluded:
                featlist = []
                quallist = []
                for feat, qual in self.feature_dict[qualifier].items():
                    featlist.append(feat)
                    if isinstance(qual, list):
                        quallist.append(";".join([str(i) for i in qual]))
                    else:
                        quallist.append(qual)
                tempdf = pd.DataFrame({'Feature id': featlist, qualifier: quallist})
                qual_df = qual_df.merge(tempdf, how="outer", on="Feature id")
        self.qualifiers = qual_df

    def _get_nearby_features(self) -> None:
        """
        Find the nearest upstream and downstream feature for each target sequence

        Args:
            None

        Returns:
            None

        Note:
            Writes a dataframe of nearby features to self.nearby
        """
        # Import Features and sort by chromosome and then by start position in ascending order
        featurebed = BedTool.from_dataframe(self.genbank_bed_df)
        featurebed = featurebed.sort()
        # import guide files and sort by chromosome and then by start position in ascending order
        mapbed = BedTool.from_dataframe(self.target_bed_df)
        mapbed = mapbed.sort()
        # get feature downstream of target sequence
        downstream = mapbed.closest(featurebed, d=True, fd=True, D="a", t="first")
        # get feature upstream of target sequence
        upstream = mapbed.closest(featurebed, d=True, id=True, D="a", t="first")
        headers = {0: "Accession", 1: "Guide start", 2: "Guide end", 3: "Guide sequence",
                   4: "Guide strand", 5: "Feature Accession", 6: "Feature start", 7: "Feature end", 8: "Feature id",
                   9: "Feature strand", 10: "Feature distance"}
        downstream: pd.DataFrame = downstream.to_dataframe(disable_auto_names=True, header=None)
        downstream['direction'] = 'downstream'
        upstream = upstream.to_dataframe(disable_auto_names=True, header=None)
        upstream['direction'] = 'upstream'
        upstream = pd.concat([upstream, downstream])
        self.nearby = upstream.rename(columns=headers)


    def _filter_features(self, before_feat: int = 100, after_feat: int = 200 ) -> None:
        """
        Merge targets with Feature list and filter for guides close enough to interact.

        Args:
            before_feat (int): The maximum distance before the start of a feature measured from closest point to guide
            after_feat (int): The maximum distance after the start codon (into the gene)

        Returns:
            None
        """
        # For guides in the same orientation as the feature (+/+ or -/-), select guides
        # within before_feat of the feature start
        selections = [self.nearby.query(
            '`Guide strand` == `Feature strand` and 0 < `Feature distance` < @before_feat')]
        # For guides in the +/+ orientation, select guides whose end is within after_feat of the feature start
        selections.append(self.nearby.query('`Guide strand` == "+" and `Feature strand` == "+" \
                                             and `Feature distance` == 0 and \
                                             `Guide end` - `Feature start` < @after_feat'))
        # For guides in the -/- orientation, select guides whose end is within after_feat of the feature start
        selections.append(self.nearby.query('`Guide strand` == "-" and `Feature strand` == "-" \
                                                     and `Feature distance` == 0 \
                                                     and `Feature end` - `Guide start` < @after_feat'))
        # Select guides where the feature is + and the guide is - and the guide is in front of the feature
        selections.append(self.nearby.query('`Guide strand` == "-" and `Feature strand` == "+" and \
                                                     0 < `Feature start` - `Guide end` < @before_feat'))
        # Select guides where the feature is - and the guide is + and the guide is in front of the feature
        selections.append(self.nearby.query('`Guide strand` == "+" and `Feature strand` == "-" and \
                                                     0 < `Guide start` - `Feature end` < @before_feat'))
        # Select guides where the feature is + and the guide is - and the guide end is within after_feat of the feature start
        selections.append(self.nearby.query('`Guide strand` == "-" and `Feature strand` == "+" and \
                                                             0 < `Guide end` - `Feature start` < @after_feat'))
        # Select guides where the feature is - and the guide is + and the guide start is within after_feat of the feature end
        selections.append(self.nearby.query('`Guide strand` == "+" and `Feature strand` == "-" and \
                                                             0 < `Feature end` - `Guide start` < @after_feat'))

        self.filtered_df = pd.concat(selections)

    def _format_guide_table(self, targetprocessor_object) -> None:
        """
        Create guide table for output

        Args:
            targetprocessor_object (TargetProcessor): An object holding targets and their nearest neighbors

        Returns:
            None (writes the formatted pandas dataframe to self.pretty_df)
        """
        def gc(seq):
            cnt = 0
            for letter in seq:
                if letter in ["G", "C"]:
                    cnt += 1
            return cnt / len(seq)

        def get_guide_hash(seq):
            return hashlib.md5(seq.encode()).hexdigest()

        def checklen30(seq):
            return len(seq) == 30

        def get_off_target_score(seq):
            dlist = targetprocessor_object.neighbors[seq]["neighbors"]["dist"]
            s = [str(i) for i in dlist]
            return ";".join(s)

        def get_off_target_seqs(seq):
            slist = targetprocessor_object.neighbors[seq]["neighbors"]["seqs"]
            return ";".join(slist)
        pretty_df = deepcopy(self.filtered_df)  # anno class object
        # retrieve only guides that are present in the neighbors keys
        pretty_df = pretty_df[pretty_df["Guide sequence"].isin(
            list(targetprocessor_object.neighbors.keys()))]
        pretty_df['GC'] = pretty_df['Guide sequence'].apply(gc)
        pretty_df['Guide name'] = pretty_df['Guide sequence'].apply(get_guide_hash)
        pretty_df['Target strand'] = np.where(
            pretty_df['Guide strand'] == pretty_df['Feature strand'], 'coding', 'non-coding')
        pretty_df['Similar guide distances'] = pretty_df['Guide sequence'].apply(
            get_off_target_score)
        pretty_df['Similar guides'] = pretty_df['Guide sequence'].apply(get_off_target_seqs)
        pretty_df = pd.merge(pretty_df, targetprocessor_object.targets, how="left",
         left_on=['Guide sequence', 'Guide start', 'Guide end', 'Accession'],
            right_on=['target', 'start', 'stop', 'seqid'])
        # rename exact_pam to PAM
        pretty_df = pretty_df.rename(columns={"exact_pam": "PAM"})

        pretty_df = pretty_df[['Guide name', 'Guide sequence', 'GC', 'dtype', 'Accession', 'Guide start', 'Guide end',
                    'Guide strand', 'PAM', 'Feature id',
                    'Feature start', 'Feature end', 'Feature strand',
                    'Feature distance', 'Similar guides', 'Similar guide distances','target_seq30']]
        pretty_df = pretty_df.merge(self.qualifiers, how="left", on="Feature id")
        pretty_df = pretty_df.sort_values(by=['Accession', 'Feature start'])
        # offset start positions by one to match the numbering of other tools
        pretty_df['Guide start'] = pretty_df['Guide start'] + 1
        pretty_df['Feature start'] = pretty_df['Feature start'] + 1
        # keep only guides with a full 30-mer context sequence
        pretty_df = pretty_df.loc[pretty_df['target_seq30'].apply(checklen30)]
        self.pretty_df = pretty_df

    def _filterlocus(self, filter_by_locus: list = []) -> PandasDataFrame:
        """
        Create guide table for output for selected locus tags

        Args:
            filter_by_locus (list): A list of locus tags to keep; if empty, all rows are kept

        Returns:
            (PandasDataFrame): A formatted pandas dataframe
        """

        df = deepcopy(self.pretty_df)  # anno class object
        if len(filter_by_locus) > 0:
            df = df[df['locus_tag'].isin(filter_by_locus)]
        return df

    def locuslen(self) -> int:
        """
        Count the number of locus tags in the GenBank file

        Args:
            None

        Returns:
            (int): Number of locus tags
        """
        # 'locus_tag' or 'locus' always evaluates to 'locus_tag'; check which key is present instead
        tag_key = 'locus_tag' if 'locus_tag' in self.feature_dict else 'locus'
        locus_count = len(self.feature_dict[tag_key].keys())
        return locus_count

Methods

def check_annotation_type(self)

Determine whether the file provided by the GFF argument is a GFF or GTF file

Args: None

Returns (str): ["gff" | "gtf"], or None if neither version header is found

Expand source code
def check_annotation_type(self):
    """Determine whether the file provided by the GFF argument is a GFF or GTF file

        Args: None

        Returns (str): ["gff" | "gtf"], or None if neither version header is found
    """
    def search(f):
        line1 = f.readline()
        gffmatch = re.search("gff-version", line1)
        if gffmatch is not None:
            return "gff"
        gtfmatch = re.search("gtf-version", line1)
        if gtfmatch is not None:
            return "gtf"
    testfile = self.annotation_list[0]
    if is_gzip(testfile):
        with gzip.open(testfile, 'rt') as f:
            return search(f)
    else:
        with open(testfile, 'r') as f:
            return search(f)
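
The check reads only the first line of the first annotation file; a sketch with hypothetical inputs:

    # GFF3 files typically begin with:  ##gff-version 3
    # GTF files typically begin with:   #!gtf-version 2
    anno = Annotation(["features.gff"], "gff", target_df)  # hypothetical inputs
    fmt = anno.check_annotation_type()  # "gff", "gtf", or None if no version header is found
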
def get_annotation_features(self, feature_types: List[str] = None) ‑> None

Parse annotation records into a pandas DataFrame (BED format) and a qualifier dict, saving both to self

Args

feature_types : List[str]
a list of Genbank feature types to use

Returns

None

Expand source code
def get_annotation_features(self, feature_types: List[str] = None) -> None:
    """
    Parse annotation records into a pandas DataFrame (BED format) and a qualifier dict, saving both to self

    Args:
        feature_types (List[str]): a list of Genbank feature types to use

    Returns:
        None
    """
    if feature_types is None:
        feature_types = ["CDS"]
    feature_dict = {}
    pddict = dict(chrom=[], chromStart=[], chromEnd=[], name=[], strand=[])
    if self.annotation_type == "genbank":
        for gbfile in self.annotation_list:
            try:
                if is_gzip(gbfile):
                    f = gzip.open(gbfile, mode='rt')
                else:
                    f = open(gbfile, mode='r')
            except IOError as e:
                logger.error("The genbank file %s could not be opened" % gbfile)
                raise e
            genbank_file = SeqIO.parse(f, "genbank")
            for entry in genbank_file:
                for record in entry.features:
                    if record.type in feature_types:
                        if record.strand not in [1, -1, "+", "-"]:
                            continue  # skip features without a usable strand to keep the BED columns aligned
                        pddict["strand"].append("-" if str(record.strand) in ['-1', '-'] else "+")
                        featid = hashlib.md5(str(record).encode()).hexdigest()
                        pddict['chrom'].append(entry.id)
                        pddict["chromStart"].append(record.location.start.position)
                        pddict["chromEnd"].append(record.location.end.position)
                        pddict["name"].append(featid)
                        for qualifier_key, qualifier_val in record.qualifiers.items():
                            if not qualifier_key in feature_dict:
                                feature_dict[qualifier_key] = {}
                            feature_dict[qualifier_key][featid] = qualifier_val
            f.close()
        genbankbed = pd.DataFrame.from_dict(pddict)
        self.genbank_bed_df = genbankbed
        self.feature_dict = feature_dict
    elif self.annotation_type == "gff":
        anno_format = self.check_annotation_type()
        for gff in self.annotation_list:
            bedfile = BedTool(gff)
            for rec in bedfile:
                if rec[2] in feature_types:
                    pddict["chrom"].append(rec[0])
                    pddict["chromStart"].append(rec[3])
                    pddict["chromEnd"].append(rec[4])
                    pddict["strand"].append(rec[6])
                    featid = hashlib.md5(str(rec).encode()).hexdigest()
                    pddict["name"].append(featid)
                    featlist = rec[8].split(';')
                    for feat in featlist:
                        if feat.isspace():
                            continue
                        if anno_format == 'gtf':
                            fl = re.search('^[^"]*', feat)
                            fv = re.search('"([^"]*)"', feat)
                            feat_key = fl.group(0).strip()
                            feat_val = fv.group(0).strip('"')
                        elif anno_format == 'gff':
                            fl = feat.split('=', 1)
                            feat_key = fl[0]
                            feat_val = fl[1]
                        if not feat_key in feature_dict:
                            feature_dict[feat_key] = {}
                        feature_dict[feat_key][featid] = feat_val
        genbankbed = pd.DataFrame.from_dict(pddict)
        self.genbank_bed_df = genbankbed
        self.feature_dict = feature_dict
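
After parsing, self.genbank_bed_df holds one BED-style row per selected feature and self.feature_dict maps each qualifier to its per-feature values; a sketch of the expected shape (inputs as above):

    anno.get_annotation_features(feature_types=["CDS", "gene"])
    print(anno.genbank_bed_df.columns.tolist())
    # ['chrom', 'chromStart', 'chromEnd', 'name', 'strand']
    # anno.feature_dict: {qualifier name -> {feature md5 id -> qualifier value}}
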
def locuslen(self) ‑> int

Count the number of locus tags in the GenBank file

Args

None

Returns

(int): Number of locus tags

Expand source code
def locuslen(self) -> int:
    """
    Count the number of locus tags in the GenBank file

    Args:
        None

    Returns:
        (int): Number of locus tags
    """
    # 'locus_tag' or 'locus' always evaluates to 'locus_tag'; check which key is present instead
    tag_key = 'locus_tag' if 'locus_tag' in self.feature_dict else 'locus'
    locus_count = len(self.feature_dict[tag_key].keys())
    return locus_count
class GuideMakerPlot (prettydf: ~pandas.core.frame.DataFrame, outdir: str)

A class with functions to plot guides over genome coordinates.

GuideMakerPlot class for visualizing the distribution of gRNAs, features, and loci.

Args

prettydf : PandasDataFrame
Final output from GuideMaker
outdir : str
Output Directory

Returns

None
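
A minimal sketch (pretty_df is the final GuideMaker dataframe produced upstream; the output directory is hypothetical):

    from guidemaker.core import GuideMakerPlot

    # writes one interactive <Accession>.html chart per accession in pretty_df
    GuideMakerPlot(prettydf=pretty_df, outdir="plots")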

Expand source code
class GuideMakerPlot:

    """
    A class with functions to plot guides over genome coordinates.

    """

    def __init__(self, prettydf: PandasDataFrame, outdir: str) -> None:
        """
        GuideMakerPlot class for visualizing the distribution of gRNAs, features, and loci.

        Args:
            prettydf (PandasDataFrame): Final output from GuideMaker
            outdir (str): Output Directory

        Returns:
            None
        """
        self.prettydf = prettydf
        self.accession = list(set(self.prettydf['Accession']))

        def _singleplot(df):
            """
            Build the GuideMaker plot describing PAM targets for one accession

            Args:
                df (PandasDataFrame): Final output from GuideMaker for a single accession

            Return:
                (alt.Chart): A layered Altair chart
            """
            source = df
            brush = alt.selection(type='interval', encodings=['x'])
            binNum = int(round(source['Feature end'].max() / 200, 0))
            display_info = source.columns.tolist()

            # Feature density
            densityF = alt.Chart(source).transform_density(
                'Feature start',
                as_=['Feature start', 'Feature Density'],
                extent=[1, source['Feature end'].max()],
                bandwidth=binNum,
            ).mark_area(color='black', opacity=0.6).encode(
                x=alt.X('Feature start', axis=alt.Axis(title='Genome Coordinates (bp)', tickCount=5)),
                y='Feature Density:Q',
            ).properties(height=50, width=500)

            # Guide density
            densityG = alt.Chart(source).transform_density(
                'Guide start',
                as_=['Guide start', 'Guide Density'],
                extent=[1, source['Feature end'].max()],
                bandwidth=binNum,
            ).mark_area(color='pink', opacity=0.6).encode(
                x=alt.X('Guide start', axis=alt.Axis(title='Genome Coordinates (bp)', tickCount=5)),
                y='Guide Density:Q',
            ).properties(height=50, width=500).add_selection(brush)

            # Locus bar chart
            locus = alt.Chart(source).mark_bar(cornerRadiusTopLeft=3, cornerRadiusTopRight=3).encode(
                x='count(locus_tag):Q',
                y=alt.Y('locus_tag', axis=alt.Axis(title='Locus')),
                color='PAM:N',
                tooltip=display_info
            ).transform_filter(
                brush
            ).interactive().properties(height=500, width=500)
            guidemakerChart = (densityF & densityG & locus)
            return guidemakerChart

        for accession in self.accession:
            df = self.prettydf[self.prettydf['Accession'] == accession]
            accession_plot = _singleplot(df)
            plot_file_name = f"{outdir}/{accession}.html"
            accession_plot.save(plot_file_name)
class PamTarget (pam: str, pam_orientation: str, dtype: str)

A Class representing a Protospacer Adjacent Motif (PAM) and targets.

The class includes all targets for a given PAM as a dataframe, PAM and target attributes, and methods to find target and control sequences.

PamTarget __init__

Args

pam : str
A DNA string in ambiguous IUPAC format
pam_orientation : str
[5prime | 3prime ] 5prime means the order is 5'-[pam][target]-3' 3prime means the order is 5'-[target][pam]-3'
dtype : str
hamming or leven

Returns

None
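
A minimal sketch, assuming a FASTA genome readable by Biopython (the file name is hypothetical):

    from Bio import SeqIO
    from guidemaker.core import PamTarget

    pam = PamTarget(pam="NGG", pam_orientation="3prime", dtype="hamming")
    targets = pam.find_targets(SeqIO.parse("genome.fasta", "fasta"), target_len=20)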

Expand source code
class PamTarget:

    """
    A Class representing a Protospacer Adjacent Motif (PAM) and targets.

    The class includes all targets for a given PAM as a dataframe, PAM and target attributes,
    and methods to find target and control sequences.

    """

    def __init__(self, pam: str, pam_orientation: str, dtype: str) -> None:
        """
        PamTarget __init__

        Args:
            pam (str): A DNA string in ambiguous IUPAC format
            pam_orientation (str): [5prime | 3prime ]
                5prime means the order is 5'-[pam][target]-3'
                3prime means the order is 5'-[target][pam]-3'
            dtype (str): hamming or leven

        Returns:
            None
        """
        for letter in pam.upper():
            assert letter in ['A', 'C', 'G', 'T', 'M', 'R', 'W',
                'S', 'Y', 'K', 'V', 'H', 'D', 'B', 'X', 'N']
        assert pam_orientation in ["3prime", "5prime"]
        self.pam: str = pam.upper()
        self.pam_orientation: str = pam_orientation
        self.dtype: str = dtype

    def __str__(self) -> str:
        """
        String representation of the PAM object

        Args:
            self

        Returns:
            (str): A description of the PAM object
        """
        return "A PAM object: {self.pam}".format(self=self)

    def find_targets(self, seq_record_iter: object, target_len: int) -> PandasDataFrame:
        """
        Find all targets on a sequence that match the PAM on both strands

        Args:
            seq_record_iter (object): A Biopython SeqRecord iterator from SeqIO.parse
            target_len (int): The length of the target sequence

        Returns:
            PandasDataFrame: A pandas dataframe of matching targets
        """

        def reverse_complement(seq: str) -> str:
            """
            Reverse complement of a DNA sequence

            Args:
                seq (str): A DNA string

            Returns:
                str: A reverse complement of DNA string
            """
            bpseq = Seq.Seq(seq)
            return str(bpseq.reverse_complement())

        def pam2re(pam: str) -> str:
            """
            Convert an IUPAC ambiguous PAM to a Regex expression

            Args:
                pam (str): A DNA string

            Returns:
                str: A Regex expression
            """
            dnaval = {'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
                      'M': '[A|C]', 'R': '[A|G]', 'W': '[A|T]', 'S': '[C|G]',
                      'Y': '[C|T]', 'K': '[G|T]', 'V': '[A|C|G]', 'H': '[A|C|T]',
                      'D': '[A|G|T]', 'B': '[C|G|T]', 'X': '[G|A|T|C]', 'N': '[G|A|T|C]'}
            return "".join([dnaval[base] for base in pam])

        #                5prime means the order is 5'-[pam][target]-3'
        #                3prime means the order is 5'-[target][pam]-3'

        def check_target(seq: str, target_len: int) -> bool:
            """
            Check that a target has the expected guide length and contains only unambiguous DNA bases

            Args:
                seq (str): A DNA string
                target_len (int): Guide length

            Returns:
                bool: True or False
            """
            # targets containing letters other than A, T, C, or G are ignored
            return len(seq) == target_len and all(letter in ['A', 'T', 'C', 'G'] for letter in seq)

        def run_for_5p(pam_pattern: str, dnaseq: str, target_len: int) -> Generator:
            """
            Search for guides with 5prime pam orientation in the forward strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = dnaseq[match_obj.end(): match_obj.end() + target_len]
                target_seq30 = dnaseq[match_obj.start()-3: match_obj.start()+27]
                # 30-mer context for scoring: from 3 nt before the PAM start to 27 nt after it
                if check_target(target_seq, target_len):
                    exact_pam = match_obj.group(0)
                    start = match_obj.end()
                    stop = match_obj.end() + target_len
                    # 5prime =True, 3prime = False
                    pam_orientation = True
                    # forward =True, reverse = False
                    strand = True
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30



        def run_for_3p(pam_pattern, dnaseq, target_len) -> Generator:
            """
            Search for guides with 3prime pam orientation in the forward strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = dnaseq[match_obj.start() - target_len: match_obj.start()]
                target_seq30 = dnaseq[match_obj.end()-27 :match_obj.end()+3]
                if check_target(target_seq, target_len):
                    exact_pam = match_obj.group(0)
                    start = match_obj.start() - target_len
                    stop = match_obj.start()
                    # 5prime =True, 3prime = False
                    pam_orientation = False
                    # forward =True, reverse = False
                    strand = True
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        def run_rev_5p(pam_pattern, dnaseq, target_len) -> Generator:
            """
            Search for guides with 5prime pam orientation in the reverse strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = reverse_complement(
                    dnaseq[match_obj.start() - target_len: match_obj.start()])
                target_seq30 = reverse_complement(
                    dnaseq[match_obj.end()-27:match_obj.end()+3])
                if check_target(target_seq, target_len):
                    exact_pam = reverse_complement(match_obj.group(0))
                    start = match_obj.start() - target_len
                    stop = match_obj.start()
                    # 5prime =True, 3prime = False
                    pam_orientation = True
                    # forward =True, reverse = False
                    strand = False
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        def run_rev_3p(pam_pattern, dnaseq, target_len) -> Generator:
            """
            Search for guides with 3prime pam orientation in the reverse strand

            Args:
                pam_pattern (str): A DNA string representing PAM
                dnaseq (str): A DNA string representing genome
                target_len (int): Guide length

            Returns:
                (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
            """
            for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
                target_seq = reverse_complement(
                    dnaseq[match_obj.end(): match_obj.end() + target_len])
                target_seq30 = reverse_complement(dnaseq[match_obj.start()-3:match_obj.start()+27])
                if check_target(target_seq, target_len):
                    exact_pam = reverse_complement(match_obj.group(0))
                    start = match_obj.end()
                    stop = match_obj.end() + target_len
                    # 5prime =True, 3prime = False
                    pam_orientation = False
                    # forward =True, reverse = False
                    strand = False
                    yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

        target_list = []
        for record in seq_record_iter:
            record_id = record.id
            seq = str(record.seq)
            if self.pam_orientation == "5prime":
                # forward
                for5p = pd.DataFrame(run_for_5p(pam2re(self.pam), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation", "target_seq30"])
                for5p["seqid"] = record_id
                # string-to-boolean conversion is not straightforward (any non-empty string is truthy),
                # so the generators above yield booleans directly.
                # https://stackoverflow.com/questions/715417/converting-from-a-string-to-boolean-in-python/715455#715455
                for5p = for5p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(for5p)
                # reverse
                rev5p = pd.DataFrame(run_rev_5p(pam2re(reverse_complement(self.pam)), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
                rev5p["seqid"] = record_id
                rev5p = rev5p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(rev5p)
                # Note on per-record append vs. a single concat: https://ravinpoudel.github.io/AppendVsConcat/
            elif self.pam_orientation == "3prime":
                # forward
                for3p = pd.DataFrame(run_for_3p(pam2re(self.pam), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
                for3p["seqid"] = record_id
                for3p = for3p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(for3p)
                # reverse
                rev3p = pd.DataFrame(run_rev_3p(pam2re(reverse_complement(self.pam)), seq, target_len), columns=[
                                     "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
                rev3p["seqid"] = record_id
                rev3p = rev3p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                     "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
                target_list.append(rev3p)
            gc.collect()  # clear memory after each chromosome
        df_targets = pd.concat(target_list, ignore_index=True)
        df_targets = df_targets.assign(seedseq=np.nan, hasrestrictionsite=np.nan, isseedduplicated=np.nan)
        df_targets = df_targets.astype({"seedseq": 'str', "isseedduplicated": 'bool'})
        df_targets = df_targets.assign(dtype=self.dtype)
        df_targets = df_targets.astype({"dtype": 'category'})
        return df_targets

Methods

def find_targets(self, seq_record_iter: object, target_len: int) ‑> ~pandas.core.frame.DataFrame

Find all targets on a sequence that match the PAM on both strands

Args

seq_record_iter : object
A Biopython SeqRecord iterator from SeqIO.parse
target_len : int
The length of the target sequence

Returns

PandasDataFrame
A pandas dataframe of matching targets
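
The returned dataframe carries one row per candidate target; a sketch of its expected columns (the file name is hypothetical):

    df = pam.find_targets(SeqIO.parse("genome.fasta", "fasta"), target_len=20)
    print(df.columns.tolist())
    # ['target', 'exact_pam', 'start', 'stop', 'strand', 'pam_orientation',
    #  'target_seq30', 'seqid', 'seedseq', 'hasrestrictionsite', 'isseedduplicated', 'dtype']
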
Expand source code
def find_targets(self, seq_record_iter: object, target_len: int) -> PandasDataFrame:
    """
    Find all targets on a sequence that match the PAM on both strands

    Args:
        seq_record_iter (object): A Biopython SeqRecord iterator from SeqIO.parse
        target_len (int): The length of the target sequence

    Returns:
        PandasDataFrame: A pandas dataframe of matching targets
    """

    def reverse_complement(seq: str) -> str:
        """
        Reverse complement of a DNA sequence

        Args:
            seq (str): A DNA string

        Returns:
            str: A reverse complement of DNA string
        """
        bpseq = Seq.Seq(seq)
        return str(bpseq.reverse_complement())

    def pam2re(pam: str) -> str:
        """
        Convert an IUPAC ambiguous PAM to a Regex expression

        Args:
            pam (str): A DNA string

        Returns:
            str: A Regex expression
        """
        dnaval = {'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
                  'M': '[A|C]', 'R': '[A|G]', 'W': '[A|T]', 'S': '[C|G]',
                  'Y': '[C|T]', 'K': '[G|T]', 'V': '[A|C|G]', 'H': '[A|C|T]',
                  'D': '[A|G|T]', 'B': '[C|G|T]', 'X': '[G|A|T|C]', 'N': '[G|A|T|C]'}
        return "".join([dnaval[base] for base in pam])

    #                5prime means the order is 5'-[pam][target]-3'
    #                3prime means the order is 5'-[target][pam]-3'

    def check_target(seq: str, target_len: int) -> bool:
        """
        Check that a target has the expected guide length and contains only unambiguous DNA bases

        Args:
            seq (str): A DNA string
            target_len (int): Guide length

        Returns:
            bool: True or False
        """
        # targets containing letters other than A, T, C, or G are ignored
        return len(seq) == target_len and all(letter in ['A', 'T', 'C', 'G'] for letter in seq)

    def run_for_5p(pam_pattern: str, dnaseq: str, target_len: int) -> Generator:
        """
        Search for guides with 5prime pam orientation in the forward strand

        Args:
            pam_pattern (str): A DNA string representing PAM
            dnaseq (str): A DNA string representing genome
            target_len (int): Guide length

        Returns:
            (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
        """
        for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
            target_seq = dnaseq[match_obj.end(): match_obj.end() + target_len]
            target_seq30 = dnaseq[match_obj.start()-3: match_obj.start()+27]
            # 30-mer context for scoring: from 3 nt before the PAM start to 27 nt after it
            if check_target(target_seq, target_len):
                exact_pam = match_obj.group(0)
                start = match_obj.end()
                stop = match_obj.end() + target_len
                # 5prime =True, 3prime = False
                pam_orientation = True
                # forward =True, reverse = False
                strand = True
                yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30



    def run_for_3p(pam_pattern, dnaseq, target_len) -> Generator:
        """
        Search for guides with 3prime pam orientation in the forward strand

        Args:
            pam_pattern (str): A DNA string representing PAM
            dnaseq (str): A DNA string representing genome
            target_len (int): Guide length

        Returns:
            (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
        """
        for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
            target_seq = dnaseq[match_obj.start() - target_len: match_obj.start()]
            target_seq30 = dnaseq[match_obj.end()-27 :match_obj.end()+3]
            if check_target(target_seq, target_len):
                exact_pam = match_obj.group(0)
                start = match_obj.start() - target_len
                stop = match_obj.start()
                # 5prime =True, 3prime = False
                pam_orientation = False
                # forward =True, reverse = False
                strand = True
                yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

    def run_rev_5p(pam_pattern, dnaseq, target_len) -> Generator:
        """
        Search for guides with 5prime pam orientation in the reverse strand

        Args:
            pam_pattern (str): A DNA string representing PAM
            dnaseq (str): A DNA string representing genome
            target_len (int): Guide length

        Returns:
            (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
        """
        for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
            target_seq = reverse_complement(
                dnaseq[match_obj.start() - target_len: match_obj.start()])
            target_seq30 = reverse_complement(
                dnaseq[match_obj.end()-27:match_obj.end()+3])
            if check_target(target_seq, target_len):
                exact_pam = reverse_complement(match_obj.group(0))
                start = match_obj.start() - target_len
                stop = match_obj.start()
                # 5prime =True, 3prime = False
                pam_orientation = True
                # forward =True, reverse = False
                strand = False
                yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

    def run_rev_3p(pam_pattern, dnaseq, target_len) -> Generator:
        """
        Search for guides with 3prime pam orientation in the reverse strand

        Args:
            pam_pattern (str): A DNA string representing PAM
            dnaseq (str): A DNA string representing genome
            target_len (int): Guide length

        Returns:
            (Generator): A generator with target_seq, exact_pam, start, stop, strand, and pam_orientation
        """
        for match_obj in regex.finditer(pattern=pam_pattern, string=dnaseq, overlapped=True):
            target_seq = reverse_complement(
                dnaseq[match_obj.end(): match_obj.end() + target_len])
            target_seq30 = reverse_complement(dnaseq[match_obj.start()-3:match_obj.start()+27])
            if check_target(target_seq, target_len):
                exact_pam = reverse_complement(match_obj.group(0))
                start = match_obj.end()
                stop = match_obj.end() + target_len
                # 5prime =True, 3prime = False
                pam_orientation = False
                # forward =True, reverse = False
                strand = False
                yield target_seq, exact_pam, start, stop, strand, pam_orientation, target_seq30

    target_list = []
    for record in seq_record_iter:
        record_id = record.id
        seq = str(record.seq)
        if self.pam_orientation == "5prime":
            # forward
            for5p = pd.DataFrame(run_for_5p(pam2re(self.pam), seq, target_len), columns=[
                                 "target", "exact_pam", "start", "stop", "strand", "pam_orientation", "target_seq30"])
            for5p["seqid"] = record_id
            # string-to-boolean conversion is not straightforward (any non-empty string is truthy),
            # so the generators above yield booleans directly.
            # https://stackoverflow.com/questions/715417/converting-from-a-string-to-boolean-in-python/715455#715455
            for5p = for5p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                 "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
            target_list.append(for5p)
            # reverse
            rev5p = pd.DataFrame(run_rev_5p(pam2re(reverse_complement(self.pam)), seq, target_len), columns=[
                                 "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
            rev5p["seqid"] = record_id
            rev5p = rev5p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                 "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
            target_list.append(rev5p)
            # Note on per-record append vs. a single concat: https://ravinpoudel.github.io/AppendVsConcat/
        elif self.pam_orientation == "3prime":
            # forward
            for3p = pd.DataFrame(run_for_3p(pam2re(self.pam), seq, target_len), columns=[
                                 "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
            for3p["seqid"] = record_id
            for3p = for3p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                 "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
            target_list.append(for3p)
            # reverse
            rev3p = pd.DataFrame(run_rev_3p(pam2re(reverse_complement(self.pam)), seq, target_len), columns=[
                                 "target", "exact_pam", "start", "stop", "strand", "pam_orientation","target_seq30"])
            rev3p["seqid"] = record_id
            rev3p = rev3p.astype({"target": 'str', "exact_pam": 'category', "start": 'uint32',
                                 "stop": 'uint32', "strand": 'bool', "pam_orientation": 'bool', "seqid": 'category'})
            target_list.append(rev3p)
        gc.collect()  # clear memory after each chromosome
    df_targets = pd.concat(target_list, ignore_index=True)
    df_targets = df_targets.assign(seedseq=np.nan, hasrestrictionsite=np.nan, isseedduplicated=np.nan)
    df_targets = df_targets.astype({"seedseq": 'str', "isseedduplicated": 'bool'})
    df_targets = df_targets.assign(dtype=self.dtype)
    df_targets = df_targets.astype({"dtype": 'category'})
    return df_targets
class TargetProcessor (targets: ~pandas.core.frame.DataFrame, lsr: int, editdist: int = 2, knum: int = 2)

A Class representing a set of guide RNA targets.

The class includes all targets in a dataframe, methods to process targets, and a dict with edit distances for sequences.

TargetProcessor __init__

Args

targets : PandasDataFrame
Dataframe with output from class PamTarget
lsr : int
Length of seed region
editdist : int
Edit distance
knum : int
Number of nearest neighbors to consider (k for the k-NN search)

Returns

None
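
A minimal end-to-end sketch (the config path is hypothetical; targets comes from PamTarget.find_targets()):

    from guidemaker.core import TargetProcessor

    tp = TargetProcessor(targets=targets, lsr=10, editdist=2, knum=10)
    tp.check_restriction_enzymes(["GGATCC"])              # e.g. BamHI site
    tp.find_unique_near_pam()
    tp.create_index(configpath="config.yaml", num_threads=2)
    tp.get_neighbors(configpath="config.yaml", num_threads=2)
    bed_df = tp.export_bed()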

Expand source code
class TargetProcessor:

    """
    A Class representing a set of guide RNA targets.

    The class includes all targets in a dataframe, methods to process targets, and a dict with edit distances for sequences.

    """

    def __init__(self, targets: PandasDataFrame, lsr: int, editdist: int = 2, knum: int = 2) -> None:
        """
        TargetProcessor __init__

        Args:
            targets (PandasDataFrame): Dataframe with output from class PamTarget
            lsr (int): Length of seed region
            editdist (int): Edit distance
            knum (int): Number of nearest neighbors to consider (k for the k-NN search)

        Returns:
            None
        """
        self.targets = targets  # pandas dataframe
        self.lsr: int = lsr  # length of seed region
        self.editdist: int = editdist
        self.knum: int = knum
        self.nmslib_index: object = None
        self.neighbors: dict = {}
        self.closest_neighbor_df: PandasDataFrame = None
        self.ncontrolsearched: int = None
        self.gc_percent: float = None
        self.genomesize: float = None
        self.pam_orientation: bool = targets['pam_orientation'].iat[0]

    def __str__(self) -> str:
        """
        String representation of the TargetProcessor object

        Args:
            self

        Return:
            (str): A summary of the target set
        """
        info = "TargetList: contains a set of {} potential PAM targets".format(len(self.targets))
        return info

    def __len__(self) -> int:
        """
        Return the number of targets in self.targets

        Args:
            self.targets

        Return:
            (int): Length of self.targets
        """
        return len(self.targets)

    def check_restriction_enzymes(self, restriction_enzyme_list: list = []) -> None:
        """
        Check for restriction enzyme sites and their reverse complements within the gRNA sequences

        Args:
            restriction_enzyme_list (list): A list with sequence for restriction enzymes

        Returns:
            None
        """
        element_to_exclude = []
        for record in set(restriction_enzyme_list):
            for letter in record.upper():
                assert letter in ['A', 'C', 'G', 'T', 'M', 'R', 'W',
                    'S', 'Y', 'K', 'V', 'H', 'D', 'B', 'X', 'N']
            record_seq = Seq.Seq(record.upper())
            element_to_exclude.append(extend_ambiguous_dna(str(record_seq)))
            element_to_exclude.append(extend_ambiguous_dna(
                str(record_seq.reverse_complement())))  # reverse complement
        element_to_exclude = sum(element_to_exclude, [])  # flatten the list of lists into a single list of restriction enzyme sites
        if len(element_to_exclude) > 0:
            self.targets['hasrestrictionsite'] = self.targets['target'].str.contains('|'.join(element_to_exclude))
        else:
            self.targets['hasrestrictionsite'] = False
    
    def _one_hot_encode(self, seq_list: List[object]) -> List[str]:
        """One hot encode Target DNA as a binary string representation for NMSLIB."""
        charmap = {'A': '1 0 0 0', 'C': '0 1 0 0', 'G': '0 0 1 0', 'T': '0 0 0 1'}
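        # e.g. seq_to_bin("ACGT") -> "1 0 0 0 0 1 0 0 0 0 1 0 0 0 0 1"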

        def seq_to_bin(seq):
            charlist = [charmap[letter] for letter in seq]
            return " ".join(charlist)
        return list(map(seq_to_bin, seq_list))

    def find_unique_near_pam(self) -> None:
        """
        Identify unique sequences in the target list

        The function filters a list of Target objects for targets that
        are unique in the region closest to the PAM. The region length is defined
        by the lsr (length of seed region that need to be unique).

        Args:
            lsr (int): Length of seed region that is close to PAM

        Returns:
            None
        """
        def _get_prox(tseq):  # get target sequence as input
            # pam_orientation: 5prime = True, 3prime = False
            if self.lsr == 0:
                return tseq
            if self.pam_orientation:
                return tseq[0:self.lsr]
            return tseq[(len(tseq) - self.lsr):]
        # https://stackoverflow.com/questions/12555323/adding-new-column-to-existing-dataframe-in-python-pandas
        self.targets = deepcopy(self.targets)
        self.targets.loc[:, 'seedseq'] = self.targets.loc[:, 'target'].apply(_get_prox)
        self.targets.loc[:, 'isseedduplicated'] = self.targets.loc[:, 'seedseq'].duplicated()

    def create_index(self, configpath: str, num_threads=2):
        """
        Create nmslib index

        Converts self.targets to binary one hot encoding and returns NMSLIB index

        Args:
            num_threads (int): cpu threads
            configpath (str): Path to the config file, which contains hyperparameters for NMSLIB

                M (int): Controls the number of bi-directional links created for each element
                during index construction. Higher values lead to better results at the expense
                of memory consumption. Typical values are 2-100, but for most datasets a
                range of 12-48 is suitable. Can't be smaller than 2.

                efC (int): Size of the dynamic list used during construction. A larger value means
                   a better quality index, but increases build time. Should be an integer value
                   between 1 and the size of the dataset.

        Returns:
            None (but writes NMSLIB index to self)
        """
        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        M, efC, post = config['NMSLIB']['M'], config['NMSLIB']['efc'], config['NMSLIB']['post']
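        # Assumed config layout for these keys (values hypothetical):
        # NMSLIB:
        #   M: 16
        #   efc: 64
        #   post: 1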

        # index everything but not duplicates
        notduplicated_targets = list(set(self.targets['target'].tolist()))
        if self.targets['dtype'].iat[0] == "hamming":
            bintargets = self._one_hot_encode(notduplicated_targets)
            index_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC, 'post': post}
            index = nmslib.init(space='bit_hamming',
                            dtype=nmslib.DistType.INT,
                            data_type=nmslib.DataType.OBJECT_AS_STRING,
                            method='hnsw')
            index.addDataPointBatch(bintargets) # notduplicated_targets
            index.createIndex(index_params, print_progress=True)
            self.nmslib_index = index
        else:
            bintargets = notduplicated_targets
            index_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC, 'post': post}
            index = nmslib.init(space='leven',
                            dtype=nmslib.DistType.INT,
                            data_type=nmslib.DataType.OBJECT_AS_STRING,
                            method='hnsw')
            index.addDataPointBatch(bintargets) # notduplicated_targets
            index.createIndex(index_params, print_progress=True)
            self.nmslib_index = index



    def get_neighbors(self, configpath, num_threads=2) -> None:
        """
        Get the (knum) nearest neighbors for each target sequence and keep only
        targets whose closest neighbor is at least the edit distance threshold away.
        Writes a dictionary to self.neighbors:
        self.neighbors[seq] = {target: seq, neighbors: {seqs: [s1, s2, ...], dist: [d1, d2, ...]}}

        Args:
            configpath (str): Path to a parameter config file
            num_threads (int): Number of threads

        Returns:
            None
        """
        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        ef = config['NMSLIB']['ef']

        # The index covers all targets so off-targets are counted genome-wide. For the neighbor
        # search itself, drop targets that are seed-duplicated or contain a restriction site.
        unique_targets = self.targets.loc[(self.targets['isseedduplicated'] == False) &
                                          (self.targets['hasrestrictionsite'] == False)]['target'].tolist()
        if self.targets['dtype'].iat[0] == "hamming":
            unique_bintargets = self._one_hot_encode(unique_targets)  # search unique seed one
        else:
            unique_bintargets = unique_targets

        self.nmslib_index.setQueryTimeParams({'efSearch': ef})
        results_list = self.nmslib_index.knnQueryBatch(unique_bintargets,
                                               k=self.knum, num_threads=num_threads)
        neighbor_dict = {}
        for i, entry in enumerate(results_list):
            queryseq = unique_targets[i]
            hitseqidx = entry[0].tolist()
            editdist = entry[1].tolist()
            if self.targets['dtype'].iat[0] == "hamming":
                # check that the closest sequence meets the min. dist. requirment. We multiply by 2 b/c each 
                # base is one hot encoded. e.g. 1000 vs 0100 = 2 differences
                if editdist[1] >= 2 * self.editdist:
                    neighbors = {"seqs": [self.targets['target'].values[x] for x in hitseqidx],  # reverse this?
                                "dist": [int(x / 2) for x in editdist]} 
                    neighbor_dict[queryseq] = {"target": unique_targets[i],
                                            "neighbors": neighbors}
            else:
               if editdist[1] >= self.editdist: 
                    neighbors = {"seqs": [self.targets['target'].values[x] for x in hitseqidx],  # reverse this?
                                "dist": [int(x) for x in editdist]}
                    neighbor_dict[queryseq] = {"target": unique_targets[i],
                                            "neighbors": neighbors}
        self.neighbors = neighbor_dict

    def export_bed(self) -> object:
        """
        Export the targets that passed seed-uniqueness filtering to a BED-format dataframe

        Args:
            None

        Returns:
            (obj): A Pandas Dataframe in BED format
        """
        # deepcopy so the exported frame does not share memory with self.targets:
        # https://stackoverflow.com/questions/55745948/why-doesnt-deepcopy-of-a-pandas-dataframe-affect-memory-usage
        # select only guides whose seed sequence is not duplicated
        df = deepcopy(self.targets.loc[self.targets['isseedduplicated'] == False])
        df = df[["seqid", "start", "stop", "target", "strand"]]
        df.loc[:, 'strand'] = df.loc[:, 'strand'].apply(lambda x: '+' if x else '-')
        df.columns = ["chrom", "chromstart", "chromend", "name", "strand"]
        df.sort_values(by=['chrom', 'chromstart'], inplace=True)
        return df

    def get_control_seqs(self, seq_record_iter: object, configpath, length: int = 20, n: int = 10,
                         num_threads: int = 2) -> tuple:
        """
        Create random sequences with a specified GC probability and find the sequences with the
        greatest distance to any sequence flanking a PAM site

        Args:
            seq_record_iter (Bio.SeqIO): An iterator of fastas
            configpath (str): Path to a parameter config file
            length (int): Length of the sequence, must match the index
            n (int): Number of sequences to return
            num_threads (int): Number of processor threads

        Returns:
            (tuple): Minimum distance, median distance, and a pandas dataframe of control sequences
        """

        with open(configpath) as cf:
            config = yaml.safe_load(cf)

        MINIMUM_HMDIST = config['CONTROL']['MINIMUM_HMDIST']

        MAX_CONTROL_SEARCH_MULTIPLE = max(config['CONTROL']['CONTROL_SEARCH_MULTIPLE'])

        # CONTROL_SEARCH_MULTIPLE: increasing multiples of n to try when sampling control sequences
        CONTROL_SEARCH_MULTIPLE = config['CONTROL']['CONTROL_SEARCH_MULTIPLE']

        # get GC percent
        totlen = 0
        gccnt = 0
        for record in seq_record_iter:
            gccnt += GC(record.seq) * len(record)
            totlen += len(record)
        gc = gccnt / (totlen * 100)
        self.gc_percent = gc * 100
        self.genomesize = totlen / (1024 * 1024)

        minimum_hmdist = 0
        sm_count = 0
        search_mult = 0

        try:
            # keep sampling larger batches until the minimum distance is met or the
            # largest search multiple has been tried
            while minimum_hmdist < MINIMUM_HMDIST and search_mult < MAX_CONTROL_SEARCH_MULTIPLE:
                # generate random sequences
                seqs = []
                search_mult = CONTROL_SEARCH_MULTIPLE[sm_count]
                for i in range(n * search_mult):
                    seqs.append("".join(np.random.choice(a=["G", "C", "A", "T"], size=length,
                                                         replace=True, p=[gc / 2, gc / 2, (1 - gc) / 2, (1 - gc) / 2])))
                # one hot encode sequences
                binseq = []
                charmap = {'A': '1 0 0 0', 'C': '0 1 0 0', 'G': '0 0 1 0', 'T': '0 0 0 1'}
                for seq in seqs:
                    if self.targets['dtype'].iat[0] == "hamming":
                        charlist = [charmap[letter] for letter in seq]
                        binseq.append(" ".join(charlist))
                    else: # leven
                        binseq.append(seq)

                rand_seqs = self.nmslib_index.knnQueryBatch(binseq, k=2, num_threads=num_threads)
                distlist = []
                for i in rand_seqs:
                    distlist.append(i[1][0])
                zipped = list(zip(seqs, distlist))
                dist_seqs = sorted(zipped, reverse=True, key=lambda x: x[1])
                sort_seq = [item[0] for item in dist_seqs][0:n]

                # bit_hamming distances count two bit flips per mismatched base,
                # so halve them; Levenshtein distances are used as-is
                if self.targets['dtype'].iat[0] == "hamming":
                    sort_dist = [item[1] / 2 for item in dist_seqs][0:n]
                else:
                    sort_dist = [item[1] for item in dist_seqs][0:n]

                minimum_hmdist = int(min(sort_dist))
                sm_count += 1
        except IndexError as e:
            raise e

        total_ncontrolsearched = search_mult * n
        self.ncontrolsearched = total_ncontrolsearched
        randomdf = pd.DataFrame(data={"Sequences": sort_seq, "Hamming distance": sort_dist})

        def create_name(seq):
            return "Cont-" + hashlib.md5(seq.encode()).hexdigest()
        randomdf['name'] = randomdf["Sequences"].apply(create_name)
        randomdf = randomdf[["name", "Sequences", "Hamming distance"]]
        return (min(sort_dist),
                statistics.median(sort_dist),
                randomdf)

Methods

def check_restriction_enzymes(self, restriction_enzyme_list: list = []) ‑> None

Check for restriction enzyme sites and their reverse complements within the gRNA sequence

Args

restriction_enzyme_list : list
A list of restriction enzyme sequences

Returns

None

Expand source code
def check_restriction_enzymes(self, restriction_enzyme_list: list = []) -> None:
    """
    Check for restriction enzyme sites and their reverse complements within the gRNA sequence

    Args:
        restriction_enzyme_list (list): A list of restriction enzyme sequences

    Returns:
        None
    """
    element_to_exclude = []
    for record in set(restriction_enzyme_list):
        for letter in record.upper():
            assert letter in ['A', 'C', 'G', 'T', 'M', 'R', 'W',
                'S', 'Y', 'K', 'V', 'H', 'D', 'B', 'X', 'N']
        record_seq = Seq.Seq(record.upper())
        element_to_exclude.append(extend_ambiguous_dna(str(record_seq)))
        element_to_exclude.append(extend_ambiguous_dna(
            str(record_seq.reverse_complement())))  # reverse complement
    element_to_exclude = sum(element_to_exclude, [])  # flatten list of lists into a single list of restriction enzyme sites
    if len(element_to_exclude) > 0:
        self.targets['hasrestrictionsite'] = self.targets['target'].str.contains('|'.join(element_to_exclude))
    else:
        self.targets['hasrestrictionsite'] = False
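To make the filter concrete, here is a minimal sketch (not module source) of the exclusion logic once the ambiguous sites are expanded; the BsaI site GGTCTC, its reverse complement GAGACC, and the toy targets are illustrative values only:

# Illustrative only: GGTCTC (BsaI) and GAGACC stand in for the expanded
# output of extend_ambiguous_dna for one enzyme and its reverse complement.
import pandas as pd

element_to_exclude = ["GGTCTC", "GAGACC"]
targets = pd.DataFrame({"target": ["ATGGTCTCAAGTCGGATCGAAATT",
                                   "ATGCATGCATGCATGCATGCATGC"]})
# flag any guide that contains one of the excluded sites
targets['hasrestrictionsite'] = targets['target'].str.contains('|'.join(element_to_exclude))
print(targets['hasrestrictionsite'].tolist())  # [True, False]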
def create_index(self, configpath: str, num_threads=2)

Create nmslib index

Converts self.targets to binary one hot encoding and returns NMSLIB index

Args

num_threads : int
CPU threads
configpath : str
Path to config file which contains hyperparameters for NMSLIB

M (int): Controls the number of bi-directional links created for each element during index construction. Higher values lead to better results at the expense of memory consumption. Typical values are 2-100, but for most datasets a range of 12-48 is suitable. Cannot be smaller than 2.

efC (int): Size of the dynamic list used during construction. A larger value means a better quality index, but increases build time. Should be an integer value between 1 and the size of the dataset.

Returns

None (but writes NMSLIB index to self)

Expand source code
def create_index(self, configpath: str, num_threads=2):
    """
    Create nmslib index

    Converts self.targets to binary one hot encoding and returns NMSLIB index

    Args:
        num_threads (int): cpu threads
        configpath (str): Path to config file which contains hyperparameters for NMSLIB

            M (int): Controls the number of bi-directional links created for each element
            during index construction. Higher values lead to better results at the expense
            of memory consumption. Typical values are 2-100, but for most datasets a
            range of 12-48 is suitable. Cannot be smaller than 2.

            efC (int): Size of the dynamic list used during construction. A larger value means
               a better quality index, but increases build time. Should be an integer value
               between 1 and the size of the dataset.

    Returns:
        None (but writes NMSLIB index to self)
    """
    with open(configpath) as cf:
        config = yaml.safe_load(cf)

    M, efC, post = config['NMSLIB']['M'], config['NMSLIB']['efc'], config['NMSLIB']['post']

    # index all unique target sequences (duplicates removed)
    notduplicated_targets = list(set(self.targets['target'].tolist()))
    if self.targets['dtype'].iat[0] == "hamming":
        bintargets = self._one_hot_encode(notduplicated_targets)
        space = 'bit_hamming'
    else:
        bintargets = notduplicated_targets
        space = 'leven'
    index_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC, 'post': post}
    index = nmslib.init(space=space,
                        dtype=nmslib.DistType.INT,
                        data_type=nmslib.DataType.OBJECT_AS_STRING,
                        method='hnsw')
    index.addDataPointBatch(bintargets)
    index.createIndex(index_params, print_progress=True)
    self.nmslib_index = index
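The _one_hot_encode helper is defined elsewhere in the module and not shown here; the following sketch illustrates the representation the bit_hamming space presumably receives, matching the charmap used in get_control_seqs below:

# Sketch of the one-hot string representation assumed for the bit_hamming
# space; the module's real encoding lives in _one_hot_encode.
charmap = {'A': '1 0 0 0', 'C': '0 1 0 0', 'G': '0 0 1 0', 'T': '0 0 0 1'}

def one_hot(seq: str) -> str:
    # four space-separated bits per base, so one mismatched base = two bit flips
    return " ".join(charmap[letter] for letter in seq)

print(one_hot("ACGT"))  # 1 0 0 0 0 1 0 0 0 0 1 0 0 0 0 1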
def export_bed(self) ‑> object

Export the targets that are not duplicated in the seed region as a Pandas DataFrame in BED format

Args

None

Returns

(obj): A Pandas Dataframe in Bed format

Expand source code
def export_bed(self) -> object:
    """
    Export the targets in self.neighbors to a bed format file

    Args:
        file (str): the name and location of file to export

    Returns:
        (obj): A Pandas Dataframe in Bed format
    """
    # df = self.targets.copy()
    # why deepcopy - https://stackoverflow.com/questions/55745948/why-doesnt-deepcopy-of-a-pandas-dataframe-affect-memory-usage
    # select only guides that are not duplecated in the seedseq
    df = deepcopy(self.targets.loc[self.targets['isseedduplicated'] == False])
    df = df[["seqid", "start", "stop", "target", "strand"]]
    df.loc[:, 'strand'] = df.loc[:, 'strand'].apply(lambda x: '+' if x == True else '-')
    df.columns = ["chrom", "chromstart", "chromend", "name", "strand"]
    df.sort_values(by=['chrom', 'chromstart'], inplace=True)
    return df
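A short usage sketch, assuming tp is a hypothetical, already-populated instance of this class; BedTool.from_dataframe and saveas come from the pybedtools package the module imports:

# Hypothetical usage: convert the returned DataFrame to a BedTool for interval work.
# Note that strict BED6 consumers expect a score column between name and strand;
# this frame omits it, so add one if a downstream tool requires six columns.
bed_df = tp.export_bed()                  # tp: a hypothetical instance with targets found
bedtool = BedTool.from_dataframe(bed_df)  # convert to a BedTool for interval operations
bedtool.saveas("targets.bed")             # write the intervals to disk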
def find_unique_near_pam(self) ‑> None

Identify unique sequences in the target list

The function filters a list of Target objects for targets that are unique in the region closest to the PAM. The region length is defined by self.lsr (the length of the seed region that needs to be unique).

Args

None (uses self.lsr)

Returns

None

Expand source code
def find_unique_near_pam(self) -> None:
    """
    Identify unique sequences in the target list

    The function filters a list of Target objects for targets that
    are unique in the region closest to the PAM. The region length is defined
    by self.lsr (the length of the seed region that needs to be unique).

    Args:
        None (uses self.lsr)

    Returns:
        None
    """
    def _get_prox(tseq):  # takes a target sequence, returns its seed region
        # self.pam_orientation is stored as a boolean: True = 5prime, False = 3prime
        if self.lsr == 0:
            return tseq
        if self.pam_orientation:  # 5prime: seed is the first lsr bases
            return tseq[0:self.lsr]
        return tseq[(len(tseq) - self.lsr):]  # 3prime: seed is the last lsr bases
    # https://stackoverflow.com/questions/12555323/adding-new-column-to-existing-dataframe-in-python-pandas
    self.targets = deepcopy(self.targets)
    self.targets.loc[:, 'seedseq'] = self.targets.loc[:, 'target'].apply(_get_prox)
    self.targets.loc[:, 'isseedduplicated'] = self.targets.loc[:, 'seedseq'].duplicated()
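A toy illustration of the seed-region slicing (made-up values, not module data): with a 5prime PAM the seed is the first lsr bases of the target, and with a 3prime PAM it is the last lsr bases:

# Toy values, not module data
lsr = 10
target = "ATGCATGCATGCATGCATGC"            # a 20 nt target
seed_5prime = target[0:lsr]                # seed adjacent to a 5'-[PAM][target]-3' PAM
seed_3prime = target[len(target) - lsr:]   # seed adjacent to a 5'-[target][PAM]-3' PAM
print(seed_5prime, seed_3prime)            # ATGCATGCAT GCATGCATGC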
def get_control_seqs(self, seq_record_iter: object, configpath, length: int = 20, n: int = 10, num_threads: int = 2) ‑> ~pandas.core.frame.DataFrame

Create random sequences with a specified GC probability and find seqs with the greatest distance to any sequence flanking a PAM site

Args

seq_record_iter : Bio.SeqIO
An iterator of fastas
configpath : str
Path to a parameter config file
length : int
Length of the sequence, must match the index
n : int
Number of sequences to return
num_threads : int
Number of processor threads

Returns

(PandasDataFrame): A pandas dataframe with control sequences

Expand source code
def get_control_seqs(self, seq_record_iter: object, configpath, length: int = 20, n: int = 10,
                     num_threads: int = 2) -> PandasDataFrame:
    """
    Create random sequences with a specified GC probability and find seqs with the greatest
    distance to any sequence flanking a PAM site

    Args:
        seq_record_iter (Bio.SeqIO): An iterator of fastas
        length (int): Length of the sequence, must match the index
        n (int): Number of sequences to  return
        num_threads (int): Number of processor threads

    Returns:
        (PandasDataFrame): A pandas dataframe with control sequence
    """

    with open(configpath) as cf:
        config = yaml.safe_load(cf)

    MINIMUM_HMDIST = config['CONTROL']['MINIMUM_HMDIST']

    MAX_CONTROL_SEARCH_MULTIPLE = max(config['CONTROL']['CONTROL_SEARCH_MULTIPLE'])

    # search_mult (int): draw search_mult * n candidate sequences per round
    CONTROL_SEARCH_MULTIPLE = config['CONTROL']['CONTROL_SEARCH_MULTIPLE']

    # get GC percent
    totlen = 0
    gccnt = 0
    for record in seq_record_iter:
        gccnt += GC(record.seq) * len(record)
        totlen += len(record)
    gc = gccnt / (totlen * 100)
    self.gc_percent = gc * 100
    self.genomesize = totlen / (1024 * 1024)

    minimum_hmdist = 0
    sm_count = 0
    search_mult = 0

    try:
        # keep drawing larger candidate batches until the minimum distance
        # is met or every search multiple has been tried
        while minimum_hmdist < MINIMUM_HMDIST and search_mult < MAX_CONTROL_SEARCH_MULTIPLE:
            # generate random sequences
            seqs = []
            search_mult = CONTROL_SEARCH_MULTIPLE[sm_count]
            for i in range(n * search_mult):
                seqs.append("".join(np.random.choice(a=["G", "C", "A", "T"], size=length,
                                                     replace=True, p=[gc / 2, gc / 2, (1 - gc) / 2, (1 - gc) / 2])))
            # one hot encode sequences
            binseq = []
            charmap = {'A': '1 0 0 0', 'C': '0 1 0 0', 'G': '0 0 1 0', 'T': '0 0 0 1'}
            for seq in seqs:
                if self.targets['dtype'].iat[0] == "hamming":
                    charlist = [charmap[letter] for letter in seq]
                    binseq.append(" ".join(charlist))
                else: # leven
                    binseq.append(seq)

            rand_seqs = self.nmslib_index.knnQueryBatch(binseq, k=2, num_threads=num_threads)
            distlist = []
            for i in rand_seqs:
                distlist.append(i[1][0])
            zipped = list(zip(seqs, distlist))
            dist_seqs = sorted(zipped, reverse=True, key=lambda x: x[1])
            sort_seq = [item[0] for item in dist_seqs][0:n]

            # bit_hamming distances count two bit flips per mismatched base,
            # so halve them; Levenshtein distances are used as-is
            if self.targets['dtype'].iat[0] == "hamming":
                sort_dist = [item[1] / 2 for item in dist_seqs][0:n]
            else:
                sort_dist = [item[1] for item in dist_seqs][0:n]

            minimum_hmdist = int(min(sort_dist))
            sm_count += 1
    except IndexError as e:
        raise e

    total_ncontrolsearched = search_mult * n
    self.ncontrolsearched = total_ncontrolsearched
    randomdf = pd.DataFrame(data={"Sequences": sort_seq, "Hamming distance": sort_dist})

    def create_name(seq):
        return "Cont-" + hashlib.md5(seq.encode()).hexdigest()
    randomdf['name'] = randomdf["Sequences"].apply(create_name)
    randomdf = randomdf[["name", "Sequences", "Hamming distance"]]
    return (min(sort_dist),
            statistics.median(sort_dist),
            randomdf)
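A self-contained sketch of the GC-weighted draw used above; the gc value below is an assumed genome GC fraction, and the probability vector follows the base order ["G", "C", "A", "T"]:

import numpy as np

gc = 0.55     # assumed genome GC fraction (0-1), not a module default
length = 20
seq = "".join(np.random.choice(a=["G", "C", "A", "T"], size=length, replace=True,
                               p=[gc / 2, gc / 2, (1 - gc) / 2, (1 - gc) / 2]))
print(seq)    # e.g. GCCGATGGCACGTCAGGCTA; G+C bases appear ~55% of the time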
def get_neighbors(self, configpath, num_threads=2) ‑> None

Get the nearest neighbors for each target sequence, removing targets that have neighbors closer than the Hamming distance threshold. For the list of all targets, calculate the knum nearest neighbors, filter out targets with close neighbors, and write a dictionary to self.neighbors: self.neighbors[seq] = {target: seq_obj, neighbors: {seqs: [s1, s2, …], dist: [d1, d2, …]}}

Args

configpath : str
Path to a parameter config file
num_threads : int
Number of threads

Returns

None

Expand source code
def get_neighbors(self, configpath, num_threads=2) -> None:
    """
    Get the nearest neighbors for each target sequence, removing targets
    that have neighbors closer than the Hamming distance threshold.
    For the list of all targets, calculate the knum nearest neighbors,
    filter out targets with close neighbors, and write a dictionary
    to self.neighbors:
    self.neighbors[seq] = {target: seq_obj, neighbors: {seqs: [s1, s2, ...], dist: [d1, d2, ...]}}

    Args:
        configpath (str): Path to a parameter config file
        num_threads (int): Number of threads

    Returns:
        None
    """
    with open(configpath) as cf:
        config = yaml.safe_load(cf)

    ef = config['NMSLIB']['ef']

    # The index itself contains all targets so off-targets are always checked.
    # For the query set, drop targets that are seed-duplicated or contain a restriction site.
    unique_targets = self.targets.loc[(self.targets['isseedduplicated'] == False)
                                      & (self.targets['hasrestrictionsite'] == False)]['target'].tolist()
    if self.targets['dtype'].iat[0] == "hamming":
        unique_bintargets = self._one_hot_encode(unique_targets)  # encode the query targets
    else:
        unique_bintargets = unique_targets

    self.nmslib_index.setQueryTimeParams({'efSearch': ef})
    results_list = self.nmslib_index.knnQueryBatch(unique_bintargets,
                                           k=self.knum, num_threads=num_threads)
    neighbor_dict = {}
    for i, entry in enumerate(results_list):
        queryseq = unique_targets[i]
        hitseqidx = entry[0].tolist()
        editdist = entry[1].tolist()
        if self.targets['dtype'].iat[0] == "hamming":
            # check that the closest non-identical hit meets the minimum distance
            # requirement. We multiply by 2 because each base is one-hot encoded,
            # so one base difference flips two bits (e.g. 1000 vs 0100 = 2)
            if editdist[1] >= 2 * self.editdist:
                neighbors = {"seqs": [self.targets['target'].values[x] for x in hitseqidx],
                             "dist": [int(x / 2) for x in editdist]}
                neighbor_dict[queryseq] = {"target": unique_targets[i],
                                           "neighbors": neighbors}
        else:
            if editdist[1] >= self.editdist:
                neighbors = {"seqs": [self.targets['target'].values[x] for x in hitseqidx],
                             "dist": [int(x) for x in editdist]}
                neighbor_dict[queryseq] = {"target": unique_targets[i],
                                           "neighbors": neighbors}
    self.neighbors = neighbor_dict
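A worked check (not module source) of why the threshold is doubled for the hamming dtype: one base substitution flips two bits in the one-hot encoding, so a raw bit_hamming distance of 2*d corresponds to d mismatched bases:

# Demonstrates the two-bits-per-mismatch relationship behind 2 * self.editdist
a_bits = "1 0 0 0".split()   # one-hot 'A'
c_bits = "0 1 0 0".split()   # one-hot 'C'
bit_diff = sum(x != y for x, y in zip(a_bits, c_bits))
print(bit_diff)              # 2 -> a single base difference costs 2 in bit_hamming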