/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.webapp.servlet;

import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
import azkaban.project.Project;
import azkaban.project.ProjectFileHandler;
import azkaban.project.ProjectLogEvent;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.project.ProjectWhitelist;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidatorConfigs;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduleManagerException;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.user.UserUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
import azkaban.webapp.AzkabanWebServer;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.fileupload.FileItem;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {

  static final String FLOW_IS_LOCKED_PARAM = "isLocked";
  static final String FLOW_NAME_PARAM = "flowName";
  static final String FLOW_ID_PARAM = "flowId";
  static final String ERROR_PARAM = "error";
  private static final String APPLICATION_ZIP_MIME_TYPE = "application/zip";
  private static final long serialVersionUID = 1;
  private static final Logger logger = LoggerFactory.getLogger(ProjectManagerServlet.class);
  private static final NodeLevelComparator NODE_LEVEL_COMPARATOR =
      new NodeLevelComparator();
  private static final String LOCKDOWN_CREATE_PROJECTS_KEY =
      "lockdown.create.projects";
  private static final String LOCKDOWN_UPLOAD_PROJECTS_KEY =
      "lockdown.upload.projects";

  private static final String PROJECT_DOWNLOAD_BUFFER_SIZE_IN_BYTES =
      "project.download.buffer.size";
  private static final Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
    @Override
    public int compare(final Flow f1, final Flow f2) {
      return f1.getId().compareTo(f2.getId());
    }
  };
  private ProjectManager projectManager;
  private ExecutorManagerAdapter executorManagerAdapter;
  private ScheduleManager scheduleManager;
  private UserManager userManager;
  private FlowTriggerScheduler scheduler;
  private int downloadBufferSize;
  private boolean lockdownCreateProjects = false;
  private boolean lockdownUploadProjects = false;
  private boolean enableQuartz = false;

  @Override
  public void init(final ServletConfig config) throws ServletException {
    super.init(config);

    final AzkabanWebServer server = (AzkabanWebServer) getApplication();
    this.projectManager = server.getProjectManager();
    this.executorManagerAdapter = server.getExecutorManager();
    this.scheduleManager = server.getScheduleManager();
    this.userManager = server.getUserManager();
    this.scheduler = server.getScheduler();
    this.lockdownCreateProjects =
        server.getServerProps().getBoolean(LOCKDOWN_CREATE_PROJECTS_KEY, false);
    this.enableQuartz = server.getServerProps().getBoolean(ConfigurationKeys.ENABLE_QUARTZ, false);
    if (this.lockdownCreateProjects) {
      logger.info("Creation of projects is locked down");
    }

    this.lockdownUploadProjects =
        server.getServerProps().getBoolean(LOCKDOWN_UPLOAD_PROJECTS_KEY, false);
    if (this.lockdownUploadProjects) {
      logger.info("Uploading of projects is locked down");
    }

    this.downloadBufferSize =
        server.getServerProps().getInt(PROJECT_DOWNLOAD_BUFFER_SIZE_IN_BYTES,
            8192);

    logger.info("downloadBufferSize: " + this.downloadBufferSize);
  }

  @Override
  protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException, IOException {
    if (hasParam(req, "project")) {
      if (hasParam(req, "ajax")) {
        handleAJAXAction(req, resp, session);
      } else if (hasParam(req, "logs")) {
        handleProjectLogsPage(req, resp, session);
      } else if (hasParam(req, "permissions")) {
        handlePermissionPage(req, resp, session);
      } else if (hasParam(req, "prop")) {
        handlePropertyPage(req, resp, session);
      } else if (hasParam(req, "history")) {
        handleJobHistoryPage(req, resp, session);
      } else if (hasParam(req, "job")) {
        handleJobPage(req, resp, session);
      } else if (hasParam(req, "flow")) {
        handleFlowPage(req, resp, session);
      } else if (hasParam(req, "delete")) {
        handleRemoveProject(req, resp, session);
      } else if (hasParam(req, "purge")) {
        handlePurgeProject(req, resp, session);
      } else if (hasParam(req, "download")) {
        handleDownloadProject(req, resp, session);
      } else {
        handleProjectPage(req, resp, session);
      }
      return;
    } else if (hasParam(req, "reloadProjectWhitelist")) {
      handleReloadProjectWhitelist(req, resp, session);
    }

    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/projectpage.vm");
    page.add("errorMsg", "No project set.");
    page.render();
  }

  @Override
  protected void handleMultiformPost(final HttpServletRequest req,
      final HttpServletResponse resp, final Map<String, Object> params, final Session session)
      throws ServletException, IOException {
    // Looks like a duplicate, but this is a move away from the regular
    // multiform post + redirect
    // to a more ajax like command.
    if (params.containsKey("ajax")) {
      final String action = (String) params.get("ajax");
      final HashMap<String, String> ret = new HashMap<>();
      if (action.equals("upload")) {
        ajaxHandleUpload(req, resp, ret, params, session);
      }
      this.writeJSON(resp, ret);
    } else if (params.containsKey("action")) {
      final String action = (String) params.get("action");
      if (action.equals("upload")) {
        handleUpload(req, resp, params, session);
      }
    }
  }

  @Override
  protected void handlePost(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException, IOException {
    if (hasParam(req, "ajax")) {
      handleAJAXAction(req, resp, session);
    } else if (hasParam(req, "action")) {
      final String action = getParam(req, "action");
      if (action.equals("create")) {
        handleCreate(req, resp, session);
      }
    }
  }

  private void handleAJAXAction(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final String projectName = getParam(req, "project");
    final User user = session.getUser();

    final HashMap<String, Object> ret = new HashMap<>();
    ret.put("project", projectName);

    final Project project = this.projectManager.getProject(projectName);
    if (project == null) {
      ret.put(ERROR_PARAM, "Project " + projectName + " doesn't exist.");
    } else {
      ret.put("projectId", project.getId());
      final String ajaxName = getParam(req, "ajax");
      if (ajaxName.equals("getProjectId")) {
        // Do nothing, since projectId is added to all AJAX requests.
      } else if (ajaxName.equals("fetchProjectLogs")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchProjectLogEvents(project, req, ret);
        }
      } else if (ajaxName.equals("fetchflowjobs")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchFlow(project, ret, req);
        }
      } else if (ajaxName.equals("fetchflowdetails")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchFlowDetails(project, ret, req);
        }
      } else if (ajaxName.equals("fetchflowgraph")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchFlowGraph(project, ret, req);
        }
      } else if (ajaxName.equals("fetchflownodedata")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchFlowNodeData(project, ret, req);
        }
      } else if (ajaxName.equals("fetchprojectflows")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchProjectFlows(project, ret, req);
        }
      } else if (ajaxName.equals("changeDescription")) {
        if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
          ajaxChangeDescription(project, ret, req, user);
        }
      } else if (ajaxName.equals("getPermissions")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxGetPermissions(project, ret);
        }
      } else if (ajaxName.equals("getGroupPermissions")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxGetGroupPermissions(project, ret);
        }
      } else if (ajaxName.equals("getProxyUsers")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxGetProxyUsers(project, ret);
        }
      } else if (ajaxName.equals("changePermission")) {
        if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
          ajaxChangePermissions(project, ret, req, user);
        }
      } else if (ajaxName.equals("addPermission")) {
        if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
          ajaxAddPermission(project, ret, req, user);
        }
      } else if (ajaxName.equals("addProxyUser")) {
        if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
          ajaxAddProxyUser(project, ret, req, user);
        }
      } else if (ajaxName.equals("removeProxyUser")) {
        if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
          ajaxRemoveProxyUser(project, ret, req, user);
        }
      } else if (ajaxName.equals("fetchFlowExecutions")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchFlowExecutions(project, ret, req);
        }
      } else if (ajaxName.equals("fetchLastSuccessfulFlowExecution")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchLastSuccessfulFlowExecution(project, ret, req);
        }
      } else if (ajaxName.equals("fetchJobInfo")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxFetchJobInfo(project, ret, req);
        }
      } else if (ajaxName.equals("setJobOverrideProperty")) {
        if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
          ajaxSetJobOverrideProperty(project, ret, req, user);
        }
      } else if (ajaxName.equals("checkForWritePermission")) {
        ajaxCheckForWritePermission(project, user, ret);
      } else if (ajaxName.equals("setFlowLock")) {
        if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
          ajaxSetFlowLock(project, ret, req);
        }
      } else if (ajaxName.equals("isFlowLocked")) {
        if (handleAjaxPermission(project, user, Type.READ, ret)) {
          ajaxIsFlowLocked(project, ret, req);
        }
      } else {
        ret.put(ERROR_PARAM, "Cannot execute command " + ajaxName);
      }
    }

    this.writeJSON(resp, ret);
  }

  private boolean handleAjaxPermission(final Project project, final User user, final Type type,
      final Map<String, Object> ret) {
    if (hasPermission(project, user, type)) {
      return true;
    }

    ret.put(ERROR_PARAM, "Permission denied. Need " + type.toString() + " access.");
    return false;
  }

  private void ajaxFetchProjectLogEvents(final Project project,
      final HttpServletRequest req, final HashMap<String, Object> ret) throws ServletException {
    final int num = this.getIntParam(req, "size", 1000);
    final int skip = this.getIntParam(req, "skip", 0);

    final List<ProjectLogEvent> logEvents;
    try {
      logEvents = this.projectManager.getProjectEventLogs(project, num, skip);
    } catch (final ProjectManagerException e) {
      throw new ServletException(e);
    }

    final String[] columns = new String[]{"user", "time", "type", "message"};
    ret.put("columns", columns);

    final List<Object[]> eventData = new ArrayList<>();
    for (final ProjectLogEvent events : logEvents) {
      final Object[] entry = new Object[4];
      entry[0] = events.getUser();
      entry[1] = events.getTime();
      entry[2] = events.getType();
      entry[3] = events.getMessage();

      eventData.add(entry);
    }

    ret.put("logData", eventData);
  }

  private List<String> getFlowJobTypes(final Flow flow) {
    final Set<String> jobTypeSet = new HashSet<>();
    for (final Node node : flow.getNodes()) {
      jobTypeSet.add(node.getType());
    }
    final List<String> jobTypes = new ArrayList<>();
    jobTypes.addAll(jobTypeSet);
    return jobTypes;
  }

  private void ajaxFetchFlowDetails(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final String flowName = getParam(req, "flow");

    try {
      final Flow flow = project.getFlow(flowName);
      if (flow == null) {
        ret.put(ERROR_PARAM, "Flow " + flowName + " not found.");
        return;
      }

      ret.put("jobTypes", getFlowJobTypes(flow));
      if (flow.getCondition() != null) {
        ret.put("condition", flow.getCondition());
      }
    } catch (final AccessControlException e) {
      ret.put(ERROR_PARAM, e.getMessage());
    }
  }

  private void ajaxFetchLastSuccessfulFlowExecution(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final String flowId = getParam(req, "flow");
    List<ExecutableFlow> exFlows = null;
    try {
      exFlows =
          this.executorManagerAdapter.getExecutableFlows(project.getId(), flowId, 0, 1,
              Status.SUCCEEDED);
    } catch (final ExecutorManagerException e) {
      ret.put(ERROR_PARAM, "Error retrieving executable flows");
      return;
    }

    if (exFlows.size() == 0) {
      ret.put("success", "false");
      ret.put("message", "This flow has no successful run.");
      return;
    }

    ret.put("success", "true");
    ret.put("message", "");
    ret.put("execId", exFlows.get(0).getExecutionId());
  }

  private void ajaxFetchFlowExecutions(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final String flowId = getParam(req, "flow");
    final int from = Integer.valueOf(getParam(req, "start"));
    final int length = Integer.valueOf(getParam(req, "length"));

    final ArrayList<ExecutableFlow> exFlows = new ArrayList<>();
    int total = 0;
    try {
      total =
          this.executorManagerAdapter.getExecutableFlows(project.getId(), flowId, from,
              length, exFlows);
    } catch (final ExecutorManagerException e) {
      ret.put(ERROR_PARAM, "Error retrieving executable flows");
    }

    ret.put("flow", flowId);
    ret.put("total", total);
    ret.put("from", from);
    ret.put("length", length);

    final ArrayList<Object> history = new ArrayList<>();
    for (final ExecutableFlow flow : exFlows) {
      final HashMap<String, Object> flowInfo = new HashMap<>();
      flowInfo.put("execId", flow.getExecutionId());
      flowInfo.put(FLOW_ID_PARAM, flow.getFlowId());
      flowInfo.put("projectId", flow.getProjectId());
      flowInfo.put("status", flow.getStatus().toString());
      flowInfo.put("submitTime", flow.getSubmitTime());
      flowInfo.put("startTime", flow.getStartTime());
      flowInfo.put("endTime", flow.getEndTime());
      flowInfo.put("submitUser", flow.getSubmitUser());

      history.add(flowInfo);
    }

    ret.put("executions", history);
  }

  /**
   * Download project zip file from DB and send it back client.
   *
   * This method requires a project name and an optional project version.
   */
  private void handleDownloadProject(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {

    final User user = session.getUser();
    final String projectName = getParam(req, "project");
    logger.info(user.getUserId() + " is downloading project: " + projectName);

    final Project project = this.projectManager.getProject(projectName);
    if (project == null) {
      this.setErrorMessageInCookie(resp, "Project " + projectName
          + " doesn't exist.");
      resp.sendRedirect(req.getContextPath());
      return;
    }

    if (!hasPermission(project, user, Type.READ)) {
      this.setErrorMessageInCookie(resp, "No permission to download project " + projectName
          + ".");
      resp.sendRedirect(req.getContextPath());
      return;
    }

    int version = -1;
    if (hasParam(req, "version")) {
      version = getIntParam(req, "version");
    }

    ProjectFileHandler projectFileHandler = null;
    FileInputStream inStream = null;
    OutputStream outStream = null;
    try {
      projectFileHandler =
          this.projectManager.getProjectFileHandler(project, version);
      if (projectFileHandler == null) {
        this.setErrorMessageInCookie(resp, "Project " + projectName
            + " with version " + version + " doesn't exist");
        resp.sendRedirect(req.getContextPath());
        return;
      }
      final File projectZipFile = projectFileHandler.getLocalFile();
      final String logStr =
          String.format(
              "downloading project zip file for project \"%s\" at \"%s\""
                  + " size: %d type: %s  fileName: \"%s\"",
              projectFileHandler.getFileName(),
              projectZipFile.getAbsolutePath(), projectZipFile.length(),
              projectFileHandler.getFileType(),
              projectFileHandler.getFileName());
      logger.info(logStr);

      // now set up HTTP response for downloading file
      inStream = new FileInputStream(projectZipFile);

      resp.setContentType(APPLICATION_ZIP_MIME_TYPE);

      final String headerKey = "Content-Disposition";
      final String headerValue =
          String.format("attachment; filename=\"%s\"",
              projectFileHandler.getFileName());
      resp.setHeader(headerKey, headerValue);
      resp.setHeader("version",
          Integer.toString(projectFileHandler.getVersion()));
      resp.setHeader("projectId",
          Integer.toString(projectFileHandler.getProjectId()));

      outStream = resp.getOutputStream();

      final byte[] buffer = new byte[this.downloadBufferSize];
      int bytesRead = -1;

      while ((bytesRead = inStream.read(buffer)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }

    } catch (final Throwable e) {
      logger.error(
          "Encountered error while downloading project zip file for project: "
              + projectName + " by user: " + user.getUserId(), e);
      throw new ServletException(e);
    } finally {
      IOUtils.closeQuietly(inStream);
      IOUtils.closeQuietly(outStream);

      if (projectFileHandler != null) {
        projectFileHandler.deleteLocalFile();
      }
    }

  }

  /**
   * validate readiness of a project and user permission and use projectManager to purge the project
   * if things looks good
   **/
  private void handlePurgeProject(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final User user = session.getUser();
    final HashMap<String, Object> ret = new HashMap<>();
    boolean isOperationSuccessful = true;

    try {
      Project project = null;
      final String projectParam = getParam(req, "project");

      if (StringUtils.isNumeric(projectParam)) {
        project = this.projectManager.getProject(Integer.parseInt(projectParam)); // get
        // project
        // by
        // Id
      } else {
        project = this.projectManager.getProject(projectParam); // get project by
        // name (name cannot
        // start
        // from ints)
      }

      // invalid project
      if (project == null) {
        ret.put(ERROR_PARAM, "invalid project");
        isOperationSuccessful = false;
      }

      // project is already deleted
      if (isOperationSuccessful
          && this.projectManager.isActiveProject(project.getId())) {
        ret.put(ERROR_PARAM, "Project " + project.getName()
            + " should be deleted before purging");
        isOperationSuccessful = false;
      }

      // only eligible users can purge a project
      if (isOperationSuccessful && !hasPermission(project, user, Type.ADMIN)) {
        ret.put(ERROR_PARAM, "Cannot purge. User '" + user.getUserId()
            + "' is not an ADMIN.");
        isOperationSuccessful = false;
      }

      if (isOperationSuccessful) {
        this.projectManager.purgeProject(project, user);
      }
    } catch (final Exception e) {
      ret.put(ERROR_PARAM, e.getMessage());
      isOperationSuccessful = false;
    }

    ret.put("success", isOperationSuccessful);
    this.writeJSON(resp, ret);
  }

  private void removeAssociatedSchedules(final Project project) throws ServletException {
    // remove regular schedules
    try {
      for (final Schedule schedule : this.scheduleManager.getSchedules()) {
        if (schedule.getProjectId() == project.getId()) {
          logger.info("removing schedule " + schedule.getScheduleId());
          this.scheduleManager.removeSchedule(schedule);
        }
      }
    } catch (final ScheduleManagerException e) {
      throw new ServletException(e);
    }

    // remove flow trigger schedules
    try {
      if (this.enableQuartz) {
        this.scheduler.unschedule(project);
      }
    } catch (final SchedulerException e) {
      throw new ServletException(e);
    }
  }

  private void handleRemoveProject(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final User user = session.getUser();
    final String projectName = getParam(req, "project");

    final Project project = this.projectManager.getProject(projectName);
    if (project == null) {
      this.setErrorMessageInCookie(resp, "Project " + projectName
          + " doesn't exist.");
      resp.sendRedirect(req.getContextPath());
      return;
    }

    if (!hasPermission(project, user, Type.ADMIN)) {
      this.setErrorMessageInCookie(resp,
          "Cannot delete. User '" + user.getUserId() + "' is not an ADMIN.");
      resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
      return;
    }

    removeAssociatedSchedules(project);

    try {
      this.projectManager.removeProject(project, user);
    } catch (final ProjectManagerException e) {
      this.setErrorMessageInCookie(resp, e.getMessage());
      resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
      return;
    }

    this.setSuccessMessageInCookie(resp, "Project '" + projectName
        + "' was successfully deleted and associated schedules are removed.");
    resp.sendRedirect(req.getContextPath());
  }

  private void ajaxChangeDescription(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req, final User user)
      throws ServletException {
    final String description = getParam(req, "description");
    project.setDescription(description);

    try {
      this.projectManager.updateProjectDescription(project, description, user);
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, e.getMessage());
    }
  }

  private void ajaxFetchJobInfo(final Project project, final HashMap<String, Object> ret,
      final HttpServletRequest req) throws ServletException {
    final String flowName = getParam(req, "flowName");
    final String jobName = getParam(req, "jobName");

    final Flow flow = project.getFlow(flowName);
    if (flow == null) {
      ret.put(ERROR_PARAM,
          "Flow " + flowName + " not found in project " + project.getName());
      return;
    }

    final Node node = flow.getNode(jobName);
    if (node == null) {
      ret.put(ERROR_PARAM, "Job " + jobName + " not found in flow " + flowName);
      return;
    }

    Props jobProp;
    try {
      jobProp = this.projectManager.getProperties(project, flow, jobName, node.getJobSource());
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, "Failed to retrieve job properties!");
      return;
    }

    if (jobProp == null) {
      jobProp = new Props();
    }

    Props overrideProp;
    try {
      overrideProp = this.projectManager
          .getJobOverrideProperty(project, flow, jobName, node.getJobSource());
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, "Failed to retrieve job override properties!");
      return;
    }

    ret.put("jobName", node.getId());
    ret.put("jobType", jobProp.get("type"));

    if (overrideProp == null) {
      overrideProp = new Props(jobProp);
    }

    final Map<String, String> generalParams = new HashMap<>();
    final Map<String, String> overrideParams = new HashMap<>();
    for (final String ps : jobProp.getKeySet()) {
      generalParams.put(ps, jobProp.getString(ps));
    }
    for (final String ops : overrideProp.getKeySet()) {
      overrideParams.put(ops, overrideProp.getString(ops));
    }
    ret.put("generalParams", generalParams);
    ret.put("overrideParams", overrideParams);
  }

  private void ajaxSetJobOverrideProperty(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req, final User user)
      throws ServletException {
    final String flowName = getParam(req, "flowName");
    final String jobName = getParam(req, "jobName");

    final Flow flow = project.getFlow(flowName);
    if (flow == null) {
      ret.put(ERROR_PARAM,
          "Flow " + flowName + " not found in project " + project.getName());
      return;
    }

    final Node node = flow.getNode(jobName);
    if (node == null) {
      ret.put(ERROR_PARAM, "Job " + jobName + " not found in flow " + flowName);
      return;
    }

    final Map<String, String> jobParamGroup = this.getParamGroup(req, "jobOverride");
    final Props overrideParams = new Props(null, jobParamGroup);
    try {
      this.projectManager
          .setJobOverrideProperty(project, flow, overrideParams, jobName, node.getJobSource(),
              user);
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, "Failed to upload job override property");
    }

  }

  private void ajaxFetchProjectFlows(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final ArrayList<Map<String, Object>> flowList =
        new ArrayList<>();
    for (final Flow flow : project.getFlows()) {
      if (!flow.isEmbeddedFlow()) {
        final HashMap<String, Object> flowObj = new HashMap<>();
        flowObj.put(FLOW_ID_PARAM, flow.getId());
        flowList.add(flowObj);
      }
    }

    ret.put("flows", flowList);
  }

  private void ajaxFetchFlowGraph(final Project project, final HashMap<String, Object> ret,
      final HttpServletRequest req) throws ServletException {
    final String flowId = getParam(req, "flow");

    fillFlowInfo(project, flowId, ret);
  }

  private void fillFlowInfo(final Project project, final String flowId,
      final HashMap<String, Object> ret) {
    final Flow flow = project.getFlow(flowId);
    if (flow == null) {
      ret.put(ERROR_PARAM,
          "Flow " + flowId + " not found in project " + project.getName());
      return;
    }

    final ArrayList<Map<String, Object>> nodeList =
        new ArrayList<>();
    for (final Node node : flow.getNodes()) {
      final HashMap<String, Object> nodeObj = new HashMap<>();
      nodeObj.put("id", node.getId());
      nodeObj.put("type", node.getType());
      if (node.getCondition() != null) {
        nodeObj.put("condition", node.getCondition());
      }
      if (node.getEmbeddedFlowId() != null) {
        nodeObj.put(FLOW_ID_PARAM, node.getEmbeddedFlowId());
        fillFlowInfo(project, node.getEmbeddedFlowId(), nodeObj);
      }

      nodeList.add(nodeObj);
      final Set<Edge> inEdges = flow.getInEdges(node.getId());
      if (inEdges != null && !inEdges.isEmpty()) {
        final ArrayList<String> inEdgesList = new ArrayList<>();
        for (final Edge edge : inEdges) {
          inEdgesList.add(edge.getSourceId());
        }
        Collections.sort(inEdgesList);
        nodeObj.put("in", inEdgesList);
      }
    }

    Collections.sort(nodeList, new Comparator<Map<String, Object>>() {
      @Override
      public int compare(final Map<String, Object> o1, final Map<String, Object> o2) {
        final String id = (String) o1.get("id");
        return id.compareTo((String) o2.get("id"));
      }
    });

    ret.put("flow", flowId);
    ret.put("nodes", nodeList);
  }

  private void ajaxFetchFlowNodeData(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final String flowId = getParam(req, "flow");
    final Flow flow = project.getFlow(flowId);

    final String nodeId = getParam(req, "node");
    final Node node = flow.getNode(nodeId);

    if (node == null) {
      ret.put(ERROR_PARAM, "Job " + nodeId + " doesn't exist.");
      return;
    }

    ret.put("id", nodeId);
    ret.put("flow", flowId);
    ret.put("type", node.getType());

    final Props jobProps;
    try {
      jobProps = this.projectManager.getProperties(project, flow, nodeId, node.getJobSource());
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, "Failed to upload job override property for " + nodeId);
      return;
    }

    if (jobProps == null) {
      ret.put(ERROR_PARAM, "Properties for " + nodeId + " isn't found.");
      return;
    }

    final Map<String, String> properties = PropsUtils.toStringMap(jobProps, true);
    ret.put("props", properties);

    if (node.getType().equals("flow")) {
      if (node.getEmbeddedFlowId() != null) {
        fillFlowInfo(project, node.getEmbeddedFlowId(), ret);
      }
    }
  }

  private void ajaxFetchFlow(final Project project, final HashMap<String, Object> ret,
      final HttpServletRequest req) throws ServletException {
    final String flowId = getParam(req, "flow");
    final Flow flow = project.getFlow(flowId);

    final ArrayList<Node> flowNodes = new ArrayList<>(flow.getNodes());
    Collections.sort(flowNodes, NODE_LEVEL_COMPARATOR);

    final ArrayList<Object> nodeList = new ArrayList<>();
    for (final Node node : flowNodes) {
      final HashMap<String, Object> nodeObj = new HashMap<>();
      nodeObj.put("id", node.getId());

      final ArrayList<String> dependencies = new ArrayList<>();
      Collection<Edge> collection = flow.getInEdges(node.getId());
      if (collection != null) {
        for (final Edge edge : collection) {
          dependencies.add(edge.getSourceId());
        }
      }

      final ArrayList<String> dependents = new ArrayList<>();
      collection = flow.getOutEdges(node.getId());
      if (collection != null) {
        for (final Edge edge : collection) {
          dependents.add(edge.getTargetId());
        }
      }

      nodeObj.put("dependencies", dependencies);
      nodeObj.put("dependents", dependents);
      nodeObj.put("level", node.getLevel());
      nodeList.add(nodeObj);
    }

    ret.put(FLOW_ID_PARAM, flowId);
    ret.put("nodes", nodeList);
    ret.put(FLOW_IS_LOCKED_PARAM, flow.isLocked());
  }

  private void ajaxAddProxyUser(final Project project, final HashMap<String, Object> ret,
      final HttpServletRequest req, final User user) throws ServletException {
    final String name = getParam(req, "name");

    logger.info("Adding proxy user " + name + " by " + user.getUserId());
    if (this.userManager.validateProxyUser(name, user)) {
      try {
        this.projectManager.addProjectProxyUser(project, name, user);
      } catch (final ProjectManagerException e) {
        ret.put(ERROR_PARAM, e.getMessage());
      }
    } else {
      ret.put(ERROR_PARAM, "User " + user.getUserId()
          + " has no permission to add " + name + " as proxy user.");
      return;
    }
  }

  private void ajaxRemoveProxyUser(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req, final User user)
      throws ServletException {
    final String name = getParam(req, "name");

    logger.info("Removing proxy user " + name + " by " + user.getUserId());

    try {
      this.projectManager.removeProjectProxyUser(project, name, user);
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, e.getMessage());
    }
  }

  private void ajaxAddPermission(final Project project, final HashMap<String, Object> ret,
      final HttpServletRequest req, final User user) throws ServletException {
    final String name = getParam(req, "name");
    final boolean group = Boolean.parseBoolean(getParam(req, "group"));

    if (group) {
      if (project.getGroupPermission(name) != null) {
        ret.put(ERROR_PARAM, "Group permission already exists.");
        return;
      }
      if (!this.userManager.validateGroup(name)) {
        ret.put(ERROR_PARAM, "Group is invalid.");
        return;
      }
    } else {
      if (project.getUserPermission(name) != null) {
        ret.put(ERROR_PARAM, "User permission already exists.");
        return;
      }
      if (!this.userManager.validateUser(name)) {
        ret.put(ERROR_PARAM, "User is invalid.");
        return;
      }
    }

    final boolean admin = Boolean.parseBoolean(getParam(req, "permissions[admin]"));
    final boolean read = Boolean.parseBoolean(getParam(req, "permissions[read]"));
    final boolean write = Boolean.parseBoolean(getParam(req, "permissions[write]"));
    final boolean execute =
        Boolean.parseBoolean(getParam(req, "permissions[execute]"));
    final boolean schedule =
        Boolean.parseBoolean(getParam(req, "permissions[schedule]"));

    final Permission perm = new Permission();
    if (admin) {
      perm.setPermission(Type.ADMIN, true);
    } else {
      perm.setPermission(Type.READ, read);
      perm.setPermission(Type.WRITE, write);
      perm.setPermission(Type.EXECUTE, execute);
      perm.setPermission(Type.SCHEDULE, schedule);
    }

    try {
      this.projectManager.updateProjectPermission(project, name, perm, group, user);
    } catch (final ProjectManagerException e) {
      ret.put(ERROR_PARAM, e.getMessage());
    }
  }

  private void ajaxChangePermissions(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req, final User user)
      throws ServletException {
    final boolean admin = Boolean.parseBoolean(getParam(req, "permissions[admin]"));
    final boolean read = Boolean.parseBoolean(getParam(req, "permissions[read]"));
    final boolean write = Boolean.parseBoolean(getParam(req, "permissions[write]"));
    final boolean execute =
        Boolean.parseBoolean(getParam(req, "permissions[execute]"));
    final boolean schedule =
        Boolean.parseBoolean(getParam(req, "permissions[schedule]"));

    final boolean group = Boolean.parseBoolean(getParam(req, "group"));

    final String name = getParam(req, "name");
    final Permission perm;
    if (group) {
      perm = project.getGroupPermission(name);
    } else {
      perm = project.getUserPermission(name);
    }

    if (perm == null) {
      ret.put(ERROR_PARAM, "Permissions for " + name + " cannot be found.");
      return;
    }

    if (admin || read || write || execute || schedule) {
      if (admin) {
        perm.setPermission(Type.ADMIN, true);
        perm.setPermission(Type.READ, false);
        perm.setPermission(Type.WRITE, false);
        perm.setPermission(Type.EXECUTE, false);
        perm.setPermission(Type.SCHEDULE, false);
      } else {
        perm.setPermission(Type.ADMIN, false);
        perm.setPermission(Type.READ, read);
        perm.setPermission(Type.WRITE, write);
        perm.setPermission(Type.EXECUTE, execute);
        perm.setPermission(Type.SCHEDULE, schedule);
      }

      try {
        this.projectManager
            .updateProjectPermission(project, name, perm, group, user);
      } catch (final ProjectManagerException e) {
        ret.put(ERROR_PARAM, e.getMessage());
      }
    } else {
      try {
        this.projectManager.removeProjectPermission(project, name, group, user);
      } catch (final ProjectManagerException e) {
        ret.put(ERROR_PARAM, e.getMessage());
      }
    }
  }

  /**
   * this only returns user permissions, but not group permissions and proxy users
   */
  private void ajaxGetPermissions(final Project project, final HashMap<String, Object> ret) {
    final ArrayList<HashMap<String, Object>> permissions =
        new ArrayList<>();
    for (final Pair<String, Permission> perm : project.getUserPermissions()) {
      final HashMap<String, Object> permObj = new HashMap<>();
      final String userId = perm.getFirst();
      permObj.put("username", userId);
      permObj.put("permission", perm.getSecond().toStringArray());

      permissions.add(permObj);
    }

    ret.put("permissions", permissions);
  }

  private void ajaxGetGroupPermissions(final Project project,
      final HashMap<String, Object> ret) {
    final ArrayList<HashMap<String, Object>> permissions =
        new ArrayList<>();
    for (final Pair<String, Permission> perm : project.getGroupPermissions()) {
      final HashMap<String, Object> permObj = new HashMap<>();
      final String userId = perm.getFirst();
      permObj.put("username", userId);
      permObj.put("permission", perm.getSecond().toStringArray());

      permissions.add(permObj);
    }

    ret.put("permissions", permissions);
  }

  private void ajaxGetProxyUsers(final Project project, final HashMap<String, Object> ret) {
    final String[] proxyUsers = project.getProxyUsers().toArray(new String[0]);
    ret.put("proxyUsers", proxyUsers);
  }

  private void ajaxCheckForWritePermission(final Project project, final User user,
      final HashMap<String, Object> ret) {
    ret.put("hasWritePermission", hasPermission(project, user, Type.WRITE));
  }

  /**
   * Set if a flow is locked.
   *
   * @param project the project for the flow.
   * @param ret the return value.
   * @param req the http request.
   */
  private void ajaxSetFlowLock(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final String flowName = getParam(req, FLOW_NAME_PARAM);
    final Flow flow = project.getFlow(flowName);
    if (flow == null) {
      ret.put(ERROR_PARAM,
          "Flow " + flowName + " not found in project " + project.getName());
      return;
    }

    final boolean isLocked = Boolean.parseBoolean(getParam(req, FLOW_IS_LOCKED_PARAM));

    // if there is a change in the locked value, then check to see if the project has a flow trigger
    // that needs to be paused/resumed.
    if (isLocked != flow.isLocked()) {
      try {
        if (this.projectManager.hasFlowTrigger(project, flow)) {
          if (isLocked) {
            if (this.scheduler.pauseFlowTriggerIfPresent(project.getId(), flow.getId())) {
              logger.info("Flow trigger for flow " + project.getName() + "." + flow.getId() +
                  " is paused");
            } else {
              logger.warn("Flow trigger for flow " + project.getName() + "." + flow.getId() +
                  " doesn't exist");
            }
          } else {
            if (this.scheduler.resumeFlowTriggerIfPresent(project.getId(), flow.getId())) {
              logger.info("Flow trigger for flow " + project.getName() + "." + flow.getId() +
                  " is resumed");
            } else {
              logger.warn("Flow trigger for flow " + project.getName() + "." + flow.getId() +
                  " doesn't exist");
            }
          }
        }
      } catch (final Exception e) {
        ret.put(ERROR_PARAM, e);
      }
    }

    flow.setLocked(isLocked);
    ret.put(FLOW_IS_LOCKED_PARAM, flow.isLocked());
    ret.put(FLOW_ID_PARAM, flow.getId());
    this.projectManager.updateFlow(project, flow);
  }

  /**
   * Returns true if the flow is locked, false if it is unlocked.
   *
   * @param project the project containing the flow.
   * @param ret the return value.
   * @param req the http request.
   */
  private void ajaxIsFlowLocked(final Project project,
      final HashMap<String, Object> ret, final HttpServletRequest req)
      throws ServletException {
    final String flowName = getParam(req, FLOW_NAME_PARAM);

    final Flow flow = project.getFlow(flowName);
    if (flow == null) {
      ret.put(ERROR_PARAM,
          "Flow " + flowName + " not found in project " + project.getName());
      return;
    }

    ret.put(FLOW_ID_PARAM, flow.getId());
    ret.put(FLOW_IS_LOCKED_PARAM, flow.isLocked());
  }


  private void handleProjectLogsPage(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/projectlogpage.vm");
    final String projectName = getParam(req, "project");

    final User user = session.getUser();
    PageUtils
        .hideUploadButtonWhenNeeded(page, session, this.userManager, this.lockdownUploadProjects);
    Project project = null;
    try {
      project = this.projectManager.getProject(projectName);
      if (project == null) {
        page.add("errorMsg", "Project " + projectName + " doesn't exist.");
      } else {
        if (!hasPermission(project, user, Type.READ)) {
          throw new AccessControlException("No permission to view project "
              + projectName + ".");
        }

        page.add("project", project);
        page.add("admins", Utils.flattenToString(
            project.getUsersWithPermission(Type.ADMIN), ","));
        final Permission perm = this.getPermissionObject(project, user, Type.ADMIN);
        page.add("userpermission", perm);

        final boolean adminPerm = perm.isPermissionSet(Type.ADMIN);
        if (adminPerm) {
          page.add("admin", true);
        }
        // Set this so we can display execute buttons only to those who have
        // access.
        if (perm.isPermissionSet(Type.EXECUTE) || adminPerm) {
          page.add("exec", true);
        } else {
          page.add("exec", false);
        }
      }
    } catch (final AccessControlException e) {
      page.add("errorMsg", e.getMessage());
    }

    final int numBytes = 1024;

    // Really sucks if we do a lot of these because it'll eat up memory fast.
    // But it's expected that this won't be a heavily used thing. If it is,
    // then we'll revisit it to make it more stream friendly.
    final StringBuffer buffer = new StringBuffer(numBytes);
    page.add("log", buffer.toString());

    page.render();
  }

  private void handleJobHistoryPage(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException,
      IOException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/jobhistorypage.vm");

    final String jobId = getParam(req, "job");
    page.add("jobId", jobId);

    int pageNum = Math.max(1, getIntParam(req, "page", 1));
    page.add("page", pageNum);

    final int pageSize = Math.max(1, getIntParam(req, "size", 25));
    page.add("pageSize", pageSize);

    page.add("recordCount", 0);
    page.add("projectId", "");
    page.add("projectName", "");
    page.add("dataSeries", "[]");
    page.add("history", null);

    final String projectName = getParam(req, "project");
    final User user = session.getUser();

    final Project project = this.projectManager.getProject(projectName);
    if (project == null) {
      page.add("errorMsg", "Project " + projectName + " doesn't exist.");
      page.render();
      return;
    }
    if (!hasPermission(project, user, Type.READ)) {
      page.add("errorMsg", "No permission to view project " + projectName + ".");
      page.render();
      return;
    }

    page.add("projectId", project.getId());
    page.add("projectName", project.getName());

    try {
      final int numResults = this.executorManagerAdapter.getNumberOfJobExecutions(project, jobId);
      page.add("recordCount", numResults);

      final int totalPages = ((numResults - 1) / pageSize) + 1;
      if (pageNum > totalPages) {
        pageNum = totalPages;
        page.add("page", pageNum);
      }
      final int elementsToSkip = (pageNum - 1) * pageSize;
      final List<ExecutableJobInfo> jobInfo =
          this.executorManagerAdapter.getExecutableJobs(project, jobId, elementsToSkip, pageSize);

      if (CollectionUtils.isNotEmpty(jobInfo)) {
        page.add("history", jobInfo);

        final ArrayList<Object> dataSeries = new ArrayList<>();
        for (final ExecutableJobInfo info : jobInfo) {
          final Map<String, Object> map = info.toObject();
          dataSeries.add(map);
        }
        page.add("dataSeries", JSONUtils.toJSON(dataSeries));
      }
    } catch (final ExecutorManagerException e) {
      page.add("errorMsg", e.getMessage());
    }

    page.render();
  }

  private void handlePermissionPage(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/permissionspage.vm");
    final String projectName = getParam(req, "project");
    final User user = session.getUser();
    PageUtils
        .hideUploadButtonWhenNeeded(page, session, this.userManager, this.lockdownUploadProjects);
    Project project = null;
    try {
      project = this.projectManager.getProject(projectName);
      if (project == null) {
        page.add("errorMsg", "Project " + projectName + " not found.");
      } else {
        if (!hasPermission(project, user, Type.READ)) {
          throw new AccessControlException("No permission to view project "
              + projectName + ".");
        }

        page.add("project", project);
        page.add("username", user.getUserId());
        page.add("admins", Utils.flattenToString(
            project.getUsersWithPermission(Type.ADMIN), ","));
        final Permission perm = this.getPermissionObject(project, user, Type.ADMIN);
        page.add("userpermission", perm);

        if (perm.isPermissionSet(Type.ADMIN)) {
          page.add("admin", true);
        }

        final List<Pair<String, Permission>> userPermission =
            project.getUserPermissions();
        if (userPermission != null && !userPermission.isEmpty()) {
          page.add("permissions", userPermission);
        }

        final List<Pair<String, Permission>> groupPermission =
            project.getGroupPermissions();
        if (groupPermission != null && !groupPermission.isEmpty()) {
          page.add("groupPermissions", groupPermission);
        }

        final Set<String> proxyUsers = project.getProxyUsers();
        if (proxyUsers != null && !proxyUsers.isEmpty()) {
          page.add("proxyUsers", proxyUsers);
        }

        if (hasPermission(project, user, Type.ADMIN)) {
          page.add("isAdmin", true);
        }
      }
    } catch (final AccessControlException e) {
      page.add("errorMsg", e.getMessage());
    }

    page.render();
  }

  private void handleJobPage(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/jobpage.vm");
    final String projectName = getParam(req, "project");
    final String flowName = getParam(req, "flow");
    final String jobName = getParam(req, "job");

    final User user = session.getUser();
    Project project = null;
    Flow flow = null;
    try {
      project = this.projectManager.getProject(projectName);
      logger.info("JobPage: project " + projectName + " version is " + project.getVersion()
          + ", reference is " + System.identityHashCode(project));
      if (project == null) {
        page.add("errorMsg", "Project " + projectName + " not found.");
        page.render();
        return;
      }
      if (!hasPermission(project, user, Type.READ)) {
        throw new AccessControlException("No permission to view project "
            + projectName + ".");
      }

      page.add("project", project);
      flow = project.getFlow(flowName);
      if (flow == null) {
        page.add("errorMsg", "Flow " + flowName + " not found.");
        page.render();
        return;
      }

      page.add("flowid", flow.getId());
      final Node node = flow.getNode(jobName);
      if (node == null) {
        page.add("errorMsg", "Job " + jobName + " not found.");
        page.render();
        return;
      }

      Props jobProp = this.projectManager
          .getJobOverrideProperty(project, flow, jobName, node.getJobSource());
      if (jobProp == null) {
        jobProp = this.projectManager.getProperties(project, flow, jobName, node.getJobSource());
      }

      page.add("jobid", node.getId());
      page.add("jobtype", node.getType());
      if (node.getCondition() != null) {
        page.add("condition", node.getCondition());
      }

      final ArrayList<String> dependencies = new ArrayList<>();
      final Set<Edge> inEdges = flow.getInEdges(node.getId());
      if (inEdges != null) {
        for (final Edge dependency : inEdges) {
          dependencies.add(dependency.getSourceId());
        }
      }
      if (!dependencies.isEmpty()) {
        page.add("dependencies", dependencies);
      }

      final ArrayList<String> dependents = new ArrayList<>();
      final Set<Edge> outEdges = flow.getOutEdges(node.getId());
      if (outEdges != null) {
        for (final Edge dependent : outEdges) {
          dependents.add(dependent.getTargetId());
        }
      }
      if (!dependents.isEmpty()) {
        page.add("dependents", dependents);
      }

      // Resolve property dependencies
      final ArrayList<String> source = new ArrayList<>();
      final String nodeSource = node.getPropsSource();
      if (nodeSource != null) {
        source.add(nodeSource);
        FlowProps parent = flow.getFlowProps(nodeSource);
        while (parent.getInheritedSource() != null) {
          source.add(parent.getInheritedSource());
          parent = flow.getFlowProps(parent.getInheritedSource());
        }
      }
      if (!source.isEmpty()) {
        page.add("properties", source);
      }

      final ArrayList<Pair<String, String>> parameters =
          new ArrayList<>();
      // Parameter
      for (final String key : jobProp.getKeySet()) {
        final String value = jobProp.get(key);
        parameters.add(new Pair<>(key, value));
      }

      page.add("parameters", parameters);
    } catch (final AccessControlException e) {
      page.add("errorMsg", e.getMessage());
    } catch (final ProjectManagerException e) {
      page.add("errorMsg", e.getMessage());
    }
    page.render();
  }

  private void handlePropertyPage(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/propertypage.vm");
    final String projectName = getParam(req, "project");
    final String flowName = getParam(req, "flow");
    final String jobName = getParam(req, "job");
    final String propSource = getParam(req, "prop");

    final User user = session.getUser();
    Project project = null;
    Flow flow = null;
    try {
      project = this.projectManager.getProject(projectName);
      if (project == null) {
        page.add("errorMsg", "Project " + projectName + " not found.");
        logger.info("Display project property. Project " + projectName + " not found.");
        page.render();
        return;
      }

      if (!hasPermission(project, user, Type.READ)) {
        throw new AccessControlException("No permission to view project "
            + projectName + ".");
      }
      page.add("project", project);

      flow = project.getFlow(flowName);
      if (flow == null) {
        page.add("errorMsg", "Flow " + flowName + " not found.");
        logger.info("Display project property. Project " + projectName +
            " Flow " + flowName + " not found.");
        page.render();
        return;
      }

      page.add("flowid", flow.getId());
      final Node node = flow.getNode(jobName);
      if (node == null) {
        page.add("errorMsg", "Job " + jobName + " not found.");
        logger.info("Display project property. Project " + projectName +
            " Flow " + flowName + " Job " + jobName + " not found.");
        page.render();
        return;
      }

      final Props prop = this.projectManager.getProperties(project, flow, null, propSource);
      if (prop == null) {
        page.add("errorMsg", "Property " + propSource + " not found.");
        logger.info("Display project property. Project " + projectName +
            " Flow " + flowName + " Job " + jobName +
            " Property " + propSource + " not found.");
        page.render();
        return;

      }
      page.add("property", propSource);
      page.add("jobid", node.getId());

      // Resolve property dependencies
      final ArrayList<String> inheritProps = new ArrayList<>();
      FlowProps parent = flow.getFlowProps(propSource);
      while (parent.getInheritedSource() != null) {
        inheritProps.add(parent.getInheritedSource());
        parent = flow.getFlowProps(parent.getInheritedSource());
      }
      if (!inheritProps.isEmpty()) {
        page.add("inheritedproperties", inheritProps);
      }

      final ArrayList<String> dependingProps = new ArrayList<>();
      FlowProps child =
          flow.getFlowProps(flow.getNode(jobName).getPropsSource());
      while (!child.getSource().equals(propSource)) {
        dependingProps.add(child.getSource());
        child = flow.getFlowProps(child.getInheritedSource());
      }
      if (!dependingProps.isEmpty()) {
        page.add("dependingproperties", dependingProps);
      }

      final ArrayList<Pair<String, String>> parameters =
          new ArrayList<>();
      // Parameter
      for (final String key : prop.getKeySet()) {
        final String value = prop.get(key);
        parameters.add(new Pair<>(key, value));
      }

      page.add("parameters", parameters);
    } catch (final AccessControlException e) {
      page.add("errorMsg", e.getMessage());
    } catch (final ProjectManagerException e) {
      page.add("errorMsg", e.getMessage());
    }

    page.render();
  }

  private void handleFlowPage(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/flowpage.vm");
    final String projectName = getParam(req, "project");
    final String flowName = getParam(req, "flow");

    final User user = session.getUser();
    Project project = null;
    Flow flow = null;
    try {
      project = this.projectManager.getProject(projectName);

      if (project == null) {
        page.add("errorMsg", "Project " + projectName + " not found.");
        page.render();
        return;
      }

      if (!hasPermission(project, user, Type.READ)) {
        throw new AccessControlException("No permission Project " + projectName
            + ".");
      }

      page.add("project", project);
      flow = project.getFlow(flowName);
      if (flow == null) {
        page.add("errorMsg", "Flow " + flowName + " not found.");
      } else {
        page.add("flowid", flow.getId());
        page.add("isLocked", flow.isLocked());
        if (flow.isLocked()) {
          final Props props = this.projectManager.getProps();
          final String lockedFlowMsg = String.format(props.getString(ConfigurationKeys
                  .AZKABAN_LOCKED_FLOW_ERROR_MESSAGE, Constants.DEFAULT_LOCKED_FLOW_ERROR_MESSAGE),
              flow.getId(), projectName);
          page.add("error_message", lockedFlowMsg);
        }
      }
    } catch (final AccessControlException e) {
      page.add("errorMsg", e.getMessage());
    }

    page.render();
  }

  private void handleProjectPage(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws ServletException {
    final Page page =
        newPage(req, resp, session,
            "azkaban/webapp/servlet/velocity/projectpage.vm");
    final String projectName = getParam(req, "project");

    final User user = session.getUser();
    PageUtils
        .hideUploadButtonWhenNeeded(page, session, this.userManager, this.lockdownUploadProjects);
    Project project = null;
    try {
      project = this.projectManager.getProject(projectName);
      if (project == null) {
        page.add("errorMsg", "Project " + projectName + " not found.");
      } else {
        if (!hasPermission(project, user, Type.READ)) {
          throw new AccessControlException("No permission to view project "
              + projectName + ".");
        }

        page.add("project", project);
        page.add("admins", Utils.flattenToString(
            project.getUsersWithPermission(Type.ADMIN), ","));
        final Permission perm = this.getPermissionObject(project, user, Type.ADMIN);
        page.add("userpermission", perm);
        page.add(
            "validatorFixPrompt",
            this.projectManager.getProps().getBoolean(
                ValidatorConfigs.VALIDATOR_AUTO_FIX_PROMPT_FLAG_PARAM,
                ValidatorConfigs.DEFAULT_VALIDATOR_AUTO_FIX_PROMPT_FLAG));
        page.add(
            "validatorFixLabel",
            this.projectManager.getProps().get(
                ValidatorConfigs.VALIDATOR_AUTO_FIX_PROMPT_LABEL_PARAM));
        page.add(
            "validatorFixLink",
            this.projectManager.getProps().get(
                ValidatorConfigs.VALIDATOR_AUTO_FIX_PROMPT_LINK_PARAM));

        final boolean adminPerm = perm.isPermissionSet(Type.ADMIN);
        if (adminPerm) {
          page.add("admin", true);
        }
        // Set this so we can display execute buttons only to those who have
        // access.
        if (perm.isPermissionSet(Type.EXECUTE) || adminPerm) {
          page.add("exec", true);
        } else {
          page.add("exec", false);
        }

        final List<Flow> flows = project.getFlows().stream().filter(flow -> !flow.isEmbeddedFlow())
            .collect(Collectors.toList());

        if (!flows.isEmpty()) {
          Collections.sort(flows, FLOW_ID_COMPARATOR);
          page.add("flows", flows);
        }
      }
    } catch (final AccessControlException e) {
      page.add("errorMsg", e.getMessage());
    }
    page.render();
  }

  private void handleCreate(final HttpServletRequest req, final HttpServletResponse resp,
      final Session session) throws ServletException {
    final String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
    final String projectDescription =
        hasParam(req, "description") ? getParam(req, "description") : null;
    logger.info("Create project " + projectName);

    final User user = session.getUser();

    String status = null;
    String action = null;
    String message = null;
    HashMap<String, Object> params = null;

    if (this.lockdownCreateProjects &&
        !UserUtils.hasPermissionforAction(this.userManager, user, Type.CREATEPROJECTS)) {
      message =
          "User " + user.getUserId()
              + " doesn't have permission to create projects.";
      logger.info(message);
      status = ERROR_PARAM;
    } else {
      try {
        this.projectManager.createProject(projectName, projectDescription, user);
        status = "success";
        action = "redirect";
        final String redirect = "manager?project=" + projectName;
        params = new HashMap<>();
        params.put("path", redirect);
      } catch (final ProjectManagerException e) {
        message = e.getMessage();
        status = ERROR_PARAM;
      }
    }
    final String response = AbstractAzkabanServlet
        .createJsonResponse(status, message, action, params);
    try {
      final Writer write = resp.getWriter();
      write.append(response);
      write.flush();
    } catch (final IOException e) {
      e.printStackTrace();
    }
  }

  private void registerError(final Map<String, String> ret, final String error,
      final HttpServletResponse resp, final int returnCode) {
    ret.put(ERROR_PARAM, error);
    resp.setStatus(returnCode);
  }

  private void ajaxHandleUpload(final HttpServletRequest req, final HttpServletResponse resp,
      final Map<String, String> ret, final Map<String, Object> multipart, final Session session)
      throws ServletException, IOException {
    final User user = session.getUser();
    final String projectName = (String) multipart.get("project");
    final Project project = validateUploadAndGetProject(resp, ret, user, projectName);
    if (project == null) {
      return;
    }

    final FileItem item = (FileItem) multipart.get("file");
    final String name = item.getName();
    String type = null;

    final String contentType = item.getContentType();
    if (contentType != null && (contentType.startsWith(APPLICATION_ZIP_MIME_TYPE) ||
        contentType.startsWith("application/x-zip-compressed") ||
        contentType.startsWith("application/octet-stream"))) {
      type = "zip";
    } else {
      item.delete();
      registerError(ret, "File type " + contentType + " unrecognized.", resp,
          HttpServletResponse.SC_BAD_REQUEST);
      return;
    }

    final String autoFix = (String) multipart.get("fix");

    final Props props = new Props();
    if (autoFix != null && autoFix.equals("off")) {
      props.put(ValidatorConfigs.CUSTOM_AUTO_FIX_FLAG_PARAM, "false");
    } else {
      props.put(ValidatorConfigs.CUSTOM_AUTO_FIX_FLAG_PARAM, "true");
    }

    ret.put("projectId", String.valueOf(project.getId()));

    final File tempDir = Utils.createTempDir();
    OutputStream out = null;
    try {
      logger.info("Uploading file " + name);
      final File archiveFile = new File(tempDir, name);
      out = new BufferedOutputStream(new FileOutputStream(archiveFile));
      IOUtils.copy(item.getInputStream(), out);
      out.close();

      if (this.enableQuartz) {
        //todo chengren311: should maintain atomicity,
        // e.g, if uploadProject fails, associated schedule shouldn't be added.
        this.scheduler.unschedule(project);
      }

      // get the locked flows for the project, so that they can be locked again after upload
      final List<String> lockedFlows = getLockedFlows(project);

      final Map<String, ValidationReport> reports = this.projectManager
          .uploadProject(project, archiveFile, type, user, props);

      if (this.enableQuartz) {
        this.scheduler.schedule(project, user.getUserId());
      }

      // reset locks for flows as needed
      lockFlowsForProject(project, lockedFlows);

      // remove schedule of renamed/deleted flows
      removeScheduleOfDeletedFlows(project, this.scheduleManager, (schedule) -> {
        logger.info(
            "Removed schedule with id {} of renamed/deleted flow: {} from project: {}.",
            schedule.getScheduleId(), schedule.getFlowName(), schedule.getProjectName());
        this.projectManager.postProjectEvent(project, EventType.SCHEDULE, "azkaban",
            "Schedule " + schedule.toString() + " has been removed.");
      });

      registerErrorsAndWarningsFromValidationReport(resp, ret, reports);
    } catch (final Exception e) {
      logger.info("Installation Failed.", e);
      String error = e.getMessage();
      if (error.length() > 512) {
        error = error.substring(0, 512) + "<br>Too many errors to display.<br>";
      }
      registerError(ret, "Installation Failed.<br>" + error, resp,
          HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    } finally {
      if (out != null) {
        out.close();
      }
      if (tempDir.exists()) {
        FileUtils.deleteDirectory(tempDir);
      }
    }

    logger.info("Upload: project " + projectName + " version is " + project.getVersion()
        + ", reference is " + System.identityHashCode(project));
    ret.put("version", String.valueOf(project.getVersion()));
  }

  /**
   * @return project. Null if invalid upload params or not enough permissions to proceed.
   */
  private Project validateUploadAndGetProject(final HttpServletResponse resp,
      final Map<String, String> ret, final User user, final String projectName) {
    if (projectName == null || projectName.isEmpty()) {
      registerError(ret, "No project name found.", resp, HttpServletResponse.SC_BAD_REQUEST);
      return null;
    }
    final Project project = this.projectManager.getProject(projectName);
    if (project == null || !project.isActive()) {
      final String failureCause = (project == null) ? "doesn't exist." : "was already removed.";
      registerError(ret, "Installation Failed. Project '" + projectName + " "
          + failureCause, resp, HttpServletResponse.SC_GONE);
      return null;
    }

    logger.info(
        "Upload: reference of project " + projectName + " is " + System.identityHashCode(project));

    if (this.lockdownUploadProjects && !UserUtils
        .hasPermissionforAction(this.userManager, user, Type.UPLOADPROJECTS)) {
      final String message =
          "Project uploading is locked out. Only admin users and users with special permissions can upload projects. "
              + "User " + user.getUserId() + " doesn't have permission to upload project.";
      logger.info(message);
      registerError(ret, message, resp, HttpServletResponse.SC_FORBIDDEN);
      return null;
    }
    if (!hasPermission(project, user, Type.WRITE)) {
      registerError(ret,
          "Installation Failed. User '" + user.getUserId() + "' does not have write access.",
          resp, HttpServletResponse.SC_BAD_REQUEST);
      return null;
    }
    return project;
  }

  /**
   * Remove schedule of renamed/deleted flows
   *
   * @param project project from which old flows will be unscheduled
   * @param scheduleManager the schedule manager
   * @param onDeletedSchedule a callback function to execute with every deleted schedule
   */
  static void removeScheduleOfDeletedFlows(final Project project,
      final ScheduleManager scheduleManager, final Consumer<Schedule> onDeletedSchedule)
      throws ScheduleManagerException {
    final Set<String> flowNameList = project.getFlows().stream().map(f -> f.getId()).collect(
        Collectors.toSet());

    for (final Schedule schedule : scheduleManager.getSchedules()) {
      if (schedule.getProjectId() == project.getId() &&
          !flowNameList.contains(schedule.getFlowName())) {
        scheduleManager.removeSchedule(schedule);
        onDeletedSchedule.accept(schedule);
      }
    }
  }

  private void registerErrorsAndWarningsFromValidationReport(final HttpServletResponse resp,
      final Map<String, String> ret, final Map<String, ValidationReport> reports) {
    final StringBuffer errorMsgs = new StringBuffer();
    final StringBuffer warnMsgs = new StringBuffer();
    for (final Entry<String, ValidationReport> reportEntry : reports.entrySet()) {
      final ValidationReport report = reportEntry.getValue();
      for (final String msg : report.getInfoMsgs()) {
        switch (ValidationReport.getInfoMsgLevel(msg)) {
          case ERROR:
            errorMsgs.append(ValidationReport.getInfoMsg(msg) + "<br/>");
            break;
          case WARN:
            warnMsgs.append(ValidationReport.getInfoMsg(msg) + "<br/>");
            break;
          default:
            break;
        }
      }
      if (!report.getErrorMsgs().isEmpty()) {
        errorMsgs.append("Validator " + reportEntry.getKey() + " reports errors:<ul>");
        for (final String msg : report.getErrorMsgs()) {
          errorMsgs.append("<li>" + msg + "</li>");
        }
        errorMsgs.append("</ul>");
      }
      if (!report.getWarningMsgs().isEmpty()) {
        warnMsgs.append("Validator " + reportEntry.getKey() + " reports warnings:<ul>");
        for (final String msg : report.getWarningMsgs()) {
          warnMsgs.append("<li>" + msg + "</li>");
        }
        warnMsgs.append("</ul>");
      }
    }
    if (errorMsgs.length() > 0) {
      // If putting more than 4000 characters in the cookie, the entire message will somehow
      // get discarded.
      registerError(ret,
          errorMsgs.length() > 4000 ? errorMsgs.substring(0, 4000) : errorMsgs.toString(), resp,
          HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
    }
    if (warnMsgs.length() > 0) {
      ret.put("warn", warnMsgs.length() > 4000 ? warnMsgs.substring(0, 4000) : warnMsgs.toString());
    }
  }

  /**
   * @return the list of locked flows for the specified project.
   */
  private List<String> getLockedFlows(final Project project) {
    final List<Flow> flows = project.getFlows();
    return flows.stream().filter(flow -> flow.isLocked()).map(flow -> flow.getId())
        .collect(Collectors.toList());
  }

  /**
   * Lock the specified flows for the project.
   *
   * @param project the project
   * @param lockedFlows list of flow IDs of flows to lock
   */
  private void lockFlowsForProject(final Project project, final List<String> lockedFlows) {
    for (final String flowId : lockedFlows) {
      final Flow flow = project.getFlow(flowId);
      if (flow != null) {
        flow.setLocked(true);
      }
    }
  }

  private void handleUpload(final HttpServletRequest req, final HttpServletResponse resp,
      final Map<String, Object> multipart, final Session session) throws ServletException,
      IOException {
    final HashMap<String, String> ret = new HashMap<>();
    final String projectName = (String) multipart.get("project");
    ajaxHandleUpload(req, resp, ret, multipart, session);

    if (ret.containsKey(ERROR_PARAM)) {
      setErrorMessageInCookie(resp, ret.get(ERROR_PARAM));
    }

    if (ret.containsKey("warn")) {
      setWarnMessageInCookie(resp, ret.get("warn"));
    }

    resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
  }

  private Permission getPermissionObject(final Project project, final User user,
      final Permission.Type type) {
    final Permission perm = project.getCollectivePermission(user);

    for (final String roleName : user.getRoles()) {
      final Role role = this.userManager.getRole(roleName);
      perm.addPermissions(role.getPermission());
    }

    return perm;
  }

  private void handleReloadProjectWhitelist(final HttpServletRequest req,
      final HttpServletResponse resp, final Session session) throws IOException {
    final HashMap<String, Object> ret = new HashMap<>();

    if (hasPermission(session.getUser(), Permission.Type.ADMIN)) {
      try {
        if (this.projectManager.loadProjectWhiteList()) {
          ret.put("success", "Project whitelist re-loaded!");
        } else {
          ret.put(ERROR_PARAM, "azkaban.properties doesn't contain property "
              + ProjectWhitelist.XML_FILE_PARAM);
        }
      } catch (final Exception e) {
        ret.put(ERROR_PARAM,
            "Exception occurred while trying to re-load project whitelist: "
                + e);
      }
    } else {
      ret.put(ERROR_PARAM, "Provided session doesn't have admin privilege.");
    }

    this.writeJSON(resp, ret);
  }

  protected boolean hasPermission(final User user, final Permission.Type type) {
    for (final String roleName : user.getRoles()) {
      final Role role = this.userManager.getRole(roleName);
      if (role.getPermission().isPermissionSet(type)
          || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
        return true;
      }
    }

    return false;
  }

  private static class NodeLevelComparator implements Comparator<Node> {

    @Override
    public int compare(final Node node1, final Node node2) {
      return node1.getLevel() - node2.getLevel();
    }
  }

  public static class PageSelection {

    private final String page;
    private final int size;
    private final boolean disabled;
    private final int nextPage;
    private boolean selected;

    public PageSelection(final String pageName, final int size, final boolean disabled,
        final boolean selected, final int nextPage) {
      this.page = pageName;
      this.size = size;
      this.disabled = disabled;
      this.setSelected(selected);
      this.nextPage = nextPage;
    }

    public String getPage() {
      return this.page;
    }

    public int getSize() {
      return this.size;
    }

    public boolean getDisabled() {
      return this.disabled;
    }

    public boolean isSelected() {
      return this.selected;
    }

    public void setSelected(final boolean selected) {
      this.selected = selected;
    }

    public int getNextPage() {
      return this.nextPage;
    }
  }
}