annotate gui/edu/unc/genomics/JobQueueManager.java @ 2:e16016635b2a

Uploaded
author timpalpant
date Mon, 13 Feb 2012 22:12:06 -0500
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
2
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
1 package edu.unc.genomics;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
2
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
3 import java.util.List;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
4 import java.util.concurrent.ExecutorService;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
5 import java.util.concurrent.Executors;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
6 import java.util.concurrent.Future;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
7
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
8 import org.apache.log4j.Logger;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
9
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
10 /**
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
11 * Controller for scheduling and running jobs
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
12 * Wrapper for ExcecutorService, although the implementation could change
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
13 *
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
14 * @author timpalpant
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
15 *
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
16 */
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
17 public class JobQueueManager {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
18
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
19 private static final Logger log = Logger.getLogger(JobQueueManager.class);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
20
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
21 private final JobQueue queue;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
22 private final ExecutorService exec;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
23 private final Thread monitor;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
24
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
25 public JobQueueManager(JobQueue queue) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
26 this.queue = queue;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
27
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
28 int numProcessors = Runtime.getRuntime().availableProcessors();
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
29 log.debug("Initializing thread pool with "+numProcessors+" processors");
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
30 exec = Executors.newFixedThreadPool(numProcessors);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
31
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
32 monitor = new Thread(new JobMonitor());
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
33 monitor.start();
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
34 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
35
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
36 public List<Runnable> shutdownNow() {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
37 return exec.shutdownNow();
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
38 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
39
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
40 /**
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
41 * Add a Job to the queue
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
42 * @param job
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
43 * @throws JobException
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
44 */
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
45 public SubmittedJob submitJob(Job job) throws JobException {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
46 // Refuse to add the Job to the queue if its arguments are not valid
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
47 if (!job.validateArguments()) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
48 throw new JobException("Job arguments are not valid");
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
49 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
50
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
51 // Submit the job for execution into the thread pool
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
52 Future<?> future = exec.submit(job);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
53 SubmittedJob submittedJob = new SubmittedJob(job, future);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
54 log.info("Submitted job " + submittedJob.getId());
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
55
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
56 // Add the SubmittedJob to the JobQueue
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
57 queue.add(submittedJob);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
58 return submittedJob;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
59 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
60
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
61 /**
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
62 * Are any jobs running? (not done)
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
63 * @return
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
64 */
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
65 public boolean isRunning() {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
66 for (SubmittedJob job : queue) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
67 if (!job.isDone()) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
68 return true;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
69 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
70 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
71
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
72 return false;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
73 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
74
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
75
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
76 /**
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
77 * Background process for polling the status of submitted jobs
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
78 * @author timpalpant
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
79 *
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
80 */
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
81 public class JobMonitor implements Runnable {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
82
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
83 public static final int JOB_POLL_INTERVAL = 1_000;
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
84
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
85 public void run() {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
86 try {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
87 while (true) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
88 // Check Job statuses every 1s
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
89 Thread.sleep(JOB_POLL_INTERVAL);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
90
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
91 for (SubmittedJob job : queue) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
92 if (job.isDone()) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
93 queue.update(job);
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
94 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
95 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
96 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
97 } catch (InterruptedException e) {
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
98 log.fatal("JobMonitor crashed");
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
99 e.printStackTrace();
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
100 throw new RuntimeException("JobMonitor crashed");
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
101 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
102 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
103 }
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
104
e16016635b2a Uploaded
timpalpant
parents:
diff changeset
105 }