项目:hadoop
文件:TestHsWebServicesTasks.java
public void verifyTaskGeneric(Task task,String id,String state,String type,String successfulAttempt,long startTime,long finishTime,long elapsedtime,float progress) {
TaskId taskid = task.getID();
String tid = MRApps.toString(taskid);
TaskReport report = task.getReport();
WebServicesTestUtils.checkStringMatch("id",tid,id);
WebServicesTestUtils.checkStringMatch("type",task.getType().toString(),type);
WebServicesTestUtils.checkStringMatch("state",report.getTaskState()
.toString(),state);
// not easily checked without duplicating logic,just make sure its here
assertNotNull("successfulAttempt null",successfulAttempt);
assertEquals("startTime wrong",report.getStartTime(),startTime);
assertEquals("finishTime wrong",report.getFinishTime(),finishTime);
assertEquals("elapsedtime wrong",finishTime - startTime,elapsedtime);
assertEquals("progress wrong",report.getProgress() * 100,progress,1e-3f);
}
private AtomicInteger containerNeed(TaskId taskID) {
JobId jobID = taskID.getJobId();
TaskType taskType = taskID.getTaskType();
ConcurrentMap<JobId,AtomicInteger> relevantMap
= taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
AtomicInteger result = relevantMap.get(jobID);
if (result == null) {
relevantMap.putIfAbsent(jobID,new AtomicInteger(0));
result = relevantMap.get(jobID);
}
return result;
}
项目:hadoop
文件:TestFail.java
@Test
//All Task attempts are timed out,leading to Job failure
public void testTimedOutTask() throws Exception {
MRApp app = new TimeOutTaskMRApp(1,0);
Configuration conf = new Configuration();
int maxAttempts = 2;
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS,maxAttempts);
// disable uberization (requires entire job to be reattempted,so max for
// subtask attempts is overridden to 1)
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE,false);
Job job = app.submit(conf);
app.waitForState(job,JobState.Failed);
Map<TaskId,Task> tasks = job.getTasks();
Assert.assertEquals("Num tasks is not correct",1,tasks.size());
Task task = tasks.values().iterator().next();
Assert.assertEquals("Task state not correct",TaskState.Failed,task.getReport().getTaskState());
Map<TaskAttemptId,TaskAttempt> attempts =
tasks.values().iterator().next().getAttempts();
Assert.assertEquals("Num attempts is not correct",maxAttempts,attempts.size());
for (TaskAttempt attempt : attempts.values()) {
Assert.assertEquals("Attempt state not correct",TaskAttemptState.Failed,attempt.getReport().getTaskAttemptState());
}
}
项目:hadoop
文件:MRClientService.java
@SuppressWarnings("unchecked")
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
UserGroupinformation callerUGI = UserGroupinformation.getCurrentUser();
String message = "Kill task " + taskId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetTask(taskId,JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskEvent(taskId,TaskEventType.T_KILL));
KillTaskResponse response =
recordFactory.newRecordInstance(KillTaskResponse.class);
return response;
}
项目:hadoop
文件:HsAttemptsPage.java
@Override
protected Collection<TaskAttempt> getTaskAttempts() {
List<TaskAttempt> fewTaskAttemps = new ArrayList<TaskAttempt>();
String taskTypestr = $(TASK_TYPE);
TaskType taskType = MRApps.taskType(taskTypestr);
String attemptStateStr = $(ATTEMPT_STATE);
TaskAttemptStateUI neededState = MRApps
.taskAttemptState(attemptStateStr);
Job j = app.getJob();
Map<TaskId,Task> tasks = j.getTasks(taskType);
for (Task task : tasks.values()) {
Map<TaskAttemptId,TaskAttempt> attempts = task.getAttempts();
for (TaskAttempt attempt : attempts.values()) {
if (neededState.correspondsTo(attempt.getState())) {
fewTaskAttemps.add(attempt);
}
}
}
return fewTaskAttemps;
}
项目:hadoop
文件:TestAMWebServicesTasks.java
public void verifyAMTask(JSONArray arr,Job job,String type)
throws JSONException {
for (Task task : job.getTasks().values()) {
TaskId id = task.getID();
String tid = MRApps.toString(id);
Boolean found = false;
if (type != null && task.getType() == MRApps.taskType(type)) {
for (int i = 0; i < arr.length(); i++) {
JSONObject info = arr.getJSONObject(i);
if (tid.matches(info.getString("id"))) {
found = true;
verifyAMSingleTask(info,task);
}
}
assertTrue("task with id: " + tid + " not in web service output",found);
}
}
}
项目:hadoop
文件:CompletedTaskAttempt.java
CompletedTaskAttempt(TaskId taskId,TaskAttemptInfo attemptInfo) {
this.attemptInfo = attemptInfo;
this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
if (attemptInfo.getTaskStatus() != null) {
this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus());
} else {
this.state = TaskAttemptState.KILLED;
localDiagMessage = "Attmpt state missing from History : marked as KILLED";
diagnostics.add(localDiagMessage);
}
if (attemptInfo.getError() != null) {
diagnostics.add(attemptInfo.getError());
}
}
/**
* Absorbs one TaskAttemptStatus
*
* @param reportedStatus the status report that we got from a task attempt
* that we want to fold into the speculation data for this job
* @param timestamp the time this status corresponds to. This matters
* because statuses contain progress.
*/
protected void statusUpdate(TaskAttemptStatus reportedStatus,long timestamp) {
String stateString = reportedStatus.taskState.toString();
TaskAttemptId attemptID = reportedStatus.id;
TaskId taskID = attemptID.getTaskId();
Job job = context.getJob(taskID.getJobId());
if (job == null) {
return;
}
Task task = job.getTask(taskID);
if (task == null) {
return;
}
estimator.updateAttempt(reportedStatus,timestamp);
if (stateString.equals(TaskAttemptState.RUNNING.name())) {
runningTasks.putIfAbsent(taskID,Boolean.TRUE);
} else {
runningTasks.remove(taskID,Boolean.TRUE);
if (!stateString.equals(TaskAttemptState.STARTING.name())) {
runningTaskAttemptStatistics.remove(attemptID);
}
}
}
项目:hadoop
文件:HsTaskPage.java
/**
* @return The end of the JS map that is the jquery datatable config for the
* attempts table.
*/
private String attemptsTableInit() {
TaskType type = null;
String symbol = $(TASK_TYPE);
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
} else {
TaskId taskID = MRApps.toTaskID($(TASK_ID));
type = taskID.getTaskType();
}
StringBuilder b = tableInit()
.append(",'aaData': attemptsTableData")
.append(",bDeferRender: true")
.append(",bProcessing: true")
.append("\n,aoColumnDefs:[\n")
//logs column should not filterable (it includes container ID which may pollute searches)
.append("\n{'aTargets': [ 4 ]")
.append(",'bSearchable': false }")
.append("\n,{'sType':'numeric','aTargets': [ 0 ]")
.append(",'mRender': parseHadoopAttemptID }")
.append("\n,'aTargets': [ 5,6")
//Column numbers are different for maps and reduces
.append(type == TaskType.REDUCE ? ",7,8" : "")
.append(" ],'mRender': renderHadoopDate }")
.append("\n,'aTargets': [")
.append(type == TaskType.REDUCE ? "9,10,11,12" : "7")
.append(" ],'mRender': renderHadoopelapsedtime }]")
// Sort by id upon page load
.append("\n,aaSorting: [[0,'asc']]")
.append("}");
return b.toString();
}
boolean canSpeculate(AppContext context,TaskId taskID) {
// This class rejects speculating any task that already has speculations,// or isn't running.
// Subclasses should call TaskSpeculationPredicate.canSpeculate(...),but
// can be even more restrictive.
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
return task.getAttempts().size() == 1;
}
项目:hadoop
文件:TaskReportPBImpl.java
@Override
public void setTaskId(TaskId taskId) {
maybeInitBuilder();
if (taskId == null)
builder.clearTaskId();
this.taskId = taskId;
}
项目:hadoop
文件:GetTaskReportRequestPBImpl.java
@Override
public TaskId getTaskId() {
GetTaskReportRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.taskId != null) {
return this.taskId;
}
if (!p.hasTaskId()) {
return null;
}
this.taskId = convertFromProtoFormat(p.getTaskId());
return this.taskId;
}
项目:hadoop
文件:StartEndTimesBase.java
@Override
public long thresholdRuntime(TaskId taskID) {
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
TaskType type = taskID.getTaskType();
DataStatistics statistics
= dataStatisticsForTask(taskID);
int completedTasksOfType
= type == TaskType.MAP
? job.getCompletedMaps() : job.getCompletedReduces();
int totalTasksOfType
= type == TaskType.MAP
? job.getTotalMaps() : job.getTotalReduces();
if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPEculaTE
|| (((float)completedTasksOfType) / totalTasksOfType)
< MINIMUM_COMPLETE_PROPORTION_TO_SPEculaTE ) {
return Long.MAX_VALUE;
}
long result = statistics == null
? Long.MAX_VALUE
: (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
return result;
}
项目:hadoop
文件:StartEndTimesBase.java
@Override
public long estimatednewAttemptRuntime(TaskId id) {
DataStatistics statistics = dataStatisticsForTask(id);
if (statistics == null) {
return -1L;
}
return (long)statistics.mean();
}
项目:hadoop
文件:AppController.java
/**
* Ensure that a TASK_ID was passed into the page.
*/
public void requireTask() {
if ($(TASK_ID).isEmpty()) {
badRequest("missing task ID");
throw new RuntimeException("missing task ID");
}
TaskId taskID = MRApps.toTaskID($(TASK_ID));
Job job = app.context.getJob(taskID.getJobId());
app.setJob(job);
if (app.getJob() == null) {
notFound(MRApps.toString(taskID.getJobId()));
throw new RuntimeException("Not Found: " + $(JOB_ID));
} else {
app.setTask(app.getJob().getTask(taskID));
if (app.getTask() == null) {
notFound($(TASK_ID));
throw new RuntimeException("Not Found: " + $(TASK_ID));
}
}
if (!checkAccess(job)) {
accessDenied("User " + request().getRemoteUser() + " does not have " +
" permission to view job " + $(JOB_ID));
throw new RuntimeException("Access denied: User " +
request().getRemoteUser() + " does not have permission to view job " +
$(JOB_ID));
}
}
项目:hadoop
文件:TaskReportPBImpl.java
@Override
public TaskId getTaskId() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
if (this.taskId != null) {
return this.taskId;
}
if (!p.hasTaskId()) {
return null;
}
this.taskId = convertFromProtoFormat(p.getTaskId());
return this.taskId;
}
项目:hadoop
文件:TaskRecoverEvent.java
public TaskRecoverEvent(TaskId taskID,TaskInfo taskInfo,OutputCommitter committer,boolean recoverTaskOutput) {
super(taskID,TaskEventType.T_RECOVER);
this.taskInfo = taskInfo;
this.committer = committer;
this.recoverTaskOutput = recoverTaskOutput;
}
项目:hadoop
文件:TaskAttemptImpl.java
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) {
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
updateMillisCounters(jce,taskAttempt);
return jce;
}
项目:hadoop
文件:JobImpl.java
@Override
public Task getTask(TaskId taskID) {
readLock.lock();
try {
return tasks.get(taskID);
} finally {
readLock.unlock();
}
}
项目:hadoop
文件:TestFail.java
@Test
public void testTaskFailWithUnusedContainer() throws Exception {
MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
Configuration conf = new Configuration();
int maxAttempts = 1;
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS,JobState.RUNNING);
Map<TaskId,tasks.size());
Task task = tasks.values().iterator().next();
app.waitForState(task,TaskState.SCHEDULED);
Map<TaskAttemptId,TaskAttempt> attempts = tasks.values().iterator()
.next().getAttempts();
Assert.assertEquals("Num attempts is not correct",attempts
.size());
TaskAttempt attempt = attempts.values().iterator().next();
app.waitForInternalState((TaskAttemptImpl) attempt,TaskAttemptStateInternal.ASSIGNED);
app.getdispatcher().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(),TaskAttemptEventType.TA_CONTAINER_COMPLETED));
app.waitForState(job,JobState.Failed);
}
项目:hadoop
文件:JobImpl.java
protected void scheduleTasks(Set<TaskId> taskIDs,boolean recoverTaskOutput) {
for (TaskId taskID : taskIDs) {
TaskInfo taskInfo = completedTasksFromPrevIoUsRun.remove(taskID);
if (taskInfo != null) {
eventHandler.handle(new TaskRecoverEvent(taskID,taskInfo,committer,recoverTaskOutput));
} else {
eventHandler.handle(new TaskEvent(taskID,TaskEventType.T_SCHEDULE));
}
}
}
项目:hadoop
文件:JobImpl.java
@Override
public void transition(JobImpl job,JobEvent event) {
//get number of shuffling reduces
int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
Task task = job.tasks.get(taskId);
if (TaskState.RUNNING.equals(task.getState())) {
for(TaskAttempt attempt : task.getAttempts().values()) {
if(attempt.getPhase() == Phase.SHUFFLE) {
shufflingReduceTasks++;
break;
}
}
}
}
JobTaskAttemptFetchFailureEvent fetchfailureEvent =
(JobTaskAttemptFetchFailureEvent) event;
for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId :
fetchfailureEvent.getMaps()) {
Integer fetchFailures = job.fetchFailuresMapping.get(mapId);
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
job.fetchFailuresMapping.put(mapId,fetchFailures);
float failureRate = shufflingReduceTasks == 0 ? 1.0f :
(float) fetchFailures / shufflingReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
if (fetchFailures >= job.getMaxFetchFailuresNotifications()
&& failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
job.eventHandler.handle(new TaskAttemptEvent(mapId,TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
job.fetchFailuresMapping.remove(mapId);
}
}
}
项目:hadoop
文件:TestRMContainerAllocator.java
private ContainerFailedEvent createFailEvent(JobId jobId,int taskAttemptId,String host,boolean reduce) {
TaskId taskId;
if (reduce) {
taskId = MRBuilderUtils.newTaskId(jobId,TaskType.REDUCE);
} else {
taskId = MRBuilderUtils.newTaskId(jobId,TaskType.MAP);
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,taskAttemptId);
return new ContainerFailedEvent(attemptId,host);
}
项目:hadoop
文件:HistoryClientService.java
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
Job job = verifyAndGetJob(taskId.getJobId(),true);
GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
response.setTaskReport(job.getTask(taskId).getReport());
return response;
}
项目:hadoop
文件:TestRuntimeEstimators.java
@Override
public void handle(TaskEvent event) {
TaskId taskID = event.getTaskID();
Task task = myJob.getTask(taskID);
Assert.assertEquals
("Wrong type event",TaskEventType.T_ADD_SPEC_ATTEMPT,event.getType());
System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
addAttempt(task);
}
项目:hadoop
文件:GetTaskReportRequestPBImpl.java
@Override
public void setTaskId(TaskId taskId) {
maybeInitBuilder();
if (taskId == null)
builder.clearTaskId();
this.taskId = taskId;
}
项目:hadoop
文件:TestBlocks.java
/**
* test HsTasksBlock's rendering.
*/
@Test
public void testHsTasksBlock() {
Task task = getTask(0);
Map<TaskId,Task> tasks = new HashMap<TaskId,Task>();
tasks.put(task.getID(),task);
AppContext ctx = mock(AppContext.class);
AppForTest app = new AppForTest(ctx);
Job job = mock(Job.class);
when(job.getTasks()).thenReturn(tasks);
app.setJob(job);
HsTasksBlockForTest block = new HsTasksBlockForTest(app);
block.addParameter(AMParams.TASK_TYPE,"r");
PrintWriter pWriter = new PrintWriter(data);
Block html = new BlockForTest(new HtmlBlockFortest(),pWriter,false);
block.render(html);
pWriter.flush();
// should be printed information about task
assertTrue(data.toString().contains("task_0_0001_r_000000"));
assertTrue(data.toString().contains("SUCCEEDED"));
assertTrue(data.toString().contains("100001"));
assertTrue(data.toString().contains("100011"));
assertTrue(data.toString().contains(""));
}
项目:hadoop
文件:TaskAttemptIdPBImpl.java
@Override
public synchronized void setTaskId(TaskId taskId) {
maybeInitBuilder();
if (taskId == null)
builder.clearTaskId();
this.taskId = taskId;
}
项目:hadoop
文件:TestRMContainerAllocator.java
private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,TaskType.MAP);
}
TaskAttemptId attemptId =
MRBuilderUtils.newTaskAttemptId(taskId,taskAttemptId);
return new ContainerAllocatorEvent(attemptId,ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
}
项目:hadoop
文件:TestMRApps.java
@Test (timeout = 120000)
public void testTaskAttemptIDtoString() {
TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
taid.getTaskId().setTaskType(TaskType.MAP);
taid.getTaskId().setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
taid.getTaskId().getJobId().setAppId(ApplicationId.newInstance(0,0));
assertEquals("attempt_0_0000_m_000000_0",MRApps.toString(taid));
}
项目:hadoop
文件:TestKill.java
@Test
public void testKillJob() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
MRApp app = new BlockingMRApp(1,latch);
//this will start the job but job won't complete as task is
//blocked
Job job = app.submit(new Configuration());
//wait and vailidate for Job to become RUNNING
app.waitForState(job,JobState.RUNNING);
//send the kill signal to Job
app.getContext().getEventHandler().handle(
new JobEvent(job.getID(),JobEventType.JOB_KILL));
//unblock Task
latch.countDown();
//wait and validate for Job to be KILLED
app.waitForState(job,JobState.KILLED);
Map<TaskId,Task> tasks = job.getTasks();
Assert.assertEquals("No of tasks is not correct",TaskState.KILLED,TaskAttempt> attempts =
tasks.values().iterator().next().getAttempts();
Assert.assertEquals("No of attempts is not correct",attempts.size());
Iterator<TaskAttempt> it = attempts.values().iterator();
Assert.assertEquals("Attempt state not correct",TaskAttemptState.KILLED,it.next().getReport().getTaskAttemptState());
}
项目:hadoop
文件:MockJobs.java
public static Map<TaskAttemptId,TaskAttempt> newTaskAttempts(TaskId tid,int m) {
Map<TaskAttemptId,TaskAttempt> map = Maps.newHashMap();
for (int i = 0; i < m; ++i) {
TaskAttempt ta = newTaskAttempt(tid,i);
map.put(ta.getID(),ta);
}
return map;
}
项目:hadoop
文件:TestMRApps.java
@Test (timeout = 120000)
public void testToTaskID() {
TaskId tid = MRApps.toTaskID("task_1_2_r_3");
assertEquals(1,tid.getJobId().getAppId().getClusterTimestamp());
assertEquals(2,tid.getJobId().getAppId().getId());
assertEquals(2,tid.getJobId().getId());
assertEquals(TaskType.REDUCE,tid.getTaskType());
assertEquals(3,tid.getId());
tid = MRApps.toTaskID("task_1_2_m_3");
assertEquals(TaskType.MAP,tid.getTaskType());
}
项目:hadoop
文件:TestContainerLauncherImpl.java
public static TaskAttemptId makeTaskAttemptId(long ts,int appId,int taskId,TaskType taskType,int id) {
ApplicationId aID = ApplicationId.newInstance(ts,appId);
JobId jID = MRBuilderUtils.newJobId(aID,id);
TaskId tID = MRBuilderUtils.newTaskId(jID,taskId,taskType);
return MRBuilderUtils.newTaskAttemptId(tID,id);
}
项目:hadoop
文件:TaskAttemptIdPBImpl.java
@Override
public synchronized TaskId getTaskId() {
TaskAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
if (this.taskId != null) {
return this.taskId;
}
if (!p.hasTaskId()) {
return null;
}
taskId = convertFromProtoFormat(p.getTaskId());
return taskId;
}
项目:hadoop
文件:TestAMWebApp.java
public static Map<String,String> getTaskParams(AppContext appContext) {
JobId jobId = appContext.getAllJobs().entrySet().iterator().next().getKey();
Entry<TaskId,Task> e = appContext.getJob(jobId).getTasks().entrySet().iterator().next();
e.getValue().getType();
Map<String,String> params = new HashMap<String,String>();
params.put(AMParams.JOB_ID,MRApps.toString(jobId));
params.put(AMParams.TASK_ID,MRApps.toString(e.getKey()));
params.put(AMParams.TASK_TYPE,MRApps.taskSymbol(e.getValue().getType()));
return params;
}
项目:hadoop
文件:TestAMWebApp.java
@Test public void testSingleCounterView() {
AppContext appContext = new MockAppContext(0,1);
Job job = appContext.getAllJobs().values().iterator().next();
// add a Failed task to the job without any counters
Task FailedTask = MockJobs.newTask(job.getID(),2,true);
Map<TaskId,Task> tasks = job.getTasks();
tasks.put(FailedTask.getID(),FailedTask);
Map<String,String> params = getJobParams(appContext);
params.put(AMParams.COUNTER_GROUP,"org.apache.hadoop.mapreduce.FileSystemCounter");
params.put(AMParams.COUNTER_NAME,"HDFS_WRITE_OPS");
WebAppTests.testPage(SingleCounterPage.class,AppContext.class,appContext,params);
}
项目:hadoop
文件:MRBuilderUtils.java
public static TaskAttemptId newTaskAttemptId(TaskId taskId,int attemptId) {
TaskAttemptId taskAttemptId =
Records.newRecord(TaskAttemptId.class);
taskAttemptId.setTaskId(taskId);
taskAttemptId.setId(attemptId);
return taskAttemptId;
}
项目:hadoop
文件:TestAppController.java
@Before
public void setUp() throws IOException {
AppContext context = mock(AppContext.class);
when(context.getApplicationID()).thenReturn(
ApplicationId.newInstance(0,0));
when(context.getApplicationName()).thenReturn("AppName");
when(context.getUser()).thenReturn("User");
when(context.getStartTime()).thenReturn(System.currentTimeMillis());
job = mock(Job.class);
Task task = mock(Task.class);
when(job.getTask(any(TaskId.class))).thenReturn(task);
JobId jobID = MRApps.toJobID("job_01_01");
when(context.getJob(jobID)).thenReturn(job);
when(job.checkAccess(any(UserGroupinformation.class),any(JobACL.class)))
.thenReturn(true);
App app = new App(context);
Configuration configuration = new Configuration();
ctx = mock(RequestContext.class);
appController = new AppControllerForTest(app,configuration,ctx);
appController.getproperty().put(AMParams.JOB_ID,"job_01_01");
appController.getproperty().put(AMParams.TASK_ID,"task_01_01_m01_01");
}
项目:hadoop
文件:TestTaskAttempt.java
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler,TaskSplitMetaInfo taskSplitMetaInfo,Clock clock) {
ApplicationId appId = ApplicationId.newInstance(1,1);
JobId jobId = MRBuilderUtils.newJobId(appId,1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId,TaskType.MAP);
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Path jobFile = mock(Path.class);
JobConf jobConf = new JobConf();
TaskAttemptImpl taimpl =
new MapTaskAttemptImpl(taskId,eventHandler,jobFile,taskSplitMetaInfo,jobConf,taListener,null,clock,null);
return taimpl;
}