| 2 | 1 package edu.unc.genomics; | 
|  | 2 | 
|  | 3 import java.util.List; | 
|  | 4 import java.util.concurrent.ExecutorService; | 
|  | 5 import java.util.concurrent.Executors; | 
|  | 6 import java.util.concurrent.Future; | 
|  | 7 | 
|  | 8 import org.apache.log4j.Logger; | 
|  | 9 | 
|  | 10 /** | 
|  | 11  * Controller for scheduling and running jobs | 
|  | 12  * Wrapper for ExcecutorService, although the implementation could change | 
|  | 13  * | 
|  | 14  * @author timpalpant | 
|  | 15  * | 
|  | 16  */ | 
|  | 17 public class JobQueueManager { | 
|  | 18 | 
|  | 19 	private static final Logger log = Logger.getLogger(JobQueueManager.class); | 
|  | 20 | 
|  | 21 	private final JobQueue queue; | 
|  | 22 	private final ExecutorService exec; | 
|  | 23 	private final Thread monitor; | 
|  | 24 | 
|  | 25 	public JobQueueManager(JobQueue queue) { | 
|  | 26 		this.queue = queue; | 
|  | 27 | 
|  | 28 		int numProcessors = Runtime.getRuntime().availableProcessors(); | 
|  | 29 		log.debug("Initializing thread pool with "+numProcessors+" processors"); | 
|  | 30 		exec = Executors.newFixedThreadPool(numProcessors); | 
|  | 31 | 
|  | 32 		monitor = new Thread(new JobMonitor()); | 
|  | 33 		monitor.start(); | 
|  | 34 	} | 
|  | 35 | 
|  | 36 	public List<Runnable> shutdownNow() { | 
|  | 37 		return exec.shutdownNow(); | 
|  | 38 	} | 
|  | 39 | 
|  | 40 	/** | 
|  | 41 	 * Add a Job to the queue | 
|  | 42 	 * @param job | 
|  | 43 	 * @throws JobException | 
|  | 44 	 */ | 
|  | 45 	public SubmittedJob submitJob(Job job) throws JobException { | 
|  | 46 		// Refuse to add the Job to the queue if its arguments are not valid | 
|  | 47 		if (!job.validateArguments()) { | 
|  | 48 			throw new JobException("Job arguments are not valid"); | 
|  | 49 		} | 
|  | 50 | 
|  | 51 		// Submit the job for execution into the thread pool | 
|  | 52 		Future<?> future = exec.submit(job); | 
|  | 53 		SubmittedJob submittedJob = new SubmittedJob(job, future); | 
|  | 54 		log.info("Submitted job " + submittedJob.getId()); | 
|  | 55 | 
|  | 56 		// Add the SubmittedJob to the JobQueue | 
|  | 57 		queue.add(submittedJob); | 
|  | 58 		return submittedJob; | 
|  | 59 	} | 
|  | 60 | 
|  | 61 	/** | 
|  | 62 	 * Are any jobs running? (not done) | 
|  | 63 	 * @return | 
|  | 64 	 */ | 
|  | 65 	public boolean isRunning() { | 
|  | 66 		for (SubmittedJob job : queue) { | 
|  | 67 			if (!job.isDone()) { | 
|  | 68 				return true; | 
|  | 69 			} | 
|  | 70 		} | 
|  | 71 | 
|  | 72 		return false; | 
|  | 73 	} | 
|  | 74 | 
|  | 75 | 
|  | 76 	/** | 
|  | 77 	 * Background process for polling the status of submitted jobs | 
|  | 78 	 * @author timpalpant | 
|  | 79 	 * | 
|  | 80 	 */ | 
|  | 81 	public class JobMonitor implements Runnable { | 
|  | 82 | 
|  | 83 		public static final int JOB_POLL_INTERVAL = 1_000; | 
|  | 84 | 
|  | 85 		public void run() { | 
|  | 86 			try { | 
|  | 87 				while (true) { | 
|  | 88 					// Check Job statuses every 1s | 
|  | 89 					Thread.sleep(JOB_POLL_INTERVAL); | 
|  | 90 | 
|  | 91 					for (SubmittedJob job : queue) { | 
|  | 92 						if (job.isDone()) { | 
|  | 93 							queue.update(job); | 
|  | 94 						} | 
|  | 95 					} | 
|  | 96 				} | 
|  | 97 			} catch (InterruptedException e) { | 
|  | 98 				log.fatal("JobMonitor crashed"); | 
|  | 99 				e.printStackTrace(); | 
|  | 100 				throw new RuntimeException("JobMonitor crashed"); | 
|  | 101 			} | 
|  | 102 		} | 
|  | 103 	} | 
|  | 104 | 
|  | 105 } |