comparison java-genomics-toolkit/gui/edu/unc/genomics/JobQueueManager.java @ 0:1daf3026d231

Upload alpha version
author timpalpant
date Mon, 13 Feb 2012 21:55:55 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:1daf3026d231
1 package edu.unc.genomics;
2
3 import java.util.List;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.Future;
7
8 import org.apache.log4j.Logger;
9
10 /**
11 * Controller for scheduling and running jobs
12 * Wrapper for ExcecutorService, although the implementation could change
13 *
14 * @author timpalpant
15 *
16 */
17 public class JobQueueManager {
18
19 private static final Logger log = Logger.getLogger(JobQueueManager.class);
20
21 private final JobQueue queue;
22 private final ExecutorService exec;
23 private final Thread monitor;
24
25 public JobQueueManager(JobQueue queue) {
26 this.queue = queue;
27
28 int numProcessors = Runtime.getRuntime().availableProcessors();
29 log.debug("Initializing thread pool with "+numProcessors+" processors");
30 exec = Executors.newFixedThreadPool(numProcessors);
31
32 monitor = new Thread(new JobMonitor());
33 monitor.start();
34 }
35
36 public List<Runnable> shutdownNow() {
37 return exec.shutdownNow();
38 }
39
40 /**
41 * Add a Job to the queue
42 * @param job
43 * @throws JobException
44 */
45 public SubmittedJob submitJob(Job job) throws JobException {
46 // Refuse to add the Job to the queue if its arguments are not valid
47 if (!job.validateArguments()) {
48 throw new JobException("Job arguments are not valid");
49 }
50
51 // Submit the job for execution into the thread pool
52 Future<?> future = exec.submit(job);
53 SubmittedJob submittedJob = new SubmittedJob(job, future);
54 log.info("Submitted job " + submittedJob.getId());
55
56 // Add the SubmittedJob to the JobQueue
57 queue.add(submittedJob);
58 return submittedJob;
59 }
60
61 /**
62 * Are any jobs running? (not done)
63 * @return
64 */
65 public boolean isRunning() {
66 for (SubmittedJob job : queue) {
67 if (!job.isDone()) {
68 return true;
69 }
70 }
71
72 return false;
73 }
74
75
76 /**
77 * Background process for polling the status of submitted jobs
78 * @author timpalpant
79 *
80 */
81 public class JobMonitor implements Runnable {
82
83 public static final int JOB_POLL_INTERVAL = 1_000;
84
85 public void run() {
86 try {
87 while (true) {
88 // Check Job statuses every 1s
89 Thread.sleep(JOB_POLL_INTERVAL);
90
91 for (SubmittedJob job : queue) {
92 if (job.isDone()) {
93 queue.update(job);
94 }
95 }
96 }
97 } catch (InterruptedException e) {
98 log.fatal("JobMonitor crashed");
99 e.printStackTrace();
100 throw new RuntimeException("JobMonitor crashed");
101 }
102 }
103 }
104
105 }