Mercurial > repos > vandelj > giant_hierarchical_clustering
comparison src/General_functions.py @ 0:14045c80a222 draft
"planemo upload for repository https://github.com/juliechevalier/GIANT/tree/master commit cb276a594444c8f32e9819fefde3a21f121d35df"
| author | vandelj | 
|---|---|
| date | Fri, 26 Jun 2020 09:38:23 -0400 | 
| parents | |
| children | ccca6ad98f78 | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:14045c80a222 | 
|---|---|
| 1 import re | |
| 2 import numpy as np | |
| 3 | |
def get_column_names( file_path, toNotConsider=-1, each=1):
    """Build option tuples from the header line of a tab-separated file.

    Only the first line of ``file_path`` is read. It is stripped and split on
    tabs, and every kept field yields a ``(field, field, False)`` tuple
    (presumably Galaxy ``(label, value, selected)`` select options -- confirm
    against the tool wrapper).

    Parameters:
        file_path: path of the tabular file to inspect.
        toNotConsider: index of a single column to skip (-1 skips nothing).
        each: keep one column out of ``each`` among the non-skipped ones,
            starting with the first. If ``each`` is never reached by the
            counter (e.g. 0 or a negative value) only the first non-skipped
            column is kept.

    Returns:
        The list of option tuples; an empty list when the file has no line.
    """
    options = []
    # The context manager guarantees the handle is released even on error.
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
    if headerLine is None:
        # Empty file: nothing to offer instead of leaking a StopIteration.
        return options
    # NOTE(review): strip() also removes leading/trailing tabs, so empty
    # cells at either end of the header are dropped and indices shift.
    cpt = 0
    for i, field_component in enumerate(headerLine.strip().split("\t")):
        if i == toNotConsider:
            continue
        if cpt == 0:
            options.append((field_component, field_component, False))
        cpt += 1
        if cpt == each:
            cpt = 0
    return options
| 18 | |
def get_column_names_filteredList( file_path, toNotConsider=None, each=1):
    """Build option tuples from the header line, skipping several columns.

    Same behaviour as a single-column skip, except that ``toNotConsider`` is
    a collection of column indices. Only the first line of ``file_path`` is
    read; it is stripped and split on tabs, and every kept field yields a
    ``(field, field, False)`` tuple.

    Parameters:
        file_path: path of the tabular file to inspect.
        toNotConsider: collection of column indices to skip. ``None`` (the
            default) skips nothing, exactly like an empty list.
        each: keep one column out of ``each`` among the non-skipped ones,
            starting with the first.

    Returns:
        The list of option tuples; an empty list when the file has no line.
    """
    # None replaces the former mutable ``[]`` default; both mean "skip nothing".
    skippedColumns = () if toNotConsider is None else toNotConsider
    options = []
    # The context manager guarantees the handle is released even on error.
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
    if headerLine is None:
        # Empty file: nothing to offer instead of leaking a StopIteration.
        return options
    cpt = 0
    for i, field_component in enumerate(headerLine.strip().split("\t")):
        if i in skippedColumns:
            continue
        if cpt == 0:
            options.append((field_component, field_component, False))
        cpt += 1
        if cpt == each:
            cpt = 0
    return options
| 33 | |
def get_column_names_mergeNumber(file_path, numberToMerge=1, toNotConsider=None):
    """Build option tuples from one or several merged header lines.

    When ``int(numberToMerge)`` is positive, the first ``numberToMerge``
    lines are treated as header lines: the fields of the first line start
    the column names and the fields found at the same kept position in the
    following header lines are appended with a ``"_"`` separator.
    Otherwise the file is considered header-less and columns are named
    ``Column_<index>`` from the fields of its first line (the index is the
    position in the line, skipped columns included).

    Parameters:
        file_path: path of the tabular file to inspect.
        numberToMerge: number of header lines (int or numeric string).
        toNotConsider: collection of column indices to skip. ``None`` (the
            default) skips nothing, exactly like an empty list.

    Returns:
        A list of ``(name, name, False)`` tuples; empty when the file has no
        line.

    Raises:
        ValueError: if ``numberToMerge`` cannot be converted to int.
        IndexError: if a later header line holds more kept fields than the
            first one (there is no name to extend).
    """
    # None replaces the former mutable ``[]`` default; both mean "skip nothing".
    skippedColumns = () if toNotConsider is None else toNotConsider
    options = []
    # The context manager guarantees the handle is released even on error.
    with open(file_path) as inputfile:
        nbHeaderLines = int(numberToMerge)  # converted once, not per line
        if nbHeaderLines > 0:
            for iHeader, rawLine in enumerate(inputfile, 1):
                if iHeader > nbHeaderLines:
                    break
                # iOption tracks the position among kept columns only.
                iOption = -1
                for i, field_component in enumerate(rawLine.strip().split("\t")):
                    if i in skippedColumns:
                        continue
                    iOption += 1
                    fieldName = str(field_component)
                    if iHeader == 1:
                        options.append((fieldName, fieldName, False))
                    else:
                        previous = options[iOption]
                        options[iOption] = (previous[0] + "_" + fieldName,
                                            previous[1] + "_" + fieldName,
                                            False)
        else:
            firstLine = next(inputfile, None)
            # An empty file simply yields no option (no StopIteration leak).
            if firstLine is not None:
                for i in range(len(firstLine.strip().split("\t"))):
                    if i not in skippedColumns:
                        options.append(("Column_" + str(i), "Column_" + str(i), False))
    return options
| 59 | |
def get_row_names( file_path, factorName ):
    """List the distinct values found in one named column of a tabular file.

    The first line of ``file_path`` is the header; the column whose header
    equals ``factorName`` is selected (the last one if several match). Each
    following line is stripped and split on tabs, and the value found in the
    selected column is reported once, in order of first appearance.

    Lines with a single field (e.g. blank lines) are ignored, as are lines
    too short to hold the selected column.

    Parameters:
        file_path: path of the tabular file to inspect.
        factorName: header name of the column to read.

    Returns:
        A list of ``(value, value, False)`` tuples; empty when the file has
        no line or when no header field equals ``factorName``.
    """
    options = []
    # The context manager guarantees the handle is released even on error.
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
        if headerLine is None:
            # Empty file: nothing to offer instead of leaking a StopIteration.
            return options
        iColumn = -1
        for i, field_component in enumerate(headerLine.strip().split("\t")):
            if field_component == factorName:
                iColumn = i
        if iColumn == -1:
            return options
        # Set lookup keeps the duplicate check cheap on long files.
        seenValues = set()
        for rawLine in inputfile:
            fields = rawLine.strip().split("\t")
            # Skip blank/one-field lines and rows lacking the target column.
            if len(fields) <= 1 or iColumn >= len(fields):
                continue
            value = fields[iColumn]
            if value not in seenValues:
                seenValues.add(value)
                options.append((value, value, False))
    return options
| 76 | |
def get_condition_file_names( file_list, toNotConsider=-1, each=1):
    """Build option tuples naming the conditions held by ``file_list``.

    Three input shapes are handled:

    * not a ``list``: ``file_list.file_name`` is opened as a tab-separated
      file and its header line is turned into options, skipping column
      ``toNotConsider`` and keeping one column out of ``each``;
    * a ``list`` whose first item has no ``collection`` attribute: one
      option per item, named after ``item.name``;
    * a ``list`` whose first item has a ``collection`` attribute: one option
      per entry of ``file_list[0].collection.elements``, named after its
      ``element_identifier``.

    ``toNotConsider`` and ``each`` are only used in the first case.

    Returns:
        A list of ``(name, name, False)`` tuples; empty for an empty list or
        a tabular file without any line.
    """
    options = []
    if not isinstance(file_list, list):
        # Single tabular file: condition names are the header fields.
        with open(file_list.file_name) as inputfile:
            headerLine = next(inputfile, None)
        if headerLine is None:
            # Empty file: nothing to offer instead of leaking a StopIteration.
            return options
        cpt = 0
        for i, field_component in enumerate(headerLine.strip().split("\t")):
            if i == toNotConsider:
                continue
            if cpt == 0:
                options.append((field_component, field_component, False))
            cpt += 1
            if cpt == each:
                cpt = 0
        return options
    if not file_list:
        # Nothing selected yet: avoid indexing an empty list below.
        return options
    if not hasattr(file_list[0], 'collection'):
        # Plain list of datasets: each one carries its own name.
        for dataset in file_list:
            options.append((dataset.name, dataset.name, False))
    else:
        # Collection: names live on the elements of the first item's collection.
        for element in file_list[0].collection.elements:
            options.append((element.element_identifier, element.element_identifier, False))
    return options
| 99 | |
def generateFactorFile( file_list, factor_list, outputFileName, logFile):
    """Write a sample-by-factor table and a log describing the outcome.

    Sample names are obtained from ``get_condition_file_names``: for a
    non-list ``file_list`` the first column (index 0) is skipped, otherwise
    every item of the list gives one sample.

    Parameters:
        file_list: single tabular input object, or list of input objects.
        factor_list: sequence of dicts with a ``'factorName'`` entry and a
            ``'valueList'`` entry; each value is a dict with ``'valueName'``
            and ``'valueConditions'`` (comma-separated sample names once
            converted with ``str``).
        outputFileName: path of the factor file. It is opened for writing
            up front, so it is created/truncated even when validation fails.
        logFile: path of the log file receiving [INFO]/[ERROR] messages.

    Returns:
        0 on success;
        1 when no factor is defined or a factor has fewer than two values;
        2 when a sample cannot be assigned (unknown for this input or
          already consumed by another value of the same factor) or when
          samples remain without value for a factor;
        3 when a factor name is used more than once;
        4 when a factor or value name contains a forbidden character.
    """
    # Tuple rather than set: the character reported first is deterministic.
    forbidenCharacters = ("*", ":", ",", "|")
    # Both files are closed on every exit path, early error returns included.
    with open(outputFileName, 'w') as outputfile, open(logFile, 'w') as outputLog:
        if not isinstance(file_list, list):
            # unique expression file, remove the first column (index=0)
            conditionNames = get_condition_file_names(file_list, 0)
        else:
            # .CEL files
            conditionNames = get_condition_file_names(file_list)
        sampleList = [str(sample_component[1]) for sample_component in conditionNames]
        outputLog.write("[INFO] "+str(len(sampleList))+" sample are detected as input\n")

        if len(factor_list) == 0:  # at least one factor is required
            outputLog.write("[ERROR] no factor was defined !\n")
            return 1

        globalDict = dict()  # factor name -> {sample name -> value name}
        factorNameList = []  # factor names in declaration order
        firstLine = "Conditions"
        for factor_component in factor_list:
            # Samples still waiting for a value of the current factor.
            currentSampleList = list(sampleList)
            currentFactor = str(factor_component['factorName'])
            for specialCharacter in forbidenCharacters:
                if currentFactor.find(specialCharacter) != -1:
                    outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in factor name : '"+currentFactor+"'\n")
                    return 4
            if currentFactor in globalDict:
                outputLog.write("[ERROR] '"+currentFactor+"' is used several times as factor name\n")
                return 3
            globalDict[currentFactor] = dict()
            firstLine = firstLine+"\t"+currentFactor
            factorNameList.append(currentFactor)
            if len(factor_component['valueList']) <= 1:  # two values at least
                outputLog.write("[ERROR] at least two different values are necessary for '"+currentFactor+"' factor\n")
                return 1
            for value_component in factor_component['valueList']:
                currentValue = str(value_component['valueName'])
                for specialCharacter in forbidenCharacters:
                    if currentValue.find(specialCharacter) != -1:
                        outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in value name : '"+currentValue+"'\n")
                        return 4
                for sample_component in str(value_component['valueConditions']).split(","):
                    # A sample missing from the pending list is either
                    # unknown or already taken by another value.
                    if sample_component not in currentSampleList:
                        outputLog.write("[ERROR] sample "+sample_component+" was assigned several times for factor '"+currentFactor+"'\n")
                        return 2
                    currentSampleList.remove(sample_component)
                    globalDict[currentFactor][sample_component] = currentValue
            if len(currentSampleList) > 0:
                outputLog.write("[ERROR] for factor '"+currentFactor+"'' sample "+str(currentSampleList)+" are not assigned to any value\n")
                return 2
        outputLog.write("[INFO] "+str(len(globalDict))+" factors are detected\n")

        # start writing the factor file
        outputfile.write(firstLine+"\n")
        for sample_component in sampleList:
            cells = [sample_component]
            for factorName in factorNameList:
                cells.append(globalDict[factorName][sample_component])
            outputfile.write("\t".join(cells)+"\n")
    return 0
| 166 | |
def selectSubSetTable(file_path,headerLine_number,columnsToAdd,columnNamesToKeep,outputFileName,logFile):
    """Extract a subset of columns from a tabular file, dropping its header.

    Column names are resolved with ``get_column_names_mergeNumber`` using
    ``headerLine_number`` header lines. The kept columns are, in order, the
    indices of ``columnsToAdd`` followed, for each entry of
    ``columnNamesToKeep``, by the indices of its ``'pvalColumn'``,
    ``'fdrColumn'`` and ``'fcColumn'`` names. When the FDR name is not an
    available column, index 0 is used in its place.

    Every line after the header lines is stripped, split on tabs, reduced
    to the kept columns and written to ``outputFileName``.

    Parameters:
        file_path: path of the input tabular file.
        headerLine_number: number of header lines (int or numeric string).
        columnsToAdd: iterable of column indices always kept.
        columnNamesToKeep: iterable of dicts with ``'pvalColumn'``,
            ``'fdrColumn'`` and ``'fcColumn'`` entries.
        outputFileName: path of the file receiving the selected columns.
        logFile: path of the log file receiving [INFO]/[ERROR] messages.

    Returns:
        0 on success; 1 when no column is detected, when a requested p-value
        or fold-change column name is unknown, or when the file holds no
        line after the header lines.
    """
    # The log is closed on every exit path, early error returns included.
    with open(logFile, 'w') as outputLog:
        # str() so that an int header count does not break the concatenation.
        outputLog.write("[INFO] header line number : "+str(headerLine_number)+" lines\n")
        availableColumnsTuple = get_column_names_mergeNumber(file_path, headerLine_number)
        # convert tuple list as a simple array
        availableColumns = [str(tuple_content[0]) for tuple_content in availableColumnsTuple]
        if len(availableColumns) == 0:
            outputLog.write("[ERROR] No detected columns in input file\n")
            return 1

        selectedColumns = list(columnsToAdd)
        try:
            for volcano_content in columnNamesToKeep:
                selectedColumns.append(availableColumns.index(volcano_content['pvalColumn']))
                if volcano_content['fdrColumn'] in availableColumns:
                    selectedColumns.append(availableColumns.index(volcano_content['fdrColumn']))
                else:
                    # No FDR column available: column 0 is used as placeholder.
                    selectedColumns.append(0)
                selectedColumns.append(availableColumns.index(volcano_content['fcColumn']))
        except ValueError:
            # list.index() raises on an unknown name; report it through the
            # log and the return code like the other failures.
            outputLog.write("[ERROR] matching between input file colnames and requested column names failed\n")
            return 1
        outputLog.write("[INFO] columns kept : "+str(selectedColumns)+"\n")

        # start writing formatted file
        nbHeaderLines = int(headerLine_number)
        iLineCpt = -1  # stays at -1 when the input file is empty
        with open(file_path) as inputfile, open(outputFileName, 'w') as outputfile:
            for iLineCpt, iCurrentLine in enumerate(inputfile):
                if iLineCpt >= nbHeaderLines:
                    currentLineFields = np.array(iCurrentLine.strip().split("\t"))
                    outputfile.write("\t".join(currentLineFields[selectedColumns])+"\n")
        if iLineCpt < nbHeaderLines:
            # iLineCpt is the index of the last line read: no data line found.
            outputLog.write("[ERROR] not enough lines in input files ("+str(iLineCpt+1)+" lines)\n")
            return 1
    return 0
