Python json 模块,get() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用json.get()。
def resolve_env_to_prefix(name_or_prefix):
    """Convert an env name or path into a canonical prefix path.

    An absolute path is returned unchanged. The name ``'root'`` maps to the
    ``root_prefix`` entry reported by ``info()``. Any other name is compared
    with the basename of each entry in the reported ``envs`` list.

    Returns:
        Absolute path of prefix or None if it isn't found.
    """
    if os.path.isabs(name_or_prefix):
        return name_or_prefix

    conda_info = info()
    if name_or_prefix == 'root':
        return conda_info.get('root_prefix', None)

    # First env whose directory name equals the requested name, if any.
    candidates = (
        env_path for env_path in conda_info.get('envs', [])
        if os.path.basename(env_path) == name_or_prefix
    )
    return next(candidates, None)
def parse_spec(spec):
    """Parse a package name and version spec as conda would.

    Returns:
        ``ParsedSpec`` or None on failure
    """
    matched = _spec_pat.match(spec)
    if matched is None:
        return None

    pip_part = matched.group('pc')
    # Spaces inside the pip constraint are dropped.
    normalized_pip = None if pip_part is None else pip_part.replace(' ', '')
    return ParsedSpec(
        name=matched.group('name').lower(),
        conda_constraint=matched.group('cc'),
        pip_constraint=normalized_pip,
    )
# These are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we fall back to
# CONDA_DEFAULT_ENV.
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
    """Record ``prefix`` as the active environment in ``environ``.

    The normalized prefix is stored under ``varname``. Unless ``varname`` is
    ``CONDA_DEFAULT_ENV`` itself, that variable is set as well: to ``'root'``
    when the prefix equals the root prefix, otherwise to the first result of
    ``subdirectory_relative_to_directory`` that differs from the prefix,
    falling back to the prefix itself.

    Args:
        environ: mutable mapping of environment variables; modified in place.
        prefix: path of the environment prefix.
        varname: name of the variable that receives the prefix.
    """
    prefix = os.path.normpath(prefix)
    environ[varname] = prefix
    if varname != 'CONDA_DEFAULT_ENV':
        # This case matters on both Unix and Windows
        # with conda >= 4.1.4 since requirement.env_var
        # is CONDA_PREFIX, and matters on Unix only pre-4.1.4
        # when requirement.env_var is CONDA_ENV_PATH.
        global _envs_dirs
        global _root_dir
        if _envs_dirs is None:
            # Query conda once and cache the normalized directories.
            i = info()
            _envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
            _root_dir = os.path.normpath(i.get('root_prefix'))
        if prefix == _root_dir:
            name = 'root'
        else:
            # Start from the prefix so ``name`` is always bound, even when
            # there are no envs directories to compare against.
            name = prefix
            for d in _envs_dirs:
                candidate = subdirectory_relative_to_directory(prefix, d)
                if candidate != prefix:
                    name = candidate
                    break
        environ['CONDA_DEFAULT_ENV'] = name
def node_byo():
    """Request a "bring your own node" token and print the agent install command.

    Any exception raised while requesting the token is printed to stderr and
    the process exits with ``EXCEPTION_EXIT_CODE``.
    """
    token = ""
    try:
        # The endpoint is namespaced only when a namespace is configured.
        if dockercloud.namespace:
            endpoint = "api/infra/%s/%s/token" % (API_VERSION, dockercloud.namespace)
        else:
            endpoint = "api/infra/%s/token" % API_VERSION
        reply = dockercloud.api.http.send_request("POST", endpoint)
        if reply:
            token = reply.get("token", "")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)

    print("Docker Cloud lets you use your own servers as nodes to run containers. "
          "For this you have to install our agent.")
    print("Run the following command on your server:")
    print()
    print("\tcurl -Ls https://get.cloud.docker.com/ | sudo -H sh -s", token)
    print()
def record_sync_status(total, status, message, data, sync_type):
    """Insert one row describing a clue-sync run into TAB_IOPM_CLUES_SYNC_INFO.

    Args:
        total: value stored in the ``total`` column.
        status: value stored in the ``status`` column.
        message: value stored in the ``message`` column.
        data: optional dict that may carry ``deleteNum``, ``updateNum``,
            ``saveNum`` and ``wrongIdList``; a falsy value stores NULL counts
            and an empty id list.
        sync_type: value stored in the ``sync_type`` column.

    Returns:
        Whatever ``db.add`` returns for the generated statement.
    """
    def literal(value):
        # Render one Python value as a SQL literal: NULL, bare number, or a
        # quoted string with embedded single quotes doubled.
        if value is None:
            return 'null'
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return str(value)
        return "'%s'" % str(value).replace("'", "''")

    data = data or {}
    # ``wrongIdList`` may be absent or None; ids are stringified before joining.
    wrong_ids = ','.join(str(item) for item in (data.get("wrongIdList") or []))

    # NOTE(review): ``db.add`` receives a finished SQL string here, so values
    # are escaped by hand; switch to bind parameters if the db layer has them.
    sql = (
        "insert into TAB_IOPM_CLUES_SYNC_INFO "
        "(id,total,status,message,delete_count,update_count,save_count,worn_clues_list,sync_type) "
        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s) "
    ) % (
        'seq_iopm_sync.nextval',
        literal(total),
        literal(status),
        literal(message),
        literal(data.get("deleteNum")),
        literal(data.get("updateNum")),
        literal(data.get("saveNum")),
        literal(wrong_ids),
        literal(sync_type),
    )
    print(sql)
    return db.add(sql)
def geocode(address):
    """
    Look up coordinates for an address through the AMap geocoding endpoint.

    :param address: text sent as the ``address`` query parameter
    :return: ``[lng, lat]`` as floats from the first geocode entry, or None
        when the HTTP status is not 200 or the reply reports no match
    """
    # NOTE(review): the 'city' value looks mis-encoded in this copy - confirm.
    query = {'s': 'rsv3',
             'key': key,
             'city': '??',
             'address': address}
    res = requests.get(
        "http://restapi.amap.com/v3/geocode/geo", params=query)
    if res.status_code != 200:
        return None

    payload = res.json()
    status = payload.get('status')
    count = payload.get('count')
    if not (status == '1' and int(count) >= 1):
        return None

    first_hit = payload.get('geocodes')[0]
    # 'location' is split on a comma: first part longitude, second latitude.
    lng = float(first_hit.get('location').split(',')[0])
    lat = float(first_hit.get('location').split(',')[1])
    return [lng, lat]
def refresh_client(self, from_dt=None, to_dt=None):
    """
    Refreshes the CalendarService endpoint, ensuring that the
    event data is up-to-date. If no 'from_dt' or 'to_dt' datetimes
    have been given, the range becomes this month.

    The parsed JSON reply is stored on ``self.response``.
    """
    today = datetime.today()
    # monthrange() returns (weekday of the 1st, number of days in the month);
    # the first element is not a day of the month, so the range starts at 1.
    _, last_day = monthrange(today.year, today.month)
    if not from_dt:
        from_dt = datetime(today.year, today.month, 1)
    if not to_dt:
        to_dt = datetime(today.year, today.month, last_day)
    params = dict(self.params)
    params.update({
        'lang': 'en-us',
        'usertz': get_localzone().zone,
        'startDate': from_dt.strftime('%Y-%m-%d'),
        'endDate': to_dt.strftime('%Y-%m-%d')
    })
    req = self.session.get(self._calendar_refresh_url, params=params)
    self.response = req.json()
def lights():
    """
    Switch rooms on or off according to the JSON body of a local request.

    example_lights_json = {
        'rooms': [
            {'name': 'Living Room', 'on': True},
        ]
    }

    Requests that are not local are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Lights accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        return flask.abort(404)

    payload = flask.request.get_json()
    rooms = payload.get('rooms', [])
    logger.info('Switching rooms %s', rooms)
    for requested in rooms:
        wanted_name = requested['name']
        wanted_state = requested['on']
        for room in ROOMS:
            # Names are compared exactly as received.
            if room.name != wanted_name:
                continue
            logger.info('Switching room %s', room.name)
            room.switch(wanted_state)
    return "Light commands sent."
def geocode(address):
    """
    Look up coordinates for an address through the AMap geocoding endpoint.

    This copy of the function was truncated mid-dictionary; the body below
    restores the request, the status checks and the coordinate parsing.

    :param address: text sent as the ``address`` query parameter
    :return: ``[lng, lat]`` as floats from the first geocode entry, or None
        when the HTTP status is not 200 or the reply reports no match
    """
    # NOTE(review): the 'city' value looks mis-encoded in this file - confirm.
    geocoding = {'s': 'rsv3',
                 'key': key,
                 'city': '??',
                 'address': address}
    res = requests.get(
        "http://restapi.amap.com/v3/geocode/geo", params=geocoding)
    if res.status_code == 200:
        json = res.json()
        status = json.get('status')
        count = json.get('count')
        if status == '1' and int(count) >= 1:
            geocodes = json.get('geocodes')[0]
            # 'location' is split on a comma: longitude first, then latitude.
            lng = float(geocodes.get('location').split(',')[0])
            lat = float(geocodes.get('location').split(',')[1])
            return [lng, lat]
        else:
            return None
    else:
        return None
def check_authorization(self, access_token):
    """Check an authorization created by a registered application.

    OAuth applications can use this method to check token validity
    without hitting normal rate limits because of failed login attempts.
    If the token is valid, it will return True, otherwise it will return
    False. False is also returned, without any request, when the token or
    either of the session's ``client_id``/``client_secret`` params is missing.

    :param access_token: the token to check
    :returns: bool
    """
    p = self.session.params
    auth = (p.get('client_id'), p.get('client_secret'))
    # A two-element tuple is always truthy, so test its contents instead.
    if access_token and all(auth):
        url = self._build_url('applications', str(auth[0]), 'tokens',
                              str(access_token))
        # The client credentials travel as basic auth, so they are blanked
        # out of the query string.
        resp = self._get(url, auth=auth, params={
            'client_id': None, 'client_secret': None
        })
        return self._boolean(resp, 200, 404)
    return False
def resolve_env_to_prefix(name_or_prefix):
    """Convert an env name or path into a canonical prefix path.

    This copy had lost the ``'root'`` branch and the ``envs`` lookup (it
    iterated an undefined ``envs`` name); both are restored here.

    Returns:
        Absolute path of prefix or None if it isn't found.
    """
    if os.path.isabs(name_or_prefix):
        return name_or_prefix
    json = info()
    root_prefix = json.get('root_prefix', None)
    if name_or_prefix == 'root':
        return root_prefix
    envs = json.get('envs', [])
    for prefix in envs:
        # Environments are matched by the last component of their path.
        if os.path.basename(prefix) == name_or_prefix:
            return prefix
    return None
def environ_set_prefix(environ, prefix, varname=conda_prefix_variable()):
    """Record ``prefix`` as the active environment in ``environ``.

    This copy had its signature fused with the tail of the loop; the full
    body is restored. The normalized prefix is stored under ``varname`` and,
    unless ``varname`` is ``CONDA_DEFAULT_ENV`` itself, that variable is set
    to ``'root'`` for the root prefix or to an env name derived from the
    envs directories, falling back to the prefix itself.

    Args:
        environ: mutable mapping of environment variables; modified in place.
        prefix: path of the environment prefix.
        varname: name of the variable that receives the prefix.
    """
    prefix = os.path.normpath(prefix)
    environ[varname] = prefix
    if varname != 'CONDA_DEFAULT_ENV':
        global _envs_dirs
        global _root_dir
        if _envs_dirs is None:
            # Query conda once and cache the normalized directories.
            i = info()
            _envs_dirs = [os.path.normpath(d) for d in i.get('envs_dirs', [])]
            _root_dir = os.path.normpath(i.get('root_prefix'))
        if prefix == _root_dir:
            name = 'root'
        else:
            # Default to the prefix so ``name`` is bound even with no
            # envs directories.
            name = prefix
            for d in _envs_dirs:
                candidate = subdirectory_relative_to_directory(prefix, d)
                if candidate != prefix:
                    name = candidate
                    break
        environ['CONDA_DEFAULT_ENV'] = name
# This isn't all (e.g. it leaves out arm, power). It's sort of "all
# that people typically publish for".
def get(self, object_name, path=None, params=None, retries=0):
    """
    Makes a GET request to the API. Checks for invalid requests that raise PardotAPIErrors. If the API key is
    invalid, one re-authentication request is made, in case the key has simply expired. If no errors are raised,
    returns either the JSON response, or if no JSON was returned, returns the HTTP response status code.

    The caller's ``params`` dict is not modified; credentials are added to a copy.
    """
    # Copy first so the user key and API key never leak into the caller's dict.
    params = dict(params or {})
    params.update({'user_key': self.user_key, 'api_key': self.api_key, 'format': 'json'})
    try:
        self._check_auth(object_name=object_name)
        request = requests.get(self._full_path(object_name, path), params=params)
        return self._check_response(request)
    except PardotAPIError as err:
        if err.message == 'Invalid API key or user key':
            # NOTE(review): ``object_name`` is not forwarded to the retry
            # helper - confirm against its signature.
            return self._handle_expired_api_key(err, retries, 'get', path, params)
        # Any other API error propagates with its original traceback.
        raise
def environ_get_prefix(environ):
    """Return the value of the first ``_all_prefix_variables`` entry present in ``environ``, else None."""
    present_values = (
        environ.get(variable)
        for variable in _all_prefix_variables
        if variable in environ
    )
    return next(present_values, None)
def request(self, *args, **kwargs):
    """Perform the request and raise when a JSON reply reports an error.

    Replies whose ``Content-Type`` is not ``application/json`` or
    ``text/json``, whose body cannot be decoded, or whose decoded body is not
    a dict are returned untouched. Otherwise a reason is taken from
    ``errorMessage``, ``reason``, a string ``error`` field, or the HTTP
    reason of a non-OK response.

    Raises:
        Exception: carrying the reason, plus ``errorCode`` when present.
    """
    response = super(Session, self).request(*args, **kwargs)
    content_type = response.headers.get('Content-Type', '').split(';')[0]
    json_mimetypes = ['application/json', 'text/json']
    if content_type not in json_mimetypes:
        return response
    try:
        payload = response.json()
    except Exception:
        # Best effort: an undecodable body is handed back as-is.
        return response
    if not isinstance(payload, dict):
        # Lists and scalars carry no error fields to inspect.
        return response

    reason = payload.get('errorMessage') or payload.get('reason')
    if not reason and isinstance(payload.get('error'), six.string_types):
        reason = payload.get('error')
    if not reason and not response.ok:
        reason = response.reason
    if not reason and payload.get('error'):
        reason = "UnkNown reason"
    code = payload.get('errorCode')
    if reason:
        # Surface the details instead of raising a bare Exception class.
        if code is not None:
            raise Exception('%s (%s)' % (reason, code))
        raise Exception(reason)
    return response
def _get_cookiejar_path(self):
    """Return the cookiejar file path inside ``self._cookie_directory``.

    The file name is the user's ``apple_id`` reduced to the characters that
    match ``\\w``.
    """
    apple_id = self.user.get('apple_id')
    file_name = ''.join(ch for ch in apple_id if match(r'\w', ch))
    return os.path.join(self._cookie_directory, file_name)
def __unicode__(self):
    """Return ``'iCloud API: <apple_id>'`` using the ``apple_id`` entry of ``self.user``."""
    apple_id = self.user.get('apple_id')
    return 'iCloud API: %s' % apple_id
def get_event_detail(self, pguid, guid):
    """
    Fetches a single event's details by specifying a pguid
    (a calendar) and a guid (an event's ID).

    The parsed reply is stored on ``self.response``; the first entry of its
    ``'Event'`` list is returned.
    """
    params = dict(self.params)
    params.update({'lang': 'en-us', 'usertz': get_localzone().zone})
    # The format has three slots: base URL, calendar id, event id.
    url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
    req = self.session.get(url, params=params)
    self.response = req.json()
    return self.response['Event'][0]
def party():
    """Turn party mode on or off from the ``enabled`` flag of a local JSON request.

    Requests that are not local are answered with a 404. Returns a status
    string describing the requested state.
    """
    if not is_local_request(flask.request):
        flask.abort(404)
    global party_process
    json = flask.request.get_json()
    enabled = json.get('enabled', False)
    logger.info('Got party state %r' % enabled)
    if enabled and not env.is_party_mode():
        # Start party XD
        party_process = Popen(["python3", "./animate_web.py", "run"]) # async
        env.set_party_mode(True)
        env.set_motion_enabled(False)
    elif not enabled and env.is_party_mode():
        # Stop party :(
        env.set_party_mode(False)
        env.set_motion_enabled(True)
        if party_process is not None:
            party_process.kill()
            party_process = None
        # Return lights to Circadian color
        # NOTE(review): indentation was lost in this copy; the three lines
        # below are assumed to belong to the "stop party" branch - confirm.
        command = copy.deepcopy(COMMAND_FULL_ON)
        Circadian_color = get_current_Circadian_color(date=get_local_time())
        command_all_lights(Circadian_color.apply_to_command(command))
    return "Party mode is Now %r" % enabled
def guest_mode():
    """Set guest mode from the ``enabled`` flag of a local JSON request.

    Requests that are not local are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Guest Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        return flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    env.set_guest_mode(enabled)
    return "Guest mode is Now %r" % enabled
def motion_mode():
    """Enable or disable motion handling from the ``enabled`` flag of a local JSON request.

    Requests that are not local are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Motion Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        return flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    env.set_motion_enabled(enabled)
    return "Motion mode is Now %r" % enabled
def vacation_mode():
    """Set vacation mode from the ``enabled`` flag of a local JSON request.

    Requests that are not local are logged and answered with a 404.
    """
    if not is_local_request(flask.request):
        logger.info('Vacation Mode accessed by remote address %s', flask.request.environ['REMOTE_ADDR'])
        return flask.abort(404)

    payload = flask.request.get_json()
    enabled = payload.get('enabled', False)
    env.set_vacation_mode(enabled)
    return "Vacation mode is Now %r" % enabled
def get_eset_sig_from_scan(scans):
    """Return the ``result`` of the first scan entry named after an ESET engine.

    Returns None when ``scans`` is None and ``''`` when no entry matches.
    """
    if scans is None:
        return None
    eset_names = ('ESET-NOD32', 'NOD32', 'NOD32v2')
    matching_results = (
        entry.get('result') for entry in scans
        if entry.get('name') in eset_names
    )
    return next(matching_results, '')
def key_dict_clean(json):
    """Turn a dict of dicts into a list, storing each key under ``"name"``.

    The inner dicts are modified in place and reused in the returned list.
    Returns None when ``json`` is None.
    """
    if json is None:
        return None
    entries = []
    for name in json.keys():
        entry = json.get(name)
        entry["name"] = name
        entries.append(entry)
    return entries
# Replace dots with underscores
# in dictionary keys
# so that they can be saved in MongoDB.
def key_list_clean(json):
    """Turn a dict into a list of ``{"name": key, "values": value}`` dicts.

    Returns None when ``json`` is None.
    """
    if json is None:
        return None
    return [{"name": key, "values": json.get(key)} for key in json.keys()]
def jsonize(data):
    """Serialize ``data`` to JSON text indented by four spaces, without sorting keys."""
    dump_options = {'sort_keys': False, 'indent': 4}
    return json.dumps(data, **dump_options)
# Checks if the metadata has a date. If it doesn't,
# it updates it. If a date is found, the oldest
# date will get saved.
def change_date_to_str(res):
    """Replace the known date fields of ``res`` with their ``str()`` form, in place.

    Fields that are missing or None are left alone. Returns ``res`` (or None
    when ``res`` is None).
    """
    if res is None:
        return None
    date_fields = ("date", "upload_date", "date_start", "date_end", "date_enqueued")
    for field in date_fields:
        current = res.get(field)
        if current is not None:
            res[field] = str(current)
    return res
def add_error(resp_dict, error_code, error_message):
    """Append ``{"code", "message"}`` to the ``"errors"`` list of ``resp_dict``.

    Any dict (including subclasses such as ``OrderedDict``) is updated in
    place and returned; a missing or None ``"errors"`` entry is created first.
    Non-dict values are returned unchanged.
    """
    # isinstance() accepts dict subclasses, which an exact type() test rejected.
    if not isinstance(resp_dict, dict):
        return resp_dict
    if resp_dict.get('errors') is None:
        resp_dict["errors"] = []
    resp_dict["errors"].append({"code": error_code, "message": error_message})
    return resp_dict
def cursor_to_dict(f1, retrieve):
    """Flatten each document of ``f1`` into a dict of the requested fields.

    Args:
        f1: iterable of dict-like documents.
        retrieve: mapping whose keys are dotted paths (``"a.b.c"``) to read.

    Returns:
        One dict per document, keyed by the last component of each path
        (``file_id`` is reported as ``sha1``). A path that is missing, or
        that reaches a list, yields None. ``TimeDateStamp`` and
        ``timeDateStamp`` values are rendered as ``YYYY-MM-DD HH:MM:SS`` in UTC.
    """
    # %S is the seconds directive; lowercase %s is not a portable one.
    time_format = "%Y-%m-%d %H:%M:%S"
    ret = []
    for a in f1:
        dic = {}
        for key in retrieve.keys():
            steps = key.split('.')
            partial_res = a
            for step in steps:
                partial_res = partial_res.get(step)
                if partial_res is None:
                    break
                if isinstance(partial_res, list):
                    # Lists cannot be walked by key; report the field as absent.
                    partial_res = None
                    break
            legend_to_show = steps[-1]
            if legend_to_show == "file_id":
                legend_to_show = "sha1"
            if legend_to_show == "TimeDateStamp" and partial_res is not None:
                # NOTE(review): eval() on stored data is unsafe if that data
                # can be attacker-controlled; kept to preserve behavior -
                # consider ast.literal_eval. The value is read as hex text.
                partial_res = time.strftime(
                    time_format, time.gmtime(int(eval(partial_res), 16)))
            if legend_to_show == "timeDateStamp" and partial_res is not None:
                partial_res = time.strftime(
                    time_format, time.gmtime(partial_res))
            dic[legend_to_show] = partial_res
        ret.append(dic)
    return ret
# ****************TEST_CODE******************
def get_required_param(json, param):
    """Return ``json[param]``, raising ``InvalidUsage`` when it is unusable.

    ``InvalidUsage`` is raised when ``json`` is None or when the value is
    None, an empty string or an empty list.
    """
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")

    value = json.get(param, None)
    is_blank = value is None or value == '' or value == []
    if not is_blank:
        return value

    logger.info("A required request parameter '{}' had value {}".format(param, value))
    raise InvalidUsage("A required request parameter '{}' was not provided".format(param))
def get_optional_param(json, param, default):
    """Return ``json[param]``, or ``default`` when the value is unusable.

    A value that is None, an empty string or an empty list is replaced by
    ``default`` (and logged). ``InvalidUsage`` is raised when ``json`` is None.
    """
    if json is None:
        logger.info("Request is not a valid json")
        raise InvalidUsage("Request is not a valid json")

    value = json.get(param, None)
    is_blank = value is None or value == '' or value == []
    if not is_blank:
        return value

    logger.info("An optional request parameter '{}' had value {} and was replaced with default value {}".format(param, value, default))
    return default
def admin_stats(self, option):
    """This is a simple way to get statistics about your system.

    :param str option: (required), accepted values: ('all', 'repos',
        'hooks', 'pages', 'orgs', 'users', 'pulls', 'issues',
        'milestones', 'gists', 'comments'); matched case-insensitively
    :returns: dict (empty when the option is not accepted)
    """
    accepted = ('all', 'repos', 'hooks', 'pages', 'orgs',
                'users', 'pulls', 'issues', 'milestones',
                'gists', 'comments')
    requested = option.lower()
    if requested not in accepted:
        return {}
    url = self._build_url('enterprise', 'stats', requested)
    return self._json(self._get(url), 200)
def rl_dispatch():
    """Call ``set_event()`` once, then again after every item taken from ``rated_queue``.

    This loop never returns.
    """
    set_event()
    while True:
        # The item itself is discarded; only its arrival matters here.
        rated_queue.get()
        set_event()
def read_minfo(method):
    """Return the stored info for ``method`` from ``./files/minfo.json`` as a string.

    Falls back to ``"no method info"`` when the method has no entry and to
    ``"no info at all"`` when the file yields nothing.
    """
    stored = load_json("./files/minfo.json")
    if not stored:
        return "no info at all"
    return str(stored.get(method, "no method info"))
def update_minfo(method, type, incr):
    """Add ``incr`` to the ``type`` counter of ``method`` in ``./files/minfo.json``.

    The read-modify-write cycle runs while holding ``json_lock``. Missing
    files, methods and counters start from zero.
    """
    with json_lock:
        # A falsy load result is treated as an empty store, so the first
        # update is keyed by the actual method and type, not literal strings.
        stored = load_json("./files/minfo.json") or {}
        counters = stored.get(method, {})
        counters[type] = counters.get(type, 0) + incr
        stored[method] = counters
        # The context manager closes (and flushes) the file deterministically.
        with open("./files/minfo.json", "w") as out:
            out.write(pretty_dump(stored))
def environ_get_prefix(environ):
    """Look up the active prefix in ``environ``.

    The names in ``_all_prefix_variables`` are tried in order; the value of
    the first one present is returned, or None when none is present.
    """
    for variable in _all_prefix_variables:
        if variable not in environ:
            continue
        return environ.get(variable)
    return None
def _check_response(response):
    """
    Checks the HTTP response to see if it contains JSON. If it does, checks the JSON for error codes and messages.
    Raises PardotAPIError if an error was found. If no error was found, returns the JSON. If JSON was not found,
    returns the response status code.
    """
    # Only an exact 'application/json' content type is treated as JSON.
    if response.headers.get('content-type') != 'application/json':
        return response.status_code

    payload = response.json()
    if payload.get('err'):
        raise PardotAPIError(json_response=payload)
    return payload
def authenticate(self):
    """
    Authenticates the user and sets the API key if successful. Returns True if authentication is successful,
    False if authentication fails.
    """
    credentials = {'email': self.email, 'password': self.password}
    try:
        auth = self.post('login', params=credentials)
        self.api_key = auth.get('api_key')
        return self.api_key is not None
    except PardotAPIError:
        return False
def __run_actions(self, session_id, current_request, context, i,
                  verbose, message=None):
    """Drive one converse step and recurse until the bot stops.

    Args:
        session_id: key into ``self._sessions`` identifying the conversation.
        current_request: token compared with ``self._sessions[session_id]``;
            when they differ the current context is returned at once.
        context: context passed to actions and returned at the end.
        i: remaining step budget; ``WitError`` is raised at zero or below.
        verbose: forwarded to ``self.converse``.
        message: text placed in the request handed to actions. The original
            body referenced an undefined ``message`` name; it is now an
            optional trailing parameter.

    Returns:
        The context after the last executed step.

    Raises:
        WitError: on exhausted steps, a reply without ``type``, an ``error``
            reply, or an unrecognized reply type.
    """
    if i <= 0:
        raise WitError('Max steps reached,stopping.')
    # NOTE(review): the message and context are not passed to converse() in
    # this copy - confirm against converse()'s signature.
    reply = self.converse(session_id, verbose=verbose)
    if 'type' not in reply:
        raise WitError('Couldn\'t find type in Wit response')
    if current_request != self._sessions[session_id]:
        return context

    self.logger.debug('Context: %s', context)
    self.logger.debug('Response type: %s', reply['type'])

    # backwards-compatibility with API version 20160516
    if reply['type'] == 'merge':
        reply['type'] = 'action'
        reply['action'] = 'merge'

    if reply['type'] == 'error':
        raise WitError('Oops,I don\'t kNow what to do.')

    if reply['type'] == 'stop':
        return context

    request = {
        'session_id': session_id,
        'context': dict(context),
        'text': message,
        'entities': reply.get('entities'),
    }
    if reply['type'] == 'msg':
        self.throw_if_action_missing('send')
        response = {
            'text': reply.get('msg').encode('utf8'),
            'quickreplies': reply.get('quickreplies'),
        }
        self.actions['send'](request, response)
    elif reply['type'] == 'action':
        action = reply['action']
        self.throw_if_action_missing(action)
        context = self.actions[action](request)
        if context is None:
            self.logger.warn('missing context - did you forget to return it?')
            context = {}
    else:
        raise WitError('unkNown type: ' + reply['type'])

    if current_request != self._sessions[session_id]:
        return context
    # Pass every positional parameter; follow-up steps carry no message.
    return self.__run_actions(session_id, current_request, context,
                              i - 1, verbose)
def service_ps(quiet, stack, status=None):
    """Print the services, optionally restricted to one stack and/or state.

    Args:
        quiet: when true, print only the full UUID of each service.
        stack: optional stack identifier used to filter the services.
        status: optional state filter passed to ``dockercloud.Service.list``.
            The original body referenced an undefined ``status`` name; it is
            now an optional trailing parameter.

    Any exception is printed to stderr and the process exits with
    ``EXCEPTION_EXIT_CODE``.
    """
    try:
        headers = ["NAME", "UUID", "STATUS", "#CONTAINERS", "IMAGE", "DEPLOYED", "PUBLIC DNS", "STACK"]
        stack_resource_uri = None
        if stack:
            s = dockercloud.Utils.fetch_remote_stack(stack, raise_exceptions=False)
            if isinstance(s, dockercloud.NonUniqueIdentifier):
                raise dockercloud.NonUniqueIdentifier(
                    "Identifier %s matches more than one stack,please use UUID instead" % stack)
            if isinstance(s, dockercloud.ObjectNotFound):
                raise dockercloud.ObjectNotFound("Identifier '%s' does not match any stack" % stack)
            stack_resource_uri = s.resource_uri
        service_list = dockercloud.Service.list(state=status, stack=stack_resource_uri)

        data_list = []
        long_uuid_list = []
        has_unsynchronized_service = False

        # Map stack URIs to names; the loop variable no longer shadows the
        # ``stack`` parameter.
        stacks = {}
        for known_stack in dockercloud.Stack.list():
            stacks[known_stack.resource_uri] = known_stack.name

        for service in service_list:
            service_state = utils.add_unicode_symbol_to_state(service.state)
            if not service.synchronized and service.state != "Redeploying":
                service_state += "(*)"
                has_unsynchronized_service = True
            data_list.append([service.name, service.uuid[:8],
                              service_state,
                              service.current_num_containers,
                              service.image_name,
                              utils.get_humanize_local_datetime_from_utc_datetime_string(service.deployed_datetime),
                              service.public_dns,
                              stacks.get(service.stack)])
            long_uuid_list.append(service.uuid)

        if len(data_list) == 0:
            # Placeholder row with one empty cell per header column.
            data_list.append([""] * len(headers))

        if quiet:
            for uuid in long_uuid_list:
                print(uuid)
        else:
            utils.tabulate_result(data_list, headers)
            if has_unsynchronized_service:
                print(
                    "\n(*) Please note that this service needs to be redeployed to "
                    "have its configuration changes applied")
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(EXCEPTION_EXIT_CODE)
def service_create(image, name, cpu_shares, memory, privileged, target_num_containers, run_command, entrypoint,
                   expose, publish, envvars, envfiles, tag, linked_to_service, autorestart, autodestroy, autoredeploy,
                   roles, sequential, volume, volumes_from, deployment_strategy, sync, net, pid):
    """Create and save a service from the parsed command-line options.

    The service UUID is printed when saving reports success. Any exception
    is printed to stderr; on an exception or a failed sync the process exits
    with ``EXCEPTION_EXIT_CODE``.
    """
    failed = False
    try:
        ports = utils.parse_published_ports(publish)

        # Add exposed ports, skipping any whose inner_port is already present
        # in the list (published ports or an earlier exposed port).
        for exposed_port in utils.parse_exposed_ports(expose):
            already_listed = any(
                exposed_port.get('inner_port', '') == port.get('inner_port', '')
                for port in ports
            )
            if not already_listed:
                ports.append(exposed_port)

        envvars = utils.parse_envvars(envvars, envfiles)
        links_service = utils.parse_links(linked_to_service, 'to_service')

        # ``tag`` may be a single name or a list of names.
        if not tag:
            tags = []
        elif isinstance(tag, list):
            tags = [{"name": t} for t in tag]
        else:
            tags = [{"name": tag}]

        bindings = utils.parse_volume(volume)
        bindings.extend(utils.parse_volumes_from(volumes_from))

        service = dockercloud.Service.create(image=image, name=name, cpu_shares=cpu_shares,
                                             memory=memory, privileged=privileged,
                                             target_num_containers=target_num_containers, run_command=run_command,
                                             entrypoint=entrypoint, container_ports=ports, container_envvars=envvars,
                                             linked_to_service=links_service,
                                             autorestart=autorestart, autodestroy=autodestroy,
                                             autoredeploy=autoredeploy,
                                             roles=roles, sequential_deployment=sequential, tags=tags,
                                             bindings=bindings,
                                             deployment_strategy=deployment_strategy, net=net, pid=pid)
        saved = service.save()
        if not utils.sync_action(service, sync):
            failed = True
        if saved:
            print(service.uuid)
    except Exception as e:
        print(e, file=sys.stderr)
        failed = True

    if failed:
        sys.exit(EXCEPTION_EXIT_CODE)
# NOTE(review): this definition is an incomplete fragment - most of the
# parameter list and body are missing from this copy and the remaining lines
# do not form valid Python. It is left untouched until the full source is
# available.
def service_run(image,
expose,
roles, pid=pid)
service.save()
result = service.start()
if not utils.sync_action(service, file=sys.stderr)
has_exception = True
if has_exception:
sys.exit(EXCEPTION_EXIT_CODE)
def tag_ls(identifiers, quiet):
    """Print the tags of each identified service, node cluster or node.

    Each identifier is looked up as a service, then a node cluster, then a
    node. With ``quiet`` only the space-joined tag names are printed, one
    line per identifier; otherwise a table is printed. Errors are printed to
    stderr and, if any occurred, the process exits with
    ``EXCEPTION_EXIT_CODE`` after the output.
    """
    headers = ["IDENTIFIER", "TYPE", "TAGS"]
    # Lookup helpers on dockercloud.Utils, in the order they are tried.
    lookups = (
        ('fetch_remote_service', 'Service'),
        ('fetch_remote_nodecluster', 'NodeCluster'),
        ('fetch_remote_node', 'Node'),
    )
    failed = False
    data_list = []
    tags_list = []
    for identifier in identifiers:
        try:
            for fetcher_name, obj_type in lookups:
                fetch = getattr(dockercloud.Utils, fetcher_name)
                obj = fetch(identifier, raise_exceptions=False)
                if not isinstance(obj, dockercloud.ObjectNotFound):
                    break
            else:
                raise dockercloud.ObjectNotFound(
                    "Identifier '%s' does not match any service,node or nodecluster" % identifier)

            tagnames = []
            for tags in dockercloud.Tag.fetch(obj).list():
                tagname = tags.get('name', '')
                if tagname:
                    tagnames.append(tagname)

            data_list.append([identifier, obj_type, ' '.join(tagnames)])
            tags_list.append(' '.join(tagnames))
        except Exception as e:
            # Unknown identifiers are shown with type 'None', other failures blank.
            type_cell = 'None' if isinstance(e, dockercloud.ObjectNotFound) else ''
            data_list.append([identifier, type_cell, ''])
            tags_list.append('')
            print(e, file=sys.stderr)
            failed = True

    if quiet:
        for tags in tags_list:
            print(tags)
    else:
        utils.tabulate_result(data_list, headers)

    if failed:
        sys.exit(EXCEPTION_EXIT_CODE)
def main():
    '''
    Fetch the clue configuration, encrypt it and post it to the sync
    endpoints, recording and logging the outcome.
    '''
    clues_json = get_clues()
    clues_count = len(clues_json['data'])
    clues_json = tools.dumps_json(clues_json)
    print(clues_json)
    # save_clues_to_file(clues_json)
    # NOTE(review): the encryption key is hard-coded here.
    keys = 'pattek.com.cn'
    prpcrypt = Prpcrypt(keys)
    encrypt_text = prpcrypt.encrypt(clues_json)
    data = {'info':encrypt_text}
    # First sync endpoint (the original comment text was lost to mis-encoding).
    url = 'http://192.168.60.38:8002/datasync_al/interface/cluesConfSync?'
    json = tools.get_json_by_requests(url, data = data)
    # Record the outcome of the first sync.
    result = record_sync_status(clues_count, json.get("status"), json.get('message'), json.get('data'), 0)
    print(result)
    log.debug('''
------ ??????? -----
%s
?????? %d
'''%(json, result))
    # Second sync endpoint (the original comment text was lost to mis-encoding).
    url = 'http://124.205.229.232:8005/gdyq/datasync_al/interface/cluesConfSync'
    # NOTE(review): this call passes ``1`` instead of the payload and the
    # log below reuses ``result`` from the first sync - this looks like a
    # truncated copy; confirm against the original source.
    json = tools.get_json_by_requests(url, 1)
    log.debug('''
------ ??????? -----
%s
?????? %d
'''%(json, result))
def Feeds(self):
    """List GitHub's timeline resources in Atom format.

    :returns: dictionary parsed to include URITemplates
    """
    def templated(link):
        # Copy a link dict, wrapping its 'href' (if any) in a URITemplate.
        # Falsy inputs are handed back unchanged; 'href' is popped from the
        # input dict.
        if not link:
            return link
        href = link.pop('href', None)
        converted = {}
        converted.update(link)
        if href is not None:
            converted['href'] = URITemplate(href)
        return converted

    url = self._build_url('Feeds')
    payload = self._json(self._get(url), include_cache_info=False)
    if payload is None:  # If something went wrong, get out early
        return None

    # Take '_links' out first so the second pass only sees plain URL values.
    raw_links = payload.pop('_links', {})
    links = {}
    for rel, value in raw_links.items():
        if isinstance(value, list):
            links[rel] = [templated(entry) for entry in value]
        else:
            links[rel] = templated(value)

    result = {'_links': links}
    # Every remaining value (or each item of a list value) becomes a URITemplate.
    for key, value in payload.items():
        if isinstance(value, list):
            result[key] = [URITemplate(v) for v in value]
        else:
            result[key] = URITemplate(value)
    return result
def parse_spec(spec):
    """Parse a package name and version spec as conda would.

    Besides the name and the conda/pip constraints, an exact version (and,
    with it, an exact build string) is extracted when the conda constraint's
    version contains none of the special characters checked below.

    Returns:
        ``ParsedSpec`` or None on failure

    Raises:
        TypeError: if ``spec`` is not a string.
    """
    if not is_string(spec):
        raise TypeError("Expected a string not %r" % spec)
    m = _spec_pat.match(spec)
    if m is None:
        return None
    name = m.group('name').lower()
    pip_constraint = m.group('pc')
    if pip_constraint is not None:
        pip_constraint = pip_constraint.replace(' ', '')
    conda_constraint = m.group('cc')
    exact_version = None
    exact_build_string = None
    if conda_constraint is not None:
        m = _conda_constraint_pat.match(conda_constraint)
        assert m is not None
        exact_version = m.group('version')
        # A version containing one of these characters is not exact.
        # NOTE(review): the third entry was an unterminated literal in this
        # copy; it is restored as ',' - confirm.
        for special in ('|', '*', ','):
            if special in exact_version:
                exact_version = None
                break
        if exact_version is not None:
            exact_build_string = m.group('build')
            if exact_build_string is not None:
                # The build group includes its leading '='; strip it.
                assert exact_build_string[0] == '='
                exact_build_string = exact_build_string[1:]
    return ParsedSpec(name=name,
                      conda_constraint=conda_constraint,
                      pip_constraint=pip_constraint,
                      exact_version=exact_version,
                      exact_build_string=exact_build_string)
# These are in order of preference. On pre-4.1.4 Windows,
# CONDA_PREFIX and CONDA_ENV_PATH aren't set, so we fall back to
# CONDA_DEFAULT_ENV.