Python argparse 模块-Namespace() 实例源码

def _parse_args(args):
    parser = argparse.ArgumentParser()

    if any([arg == '--version' for arg in args]):
        return argparse.Namespace(version=True)

    parser.add_argument('script', help='Script to run')
    parser.add_argument('target', nargs='?', default='build', help='Target object to build; defaults to \'build\'')

    parser.add_argument('--version', action='store_true', help='Print version info and exit')
    parser.add_argument('--clear', help='Clear output directory')
    parser.add_argument('--clear-cache', help='Clear cache before compiling')

    parser.add_argument('--threads', '-t', type=int, help='Set thread count; defaults to cores*2')
    parser.add_argument('--no-threading', '-nt', help='disable multithreaded compiling')

    # Todo: Make target '*' instead of '?' so multiple targets Could be ran from the same command

    return parser.parse_args(args)
def parse(self, argv):
        Parse arguments.

        :param argv: arguments.
        values = dict() = None
        self._errors = list()

        for idx, param in enumerate(self.params):
                values[param['dest'] or param['name']] = self.parse_parameter(param, argv, idx)
            except ParamException as e:
                self._errors.append(str(e)) = Namespace(**values)
def list_action_remove(self, player, values, map_dictionary, view, **kwargs):
        # Check permission.
        if not await self.instance.permission_manager.has_permission(player, 'admin:remove_map'):
                '$f00You don\'t have the permission to perform this action!',

        # Ask for confirmation.
        cancel = bool(await ask_confirmation(player, 'Are you sure you want to remove the map \'{}\'$z$s from the server?'.format(
        ), size='sm'))
        if cancel is True:

        # Simulate command.
        await self.remove_map(player, Namespace(nr=map_dictionary['id']))

        # Reload parent view.
        await view.refresh(player)
def run_with_args(args, parser):
    # type: (argparse.Namespace,argparse.ArgumentParser) -> int
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
        if args.profile:
            profile("ret = whatstyle(args,parser)", locals(), globals())
            ret = whatstyle(args, parser)
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno == errno.EPIPE:
        elif str(exc) == 'Stream closed':
        if not PY2:
    iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
    return ret
def test_start_object(self):
        server = PJFServer(configuration=PJFConfiguration(Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
                                                   html=False, level=6, command=["radamsa"], stdin=True,
                                                   json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False,
                                                   parameters=[], notify=False, debug=False, content_type="text/plain",
                                                                    utf8=False, nologo=True)))
        json_http = urllib2.urlopen("").read()
            import requests
            json_https = requests.get('', verify=False).content
        except ImportError:
def cli_args():
    """Parse the command line arguments.

    :return: The parsed arguments.
    :rtype: argparse.Namespace
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--check',
                        help="check if the system is vulnerable to WCry")
    parser.add_argument('-m', '--mitigate',
                        help="mitigate the system's vulnerability by disabling the"
                             " SMBv1 protocol,if necessary; implies --check")
    parser.add_argument('-f', '--fix', action='store_true')
                        help="Optionally specify a directory where the Microsoft"
                             " KB update is saved when using --fix")
    if len(sys.argv) == 1:
    # else:
    return parser.parse_args()
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                    ) -> Callable[[argparse.Namespace], None]:
    if main_parser is None:
        main_parser = global_argparser
    if id(main_parser) not in _subparsers:
        subparsers = main_parser.add_subparsers(title='commands',
        _subparsers[id(main_parser)] = subparsers
        subparsers = _subparsers[id(main_parser)]

    def wrapped(args):

    doc_summary = handler.__doc__.split('\n\n')[0]
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
    wrapped.register_command = functools.partial(register_command,
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
def __call__(
            parser,  # type: argparse.ArgumentParser
            namespace,  # type: argparse.Namespace
            values,  # type: Union[ARGPARSE_TEXT,Sequence[Any],None]
            option_string=None  # type: Optional[ARGPARSE_TEXT]
        # type: (...) -> None
        """Checks to make sure that the destination is empty before writing.

        :raises parser.error: if destination is already set
        if getattr(namespace, self.dest) is not None:  # type: ignore # typeshed doesn't kNow about Action.dest yet?
            parser.error('{} argument may not be specified more than once'.format(option_string))
        setattr(namespace, self.dest, values)  # type: ignore # typeshed doesn't kNow about Action.dest yet?
def start(self):
        import StaticUPnP_Settings
        permissions = Namespace(**StaticUPnP_Settings.permissions)
        if permissions.drop_permissions:

        self.running = Value(ctypes.c_int, 1)
        self.queue = Queue()
        self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running))
        self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,))
        self.response_thread = Process(target=self.response_handler, self.running))
def get_interface_addresses(logger):
    import StaticUPnP_Settings
    interface_config = Namespace(**StaticUPnP_Settings.interfaces)
    ip_addresses = StaticUPnP_Settings.ip_addresses
    if len(ip_addresses) == 0:
        import netifaces
        ifs = netifaces.interfaces()
        if len(interface_config.include) > 0:
            ifs = interface_config.include
        if len(interface_config.exclude) > 0:
            for iface in interface_config.exclude:

        for i in ifs:
            addrs = netifaces.ifaddresses(i)
            if netifaces.AF_INET in addrs:
                for addr in addrs[netifaces.AF_INET]:
          "Regestering multicast on %s: %s"%(i, addr['addr']))
    return ip_addresses
def parse_args() -> argparse.Namespace:  # pragma: no cover
    Parses the Command Line Arguments using argparse

    :return: The parsed arguments

    parser = argparse.ArgumentParser()
    parser.add_argument("connection", help="The Type of Connection to use")

    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Activates verbose output")
    parser.add_argument("-d", "--debug",
                        help="Activates debug-level logging output")
    parser.add_argument("-q", "--quiet",
                        help="disables all text output")
    parser.add_argument("-c", "--config",
                        help="Overrides the configuration directory location")

    return parser.parse_args()
def test_init_2(tmpdir):
    "it should open sam file if provided"

    make_bam(tmpdir.strpath, """
        r1 + ...........
        r1 -       ......*....
        r2 +    .........*.
        r2 -        .....*.......

    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)


    assert isinstance(o.cfdna, AlignmentFile)
    assert o.gdna == None
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_init_3(tmpdir):
    "it should generate a proper output file name if not provided"

    make_bam(tmpdir.strpath, output=None)


    assert o.output != None
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_aggregate_reads_3():
    "it should ignore when 3+ reads share the same name"

    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)

    reads = (
        ("r1", 'A', 60, 2, 11, -1, 9,  False, True),
        ("r1", 'C', -9, True,  True),
        ("r2", 0,  True,  False)

    unique_pairs, unique_single, _, nerror, *_ = aggregate_reads(o, reads)

    assert len(unique_pairs) == 0
    assert len(unique_single) == 1
    assert nerror == 3
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_aggregate_reads_4():
    "it should ignore when base in overlap area inconsistent between two reads"

    o = Namespace(verbos=False, 4, 13, -11, 3, 12, 5, 14, True)

    unique_pairs, *_, ninconsis = aggregate_reads(o, reads)

    assert len(unique_pairs) == 1
    assert ninconsis == 2
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_aggregate_reads_5():
    "it should drop reads that has too much mismatch"

    o = Namespace(verbos=False, mismatch_limit=2)

    reads = (
        ("r1", 1,
        ("r3", 6, False),
        ("r4", 7, False)

    unique_pairs, nlowq, reads)

    assert len(unique_pairs) == 1
    assert len(unique_single) == 1
    assert nlowq == 3
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_count_different_type_1():
    "it should NOT count dna that more than 10% reads have different bases"

    o = Namespace(verbos=False, allow_inconsist=False)

    pair = {
        ('ref', True): [
            ('A', 60),
            ('T', 60)
        ('ref', False): [
            ('A', 60)

    mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa, inconsis = count_different_type(o, pair, {}, 'T', 'A')

    assert inconsis == 6
    assert sum((mor, osa)) == 0
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_get_reads_1(tmpdir):
    "it should get all but only the reads that covers the given position"

    make_bam(tmpdir.strpath, """
        r1 + ...........
        r1 -      ......*....
        r2 +   .........*.
        r2 -       .....*.......
        r3 +       ...........
        r3 -            ....*......
        r4 +       ...........
        r4 -            ...........

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    assert sum( 1 for _ in get_reads(o, sam, 'ref', '4') )  == 2
    assert sum( 1 for _ in get_reads(o, '12') ) == 7
    assert sum( 1 for _ in get_reads(o, '20') ) == 2
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_get_reads_2(tmpdir):
    "it should read properties correctly"

    make_bam(tmpdir.strpath, """
        r1 + ...*.......
        r1 -   .*.........

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    r = next(get_reads(o, '4'))

    assert r[0] == "r1" # name
    assert r[3] == 0 # 0-based pos
    assert r[4] == 11 # length
    assert r[5] == -1 # mismatch,not caculated
    assert r[6] == 2 # mate pos
    assert r[7] == 13 # template length
    assert r[8] == False # is_reverse
    assert r[9] == True # paired and mapped
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_pad_softclip_2(tmpdir):
    "it should ignore more than two reads which share the same name"

    make_bam(tmpdir.strpath, """
        r1 + __.*.......
        r1 -   .*.......__
        r1 -   .*.......__
        r2 +   .*.......__
        r2 -   .*.......__

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    adjusted_pos = pad_softclip(sam)

    assert sum(1 for startpos, length in adjusted_pos.values() if startpos != -1) == 1
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_pad_softclip_3(tmpdir):
    "it should pad softclipped bases"

    make_bam(tmpdir.strpath, """
        r1 + __.*.......
        r1 -   .*.........
        r2 - ...*.......
        r2 +   .*.......__

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    adjusted_pos = pad_softclip(sam)

    assert adjusted_pos["r1"] == (0, 13) # 0-based position
    assert adjusted_pos["r2"] == (0, 13)
def get_api_id(config, args):
    Get the API ID from terraform,or from AWS if that fails.

    :param config: configuration
    :type config: :py:class:`~.Config`
    :param args: command line arguments
    :type args: :py:class:`argparse.Namespace`
    :return: API Gateway ID
    :rtype: str
        logger.debug('Trying to get terraform rest_api_id output')
        runner = terraformRunner(config, args.tf_path)
        outputs = runner._get_outputs()
        depl_id = outputs['rest_api_id']
        logger.debug("terraform rest_api_id output: '%s'", depl_id)
    except Exception:'Unable to find API rest_api_id from terraform state;'
                    ' querying AWS.', exc_info=1)
        aws = AWSInfo(config)
        depl_id = aws.get_api_id()
        logger.debug("AWS API ID: '%s'", depl_id)
    return depl_id
def _predict(predictors: Dict[str, str]):
    def predict_inner(args: argparse.Namespace) -> None:
        predictor = _get_predictor(args, predictors)
        output_file = None

        if args.silent and not args.output_file:
            print("--silent specified without --output-file.")
            print("Exiting early because no output will be created.")

        # ExitStack allows us to conditionally context-manage `output_file`,which may or may not exist
        with ExitStack() as stack:
            input_file = stack.enter_context(args.input_file)  # type: ignore
            if args.output_file:
                output_file = stack.enter_context(args.output_file)  # type: ignore

            _run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device)

    return predict_inner
def get_parsed_args(self, comp_words):
        """ gets the parsed args from a patched parser """
        active_parsers = self._patch_argument_parser()

        parsed_args = argparse.Namespace()

        self.completing = True
        if USING_PYTHON2:
            # Python 2 argparse only properly works with byte strings.
            comp_words = [ensure_bytes(word) for word in comp_words]

            stderr = sys.stderr
            sys.stderr =, "w")

            active_parsers[0].parse_kNown_args(comp_words, namespace=parsed_args)

            sys.stderr = stderr
        except BaseException:

        self.completing = False
        return parsed_args
def test_with_config(self, is_file, open_):
        # Create our stand-in config file.
        config_file = textwrap.dedent(u"""\
          reporoot: ~/Code
        publish: local
        is_file.return_value = True
        open_.return_value = io.StringIO(config_file)

        # Get the config and test the result.
        flags = Namespace(user_config='/bogus/file.yaml', command='not_init')
        user_config = main.read_user_config(flags)
        assert user_config == {
            'local_paths': {'reporoot': '~/Code'},
            'publish': 'local',
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_with_ssh_repo(self, login):
        # Set up test data to return when we attempt to make the
        # pull request.
        gh = mock.Magicmock(spec=github3.github.GitHub)
        login.return_value = gh
        url = ''
        gh.repository().create_pull.return_value = Namespace(html_url=url)

        # Run the task.
        task = github.CreateGitHubPullRequest()
        pr = task.execute(**self.task_kwargs)

        # Assert we got the correct result.
        assert pr.html_url == url

        # Assert that the correct methods were called.
        login.assert_called_once_with('lukesneeringer', '1335020400')
        gh.repository.assert_called_with('me', 'repo')
            body='This pull request was generated by artman. '
                 'Please review it thoroughly before merging.',
            title='Python GAPIC: Pubsub v1',
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_with_http_url(self, login):
        # Set up test data to return when we attempt to make the
        # pull request.
        gh = mock.Magicmock(spec=github3.github.GitHub)
        login.return_value = gh
        url = ''
        gh.repository().create_pull.return_value = Namespace(html_url=url)

        # Run the task.
        task = github.CreateGitHubPullRequest()
        pr = task.execute(**dict(self.task_kwargs, git_repo={
            'location': 'https://github/me/repo/',

        # Assert we got the correct result.
        assert pr.html_url == url

        # Assert that the correct repository method was still called.
        gh.repository.assert_called_with('me', 'repo')
项目:parlai    作者:facebookresearch    | 项目源码 | 文件源码
def print_args(self):
        """Print out all the arguments in this parser."""
        if not self.opt:
        values = {}
        for key, value in self.opt.items():
            values[str(key)] = str(value)
        for group in self._action_groups:
            group_dict = {
                a.dest: getattr(self.args, a.dest, None)
                for a in group._group_actions
            namespace = argparse.Namespace(**group_dict)
            count = 0
            for key in namespace.__dict__:
                if key in values:
                    if count == 0:
                        print('[ ' + group.title + ': ] ')
                    count += 1
                    print('[  ' + key + ': ' + values[key] + ' ]')
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def _test_standalone_sequana(qtbot, tmpdir):
    wkdir = TemporaryDirectory()
    inputdir = os.path.realpath(

    # Standalone for sequana given a wkdir and pipeline and input_dir
    args = Namespace(pipeline="quality_control",,
    widget = sequana_gui.SequanaGUI(ipython=False, user_options=args)
    assert widget.mode == "sequana"
    widget.force = True

    count = 0
    while widget.process.state() and count < 5:
项目:python-MysqL-binlog-pubsub    作者:tarzanjw    | 项目源码 | 文件源码
def _setup_arg_parser(argv):
    """ Parse arguments from command line
        argv list: by default it's command line arguments

        argparse.Namespace: parsed argument
    parser = argparse.ArgumentParser(
        description='MysqL binlog to Google Cloud Pub/Sub')
                        help='configuration file for publishing')
    parser.add_argument('--loglevel', '-l', default=None,
                        help='log level for root')

    if os.path.isfile('logging.ini'):
        _log_file = 'logging.ini'
        _log_file = None

    parser.add_argument('--logconf', default=_log_file,
                        help='INI file log configuration')
    args = parser.parse_args(argv)
    return args
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def RunTestsCommand(args):
  """Checks test type and dispatches to the appropriate function.

    args: argparse.Namespace object.

    Integer indicated exit code.

    Exception: UnkNown command name passed in,or an exception from an
        individual test runner.
  command = args.command

  ProcessCommonoptions(args)'command: %s', ' '.join(sys.argv))
  if args.enable_platform_mode or command in _DEFAULT_PLATFORM_MODE_TESTS:
    return RunTestsInPlatformMode(args)

  if command == 'python':
    return _RunPythonTests(args)
    raise Exception('UnkNown test type.')
项目:clinner    作者:PeRDy    | 项目源码 | 文件源码
def __init__(self, args=None, parse_args=True):
        self.cli = CLI()
        self.args, self.unkNown_args = argparse.Namespace(), []
        if parse_args:
            self.args, self.unkNown_args = self.parse_arguments(args=args)
            self.settings = self.args.settings or os.environ.get('CLINNER_SETTINGS')

            # Inject parameters related to current stage as environment variables

            # Load settings

            # Apply quiet mode
            if self.args.quiet:
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def add_domains(args_or_config, domains):
    """Registers new domains to be used during the current client run.

    Domains are not added to the list of requested domains if they have
    already been registered.

    :param args_or_config: parsed command line arguments
    :type args_or_config: argparse.Namespace or
    :param str domain: one or more comma separated domains

    :returns: domains after they have been normalized and validated
    :rtype: `list` of `str`

    validated_domains = []
    for domain in domains.split(","):
        domain = util.enforce_domain_sanity(domain.strip())
        if domain not in

    return validated_domains
项目:linode-cli    作者:linode    | 项目源码 | 文件源码
def update(namespace, username=None):
    This updates a Namespace (as returned by ArgumentParser) with config values
    if they aren't present in the Namespace already.
    if not os.path.isfile(_get_config_path()):
        return namespace

    if not username:
        username = "DEFAULT"

    conf = _get_config()

    if not username in conf:
        print("User {} is not configured.".format(username))

    return update_namespace(namespace, conf[username])
项目:python-tempestconf    作者:redhat-openstack    | 项目源码 | 文件源码
def test_remove_values_having_hyphen(self):
        api_exts = "dvr,l3-flavors,rbac-policies,project-id"
        remove_exts = ["dvr", "project-id"]
        args = Namespace(
                "network-feature-enabled.api-extensions": remove_exts
        self.conf = self._get_conf("v2.0", "v3")
        self.conf.set("network-feature-enabled", "api-extensions", api_exts)
        conf_exts = self.conf.get("network-feature-enabled", "api-extensions")
        conf_exts = conf_exts.split(',')
        for ext in api_exts.split(','):
            if ext in remove_exts:
                self.assertFalse(ext in conf_exts)
                self.assertTrue(ext in conf_exts)
项目:AutoMergetool    作者:xgouchet    | 项目源码 | 文件源码
def parse_arguments(args: list) -> Namespace:
    Parses the arguments passed on invocation in a dict and return it
    parser = ArgumentParser(description="A tool to combine multiple merge tools")

    parser.add_argument('-b', '--base', required=True)
    parser.add_argument('-l', '--local', required=True)
    parser.add_argument('-r', '--remote', required=True)
    parser.add_argument('-m', '--merged', required=True)

    # convert to absolute path
    parsed_arg = parser.parse_args(args)

    parsed_arg.base = os.path.abspath(parsed_arg.base)
    parsed_arg.local = os.path.abspath(parsed_arg.local)
    parsed_arg.remote = os.path.abspath(parsed_arg.remote)
    parsed_arg.merged = os.path.abspath(parsed_arg.merged)
    return parsed_arg
项目:AutoMergetool    作者:xgouchet    | 项目源码 | 文件源码
def merge(config: RawConfigParser,
          args: Namespace,
          launcher: ToolsLauncher,
          analyser: ConflictedFileAnalyser) -> int:
    Handle the merge tools chain for the given argument
    config -- the current amt configuration
    args -- the arguments with the base,local,remote and merged file names
    launcher -- the launcher helper
    if not (config.has_option(SECT_AMT, OPT_TOOLS)):
        raise RuntimeError('Missing the {0}.{1} configuration'.format(SECT_AMT, OPT_TOOLS))

    tools = config.get(SECT_AMT, OPT_TOOLS).split(';')
    merge_result = ERROR_NO_TOOL

    for tool in tools:
        merge_result = merge_with_tool(tool, config, args, launcher, analyser)
        if merge_result == 0:
            return 0

    print(" [AMT] ? Sorry,it seems we can't solve it this time")

    return merge_result
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def setUp(self):
        options.cfg = argparse.Namespace()
        options.cfg.corpus = None
        options.cfg.current_server = "MockConnection"
        options.cfg.MODE = QUERY_MODE_TOKENS
        options.cfg.stopword_list = []
        options.cfg.context_mode = CONTEXT_NONE
        options.cfg.drop_on_na = True
        options.cfg.drop_duplicates = True
        options.cfg.benchmark = False
        options.cfg.verbose = False

        self.Session = Session()
        self.Session.Resource = BaseResource()

        self.manager = Manager()
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def setUp(self):
        self.maxDiff = None
        options.cfg = argparse.Namespace()
        options.cfg.number_of_tokens = 0
        options.cfg.limit_matches = False
        options.cfg.regexp = False
        options.cfg.query_case_sensitive = False
        options.get_configuration_type = lambda: sql_MysqL
        options.get_resource = _monkeypatch_get_resource
        self.Session = MockOptions()
        self.Session.Resource = self.resource
        self.Session.Lexicon = None
        self.Session.Corpus = None = coquery.links.Link(
              , "corpus_word",
              , "word_label",
                        join="LEFT JOIN")
        options.cfg.current_server = "Default"
        options.cfg.table_links = {}
        options.cfg.table_links[options.cfg.current_server] = []
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def setUp(self):
        self.maxDiff = None
        options.cfg = argparse.Namespace()
        options.cfg.number_of_tokens = 0
        options.cfg.limit_matches = False
        options.cfg.regexp = False
        options.cfg.query_case_sensitive = False
        options.get_configuration_type = lambda: sql_MysqL
        options.get_resource = _monkeypatch_get_resource
        self.Session = MockOptions()
        self.Session.Resource = self.resource
        self.Session.Lexicon = None
        self.Session.Corpus = None = coquery.links.Link(
                        join="LEFT JOIN")
        options.cfg.current_server = "Default"
        options.cfg.table_links = {}
        options.cfg.table_links[options.cfg.current_server] = []
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        self.server = mock.Mock()
        self.flow = mock.Mock() = mock.Mock()
        self.credentials = mock.Mock()

        self.flow.step1_get_authorize_url.return_value = (
        self.flow.step2_exchange.return_value = self.credentials

        self.flags = argparse.Namespace(
            noauth_local_webserver=True, logging_level='INFO')
        self.server_flags = argparse.Namespace(
            auth_host_port=[8080, ],
项目:CSB    作者:csb-toolBox    | 项目源码 | 文件源码
def parse(self, args):
        Parse the command line arguments.

        @param args: the list of user-provided command line arguments --
                     normally sys.argv[1:]
        @type args: tuple of str        

        @return: an object initialized with the parsed arguments
        @rtype: argparse.Namespace
            return self.parser.parse_args(args)
        except SystemExit as se:
            if se.code > 0:
                raise AppExit('Bad command line', ExitCodes.USAGE_ERROR)
                raise AppExit(code=ExitCodes.CLEAN)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Mapping Quality Zero constructor."""
        # This needs to happen first,because threshold is initialised here.
        super(MQ0FFilter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.mq_score
        elif isinstance(args, dict):
                self.threshold = float(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create MQ0F filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self,because threshold is initialised here.
        super(MQ0Filter, args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create MQ0 filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """AD Ratio constructor."""
        # This needs to happen first,because threshold is initialised here.
        super(DP4Filter, self).__init__(args)

        # Change the threshold to custom dp value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.ad_ratio
        elif isinstance(args, args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create DP4 filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Mapping Quality constructor."""
        # This needs to happen first,because threshold is initialised here.
        super(MQFilter, dict):
                self.threshold = int(args.get(self.parameter))
            except (TypeError, args.get(self.parameter))
                logging.error("This parameter requires to be an integer!")
                raise Exception("Could not create MQ filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Depth constructor."""
        # This needs to happen first,because threshold is initialised here.
        super(GQFilter, argparse.Namespace):
            self.threshold = args.gq_score
        elif isinstance(args, args.get(self.parameter))
                logging.error("This parameter requires to be an integer!")
                raise Exception("Could not create GQ filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self,because threshold is initialised here.
        super(QualFilter, args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create QUAL filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self,because threshold is initialised here.
        super(UncallableGTFilter, dict):
                self.threshold = str(args.get(self.parameter))
            except (TypeError, args.get(self.parameter))
                logging.error("This parameter requires to be a string!")
                self.threshold = None
项目:open-project-linter    作者:OpenNewsLabs    | 项目源码 | 文件源码
def parse_linter_args():
    """Parse command-line arguments and set up the command-line
    interface and help.

    Defaults to current working directory for the directory arg and the
    copy of in the directory this module is in for the rules arg.

        object subclass with arguments as attributes for each given argument
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-d', '--directory', help="The local path to your repository's base directory. Defaults to the current working directory.",
    parser.add_argument('-r', '--rules', help='The path to the rules configuration file,a YAML file containing the rules you would like to check for. Defaults to path/to/openlinter/rules.yml.',
        default=os.path.join(get_current_script_dir(), 'rules.yml')
    parser.add_argument('-v', '--version', action='version', version='1.0.1')
    return parser.parse_args()


