来自容器程序的纱线应用程序失败

问题描述

我写了一个纱线客户端,它从 YARN 请求一些容器

  • 一个应用程序主机容器,我在其上运行应用程序主机
  • 应用程序主从另一个容器请求运行我的应用程序

问题是即使我的应用程序在第二个容器上失败后,应用程序仍被标记为成功。

如果我的应用程序在容器上失败,任何人都可以帮我找出一种方法来将我的应用程序标记为失败

客户端.java

FirebaseFirestore.instance
      .collection('messages')
      .where('encSenderUId',whereIn: [widget.encUId,_loggedInUserId])
      .where('encReceiverUId',isEqualTo: _loggedInUserId)
      .where('encReceiverUId',isEqualTo: widget.encUId)
      .orderBy('sentOn',descending: false)
      .startAt([_startAtTimestamp]).snapshots();

ApplicationMaster.java

    import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;


public class Client {

    Configuration conf = new YarnConfiguration();

    public void run(String[] args) throws Exception {
//        final String command = args[0];
        final Path jarPath = new Path(args[0]);

        // Create yarnClient
        YarnConfiguration conf = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Create application via yarnClient
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext =
                app.getApplicationSubmissionContext();
        ApplicationId appId = appContext.getApplicationId();

        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer =
                Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(
                Collections.singletonList(
                        "$JAVA_HOME/bin/java" +
                                " -Xmx256M" +
                                " com.paytm.lending.datamart.yarn.v2.ApplicationMaster" +
                                " " + appId.toString() +
                                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
                )
        );

        // Setup jar for ApplicationMaster
        LocalResource appMasterJar = Records.newRecord(LocalResource.class);
        setupAppMasterJar(jarPath,appMasterJar);
        amContainer.setLocalResources(
                Collections.singletonMap("simpleapp.jar",appMasterJar));

        // Setup CLAsspATH for ApplicationMaster
        Map<String,String> appMasterEnv = new HashMap<String,String>();
        setupAppMasterEnv(appMasterEnv,jarPath.toUri().getRawPath());
        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(256);
        capability.setVirtualCores(1);

        // Finally,set-up ApplicationSubmissionContext for the application
        appContext.setApplicationName("simple-yarn-app"); // application name
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        appContext.setQueue("default"); // queue

        // Submit application
        System.out.println("Submitting application " + appId);
        yarnClient.submitApplication(appContext);

        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
        YarnApplicationState appState = appReport.getYarnApplicationState();
        while (appState != YarnApplicationState.FINISHED &&
                appState != YarnApplicationState.KILLED &&
                appState != YarnApplicationState.Failed) {
            Thread.sleep(100);
            appReport = yarnClient.getApplicationReport(appId);
            appState = appReport.getYarnApplicationState();
        }

        System.out.println(
                "Application " + appId + " finished with" +
                        " state " + appState +
                        " at " + appReport.getFinishTime());

    }

    private void setupAppMasterJar(Path jarPath,LocalResource appMasterJar) throws IOException {
        FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
        appMasterJar.setResource(URL.fromPath(jarPath));
        appMasterJar.setSize(jarStat.getLen());
        appMasterJar.setTimestamp(jarStat.getModificationTime());
        appMasterJar.setType(LocalResourceType.FILE);
        appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
    }

    private void setupAppMasterEnv(Map<String,String> appMasterEnv,String jarPath) {
        for (String c : conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLAsspATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLAsspATH)) {
            Apps.addToEnvironment(appMasterEnv,Environment.CLAsspATH.name(),c.trim(),File.pathSeparator);
        }
        Apps.addToEnvironment(appMasterEnv,Environment.PWD.$() + File.separator + "*",File.pathSeparator);
        Apps.addToEnvironment(appMasterEnv,"JAR_PATH",jarPath,File.pathSeparator);
    }

    public static void main(String[] args) throws Exception {
        Client c = new Client();
        c.run(args);
    }
}

DummyApplication.java

import java.io.File;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.Records;

public class ApplicationMaster {

    public static void main(String[] args) throws Exception {

        final String appId = args[0];

        // Initialize clients to ResourceManager and NodeManagers
        Configuration conf = new YarnConfiguration();

        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(conf);
        rmClient.start();

        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();

        // Register with ResourceManager
        System.out.println("registerapplicationMaster 0");
        rmClient.registerapplicationMaster("","");
        System.out.println("registerapplicationMaster 1");

        // Priority for worker containers - priorities are intra-application
        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(0);

        // Resource requirements for worker containers
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(128);
        capability.setVirtualCores(1);

        // Make container requests to ResourceManager
        for (int i = 0; i < 2; ++i) {
            ContainerRequest containerAsk = new ContainerRequest(capability,null,priority);
            System.out.println("Making resource request " + i);
            rmClient.addContainerRequest(containerAsk);
        }

        // Obtain allocated containers,launch and check for responses
        int responseId = 0;
        int completedContainers = 0;
        while (completedContainers < 2) {
            AllocateResponse response = rmClient.allocate(responseId++);
            System.out.println("response.getAllocatedContainers().size() "+response.getAllocatedContainers().size());
            for (Container container : response.getAllocatedContainers()) {
                List<String> commands = new ArrayList<String>(Arrays.asList("$JAVA_HOME/bin/java " +
                                " -Xmx256m " +
                                " com.paytm.lending.datamart.yarn.DummyApplication " +1 +" "+
                                " 1> " + "/var/log/hadoop-yarn/containers/"+appId+"/"+container.getId()+"/"+ApplicationConstants.STDOUT +
                                " 2> " + "/var/log/hadoop-yarn/containers/"+appId+"/"+container.getId()+"/"+ApplicationConstants.STDERR,"exit 100"));
//                List<String> commands = Collections.singletonList(
//                        "$JAVA_HOME/bin/java " +
//                                " -Xmx256m " +
//                                " com.paytm.lending.datamart.yarn.DummyApplication " +1 +" "+
//                                " 1> " + "/var/log/hadoop-yarn/containers/"+appId+"/"+container.getId()+"/"+ApplicationConstants.STDOUT +
//                                " 2> " + "/var/log/hadoop-yarn/containers/"+appId+"/"+container.getId()+"/"+ApplicationConstants.STDERR
//                );
                // Launch container by create ContainerLaunchContext
                ContainerLaunchContext ctx =
                        Records.newRecord(ContainerLaunchContext.class);

                Map<String,LocalResource> test = new HashMap<>();
                LocalResource test1 = Records.newRecord(LocalResource.class);
                FileStatus status = FileSystem.get(conf).getFileStatus(new Path(System.getenv("JAR_PATH")));
                test1.setResource(URL.fromPath(status.getPath()));
                test1.setSize(status.getLen());
                test1.setTimestamp(status.getModificationTime());
                test1.setType(LocalResourceType.FILE);
                test1.setVisibility(LocalResourceVisibility.PUBLIC);

                Map<String,String> env = new HashMap<String,String>();
                for (String c : conf.getStrings(
                        YarnConfiguration.YARN_APPLICATION_CLAsspATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLAsspATH)) {
                    Apps.addToEnvironment(env,ApplicationConstants.Environment.CLAsspATH.name(),File.pathSeparator);
                }
                Apps.addToEnvironment(env,ApplicationConstants.Environment.PWD.$() + File.separator + "*",File.pathSeparator);

                test.put("simpleapp.jar",test1);

                ctx.setLocalResources(test);
                ctx.setEnvironment(env);

                ctx.setCommands(commands);
                System.out.println("New Launching container " + container + " with command "+commands);
                nmClient.startContainer(container,ctx);
            }
            for (ContainerStatus status : response.getCompletedContaineRSStatuses()) {
                ++completedContainers;
                System.out.println("Completed container " + status.getContainerId());
                System.out.println("Container state "+ status.getState());
            }
            Thread.sleep(100);
        }

        // Un-register with ResourceManager
        rmClient.unregisterapplicationMaster(
                FinalApplicationStatus.SUCCEEDED,"","");
    }
}

如果在 DummyApplication.java 类中发生异常,我需要一种可以使我的应用程序失败的方法

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)