import numpy import sys import random import subprocess import re import decimal import math import os import shutil import time import types import argparse from argparse import RawTextHelpFormatter ############################## # ARGPARSE TYPES AND FUNCTIONS ############################## def parseGC(string): """ Argparse Type Function parses a string of "FLOAT:INT-INT", where ":INT-INT" is set to be optional. Returns a single value or a triple (FLAOT, INT, INT). """ m = re.match('^\s*(\d+\.\d+):?(\d+)?\s*?-?\s*?(\d+)?\s*?$', string) if m is None: print "'" + string + "' is not a valid input for tGC Expected forms like '0-5'." exit(1) tGC = float(m.group(1)) if m.group(2) and m.group(3): start = int(m.group(2)) end = int(m.group(3)) return (tGC, start, end) else: return (tGC) def convert_arg_line_to_args(arg_line): for arg in arg_line.split(): if not arg.strip(): continue yield arg ##################### # SUPPORT FUNCTIONS ##################### def isfloat(value): try: float(value) return True except ValueError: return False def print2file(f, i, m): """ print content i to file f in mode m """ line = str(i) if m == "a": call = "echo \"" + line + "\" >> " + f elif m == "w": call = "echo \"" + line + "\" > " + f os.system(call) def getUsedTime(start_time): """ Return the used time between -start time- and now. """ end_time = time.time() return end_time - start_time def substr(x, string, subst): """ Classical substring function """ s1 = string[:x-1] s2 = string[x-1:x] s3 = string[x:] #s2 = s[x+len(string)-x-1:] return s1 + subst + s3 #################################################### # STRUCTURE AND SEQUENCE INTEGRITY CHECK FUNCTIONS #################################################### def checkSequenceConstraint(SC): """ Checks the Sequence constraint for illegal nucleotide characters """ for c in SC: if c not in "ACGURYSWKMBDHVNacgu": print "\tIllegal Character in the constraint sequence!" print "\tPlease use the IUPAC nomenclature for defining nucleotides in the constraint sequence!" print "\tA Adenine" print "\tC Cytosine" print "\tG Guanine" print "\tT/U Thymine/Uracil" print "\tR A or G" print "\tY C or T/U" print "\tS G or C" print "\tW A or T/U" print "\tK G or T/U" print "\tM A or C" print "\tB C or G or T/U" print "\tD A or G or T/U" print "\tH A or C or T/U" print "\tV A or C or G" print "\tN any base" print "\tOr their lowercase soft constraint variants of ACGU!" exit(1) def checkSimilarLength(s, SC): """ Compares sequence and structure constraint length """ if len(s) != len(SC): print "Sequence and Structure-Constraint provide two different lengths..." print "Structure Constraint length : " + str(len(s)) print "Sequence Constraint length : " + str(len(SC)) exit(1) def isStructure(s): """ Checks if the structure constraint only contains "(", ")", and "." and legal fuzzy structure constraint characters. """ returnvalue = 1 for a in range(0,len(s)): if s[a] not in ".()[]{}<>": if s[a] not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz": returnvalue = 0 return returnvalue def isBalanced(s): """ Check if the structure s is of a balanced nature """ balance = 1 for bracket in ["()", "[]", "{}", "<>"]: counter = 0 for a in xrange(len(s)): if s[a] in bracket[0]: counter += 1 elif s[a] in bracket[1]: counter -= 1 if counter != 0: balance = 0 return balance def fulfillsHairpinRule(s): """ CHECKING FOR THE 3 nt LOOP INTERSPACE for all kind of basepairs, even wihtin the pdeudoknots """ fulfillsRules = 1 for bracket in ["()", "[]", "{}", "<>"]: last_opening_char = 0 check = 0 for a in xrange(len(s)): if s[a] == bracket[0]: last_opening_char = a check = 1 elif s[a] == bracket[1] and check == 1: check = 0 if a - last_opening_char < 4: return 0 return 1 def isValidStructure(s): """ Checks, if the structure s is a valid structure """ Structure = isStructure(s) Balanced = isBalanced(s) HairpinRule = fulfillsHairpinRule(s) if not Structure == 1 or not Balanced == 1 or not HairpinRule == 1: print "Structure Integrity Issue!" exit(1) def isStructureCompatible(lp1, lp2 ,bp): """ Checks, if the region within lp1 and lp2 is structurally balanced """ x = lp1 + 1 while (x < lp2): if (bp[x] <= lp1 or bp[x] > lp2): return False if x == bp[x]: x += 1 else: x = bp[x] + 1 return x == lp2 def checkConstaintCompatibility(basepairstack, sequenceconstraint, IUPAC_compatibles): """ Checks if the constraints are compatible to each other """ for id1 in basepairstack: # key = (constraint , (pos, constraint))) constr1 = basepairstack[id1][0] id2 = basepairstack[id1][1][0] constr2 = basepairstack[id1][1][1] if id1 != id2 and not isCompatible(constr1, constr2, IUPAC_compatibles): print "Contraint Compatibility Issue:" print "Nucleotide constraint " + str(constr1) + " at position " + str(id1) + " is not compatible with nucleotide constraint " + str(constr2) + " at position " + str(id2) + "\n" exit(1) def transform(seq): """ Transforms "U" to "T" for the processing is done on DNA alphabet """ S = "" for s in seq: if s == "T": S += "U" elif s == "t": S += "u" else: S += s return S def complementBase(c): """ Returns the complement RNA character of c (without GU base pairs) """ retChar = "" if c == "A" : retChar = "U" elif c == "U": retChar = "A" elif c == "C": retChar = "G" elif c == "G": retChar = "C" return retChar ############################################ # IUPAC LOADINGS AND NUCLEOTIDE MANAGEMENT ############################################ def loadIUPACcompatibilities(IUPAC, useGU): """ Generating a hash containing all compatibilities of all IUPAC RNA NUCLEOTIDES """ compatible = {} for nuc1 in IUPAC: # ITERATING OVER THE DIFFERENT GROUPS OF IUPAC CODE sn1 = list(IUPAC[nuc1]) for nuc2 in IUPAC: # ITERATING OVER THE DIFFERENT GROUPS OF IUPAC CODE sn2 = list(IUPAC[nuc2]) compatib = 0 for c1 in sn1: # ITERATING OVER THE SINGLE NUCLEOTIDES WITHIN THE RESPECTIVE IUPAC CODE: for c2 in sn2: # ITERATING OVER THE SINGLE NUCLEOTIDES WITHIN THE RESPECTIVE IUPAC CODE: # CHECKING THEIR COMPATIBILITY if useGU == True: if (c1 == "A" and c2 == "U") or (c1 == "U" and c2 == "A") or (c1 == "C" and c2 == "G") or (c1 == "G" and c2 == "C") or (c1 == "G" and c2 == "U") or (c1 == "U" and c2 == "G"): compatib = 1 else: if (c1 == "A" and c2 == "U") or (c1 == "U" and c2 == "A") or (c1 == "C" and c2 == "G") or (c1 == "G" and c2 == "C"): compatib = 1 compatible[nuc1 + "_" + nuc2] = compatib # SAVING THE RESPECTIVE GROUP COMPATIBILITY, REVERSE SAVING IS NOT REQUIRED, SINCE ITERATING OVER ALL AGAINST ALL return compatible def isCompatibleToSet(c1, c2, IUPAC_compatibles): """ Checks compatibility of c1 wihtin c2 """ compatible = True for setmember in c2: if isCompatible(c1, setmember, IUPAC_compatibles) == False: return False return compatible def isCompatible(c1, c2, IUPAC_compatibles): """ Checks compatibility between character c1 and c2 """ if IUPAC_compatibles[c1.upper() + "_" + c2.upper()] == 1: return True else: return False ####################################### # STRUCTURE AND DICTIONARY MANAGEMENT ####################################### def getLP(BPSTACK): """ Retreives valid lonley base pairs from a base pair stack """ #20 ('N', (>BLOCK<, 'N')) # getting single base pairs stack = {} LP = {} if type(BPSTACK[random.choice(BPSTACK.keys())]) == types.TupleType: for i in BPSTACK.keys(): #if str(BPSTACK[i][1][0]) not in "ABCDEFGHIJKLMNOPQRSTUVWXYZ": stack[i] = int(BPSTACK[i][1][0]) #print i , BPSTACK[i][1][0] else: for i in BPSTACK.keys(): #if str(BPSTACK[i]) not in "ABCDEFGHIJKLMNOPQRSTUVWXYZ": stack[i] = BPSTACK[i] # removing redundant base pair indices for i in stack.keys(): if i >= stack[i]: del stack[i] # actual checking for single lonley base pairs for i in stack.keys(): if not (i-1 in stack and stack[i-1] == stack[i] + 1) and not (i+1 in stack and stack[i+1] == stack[i] - 1): LP[i] = stack[i] ##actual removal of 2er lonley base pairs for i in stack.keys(): if not (i-1 in stack and stack[i-1] == stack[i] + 1) and (i+1 in stack and stack[i+1] == stack[i] - 1) and not (i+2 in stack and stack[i+2] == stack[i] - 2): LP[i] = stack[i] LP[i+1] = stack[i+1] return LP def getBPStack(s, seq): """ Returns a dictionary of the corresponding basepairs of the structure s and the sequence constraint seq. """ tmp_stack = {"()":[], "{}":[], "[]":[], "<>":[]} bpstack = {} for i in xrange(len(s)): # REGULAR SECONDARY STRUCTURE DETECTION if s[i] in "(){}[]<>": no = 0 ### opening if s[i] in "([{<": if s[i] == "(": tmp_stack["()"].append((i, seq[i])) elif s[i] == "[": tmp_stack["[]"].append((i, seq[i])) elif s[i] == "{": tmp_stack["{}"].append((i, seq[i])) elif s[i] == "<": tmp_stack["<>"].append((i, seq[i])) #closing elif s[i] in ")]}>": if s[i] == ")": no, constr = tmp_stack["()"].pop() elif s[i] == "]": no, constr = tmp_stack["[]"].pop() elif s[i] == "}": no, constr = tmp_stack["{}"].pop() elif s[i] == ">": no, constr = tmp_stack["<>"].pop() bpstack[no] = (constr, (i, seq[i])) bpstack[i] = (seq[i] ,(no, constr)) elif s[i] == ".": bpstack[i] = (seq[i], (i, seq[i])) elif s[i] in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz": bpstack[i] = (seq[i], (i, seq[i])) return (bpstack, getLP(bpstack)) def getbpStack(s): """ Returns a dictionary of the corresponding basepairs of the structure s and the sequence constraint seq. """ tmp_stack = {"()":[], "{}":[], "[]":[], "<>":[]} bpstack = {} for i in xrange(len(s)): if s[i] in "(){}[]<>": no = 0 ### opening if s[i] in "([{<": if s[i] == "(": tmp_stack["()"].append(i) elif s[i] == "[": tmp_stack["[]"].append(i) elif s[i] == "{": tmp_stack["{}"].append(i) elif s[i] == "<": tmp_stack["<>"].append(i) #closing elif s[i] in ")]}>": if s[i] == ")": no = tmp_stack["()"].pop() elif s[i] == "]": no = tmp_stack["[]"].pop() elif s[i] == "}": no = tmp_stack["{}"].pop() elif s[i] == ">": no = tmp_stack["<>"].pop() bpstack[no] = i # save basepair in the format {opening base id (opening seq constr,(closing base id, closing seq constr))} bpstack[i] = no # save basepair in the format {closing base id (closing seq constr,(opening base id, opening seq constr))} elif s[i] == ".": # no structural constaint given: produce entry, which references itself as a base pair partner.... bpstack[i] = i elif s[i] in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz": bpstack[i] = i return (bpstack, getLP(bpstack)) ############################################ # TERRAIN GRAPH MANAGEMENT ############################################ def maprange( a, b, s): """ Mapping function """ (a1, a2), (b1, b2) = a, b return b1 + ((s - a1) * (b2 - b1) / (a2 - a1)) def getConstraint(TE, BPstack, IUPAC, IUPAC_compatibles, IUPAC_reverseComplements): """ Dependend on the situation in the constraint an the respective path section, setting wether a specific constraint can be given or not (for that path section) """ # TE :: transition element / path section under dispute # id1 :: id of the position of the caharacter to which the transition is leading to # id2 :: id of the position of the character, which is listed in the BPinformation, it can be id1 as well, when no bp is present # val :: BPstack information of the specific position # constr1 :: constraining character of pos id1 # constr2 :: constraining character of pos id2 id1 = int(TE.split(".")[0]) val = BPstack[id1] # check out the value of the destination character in the basepair/constraint stack constr1 = val[0] # getting the constraint character of position id1 id2 = int(val[1][0]) # getting position id2 constr2 = val[1][1] # getting the sequence constraint for position id2 targetNucleotide = TE.split(".")[1][-1:] # where the edge is leading to lowerCase = constr1.islower() if lowerCase: return 1 else: c1 = set(IUPAC[constr1.upper()]) # getting all explicit symbols of c1 c2 = set(IUPAC_reverseComplements[constr2.upper()]) # getting the reverse complement explicit symbols of c2 if targetNucleotide in c1: if id1 == id2: return 1 else: if targetNucleotide in c2: return 1 else: return 0 else: return 0 def initTerrain(s): """ Initialization of the terrain graph in equidistant mode. Vertices are modeled implicitly. """ nt = ["A","C","G","U"] nt2 = ["AA","AC","AG","AU","CA","CC","CG","CU","GA","GC","GG","GU","UA","UC","UG","UU"] # Allowed dinucleotides e = {} pathlength = 1 pheromone = 1 for p in xrange(len(s)): if p == 0: for i in nt: e["%s.%s"%(p,i)] = (pheromone, pathlength) elif p > 0: for n in nt2: e["%s.%s"%(p,n)] = (pheromone, pathlength) return e def applyTerrainModification(terrain, s, GC, SC, BPstack, IUPAC, IUPAC_compatibles, IUPAC_reverseComplements): """ Dependent on the input, this function modifies the terrain accordingly. It reflects the pheromone and path length adjustment. Directly affected edges and their implicit neighbor edges are removed from the graph. """ actGC = {} for i in GC: v, s1, s2 = i for j in range(s1-1,s2): actGC[j] = v dels = [] for terrainelement in sorted(terrain): pheromone, pathlength = terrain[terrainelement] pheromone = getConstraint(terrainelement, BPstack, IUPAC, IUPAC_compatibles, IUPAC_reverseComplements) pathlength = getConstraint(terrainelement, BPstack, IUPAC, IUPAC_compatibles, IUPAC_reverseComplements) pathlength = applyGCcontributionPathAdjustment(pathlength, actGC, terrainelement) if pheromone * pathlength == 0: dels.append(terrainelement) terrain[terrainelement] = (pheromone, pathlength,[]) further_dels = {} for terrainelement in sorted(dels): pos, nucs = terrainelement.split(".") if int(pos) < len(s)-1: to_nt = nucs[-1:] successor_pos = int(pos) + 1 for i in ["A", "C", "G", "U"]: del_element = str(successor_pos) + "." + to_nt + i further_dels[del_element] = 1 further_dels[terrainelement] = 1 # deleting the inbound and outbound edges, which are forbidden for terrainelement in further_dels: del terrain[terrainelement] # allocate the appropriate children of edges for terrainelement in terrain: pheromone, pathlength, children = terrain[terrainelement] pos, nucs = terrainelement.split(".") if int(pos) < len(s): to_nt = nucs[-1:] successor_pos = int(pos) + 1 for i in ["A", "C", "G", "U"]: if str(successor_pos) + "." + to_nt + i in terrain: children.append(str(successor_pos) + "." + to_nt + i) terrain[terrainelement] = (pheromone, pathlength,children) # ADDING THE START EDGES starts = [] for i in ["A", "C", "G", "U"]: if str(0) + "." + i in terrain: starts.append(str(0) + "." + i) terrain["00.XY"] = (1, 1, starts) return (terrain, BPstack) def applyGCcontributionPathAdjustment(pathlength, actGC, terrainelement): """ GC path length contribution calculation. """ GCadjustment = 1.5 minimum = 0.5 upper = GCadjustment lower = minimum position , nts = terrainelement.split(".") nt = nts[-1:] if nt == "A" or nt == "U": pathlength = pathlength * maprange( (0, 1) , (lower, upper), actGC[int(position)]) elif nt == "G" or nt == "C": pathlength = pathlength * maprange( (1, 0) , (lower, upper), actGC[int(position)]) return pathlength def printTerrain(terrain): """ Prints a given terrain """ #print sorted(terrain.keys()) tmp_i = "0" tmp_c = 0 terrain = terrain[0] for a, i in enumerate(sorted(terrain.keys())): #print a if i.split(".")[0] != tmp_i: print "\nElements:", tmp_c,"\n#########################\n", i, terrain[i] tmp_c = 1 tmp_i = i.split(".")[0] else: print i, terrain[i] tmp_c += 1 print "\nElements:", tmp_c print "#########################" print len(terrain) ######################################## # SEQUENCE ASSEMBLY a.k.a. The ANTWALK ######################################## def pickStep(tmp_steps, summe): """ Selects a step within the terrain """ if len(tmp_steps) == 1: return tmp_steps[0][1] # returning the nucleotide of the only present step else: rand = random.random() # draw random number mainval = 0 for choice in xrange(len(tmp_steps)): val, label = tmp_steps[choice] mainval += val/float(summe) if mainval > rand: # as soon, as the mainval gets larger than the random value the assignment is done return label def getPath(s, tmp_terrain, tmp_BPstack, alpha, beta, IUPAC, IUPAC_reverseComplements): """ Performs a walk through the terrain and assembles a sequence, while respecting the structure constraint and IUPAC base complementarity of the base pairs GU, GC and AT """ nt = ["A","C","G","U"] prev_edge = "00.XY" sequence = "" while len(sequence) < len(s): coming_from = sequence[-1:] summe = 0 steps = [] i = len(sequence) allowed_nt = "ACGU" # base pair closing case check, with subsequent delivery of a reduced allowed nt set if i > tmp_BPstack[i][1][0]: jump = tmp_BPstack[i][1][0] nuc_at_jump = sequence[jump] allowed_nt = IUPAC_reverseComplements[nuc_at_jump] # Checking for every possible nt if it is suitable for the selection procedure for edge in tmp_terrain[prev_edge][-1]: if edge[-1:] in allowed_nt: pheromone, PL , children = tmp_terrain[edge] value = ((float(pheromone * alpha)) + ((1/float(PL)) * beta)) summe += value steps.append((value, edge)) prev_edge = pickStep(steps, summe) sequence += prev_edge[-1:] return sequence def getPathFromSelection( aps, s, terrain, alpha, beta, RNAfold, RNAfold_pattern, GC, SC, LP, LBP_management, verbose, IUPAC_compatibles, degreeOfSequenceInducement, IUPAC_reverseComplements, IUPAC, pseudoknots,temperature, strategy): """ Returns the winning path from a selection of pathes... """ terr, BPs = terrain win_path = 0 for i in xrange(aps): # Generate Sequence path = getPath(s, terr, BPs, alpha, beta, IUPAC, IUPAC_reverseComplements) # Measure sequence features and transform them into singular distances distance_structural = float(getStructuralDistance(s, SC , path, RNAfold, verbose, LP, LBP_management, BPs, RNAfold_pattern, IUPAC_compatibles, degreeOfSequenceInducement, pseudoknots, temperature, strategy)) distance_GC = float(getGCDistance(GC, path)) distance_seq = float(getSequenceEditDistance(SC, path)) # Calculate Distance Score D = distance_structural + distance_GC + distance_seq # SELECT THE BEST-OUT-OF-k-SOLUTIONS according to distance score if i == 0: win_path = (path, D, distance_structural, distance_GC, distance_seq) else: if D < win_path[1]: win_path = (path, D, distance_structural, distance_GC, distance_seq) return win_path ################################ # STRUCTURE PREDICTION METHODS ################################ def getPKStructure(sequence, temperature, mode = "A"): """ Initialization pKiss mfe pseudoknot prediction """ p2p = "pKiss_mfe" p = subprocess.Popen( ([p2p, '-T', str(temperature), '-s', mode, sequence]), #shell = True, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = True) # get linewise stdout output pKiss_output = p.stdout.read().split("\n"); # init with empty sequence structure = "." * len(sequence) if (len(pKiss_output) > 1) : structure = "".join(pKiss_output[1].split(" ")[3]) else : print "DEBUG getPKStructure() : call stdout =" print pKiss_output print "DEBUG getPKStructure() : call stderr =" print p.stderr.read() print "DEBUG END (empty structure returned)" # close subprocess handle p.communicate() exit(-1) # close subprocess handle p.communicate() return structure def init_RNAfold(version, temperature, paramFile = ""): """ Initialization RNAfold listener """ p2p = "" t = "-T " + str(temperature) P = "" if paramFile != "": P = "-P " + paramFile p2p = "RNAfold" p = subprocess.Popen( ([p2p, '--noPS', '-d 2', t, P]), #shell = True, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = True) return p def consult_RNAfold(seq, p): """ Consults RNAfold listener """ p.stdin.write(seq+'\n') out = "" for i in xrange(2): out += p.stdout.readline() return out def getRNAfoldStructure(struct2, process1): """ Retrieves folded structure of a RNAfold call """ RNAfold_pattern = re.compile('.+\n([.()]+)\s.+') RNAfold_match = RNAfold_pattern.match(consult_RNAfold(struct2, process1)) current_structure = "" return RNAfold_match.group(1) ########################################### # DISTANCE MEASURES AND RELATED FUNCTIONS ########################################### def getInducingSequencePositions(Cseq, degreeOfSequenceInducement): """ Delimiting the degree of structure inducement by the supplied sequence constraint. 0 : no sequence induced structure constraint 1 : "ACGT" induce structure (explicit nucleotide structure inducement level) 2 : "MWKSYR" and "ACGT" (explicit and double instances) 3 : "BDHV" , "MWKSYR" and "ACGT" (explicit, double, and triple instances) """ setOfNucleotides = "" # resembling the "0"-case if degreeOfSequenceInducement == 1: setOfNucleotides = "ACGU" elif degreeOfSequenceInducement == 2: setOfNucleotides = "ACGUMWKSYR" elif degreeOfSequenceInducement == 3: setOfNucleotides = "ACGUMWKSYRBDHV" tmpSeq = "" listset = setOfNucleotides for pos in Cseq: if pos not in listset: tmpSeq += "N" else: tmpSeq += pos return setOfNucleotides, tmpSeq def getBPDifferenceDistance(stack1, stack2): """ Based on the not identical amount of base pairs within both structure stacks """ d = 0 for i in stack1.keys(): # check base pairs in stack 1 if i < stack1[i] and stack1[i] != stack2[i]: d += 1 # check base pairs in stack 2 for i in stack2.keys(): if i < stack2[i] and stack1[i] != stack2[i]: d += 1 return d def getStructuralDistance(target_structure, Cseq, path, RNAfold, verbose, LP, LBP_management, BP, RNAfold_pattern, IUPAC_compatibles, degreeOfSequenceInducement, pseudoknots, temperature, strategy): """ Calculator for Structural Distance """ # fold the current solution's sequence to obtain the structure current_structure = "" ### Selection of specific folding mechanism if pseudoknots: current_structure = getPKStructure(path,temperature, strategy) else: RNAfold_match = RNAfold_pattern.match(consult_RNAfold(path, RNAfold)) current_structure = RNAfold_match.group(1) # generate the current structure's base-pair stack bp = getbpStack(current_structure)[0] # deriving a working copy of the target structure# add case-dependend structural constraints in case of lonley basepairs formation tmp_target_structure_bp = getbpStack(target_structure)[0] ### LONELY BASE PAIR MANAGEMENT # add case-dependend structural constraints in case of lonley basepairs formation if LBP_management: for lp in LP: if bp[lp] == LP[lp]: # if the base pair is within the current solution structure, re-add the basepair into the constraint structure. tmp_target_structure_bp[lp] = LP[lp] tmp_target_structure_bp[LP[lp]] = lp ### ### "ABCDEFGHIJKLMNOPQRSTUVWXYZ" -> HARD CONSTRAINT ### "abcdefghijklmnopqrstuvwxyz" -> SOFT CONSTRAINT ### # FUZZY STRUCTURE CONSTRAINT MANAGEMENT - HARD CONSTRAINT # check for all allowed hard implicit constraint block declarators dsoftCstr = 0 for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz": occurances = [] for m in re.finditer(c, target_structure): # search for a declarator in the requested structure occurances.append(m.start()) # save the corresponding index # transform declarator into single stranded request for i in occurances: tmp_target_structure_bp[i] = i # infer a base pair within the block declarated positions, if the current structure provides it. fuzzy_block_penalty = 1 for i in occurances: for j in occurances: if i < j: if bp[i] == j: fuzzy_block_penalty = 0 # if a base pair is present within a current fuzzy_block, no penalty is risen tmp_target_structure_bp[i] = bp[i] tmp_target_structure_bp[bp[i]] = i if len(occurances) > 0: if c.isupper(): dsoftCstr += fuzzy_block_penalty # INDUCES STRUCTURE MANAGEMENT # Checking Cseq influence and it's induced basepairs IUPACinducers, tmp_Cseq = getInducingSequencePositions(Cseq, degreeOfSequenceInducement) if len(Cseq.strip("N")) > 0: #print "Processing Cseq influence" # Iterate over all positions within the Base Pair stack for i in BP: # Check for each base index i if i < bp[i]: # if the current index is samller that the affiliated in the basepair stack of the current solution bp_j = bp[i] # Actual j index of the current solution BP_j = BP[i][1][0] # j index of the requested structure if (i != bp_j and i == BP_j and BP[i][0] in IUPACinducers ): # if i pairs with some other base in the current structure, and i is requested single stranded and the Sequence constraint is allowed to induce... if (BP[bp_j][1][0] == bp_j and BP[bp_j][0] in IUPACinducers):# If position j is requested singlestranded and position j nucleotide can induce base pairs #if isCompatible(bp[i][0], bp[i][1][1], IUPAC_compatibles): # If both nucleotides, i and j are actually compatible tmp_target_structure_bp[i] = bp[i] tmp_target_structure_bp[bp_j] = i dsreg = getBPDifferenceDistance(tmp_target_structure_bp, bp) # CHECK FOR ALL DETERMINED LONELY BASE PAIRS (i 0: d = d - abs(nt_coeff - c) * pc_nt elif d == 0: pass d = abs(round(d, 7)) D += d return D def getSequenceEditDistance(SC, path): """ Calculate sequence edit distance of a solution to the constraint """ IUPAC = {"A":"A", "C":"C", "G":"G", "U":"U", "R":"AG", "Y":"CU", "S":"GC", "W":"AU","K":"GU", "M":"AC", "B":"CGU", "D":"AGU", "H":"ACU", "V":"ACG", "N":"ACGU"} edit = 0 for i in xrange(len(SC)): if path[i] not in IUPAC[SC[i].upper()]: edit += 1 return edit/float(len(path)) ######################### # TERRAIN GRAPH EDITING ######################### def evaporate(t, er): """ Evaporate the terrain's pheromone trails """ terr, BP = t c = 1 for key in terr: p,l,c = terr[key] p *= (1-er) terr[key] = (p, l, c) def trailBlaze(p, c_s, s, ds, dgc, dseq, t, correction_terms, BPstack, verbose): """ Pheromone Update function accorinding to the quality of the solution """ terr, BP = t bpstack, LP = getbpStack(c_s) struct_correction_term , GC_correction_term, seq_correction_term = correction_terms omega = 2.23 bs = updateValue(ds, struct_correction_term, omega) bGC = updateValue(dgc, GC_correction_term, omega) bSeq = updateValue(dseq, seq_correction_term, omega) d = bs + bGC + bSeq transitions = getTransitions(p) for trans in xrange(len(transitions)): # for each transition in the path id1 = int(transitions[trans].split(".")[0]) tar_id2 = int(BPstack[id1][1][0]) # getting requested position id2 curr_id2 = int(bpstack[id1]) # getting the current situation multiplicator = 0 if tar_id2 == curr_id2 and id1 != tar_id2 and id1 != curr_id2: # case of a base pair, having both brackets on the correct position multiplicator = 1 elif tar_id2 == curr_id2 and id1 == tar_id2 and id1 == curr_id2: # case of a single stranded base in both structures multiplicator = 1 p, l, c = terr[transitions[trans]] # getting the pheromone and the length value of the single path transition p += d * multiplicator terr[transitions[trans]] = (p, l, c) # updating the values wihtin the terrain's t = (terr, BP) def getTransitions(p): """ Retreive transitions of a specific path/sequence """ transitions = [] for pos in xrange(len(p)): if pos == 0: transitions.append(str(pos) + "." + p[pos]) else: insert = p[pos-1] + p[pos] transitions.append(str(pos) + "." + insert) return transitions def updateValue(distance, correction_term, omega): """ Retrieves a distance dependend pheromone value """ if correction_term == 0: return 0 else: if distance == 0: return omega * correction_term else: return (1/float(distance)) * correction_term def updateTerrain(p, c_s, s, ds, dgc, dseq, t, er, correction_terms, BPstack, verbose, ant_count): """ General updating function """ evaporate(t,er) trailBlaze(p, c_s, s, ds, dgc, dseq, t, correction_terms, BPstack, verbose) ####################### # CONVERGENCE MEASURE ####################### def inConvergenceCorridor(d_struct, d_gc, d_seq, BS_d_struct, BS_d_gc, BS_d_seq): """ Check if a solutions qualities are within the convergence corridor """ struct_var = (BS_d_struct + BS_d_struct/float(100) * 5) gc_var = (BS_d_gc + BS_d_gc/float(100) * 5) seq_var = (BS_d_seq + BS_d_seq/float(100) * 5) if d_struct <= struct_var and d_gc <= gc_var and d_seq <= seq_var: return True else: return False ############################## # TARGET GC VALUE MANAGEMENT ############################## def getGCSamplingValue(GC, tGCmax, tGCvar): """ Returns a suitable GC value, dependend on the user input: Either returning the single GC value, which the user entered, or a smpled GC value from a designated distribution in it's interavals """ returnval = 0 if tGCmax == -1.0 and tGCvar == -1.0: # regular plain tGC value as requested return GC elif tGCmax != -1.0 and tGCvar == -1.0: # uniform distribution tGC value sampling if GC < tGCmax: tmp_GC = tGCmax tGCmax = GC GC = tmp_GC while returnval <= 0: returnval = float(numpy.random.uniform(low=GC, high=tGCmax, size=1)) return returnval elif tGCmax == -1.0 and tGCvar != -1.0: # normal distribution tGC value sampling while returnval <= 0: returnval = float(numpy.random.normal(GC, tGCvar, 1)) return returnval def reachableGC(C_seq): """ Checks if a demanded GC target content is reachable in dependence with the given sequence constraint. For each explicit and ambiguous character definition within the constraint, the respective possibilities are elicited: "A" counts as "A", but for "B", also an "U" is possible, at the same time, "G" or "C" are possible as well. So two scenarios can be evaluated: A minimum GC content, which is possible and a maximum GC content. For this, for all not "N" letters their potential towards their potentials is evaluated and counted in the respective counters for min and max GC content. Only those characters are taken into concideration which would enforce a categorial pyrimidine/purine decision. (ACGU, SW) """ nucleotide_contribution = 1/float(len(C_seq)) * 1 minGC = 0.0 maxGC = 1.0 for i in C_seq: if i != "N": if i == "A" or i == "U": maxGC -= nucleotide_contribution elif i == "C" or i == "G": minGC += nucleotide_contribution elif i == "S":#(G or C) minGC += nucleotide_contribution elif i == "W":#(A or T/U): maxGC -= nucleotide_contribution return (minGC, maxGC) def parse_GC_management(parsed_tGC, l, sequence_constraint): if len(parsed_tGC) == 1 and type(parsed_tGC[0]) is float: # CASE Only one tGC value is defined, which needs to account for the whole terrain tgc = parsed_tGC.pop() parsed_tGC.append((tgc, 1, l)) for t in parsed_tGC: if len(t) != 3: print "Error :: Not enough tGC and affiliated areas declarations" exit(1) check_set = set(range(1,l + 1)) curr_set = set() for i, area in enumerate(parsed_tGC): # CHECK if the areas are consistent and do not show disconnectivity. v, s1, s2 = area if i < 0 or i > 1: print "Error: Chosen tGC > %s < not in range [0,1]" % (i) exit(1) tmp_set = set(range(int(s1), int(s2 + 1))) if len(curr_set.intersection(tmp_set)) == 0: curr_set = curr_set.union(tmp_set) else: print "Error: Double defined tGC declaration area sector detected. Nucleotide positions", ", ".join(str(e) for e in curr_set.intersection(tmp_set)), "show(s) redundant tGC declaration" exit(1) if len(curr_set.symmetric_difference(check_set)) != 0: print "Error: Undefined tGC area sectors detected. Nucleotide positions", ", ".join(str(e) for e in curr_set.symmetric_difference(check_set)), "is/are not covered." exit(1) for tgc in parsed_tGC: # CHECK if the specified GC values can be reached at all... v, start, stop = tgc tmp_sc = sequence_constraint[start:stop + 1] minGC, maxGC = reachableGC(tmp_sc) if v > maxGC or v < minGC: print >> sys.stderr, "WARNING: Chosen target GC %s content is not reachable. The selected sequence constraint contradicts the tGC constraint value." % (v) print >> sys.stderr, "Sequence Constraint allows tGC only to be in [%s,%s]" % (minGC, maxGC) exit (1) return parsed_tGC ################################################### # PRINCIPAL EXECUTION ROUTINES OF THE ACO METHOD ################################################### def runColony(s, SC, objective_to_target_distance, GC, alpha, beta, evaporation_rate, correction_terms, verbose, LBP_management, IUPAC, IUPAC_compatibles, degreeOfSequenceInducement, IUPAC_reverseComplements, termination_convergence, convergence_count, reset_limit, improve, temperature, paramFile, pseudoknots, strategy): """ Execution function of a single ant colony finding one solution sequence """ # Constraint Checks prior to execution checkSimilarLength(str(s), str(SC)) isValidStructure(s) checkSequenceConstraint(str(SC)) BPstack, LP = getBPStack(s, SC) checkConstaintCompatibility(BPstack, SC, IUPAC_compatibles) retString = "" retString2 = [] start_time = time.time() max_time = 600 # seconds # INITIALIZATION OF Vienna RNAfold RNAfold = init_RNAfold(213, temperature, paramFile) RNAfold_pattern = re.compile('.+\n([.()]+)\s.+') terrain = initTerrain(s) terrain = applyTerrainModification(terrain, s, GC, SC, BPstack, IUPAC, IUPAC_compatibles, IUPAC_reverseComplements) global_ant_count = 0 global_best_ants = 0 criterion = False met = True ant_no = 1 prev_res = 0 seq = "" counter = 0 dstruct_log = [] dGC_log = [] distance_structural = 1000 distance_GC = 1000 distance_seq = 1000 convergence = convergence_count convergence_counter = 0 resets = 0 path = "" curr_structure = "" Dscore = 100000 distance_structural = 10000 distance_GC = 10000 distance_seq = 10000 best_solution = (path, curr_structure, Dscore, distance_structural, distance_GC, distance_seq) best_solution_local = (path, curr_structure, Dscore, distance_structural, distance_GC, distance_seq) best_solution_since = 0 ants_per_selection = 10 # IN CASE OF LP-MANAGEMENT if LBP_management: if len(LP) > 0 : for lp in LP: s = substr(lp + 1, s, ".") s = substr(LP[lp] + 1, s, ".") init = 1 while criterion != met and getUsedTime(start_time) < max_time: iteration_start = time.time() global_ant_count += 1 global_best_ants += 1 path_info = getPathFromSelection(ants_per_selection, s, terrain, alpha, beta, RNAfold, RNAfold_pattern, GC, SC, LP, LBP_management, verbose, IUPAC_compatibles, degreeOfSequenceInducement, IUPAC_reverseComplements, IUPAC, pseudoknots, temperature, strategy) distance_structural_prev = distance_structural distance_GC_prev = distance_GC distance_seq_prev = distance_seq path, Dscore , distance_structural, distance_GC, distance_seq = path_info curr_structure = "" if pseudoknots: curr_structure = getPKStructure(path,temperature, strategy) else: curr_structure = getRNAfoldStructure(path, RNAfold) curr_solution = (path,curr_structure, Dscore, distance_structural, distance_GC, distance_seq) # BEST SOLUTION PICKING if improve == "h": # hierarchical check # for the global best solution if distance_structural < best_solution[3] or (distance_structural == best_solution[3] and distance_GC < best_solution[4]): best_solution = curr_solution ant_no = 1 # for the local (reset) best solution if distance_structural < best_solution_local[3] or (distance_structural == best_solution_local[3] and distance_GC < best_solution_local[4]): best_solution_local = curr_solution elif improve == "s": #score based check # store best global solution if Dscore < best_solution[2]: best_solution = curr_solution ant_no = 1 # store best local solution for this reset if Dscore < best_solution_local[2]: best_solution_local = curr_solution if verbose: print "SCORE " + str(Dscore) + " Resets " + str(resets) + " #Ant " + str(global_ant_count) + " out of " + str(ants_per_selection) + " cc " + str(convergence_counter) print s, " <- target struct" print best_solution[0] , " <- BS since ", str(best_solution_since), "Size of Terrrain:", len(terrain[0]) print best_solution[1] , " <- BS Dscore " + str(best_solution[2]) + " ds " + str(best_solution[3]) + " dGC " + str(best_solution[4]) + " dseq " + str(best_solution[5])+ " LP " + str(len(LP)) + " <- best solution stats" print curr_structure, " <- CS" print path, print " <- CS", "Dscore", str(Dscore), "ds", distance_structural, "dGC", distance_GC, "GC", getGC(path)*100, "Dseq", distance_seq #### UPDATING THE TERRAIN ACCORDING TO THE QUALITY OF THE CURRENT BESTO-OUT-OF-k SOLUTION updateTerrain(path, curr_structure, s, distance_structural,distance_GC, distance_seq, terrain, evaporation_rate, correction_terms, BPstack, verbose, global_ant_count) if verbose: print "Used time for one iteration", time.time() - iteration_start # CONVERGENCE AND TERMINATION CRITERION MANAGEMENT if inConvergenceCorridor(curr_solution[3], curr_solution[4], curr_solution[5], best_solution_local[3], best_solution_local[4], best_solution_local[5]): convergence_counter += 1 if distance_structural_prev == distance_structural and distance_GC_prev == distance_GC and distance_seq_prev == distance_seq: convergence_counter += 1 if best_solution[3] == objective_to_target_distance: if best_solution[4] == 0.0: if best_solution[5] == 0.0: break ant_no = ant_no + 1 convergence_counter -= 1 else: ant_no = 1 if ant_no == termination_convergence or resets >= reset_limit or global_ant_count >= 100000 or best_solution_since == 5: break # RESET if ant_no < termination_convergence and convergence_counter >= convergence: terrain = initTerrain(s) terrain = applyTerrainModification(terrain, s, GC, SC, BPstack, IUPAC, IUPAC_compatibles, IUPAC_reverseComplements) criterion = False met = True ant_no = 1 prev_res = 0 pre_path = "_" * len(s) path = "" curr_structure = "" counter = 0 Dscore = 100000 distance_structural = 1000 distance_GC = 1000 distance_seq = 1000 best_solution_local = (path, curr_structure, Dscore, distance_structural, distance_GC, distance_seq) convergence = convergence_count convergence_counter = 0 if resets == 0: sentinel_solution = best_solution best_solution_since += 1 else: if best_solution[2] < sentinel_solution[2]: sentinel_solution = best_solution best_solution_since = 0 else: best_solution_since += 1 resets += 1 duration = getUsedTime(start_time) retString += "|Ants:" + str(global_ant_count) retString += "|Resets:" + str(resets) + "/" + str(reset_limit) retString += "|AntsTC:" + str(termination_convergence) retString += "|CC:" + str(convergence_count) retString += "|IP:" + str(improve) retString += "|BSS:" + str(best_solution_since) sequence = best_solution[0] struct = best_solution[1] retString += "|LP:" + str(len(LP)) retString += "|ds:" + str(getStructuralDistance(s,SC, sequence, RNAfold, verbose, LP, LBP_management, BPstack, RNAfold_pattern, IUPAC_compatibles, degreeOfSequenceInducement, pseudoknots, temperature, strategy)) retString += "|dGC:" + str(best_solution[4]) retString += "|GC:" + str(getGC(sequence)*100) retString += "|dseq:" + str(getSequenceEditDistance(SC, sequence)) retString += "|L:" + str(len(sequence)) retString += "|Time:" + str(duration) retString2.append(sequence) retString2.append(struct) # CLOSING THE PIPES TO THE PROGRAMS if (RNAfold is not None) : RNAfold.communicate() return (retString, retString2) def findSequence(structure, Cseq, tGC, colonies, name, alpha, beta, evaporation_rate, struct_correction_term, GC_correction_term, seq_correction_term, degreeOfSequenceInducement, file_id, verbose, output_verbose, tGCmax, tGCvar, termination_convergence, convergence_count, reset_limit, improve, seed, temperature, paramFile, pseudoknots, strategy, useGU, LBP_management, return_mod = False): """ MAIN antaRNA - ant assembled RNA """ if seed != "none": random.seed(seed) if Cseq == "": sequenceconstraint = "N" * len(structure) else: sequenceconstraint = str(Cseq) alpha = float(alpha) beta = float(beta) evaporation_rate = float(evaporation_rate) struct_correction_term = float(struct_correction_term) GC_correction_term = float(GC_correction_term) seq_correction_term = float(seq_correction_term) colonies = int(colonies) file_id = str(file_id) tmp_verbose = verbose tmp_output_verbose = output_verbose verbose = tmp_output_verbose # Due to later change, this is a twistaround and a switching of purpose output_verbose = tmp_verbose # Due to later change, this is a twistaround and a switching of purpose correction_terms = struct_correction_term, GC_correction_term, seq_correction_term temperature = float(temperature) print_to_STDOUT = (file_id == "STDOUT") LBP_management = LBP_management useGU = useGU if return_mod == False: if print_to_STDOUT == False: outfolder = '/'.join(file_id.strip().split("/")[:-1]) curr_dir = os.getcwd() if not os.path.exists(outfolder): os.makedirs(outfolder) os.chdir(outfolder) sequenceconstraint = transform(sequenceconstraint) ############################################### # Allowed deviation from the structural target: objective_to_target_distance = 0.0 # Loading the IUPAC copatibilities of nuleotides and their abstract representing symbols IUPAC = {"A":"A", "C":"C", "G":"G", "U":"U", "R":"AG", "Y":"CU", "S":"GC", "W":"AU","K":"GU", "M":"AC", "B":"CGU", "D":"AGU", "H":"ACU", "V":"ACG", "N":"ACGU"} IUPAC_compatibles = loadIUPACcompatibilities(IUPAC, useGU) IUPAC_reverseComplements = {} if useGU == False: ## Without the GU basepair IUPAC_reverseComplements = {"A":"U", "C":"G", "G":"C", "U":"A", "R":"UC", "Y":"AG", "S":"GC", "W":"UA","K":"CA", "M":"UG", "B":"AGC", "D":"ACU", "H":"UGA", "V":"UGC", "N":"ACGU"} else: ## allowing the GU basepair IUPAC_reverseComplements = {"A":"U", "C":"G", "G":"UC", "U":"AG", "R":"UC", "Y":"AG", "S":"UGC", "W":"UAG","K":"UCAG", "M":"UG", "B":"AGCU", "D":"AGCU", "H":"UGA", "V":"UGC", "N":"ACGU"} result = [] for col in xrange(colonies): # Checking the kind of taget GC value should be used GC = [] if len(tGC) == 1: GC.append((getGCSamplingValue(tGC[0][0], tGCmax, tGCvar), tGC[0][1], tGC[0][2])) else: GC = tGC # Actual execution of a ant colony procesdure output_v, output_w = runColony(structure, sequenceconstraint, objective_to_target_distance, GC, alpha, beta, evaporation_rate, correction_terms, verbose, LBP_management, IUPAC, IUPAC_compatibles, degreeOfSequenceInducement, IUPAC_reverseComplements, termination_convergence, convergence_count, reset_limit, improve, temperature, paramFile, pseudoknots, strategy) # Post-Processing the output of a ant colony procedure line = ">" + name + str(col) if output_verbose: GC_out = "" for i in GC: v, s1, s2 = i GC_out += str(s1) + "-" + str(s2) + ">" + str(v) + ";" GC_out = GC_out[:-1] line += "|Cstr:" + structure + "|Cseq:" + sequenceconstraint + "|Alpha:" + str(alpha) + "|Beta:" + str(beta) + "|tGC:" + GC_out + "|ER:" + str(evaporation_rate) + "|Struct_CT:" + str(struct_correction_term) + "|GC_CT:" + str(GC_correction_term) + "|Seq_CT:" + str(seq_correction_term) + output_v + "\n" + "\n".join(output_w) else: line += "\n" + output_w[0] if return_mod == False: if print_to_STDOUT: print line else: if col == 0: print2file(file_id, line, 'w') else: print2file(file_id, line, 'a') else: result.append(line) if return_mod == True: return result if print_to_STDOUT == False: os.chdir(curr_dir) def execute(args): """ CHECK AND PARSE THE COMMAND LINE STUFF """ # Checking the arguments, parsed from the shell ############################################### name = args.name structure_constraint = args.Cstr if args.Cseq == "": sequence_constraint = "N" * len(structure_constraint) else: sequence_constraint = args.Cseq seed = args.seed alpha = args.alpha beta = args.beta tGC = parse_GC_management(args.tGC, len(structure_constraint), sequence_constraint) evaporation_rate = args.ER struct_correction_term = args.Cstrweight GC_correction_term = args.Cgcweight seq_correction_term = args.Cseqweight colonies = args.noOfColonies degreeOfSequenceInducement = args.level file_id = args.output_file verbose = args.verbose output_verbose = args.output_verbose tGCmax = args.tGCmax tGCvar = args.tGCvar LBP_management = args.noLBPmanagement termination_convergence = args.antsTerConv convergence_count = args.ConvergenceCount temperature = args.temperature reset_limit = args.Resets improve = args.improve_procedure ### RNAfold parameterfile paramFile = args.paramFile # Using the pkiss program under user changeable parameters pseudoknots = args.pseudoknots # Loading the optimized parameters for pseudoknots and ignore user input if args.pseudoknot_parameters: alpha = 1.0 beta = 0.1 evaporation_rate = 0.2 struct_correction_term = 0.1 GC_correction_term = 1.0 seq_correction_term = 0.5 termination_convergence = 50 convergence_count = 100 strategy = args.strategy useGU = args.useGUBasePair checkForViennaTools() if pseudoknots: checkForpKiss() findSequence(structure_constraint, sequence_constraint, tGC, colonies, name, alpha, beta, evaporation_rate, struct_correction_term, GC_correction_term, seq_correction_term, degreeOfSequenceInducement, file_id, verbose, output_verbose, tGCmax, tGCvar, termination_convergence, convergence_count, reset_limit, improve, seed, temperature, paramFile, pseudoknots, strategy, useGU, LBP_management) def exe(): """ MAIN EXECUTABLE WHICH PARSES THE INPUT COMMAND LINE """ argument_parser = argparse.ArgumentParser( fromfile_prefix_chars='@', description = """ ######################################################################### # antaRNA - ant assembled RNA # # -> Ant Colony Optimized RNA Sequence Design # # ------------------------------------------------------------ # # Robert Kleinkauf (c) 2015 # # Bioinformatics, Albert-Ludwigs University Freiburg, Germany # ######################################################################### - For antaRNA only the VIENNNA RNA Package must be installed on your linux system. antaRNA will only check, if the executables of RNAfold of the ViennaRNA package can be found. If those programs are not installed correctly, no output will be generated, an also no warning will be prompted. So the binary path of the Vienna Tools must be set up correctly in your system's PATH variable in order to run antaRNA correctly! - If you want to use the pseudoknot functionality, pKiss_mfe of the RNAshapes studion must be installed and callable as standalone in order to execute antaRNA. - antaRNA was only tested under Linux. - For questions and remarks please feel free to contact us at http://www.bioinf.uni-freiburg.de/ ##################### ## VERSION HISTORY ## ##################### v113: - multiple target GC values in different areas of the RNA v112: - including soft sequence constraint: lower case letters invoke the terrain at this position to be possible for all nucleotides. The position is then allocated according to the seq distance penalty within the quality measure. - improved reachableGC measure: now determines a minimum and a maximum rachable GC interval. Exits the program if the entered tGC is outside this interval. v111: - including LP management [1/0] - including strict and soft fuzzy structure constraints Lower case: soft fuzzy constraint: all structure is true, no structure is explicitly enforced Upper case: hard fuzzy constraint: at least one BP is required within one definition block in order to not rise a penalty v110: - parametrized pseudoknot webserver related paper version v109: - has now some extended RNAfold functionality (parameterfile selection, Temperature) """, epilog = """ Example calls: python antaRNA_vXY.py -Cstr "...(((...)))..." -tGC 0.5 -n 2 -v python antaRNA_vXY.py -Cstr ".........aaa(((...)))aaa........." -tGC 0.5 -n 10 --output_file /path/to/antaRNA_TESTRUN -v python antaRNA_vXY.py -Cstr "BBBBB....AAA(((...)))AAA....BBBBB" -Cseq "NNNNANNNNNCNNNNNNNNNNNGNNNNNNUNNN" --tGC 0.5 -n 10 ######################################################################### # --- Hail to the Queen!!! All power to the swarm!!! --- # ######################################################################### """, formatter_class=RawTextHelpFormatter ) argument_parser.convert_arg_line_to_args = convert_arg_line_to_args ### ARGUMENTS constraints = argument_parser.add_argument_group('Constraint Variables', 'Use to define an RNA constraint system.') constraints.add_argument("-Cstr", "--Cstr", help="Structure constraint using RNA dotbracket notation with fuzzy block constraint. \n(TYPE: %(type)s)\n\n", type=str, required=True) constraints.add_argument("-Cseq", "--Cseq", help="Sequence constraint using RNA nucleotide alphabet {A,C,G,U} and wild-card \"N\". \n(TYPE: %(type)s)\n\n", type=str, default = "") constraints.add_argument("-l", "--level", help="Sets the level of allowed influence of sequence constraint on the structure constraint [0:no influence; 3:extensive influence].\n(TYPE: %(type)s)\n\n", type=int, default = 1) constraints.add_argument("-tGC", "--tGC", help="Objective target GC content in [0,1].\n(TYPE: %(type)s)\n\n", type=parseGC, required=True, action = 'append') constraints.add_argument("-tGCmax", "--tGCmax", help = "Provides a maximum tGC value [0,1] for the case of uniform distribution sampling. The regular tGC value serves as minimum value.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=-1.0) constraints.add_argument("-tGCvar", "--tGCvar", help = "Provides a tGC variance (sigma square) for the case of normal distribution sampling. The regular tGC value serves as expectation value (mu).\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=-1.0) constraints.add_argument("-t", "--temperature", help = "Provides a temperature for the folding algorithms.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=37.0) constraints.add_argument("-P", "--paramFile", help = "Changes the energy parameterfile of RNAfold. If using this explicitly, please provide a suitable energy file delivered by RNAfold. \n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=str, default="") constraints.add_argument("-GU", "--useGUBasePair", help="Allowing GU base pairs. \n\n", action="store_true") constraints.add_argument("-noLBP", "--noLBPmanagement", help="Disallowing antaRNA lonely base pair-management. \n\n", action="store_false", default =True) pk = argument_parser.add_argument_group('Pseudoknot Variables', 'Use in order to enable pseudoknot calculation. pKiss_mfe needs to be installed.') pk.add_argument("-p", "--pseudoknots", help = "Switch to pseudoknot based prediction using pKiss. Check the pseudoknot parameter usage!!!\n\n", action="store_true") pk.add_argument("-pkPar", "--pseudoknot_parameters", help = "Enable optimized parameters for the usage of pseudo knots (Further parameter input ignored).\n\n", action="store_true") pk.add_argument("--strategy", help = "Defining the pKiss folding strategy.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=str, default="A") output = argument_parser.add_argument_group('Output Variables', 'Tweak form and verbosity of output.') output.add_argument("-n", "--noOfColonies", help="Number of sequences which shall be produced. \n(TYPE: %(type)s)\n\n", type=int, default=1) output.add_argument("-of","--output_file", help="Provide a path and an output file, e.g. \"/path/to/the/target_file\". \n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=str, default="STDOUT") output.add_argument("--name", help="Defines a name which is used in the sequence output. \n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=str, default="antaRNA_") output.add_argument("-ov", "--output_verbose", help="Displayes intermediate output.\n\n", action="store_true") output.add_argument("-v", "--verbose", help="Prints additional features and stats to the headers of the produced sequences. Also adds the structure of the sequence.\n\n", action="store_true") aco = argument_parser.add_argument_group('Ant Colony Variables', 'Alter the behavior of the ant colony optimization.') aco.add_argument("-s", "--seed", help = "Provides a seed value for the used pseudo random number generator.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=str, default="none") aco.add_argument("-ip", "--improve_procedure", help = "Select the improving method. h=hierarchical, s=score_based.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=str, default="s") aco.add_argument("-r", "--Resets", help = "Amount of maximal terrain resets, until the best solution is retuned as solution.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=int, default=5) aco.add_argument("-CC", "--ConvergenceCount", help = "Delimits the convergence count criterion for a reset.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=int, default=130) aco.add_argument("-aTC", "--antsTerConv", help = "Delimits the amount of internal ants for termination convergence criterion for a reset.\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=int, default=50) aco.add_argument("-a", "--alpha", help="Sets alpha, probability weight for terrain pheromone influence. [0,1] \n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=1.0) aco.add_argument("-b", "--beta", help="Sets beta, probability weight for terrain path influence. [0,1]\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=1.0) aco.add_argument("-er", "--ER", help="Pheromone evaporation rate. \n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=0.2) aco.add_argument("-Cstrw", "--Cstrweight", help="Structure constraint quality weighting factor. [0,1]\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=0.5) aco.add_argument("-Cgcw", "--Cgcweight", help="GC content constraint quality weighting factor. [0,1]\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n", type=float, default=5.0) aco.add_argument("-Cseqw", "--Cseqweight", help="Sequence constraint quality weighting factor. [0,1]\n(DEFAULT: %(default)s, TYPE: %(type)s)\n\n\n", type=float, default=1.0) args = argument_parser.parse_args() execute(args) ########################## # PROGRAM PRESENCE CHECK ########################## def checkForViennaTools(): """ Checking for the presence of the Vienna tools in the system by which'ing for RNAfold and RNAdistance """ RNAfold_output = subprocess.Popen(["which", "RNAfold"], stdout=subprocess.PIPE).communicate()[0].strip() if len(RNAfold_output) > 0 and RNAfold_output.find("found") == -1 and RNAfold_output.find(" no ") == -1: return True else: print "It seems the Vienna RNA Package is not installed on your machine. Please do so!" print "You can get it at http://www.tbi.univie.ac.at/" exit(0) def checkForpKiss(): """ Checking for the presence of pKiss """ pKiss_output = subprocess.Popen(["which", "pKiss_mfe"], stdout=subprocess.PIPE).communicate()[0].strip() if len(pKiss_output) > 0 and pKiss_output.find("found") == -1 and pKiss_output.find(" no ") == -1: return True else: print "It seems that pKiss is not installed on your machine. Please do so!" print "You can get it at http://bibiserv2.cebitec.uni-bielefeld.de/pkiss" exit(0) ######################### # ENTRY TO THE ANT HIVE ######################### if __name__ == "__main__": exe()