2
|
1 package edu.unc.genomics;
|
|
2
|
|
3 import java.util.List;
|
|
4 import java.util.concurrent.ExecutorService;
|
|
5 import java.util.concurrent.Executors;
|
|
6 import java.util.concurrent.Future;
|
|
7
|
|
8 import org.apache.log4j.Logger;
|
|
9
|
|
10 /**
|
|
11 * Controller for scheduling and running jobs
|
|
12 * Wrapper for ExcecutorService, although the implementation could change
|
|
13 *
|
|
14 * @author timpalpant
|
|
15 *
|
|
16 */
|
|
17 public class JobQueueManager {
|
|
18
|
|
19 private static final Logger log = Logger.getLogger(JobQueueManager.class);
|
|
20
|
|
21 private final JobQueue queue;
|
|
22 private final ExecutorService exec;
|
|
23 private final Thread monitor;
|
|
24
|
|
25 public JobQueueManager(JobQueue queue) {
|
|
26 this.queue = queue;
|
|
27
|
|
28 int numProcessors = Runtime.getRuntime().availableProcessors();
|
|
29 log.debug("Initializing thread pool with "+numProcessors+" processors");
|
|
30 exec = Executors.newFixedThreadPool(numProcessors);
|
|
31
|
|
32 monitor = new Thread(new JobMonitor());
|
|
33 monitor.start();
|
|
34 }
|
|
35
|
|
36 public List<Runnable> shutdownNow() {
|
|
37 return exec.shutdownNow();
|
|
38 }
|
|
39
|
|
40 /**
|
|
41 * Add a Job to the queue
|
|
42 * @param job
|
|
43 * @throws JobException
|
|
44 */
|
|
45 public SubmittedJob submitJob(Job job) throws JobException {
|
|
46 // Refuse to add the Job to the queue if its arguments are not valid
|
|
47 if (!job.validateArguments()) {
|
|
48 throw new JobException("Job arguments are not valid");
|
|
49 }
|
|
50
|
|
51 // Submit the job for execution into the thread pool
|
|
52 Future<?> future = exec.submit(job);
|
|
53 SubmittedJob submittedJob = new SubmittedJob(job, future);
|
|
54 log.info("Submitted job " + submittedJob.getId());
|
|
55
|
|
56 // Add the SubmittedJob to the JobQueue
|
|
57 queue.add(submittedJob);
|
|
58 return submittedJob;
|
|
59 }
|
|
60
|
|
61 /**
|
|
62 * Are any jobs running? (not done)
|
|
63 * @return
|
|
64 */
|
|
65 public boolean isRunning() {
|
|
66 for (SubmittedJob job : queue) {
|
|
67 if (!job.isDone()) {
|
|
68 return true;
|
|
69 }
|
|
70 }
|
|
71
|
|
72 return false;
|
|
73 }
|
|
74
|
|
75
|
|
76 /**
|
|
77 * Background process for polling the status of submitted jobs
|
|
78 * @author timpalpant
|
|
79 *
|
|
80 */
|
|
81 public class JobMonitor implements Runnable {
|
|
82
|
|
83 public static final int JOB_POLL_INTERVAL = 1_000;
|
|
84
|
|
85 public void run() {
|
|
86 try {
|
|
87 while (true) {
|
|
88 // Check Job statuses every 1s
|
|
89 Thread.sleep(JOB_POLL_INTERVAL);
|
|
90
|
|
91 for (SubmittedJob job : queue) {
|
|
92 if (job.isDone()) {
|
|
93 queue.update(job);
|
|
94 }
|
|
95 }
|
|
96 }
|
|
97 } catch (InterruptedException e) {
|
|
98 log.fatal("JobMonitor crashed");
|
|
99 e.printStackTrace();
|
|
100 throw new RuntimeException("JobMonitor crashed");
|
|
101 }
|
|
102 }
|
|
103 }
|
|
104
|
|
105 }
|