Source code for pypka.main

import os
import json
from delphi4py import DelPhi4py
from pdbmender.formats import gro2pdb, get_grobox_size, get_chains_from_file
from pdbmender.utils import identify_tit_sites

from pypka.log import checkDelPhiErrors
from pypka.clean.checksites import (
    check_sites_integrity,
    make_delphi_inputfile,
    fix_fixed_sites,
)
from pypka.clean.cleaning import cleanPDB, inputPDBCheck
from pypka.clean.pdb_out import write_output_structure
from pypka.concurrency import (
    runDelPhiSims,
    runInteractionCalcs,
    startPoolProcesses,
)
from pypka.config import Config
from pypka.constants import (
    KBOLTZ,
    TITRABLETAUTOMERS,
    TERMINAL_OFFSET,
    INSERTIONCODE_OFFSET,
)
from pypka.molecule import Molecule
from pypka.mc.run_mc import MonteCarlo
from pdbmender.utils import correct_insertion_codes

import logging

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)


[docs] class Titration: """ Main PypKa class Attributes: params (Config): method parameters molecules (dict): titrating molecules ordered by chain """ def __init__( self, parameters, sites="all", fixed_sites=None, debug=False, run="all" ): """ Runs the pKa prediction Args: parameters (dict): input parameters to overwrite the default configuration. sites (dict, optional): tuple titrable residue numbers indexed by chain. Defaults to 'all' which will titrate all titrable residues. debug (boolean, optional): debug mode switch. Defaults to False. """ self.molecules = {} self.__parameters = Config.storeParams(self, debug, parameters, sites) self.pKas = {} self.isoelectric_point = None self.isoelectric_point_limit = None if ( Config.pypka_params["load_mc_energies"] or Config.pypka_params["load_mc_energies_dict"] ): self.mc_only_run() else: self.normal_run(sites, fixed_sites, run) print("Results") print(self) def normal_run(self, sites, fixed_sites, run): print("Start Preprocessing") self.preprocessing(sites, fixed_sites) self.processDelPhiParams() if run == "preprocess": return print("Start PB Calculations") self.DelPhiLaunch() if run == "PB": return print("Calculating site-site interactions") self.calcSiteInteractionsParallel() print("Start MC", end="\r") self.run_mc() if Config.pypka_params["f_structure_out"]: sites = self.get_all_sites(get_list=True) write_output_structure(sites, self.molecules, self.delphi_input_content) def mc_only_run(self): if Config.pypka_params["load_mc_energies"]: with open(Config.pypka_params["load_mc_energies"]) as f: energies_dict = json.load(f) else: energies_dict = Config.pypka_params["load_mc_energies_dict"] Config.parallel_params.npossible_states = energies_dict["npossible_states"] Config.parallel_params.possible_states_g = energies_dict["possible_states_g"] Config.parallel_params.states_ddG = energies_dict["states_ddG"] Config.parallel_params.possible_states_occ = energies_dict[ "possible_states_occ" ] Config.parallel_params.interactions = energies_dict["interactions"] Config.parallel_params.interactions_look = energies_dict["interactions_look"] sites = {} chains_res = {} for site in energies_dict["all_sites"]: chain = site[0] resname = site[2:5] resnumb = int(site[6:]) if resnumb > TERMINAL_OFFSET: resnumb -= TERMINAL_OFFSET resnumb = str(resnumb) if chain not in sites: sites[chain] = [] sites[chain].append(resnumb) if chain not in chains_res: chains_res[chain] = {} chains_res[chain][resnumb] = resname self.create_molecule(sites, {}) for chain, molecule in self.molecules.items(): molecule.loadSites(chains_res[chain]) Config.parallel_params.all_sites = self.get_all_sites(get_list=True) self.run_mc() def preprocessing(self, sites, fixed_sites): def create_tit_sites(fname, chains_res): for _, molecule in self.molecules.items(): molecule.deleteAllSites() check_sites_integrity(fname, self.molecules, chains_res) if fixed_sites: for chain, fixed_sites_info in fixed_sites.items(): if chain not in sites: sites[chain] = [] for sitenumb in fixed_sites_info.keys(): sites[chain].append(sitenumb) f_in = "input.pdb" if Config.pypka_params["f_in_extension"] == "gro": groname = Config.pypka_params["f_in"] gro2pdb(groname, f_in) box = get_grobox_size(groname) Config.pypka_params.setBox(box) Config.pypka_params.redefine_f_in(f_in) renumbered = correct_insertion_codes( Config.pypka_params["f_in"], f_in, insertion_offset=INSERTIONCODE_OFFSET, replace_empty_chain=False, ) if Config.pypka_params["f_in_extension"] != "pdb": f_format = Config.pypka_params["f_in_extension"] raise Exception( "{0} file format is not currently supported".format(f_format) ) automatic_sites = self.create_molecule(sites, renumbered) # f_in = Config.pypka_params["f_in"] if not automatic_sites: # Reading .st files # If the titrable residues are defined clean_pdb = Config.pypka_params["clean_pdb"] _, chains_res = inputPDBCheck(f_in, sites, clean_pdb) for chain, molecule in self.molecules.items(): molecule.loadSites(chains_res[chain]) else: # If the titrable residues are not defined chains_res = identify_tit_sites( f_in, self.molecules.keys(), nomenclature=Config.pypka_params["ffinput"], add_ser_thr=Config.pypka_params["ser_thr_titration"], ) self.assign_residues_to_molecule(chains_res) if Config.pypka_params["clean_pdb"]: # Creates a .pdb input for DelPhi # where residues are in their standard state f_out = "TMP.pdb" chains_res = cleanPDB( f_in, f_out, self.molecules, chains_res, automatic_sites ) if not Config.debug and os.path.isfile(f_in): os.remove(f_in) f_in = f_out create_tit_sites(f_in, chains_res) f_out = "delphi_in_stmod.pdb" self.sequence = make_delphi_inputfile(f_in, f_out, self.molecules, fixed_sites) if fixed_sites: fix_fixed_sites(self.molecules, fixed_sites, f_out) if not Config.debug and os.path.isfile(f_in): os.remove(f_in) def create_molecule(self, sites, renumbered): # Creating instance of TitratingMolecule automatic_sites = False if sites == "all": f_in = Config.pypka_params["f_in"] chains = get_chains_from_file(f_in) sites = {chain: ["all"] for chain in chains} for chain, site_list in sites.items(): if site_list == ["all"]: site_list = "all" else: site_list = [str(site) for site in site_list] sites[chain] = site_list renumbered_chain = renumbered[chain] if chain in renumbered.keys() else {} self.molecules[chain] = Molecule(chain, site_list, renumbered_chain) if site_list == "all": automatic_sites = True return automatic_sites def assign_residues_to_molecule(self, chains_res): if not chains_res: f_in = Config.pypka_params["f_in"] raise Exception("Not one titrable residue was found in {}".format(f_in)) for chain, residues in chains_res.items(): molecule = self.molecules[chain] for resnumb, resname in residues.items(): if resname in ("NTR", "CTR"): resnumb = int(resnumb) if resname == "NTR": molecule.NTR += [resnumb] elif resname == "CTR": molecule.CTR += [resnumb] resnumb += TERMINAL_OFFSET sID = molecule.addSite(resnumb) ntautomers = TITRABLETAUTOMERS[resname] molecule.addTautomers(sID, ntautomers, resname) for molecule in self.molecules.values(): # Adding the reference tautomer to each site molecule.addReferenceTautomers() # Assigning a charge set to each tautomer molecule.addTautomersChargeSets() def get_total_atoms(self): return sum(molecule.getNAtoms() for molecule in self.molecules.values()) def get_all_sites(self, get_list=False): sites = [] if get_list else {} for chain, molecule in self.molecules.items(): chain_sites = molecule.getSitesOrdered() if get_list: # if isinstance(chain_sites, dict): # chain_sites = chain_sites.values() sites += chain_sites else: sites[chain] = chain_sites return sites def get_molecule_sites(self): sites = {} for chain, molecule in self.molecules.items(): chain_sites = molecule.getSites() sites[chain] = chain_sites return sites def get_total_sites(self): return sum(len(sites) for sites in self.get_all_sites().values()) def processDelPhiParams(self): # Storing DelPhi parameters and Creates DelPhi data structures logfile = "LOG_readFiles" delphimol = DelPhi4py( Config.pypka_params["f_crg"], Config.pypka_params["f_siz"], "delphi_in_stmod.pdb", igrid=Config.delphi_params["gsize"], scale=Config.delphi_params["scaleM"], precision=Config.delphi_params["precision"], epsin=Config.delphi_params["epsin"], epsout=Config.delphi_params["epssol"], conc=Config.delphi_params["ionicstr"], ibctyp=Config.delphi_params["bndcon"], res2=Config.delphi_params["maxc"], nlit=Config.delphi_params["nlit"], nonit=Config.delphi_params["nonit"], relfac=0.0, relpar=0.0, pbx=Config.delphi_params["pbx"], pby=Config.delphi_params["pby"], isurftype=Config.delphi_params["nanoshaper"], debug=Config.debug, outputfile=logfile, ) if Config.pypka_params["f_structure_out"]: with open("delphi_in_stmod.pdb") as f: self.delphi_input_content = f.readlines() if Config.pypka_params["save_pdb"]: with open("delphi_in_stmod.pdb") as f, open( Config.pypka_params["save_pdb"], "w" ) as f_new: content = f.read() f_new.write(content) if not Config.debug: os.remove("delphi_in_stmod.pdb") checkDelPhiErrors(logfile, "readFiles") lookup_atoms = {} for molecule in Config.titration.molecules.values(): for atom_name, atom_id, atom_position in molecule.iterAtoms(): lookup_atoms[atom_position] = (atom_id, atom_name) # Stores DelPhi4py object and attributes Config.delphi_params.store_run_params(delphimol) Config.delphi_params.lookup_atoms = lookup_atoms if Config.debug: for chain, molecule in self.molecules.items(): print("### CHAIN {chain} ###".format(chain=chain)) molecule.printAllSites() molecule.printAllTautomers() print(delphimol)
[docs] def iterAllSitesTautomers(self): """ Generator that iterates through all Tautomer instances. The iteration is sorted by site and within each site the first to be yielded is the reference tautomer and then the rest of the tautomers by order """ for molecule in self.molecules.values(): for site in molecule.sites_order: yield site.ref_tautomer for tautomer in sorted(site.tautomers.values()): yield tautomer
[docs] def calcpKint(self, unpacked_results): """Calculation the pKint of all tautomers.""" i = -1 if Config.debug: print("############ results ############") pkints = "" contributions = "" for tautomer in self.iterAllSitesTautomers(): i += 1 core_index = i % Config.pypka_params["ncpus"] job_index = int(i / Config.pypka_params["ncpus"]) result = unpacked_results[core_index][job_index] tautname = result[0] # tautresnumb = result[1] esolvationM = result[2] sitpotM = result[3] esolvationS = result[4] sitpotS = result[5] tautomer = Config.parallel_params.all_tautomers_order[i] tautomer.saveDelPhiResults(esolvationS, sitpotS, esolvationM, sitpotM) if Config.debug: print("### new tautomer ###") print( ( i, core_index, job_index, tautname, tautomer.name, esolvationM, esolvationS, ) ) print( ( tautomer.name, tautomer.esolvationS, len(tautomer.sitpotS), tautomer.esolvationM, len(tautomer.sitpotM), ) ) tautomer.calcBackEnergy() if not tautomer.isRefTautomer(): tautomer.calcpKint() if Config.debug: print(("pkint", tautomer.name, tautomer.dg, id(tautomer))) pkints += "{} {} {} {} {}\n".format( tautomer.site.molecule.chain, tautomer.name, tautomer.site.res_number, tautomer.dg, tautomer.pKint, ) contributions += "{}{} {} {} {} {}\n".format( tautomer.site.res_number, tautomer.name, tautomer.dG_solvationM, tautomer.dG_solvationS, tautomer.dG_solvationM - tautomer.dG_solvationS, tautomer.dG_back, ) if Config.debug: with open("pkint", "w") as f_new1, open("contributions", "w") as f_new2: f_new1.write(pkints) f_new2.write(contributions)
[docs] def calcSiteInteractionsParallel(self): """ Calculates the pairwise interaction energies Interactions are calculated using a pool of processes and written in a formatted .dat file Args: ncpus (int): number of cpus to be used """ def writeDatHeader(sites): """Writes pKint energies in .dat file header.""" to_write = "{0}\n".format(len(sites)) for site in sites: to_write += "{0:3s}-{1:<7}{2:>2} P *\n".format( site.res_name, site.res_number, len(site.tautomers) + 1 ) if site.type == "c": tau_prot_state = 0 ref_prot_state = 1 elif site.type == "a": tau_prot_state = 1 ref_prot_state = 0 for tautomer in site.iterOrderedTautomersWithoutRef(): to_write += "{0:1d} {1:13.6e}\n".format(tau_prot_state, tautomer.dg) to_write += "{0:1d} 0.000000e+00\n".format(ref_prot_state) if Config.debug: with open("interactions.dat", "w") as f_new: f_new.write(to_write) Config.loadParams(self.__parameters) sites = self.get_all_sites(get_list=True) if Config.debug: writeDatHeader(sites) counter = 0 site_interactions = [] for site1 in sites[:-1]: counter += 1 for site2 in sites[counter:]: site_interactions.append((site1, site2)) npossible_states = [len(site.tautomers) + 1 for site in sites] Config.parallel_params.npossible_states = npossible_states interactions = [] for nstate1 in range(sum(npossible_states)): interactions.append([]) for _ in range(sum(npossible_states)): interactions[nstate1].append(-999999) interactions_look = [] aux = -1 site = -1 for nstates in npossible_states: site += 1 interactions_look.append([]) for _ in range(nstates): aux += 1 interactions_look[site].append(aux) Config.parallel_params.interactions_look = interactions_look Config.parallel_params.site_interactions = site_interactions Config.parallel_params.all_sites = sites Config.parallel_params.interactions = interactions ncpus = min(len(site_interactions), Config.pypka_params["ncpus"]) results = [] if ncpus > 0: results = startPoolProcesses( runInteractionCalcs, site_interactions, ncpus, assign="ordered", merged_results=True, ) interactions = Config.parallel_params.interactions to_write = "" temperature = float(Config.pypka_params["temp"]) for interaction in results: site1i = interaction[0] site2i = interaction[1] if interactions[site1i][site2i] != -999999: interactions[site1i][site2i] += interaction[2] interactions[site2i][site1i] += interaction[2] interactions[site1i][site2i] /= 2 interactions[site2i][site1i] /= 2 if Config.debug: col1 = interaction[3][6:18] col2 = interaction[3][:6] col3 = interactions[site1i][site2i] * (KBOLTZ * temperature) to_write += "{0}{1} {2:13.6e}\n".format(col1, col2, col3) else: interactions[site1i][site2i] = interaction[2] interactions[site2i][site1i] = interaction[2] if Config.debug: with open("interactions.dat", "a") as f_new: f_new.write(to_write)
def DelPhiLaunch(self): Config.loadParams(self.__parameters) if self.get_total_sites() < 1: raise Exception("At least one site has to be correctly defined.") all_tautomers = list(self.iterAllSitesTautomers()) Config.parallel_params.all_tautomers_order = all_tautomers # Runs DelPhi simulations for all tautomers results = startPoolProcesses( runDelPhiSims, all_tautomers, Config.pypka_params["ncpus"] ) print("\rPB Runs Ended{:>80}".format("")) # Calculates the pKint of all tautomers print("Calculating intrinsic pK values") self.calcpKint(results) def run_mc(self): Config.loadParams(self.__parameters) sites = self.get_all_sites(get_list=True) mc = MonteCarlo(sites) text_pks, self.tit_curve = (mc.text_pks, mc.total_tit_curve) print("\rMC Runs Ended{:>80}\n".format("")) if Config.pypka_params["isoelectric_point"]: self.getIsoelectricPoint() if self.isoelectric_point: text_pks += "\nPredicted Isoelectric Point: {0}\n".format( self.isoelectric_point ) elif self.isoelectric_point_limit: pH, limit, charge = self.isoelectric_point_limit text_pks += "\nPredicted Isoelectric Point: {0} Limit: {1} \nCharge at pH {0}: {2}\n".format( pH, limit, charge ) if Config.pypka_params["f_out"]: with open(Config.pypka_params["f_out"], "w") as f: f.write(text_pks) if Config.pypka_params["f_prot_out"]: with open(Config.pypka_params["f_prot_out"], "w") as f: f.write(mc.text_prots) if Config.pypka_params["coupled_sites_output"]: with open(Config.pypka_params["coupled_sites_output"], "w") as f: f.write(mc.text_coupled_sites) def getTotalProtonationCurve(self): return self.tit_curve def getTitrationCurve(self): if Config.pypka_params["sites"] != "all": warn_message = "The titration curve weighted by charges can only be calculated when titrating all sites." raise Exception(warn_message) sites = self.get_all_sites(get_list=True) nsites = len(sites) total_anionic_titres = sum(site.type == "a" for site in sites) total_arg_charge = 0 for _, aas in self.sequence.items(): for _, resname in aas.items(): if resname == "ARG": total_arg_charge += 1 titration_curve = {} for pH, prot in self.tit_curve.items(): prot = prot * nsites charge = prot - total_anionic_titres + total_arg_charge titration_curve[pH] = charge return titration_curve def getIsoelectricPoint(self): def calc_isoelectric_point(): # anionic_aas = ("ASP", "GLU", "CYS", "TYR", "CTR") # cationic_aas = ("LYS", "ARG", "HIS", "NTR") titration_curve = list(self.getTitrationCurve().items()) i = 0 pH, prot = titration_curve[i] while prot > 0.0 and i < len(titration_curve) - 1: i += 1 pH, prot = titration_curve[i] if i in [0, len(titration_curve) - 1]: charge = round(prot, 2) warn_message = ( "Not enough points to interpolate the Isoelectric Point\n" "Lowest pH value: {0}\t" "Charge: {1}".format(pH, charge) ) print(warn_message) limit = "-" if i == 0 else "+" return pH, limit, charge else: last_pH, last_prot = titration_curve[i - 1] isoelectric_point = pH - (prot * (last_pH - pH)) / (last_prot - prot) return isoelectric_point if not self.isoelectric_point: result = calc_isoelectric_point() if isinstance(result, tuple): self.isoelectric_point_limit = result else: self.isoelectric_point = result else: result = self.isoelectric_point return result def __iter__(self): self.itersites = self.get_all_sites(get_list=True) self.iternumb = -1 self.itermax = len(self.itersites) return self def __next__(self): self.iternumb += 1 if self.iternumb < self.itermax: return self.itersites[self.iternumb] else: raise StopIteration
[docs] @staticmethod def getParameters(): """Get the parameters used in the calculations.""" return "{}\n{}\n{}".format( Config.pypka_params.__str__(), Config.delphi_params.__str__(), Config.mc_params.__str__(), )
@staticmethod def getParametersDict(): return ( Config.pypka_params.get_clean_params(), Config.delphi_params.get_clean_params(), Config.mc_params.get_clean_params(), ) def getSiteInteractions(self): Config.loadParams(self.__parameters) return ( Config.parallel_params.all_sites, Config.parallel_params.npossible_states, Config.parallel_params.interactions_look, Config.parallel_params.interactions, ) def __getitem__(self, chain): return self.molecules[chain] def __str__(self, str_format="print"): if str_format == "print": output = "Chain Site Name pK" elif str_format == "file": output = "" sites = self.get_all_sites() for chain in sites.keys(): for site in sites[chain]: pk = site.pK if pk and str_format == "print": pk = "{:.2f}".format(round(pk, 2)) elif not pk: pk = "Not In Range" output += "\n{0:>4} {1:>6} {2:3} {3:>5}".format( chain, site.getResNumber(correct_icode=True), site.res_name, pk ) if self.isoelectric_point: output += "\n\nPredicted Isoelectric Point: {0}\n".format( self.isoelectric_point ) return output
def getTitrableSites(pdb, ser_thr_titration=True, debug=False): """ Gets the all titrable sites from the pdb Returns an input structure to the Titration class containing all titrable sites found in the pdb file. Args: pdb (str): The filename a PDB file Returns: A dict mapping all titrable sites found in the pdb file """ parameters = { "structure": pdb, "ncpus": 1, "epsin": 15, "ser_thr_titration": ser_thr_titration, } Config.storeParams("", debug, parameters, "all") chains = get_chains_from_file(pdb) sites = {chain: "all" for chain in chains} chains_res = identify_tit_sites(pdb, chains) out_sites = {chain: [] for chain in chains_res.keys()} for chain, sites in chains_res.items(): for resnumb, resname in sites.items(): out_site = resnumb if resname == "NTR": out_site += "N" elif resname == "CTR": out_site += "C" out_sites[chain].append(out_site) return out_sites, chains_res