Python pyparsing 模块,ParseResults() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pyparsing.ParseResults()。
def onecmd(self, line):
""" This executes the actual do_* method for a command.
If the command provided doesn't exist,then it executes _default() instead.
:param line: ParsedString - subclass of string including the pyparsing ParseResults
:return: bool - a flag indicating whether the interpretation of commands should stop
"""
statement = self.parser_manager.parsed(line)
funcname = self._func_named(statement.parsed.command)
if not funcname:
return self.default(statement)
try:
func = getattr(self, funcname)
except AttributeError:
return self.default(statement)
stop = func(statement)
return stop
def _addSimpleData(self, data_list, data):
if isinstance(data[0], ParseResults):
inferred_dimen = len(data[0])
else:
inferred_dimen = 1
if self.dimen == None:
# infer dimension from records
self.dimen = inferred_dimen
if self.current_slice == None:
self._setSlice(tuple(['*'] * self.dimen))
if len(self.free_indices) == inferred_dimen:
for d in data.asList():
self._addValue(data_list, d)
elif len(self.free_indices) > 1 and inferred_dimen:
for c in chunk(data, len(self.free_indices)):
self._addValue(data_list, c)
else:
raise AmplyError("Dimension of elements (%d) does not match "
"declared dimension,(%d)" %
(inferred_dimen, self.dimen))
def make_span(s, l, t):
def compute_hi(init_loc, tokens):
hi = init_loc
for tok in tokens:
if isinstance(tok, ASTNode):
hi = max(hi, tok.span.hi)
elif isinstance(tok, basestring):
hi += len(tok)
elif isinstance(tok, p.ParseResults):
hi = max(hi, compute_hi(init_loc, tok))
else:
raise exception.BananaGrammarBug(
"Couldn't create span for: {}".format(tok)
)
return hi
if len(t) > 0:
span_hi = compute_hi(l, t)
return Span(s, span_hi)
else:
return Span(s, 2)
def __init__(self, span, expr_tree):
"""
Construct an expression
:type span: Span
:param span: Span for the expression.
:type expr_tree: p.ParseResults
;:param expr_tree: The tree generated by pyparsing.infixNotation
"""
super(Expr, self).__init__(span)
# We don't use this tree at this point.
# During typecheck we will make sure
# that the expression can evaluate
# Finally during evaluation,we will evaluate
# the final result.
if isinstance(expr_tree, p.ParseResults):
expr_tree = expr_tree.asList()
if isinstance(expr_tree, list):
for i in range(0, len(expr_tree)):
if isinstance(expr_tree[i], list):
expr_tree[i] = Expr(span, expr_tree[i])
self.expr_tree = expr_tree
else:
self.expr_tree = [expr_tree]
def paren_pop(self, parsed_tokens):
'''
Args:
parsed_tokens: a ParseResult object
Returns:
content: a list of string sentences
'''
# must convert the ParseResult to a list,otherwise adding it to a list
# causes weird results.
if isinstance(parsed_tokens, pypar.ParseResults):
parsed_tokens = parsed_tokens.asList()
content = self.paren_pop_helper(parsed_tokens)
return content
def paren_pop(self, pypar.ParseResults):
parsed_tokens = parsed_tokens.asList()
content = self.paren_pop_helper(parsed_tokens)
return content
def simplify(expr):
if isinstance(expr, ParseResults) and len(expr) == 1:
return simplify(expr[0])
if isinstance(expr, (list, ParseResults)):
return map(simplify, expr)
if not isinstance(expr, CompValue):
return expr
if expr.name.endswith('Expression'):
if expr.other is None:
return simplify(expr.expr)
for k in expr.keys():
expr[k] = simplify(expr[k])
# expr['expr']=simplify(expr.expr)
# expr['other']=simplify(expr.other)
return expr
def _traverseAgg(e, visitor=lambda n, v: None):
"""
Traverse a parse-tree,visit each node
if visit functions return a value,replace current node
"""
res = []
if isinstance(e, ParseResults, tuple)):
res = [_traverseAgg(x, visitor) for x in e]
elif isinstance(e, CompValue):
for k, val in e.iteritems():
if val != None:
res.append(_traverseAgg(val, visitor))
return visitor(e, res)
def simplify(expr):
if isinstance(expr, CompValue):
return expr
if expr.name.endswith('Expression'):
if expr.other is None:
return simplify(expr.expr)
for k in expr.keys():
expr[k] = simplify(expr[k])
# expr['expr']=simplify(expr.expr)
# expr['other']=simplify(expr.other)
return expr
def u(x, ip=None):
if type(x) is pyparsing.ParseResults:
return x.asList()[0]
if isinstance(x, Symbolic):
return x.value(ip)
return x
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def precmd(self, statement):
"""Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history.
:param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
:return: ParsedString - a potentially modified version of the input ParsedString statement
"""
return statement
# ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd -----
# noinspection PyMethodMayBeStatic
def postparse(self, parse_result):
"""Hook that runs immediately after parsing the command-line but before ``parsed()`` returns a ParsedString.
:param parse_result: pyparsing.ParseResults - parsing results output by the pyparsing parser
:return: pyparsing.ParseResults - potentially modified ParseResults object
"""
return parse_result
# noinspection PyMethodMayBeStatic
def _restore_output(self, statement):
"""Handles restoring state after output redirection as well as the actual pipe operation if present.
:param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
"""
# If we have redirected output to a file or the clipboard or piped it to a shell command,then restore state
if self.kept_state is not None:
# If we redirected output to the clipboard
if statement.parsed.output and not statement.parsed.outputTo:
self.stdout.seek(0)
write_to_paste_buffer(self.stdout.read())
try:
# Close the file or pipe that stdout was redirected to
self.stdout.close()
except broKEN_PIPE_ERROR:
pass
finally:
# Restore self.stdout
self.kept_state.restore()
self.kept_state = None
# If we were piping output to a shell command,then close the subprocess the shell command was running in
if self.pipe_proc is not None:
self.pipe_proc.communicate()
self.pipe_proc = None
# Restore sys.stdout if need be
if self.kept_sys is not None:
self.kept_sys.restore()
self.kept_sys = None
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def get_port(node):
if len(node) > 1:
if isinstance(node[1], pyparsing.ParseResults):
if len(node[1][0]) == 2:
if node[1][0][0] == ':':
return node[1][0][1]
return None
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _parsed_query2dict(cls, parsed_query):
result = None
while parsed_query:
part = parsed_query.pop()
if part in cls.binary_operator:
result = {part: {parsed_query.pop(): result}}
elif part in cls.multiple_operators:
if result.get(part):
result[part].append(
cls._parsed_query2dict(parsed_query.pop()))
else:
result = {part: [result]}
elif part in cls.uninary_operators:
result = {part: result}
elif isinstance(part, pyparsing.ParseResults):
kind = part.getName()
if kind == "list":
res = part.asList()
else:
res = cls._parsed_query2dict(part)
if result is None:
result = res
elif isinstance(result, dict):
list(result.values())[0].append(res)
else:
result = part
return result
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def parent_children(self, *children):
for i in children:
if isinstance(i, languageSyntaxOBbase):
i.set_parent(self)
i.parent_own_children()
elif isinstance(i, tuple, pp.ParseResults)):
self.parent_children(*i)
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def to_flat_dict(tokens, parent_key='', sep='_'):
"""
Creates a flattened dictionary from the named values in tokens.
E.g. suppose tokens.dump() output is
- isotopic_comp: ['0.000629','(','7',')']
- nominal_value: 0.000629
- std_dev: 7e-06
Then the new dictionary is {'isotopic_comp_nominal_value': 0.000629,'isotopic_comp_std_dev':7e-06}
Parameters
----------
tokens: ~pyparsing.ParseResults
parent_key: ~str -- is used in recursive calls; you don't need to pass this
sep: ~str -- is used to concatenate keys (default: "_")
Returns: ~dict
-------
"""
tokens_dict = dict()
for key, item in tokens.items():
new_key = parent_key + sep + key if parent_key else key
if isinstance(item, ParseResults):
tokens_dict.update(to_flat_dict(item, parent_key=new_key, sep=sep))
else:
tokens_dict[new_key] = item
return tokens_dict
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def __getattr__(self, name):
"""Modified getattr to make it easier to access named results.
It Now always returns a ParseResults() instance,even if the element
is not there.
"""
result = self.get(name, None)
if result is None:
return ParseResults([])
# if not isinstance(result,ParseResults):
# return ParseResults(result)
return result
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _parse_token(self, token):
"""Concrete implementation of parent abstract method.
:Parameters:
according to parent :py:meth:`cumin.backends.BaseQueryAggregator._parse_token`.
"""
if not isinstance(token, pp.ParseResults): # pragma: no cover - this should never happen
raise InvalidQueryError('Expecting ParseResults object,got {type}: {token}'.format(
type=type(token), token=token))
token_dict = token.asDict()
self.logger.trace('Token is: %s | %s', token_dict, token)
if 'hosts' in token_dict:
element = self._get_stack_element()
element['hosts'] = NodeSet.fromlist(token_dict['hosts'])
if 'bool' in token_dict:
element['bool'] = token_dict['bool']
self.stack_pointer['children'].append(element)
elif 'open_subgroup' in token_dict and 'close_subgroup' in token_dict:
self._open_subgroup()
if 'bool' in token_dict:
self.stack_pointer['bool'] = token_dict['bool']
for subtoken in token:
if isinstance(subtoken, str): # Grammar literals,boolean operators and parentheses
continue
self._parse_token(subtoken)
self._close_subgroup()
else: # pragma: no cover - this should never happen
raise InvalidQueryError('Got unexpected token: {token}'.format(token=token))
def _parse_token(self, token):
"""Concrete implementation of parent abstract method.
:Parameters:
according to parent :py:meth:`cumin.backends.BaseQuery._parse_token`.
Raises:
cumin.backends.InvalidQueryError: on internal parsing error.
"""
if not isinstance(token, token)
if 'key' in token_dict and 'value' in token_dict:
if token_dict['key'] == 'project':
self.search_project = token_dict['value']
else:
self.search_params[token_dict['key']] = token_dict['value']
elif 'all' in token_dict:
pass # nothing to do,search_project and search_params have the right defaults
else: # pragma: no cover - this should never happen
raise InvalidQueryError('Got unexpected token: {token}'.format(token=token))
def _parse_token(self, token):
"""Recursively interpret the tokens returned by the grammar parsing.
Arguments:
token (pyparsing.ParseResults): a single token returned by the grammar parsing.
"""
def _parse_token(self, token):
"""Concrete implementation of parent abstract method.
:Parameters:
according to parent :py:meth:`cumin.backends.BaseQueryAggregator._parse_token`.
Raises:
cumin.backends.InvalidQueryError: on internal parsing error.
"""
if not isinstance(token, ParseResults): # pragma: no cover - this should never happen
raise InvalidQueryError('Expecting ParseResults object, token=token))
token_dict = token.asDict()
self.logger.trace('Token is: %s', token_dict)
if self._replace_alias(token_dict):
return # This token was an alias and got replaced
if 'backend' in token_dict and 'query' in token_dict:
element = self._get_stack_element()
query = self.registered_backends[token_dict['backend']].cls(self.config)
element['hosts'] = query.execute(token_dict['query'])
if 'bool' in token_dict:
element['bool'] = token_dict['bool']
self.stack_pointer['children'].append(element)
elif 'open_subgroup' in token_dict and 'close_subgroup' in token_dict:
self._open_subgroup()
if 'bool' in token_dict:
self.stack_pointer['bool'] = token_dict['bool']
for subtoken in token:
if isinstance(subtoken, str):
continue
self._parse_token(subtoken)
self._close_subgroup()
else: # pragma: no cover - this should never happen
raise InvalidQueryError('Got unexpected token: {token}'.format(token=token))
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _traverse(e, visitPre=lambda n: None, visitPost=lambda n: None):
"""
Traverse a parse-tree,replace current node
"""
_e = visitPre(e)
if _e is not None:
return _e
if e is None:
return None
if isinstance(e, ParseResults)):
return [_traverse(x, visitPre, visitPost) for x in e]
elif isinstance(e, tuple):
return tuple([_traverse(x, visitPost) for x in e])
elif isinstance(e, val in e.iteritems():
e[k] = _traverse(val, visitPost)
_e = visitPost(e)
if _e is not None:
return _e
return e
def value(ctx, val, variables=False, errors=False):
"""
utility function for evaluating something...
Variables will be looked up in the context
normally,non-bound vars is an error,
set variables=True to return unbound vars
normally,an error raises the error,
set errors=True to return error
"""
if isinstance(val, Expr):
return val.eval(ctx) # recurse?
elif isinstance(val, CompValue):
raise Exception("What do I do with this CompValue? %s" % val)
elif isinstance(val, list):
return [value(ctx, x, variables, errors) for x in val]
elif isinstance(val, (BNode, Variable)):
r = ctx.get(val)
if isinstance(r, SPARQLError) and not errors:
raise r
if r is not None:
return r
# not bound
if variables:
return val
else:
raise NotBoundError
elif isinstance(val, ParseResults) and len(val) == 1:
return value(ctx, val[0], errors)
else:
return val
def __init__(self, name, tokenList, isList):
self.isList = isList
self.name = name
if isinstance(tokenList, ParseResults)) and len(tokenList) == 1:
tokenList = tokenList[0]
self.tokenList = tokenList
def _traverse(e, visitPost)
_e = visitPost(e)
if _e is not None:
return _e
return e
def __init__(self, ParseResults)) and len(tokenList) == 1:
tokenList = tokenList[0]
self.tokenList = tokenList
def _coerce_parse_result(results):
if isinstance(results, ParseResults):
return [_coerce_parse_result(i) for i in results]
else:
return results
def _redirect_output(self, statement):
"""Handles output redirection for >,>>,and |.
:param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
"""
if statement.parsed.pipeto:
self.kept_state = Statekeeper(self, ('stdout',))
# Create a pipe with read and write sides
read_fd, write_fd = os.pipe()
# Make sure that self.poutput() expects unicode strings in Python 3 and byte strings in Python 2
write_mode = 'w'
read_mode = 'r'
if six.PY2:
write_mode = 'wb'
read_mode = 'rb'
# Open each side of the pipe and set stdout accordingly
# noinspection PyTypeChecker
self.stdout = io.open(write_fd, write_mode)
# noinspection PyTypeChecker
subproc_stdin = io.open(read_fd, read_mode)
# We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True.
try:
self.pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeto), stdin=subproc_stdin)
except Exception as ex:
# Restore stdout to what it was and close the pipe
self.stdout.close()
subproc_stdin.close()
self.pipe_proc = None
self.kept_state.restore()
self.kept_state = None
# Re-raise the exception
raise ex
elif statement.parsed.output:
if (not statement.parsed.outputTo) and (not can_clip):
raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable')
self.kept_state = Statekeeper(self,))
self.kept_sys = Statekeeper(sys,))
if statement.parsed.outputTo:
mode = 'w'
if statement.parsed.output == 2 * self.redirector:
mode = 'a'
sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode)
else:
sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+")
if statement.parsed.output == '>>':
self.poutput(get_paste_buffer())
def push_top_graph_stmt(str, loc, toks):
attrs = {}
g = None
for element in toks:
if (isinstance(element, (pyparsing.ParseResults, list)) and
len(element) == 1 and isinstance(element[0], basestring)):
element = element[0]
if element == 'strict':
attrs['strict'] = True
elif element in ['graph', 'digraph']:
attrs = {}
g = pydot.Dot(graph_type=element, **attrs)
attrs['type'] = element
top_graphs.append(g)
elif isinstance(element, basestring):
g.set_name(element)
elif isinstance(element, pydot.Subgraph):
g.obj_dict['attributes'].update(element.obj_dict['attributes'])
g.obj_dict['edges'].update(element.obj_dict['edges'])
g.obj_dict['nodes'].update(element.obj_dict['nodes'])
g.obj_dict['subgraphs'].update(element.obj_dict['subgraphs'])
g.set_parent_graph(g)
elif isinstance(element, P_AttrList):
attrs.update(element.attrs)
elif isinstance(element, list)):
add_elements(g, element)
else:
raise ValueError("UnkNown element statement: %r " % element)
for g in top_graphs:
update_parent_graph_hierarchy(g)
if len(top_graphs) == 1:
return top_graphs[0]
return top_graphs