package azkaban.execapp.jmx; import azkaban.event.Event; import azkaban.event.EventData; import azkaban.event.EventListener; import azkaban.execapp.JobRunner; import azkaban.executor.ExecutableNode; import azkaban.executor.Status; import azkaban.spi.EventType; import azkaban.utils.Props; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; /** * Responsible keeping track of job related MBean attributes through listening to job related * events. * * @author hluu */ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener { private static final Logger logger = Logger .getLogger(JmxJobMBeanManager.class); private static final JmxJobMBeanManager INSTANCE = new JmxJobMBeanManager(); private final AtomicInteger runningJobCount = new AtomicInteger(0); private final AtomicInteger totalExecutedJobCount = new AtomicInteger(0); private final AtomicInteger totalFailedJobCount = new AtomicInteger(0); private final AtomicInteger totalSucceededJobCount = new AtomicInteger(0); private final Map<String, AtomicInteger> jobTypeFailureMap = new HashMap<>(); private final Map<String, AtomicInteger> jobTypeSucceededMap = new HashMap<>(); private boolean initialized; private JmxJobMBeanManager() { } public static JmxJobMBeanManager getInstance() { return INSTANCE; } public void initialize(final Props props) { logger.info("Initializing " + getClass().getName()); this.initialized = true; } @Override public int getNumRunningJobs() { return this.runningJobCount.get(); } @Override public int getTotalNumExecutedJobs() { return this.totalExecutedJobCount.get(); } @Override public int getTotalFailedJobs() { return this.totalFailedJobCount.get(); } @Override public int getTotalSucceededJobs() { return this.totalSucceededJobCount.get(); } @Override public Map<String, Integer> getTotalSucceededJobsByJobType() { return convertMapValueToInteger(this.jobTypeSucceededMap); } @Override public Map<String, Integer> getTotalFailedJobsByJobType() { return convertMapValueToInteger(this.jobTypeFailureMap); } private Map<String, Integer> convertMapValueToInteger( final Map<String, AtomicInteger> map) { final Map<String, Integer> result = new HashMap<>(map.size()); for (final Map.Entry<String, AtomicInteger> entry : map.entrySet()) { result.put(entry.getKey(), entry.getValue().intValue()); } return result; } @Override public void handleEvent(final Event event) { if (!this.initialized) { throw new RuntimeException("JmxJobMBeanManager has not been initialized"); } if (event.getRunner() instanceof JobRunner) { final JobRunner jobRunner = (JobRunner) event.getRunner(); final EventData eventData = event.getData(); final ExecutableNode node = jobRunner.getNode(); if (logger.isDebugEnabled()) { logger.debug("*** got " + event.getType() + " " + node.getId() + " " + event.getRunner().getClass().getName() + " status: " + eventData.getStatus()); } if (event.getType() == EventType.JOB_STARTED) { this.runningJobCount.incrementAndGet(); } else if (event.getType() == EventType.JOB_FINISHED) { this.totalExecutedJobCount.incrementAndGet(); if (this.runningJobCount.intValue() > 0) { this.runningJobCount.decrementAndGet(); } else { logger.warn("runningJobCount not messed up, it is already zero " + "and we are trying to decrement on job event " + EventType.JOB_FINISHED); } if (eventData.getStatus() == Status.FAILED) { this.totalFailedJobCount.incrementAndGet(); } else if (eventData.getStatus() == Status.SUCCEEDED) { this.totalSucceededJobCount.incrementAndGet(); } handleJobFinishedCount(eventData.getStatus(), node.getType()); } } else { logger.warn("((((((((( Got a different runner: " + event.getRunner().getClass().getName()); } } private void handleJobFinishedCount(final Status status, final String jobType) { switch (status) { case FAILED: handleJobFinishedByType(this.jobTypeFailureMap, jobType); break; case SUCCEEDED: handleJobFinishedByType(this.jobTypeSucceededMap, jobType); break; default: } } private void handleJobFinishedByType(final Map<String, AtomicInteger> jobTypeMap, final String jobType) { synchronized (jobTypeMap) { AtomicInteger count = jobTypeMap.get(jobType); if (count == null) { count = new AtomicInteger(); } count.incrementAndGet(); jobTypeMap.put(jobType, count); } } }