Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
A
azkaban_3.76
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
liqin
azkaban_3.76
Commits
3e829a3a
Commit
3e829a3a
authored
Feb 26, 2021
by
liqin
💬
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
bug fixed
parent
a8fb71d7
Hide whitespace changes
Inline
Side-by-side
Showing
28 changed files
with
891 additions
and
582 deletions
+891
-582
Constants.java
az-core/src/main/java/azkaban/Constants.java
+1
-0
build.gradle
azkaban-common/build.gradle
+16
-0
AlerterHolder.java
...-common/src/main/java/azkaban/executor/AlerterHolder.java
+3
-2
ExecutableFlow.java
...common/src/main/java/azkaban/executor/ExecutableFlow.java
+6
-0
ExecutionController.java
...n/src/main/java/azkaban/executor/ExecutionController.java
+1
-1
ExecutionControllerUtils.java
.../main/java/azkaban/executor/ExecutionControllerUtils.java
+315
-201
ExecutionFinalizer.java
...on/src/main/java/azkaban/executor/ExecutionFinalizer.java
+19
-2
ExecutionOptions.java
...mmon/src/main/java/azkaban/executor/ExecutionOptions.java
+330
-324
ExecutorManager.java
...ommon/src/main/java/azkaban/executor/ExecutorManager.java
+9
-3
QueuedExecutions.java
...mmon/src/main/java/azkaban/executor/QueuedExecutions.java
+1
-1
CommonJobProperties.java
...ommon/src/main/java/azkaban/flow/CommonJobProperties.java
+4
-2
Flow.java
azkaban-common/src/main/java/azkaban/flow/Flow.java
+19
-2
FlowUtils.java
azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
+1
-0
DirectoryFlowLoader.java
...on/src/main/java/azkaban/project/DirectoryFlowLoader.java
+1
-0
DirectoryYamlFlowLoader.java
...rc/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+1
-0
FlowLoaderUtils.java
...common/src/main/java/azkaban/project/FlowLoaderUtils.java
+29
-0
HttpRequestUtils.java
...common/src/main/java/azkaban/server/HttpRequestUtils.java
+48
-2
ExecuteFlowAction.java
.../main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+7
-0
EmailMessage.java
azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
+10
-1
Emailer.java
azkaban-common/src/main/java/azkaban/utils/Emailer.java
+4
-0
SmsAlerter.java
azkaban-common/src/main/java/azkaban/utils/SmsAlerter.java
+2
-6
ExecutionControllerTest.java
...c/test/java/azkaban/executor/ExecutionControllerTest.java
+14
-14
FlowRunner.java
...exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+15
-1
azkaban.properties
...an-exec-server/src/main/resources/conf/azkaban.properties
+1
-1
ExecutorServlet.java
...src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+17
-4
LoginAbstractAzkabanServlet.java
...a/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+2
-0
flowexecutionpanel.vm
...ces/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
+13
-13
azkaban.properties
...ban-web-server/src/main/resources/conf/azkaban.properties
+2
-2
No files found.
az-core/src/main/java/azkaban/Constants.java
View file @
3e829a3a
...
...
@@ -102,6 +102,7 @@ public class Constants {
// Executors can use cpu load calculated from this period to take/skip polling turns
public
static
final
int
DEFAULT_AZKABAN_POLLING_CRITERIA_CPU_LOAD_PERIOD_SEC
=
60
;
public
static
class
ConfigurationKeys
{
// Configures Azkaban to use new polling model for dispatching
...
...
azkaban-common/build.gradle
View file @
3e829a3a
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
dependencies
{
compile
project
(
':az-core'
)
compile
project
(
':azkaban-spi'
)
...
...
azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
View file @
3e829a3a
...
...
@@ -38,6 +38,7 @@ public class AlerterHolder {
@Inject
public
AlerterHolder
(
final
Props
props
,
final
Emailer
mailAlerter
)
{
logger
.
info
(
"job配置=========》"
+
props
.
toString
());
try
{
this
.
alerters
=
loadAlerters
(
props
,
mailAlerter
);
}
catch
(
final
Exception
ex
)
{
...
...
@@ -47,12 +48,12 @@ public class AlerterHolder {
}
private
Map
<
String
,
Alerter
>
loadAlerters
(
final
Props
props
,
final
Emailer
mailAlerter
)
{
logger
.
info
(
"================load报警器================="
);
final
Map
<
String
,
Alerter
>
allAlerters
=
new
HashMap
<>();
// load built-in alerters
allAlerters
.
put
(
"email"
,
mailAlerter
);
//
load external alerters
//
添加手机警报器
allAlerters
.
put
(
"sms"
,
new
SmsAlerter
(
props
));
// load all plugin alerters
final
String
pluginDir
=
props
.
getString
(
"alerter.plugin.dir"
,
"plugins/alerter"
);
allAlerters
.
putAll
(
loadPluginAlerters
(
pluginDir
));
...
...
azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
View file @
3e829a3a
...
...
@@ -122,6 +122,12 @@ public class ExecutableFlow extends ExecutableFlowBase {
if
(
flow
.
getFailureEmails
()
!=
null
)
{
this
.
executionOptions
.
setFailureEmails
(
flow
.
getFailureEmails
());
}
if
(
flow
.
getSuccessSms
()
!=
null
)
{
this
.
executionOptions
.
setSuccessSms
(
flow
.
getSuccessSms
());
}
if
(
flow
.
getFailureSms
()
!=
null
)
{
this
.
executionOptions
.
setFailureSms
(
flow
.
getFailureSms
());
}
}
@Override
...
...
azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
View file @
3e829a3a
...
...
@@ -59,7 +59,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
private
final
AlerterHolder
alerterHolder
;
private
final
ExecutorHealthChecker
executorHealthChecker
;
private
final
int
maxConcurrentRunsOneFlow
;
private
final
Map
<
Pair
<
String
,
String
>,
Integer
>
maxConcurrentRunsPerFlowMap
;
private
final
Map
<
Pair
<
String
,
String
>,
Integer
>
maxConcurrentRunsPerFlowMap
;
private
final
CommonMetrics
commonMetrics
;
private
final
Props
azkProps
;
...
...
azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
View file @
3e829a3a
...
...
@@ -21,19 +21,20 @@ import static java.util.Objects.requireNonNull;
import
azkaban.Constants.ConfigurationKeys
;
import
azkaban.alert.Alerter
;
import
azkaban.utils.AuthenticationUtils
;
import
azkaban.utils.FileIOUtils
;
import
azkaban.utils.Props
;
import
java.io.BufferedReader
;
import
java.io.InputStreamReader
;
import
java.net.HttpURLConnection
;
import
java.net.URL
;
import
java.nio.charset.StandardCharsets
;
import
java.util.LinkedHashSet
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.*
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
import
javax.annotation.Nullable
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.exception.ExceptionUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -43,228 +44,341 @@ import org.slf4j.LoggerFactory;
*/
public
class
ExecutionControllerUtils
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ExecutionControllerUtils
.
class
);
private
static
final
String
SPARK_JOB_TYPE
=
"spark"
;
private
static
final
String
APPLICATION_ID
=
"${application.id}"
;
// The regex to look for while fetching application ID from the Hadoop/Spark job log
private
static
final
Pattern
APPLICATION_ID_PATTERN
=
Pattern
.
compile
(
"application_(\\d+_\\d+)"
);
// The regex to look for while validating the content from RM job link
private
static
final
Pattern
FAILED_TO_READ_APPLICATION_PATTERN
=
Pattern
.
compile
(
"Failed to read the application"
);
private
static
final
Pattern
INVALID_APPLICATION_ID_PATTERN
=
Pattern
.
compile
(
"Invalid Application ID"
);
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ExecutionControllerUtils
.
class
);
private
static
final
String
SPARK_JOB_TYPE
=
"spark"
;
private
static
final
String
APPLICATION_ID
=
"${application.id}"
;
// The regex to look for while fetching application ID from the Hadoop/Spark job log
private
static
final
Pattern
APPLICATION_ID_PATTERN
=
Pattern
.
compile
(
"application_\\d+_\\d+"
);
// The regex to look for while validating the content from RM job link
private
static
final
Pattern
FAILED_TO_READ_APPLICATION_PATTERN
=
Pattern
.
compile
(
"Failed to read the application"
);
private
static
final
Pattern
INVALID_APPLICATION_ID_PATTERN
=
Pattern
.
compile
(
"Invalid Application ID"
);
/**
* If the current status of the execution is not one of the finished statuses, mark the execution
* as failed in the DB.
*
* @param executorLoader the executor loader
* @param alerterHolder the alerter holder
* @param flow the execution
* @param reason reason for finalizing the execution
* @param originalError the cause, if execution is being finalized because of an error
*/
public
static
void
finalizeFlow
(
final
ExecutorLoader
executorLoader
,
final
AlerterHolder
alerterHolder
,
final
ExecutableFlow
flow
,
final
String
reason
,
@Nullable
final
Throwable
originalError
)
{
boolean
alertUser
=
true
;
/**
* If the current status of the execution is not one of the finished statuses, mark the execution
* as failed in the DB.
* 如果执行的当前状态不是完成状态之一,那么在DB中将执行标记为失败。
*
* @param executorLoader the executor loader
* @param alerterHolder the alerter holder
* @param flow the execution
* @param reason reason for finalizing the execution
* @param originalError the cause, if execution is being finalized because of an error
*/
public
static
void
finalizeFlow
(
final
ExecutorLoader
executorLoader
,
final
AlerterHolder
alerterHolder
,
final
ExecutableFlow
flow
,
final
String
reason
,
@Nullable
final
Throwable
originalError
)
{
boolean
alertUser
=
true
;
// First check if the execution in the datastore is finished.
try
{
final
ExecutableFlow
dsFlow
;
if
(
isFinished
(
flow
))
{
dsFlow
=
flow
;
}
else
{
dsFlow
=
executorLoader
.
fetchExecutableFlow
(
flow
.
getExecutionId
());
// First check if the execution in the datastore is finished.
try
{
final
ExecutableFlow
dsFlow
;
if
(
isFinished
(
flow
))
{
dsFlow
=
flow
;
}
else
{
dsFlow
=
executorLoader
.
fetchExecutableFlow
(
flow
.
getExecutionId
());
// If it's marked finished, we're good. If not, we fail everything and then mark it
// finished.
if
(!
isFinished
(
dsFlow
))
{
failEverything
(
dsFlow
);
executorLoader
.
updateExecutableFlow
(
dsFlow
);
}
}
// If it's marked finished, we're good. If not, we fail everything and then mark it
// finished.
if
(!
isFinished
(
dsFlow
))
{
failEverything
(
dsFlow
);
executorLoader
.
updateExecutableFlow
(
dsFlow
);
if
(
flow
.
getEndTime
()
==
-
1
)
{
flow
.
setEndTime
(
System
.
currentTimeMillis
());
executorLoader
.
updateExecutableFlow
(
dsFlow
);
}
}
catch
(
final
ExecutorManagerException
e
)
{
// If failed due to azkaban internal error, do not alert user.
alertUser
=
false
;
logger
.
error
(
"Failed to finalize flow "
+
flow
.
getExecutionId
()
+
", do not alert user."
,
e
);
}
}
if
(
flow
.
getEndTime
()
==
-
1
)
{
flow
.
setEndTime
(
System
.
currentTimeMillis
());
executorLoader
.
updateExecutableFlow
(
dsFlow
);
}
}
catch
(
final
ExecutorManagerException
e
)
{
// If failed due to azkaban internal error, do not alert user.
alertUser
=
false
;
logger
.
error
(
"Failed to finalize flow "
+
flow
.
getExecutionId
()
+
", do not alert user."
,
e
);
}
if
(
alertUser
)
{
if
(
alertUser
)
{
alertUserOnFlowFinished
(
flow
,
alerterHolder
,
getFinalizeFlowReasons
(
reason
,
originalError
));
String
[]
finalizeFlowReasons
=
ExecutionControllerUtils
.
getFinalizeFlowReasons
(
reason
,
originalError
);
try
{
List
<
String
>
errorLogs
=
ExecutionControllerUtils
.
getErrorLogs
(
flow
,
executorLoader
);
for
(
String
finalizeFlowReason
:
finalizeFlowReasons
)
{
String
erroLine
=
ExecutionControllerUtils
.
getErroLine
(
finalizeFlowReason
);
errorLogs
.
add
(
erroLine
);
}
finalizeFlowReasons
=
errorLogs
.
toArray
(
new
String
[
errorLogs
.
size
()]);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
alertUserOnFlowFinished
(
flow
,
alerterHolder
,
finalizeFlowReasons
,
executorLoader
);
// alertUserOnFlowFinished(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError),executorLoader);
}
}
}
/**
* When a flow is finished, alert the user as is configured in the execution options.
*
* @param flow the execution
* @param alerterHolder the alerter holder
* @param extraReasons the extra reasons for alerting
*/
public
static
void
alertUserOnFlowFinished
(
final
ExecutableFlow
flow
,
final
AlerterHolder
alerterHolder
,
final
String
[]
extraReasons
)
{
final
ExecutionOptions
options
=
flow
.
getExecutionOptions
();
final
Alerter
mailAlerter
=
alerterHolder
.
get
(
"email"
);
if
(
flow
.
getStatus
()
!=
Status
.
SUCCEEDED
)
{
if
(
options
.
getFailureEmails
()
!=
null
&&
!
options
.
getFailureEmails
().
isEmpty
())
{
try
{
mailAlerter
.
alertOnError
(
flow
,
extraReasons
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on error for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
if
(
options
.
getFlowParameters
().
containsKey
(
"alert.type"
))
{
final
String
alertType
=
options
.
getFlowParameters
().
get
(
"alert.type"
);
final
Alerter
alerter
=
alerterHolder
.
get
(
alertType
);
if
(
alerter
!=
null
)
{
try
{
alerter
.
alertOnError
(
flow
,
extraReasons
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on error by "
+
alertType
+
" for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
else
{
logger
.
error
(
"Alerter type "
+
alertType
+
" doesn't exist. Failed to alert."
);
}
}
}
else
{
if
(
options
.
getSuccessEmails
()
!=
null
&&
!
options
.
getSuccessEmails
().
isEmpty
())
{
try
{
mailAlerter
.
alertOnSuccess
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on success for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
if
(
options
.
getFlowParameters
().
containsKey
(
"alert.type"
))
{
final
String
alertType
=
options
.
getFlowParameters
().
get
(
"alert.type"
);
final
Alerter
alerter
=
alerterHolder
.
get
(
alertType
);
if
(
alerter
!=
null
)
{
try
{
alerter
.
alertOnSuccess
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on success by "
+
alertType
+
" for execution "
+
flow
.
getExecutionId
(),
e
);
}
/**
* When a flow is finished, alert the user as is configured in the execution options.
* 当流完成时,按照执行选项中配置的方式向用户发出警报。
*
* @param flow the execution
* @param alerterHolder the alerter holder
* @param extraReasons the extra reasons for alerting
*/
public
static
void
alertUserOnFlowFinished
(
final
ExecutableFlow
flow
,
final
AlerterHolder
alerterHolder
,
final
String
[]
extraReasons
,
final
ExecutorLoader
executorLoader
)
{
final
ExecutionOptions
options
=
flow
.
getExecutionOptions
();
final
Alerter
mailAlerter
=
alerterHolder
.
get
(
"email"
);
final
Alerter
msg
=
alerterHolder
.
get
(
"msg"
);
if
(
flow
.
getStatus
()
!=
Status
.
SUCCEEDED
)
{
if
(
options
.
getFailureEmails
()
!=
null
&&
!
options
.
getFailureEmails
().
isEmpty
())
{
try
{
mailAlerter
.
alertOnError
(
flow
,
extraReasons
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on error for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
//添加短信报警功能
if
(
options
.
getFailureSms
()
!=
null
&&
!
options
.
getFailureSms
().
isEmpty
())
{
try
{
msg
.
alertOnError
(
flow
,
extraReasons
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to alert on error for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
if
(
options
.
getFlowParameters
().
containsKey
(
"alert.type"
))
{
final
String
alertType
=
options
.
getFlowParameters
().
get
(
"alert.type"
);
final
Alerter
alerter
=
alerterHolder
.
get
(
alertType
);
if
(
alerter
!=
null
)
{
try
{
alerter
.
alertOnError
(
flow
,
extraReasons
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on error by "
+
alertType
+
" for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
else
{
logger
.
error
(
"Alerter type "
+
alertType
+
" doesn't exist. Failed to alert."
);
}
}
}
else
{
logger
.
error
(
"Alerter type "
+
alertType
+
" doesn't exist. Failed to alert."
);
if
(
options
.
getSuccessEmails
()
!=
null
&&
!
options
.
getSuccessEmails
().
isEmpty
())
{
try
{
mailAlerter
.
alertOnSuccess
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on success for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
//添加短信报警功能
if
(
options
.
getSuccessSms
()
!=
null
&&
!
options
.
getSuccessSms
().
isEmpty
())
{
try
{
msg
.
alertOnSuccess
(
flow
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to alert on success for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
if
(
options
.
getFlowParameters
().
containsKey
(
"alert.type"
))
{
final
String
alertType
=
options
.
getFlowParameters
().
get
(
"alert.type"
);
final
Alerter
alerter
=
alerterHolder
.
get
(
alertType
);
if
(
alerter
!=
null
)
{
try
{
alerter
.
alertOnSuccess
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert on success by "
+
alertType
+
" for execution "
+
flow
.
getExecutionId
(),
e
);
}
}
else
{
logger
.
error
(
"Alerter type "
+
alertType
+
" doesn't exist. Failed to alert."
);
}
}
}
}
}
}
/**
* Alert the user when the flow has encountered the first error.
*
* @param flow the execution
* @param alerterHolder the alerter holder
*/
public
static
void
alertUserOnFirstError
(
final
ExecutableFlow
flow
,
final
AlerterHolder
alerterHolder
)
{
final
ExecutionOptions
options
=
flow
.
getExecutionOptions
();
if
(
options
.
getNotifyOnFirstFailure
())
{
logger
.
info
(
"Alert on first error of execution "
+
flow
.
getExecutionId
());
final
Alerter
mailAlerter
=
alerterHolder
.
get
(
"email"
);
try
{
mailAlerter
.
alertOnFirstError
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to send first error email."
+
e
.
getMessage
(),
e
);
}
/**
* Alert the user when the flow has encountered the first error.
* 当流遇到第一个错误时通知用户。
*
* @param flow the execution
* @param alerterHolder the alerter holder
*/
public
static
void
alertUserOnFirstError
(
final
ExecutableFlow
flow
,
final
AlerterHolder
alerterHolder
)
{
final
ExecutionOptions
options
=
flow
.
getExecutionOptions
();
//控制报警器运行程序
if
(
options
.
getNotifyOnFirstFailure
())
{
logger
.
info
(
"Alert on first error of execution "
+
flow
.
getExecutionId
());
final
Alerter
mailAlerter
=
alerterHolder
.
get
(
"email"
);
try
{
mailAlerter
.
alertOnFirstError
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to send first error email."
+
e
.
getMessage
(),
e
);
}
if
(
options
.
getFlowParameters
().
containsKey
(
"alert.type"
))
{
final
String
alertType
=
options
.
getFlowParameters
().
get
(
"alert.type"
);
final
Alerter
alerter
=
alerterHolder
.
get
(
alertType
);
if
(
alerter
!=
null
)
{
try
{
alerter
.
alertOnFirstError
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert by "
+
alertType
,
e
);
}
}
else
{
logger
.
error
(
"Alerter type "
+
alertType
+
" doesn't exist. Failed to alert."
);
if
(
options
.
getFlowParameters
().
containsKey
(
"alert.type"
))
{
final
String
alertType
=
options
.
getFlowParameters
().
get
(
"alert.type"
);
final
Alerter
alerter
=
alerterHolder
.
get
(
alertType
);
if
(
alerter
!=
null
)
{
try
{
alerter
.
alertOnFirstError
(
flow
);
}
catch
(
final
Exception
e
)
{
logger
.
error
(
"Failed to alert by "
+
alertType
,
e
);
}
}
else
{
logger
.
error
(
"Alerter type "
+
alertType
+
" doesn't exist. Failed to alert."
);
}
}
}
}
//短信报警
if
(
options
.
getSmsOnFirstFailure
())
{
final
Alerter
msg
=
alerterHolder
.
get
(
"msg"
);
try
{
msg
.
alertOnFirstError
(
flow
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Failed to send first error email."
+
e
.
getMessage
(),
e
);
}
}
}
}
/**
* Get the reasons to finalize the flow.
*
* @param reason the reason
* @param originalError the original error
* @return the reasons to finalize the flow
*/
public
static
String
[]
getFinalizeFlowReasons
(
final
String
reason
,
final
Throwable
originalError
)
{
final
List
<
String
>
reasons
=
new
LinkedList
<>();
reasons
.
add
(
reason
);
if
(
originalError
!=
null
)
{
reasons
.
add
(
ExceptionUtils
.
getStackTrace
(
originalError
));
/**
* Get the reasons to finalize the flow.
* 获取完成流的原因。
*
* @param reason the reason
* @param originalError the original error
* @return the reasons to finalize the flow
*/
public
static
String
[]
getFinalizeFlowReasons
(
final
String
reason
,
final
Throwable
originalError
)
{
final
List
<
String
>
reasons
=
new
LinkedList
<>();
reasons
.
add
(
reason
);
if
(
originalError
!=
null
)
{
reasons
.
add
(
ExceptionUtils
.
getStackTrace
(
originalError
));
}
return
reasons
.
toArray
(
new
String
[
reasons
.
size
()]);
}
return
reasons
.
toArray
(
new
String
[
reasons
.
size
()]);
}
/**
* Set the flow status to failed and fail every node inside the flow.
*
* @param exFlow the executable flow
*/
public
static
void
failEverything
(
final
ExecutableFlow
exFlow
)
{
final
long
time
=
System
.
currentTimeMillis
();
for
(
final
ExecutableNode
node
:
exFlow
.
getExecutableNodes
())
{
switch
(
node
.
getStatus
())
{
case
SUCCEEDED:
case
FAILED:
case
KILLED:
case
SKIPPED:
case
DISABLED:
continue
;
// case UNKNOWN:
case
READY:
node
.
setStatus
(
Status
.
KILLING
);
break
;
default
:
node
.
setStatus
(
Status
.
FAILED
);
break
;
}
/**
* 从数据库中查询报错日志
* @param flow
* @param executorLoader
* @return
*/
public
static
List
<
String
>
getErrorLogs
(
final
ExecutableFlow
flow
,
final
ExecutorLoader
executorLoader
){
// 错误的job
final
List
<
String
>
failedJobs
=
findFailedJobs
(
flow
);
final
int
execId
=
flow
.
getExecutionId
();
List
<
String
>
erros
=
new
ArrayList
<>();
for
(
String
job:
failedJobs
)
{
try
{
FileIOUtils
.
LogData
logData
=
executorLoader
.
fetchLogs
(
execId
,
job
,
0
,
0
,
500000
);
String
data
=
logData
.
getData
();
if
(
StringUtils
.
isBlank
(
data
)||!
data
.
contains
(
"ERROR - Job run failed!\n"
)){
continue
;
}
else
{
String
[]
split
=
data
.
split
(
"ERROR - Job run failed!\n"
);
if
(
split
.
length
>
1
){
String
erroLine
=
job
+
"====>"
+
getErroLine
(
split
[
1
]);
logger
.
debug
(
"处理后的一条错误信息=========》"
+
erroLine
);
erros
.
add
(
erroLine
);
}
}
}
catch
(
ExecutorManagerException
e
)
{
logger
.
error
(
"组装错误日志出错"
+
e
.
getLocalizedMessage
());
}
}
return
erros
;
if
(
node
.
getStartTime
()
==
-
1
)
{
node
.
setStartTime
(
time
);
}
if
(
node
.
getEndTime
()
==
-
1
)
{
node
.
setEndTime
(
time
);
}
}
if
(
exFlow
.
getEndTime
()
==
-
1
)
{
exFlow
.
setEndTime
(
time
);
/***
* 获取报错信息中第一行报错信息
* @param errorData
* @return
*/
public
static
String
getErroLine
(
String
errorData
){
if
(
StringUtils
.
isBlank
(
errorData
)){
return
null
;
}
else
{
String
[]
split
=
errorData
.
split
(
"\n"
);
return
split
[
0
];
}
}
exFlow
.
setStatus
(
Status
.
FAILED
);
}
/**
* 获取错误的job
* @param flow
* @return
*/
private
static
List
<
String
>
findFailedJobs
(
final
ExecutableFlow
flow
)
{
final
ArrayList
<
String
>
failedJobs
=
new
ArrayList
<>();
for
(
final
ExecutableNode
node
:
flow
.
getExecutableNodes
())
{
if
(
node
.
getStatus
()
==
Status
.
FAILED
)
{
failedJobs
.
add
(
node
.
getId
());
}
}
return
failedJobs
;
}
/**
* Check if the flow status is finished.
*
* @param flow the executable flow
* @return the boolean
*/
public
static
boolean
isFinished
(
final
ExecutableFlow
flow
)
{
switch
(
flow
.
getStatus
())
{
case
SUCCEEDED:
case
FAILED:
case
KILLED:
return
true
;
default
:
return
false
;
/**
* Set the flow status to failed and fail every node inside the flow.
* 将流状态设置为failed,并使流中的每个节点失效。
*
* @param exFlow the executable flow
*/
public
static
void
failEverything
(
final
ExecutableFlow
exFlow
)
{
final
long
time
=
System
.
currentTimeMillis
();
for
(
final
ExecutableNode
node
:
exFlow
.
getExecutableNodes
())
{
switch
(
node
.
getStatus
())
{
case
SUCCEEDED:
case
FAILED:
case
KILLED:
case
SKIPPED:
case
DISABLED:
continue
;
// case UNKNOWN:
case
READY:
node
.
setStatus
(
Status
.
KILLING
);
break
;
default
:
node
.
setStatus
(
Status
.
FAILED
);
break
;
}
if
(
node
.
getStartTime
()
==
-
1
)
{
node
.
setStartTime
(
time
);
}
if
(
node
.
getEndTime
()
==
-
1
)
{
node
.
setEndTime
(
time
);
}
}
if
(
exFlow
.
getEndTime
()
==
-
1
)
{
exFlow
.
setEndTime
(
time
);
}
exFlow
.
setStatus
(
Status
.
FAILED
);
}
/**
* Check if the flow status is finished.
*
* @param flow the executable flow
* @return the boolean
*/
public
static
boolean
isFinished
(
final
ExecutableFlow
flow
)
{
switch
(
flow
.
getStatus
())
{
case
SUCCEEDED:
case
FAILED:
case
KILLED:
return
true
;
default
:
return
false
;
}
}
}
/**
* Dynamically create the job link url. Construct the job link url from resource manager url.
...
...
azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
View file @
3e829a3a
...
...
@@ -18,8 +18,13 @@ package azkaban.executor;
import
javax.annotation.Nullable
;
import
javax.inject.Inject
;
import
azkaban.utils.FileIOUtils
;
import
org.apache.log4j.Logger
;
import
java.util.Arrays
;
import
java.util.List
;
/**
* Handles removing of running executions (after they have been deemed to be be done or orphaned).
*/
...
...
@@ -94,9 +99,21 @@ public class ExecutionFinalizer {
this
.
updaterStage
.
set
(
"finalizing flow "
+
execId
+
" alerting and emailing"
);
if
(
alertUser
)
{
String
[]
finalizeFlowReasons
=
ExecutionControllerUtils
.
getFinalizeFlowReasons
(
reason
,
originalError
);
try
{
List
<
String
>
errorLogs
=
ExecutionControllerUtils
.
getErrorLogs
(
flow
,
executorLoader
);
for
(
String
finalizeFlowReason
:
finalizeFlowReasons
)
{
String
erroLine
=
ExecutionControllerUtils
.
getErroLine
(
finalizeFlowReason
);
errorLogs
.
add
(
erroLine
);
}
finalizeFlowReasons
=
errorLogs
.
toArray
(
new
String
[
errorLogs
.
size
()]);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
ExecutionControllerUtils
.
alertUserOnFlowFinished
(
flow
,
this
.
alerterHolder
,
ExecutionControllerUtils
.
getFinalizeFlowReasons
(
reason
,
originalError
));
finalizeFlowReasons
,
this
.
executorLoader
);
}
}
...
...
azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
View file @
3e829a3a
...
...
@@ -20,7 +20,6 @@ import azkaban.executor.mail.DefaultMailCreator;
import
azkaban.sla.SlaOption
;
import
azkaban.utils.TypedMapWrapper
;
import
com.google.gson.GsonBuilder
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.Collections
;
...
...
@@ -33,327 +32,334 @@ import java.util.Map;
*/
public
class
ExecutionOptions
{
public
static
final
String
CONCURRENT_OPTION_SKIP
=
"skip"
;
public
static
final
String
CONCURRENT_OPTION_PIPELINE
=
"pipeline"
;
public
static
final
String
CONCURRENT_OPTION_IGNORE
=
"ignore"
;
public
static
final
String
FLOW_PRIORITY
=
"flowPriority"
;
/* override dispatcher selection and use executor id specified */
public
static
final
String
USE_EXECUTOR
=
"useExecutor"
;
public
static
final
int
DEFAULT_FLOW_PRIORITY
=
5
;
private
static
final
String
FLOW_PARAMETERS
=
"flowParameters"
;
private
static
final
String
NOTIFY_ON_FIRST_FAILURE
=
"notifyOnFirstFailure"
;
private
static
final
String
NOTIFY_ON_LAST_FAILURE
=
"notifyOnLastFailure"
;
private
static
final
String
SMS_ON_FIRST_FAILURE
=
"smsOnFirstFailure"
;
private
static
final
String
SMS_ON_LAST_FAILURE
=
"smsOnLastFailure"
;
private
static
final
String
SUCCESS_EMAILS
=
"successEmails"
;
private
static
final
String
FAILURE_EMAILS
=
"failureEmails"
;
private
static
final
String
FAILURE_ACTION
=
"failureAction"
;
private
static
final
String
PIPELINE_LEVEL
=
"pipelineLevel"
;
private
static
final
String
PIPELINE_EXECID
=
"pipelineExecId"
;
private
static
final
String
QUEUE_LEVEL
=
"queueLevel"
;
private
static
final
String
CONCURRENT_OPTION
=
"concurrentOption"
;
private
static
final
String
DISABLE
=
"disabled"
;
private
static
final
String
FAILURE_EMAILS_OVERRIDE
=
"failureEmailsOverride"
;
private
static
final
String
SUCCESS_EMAILS_OVERRIDE
=
"successEmailsOverride"
;
private
static
final
String
MAIL_CREATOR
=
"mailCreator"
;
private
static
final
String
MEMORY_CHECK
=
"memoryCheck"
;
private
static
final
String
FAILURE_SMS_OVERRIDE
=
"failureSmsOverride"
;
private
static
final
String
SUCCESS_SMS_OVERRIDE
=
"successSmsOverride"
;
private
static
final
String
FAILURE_SMS
=
"failureSms"
;
private
static
final
String
SUCCESS_SMS
=
"successSms"
;
private
boolean
notifyOnFirstFailure
=
true
;
private
boolean
notifyOnLastFailure
=
false
;
private
boolean
smsOnFirstFailure
=
true
;
private
boolean
smsOnLastFailure
=
false
;
private
boolean
failureEmailsOverride
=
false
;
private
boolean
successEmailsOverride
=
false
;
private
ArrayList
<
String
>
failureEmails
=
new
ArrayList
<>();
private
ArrayList
<
String
>
successEmails
=
new
ArrayList
<>();
private
boolean
failureSmsOverride
=
false
;
private
boolean
successSmsOverride
=
false
;
private
ArrayList
<
String
>
failureSms
=
new
ArrayList
<>();
private
ArrayList
<
String
>
successSms
=
new
ArrayList
<>();
private
Integer
pipelineLevel
=
null
;
private
Integer
pipelineExecId
=
null
;
private
Integer
queueLevel
=
0
;
private
String
concurrentOption
=
CONCURRENT_OPTION_IGNORE
;
private
String
mailCreator
=
DefaultMailCreator
.
DEFAULT_MAIL_CREATOR
;
private
boolean
memoryCheck
=
true
;
private
Map
<
String
,
String
>
flowParameters
=
new
HashMap
<>();
private
FailureAction
failureAction
=
FailureAction
.
FINISH_CURRENTLY_RUNNING
;
private
List
<
DisabledJob
>
initiallyDisabledJobs
=
new
ArrayList
<>();
private
List
<
SlaOption
>
slaOptions
=
new
ArrayList
<>();
public
static
ExecutionOptions
createFromObject
(
final
Object
obj
)
{
if
(
obj
==
null
||
!(
obj
instanceof
Map
))
{
return
null
;
}
final
Map
<
String
,
Object
>
optionsMap
=
(
Map
<
String
,
Object
>)
obj
;
final
TypedMapWrapper
<
String
,
Object
>
wrapper
=
new
TypedMapWrapper
<>(
optionsMap
);
final
ExecutionOptions
options
=
new
ExecutionOptions
();
if
(
optionsMap
.
containsKey
(
FLOW_PARAMETERS
))
{
options
.
flowParameters
=
new
HashMap
<>();
options
.
flowParameters
.
putAll
(
wrapper
.<
String
,
String
>
getMap
(
FLOW_PARAMETERS
));
}
// Failure notification
options
.
notifyOnFirstFailure
=
wrapper
.
getBool
(
NOTIFY_ON_FIRST_FAILURE
,
options
.
notifyOnFirstFailure
);
options
.
notifyOnLastFailure
=
wrapper
.
getBool
(
NOTIFY_ON_LAST_FAILURE
,
options
.
notifyOnLastFailure
);
options
.
concurrentOption
=
wrapper
.
getString
(
CONCURRENT_OPTION
,
options
.
concurrentOption
);
options
.
smsOnFirstFailure
=
wrapper
.
getBool
(
SMS_ON_FIRST_FAILURE
,
options
.
smsOnFirstFailure
);
options
.
smsOnLastFailure
=
wrapper
.
getBool
(
SMS_ON_LAST_FAILURE
,
options
.
smsOnLastFailure
);
if
(
wrapper
.
containsKey
(
DISABLE
))
{
options
.
initiallyDisabledJobs
=
DisabledJob
.
fromDeprecatedObjectList
(
wrapper
.<
Object
>
getList
(
DISABLE
));
}
if
(
optionsMap
.
containsKey
(
MAIL_CREATOR
))
{
options
.
mailCreator
=
(
String
)
optionsMap
.
get
(
MAIL_CREATOR
);
}
// Failure action
options
.
failureAction
=
FailureAction
.
valueOf
(
wrapper
.
getString
(
FAILURE_ACTION
,
options
.
failureAction
.
toString
()));
options
.
pipelineLevel
=
wrapper
.
getInt
(
PIPELINE_LEVEL
,
options
.
pipelineLevel
);
options
.
pipelineExecId
=
wrapper
.
getInt
(
PIPELINE_EXECID
,
options
.
pipelineExecId
);
options
.
queueLevel
=
wrapper
.
getInt
(
QUEUE_LEVEL
,
options
.
queueLevel
);
// Success emails
options
.
setSuccessEmails
(
wrapper
.<
String
>
getList
(
SUCCESS_EMAILS
,
Collections
.<
String
>
emptyList
()));
options
.
setFailureEmails
(
wrapper
.<
String
>
getList
(
FAILURE_EMAILS
,
Collections
.<
String
>
emptyList
()));
//添加手机验证
options
.
setFailureSms
(
wrapper
.
getList
(
FAILURE_SMS
,
Collections
.
emptyList
()));
options
.
setSuccessSms
(
wrapper
.
getList
(
SUCCESS_SMS
,
Collections
.
emptyList
()));
options
.
setSuccessEmailsOverridden
(
wrapper
.
getBool
(
SUCCESS_EMAILS_OVERRIDE
,
false
));
options
.
setFailureEmailsOverridden
(
wrapper
.
getBool
(
FAILURE_EMAILS_OVERRIDE
,
false
));
//添加手机验证
options
.
setSuccessSmsOverride
(
wrapper
.
getBool
(
SUCCESS_SMS_OVERRIDE
,
false
));
options
.
setFailureSmsOverride
(
wrapper
.
getBool
(
FAILURE_SMS_OVERRIDE
,
false
));
options
.
setMemoryCheck
(
wrapper
.
getBool
(
MEMORY_CHECK
,
true
));
// Note: slaOptions was originally outside of execution options, so it parsed and set
// separately for the original JSON format. New formats should include slaOptions as
// part of execution options.
return
options
;
}
public
void
addAllFlowParameters
(
final
Map
<
String
,
String
>
flowParam
)
{
this
.
flowParameters
.
putAll
(
flowParam
);
}
public
Map
<
String
,
String
>
getFlowParameters
()
{
return
this
.
flowParameters
;
}
public
boolean
isFailureEmailsOverridden
()
{
return
this
.
failureEmailsOverride
;
}
public
void
setFailureEmailsOverridden
(
final
boolean
override
)
{
this
.
failureEmailsOverride
=
override
;
}
public
boolean
isSuccessEmailsOverridden
()
{
return
this
.
successEmailsOverride
;
}
public
void
setSuccessEmailsOverridden
(
final
boolean
override
)
{
this
.
successEmailsOverride
=
override
;
}
public
List
<
String
>
getFailureEmails
()
{
return
this
.
failureEmails
;
}
public
void
setFailureEmails
(
final
Collection
<
String
>
emails
)
{
this
.
failureEmails
=
new
ArrayList
<>(
emails
);
}
public
List
<
String
>
getSuccessEmails
()
{
return
this
.
successEmails
;
}
public
void
setSuccessEmails
(
final
Collection
<
String
>
emails
)
{
this
.
successEmails
=
new
ArrayList
<>(
emails
);
}
public
boolean
isFailureSmsOverride
()
{
return
this
.
failureSmsOverride
;
}
public
void
setFailureSmsOverride
(
final
boolean
failureSmsOverride
)
{
this
.
failureSmsOverride
=
failureSmsOverride
;
}
public
boolean
isSuccessSmsOverride
()
{
return
this
.
successSmsOverride
;
}
public
void
setSuccessSmsOverride
(
boolean
successSmsOverride
)
{
this
.
successSmsOverride
=
successSmsOverride
;
}
public
ArrayList
<
String
>
getFailureSms
()
{
return
this
.
failureSms
;
}
public
void
setFailureSms
(
final
Collection
<
String
>
failureSms
)
{
if
(
failureSms
!=
null
&&
failureSms
.
size
()>
0
)
this
.
failureSms
=
new
ArrayList
<>(
failureSms
);
}
public
ArrayList
<
String
>
getSuccessSms
()
{
return
this
.
successSms
;
}
public
void
setSuccessSms
(
final
Collection
<
String
>
successSms
)
{
if
(
successSms
!=
null
&&
successSms
.
size
()
>
0
)
this
.
successSms
=
new
ArrayList
<>(
successSms
);
}
public
boolean
getNotifyOnFirstFailure
()
{
return
this
.
notifyOnFirstFailure
;
}
public
void
setNotifyOnFirstFailure
(
final
boolean
notify
)
{
this
.
notifyOnFirstFailure
=
notify
;
}
public
boolean
getNotifyOnLastFailure
()
{
return
this
.
notifyOnLastFailure
;
}
public
void
setNotifyOnLastFailure
(
final
boolean
notify
)
{
this
.
notifyOnLastFailure
=
notify
;
}
public
boolean
getSmsOnFirstFailure
()
{
return
this
.
smsOnFirstFailure
;
}
public
void
setSmsOnFirstFailure
(
final
boolean
smsOnFirstFailure
)
{
this
.
smsOnFirstFailure
=
smsOnFirstFailure
;
}
public
boolean
getSmsOnLastFailure
()
{
return
this
.
smsOnLastFailure
;
}
public
void
setSmsOnLastFailure
(
final
boolean
smsOnLastFailure
)
{
this
.
smsOnLastFailure
=
smsOnLastFailure
;
}
public
FailureAction
getFailureAction
()
{
return
this
.
failureAction
;
}
public
void
setFailureAction
(
final
FailureAction
action
)
{
this
.
failureAction
=
action
;
}
public
String
getConcurrentOption
()
{
return
this
.
concurrentOption
;
}
public
void
setConcurrentOption
(
final
String
concurrentOption
)
{
this
.
concurrentOption
=
concurrentOption
;
}
public
String
getMailCreator
()
{
return
this
.
mailCreator
;
}
public
void
setMailCreator
(
final
String
mailCreator
)
{
this
.
mailCreator
=
mailCreator
;
}
public
Integer
getPipelineLevel
()
{
return
this
.
pipelineLevel
;
}
public
void
setPipelineLevel
(
final
Integer
level
)
{
this
.
pipelineLevel
=
level
;
}
public
Integer
getPipelineExecutionId
()
{
return
this
.
pipelineExecId
;
}
public
void
setPipelineExecutionId
(
final
Integer
id
)
{
this
.
pipelineExecId
=
id
;
}
public
Integer
getQueueLevel
()
{
return
this
.
queueLevel
;
}
public
List
<
DisabledJob
>
getDisabledJobs
()
{
return
new
ArrayList
<>(
this
.
initiallyDisabledJobs
);
}
public
void
setDisabledJobs
(
final
List
<
DisabledJob
>
disabledJobs
)
{
this
.
initiallyDisabledJobs
=
disabledJobs
;
}
public
boolean
getMemoryCheck
()
{
return
this
.
memoryCheck
;
}
public
void
setMemoryCheck
(
final
boolean
memoryCheck
)
{
this
.
memoryCheck
=
memoryCheck
;
}
public
List
<
SlaOption
>
getSlaOptions
()
{
return
slaOptions
;
}
public
void
setSlaOptions
(
final
List
<
SlaOption
>
slaOptions
)
{
this
.
slaOptions
=
slaOptions
;
}
public
Map
<
String
,
Object
>
toObject
()
{
final
HashMap
<
String
,
Object
>
flowOptionObj
=
new
HashMap
<>();
flowOptionObj
.
put
(
FLOW_PARAMETERS
,
this
.
flowParameters
);
flowOptionObj
.
put
(
NOTIFY_ON_FIRST_FAILURE
,
this
.
notifyOnFirstFailure
);
flowOptionObj
.
put
(
NOTIFY_ON_LAST_FAILURE
,
this
.
notifyOnLastFailure
);
flowOptionObj
.
put
(
SMS_ON_FIRST_FAILURE
,
this
.
smsOnFirstFailure
);
flowOptionObj
.
put
(
SMS_ON_LAST_FAILURE
,
this
.
smsOnLastFailure
);
flowOptionObj
.
put
(
SUCCESS_EMAILS
,
this
.
successEmails
);
flowOptionObj
.
put
(
FAILURE_EMAILS
,
this
.
failureEmails
);
flowOptionObj
.
put
(
FAILURE_ACTION
,
this
.
failureAction
.
toString
());
flowOptionObj
.
put
(
PIPELINE_LEVEL
,
this
.
pipelineLevel
);
flowOptionObj
.
put
(
PIPELINE_EXECID
,
this
.
pipelineExecId
);
flowOptionObj
.
put
(
QUEUE_LEVEL
,
this
.
queueLevel
);
flowOptionObj
.
put
(
CONCURRENT_OPTION
,
this
.
concurrentOption
);
flowOptionObj
.
put
(
DISABLE
,
DisabledJob
.
toDeprecatedObjectList
(
this
.
initiallyDisabledJobs
));
flowOptionObj
.
put
(
FAILURE_EMAILS_OVERRIDE
,
this
.
failureEmailsOverride
);
flowOptionObj
.
put
(
SUCCESS_EMAILS_OVERRIDE
,
this
.
successEmailsOverride
);
flowOptionObj
.
put
(
MAIL_CREATOR
,
this
.
mailCreator
);
flowOptionObj
.
put
(
MEMORY_CHECK
,
this
.
memoryCheck
);
flowOptionObj
.
put
(
SUCCESS_SMS_OVERRIDE
,
this
.
successSmsOverride
);
flowOptionObj
.
put
(
FAILURE_SMS_OVERRIDE
,
this
.
failureSmsOverride
);
flowOptionObj
.
put
(
SUCCESS_SMS
,
this
.
successSms
);
flowOptionObj
.
put
(
FAILURE_SMS
,
failureSms
);
return
flowOptionObj
;
}
public
String
toJSON
()
{
return
new
GsonBuilder
().
setPrettyPrinting
().
create
().
toJson
(
toObject
());
}
public
enum
FailureAction
{
FINISH_CURRENTLY_RUNNING
,
CANCEL_ALL
,
FINISH_ALL_POSSIBLE
}
public
static
final
String
CONCURRENT_OPTION_SKIP
=
"skip"
;
public
static
final
String
CONCURRENT_OPTION_PIPELINE
=
"pipeline"
;
public
static
final
String
CONCURRENT_OPTION_IGNORE
=
"ignore"
;
public
static
final
String
FLOW_PRIORITY
=
"flowPriority"
;
/* override dispatcher selection and use executor id specified */
public
static
final
String
USE_EXECUTOR
=
"useExecutor"
;
public
static
final
int
DEFAULT_FLOW_PRIORITY
=
5
;
private
static
final
String
FLOW_PARAMETERS
=
"flowParameters"
;
private
static
final
String
NOTIFY_ON_FIRST_FAILURE
=
"notifyOnFirstFailure"
;
private
static
final
String
NOTIFY_ON_LAST_FAILURE
=
"notifyOnLastFailure"
;
private
static
final
String
SMS_ON_FIRST_FAILURE
=
"smsOnFirstFailure"
;
private
static
final
String
SMS_ON_LAST_FAILURE
=
"smsOnLastFailure"
;
private
static
final
String
SUCCESS_EMAILS
=
"successEmails"
;
private
static
final
String
FAILURE_EMAILS
=
"failureEmails"
;
private
static
final
String
FAILURE_ACTION
=
"failureAction"
;
private
static
final
String
PIPELINE_LEVEL
=
"pipelineLevel"
;
private
static
final
String
PIPELINE_EXECID
=
"pipelineExecId"
;
private
static
final
String
QUEUE_LEVEL
=
"queueLevel"
;
private
static
final
String
CONCURRENT_OPTION
=
"concurrentOption"
;
private
static
final
String
DISABLE
=
"disabled"
;
private
static
final
String
FAILURE_EMAILS_OVERRIDE
=
"failureEmailsOverride"
;
private
static
final
String
SUCCESS_EMAILS_OVERRIDE
=
"successEmailsOverride"
;
private
static
final
String
MAIL_CREATOR
=
"mailCreator"
;
private
static
final
String
MEMORY_CHECK
=
"memoryCheck"
;
private
static
final
String
FAILURE_SMS_OVERRIDE
=
"failureSmsOverride"
;
private
static
final
String
SUCCESS_SMS_OVERRIDE
=
"successSmsOverride"
;
private
static
final
String
FAILURE_SMS
=
"failureSms"
;
private
static
final
String
SUCCESS_SMS
=
"successSms"
;
private
boolean
notifyOnFirstFailure
=
true
;
private
boolean
notifyOnLastFailure
=
false
;
private
boolean
smsOnFirstFailure
=
true
;
private
boolean
smsOnLastFailure
=
false
;
private
boolean
failureEmailsOverride
=
false
;
private
boolean
successEmailsOverride
=
false
;
private
ArrayList
<
String
>
failureEmails
=
new
ArrayList
<>();
private
ArrayList
<
String
>
successEmails
=
new
ArrayList
<>();
private
boolean
failureSmsOverride
=
false
;
private
boolean
successSmsOverride
=
false
;
private
ArrayList
<
String
>
failureSms
=
new
ArrayList
<>();
private
ArrayList
<
String
>
successSms
=
new
ArrayList
<>();
private
Integer
pipelineLevel
=
null
;
private
Integer
pipelineExecId
=
null
;
private
Integer
queueLevel
=
0
;
private
String
concurrentOption
=
CONCURRENT_OPTION_IGNORE
;
private
String
mailCreator
=
DefaultMailCreator
.
DEFAULT_MAIL_CREATOR
;
private
boolean
memoryCheck
=
true
;
private
Map
<
String
,
String
>
flowParameters
=
new
HashMap
<>();
private
FailureAction
failureAction
=
FailureAction
.
FINISH_CURRENTLY_RUNNING
;
private
List
<
DisabledJob
>
initiallyDisabledJobs
=
new
ArrayList
<>();
private
List
<
SlaOption
>
slaOptions
=
new
ArrayList
<>();
public
static
ExecutionOptions
createFromObject
(
final
Object
obj
)
{
if
(
obj
==
null
||
!(
obj
instanceof
Map
))
{
return
null
;
}
final
Map
<
String
,
Object
>
optionsMap
=
(
Map
<
String
,
Object
>)
obj
;
final
TypedMapWrapper
<
String
,
Object
>
wrapper
=
new
TypedMapWrapper
<>(
optionsMap
);
final
ExecutionOptions
options
=
new
ExecutionOptions
();
if
(
optionsMap
.
containsKey
(
FLOW_PARAMETERS
))
{
options
.
flowParameters
=
new
HashMap
<>();
options
.
flowParameters
.
putAll
(
wrapper
.<
String
,
String
>
getMap
(
FLOW_PARAMETERS
));
}
// Failure notification
options
.
notifyOnFirstFailure
=
wrapper
.
getBool
(
NOTIFY_ON_FIRST_FAILURE
,
options
.
notifyOnFirstFailure
);
options
.
notifyOnLastFailure
=
wrapper
.
getBool
(
NOTIFY_ON_LAST_FAILURE
,
options
.
notifyOnLastFailure
);
options
.
concurrentOption
=
wrapper
.
getString
(
CONCURRENT_OPTION
,
options
.
concurrentOption
);
options
.
smsOnFirstFailure
=
wrapper
.
getBool
(
SMS_ON_FIRST_FAILURE
,
options
.
smsOnFirstFailure
);
options
.
smsOnLastFailure
=
wrapper
.
getBool
(
SMS_ON_LAST_FAILURE
,
options
.
smsOnLastFailure
);
if
(
wrapper
.
containsKey
(
DISABLE
))
{
options
.
initiallyDisabledJobs
=
DisabledJob
.
fromDeprecatedObjectList
(
wrapper
.<
Object
>
getList
(
DISABLE
));
}
if
(
optionsMap
.
containsKey
(
MAIL_CREATOR
))
{
options
.
mailCreator
=
(
String
)
optionsMap
.
get
(
MAIL_CREATOR
);
}
// Failure action
options
.
failureAction
=
FailureAction
.
valueOf
(
wrapper
.
getString
(
FAILURE_ACTION
,
options
.
failureAction
.
toString
()));
options
.
pipelineLevel
=
wrapper
.
getInt
(
PIPELINE_LEVEL
,
options
.
pipelineLevel
);
options
.
pipelineExecId
=
wrapper
.
getInt
(
PIPELINE_EXECID
,
options
.
pipelineExecId
);
options
.
queueLevel
=
wrapper
.
getInt
(
QUEUE_LEVEL
,
options
.
queueLevel
);
// Success emails
options
.
setSuccessEmails
(
wrapper
.<
String
>
getList
(
SUCCESS_EMAILS
,
Collections
.<
String
>
emptyList
()));
options
.
setFailureEmails
(
wrapper
.<
String
>
getList
(
FAILURE_EMAILS
,
Collections
.<
String
>
emptyList
()));
//添加手机验证
options
.
setFailureSms
(
wrapper
.
getList
(
FAILURE_SMS
,
Collections
.
emptyList
()));
options
.
setSuccessSms
(
wrapper
.
getList
(
SUCCESS_SMS
,
Collections
.
emptyList
()));
options
.
setSuccessEmailsOverridden
(
wrapper
.
getBool
(
SUCCESS_EMAILS_OVERRIDE
,
false
));
options
.
setFailureEmailsOverridden
(
wrapper
.
getBool
(
FAILURE_EMAILS_OVERRIDE
,
false
));
//添加手机验证
options
.
setSuccessSmsOverride
(
wrapper
.
getBool
(
SUCCESS_SMS_OVERRIDE
,
false
));
options
.
setFailureSmsOverride
(
wrapper
.
getBool
(
FAILURE_SMS_OVERRIDE
,
false
));
options
.
setMemoryCheck
(
wrapper
.
getBool
(
MEMORY_CHECK
,
true
));
// Note: slaOptions was originally outside of execution options, so it parsed and set
// separately for the original JSON format. New formats should include slaOptions as
// part of execution options.
return
options
;
}
public
void
addAllFlowParameters
(
final
Map
<
String
,
String
>
flowParam
)
{
this
.
flowParameters
.
putAll
(
flowParam
);
}
public
Map
<
String
,
String
>
getFlowParameters
()
{
return
this
.
flowParameters
;
}
public
boolean
isFailureEmailsOverridden
()
{
return
this
.
failureEmailsOverride
;
}
public
void
setFailureEmailsOverridden
(
final
boolean
override
)
{
this
.
failureEmailsOverride
=
override
;
}
public
boolean
isSuccessEmailsOverridden
()
{
return
this
.
successEmailsOverride
;
}
public
void
setSuccessEmailsOverridden
(
final
boolean
override
)
{
this
.
successEmailsOverride
=
override
;
}
public
List
<
String
>
getFailureEmails
()
{
return
this
.
failureEmails
;
}
public
void
setFailureEmails
(
final
Collection
<
String
>
emails
)
{
this
.
failureEmails
=
new
ArrayList
<>(
emails
);
}
public
List
<
String
>
getSuccessEmails
()
{
return
this
.
successEmails
;
}
public
void
setSuccessEmails
(
final
Collection
<
String
>
emails
)
{
this
.
successEmails
=
new
ArrayList
<>(
emails
);
}
public
boolean
isFailureSmsOverride
()
{
return
this
.
failureSmsOverride
;
}
public
void
setFailureSmsOverride
(
final
boolean
failureSmsOverride
)
{
this
.
failureSmsOverride
=
failureSmsOverride
;
}
public
boolean
isSuccessSmsOverride
()
{
return
this
.
successSmsOverride
;
}
public
void
setSuccessSmsOverride
(
boolean
successSmsOverride
)
{
this
.
successSmsOverride
=
successSmsOverride
;
}
public
ArrayList
<
String
>
getFailureSms
()
{
return
this
.
failureSms
;
}
public
void
setFailureSms
(
final
Collection
<
String
>
failureSms
)
{
if
(
failureSms
!=
null
&&
failureSms
.
size
()>
0
)
this
.
failureSms
=
new
ArrayList
<>(
failureSms
);
}
public
ArrayList
<
String
>
getSuccessSms
()
{
return
this
.
successSms
;
}
public
void
setSuccessSms
(
final
Collection
<
String
>
successSms
)
{
if
(
successSms
!=
null
&&
successSms
.
size
()>
0
)
this
.
successSms
=
new
ArrayList
<>(
successSms
);
}
public
boolean
getNotifyOnFirstFailure
()
{
return
this
.
notifyOnFirstFailure
;
}
public
void
setNotifyOnFirstFailure
(
final
boolean
notify
)
{
this
.
notifyOnFirstFailure
=
notify
;
}
public
boolean
getNotifyOnLastFailure
()
{
return
this
.
notifyOnLastFailure
;
}
public
void
setNotifyOnLastFailure
(
final
boolean
notify
)
{
this
.
notifyOnLastFailure
=
notify
;
}
public
boolean
getSmsOnFirstFailure
()
{
return
this
.
smsOnFirstFailure
;
}
public
void
setSmsOnFirstFailure
(
final
boolean
smsOnFirstFailure
)
{
this
.
smsOnFirstFailure
=
smsOnFirstFailure
;
}
public
boolean
getSmsOnLastFailure
()
{
return
this
.
smsOnLastFailure
;
}
public
void
setSmsOnLastFailure
(
final
boolean
smsOnLastFailure
)
{
this
.
smsOnLastFailure
=
smsOnLastFailure
;
}
public
FailureAction
getFailureAction
()
{
return
this
.
failureAction
;
}
public
void
setFailureAction
(
final
FailureAction
action
)
{
this
.
failureAction
=
action
;
}
public
String
getConcurrentOption
()
{
return
this
.
concurrentOption
;
}
public
void
setConcurrentOption
(
final
String
concurrentOption
)
{
this
.
concurrentOption
=
concurrentOption
;
}
public
String
getMailCreator
()
{
return
this
.
mailCreator
;
}
public
void
setMailCreator
(
final
String
mailCreator
)
{
this
.
mailCreator
=
mailCreator
;
}
public
Integer
getPipelineLevel
()
{
return
this
.
pipelineLevel
;
}
public
void
setPipelineLevel
(
final
Integer
level
)
{
this
.
pipelineLevel
=
level
;
}
public
Integer
getPipelineExecutionId
()
{
return
this
.
pipelineExecId
;
}
public
void
setPipelineExecutionId
(
final
Integer
id
)
{
this
.
pipelineExecId
=
id
;
}
public
Integer
getQueueLevel
()
{
return
this
.
queueLevel
;
}
public
List
<
DisabledJob
>
getDisabledJobs
()
{
return
new
ArrayList
<>(
this
.
initiallyDisabledJobs
);
}
public
void
setDisabledJobs
(
final
List
<
DisabledJob
>
disabledJobs
)
{
this
.
initiallyDisabledJobs
=
disabledJobs
;
}
public
boolean
getMemoryCheck
()
{
return
this
.
memoryCheck
;
}
public
void
setMemoryCheck
(
final
boolean
memoryCheck
)
{
this
.
memoryCheck
=
memoryCheck
;
}
public
List
<
SlaOption
>
getSlaOptions
()
{
return
slaOptions
;
}
public
void
setSlaOptions
(
final
List
<
SlaOption
>
slaOptions
)
{
this
.
slaOptions
=
slaOptions
;
}
public
Map
<
String
,
Object
>
toObject
()
{
final
HashMap
<
String
,
Object
>
flowOptionObj
=
new
HashMap
<>();
flowOptionObj
.
put
(
FLOW_PARAMETERS
,
this
.
flowParameters
);
flowOptionObj
.
put
(
NOTIFY_ON_FIRST_FAILURE
,
this
.
notifyOnFirstFailure
);
flowOptionObj
.
put
(
NOTIFY_ON_LAST_FAILURE
,
this
.
notifyOnLastFailure
);
flowOptionObj
.
put
(
SMS_ON_FIRST_FAILURE
,
this
.
smsOnFirstFailure
);
flowOptionObj
.
put
(
SMS_ON_LAST_FAILURE
,
this
.
smsOnLastFailure
);
flowOptionObj
.
put
(
SUCCESS_EMAILS
,
this
.
successEmails
);
flowOptionObj
.
put
(
FAILURE_EMAILS
,
this
.
failureEmails
);
flowOptionObj
.
put
(
FAILURE_ACTION
,
this
.
failureAction
.
toString
());
flowOptionObj
.
put
(
PIPELINE_LEVEL
,
this
.
pipelineLevel
);
flowOptionObj
.
put
(
PIPELINE_EXECID
,
this
.
pipelineExecId
);
flowOptionObj
.
put
(
QUEUE_LEVEL
,
this
.
queueLevel
);
flowOptionObj
.
put
(
CONCURRENT_OPTION
,
this
.
concurrentOption
);
flowOptionObj
.
put
(
DISABLE
,
DisabledJob
.
toDeprecatedObjectList
(
this
.
initiallyDisabledJobs
));
flowOptionObj
.
put
(
FAILURE_EMAILS_OVERRIDE
,
this
.
failureEmailsOverride
);
flowOptionObj
.
put
(
SUCCESS_EMAILS_OVERRIDE
,
this
.
successEmailsOverride
);
flowOptionObj
.
put
(
MAIL_CREATOR
,
this
.
mailCreator
);
flowOptionObj
.
put
(
MEMORY_CHECK
,
this
.
memoryCheck
);
flowOptionObj
.
put
(
SUCCESS_SMS_OVERRIDE
,
this
.
successSmsOverride
);
flowOptionObj
.
put
(
FAILURE_SMS_OVERRIDE
,
this
.
failureSmsOverride
);
flowOptionObj
.
put
(
SUCCESS_SMS
,
this
.
successSms
);
flowOptionObj
.
put
(
FAILURE_SMS
,
failureSms
);
return
flowOptionObj
;
}
public
String
toJSON
()
{
return
new
GsonBuilder
().
setPrettyPrinting
().
create
().
toJson
(
toObject
());
}
public
enum
FailureAction
{
FINISH_CURRENTLY_RUNNING
,
CANCEL_ALL
,
FINISH_ALL_POSSIBLE
}
}
azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
View file @
3e829a3a
...
...
@@ -58,9 +58,8 @@ import org.apache.commons.lang.StringUtils;
import
org.apache.log4j.Logger
;
import
org.joda.time.DateTime
;
/**
/**
执行管理器用于管理客户端工作。
* Executor manager used to manage the client side job.
*
* @deprecated replaced by {@link ExecutionController}
*/
@Singleton
...
...
@@ -81,7 +80,7 @@ public class ExecutorManager extends EventHandler implements
private
final
RunningExecutionsUpdaterThread
updaterThread
;
private
final
ExecutorApiGateway
apiGateway
;
private
final
int
maxConcurrentRunsOneFlow
;
private
final
Map
<
Pair
<
String
,
String
>,
Integer
>
maxConcurrentRunsPerFlowMap
;
private
final
Map
<
Pair
<
String
,
String
>,
Integer
>
maxConcurrentRunsPerFlowMap
;
private
final
ExecutorManagerUpdaterStage
updaterStage
;
private
final
ExecutionFinalizer
executionFinalizer
;
private
final
ActiveExecutors
activeExecutors
;
...
...
@@ -892,6 +891,8 @@ public class ExecutorManager extends EventHandler implements
final
String
exFlowKey
=
exflow
.
getProjectName
()
+
"."
+
exflow
.
getId
()
+
".submitFlow"
;
// using project and flow name to prevent race condition when same flow is submitted by API and schedule at the same time
// causing two same flow submission entering this piece.
//当API和schedule同时提交相同的流时,使用项目和流名称防止竞态条件
//导致两个相同的流提交进入此部分。
synchronized
(
exFlowKey
.
intern
())
{
final
String
flowId
=
exflow
.
getFlowId
();
...
...
@@ -913,6 +914,7 @@ public class ExecutorManager extends EventHandler implements
exflow
.
setSubmitTime
(
System
.
currentTimeMillis
());
// Get collection of running flows given a project and a specific flow name
//获取给定项目和特定流名称的运行流的集合
final
List
<
Integer
>
running
=
getRunningFlows
(
projectId
,
flowId
);
ExecutionOptions
options
=
exflow
.
getExecutionOptions
();
...
...
@@ -965,10 +967,14 @@ public class ExecutorManager extends EventHandler implements
// The exflow id is set by the loader. So it's unavailable until after
// this call.
// exflow id由加载程序设置。所以要到之后才能用
//这个调用。
this
.
executorLoader
.
uploadExecutableFlow
(
exflow
);
// We create an active flow reference in the datastore. If the upload
// fails, we remove the reference.
//我们在数据存储中创建一个活动流引用。如果上传
//失败,我们删除引用。
final
ExecutionReference
reference
=
new
ExecutionReference
(
exflow
.
getExecutionId
());
...
...
azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
View file @
3e829a3a
...
...
@@ -58,7 +58,7 @@ public class QueuedExecutions {
/**
* <pre>
* Helper method to have a single point of insertion in the queued flows
*
*
帮助器方法,以便在队列流中具有单点插入
* @param exflow
* flow to be enqueued
* @param ref
...
...
azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
View file @
3e829a3a
...
...
@@ -55,12 +55,14 @@ public class CommonJobProperties {
* Comma delimited list of email addresses for success messages
*/
public
static
final
String
SUCCESS_EMAILS
=
"success.emails"
;
public
static
final
String
NOTIFY_SMS
=
"notify.sms"
;
/**
*
* Comma delimited list of email addresses for failure messages
*/
public
static
final
String
FAILURE_EMAILS
=
"failure.emails"
;
public
static
final
String
SUCCESS_SMS
=
"success.sms"
;
public
static
final
String
FAILURE_SMS
=
"failure.sms"
;
/*
* The following are the common props that will be added to the job by azkaban
*/
...
...
azkaban-common/src/main/java/azkaban/flow/Flow.java
View file @
3e829a3a
...
...
@@ -42,6 +42,9 @@ public class Flow {
private
int
numLevels
=
-
1
;
private
List
<
String
>
failureEmail
=
new
ArrayList
<>();
private
List
<
String
>
successEmail
=
new
ArrayList
<>();
//添加手机号
private
List
<
String
>
failureSms
=
new
ArrayList
<>();
private
List
<
String
>
successSms
=
new
ArrayList
<>();
private
String
mailCreator
=
DefaultMailCreator
.
DEFAULT_MAIL_CREATOR
;
private
ArrayList
<
String
>
errors
;
private
int
version
=
-
1
;
...
...
@@ -119,6 +122,8 @@ public class Flow {
flow
.
failureEmail
=
(
List
<
String
>)
flowObject
.
get
(
"failure.email"
);
flow
.
successEmail
=
(
List
<
String
>)
flowObject
.
get
(
"success.email"
);
flow
.
failureSms
=(
List
<
String
>)
flowObject
.
get
(
"failure.sms"
);
flow
.
successSms
=(
List
<
String
>)
flowObject
.
get
(
"success.sms"
);
if
(
flowObject
.
containsKey
(
"mailCreator"
))
{
flow
.
mailCreator
=
flowObject
.
get
(
"mailCreator"
).
toString
();
}
...
...
@@ -219,7 +224,9 @@ public class Flow {
public
List
<
String
>
getSuccessEmails
()
{
return
this
.
successEmail
;
}
public
List
<
String
>
getSuccessSms
()
{
return
this
.
successSms
;
}
public
String
getMailCreator
()
{
return
this
.
mailCreator
;
}
...
...
@@ -231,7 +238,9 @@ public class Flow {
public
List
<
String
>
getFailureEmails
()
{
return
this
.
failureEmail
;
}
public
List
<
String
>
getFailureSms
()
{
return
this
.
failureSms
;
}
public
void
addSuccessEmails
(
final
Collection
<
String
>
emails
)
{
this
.
successEmail
.
addAll
(
emails
);
}
...
...
@@ -239,6 +248,12 @@ public class Flow {
public
void
addFailureEmails
(
final
Collection
<
String
>
emails
)
{
this
.
failureEmail
.
addAll
(
emails
);
}
public
void
addFailureSms
(
final
Collection
<
String
>
sms
)
{
this
.
failureSms
.
addAll
(
sms
);
}
public
void
addSuccessSms
(
final
Collection
<
String
>
sms
)
{
this
.
successSms
.
addAll
(
sms
);
}
public
int
getNumLevels
()
{
return
this
.
numLevels
;
...
...
@@ -348,6 +363,8 @@ public class Flow {
flowObj
.
put
(
"edges"
,
objectizeEdges
());
flowObj
.
put
(
"failure.email"
,
this
.
failureEmail
);
flowObj
.
put
(
"success.email"
,
this
.
successEmail
);
flowObj
.
put
(
"failure.sms"
,
this
.
failureSms
);
flowObj
.
put
(
"success.sms"
,
this
.
successSms
);
flowObj
.
put
(
"mailCreator"
,
this
.
mailCreator
);
flowObj
.
put
(
"layedout"
,
this
.
isLayedOut
);
flowObj
.
put
(
"embeddedFlow"
,
this
.
isEmbeddedFlow
);
...
...
azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
View file @
3e829a3a
...
...
@@ -67,6 +67,7 @@ public class FlowUtils {
/**
* Change job status to disabled in exflow if the job is in disabledJobs
* 如果作业处于disabledJobs中,则在exflow中将作业状态更改为disabled
*/
public
static
void
applyDisabledJobs
(
final
List
<
DisabledJob
>
disabledJobs
,
final
ExecutableFlowBase
exflow
)
{
...
...
azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
View file @
3e829a3a
...
...
@@ -324,6 +324,7 @@ public class DirectoryFlowLoader implements FlowLoader {
final
Props
jobProp
=
this
.
jobPropsMap
.
get
(
base
.
getId
());
FlowLoaderUtils
.
addEmailPropsToFlow
(
flow
,
jobProp
);
FlowLoaderUtils
.
addSmsPropsToFlow
(
flow
,
jobProp
);
flow
.
addAllFlowProperties
(
this
.
flowPropsList
);
final
Set
<
String
>
visitedNodesOnPath
=
new
HashSet
<>();
...
...
azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
View file @
3e829a3a
...
...
@@ -151,6 +151,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
flow
.
setAzkabanFlowVersion
(
Constants
.
AZKABAN_FLOW_VERSION_2_0
);
final
Props
props
=
azkabanFlow
.
getProps
();
FlowLoaderUtils
.
addEmailPropsToFlow
(
flow
,
props
);
FlowLoaderUtils
.
addSmsPropsToFlow
(
flow
,
props
);
props
.
setSource
(
flowFile
.
getName
());
flow
.
addAllFlowProperties
(
ImmutableList
.
of
(
new
FlowProps
(
props
)));
...
...
azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
View file @
3e829a3a
...
...
@@ -225,6 +225,35 @@ public class FlowLoaderUtils {
flow
.
addSuccessEmails
(
successEmail
);
}
public
static
void
addSmsPropsToFlow
(
final
Flow
flow
,
final
Props
prop
)
{
final
List
<
String
>
successSmsList
=
prop
.
getStringList
(
CommonJobProperties
.
SUCCESS_SMS
,
Collections
.
EMPTY_LIST
);
final
Set
<
String
>
successSms
=
new
HashSet
<>();
for
(
final
String
sms
:
successSmsList
)
{
successSms
.
add
(
sms
.
toLowerCase
());
}
final
List
<
String
>
failureSmsList
=
prop
.
getStringList
(
CommonJobProperties
.
FAILURE_SMS
,
Collections
.
EMPTY_LIST
);
final
Set
<
String
>
failureSms
=
new
HashSet
<>();
for
(
final
String
sms
:
failureSmsList
)
{
failureSms
.
add
(
sms
.
toLowerCase
());
}
final
List
<
String
>
notifySmslList
=
prop
.
getStringList
(
CommonJobProperties
.
NOTIFY_SMS
,
Collections
.
EMPTY_LIST
);
for
(
String
sms
:
notifySmslList
)
{
sms
=
sms
.
toLowerCase
();
successSms
.
add
(
sms
);
failureSms
.
add
(
sms
);
}
flow
.
addFailureSms
(
failureSms
);
flow
.
addSuccessSms
(
successSms
);
}
/**
* Generate flow loader report validation report.
*
...
...
azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
View file @
3e829a3a
...
...
@@ -37,8 +37,10 @@ import javax.servlet.ServletException;
import
javax.servlet.http.HttpServletRequest
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.log4j.Logger
;
public
class
HttpRequestUtils
{
private
static
final
Logger
logger
=
Logger
.
getLogger
(
HttpRequestUtils
.
class
);
public
static
ExecutionOptions
parseFlowOptions
(
final
HttpServletRequest
req
)
throws
ServletException
{
...
...
@@ -78,6 +80,37 @@ public class HttpRequestUtils {
execOptions
.
setSuccessEmails
(
Arrays
.
asList
(
emailSplit
));
}
}
logger
.
info
(
"============收到HTTP请求================="
);
//添加短信解析
if
(
hasParam
(
req
,
"failureSmsOverride"
))
{
logger
.
info
(
"============收到failureSmsOverride================="
);
final
boolean
override
=
getBooleanParam
(
req
,
"failureSmsOverride"
,
false
);
execOptions
.
setFailureSmsOverride
(
override
);
}
if
(
hasParam
(
req
,
"successSmsOverride"
))
{
logger
.
info
(
"============successSmsOverride================="
);
final
boolean
override
=
getBooleanParam
(
req
,
"successSmsOverride"
,
false
);
execOptions
.
setSuccessSmsOverride
(
override
);
}
if
(
hasParam
(
req
,
"failureSms"
))
{
final
String
emails
=
getParam
(
req
,
"failureSms"
);
logger
.
info
(
"============failureSms================="
+
emails
);
if
(!
emails
.
isEmpty
())
{
final
String
[]
emailSplit
=
emails
.
split
(
"\\s*,\\s*|\\s*;\\s*|\\s+"
);
execOptions
.
setFailureSms
(
Arrays
.
asList
(
emailSplit
));
}
}
if
(
hasParam
(
req
,
"successSms"
))
{
final
String
emails
=
getParam
(
req
,
"successSms"
);
logger
.
info
(
"============successSms================="
+
emails
);
if
(!
emails
.
isEmpty
())
{
final
String
[]
emailSplit
=
emails
.
split
(
"\\s*,\\s*|\\s*;\\s*|\\s+"
);
execOptions
.
setSuccessSms
(
Arrays
.
asList
(
emailSplit
));
}
}
if
(
hasParam
(
req
,
"notifyFailureFirst"
))
{
execOptions
.
setNotifyOnFirstFailure
(
Boolean
.
parseBoolean
(
getParam
(
req
,
"notifyFailureFirst"
)));
...
...
@@ -86,6 +119,17 @@ public class HttpRequestUtils {
execOptions
.
setNotifyOnLastFailure
(
Boolean
.
parseBoolean
(
getParam
(
req
,
"notifyFailureLast"
)));
}
if
(
hasParam
(
req
,
"smsFailureFirst"
)){
logger
.
info
(
"============smsFailureFirst================="
);
execOptions
.
setSmsOnFirstFailure
(
Boolean
.
parseBoolean
(
getParam
(
req
,
"smsFailureFirst"
)));
}
if
(
hasParam
(
req
,
"smsFailureLast"
))
{
logger
.
info
(
"============smsFailureLast================="
);
execOptions
.
setSmsOnLastFailure
(
Boolean
.
parseBoolean
(
getParam
(
req
,
"smsFailureLast"
)));
}
String
concurrentOption
=
getParam
(
req
,
"concurrentOption"
,
"skip"
);
execOptions
.
setConcurrentOption
(
concurrentOption
);
...
...
@@ -112,7 +156,8 @@ public class HttpRequestUtils {
if
(!
disabled
.
isEmpty
())
{
// TODO edlu: see if it's possible to pass in the new format
final
List
<
DisabledJob
>
disabledList
=
DisabledJob
.
fromDeprecatedObjectList
((
List
<
Object
>)
JSONUtils
.
parseJSONFromStringQuiet
(
disabled
));
DisabledJob
.
fromDeprecatedObjectList
((
List
<
Object
>)
JSONUtils
.
parseJSONFromStringQuiet
(
disabled
));
execOptions
.
setDisabledJobs
(
disabledList
);
}
}
...
...
@@ -268,7 +313,8 @@ public class HttpRequestUtils {
return
defaultVal
;
}
public
static
Map
<
String
,
String
>
getParamGroup
(
final
HttpServletRequest
request
,
final
String
groupName
)
{
public
static
Map
<
String
,
String
>
getParamGroup
(
final
HttpServletRequest
request
,
final
String
groupName
)
throws
ServletException
{
final
Enumeration
<
String
>
enumerate
=
request
.
getParameterNames
();
final
String
matchString
=
groupName
+
"["
;
...
...
azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
View file @
3e829a3a
...
...
@@ -204,6 +204,13 @@ public class ExecuteFlowAction implements TriggerAction {
if
(!
this
.
executionOptions
.
isSuccessEmailsOverridden
())
{
this
.
executionOptions
.
setSuccessEmails
(
flow
.
getSuccessEmails
());
}
//添加手机号报警
if
(!
this
.
executionOptions
.
isFailureSmsOverride
())
{
this
.
executionOptions
.
setFailureSms
(
flow
.
getFailureSms
());
}
if
(!
this
.
executionOptions
.
isSuccessSmsOverride
())
{
this
.
executionOptions
.
setSuccessSms
(
flow
.
getSuccessSms
());
}
exflow
.
setExecutionOptions
(
this
.
executionOptions
);
...
...
azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
View file @
3e829a3a
...
...
@@ -178,7 +178,13 @@ public class EmailMessage {
props
.
put
(
"mail.smtp.connectiontimeout"
,
_connectionTimeout
);
props
.
put
(
"mail.smtp.starttls.enable"
,
this
.
_tls
);
props
.
put
(
"mail.smtp.ssl.trust"
,
this
.
_mailHost
);
//添加认证协议
// props.put("mail.smtp.ssl.enable", true);
this
.
logger
.
info
(
"打印配置文件值================================================"
);
props
.
forEach
((
s
,
x
)->{
this
.
logger
.
info
(
s
.
toString
()+
"="
+
x
.
toString
());
});
this
.
logger
.
info
(
"============================================================"
);
final
JavaxMailSender
sender
=
this
.
creator
.
createSender
(
props
);
final
Message
message
=
sender
.
createMessage
();
...
...
@@ -216,9 +222,12 @@ public class EmailMessage {
}
private
void
connectToSMTPServer
(
final
JavaxMailSender
s
)
throws
MessagingException
{
this
.
logger
.
info
(
"host="
+
_mailHost
+
" port="
+
_mailPort
+
" user="
+
_mailUser
+
" pwd="
+
_mailPassword
);
if
(
this
.
_usesAuth
)
{
this
.
logger
.
info
(
"带参"
);
s
.
connect
(
this
.
_mailHost
,
this
.
_mailPort
,
this
.
_mailUser
,
this
.
_mailPassword
);
}
else
{
this
.
logger
.
info
(
"不带参"
);
s
.
connect
();
}
}
...
...
azkaban-common/src/main/java/azkaban/utils/Emailer.java
View file @
3e829a3a
...
...
@@ -94,6 +94,7 @@ public class Emailer extends AbstractMailer implements Alerter {
/**
* Send an email to the specified email list
* 发送电子邮件到指定的电子邮件列表
*/
public
void
sendEmail
(
final
List
<
String
>
emailList
,
final
String
subject
,
final
String
body
)
{
if
(
emailList
!=
null
&&
!
emailList
.
isEmpty
())
{
...
...
@@ -159,6 +160,8 @@ public class Emailer extends AbstractMailer implements Alerter {
* [mail creator] x [failure email address list]
*
* Executions with the same combo are grouped into a single message.
*
* 发送尽可能多的电子邮件,因为有独特的组合:[邮件造物主]x[失败的电子邮件地址列表]执行相同的组合,分组成一个单一的消息。
*/
@Override
public
void
alertOnFailedUpdate
(
final
Executor
executor
,
List
<
ExecutableFlow
>
flows
,
...
...
@@ -190,6 +193,7 @@ public class Emailer extends AbstractMailer implements Alerter {
/**
* Sends a single email about failed updates.
* 发送一个单一的电子邮件关于失败的更新。
*/
private
void
sendFailedUpdateEmail
(
final
Executor
executor
,
final
ExecutorManagerException
exception
,
final
MailCreator
mailCreator
,
...
...
azkaban-common/src/main/java/azkaban/utils/SmsAlerter.java
View file @
3e829a3a
...
...
@@ -31,9 +31,6 @@ public class SmsAlerter implements Alerter {
sendSms
(
successSms
,
getSuccessMSG
(
exflow
));
}
/**
* Send a message to the specified phone list
*/
public
void
sendSms
(
final
List
<
String
>
phoneList
,
final
String
msg
)
{
if
(
phoneList
!=
null
&&
!
phoneList
.
isEmpty
())
{
for
(
String
phone
:
phoneList
)
{
...
...
@@ -43,6 +40,7 @@ public class SmsAlerter implements Alerter {
}
else
{
logger
.
warn
(
"没有手机号码"
);
}
}
/***
...
...
@@ -83,7 +81,7 @@ public class SmsAlerter implements Alerter {
String
message
=
"\n\t项目%s执行失败"
+
"\n\t任务ID:%s\n\t项目名:%s\n\tflow名:%s"
+
"\n\t任务开始时间:%s\n\t任务结束时间:%s\n\t任务总耗时:%s"
+
"\n\t失败的job:%s"
+
"\n\t失败的job:%s"
+
"\n\t任务状态:%s"
+
"\n\t错误日志:%s"
;
//任务id
...
...
@@ -130,7 +128,6 @@ public class SmsAlerter implements Alerter {
}
return
failedJobs
;
}
@Override
public
void
alertOnError
(
ExecutableFlow
exflow
,
String
...
extraReasons
)
{
this
.
logger
.
info
(
"=====================执行失败,发送手机短信==========================="
);
...
...
@@ -164,5 +161,4 @@ public class SmsAlerter implements Alerter {
}
}
}
}
azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
View file @
3e829a3a
/*
* Copyright 2018 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the “License”); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
* Copyright 2018 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the “License”); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package
azkaban
.
executor
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
...
...
azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
View file @
3e829a3a
...
...
@@ -279,10 +279,24 @@ public class FlowRunner extends EventHandler implements Runnable {
Event
.
create
(
this
,
EventType
.
FLOW_FINISHED
,
new
EventData
(
this
.
flow
)));
// In polling model, executor will be responsible for sending alerting emails when a flow
// finishes.
//在轮询模型中,executor将负责在流结束时发送警告电子邮件。
// Todo jamiesjc: switch to event driven model and alert on FLOW_FINISHED event.
if
(
this
.
azkabanProps
.
getBoolean
(
ConfigurationKeys
.
AZKABAN_POLL_MODEL
,
false
))
{
String
[]
finalizeFlowReasons
=
ExecutionControllerUtils
.
getFinalizeFlowReasons
(
"Flow finished"
,
null
);
try
{
List
<
String
>
errorLogs
=
ExecutionControllerUtils
.
getErrorLogs
(
flow
,
executorLoader
);
for
(
String
finalizeFlowReason
:
finalizeFlowReasons
)
{
String
erroLine
=
ExecutionControllerUtils
.
getErroLine
(
finalizeFlowReason
);
errorLogs
.
add
(
erroLine
);
}
finalizeFlowReasons
=
errorLogs
.
toArray
(
new
String
[
errorLogs
.
size
()]);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
ExecutionControllerUtils
.
alertUserOnFlowFinished
(
this
.
flow
,
this
.
alerterHolder
,
ExecutionControllerUtils
.
getFinalizeFlowReasons
(
"Flow finished"
,
null
));
finalizeFlowReasons
,
this
.
executorLoader
);
// ExecutionControllerUtils.alertUserOnFlowFinished(this.flow, this.alerterHolder,
// ExecutionControllerUtils.getFinalizeFlowReasons("Flow finished", null),this.executorLoader);
}
}
}
...
...
azkaban-exec-server/src/main/resources/conf/azkaban.properties
View file @
3e829a3a
...
...
@@ -42,7 +42,7 @@ azkaban.jobtype.plugin.dir=/opt/azkaban3/azkaban-exec-server-0.1.0-SNAPSHOT/plug
database.type
=
mysql
mysql.port
=
3306
mysql.host
=
localhost
mysql.database
=
azkaban
mysql.database
=
azkaban
_db
mysql.user
=
azkaban
mysql.password
=
azkaban
mysql.numconnections
=
100
...
...
azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
View file @
3e829a3a
...
...
@@ -54,7 +54,6 @@ import azkaban.webapp.plugin.ViewerPlugin;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.LinkedHashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Optional
;
...
...
@@ -285,8 +284,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private
void
handleExecutionJobDetailsPage
(
final
HttpServletRequest
req
,
final
HttpServletResponse
resp
,
final
Session
session
)
throws
ServletException
,
IOException
{
final
Page
page
=
newPage
(
req
,
resp
,
session
,
"azkaban/webapp/servlet/velocity/jobdetailspage.vm"
);
final
Page
page
=
newPage
(
req
,
resp
,
session
,
"azkaban/webapp/servlet/velocity/jobdetailspage.vm"
);
final
User
user
=
session
.
getUser
();
final
int
execId
=
getIntParam
(
req
,
"execid"
);
final
String
jobId
=
getParam
(
req
,
"job"
);
...
...
@@ -308,7 +308,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
node
=
flow
.
getExecutableNodePath
(
jobId
);
if
(
node
==
null
)
{
page
.
add
(
"errorMsg"
,
"Job "
+
jobId
+
" doesn't exist in "
+
flow
.
getExecutionId
());
page
.
add
(
"errorMsg"
,
"Job "
+
jobId
+
" doesn't exist in "
+
flow
.
getExecutionId
());
return
;
}
...
...
@@ -657,6 +658,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret
.
put
(
"successEmails"
,
flow
.
getSuccessEmails
());
ret
.
put
(
"failureEmails"
,
flow
.
getFailureEmails
());
ret
.
put
(
"successSms"
,
flow
.
getSuccessSms
());
ret
.
put
(
"failureSms"
,
flow
.
getFailureSms
());
Schedule
sflow
=
null
;
try
{
...
...
@@ -953,6 +957,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if
(!
options
.
isSuccessEmailsOverridden
())
{
options
.
setSuccessEmails
(
flow
.
getSuccessEmails
());
}
if
(!
options
.
isFailureSmsOverride
())
{
if
(
flow
.
getFailureSms
()!=
null
&&
flow
.
getFailureSms
().
size
()>
0
)
options
.
setFailureSms
(
flow
.
getFailureSms
());
}
if
(!
options
.
isSuccessSmsOverride
())
{
if
(
flow
.
getSuccessSms
()!=
null
&&
flow
.
getSuccessSms
().
size
()>
0
)
options
.
setSuccessSms
(
flow
.
getSuccessSms
());
}
options
.
setMailCreator
(
flow
.
getMailCreator
());
try
{
...
...
azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
View file @
3e829a3a
...
...
@@ -26,6 +26,7 @@ import azkaban.user.UserManager;
import
azkaban.user.UserManagerException
;
import
azkaban.utils.StringUtils
;
import
azkaban.webapp.WebMetrics
;
import
java.io.BufferedInputStream
;
import
java.io.File
;
import
java.io.FileInputStream
;
...
...
@@ -41,6 +42,7 @@ import javax.servlet.ServletException;
import
javax.servlet.http.Cookie
;
import
javax.servlet.http.HttpServletRequest
;
import
javax.servlet.http.HttpServletResponse
;
import
org.apache.commons.fileupload.servlet.ServletFileUpload
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.log4j.Logger
;
...
...
azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
View file @
3e829a3a
...
...
@@ -62,7 +62,7 @@
<div class="col-xs-8">
<div id="execution-graph-options-panel">
## SVG graph panel.
## SVG graph panel.
<div id="svg-div-custom" class="side-panel">
<svg id="flow-executing-graph" xmlns="http://www.w3.org/2000/svg" version="1.1"
...
...
@@ -70,7 +70,7 @@
</svg>
</div>
## Notification panel.
## Notification panel.
<div id="notification-panel" class="side-panel">
<h4>Notify on failure</h4>
...
...
@@ -156,7 +156,7 @@
<textarea class="form-control" rows="3" id="success-sms"></textarea>
</div>
## Failure options panel.
## Failure options panel.
<div id="failure-options" class="side-panel">
<h4>Failure Options</h4>
...
...
@@ -179,7 +179,7 @@
</select>
</div>
## Concurrent execution options panel.
## Concurrent execution options panel.
<div id="concurrent-panel" class="side-panel">
<h4>Concurrent Execution Options</h4>
...
...
@@ -199,7 +199,7 @@
Run Concurrently
</label>
<span
class="help-block">Run the flow anyway. Previous execution is unaffected.</span>
class="help-block">Run the flow anyway. Previous execution is unaffected.</span>
</div>
<div class="radio">
...
...
@@ -222,7 +222,7 @@
</div>
</div>
## Flow parameters panel
## Flow parameters panel
<div id="flow-parameters-panel" class="side-panel">
<h4>Flow Property Override</h4>
...
...
@@ -258,13 +258,13 @@
</div>
#end
#*
#if ($triggerPlugins.size() > 0)
#foreach ($triggerPlugin in $triggerPlugins)
<button type="button" class="btn btn-default" id=set-$triggerPlugin.pluginName>$triggerPlugin.pluginName</button>
#end
#*
#if ($triggerPlugins.size() > 0)
#foreach ($triggerPlugin in $triggerPlugins)
<button type="button" class="btn btn-default" id=set-$triggerPlugin.pluginName>$triggerPlugin.pluginName</button>
#end
*#
#end
*#
<button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
<button type="button" class="btn btn-primary" id="execute-btn">Execute</button>
</div><!-- /modal-footer -->
...
...
@@ -286,4 +286,4 @@
#end
*#
<div id="contextMenu"></div>
\ No newline at end of file
<div id="contextMenu"></div>
azkaban-web-server/src/main/resources/conf/azkaban.properties
View file @
3e829a3a
...
...
@@ -39,7 +39,7 @@ lockdown.create.projects=false
cache.directory
=
cache
# Azkaban plugin settings
azkaban.jobtype.plugin.dir
=
/opt/azkaban3/azkaban-web-server-0.1.0-SNAPSHOT/plugins/jobtypes
#
azkaban.jobtype.plugin.dir=/opt/azkaban3/azkaban-web-server-0.1.0-SNAPSHOT/plugins/jobtypes
# JMX stats
jetty.connector.stats
=
true
...
...
@@ -55,7 +55,7 @@ executor.connector.stats=true
database.type
=
mysql
mysql.port
=
3306
mysql.host
=
localhost
mysql.database
=
azkaban
mysql.database
=
azkaban
_db
mysql.user
=
azkaban
mysql.password
=
azkaban
mysql.numconnections
=
100
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment