diff src/General_functions.py @ 0:14045c80a222 draft

"planemo upload for repository https://github.com/juliechevalier/GIANT/tree/master commit cb276a594444c8f32e9819fefde3a21f121d35df"
author vandelj
date Fri, 26 Jun 2020 09:38:23 -0400
parents
children ccca6ad98f78
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/General_functions.py	Fri Jun 26 09:38:23 2020 -0400
@@ -0,0 +1,206 @@
+import re
+import numpy as np
+
+def get_column_names( file_path, toNotConsider=-1, each=1):
+	options=[]
+	inputfile = open(file_path)
+	firstLine = next(inputfile).strip().split("\t")
+	cpt=0
+	for i, field_component in enumerate( firstLine ):
+		if i!=toNotConsider:#to squeeze the first column
+			if cpt==0:
+				options.append( ( field_component, field_component, False ) )
+			cpt+=1
+			if cpt==each:
+				cpt=0
+	inputfile.close()
+	return options
+
+def get_column_names_filteredList( file_path, toNotConsider=[], each=1):
+	options=[]
+	inputfile = open(file_path)
+	firstLine = next(inputfile).strip().split("\t")
+	cpt=0
+	for i, field_component in enumerate( firstLine ):
+		if i not in toNotConsider:#to squeeze the first columns
+			if cpt==0:
+				options.append( ( field_component, field_component, False ) )
+			cpt+=1
+			if cpt==each:
+				cpt=0
+	inputfile.close()
+	return options
+
+def get_column_names_mergeNumber(file_path, numberToMerge=1, toNotConsider=[]):
+	options=[]
+	inputfile = open(file_path)
+	if int(numberToMerge)>0:
+		iHeader=0
+		for iCurrentLine in inputfile:
+			iHeader=iHeader+1
+			if iHeader>int(numberToMerge):
+				break
+			currentLine=iCurrentLine.strip().split("\t")
+			iOption=-1
+			for i, field_component in enumerate( currentLine ):
+				if i not in toNotConsider:#to squeeze specified columns
+					iOption=iOption+1
+					if iHeader==1:
+						options.append( ( str(field_component), str(field_component), False ) )
+					else:
+						options[iOption]=(options[iOption][0]+"_"+str(field_component),options[iOption][1]+"_"+str(field_component),False)
+	else:
+		currentLine = next(inputfile).strip().split("\t")
+		for i, field_component in enumerate( currentLine ):
+			if i not in toNotConsider:#to squeeze specified columns
+				options.append( ( "Column_"+str(i), "Column_"+str(i), False ) )
+	inputfile.close()
+	return options
+
+def get_row_names( file_path, factorName ):
+	inputfile = open(file_path)
+	firstLine = next(inputfile).strip().split("\t")
+	iColumn=-1
+	for i, field_component in enumerate( firstLine ):
+		if field_component==factorName:#to test
+			iColumn=i
+	options=[]
+	if iColumn!=-1:
+		for nextLine in inputfile:
+			nextLine=nextLine.strip().split("\t")
+			if len(nextLine)>1:
+				if (nextLine[iColumn], nextLine[iColumn], False) not in options:
+					options.append( (nextLine[iColumn], nextLine[iColumn], False) )
+	inputfile.close()
+	return options
+
+def get_condition_file_names( file_list, toNotConsider=-1, each=1):
+	options=[]
+	if not isinstance(file_list,list):#if input file is a tabular file, act as get_column_names
+		inputfile = open(file_list.file_name)
+		firstLine = next(inputfile).strip().split("\t")
+		cpt=0
+		for i, field_component in enumerate( firstLine ):
+			if i!=toNotConsider:#to squeeze the first column
+				if cpt==0:
+					options.append( ( field_component, field_component, False ) )
+				cpt+=1
+				if cpt==each:
+					cpt=0
+		inputfile.close()
+	else:#if input file is a .cel file list or a collection
+		if not hasattr(file_list[0],'collection'):#if it is not a collection, get name easily
+			for i, field_component in enumerate( file_list ):
+				options.append( ( field_component.name, field_component.name, False ) )
+		else:#if the file is a collection, have to get deeper in the corresponding HistoryDatasetCollectionAssociation object
+			for i, field_component in enumerate( file_list[0].collection.elements ):
+				options.append( ( field_component.element_identifier, field_component.element_identifier, False ) )
+	return options
+
+def generateFactorFile( file_list, factor_list, outputFileName, logFile):
+	forbidenCharacters={"*",":",",","|"}
+	outputfile = open(outputFileName, 'w')
+	outputLog = open(logFile, 'w')
+	sampleList=[]
+	if not isinstance(file_list,list):
+		conditionNames=get_condition_file_names(file_list,0) #unique expression file, remove the first column (index=0)
+	else :
+		conditionNames=get_condition_file_names(file_list) #.CEL files
+	for iSample, sample_component in enumerate (conditionNames):
+		sampleList.append(str(sample_component[1]))
+	outputLog.write("[INFO] "+str(len(sampleList))+" sample are detected as input\n")
+	globalDict=dict()
+	factorNameList=[]
+	firstLine="Conditions"
+	if len(factor_list)==0:#check if there is at least one factor available
+		outputLog.write("[ERROR] no factor was defined !\n")
+		return 1
+	else:
+		for iFactor, factor_component in enumerate( factor_list ):
+			currentSampleList=list(sampleList)
+			currentFactor=str(factor_component['factorName'])
+			#check if factor name contains forbidden characters
+			for specialCharacter in forbidenCharacters:
+				if currentFactor.find(specialCharacter)!=-1:
+					outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in factor name : '"+currentFactor+"'\n")	
+					return 4
+			#check if factor allready named like that
+			if not globalDict.get(currentFactor) is None:
+				outputLog.write("[ERROR] '"+currentFactor+"' is used several times as factor name\n")	
+				return 3
+			globalDict[currentFactor]=dict()
+			firstLine=firstLine+"\t"+currentFactor
+			factorNameList.append(currentFactor)
+			if len(factor_component['valueList'])<=1:#check if there is at least two value available
+				outputLog.write("[ERROR] at least two different values are necessary for '"+currentFactor+"' factor\n")
+				return 1
+			else:
+				for iValue, value_component in enumerate( factor_component['valueList'] ):
+					currentValue=str(value_component['valueName'])
+					#check if factor name contains forbidden characters
+					for specialCharacter in forbidenCharacters:
+						if currentValue.find(specialCharacter)!=-1:
+							outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in value name : '"+currentValue+"'\n")	
+							return 4
+					currentSample=str(value_component['valueConditions']).split(",")
+					for iSample, sample_component in enumerate (currentSample):
+						if not sample_component in currentSampleList:
+							outputLog.write("[ERROR] sample "+sample_component+" was assigned several times for factor '"+currentFactor+"'\n")
+							return 2
+						currentSampleList.remove(sample_component)
+						globalDict[currentFactor][sample_component]=currentValue
+			if(len(currentSampleList)>0):
+				outputLog.write("[ERROR] for factor '"+currentFactor+"'' sample "+str(currentSampleList)+" are not assigned to any value\n")
+				return 2
+	outputLog.write("[INFO] "+str(len(globalDict))+" factors are detected\n")
+	#start writing the factor file
+	outputfile.write(firstLine+"\n") 
+	for iSample, sample_component in enumerate(sampleList):
+		newLine=sample_component
+		for iFactor, factor_component in enumerate(factorNameList):
+			newLine=newLine+"\t"+globalDict[factor_component][sample_component]
+		outputfile.write(newLine+"\n") 
+	outputfile.close()
+	outputLog.close()
+	return 0
+
+def selectSubSetTable(file_path,headerLine_number,columnsToAdd,columnNamesToKeep,outputFileName,logFile):
+	outputLog = open(logFile, 'w')
+	outputLog.write("[INFO] header line number : "+ headerLine_number+" lines\n")	
+	availableColumnsTuple=get_column_names_mergeNumber(file_path, headerLine_number)
+	#convert tuple list as a simple array
+	availableColumns=[]
+	for iTuple, tuple_content in enumerate (availableColumnsTuple): 
+		availableColumns.append(str(tuple_content[0]))
+	if len(availableColumns)==0:
+		outputLog.write("[ERROR] No detected columns in input file\n")	
+		return 1
+	selectedColumns=list(columnsToAdd)
+	for iVolcano, volcano_content in enumerate(columnNamesToKeep):
+		selectedColumns.append(availableColumns.index(volcano_content['pvalColumn']))
+		if volcano_content['fdrColumn'] in availableColumns:
+			selectedColumns.append(availableColumns.index(volcano_content['fdrColumn']))
+		else:
+			selectedColumns.append(0)
+		selectedColumns.append(availableColumns.index(volcano_content['fcColumn']))
+	if len(selectedColumns)!=(3*len(columnNamesToKeep)+len(columnsToAdd)):
+		outputLog.write("[ERROR] matching between input file colnames and requested column names failed\n")	
+		return 1
+	outputLog.write("[INFO] columns kept : "+str(selectedColumns)+"\n")	
+	#start writting formatted file
+	inputfile = open(file_path)
+	outputfile = open(outputFileName, 'w')
+	iLineCpt=-1
+	for iCurrentLine in inputfile:
+		iLineCpt=iLineCpt+1
+		if iLineCpt>=int(headerLine_number):
+			currentLineFields=np.array(iCurrentLine.strip().split("\t"))
+			newLine="\t".join(currentLineFields[selectedColumns])
+			outputfile.write(newLine+"\n")
+	if iLineCpt<int(headerLine_number):
+		outputLog.write("[ERROR] not enough lines in input files ("+(iLineCpt+1)+" lines)\n")	
+		return 1
+	inputfile.close()
+	outputfile.close()
+	outputLog.close()
+	return 0
\ No newline at end of file