Python `pip` module — example source code for `get_installed_distributions()`.
The following 47 code examples, extracted from open-source Python projects, illustrate how to use `pip.get_installed_distributions()`. (Note: this function was an internal pip API and was removed in pip 10; modern code should use `pkg_resources.working_set` or `importlib.metadata` instead.)
def install_libraries(libraries):
    """
    Install libraries that are not already installed.

    Arguments:
        libraries (iterable): pip package keys to ensure are installed.

    Returns:
        None
    """
    # Snapshot of everything pip currently knows about.
    already_present = {dist.key for dist in get_installed_distributions()}
    for name in libraries:
        if name in already_present:
            print('{} is already installed.'.format(name))
        else:
            print('Installing {} ...'.format(name))
            main(['install', name])
def collect_environment(self):
    """Gather interpreter, host, and package details and store them via
    set_system_info('environment', ...)."""
    import socket
    import os
    import pip
    import platform
    import aetros

    env = {
        'aetros_version': aetros.__version__,
        'python_version': platform.python_version(),
        'python_executable': sys.executable,
        'hostname': socket.gethostname(),
        'variables': dict(os.environ),
    }
    # Never ship SSH credentials along with the environment snapshot.
    env['variables'].pop('AETROS_SSH_KEY', None)
    env['variables'].pop('AETROS_SSH_KEY_BASE64', None)
    env['pip_packages'] = sorted([dist.key, dist.version]
                                 for dist in pip.get_installed_distributions())
    self.set_system_info('environment', env)
def get_installed_packages(site_packages, site_packages_64):
    """
    Returns a dict of installed packages that Zappa cares about.
    """
    import pip  # this is to avoid 'funkiness' with global import

    # Collect the directory entries present in either site-packages dir.
    names_to_keep = []
    for directory in (site_packages, site_packages_64):
        if os.path.isdir(directory):
            names_to_keep.extend(os.listdir(directory))
    names_to_keep = [name.lower() for name in names_to_keep]

    locations = [site_packages, site_packages_64]
    return {
        dist.project_name.lower(): dist.version
        for dist in pip.get_installed_distributions()
        if dist.project_name.lower() in names_to_keep
        or dist.location in locations
    }
def get_package_tree(ignore_list=None, include_only=None):
    """Returns dependency package tree

    :param ignore_list: list of dependencies to exclude from tree
    :param include_only: list of dependencies to include, when given
    :return: dictionary of top level packages with their dependencies
    """
    ignored = [name.lower() for name in ignore_list] if ignore_list else []
    wanted = [name.lower() for name in include_only] if include_only else []

    packages = [dist for dist in pip.get_installed_distributions()
                if dist.key not in ignored]
    # When include_only is set, drop everything not explicitly requested.
    if wanted:
        packages = [dist for dist in packages if dist.key in wanted]

    dist_index = pipdeptree.build_dist_index(pkgs=packages)
    return pipdeptree.construct_tree(index=dist_index)
def get_remote_installed_packages(ip_address):
    '''
    This method queries a remote python installation about the installed packages.
    All necessary information is extracted from ~/.artemisrc
    :param ip_address: IP address of the remote server
    :return: dict mapping package key -> version on the remote host
    :raises RuntimeError: if the remote command writes anything to stderr
    '''
    python_executable = get_artemis_config_value(section=ip_address, option="python")
    # NOTE(review): the remote snippet uses a Python 2 print statement, so the
    # remote interpreter must be Python 2 — confirm before upgrading remotes.
    function = "%s -c 'import pip; import json; print json.dumps({i.key: i.version for i in pip.get_installed_distributions() })' " % python_executable
    ssh_conn = get_ssh_connection(ip_address)
    stdin, stdout, stderr = ssh_conn.exec_command(function)
    err = stderr.read()
    if err:
        # Fixed: "Quering" typo and the missing space that fused
        # "RuntimeError" with "accordingly" across the concatenated literals.
        msg = "Querying %s python installation at %s sent a message on stderr. If you are confident that the error can be ignored, catch this RuntimeError " \
              "accordingly. The error is: %s" % (ip_address, python_executable, err)
        raise RuntimeError(msg)
    installed_packages = json.loads(stdout.read())
    ssh_conn.close()
    return installed_packages
def chill(show_all=False):
    """Return (key, version) pairs for installed packages that no other
    package depends on.

    :param show_all: when True, also list pip/setuptools-style tooling
        packages that are normally hidden.
    :return: list of (key, version) tuples.
    """
    if show_all:
        ignored_packages = ()
    else:
        ignored_packages = ('pip', 'pip-chill', 'wheel', 'setuptools',
                            'pkg-resources')

    # Fetch the distribution list once; the original called
    # pip.get_installed_distributions() a second time for the final listing.
    distributions = pip.get_installed_distributions()

    # Gather all packages that are requirements and will be auto-installed.
    dependencies = set()
    for distribution in distributions:
        for requirement in distribution.requires():
            dependencies.add(requirement.key)

    # List all packages and versions installed, excluding the auto-installed.
    return [
        (distribution.key, distribution.version)
        for distribution in distributions
        if distribution.key not in dependencies
        and distribution.key not in ignored_packages
    ]
def _load():
    """Locate the first installed kervi HAL package, import its platform
    module, and initialise the module-level GPIO driver.

    :return: the matched driver package name, or None when a driver is
        already loaded or none is installed.
    """
    global GPIO, _DRIVER, HAL_DRIVER_ID
    if not _DRIVER:
        installed = [dist.project_name for dist in pip.get_installed_distributions()]
        known_drivers = [
            ("kervi-hal-win", "kervi.platforms.windows"),
            ("kervi-hal-linux", "kervi.platforms.linux"),
            # Fixed: the scraped source had "raspBerry"; Python module paths
            # are case-sensitive and the kervi package is all lowercase.
            ("kervi-hal-rpi", "kervi.platforms.raspberry"),
            ("kervi-hal-generic", "kervi.platforms.generic"),
        ]
        for driver_name, module_name in known_drivers:
            if driver_name in installed:
                _DRIVER = importlib.import_module(module_name)
                HAL_DRIVER_ID = module_name
                GPIO = get_gpio()
                return driver_name
def show_version(scoring_version):
    ''' Python version and library versions '''
    swrite('\n=== VERSIONS ===\n\n')
    # Scoring program version
    swrite("Scoring program version: " + str(scoring_version) + "\n\n")
    # Python version
    swrite("Python version: " + version + "\n\n")
    # Give information on the version installed
    swrite("Versions of libraries installed:\n")
    # Fixed: under Python 3 map() is lazy, so the original
    # map(swrite, ...) never actually called swrite.
    for line in sorted("%s==%s\n" % (i.key, i.version) for i in lib()):
        swrite(line)
def show_version():
    """Write the Python version and installed library versions via swrite()."""
    # Python version and library versions
    swrite('\n=== VERSIONS ===\n\n')
    # Python version
    swrite("Python version: " + version + "\n\n")
    # Give information on the version installed
    swrite("Versions of libraries installed:\n")
    # Reconstructed: the scraped line was garbled
    # ("map(swrite, i.version) for i in lib()]))"); restored from the intact
    # sibling show_version(scoring_version). A loop also avoids Python 3's
    # lazy map() never calling swrite.
    for line in sorted("%s==%s\n" % (i.key, i.version) for i in lib()):
        swrite(line)
def check_requirements(self, show_warning=False):
    """Check requirements.txt against installed pip packages; when the
    module-level install_requirements flag is set, try to auto-install
    anything missing, otherwise warn or abort.

    :param show_warning: report problems as a warning instead of a fatal error.
    """
    req_path = self.get_requirements() or self.path
    req_file = 'requirements.txt'
    missing = []
    try:
        with open(os.path.join(req_path, req_file), 'r') as f:
            import pip
            # Normalize installed names the same way requirement names are
            # normalized below ('-' -> '_', lowercase).
            installed_packages = [re.sub(r'-', '_', package.project_name.lower())
                                  for package in pip.get_installed_distributions(local_only=True)]
            for line in f.read().splitlines():
                # Fixed: the original re.sub(r'-', re.sub(...)) dropped the
                # replacement argument and raised TypeError at runtime.
                pkg = re.sub(r'-', '_', re.sub(r'^([\w-]+).*$', r'\1', line).lower())
                if not pkg in installed_packages:
                    missing.append(pkg)
        if missing and install_requirements:
            try:
                action("Auto-installing missing Python modules...")
                pquery(['pip', 'install', '-q', '-r', os.path.join(req_path, req_file)])
                missing = []
            except ProcessException:
                warning("Unable to auto-install required Python modules.")
    except (IOError, ImportError, OSError):
        # Missing requirements file or unavailable pip: nothing to check.
        pass
    if missing:
        err = (
            "-----------------------------------------------------------------\n"
            "The mbed OS tools in this program require the following Python modules: %s\n"
            "You can install all missing modules by running \"pip install -r %s\" in \"%s\"" % (','.join(missing), req_file, req_path))
        if os.name == 'posix':
            err += "\nOn Posix systems (Linux,Mac,etc) you might have to switch to superuser account or use \"sudo\""
        if show_warning:
            warning(err)
        else:
            error(err, 1)
# Routines after cloning mbed-os
def show_version(scoring_version):
    ''' Python version and library versions '''
    swrite('\n=== VERSIONS ===\n\n')
    # Scoring program version
    swrite("Scoring program version: " + str(scoring_version) + "\n\n")
    # Python version
    swrite("Python version: " + version + "\n\n")
    # Give information on the version installed
    swrite("Versions of libraries installed:\n")
    # Reconstructed: the scraped line was garbled
    # ("map(swrite, i.version) for i in lib()]))"); restored from the intact
    # sibling definition. A loop also avoids Python 3's lazy map() never
    # calling swrite.
    for line in sorted("%s==%s\n" % (i.key, i.version) for i in lib()):
        swrite(line)
def show_version():
    """Write the Python version and installed library versions via swrite()."""
    # Python version and library versions
    swrite('\n=== VERSIONS ===\n\n')
    # Python version
    swrite("Python version: " + version + "\n\n")
    # Give information on the version installed
    swrite("Versions of libraries installed:\n")
    # Reconstructed from the intact sibling definition; the scraped line was
    # garbled. A loop also avoids Python 3's lazy map() never calling swrite.
    for line in sorted("%s==%s\n" % (i.key, i.version) for i in lib()):
        swrite(line)
def _verify_dependencies(self):
    """Check each non-core plugin's dependencies (other plugins, system
    binaries, python modules) and replace unsatisfied plugins with dummy
    containers.

    Reads self.available_plugins and mutates each plugin_info dict in place
    ('_is_satisfied', '_dependencies', 'dependencies').
    """
    # These will always be initialized.
    logging.up('Verifying dependencies')
    # Installed pip package keys, consulted by the 'module' checks below.
    installed_packages_list = sorted([i.key for i in pip.get_installed_distributions()])
    for plugin_id, plugin_info in self.available_plugins.items():
        if not plugin_info['_is_core']:
            if 'dependencies' not in plugin_info:
                continue
            # Fold OS-specific dependency entries for the current OS into the
            # general dependency dict.
            if 'os' in plugin_info['dependencies']:
                if get_general_os() in plugin_info['dependencies']['os']:
                    plugin_info['dependencies'] = data_merge(plugin_info['dependencies'], plugin_info['dependencies']['os'][get_general_os()])
            if 'plugin' in plugin_info['dependencies']:
                for depend_name in plugin_info['dependencies']['plugin']:
                    # Plugin IDs are stored with a 'prism_' prefix.
                    installed = 'prism_' + depend_name in self.available_plugins
                    if not installed:
                        plugin_info['_is_satisfied'] = False
                    plugin_info['_dependencies'].append(('plugin', depend_name, installed))
            if 'binary' in plugin_info['dependencies']:
                for depend_name in plugin_info['dependencies']['binary']:
                    installed = is_package_installed(depend_name)
                    if not installed:
                        plugin_info['_is_satisfied'] = False
                    # NOTE(review): unlike the 'plugin' branch, this tuple
                    # omits depend_name — confirm whether that is intentional.
                    plugin_info['_dependencies'].append(('binary', installed))
            if 'module' in plugin_info['dependencies']:
                for depend_name in plugin_info['dependencies']['module']:
                    installed = (depend_name in installed_packages_list)
                    if not installed:
                        plugin_info['_is_satisfied'] = False
                    plugin_info['_dependencies'].append(('module', installed))
            if not plugin_info['_is_satisfied']:
                # Create a dummy plugin container
                self._insert_dummy_plugin(plugin_info)
                logging.error('Dependency unsatisfied. Offender: %s' % plugin_id)
    logging.down()
def show_version(scoring_version):
    ''' Python version and library versions '''
    swrite('\n=== VERSIONS ===\n\n')
    # Scoring program version
    swrite("Scoring program version: " + str(scoring_version) + "\n\n")
    # Python version
    swrite("Python version: " + version + "\n\n")
    # Give information on the version installed
    swrite("Versions of libraries installed:\n")
    # Reconstructed from the intact sibling definition; the scraped line was
    # garbled. A loop also avoids Python 3's lazy map() never calling swrite.
    for line in sorted("%s==%s\n" % (i.key, i.version) for i in lib()):
        swrite(line)
def show_version():
    """Write the Python version and installed library versions via swrite()."""
    # Python version and library versions
    swrite('\n=== VERSIONS ===\n\n')
    # Python version
    swrite("Python version: " + version + "\n\n")
    # Give information on the version installed
    swrite("Versions of libraries installed:\n")
    # Reconstructed from the intact sibling definition; the scraped line was
    # garbled. A loop also avoids Python 3's lazy map() never calling swrite.
    for line in sorted("%s==%s\n" % (i.key, i.version) for i in lib()):
        swrite(line)
def _requirements(self, package_name):
    """Recursively collect the top-level importable names provided by
    *package_name* and all of its requirements.

    :param package_name: pip project name to resolve.
    :return: set of top-level module names (entries containing '/' skipped).
    :raises ValueError: if no installed distribution matches package_name.
    """
    def _get_package(_package_name):
        # Exact project-name match against the installed distributions.
        candidates = [p for p in pip.get_installed_distributions() if p.project_name == _package_name]
        if not candidates:
            raise ValueError('No package "{}"'.format(package_name))
        return candidates[0]
    package = _get_package(package_name)
    # Fixed: pkg_resources spells this method _get_metadata (all lowercase);
    # the scraped "_get_Metadata" would raise AttributeError.
    result = set(name for name in package._get_metadata("top_level.txt") if '/' not in name)
    for requirement in package.requires():
        result |= self._requirements(requirement.project_name)
    return result
def setUp(self):
    """
    Download 'nvme-cli', verify prerequisites, and write the test config.
    """
    self.device = self.params.get('device', default='/dev/nvme0')
    self.disk = self.params.get('disk', default='/dev/nvme0n1')
    cmd = 'ls %s' % self.device
    # Fixed: 'is not 0' tested object identity, not value; it only appeared
    # to work because of CPython's small-int caching.
    if process.system(cmd, ignore_status=True) != 0:
        self.skip("%s does not exist" % self.device)
    smm = SoftwareManager()
    if not smm.check_installed("nvme-cli") and not \
            smm.install("nvme-cli"):
        self.skip('nvme-cli is needed for the test to be run')
    python_packages = pip.get_installed_distributions()
    python_packages_list = [i.key for i in python_packages]
    # Fixed: distribution keys are lower-case, so 'Flake8' could never match
    # and the test always skipped.
    python_pkgs = ['nose', 'nose2', 'pep8', 'flake8', 'pylint', 'epydoc']
    for py_pkg in python_pkgs:
        if py_pkg not in python_packages_list:
            self.skip("python package %s not installed" % py_pkg)
    url = 'https://codeload.github.com/linux-nvme/nvme-cli/zip/master'
    tarball = self.fetch_asset("nvme-cli-master.zip", locations=[url],
                               expire='7d')
    archive.extract(tarball, self.teststmpdir)
    self.nvme_dir = os.path.join(self.teststmpdir, "nvme-cli-master")
    # Fixed: Python 2 print statement; the call form works on both 2 and 3.
    print(os.listdir(self.nvme_dir))
    os.chdir(os.path.join(self.nvme_dir, 'tests'))
    msg = ['{']
    msg.append(' \"controller\": \"%s\",' % self.device)
    msg.append(' \"ns1\": \"%s\",' % self.disk)
    msg.append(' \"log_dir\": \"%s\"' % self.outputdir)
    msg.append('}')
    with open('config.json', 'w') as config_file:
        config_file.write("\n".join(msg))
    process.system("cat config.json")
def get_function_source(func):
    """
    Determine the source file of a function
    Parameters
    ----------
    func : function
    Returns
    -------
    str
        the module name
    list of str
        a list of filenames necessary to be copied
    """
    pip_package_keys = [dist.key for dist in pip.get_installed_distributions()]
    root_module = func.__module__.split('.')[0]
    from_pip = root_module in pip_package_keys
    source_path = os.path.realpath(func.__code__.co_filename)
    under_cwd = source_path.startswith(os.path.realpath(os.getcwd()))
    # A function is "local" only when it is not pip-installed and lives under
    # the current working directory.
    if from_pip or not under_cwd:
        return func.__module__, []
    return func.__module__.split('.')[-1], [source_path]
def check_dependency(self):
    """Compare requirements.txt against installed pip packages and exit with
    install instructions when anything is missing."""
    with open('requirements.txt') as f:
        required = f.read().splitlines()
    installed = sorted(dist.key for dist in pip.get_installed_distributions())
    missing_deps = []
    for dependency in required:
        if dependency in installed:
            continue
        # Why this package is not in get_installed_distributions ?
        if str(dependency) == "argparse":
            continue
        missing_deps.append(dependency)
    if missing_deps:
        missing_deps_warning = """
You are missing a module required for Belati. In order to continue using Belati,please install them with:
{}`pip install --upgrade --force-reinstall -r requirements.txt`{}
or manually install missing modules with:
{}`pip install --upgrade --force-reinstall {}`{}
"""
        log.console_log(missing_deps_warning.format(Y, W, Y, ' '.join(missing_deps), W))
        sys.exit()
def installed_python_packages():
    """
    Build a dictionary mapping each installed package's import name to its
    version string.

    :return: dict of import name -> str(parsed_version)
    """
    # Initialize dictionary to contain the package names and version numbers
    packages = dict()
    # Get all python distributions
    distributions = pip.get_installed_distributions()
    # Loop over the distributions
    for distribution in distributions:
        # Prefer the first entry of top_level.txt as the import name; fall
        # back to the project name when that metadata is absent.
        # Fixed: pkg_resources' method is _get_metadata (lowercase); the
        # scraped "_get_Metadata" would raise AttributeError.
        top_level_meta_data = list(distribution._get_metadata('top_level.txt'))
        import_name = top_level_meta_data[0] if len(top_level_meta_data) > 0 else distribution.project_name
        version = str(distribution.parsed_version)
        # possible other interesting properties of an entry in the distributions list:
        # .egg_name()
        # .as_requirement()
        # .parsed_version
        # .has_version()
        # .project_name
        # .py_version
        # .requires()
        # Add entry to the dictionary
        packages[import_name] = version
    # Return the dictionary
    return packages
# -----------------------------------------------------------------
def installed_python_packages():
    """
    Build a dictionary mapping each installed package's import name to its
    version string.

    :return: dict of import name -> str(parsed_version)
    """
    # Initialize dictionary to contain the package names and version numbers
    packages = dict()
    # Get all python distributions
    distributions = pip.get_installed_distributions()
    # Loop over the distributions
    for distribution in distributions:
        # Prefer the first entry of top_level.txt as the import name; fall
        # back to the project name when that metadata is absent.
        # Fixed: pkg_resources' method is _get_metadata (lowercase); the
        # scraped "_get_Metadata" would raise AttributeError.
        top_level_meta_data = list(distribution._get_metadata('top_level.txt'))
        import_name = top_level_meta_data[0] if len(top_level_meta_data) > 0 else distribution.project_name
        version = str(distribution.parsed_version)
        # possible other interesting properties of an entry in the distributions list:
        # .egg_name()
        # .as_requirement()
        # .parsed_version
        # .has_version()
        # .project_name
        # .py_version
        # .requires()
        # Add entry to the dictionary
        packages[import_name] = version
    # Return the dictionary
    return packages
# -----------------------------------------------------------------
def is_gpu():
    """Return True when 'tensorflow-gpu' appears in any installed pip
    distribution's string form, False otherwise."""
    return any("tensorflow-gpu" in str(dist)
               for dist in pip.get_installed_distributions())
def get_deps_list(self, pkg_name, installed_distros=None):
    """
    For a given package, returns a list of required packages. Recursive.
    """
    import pip
    if not installed_distros:
        installed_distros = pip.get_installed_distributions()
    deps = []
    target = pkg_name.lower()
    for distro in installed_distros:
        if distro.project_name.lower() != target:
            continue
        deps = [(distro.project_name, distro.version)]
        for requirement in distro.requires():
            deps += self.get_deps_list(pkg_name=requirement.project_name,
                                       installed_distros=installed_distros)
    return list(set(deps))  # de-dupe before returning
def check(key, db, json, full_report, bare, stdin, files, cache, ignore):
    """Gather packages (from --file, stdin, or the local environment), run
    the safety vulnerability check, print a report, and exit."""
    if files and stdin:
        click.secho("Can't read from --stdin and --file at the same time,exiting", fg="red")
        sys.exit(-1)

    if files:
        requirement_sets = (read_requirements(f, resolve=True) for f in files)
        packages = list(itertools.chain.from_iterable(requirement_sets))
    elif stdin:
        packages = list(read_requirements(sys.stdin))
    else:
        packages = pip.get_installed_distributions()

    try:
        vulns = safety.check(packages=packages, key=key, db_mirror=db,
                             cached=cache, ignore_ids=ignore)
        rendered = report(vulns=vulns,
                          full=full_report,
                          json_report=json,
                          bare_report=bare,
                          checked_packages=len(packages),
                          db=db,
                          key=key)
        click.secho(rendered)
        # Non-zero exit when vulnerabilities were found.
        sys.exit(-1 if vulns else 0)
    except InvalidKeyError:
        click.secho("Your API Key '{key}' is invalid. See {link}".format(
            key=key, link='https://goo.gl/O7Y1rS'),
            fg="red")
        sys.exit(-1)
    except DatabaseFileNotFoundError:
        click.secho("Unable to load vulnerability database from {db}".format(db=db), fg="red")
        sys.exit(-1)
    except DatabaseFetchError:
        click.secho("Unable to load vulnerability database", fg="red")
        sys.exit(-1)
def get_package_locations():
    """
    Get the paths of directories where 3rd packages are installed.
    :returns: a list of absolute paths
    """
    seen = []
    for dist in get_installed_distributions(local_only=True,
                                            include_editables=False):
        # Preserve first-seen order while de-duplicating.
        if dist.location not in seen:
            seen.append(dist.location)
    return seen
def package_installed(name, version=None):
    """Return True when a pip distribution matching *name* (and *version*,
    when given) is installed; case-insensitive on the name."""
    import pip
    wanted = name.lower()
    return any(
        wanted == dist.key.lower()
        and (version is None or version == dist.version)
        for dist in pip.get_installed_distributions()
    )
def package_list(include_version=False):
    """
    Return
    ------
    ['odin','lasagne','keras',...] if include_version is False
    else ['odin==8.12','lasagne==25.18',...]
    """
    import pip
    entries = []
    for dist in pip.get_installed_distributions():
        suffix = ('==' + dist.version) if include_version is True else ''
        entries.append(dist.key + suffix)
    return entries
def _check_package_available(name):
    """Return True when *name* matches an installed pip distribution key
    (case-insensitive)."""
    wanted = name.lower()
    return any(wanted == dist.key.lower()
               for dist in pip.get_installed_distributions())
def _get_version_info():
    """Return (stringified list of CLI component name/version dicts sorted by
    name, the interpreter version string)."""
    from pip import get_installed_distributions
    installed_dists = get_installed_distributions(local_only=True)
    components = [
        {'name': dist.key.replace(COMPONENT_PREFIX, ''), 'version': dist.version}
        for dist in installed_dists
        if dist.key.startswith(COMPONENT_PREFIX)
    ]
    components.sort(key=lambda item: item['name'])
    return str(components), sys.version
def show_version_info_exit(out_file):
    """Print azure-cli, component, extension, and interpreter version details
    to *out_file*, then exit(0)."""
    import platform
    from pip import get_installed_distributions
    from azure.cli.core.extension import get_extensions, EXTENSIONS_DIR
    installed_dists = get_installed_distributions(local_only=True)

    cli_info = None
    for dist in installed_dists:
        if dist.key == CLI_PACKAGE_NAME:
            cli_info = {'name': dist.key, 'version': dist.version}
            break
    if cli_info:
        print('{} ({})'.format(cli_info['name'], cli_info['version']), file=out_file)

    # Reconstructed: the scraped source truncated this statement mid-expression;
    # restored to match the sibling _get_version_info() helper.
    component_version_info = sorted([{'name': dist.key.replace(COMPONENT_PREFIX, ''),
                                      'version': dist.version}
                                     for dist in installed_dists
                                     if dist.key.startswith(COMPONENT_PREFIX)],
                                    key=lambda x: x['name'])
    print(file=out_file)
    print('\n'.join(['{} ({})'.format(c['name'], c['version']) for c in component_version_info]),
          file=out_file)
    print(file=out_file)
    extensions = get_extensions()
    if extensions:
        print('Extensions:', file=out_file)
        print('\n'.join(['{} ({})'.format(c.name, c.version) for c in extensions]),
              file=out_file)
        print(file=out_file)
    print("Python location '{}'".format(sys.executable), file=out_file)
    print("Extensions directory '{}'".format(EXTENSIONS_DIR), file=out_file)
    print(file=out_file)
    print('Python ({}) {}'.format(platform.system(), sys.version), file=out_file)
    print(file=out_file)
    print('Legal docs and information: aka.ms/AzureCliLegal', file=out_file)
    print(file=out_file)
    sys.exit(0)
def list_package_versions():
    """
    Returns a dict of installed pip packages
    {'package_name': "django",'package_version': "1.8.18"}
    """
    entries = [
        {"package_name": dist.key, "package_version": dist.version}
        for dist in pip.get_installed_distributions()
    ]
    entries.sort(key=itemgetter('package_name'))
    return entries
def get_pip_packages_csv(writer):
    """
    Takes a csv writer and writes installed pip packages to it.
    """
    for dist in pip.get_installed_distributions():
        writer.writerow([dist.key, dist.version])
    return writer
def main():
    """Print installed packages as a JSON list, tagging conda-installed
    entries with source == "conda" (for consumption by a node/angular
    process)."""
    pkgs = [{'name': package.project_name,
             'version': package.version,
             "source": "pip", "location": package.location}
            for package in pip.get_installed_distributions()]
    # If location has anaconda, change source name.
    for item in pkgs:
        # Guard: location can be None for some distributions, which would
        # make the `in` test raise TypeError.
        location = item["location"] or ""
        if 'anaconda' in location or 'miniconda' in location:
            item["source"] = "conda"
    # Return for node/angular process
    print(json.dumps(pkgs))
def help_environment():
    """Print information about the Python interpreter, installed packages,
    and shell environment."""
    cmddir = os.path.dirname(os.path.abspath(sys.executable)) + os.sep
    info = Options()
    #
    info.python = Options()
    info.python.version = '%d.%d.%d' % sys.version_info[:3]
    info.python.executable = sys.executable
    info.python.platform = sys.platform
    try:
        packages = []
        import pip
        for package in pip.get_installed_distributions():
            packages.append(Options(name=package.project_name, version=package.version))
        info.python.packages = packages
    except Exception:
        # Fixed: the bare `except:` also swallowed SystemExit and
        # KeyboardInterrupt. Package listing stays best-effort.
        pass
    #
    info.environment = Options()
    path = os.environ.get('PATH', None)
    if path is not None:
        info.environment['shell path'] = path.split(os.pathsep)
    info.environment['python path'] = sys.path
    #
    print('#')
    print('# information About the Python and Shell Environment')
    print('#')
    print(str(info))
def ifninstall(pkg_name):
    """pip install language models used by spacy."""
    marker = '#egg='
    installed = [dist.project_name for dist in pip.get_installed_distributions()]
    # False when pkg_name carries no #egg= fragment; otherwise the normalized
    # egg name (underscores replaced by hyphens).
    if marker in pkg_name:
        egg_name = pkg_name.split(marker)[1].replace('_', '-')
    else:
        egg_name = False
    if pkg_name and egg_name not in installed:
        pip.main(['install', pkg_name])
def _collect_Meta_info(self):
    """Record environment information (platform, python, installed packages,
    timestamp) into self._Meta, plus git info when a repo is attached."""
    os.environ["BIOMASS_REACTIONS"] = "|".join([
        rxn.id for rxn in find_biomass_reaction(self._model)])
    self._Meta["platform"] = platform.system()
    self._Meta["release"] = platform.release()
    self._Meta["python"] = platform.python_version()
    self._Meta["packages"] = dict(
        (dist.project_name, dist.version) for dist in
        pip.get_installed_distributions())
    # Fixed: datetime has no attribute utcNow; the method is utcnow().
    self._Meta["timestamp"] = datetime.utcnow().isoformat(" ")
    if self.repo is not None:
        self._collect_git_info()
def get_pkg_info():
    """Return Python package information as a dict."""
    # Todo: Basically copying the requirements from setup.py is brittle,
    # should come up with a better way in future, for example,
    # using requirements files that can be read in.
    dependencies = frozenset(PKG_ORDER)
    return {
        dist.project_name: dist.version
        for dist in pip.get_installed_distributions()
        if dist.project_name in dependencies
    }
def load_project(self):
    """(Re)loads the project. Returns True if all plugins are satisfied."""
    # NOTE(review): yaml.load without an explicit Loader executes arbitrary
    # tags on untrusted config — consider yaml.safe_load; confirm trust model.
    with open(join(self.path, "config.yaml"), "rb") as f:
        config = yaml.load(f)
    self.__plugins = defaultdict(list, config.pop('plugins', {}) or {})
    self.__python_dependencies = config.pop('dependencies', []) or []
    self.__version = config.pop('version', None)
    self.__author = config.pop('author', '')
    self.__name = config.pop('name', None) or basename(normpath(self.path))
    if self.parent_project is None:
        # Get plugins from plugins.
        for entry in os.scandir(join(self.path, "plugins")):
            if entry.is_dir():
                try:
                    # Load in plugin project; merge its plugin sources and
                    # python dependencies into this (root) project.
                    d_project = UjProject(entry.path, self)
                    for name, sources in d_project.plugins.items():
                        for source in sources:
                            if source not in self.plugins[name]:
                                self.plugins[name].append(source)
                    for pd in d_project.python_dependencies:
                        if pd not in self.python_dependencies:
                            self.python_dependencies.append(pd)
                except InvalidUjProjectError:
                    # Non-project directories under plugins/ are skipped.
                    pass
    # Print warning for missing python plugins
    installed = [i.key for i in pip.get_installed_distributions()]
    for package in self.python_dependencies:
        if package not in installed:
            self.print("WARNING: Python package dependency '{}' missing.".format(package))
    # Check for missing plugins
    self.create_symlinks()
    if self.parent_project is None:
        plugin_symlinks = join(self.path, '.uj', 'plugin_symlinks')
    else:
        plugin_symlinks = join(self.parent_project.path, 'plugin_symlinks')
    # Check if all plugins have been loaded. Any newly added plugins might
    # still be unsatisfied, so callers may want to run this function again.
    for name in self.plugins:
        if not islink(join(plugin_symlinks, name)):
            self.__plugins_updated = False
            return False
    self.__plugins_updated = True
    return True
def check_online(self,
                 pkg_name_inp_lst=[],
                 pkg_name_toignore_lst=[],
                 pkg_name_toprioritize_lst=[]):
    """Split the currently-installed packages into update/install lists and
    hand them to pkg_upd_online / pkg_ins_online.

    NOTE(review): the mutable default arguments are shared across calls;
    harmless here only because the argument lists are never mutated.
    """
    logger_debug = self.logger.debug
    pkg_name_cur_lst = []
    pkg_cur_lst = pip.get_installed_distributions()
    logger_debug(
        '--- --- --- --- --- PACKAGE CURRENTLY INSTALLED --- --- --- --- ---')
    logger_debug(sorted(['{}=={}'.format(i.key, i.version)
                         for i in pkg_cur_lst]))
    # Lower-cased project names of everything currently installed.
    for pkg in pkg_cur_lst:
        pkg_name_cur_lst.append(pkg.project_name.lower())
    # Drop ignored packages.
    for pkg_name in pkg_name_toignore_lst:
        if pkg_name in pkg_name_cur_lst:
            del pkg_name_cur_lst[pkg_name_cur_lst.index(pkg_name)]
    # Move prioritized packages to the front of the list.
    for pkg_name in pkg_name_toprioritize_lst:
        if pkg_name in pkg_name_cur_lst:
            pkg_name_cur_lst.insert(0, pkg_name_cur_lst.pop(
                pkg_name_cur_lst.index(pkg_name)))
    if pkg_name_inp_lst:
        # NOTE(review): installed packages found in the input list are marked
        # "to update" and the rest "to install", even though they are already
        # installed — confirm this inversion is intended.
        pkg_name_toupd_lst = []
        pkg_name_toins_lst = []
        for pkg_name in pkg_name_cur_lst:
            if pkg_name in pkg_name_inp_lst:
                pkg_name_toupd_lst.append(pkg_name)
            else:
                pkg_name_toins_lst.append(pkg_name)
    else:
        pkg_name_toupd_lst = list(pkg_name_cur_lst)
        pkg_name_toins_lst = []
    if pkg_name_toupd_lst:
        logger_debug(
            '--- --- --- --- --- PACKAGE TO UPDATE --- --- --- --- ---')
        logger_debug(sorted(['{}'.format(i)
                             for i in pkg_name_toupd_lst]))
        self.pkg_upd_online(pkg_name_toupd_lst)
    if pkg_name_toins_lst:
        logger_debug(
            '--- --- --- --- --- PACKAGE TO INSTALL --- --- --- --- ---')
        logger_debug(sorted(['{}'.format(i)
                             for i in pkg_name_toins_lst]))
        self.pkg_ins_online(pkg_name_toins_lst)
def main():
    """CLI entry point (pipdeptree): render the installed-package dependency
    tree as text, JSON, or graphviz, and warn about conflicting or cyclic
    dependencies.

    Returns a process exit code (0 ok; 1 when --warn=fail and issues exist).
    """
    parser = get_parser()
    args = parser.parse_args()
    pkgs = pip.get_installed_distributions(local_only=args.local_only,
                                           user_only=args.user_only)
    dist_index = build_dist_index(pkgs)
    tree = construct_tree(dist_index)
    # JSON / graphviz outputs short-circuit before the warning pass.
    if args.json:
        print(jsonify_tree(tree, indent=4))
        return 0
    elif args.output_format:
        output = dump_graphviz(tree, output_format=args.output_format)
        print_graphviz(output)
        return 0
    return_code = 0
    # show warnings about possibly conflicting deps if found and
    # warnings are enabled
    if args.warn != 'silence':
        conflicting = conflicting_deps(tree)
        if conflicting:
            print('Warning!!! Possibly conflicting dependencies found:',
                  file=sys.stderr)
            for p, reqs in conflicting.items():
                pkg = p.render_as_root(False)
                print('* {}'.format(pkg), file=sys.stderr)
                for req in reqs:
                    req_str = req.render_as_branch(False)
                    print(' - {}'.format(req_str), file=sys.stderr)
            print('-'*72, file=sys.stderr)
        cyclic = cyclic_deps(tree)
        if cyclic:
            print('Warning!! Cyclic dependencies found:', file=sys.stderr)
            for a, b, c in cyclic:
                print('* {0} => {1} => {2}'.format(a.project_name,
                                                   b.project_name,
                                                   c.project_name),
                      file=sys.stderr)
            print('-'*72, file=sys.stderr)
        if args.warn == 'fail' and (conflicting or cyclic):
            return_code = 1
    show_only = set(args.packages.split(',')) if args.packages else None
    tree = render_tree(tree if not args.reverse else reverse_tree(tree),
                       list_all=args.all, show_only=show_only,
                       frozen=args.freeze)
    print(tree)
    return return_code
def cli(dry_run, force, find_links, index_url, extra_index_url, no_index, quiet, src_files):
    """Synchronize virtual environment with requirements.txt.

    Exits 2 on usage/merge errors; otherwise exits with sync.sync()'s result.
    """
    # Default to requirements.txt in the current directory when no files given.
    if not src_files:
        if os.path.exists(DEFAULT_REQUIREMENTS_FILE):
            src_files = (DEFAULT_REQUIREMENTS_FILE,)
        else:
            msg = 'No requirement files given and no {} found in the current directory'
            log.error(msg.format(DEFAULT_REQUIREMENTS_FILE))
            sys.exit(2)
    # Refuse to sync from *.in files (pip-compile inputs) unless --force.
    if any(src_file.endswith('.in') for src_file in src_files):
        msg = ('Some input files have the .in extension,which is most likely an error and can '
               'cause weird behavIoUr. You probably meant to use the corresponding *.txt file?')
        if force:
            log.warning('WARNING: ' + msg)
        else:
            log.error('ERROR: ' + msg)
            sys.exit(2)
    requirements = flat_map(lambda src: pip.req.parse_requirements(src, session=True),
                            src_files)
    try:
        requirements = sync.merge(requirements, ignore_conflicts=force)
    except PipToolsError as e:
        log.error(str(e))
        sys.exit(2)
    # skip=[] so pip/setuptools themselves are part of the diff as well.
    installed_dists = pip.get_installed_distributions(skip=[])
    to_install, to_uninstall = sync.diff(requirements, installed_dists)
    install_flags = []
    for link in find_links or []:
        install_flags.extend(['-f', link])
    if no_index:
        install_flags.append('--no-index')
    if index_url:
        install_flags.extend(['-i', index_url])
    if extra_index_url:
        for extra_index in extra_index_url:
            install_flags.extend(['--extra-index-url', extra_index])
    sys.exit(sync.sync(to_install, to_uninstall, verbose=(not quiet), dry_run=dry_run,
                       install_flags=install_flags))