Pythonバイオ? Pythonバイオ/ツール?
45   2019-05-29 (水) 12:12:28

Pythonによるパイプライン制御(シェル機能)

組み立ててみた

#!/usr/bin/env python3

#プログラムバイナリの存在確認
import subprocess
def checkcommand(command):
    out = subprocess.run(['which', command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if out.returncode != 0:
        logging.error('command ' + command + 'not accessible.')
        logging.error(out.stderr.decode('utf8'))
        return(out.returncode)
    else:
        logging.info(out.stdout.decode('utf8').strip() + 'found')
        return(out.returncode)

#ファイル存在確認
import os
def checkfile(fname):
    return(os.path.exists(fname))

def do_fastqc(rd):
    if checkfile(rd + '_fastqc.html'):  # 存在すれば
        logging.info('fastqc: ' + rd + '_fastqc.html exists. Skipping fastqc for ' + rd)
        return(0)
    else:
        out = subprocess.run(['fastqc', '--nogroup', rd+'.fastq.gz'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('fastqc ' + rd+'.fastq.gz' + ' failed.  ' + out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('fastqc ' + rd+'.fastq,gz' + ' completed.  ' + out.stdout.decode('utf8').strip())
            return(0)

def do_trimmomatic(rd_full):
    (readdata_dir, rd) = rd_full
    if checkfile(readdata_dir + 'paired_' + rd + '_R1_001.trim.fastq'):  # 存在すれば
        print('trimmologic: ' + readdata_dir + 'paired_' + rd + '_R1_001.trim.fastq exists. Skipping trimmomatic for ' + rd  + '_R1_001')
        logging.info('trimmologic: ' + readdata_dir + 'paired_' + rd + '_R1_001.trim.fastq exists. Skipping trimmomatic for ' + rd  + '_R1_001')
        return(0)
    elif checkfile(readdata_dir + 'paired_' + rd + '_R2_001.trim.fastq'):  # 存在すれば
        print('trimmologic: ' + readdata_dir + 'paired_' + rd + '_R2_001.trim.fastq exists. Skipping trimmomatic for ' + rd  + '_R2_001')
        logging.info('trimmologic: ' + readdata_dir + 'paired_' + rd + '_R2_001.trim.fastq exists. Skipping trimmomatic for ' + rd  + '_R2_001')
        return(0)
    else:
        # num of threads <=16.  大きいと途中でハングする
        paramstr = 'java -jar -Xmx512m /usr/local/Trimmomatic/trimmomatic-0.38.jar PE -threads 16 -phred33'
        paramstr += ' -trimlog trimmomatic_log_' + rd + '.txt'
        paramstr += ' ' + readdata_dir + rd + '_R1_001.fastq.gz' + ' ' + readdata_dir + rd + '_R2_001.fastq.gz' # input files
        paramstr += ' ' + readdata_dir + 'paired_' + rd + '_R1_001.trim.fastq' + \
                  ' ' + readdata_dir + 'unpaired_' + rd + '_R1_001.trim.fastq' + \
                  ' ' + readdata_dir + 'paired_' + rd + '_R2_001.trim.fastq' + \
                  ' ' + readdata_dir + 'unpaired_' + rd + '_R2_001.trim.fastq'    # input files
        paramstr += ' ' + 'ILLUMINACLIP:'+'/home/yamanouc/src/RNAseq-Saccha/Saccha/TruSeq3-PE-2.fa:2:30:10' + \
                  ' ' + 'LEADING:20 TRAILING:20' + \
                  ' ' + 'SLIDINGWINDOW:4:15 MINLEN:36'
        paramlist = paramstr.split(' ')
        print(paramlist)
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('trimmomatic ' + rd+'.fastq' + ' failed.  ' + out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('trimmomatic ' + rd+'.fastq' + ' completed.  ' + out.stdout.decode('utf8').strip())
            return(0)

def do_hisat2(rd_full2):
    (readdata_dir, rd, reference_seq, reference_build_seq) = rd_full2
    if checkfile(readdata_dir+rd+'.sam'):  # 存在すれば
        print('hisat2: ' + readdata_dir+rd+'.sam' + ' exists. Skipping hisat2 for ' + rd)
        logging.info('hisat2: ' + readdata_dir+rd+'.sam' + ' exists. Skipping hisat2 for ' + rd)
        return(0)
    #hisat2 -p 32 -x s288c.fna -1 paired_SRR453566_1.trim.fastq -2 paired_SRR453566_2.trim.fastq -S SRR453566.sam
    paramlist = ['hisat2', '-p', '32', '-x', reference_seq, '-1', readdata_dir+'paired_'+rd+'_R1_001.trim.fastq', \
                '-2', readdata_dir+'paired_'+rd+'_R2_001.trim.fastq', '-S', readdata_dir+rd+'.sam']
    out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(out.args)
    print(out.stderr.decode('utf8').strip())
    if out.returncode != 0:
        logging.error(' '.join(paramlist) + ' failed.  ' + out.stderr.decode('utf8').strip())
        return(out.returncode)
    else:
        logging.info(' '.join(paramlist) + ' completed.  ' + out.stdout.decode('utf8').strip())
        return(0)

def do_sam2bam2sort(rd_full):
    (readdata_dir, rd) = rd_full
    if checkfile(readdata_dir + rd + '.bam'):  # bamファイルが存在すれば処理をスキップ
        print('samtools view: ' + readdata_dir + rd + '.sam exists. Skipping samtools view for ' + rd)
        logging.info('samtools view: ' + readdata_dir + rd + '.sam exists. Skipping samtools view for ' + rd)
    else:
        paramstr = 'samtools view -@ 32 -b ' + readdata_dir+rd+'.sam ' + '-o ' + readdata_dir+rd+'.bam'
        paramlist = paramstr.split(' ')
        #out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=readdata_dir)
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('samtools view ' + rd+'.sam' + ' failed.  ' + out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('samtools view ' + rd+'.sam' + ' completed.  ' + out.stdout.decode('utf8').strip())
                    
    if checkfile(readdata_dir + rd + '.sorted.bam'):  # 存在すれば
        print('samtools sort: ' + readdata_dir + rd + '.sorted.bam exists. Skipping samtools sort for ' + rd)
        logging.info('samtools sort: ' + readdata_dir + rd + '.sorted.bam exists. Skipping samtools sort for ' + rd)
    else:
        paramstr = 'samtools sort -@ 32 ' + '-o ' + readdata_dir+rd+'.sorted.bam ' + readdata_dir+rd+'.bam'
        paramlist = paramstr.split(' ')
        #out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=readdata_dir)
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('samtools sort ' + rd+'.bam' + ' failed.  ' + out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('samtools sort ' + rd+'.bam' + ' completed.  ' + out.stdout.decode('utf8').strip())

def do_mpileup(rd_full):
    (readdata_dir, pileup_set, reference_seq) = rd_full
    refname = os.path.basename(reference_seq)
    if checkfile(readdata_dir + refname[:-4] + '.vcf'):  # vcfファイルが存在すれば処理をスキップ
        print('samtools mpileup: ' + readdata_dir + refname[:-4] + '.vcf exists. Skipping samtools mpileup for ' + refname[:-4])
        logging.info('samtools mpileup: ' + readdata_dir + refname[:-4] + '.vcf exists. Skipping samtools mpileup for ' + refname[:-4])
    else:
        bamfiles = ' '.join([readdata_dir+u+'.sorted.bam' for u in pileup_set])
        ## paramstr = 'samtools mpileup -uvf ' + reference_seq + ' ' + bamfiles + ' -o ' + readdata_dir+refname[:-4]+'.vcf'
        paramstr = 'bcftools mpileup -Ou -f ' + reference_seq + ' ' + bamfiles + ' -o ' + readdata_dir+refname[:-4]+'.vcf.gz'
        print(paramstr)
        paramlist = paramstr.split(' ')
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=readdata_dir)
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            #logging.error('samtools mpileup ' + refname[:-4] + 'vcf' + ' failed.  ' + out.stderr.decode('utf8').strip())
            logging.error('bcftools mpileup ' + refname[:-4] + 'vcf' + ' failed.  ' + out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            #logging.info('samtools mpileuup ' + refname[:-4] + 'vcf' + ' completed.  ' + out.stdout.decode('utf8').strip())
            logging.info('bcftools mpileuup ' + refname[:-4] + 'vcf' + ' completed.  ' + out.stdout.decode('utf8').strip()) 
   
##############################
def main():

    logging.info('start')
    # プログラムの存在確認
    programs_to_use = ['fastqc', 'java', 'hisat2-build', 'hisat2', 'samtools', 'bcftools', 'featureCounts' ]
    for cmd in programs_to_use:
        if checkcommand(cmd)!=0:
            logging.error(cmd + 'not found (not on the PATH)')

    pfiles_to_use = ['/usr/local/Trimmomatic/trimmomatic-0.38.jar', '/usr/local/RNAseq/add_gene_id',\
                     '/home/yamanouc/src/RNAseq-Saccha/Saccha/TruSeq3-PE-2.fa']
    for pfile in pfiles_to_use:
        if not checkfile(pfile):
            logging.error(pfile + ' not found')
            return(-1)
        else:
            logging.info(pfile + ' found')
    print('programs exist')
    
    # データ設定
    #readdata_dir = '/home/yamanouc/src/biopython/Saccha/'
    #readdata_dir = '/home/yamanouc/1710JNHX-0008/rawdata/'
    readdata_dir = '/home/yamanouc/KishimotoRNA/'
    #readdata_set = ['SRR453568']  # ファイル名は ***_{1,2}.fasta であること
    #readdata_set = ['Anc','1_2-1', '1_2-2','2_2-1', '2_2-2','2_5-1','2_5-1-7A','2_6-2','2_6-2-10E']
    readdata_set = [\
        'Kishimoto_01_43B_S1', \
        'Kishimoto_02_45a_plus_S2', \
        'Kishimoto_03_45a_minus_S3', \
        'Kishimoto_04_45b_plus_S4', \
        'Kishimoto_05_45b_minus_S5', \
        'Kishimoto_06_45c_plus_S6', \
        'Kishimoto_07_45c_minus_S7', \
        'Kishimoto_08_45A_plus_S8', \
        'Kishimoto_09_45A_minus_S9', \
        'Kishimoto_10_45L_S10' ]
    #reference_seq = '/home/yamanouc/src/biopython/Saccha/s288c.fna'
    #reference_gff = '/home/yamanouc/src/biopython/Saccha/s288c.gff'
    reference_seq = readdata_dir + 'AP012030.fna'
    reference_gff = readdata_dir + 'AP012030.gff'

    # step-1 クオリティチェック
    readdata12full = [ readdata_dir + u + '_R1'+'_001' for u in readdata_set] + \
        [readdata_dir + u + '_R2'+'_001' for u in readdata_set]

    numproc = len(readdata12full)
    #print(numproc, readdata12full)
    with Pool(numproc) as p:
        ret = p.map(do_fastqc, readdata12full)
    
    # step-2 トリミング
    #   trimmmomatic中での並列とpythonでのmultiprocessの並列は効率よく両立していないらしい。
    #   仕方なく、python中では直列ループ、trimmomaticの中で並列を稼ぐことにする
    #readdata_full = [[readdata_dir, u] for u in readdata_set]
    #numproc = len(readdata_full)
    #print(numproc, readdata_full)
    #with Pool(numproc) as p:
    #    ret = p.map(do_trimmologic, readdata_full)
    #
    for rd in readdata_set:
        do_trimmomatic((readdata_dir, rd))

    # step-3 マッピング
    # 初めに、reference-build.fnaを作る
    reference_build_seq = reference_seq[:-4] + '.fna.1.ht2'
    if checkfile(reference_build_seq):  # 存在すれば
        print('hisat2-build: ' + reference_build_seq + ' exists. Skipping hisat2-build for ' + rd)
        logging.info('hisat2-build: ' + reference_build_seq + ' exists. Skipping hisat2-build for ' + rd)
    else:
        paramlist = ['hisat2-build',  reference_seq, reference_seq[:-4]+'.fna']
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('hisat2-build' + reference_seq + reference_seq[:-4]+'.fna' + ' failed.' + \
                          out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('hisat2-build' + reference_seq + reference_seq[:-4]+'.fna' + ' completed.' + \
                         out.stdout.decode('utf8').strip())
   
    # hisat2でマッピングを行う
    for rd in readdata_set:
        do_hisat2((readdata_dir, rd, reference_seq, reference_build_seq))

    # step-4 SAM-BAM-sort
    for rd in readdata_set:
        do_sam2bam2sort((readdata_dir, rd))
        
    '''      
    # step-5 mpileup
    mpileup_set = ['SRR453568']
    do_mpileup((readdata_dir, mpileup_set, reference_seq))
    '''
   
    # Step-5 featureCounts
    # GFF preprocessing
    reference_gff = reference_seq[:-4]+'.gff'
    extended_reference_gff = reference_seq[:-4]+'_e.gff'
    if checkfile(extended_reference_gff):  # 存在すれば
        print('add_gene_id: ' + extended_reference_gff + ' exists. Skipping add_gene_id for ' + reference_seq)
        logging.info('add_gene_id: ' + reference_gff + ' exists. Skipping add_gene_id for ' + reference_seq)
    else:
        paramlist = ['python', '/usr/local/RNAseq/add_gene_id', reference_gff, extended_reference_gff]
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('add_gene_id ' + reference_gff + ' failed.' + \
                          out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('add_gene_id ' + reference_gff + ' completed.' + \
                         out.stdout.decode('utf8').strip())
   
    # FeatureCounts
    extended_reference_gff = reference_seq[:-4]+'_e.gff'
    bamlist = [readdata_dir + u + '.sorted.bam' for u in readdata_set]
    paramlist = ['featureCounts', '-p', '-T', '32', '-t', 'gene', '-g', 'gene_id', \
                 '-a', extended_reference_gff, '-o', readdata_dir+'counts.txt']

    if checkfile(readdata_dir + '_counts.txt'):  # 存在すれば
        print('featureCount: ' + readdata_dir + '_counts.txt' + ' exists. Skipping featureCount for ' + readdata_dir + '_counts.txt')
        logging.info('featureCount: ' + readdata_dir + '_counts.txt' + ' exists. Skipping featureCount for ' + readdata_dir + '_counts.txt')
    else:
        paramlist.extend(bamlist)
        print(paramlist)
        out = subprocess.run(paramlist, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(out.args)
        print(out.stderr.decode('utf8').strip())
        if out.returncode != 0:
            logging.error('featureCounts ' + readdata_dir + ' failed.' + \
                      out.stderr.decode('utf8').strip())
            return(out.returncode)
        else:
            logging.info('featureCounts ' + readdata_dir + ' completed.' + \
                     out.stdout.decode('utf8').strip())       
        
# main starts here 
import logging
from multiprocessing import Pool
import os, datetime
dt = datetime.datetime.now()
#logfname = 'RNAseq'+dt.strftime('%Y%m%d%H%M%S')+'.log'
logfname = 'RNAseq'+dt.strftime('%Y%m%d')+'.log'
if os.path.exists(logfname):
    os.remove(logfname)
logging.basicConfig(filename=logfname, format='%(levelname)s: %(message)s', level=logging.INFO)

if __name__ == "__main__":
    main()
    print('processing complete')
    
logging.info('processing complete')
logging.shutdown()

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2019-05-29 (水) 12:12:28 (109d)