annotate java-genomics-toolkit/gui/edu/unc/genomics/JobQueueManager.java @ 0:1daf3026d231

Upload alpha version
author timpalpant
date Mon, 13 Feb 2012 21:55:55 -0500
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
1 package edu.unc.genomics;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
2
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
3 import java.util.List;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
4 import java.util.concurrent.ExecutorService;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
5 import java.util.concurrent.Executors;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
6 import java.util.concurrent.Future;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
7
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
8 import org.apache.log4j.Logger;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
9
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
10 /**
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
11 * Controller for scheduling and running jobs
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
12 * Wrapper for ExcecutorService, although the implementation could change
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
13 *
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
14 * @author timpalpant
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
15 *
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
16 */
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
17 public class JobQueueManager {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
18
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
19 private static final Logger log = Logger.getLogger(JobQueueManager.class);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
20
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
21 private final JobQueue queue;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
22 private final ExecutorService exec;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
23 private final Thread monitor;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
24
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
25 public JobQueueManager(JobQueue queue) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
26 this.queue = queue;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
27
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
28 int numProcessors = Runtime.getRuntime().availableProcessors();
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
29 log.debug("Initializing thread pool with "+numProcessors+" processors");
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
30 exec = Executors.newFixedThreadPool(numProcessors);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
31
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
32 monitor = new Thread(new JobMonitor());
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
33 monitor.start();
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
34 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
35
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
36 public List<Runnable> shutdownNow() {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
37 return exec.shutdownNow();
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
38 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
39
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
40 /**
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
41 * Add a Job to the queue
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
42 * @param job
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
43 * @throws JobException
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
44 */
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
45 public SubmittedJob submitJob(Job job) throws JobException {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
46 // Refuse to add the Job to the queue if its arguments are not valid
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
47 if (!job.validateArguments()) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
48 throw new JobException("Job arguments are not valid");
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
49 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
50
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
51 // Submit the job for execution into the thread pool
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
52 Future<?> future = exec.submit(job);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
53 SubmittedJob submittedJob = new SubmittedJob(job, future);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
54 log.info("Submitted job " + submittedJob.getId());
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
55
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
56 // Add the SubmittedJob to the JobQueue
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
57 queue.add(submittedJob);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
58 return submittedJob;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
59 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
60
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
61 /**
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
62 * Are any jobs running? (not done)
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
63 * @return
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
64 */
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
65 public boolean isRunning() {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
66 for (SubmittedJob job : queue) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
67 if (!job.isDone()) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
68 return true;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
69 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
70 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
71
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
72 return false;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
73 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
74
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
75
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
76 /**
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
77 * Background process for polling the status of submitted jobs
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
78 * @author timpalpant
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
79 *
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
80 */
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
81 public class JobMonitor implements Runnable {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
82
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
83 public static final int JOB_POLL_INTERVAL = 1_000;
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
84
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
85 public void run() {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
86 try {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
87 while (true) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
88 // Check Job statuses every 1s
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
89 Thread.sleep(JOB_POLL_INTERVAL);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
90
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
91 for (SubmittedJob job : queue) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
92 if (job.isDone()) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
93 queue.update(job);
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
94 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
95 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
96 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
97 } catch (InterruptedException e) {
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
98 log.fatal("JobMonitor crashed");
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
99 e.printStackTrace();
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
100 throw new RuntimeException("JobMonitor crashed");
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
101 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
102 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
103 }
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
104
1daf3026d231 Upload alpha version
timpalpant
parents:
diff changeset
105 }