Python pkg_resources 模块,resource_filename() 实例源码
我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 pkg_resources.resource_filename()。
def prepare_zip():
    """Build (or rebuild) ``gimel.zip`` in the current working directory.

    The archive receives a ``config.json`` entry holding the serialized
    ``config`` object, the ``config.py``, ``gimel.py`` and ``logger.py``
    resources of the ``gimel`` package, and every file found under the
    package's ``vendor`` resource directory.
    """
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps

    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as archive:
        # The config entry is written from memory, with explicit mode bits.
        config_entry = ZipInfo('config.json')
        config_entry.external_attr = 0o664 << 16
        archive.writestr(config_entry, dumps(config))

        for module_name in ('config.py', 'gimel.py', 'logger.py'):
            archive.write(resource('gimel', module_name), module_name)

        # Vendored files are stored under paths relative to the package dir.
        package_dir = resource('gimel', '')
        for folder, _subfolders, names in os.walk(resource('gimel', 'vendor')):
            for name in names:
                source_path = os.path.join(folder, name)
                archive.write(source_path,
                              os.path.relpath(source_path, package_dir))
def test_pydist():
    """Make sure pydist.json exists and validates against our schema.

    Walks the ``simple.dist`` and ``complex-dist`` resources of
    ``wheel.test``, opens every ``.whl`` found and validates each
    ``Metadata.json`` entry against ``pydist-schema.json``.
    """
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        # Context manager so the file handle is closed deterministically.
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pyMeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if not filename.endswith('.whl'):
                    continue
                # Close each wheel archive once its entries are checked.
                with ZipFile(os.path.join(dirname, filename)) as whl:
                    for entry in whl.infolist():
                        if entry.filename.endswith('/Metadata.json'):
                            pyMeta = json.loads(
                                whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pyMeta, pyMeta_schema)
                            valid += 1
    assert valid > 0, "No Metadata.json found"
def create_params_file(self, fname):
    """Ask the user whether to create the missing parameter file ``fname``.

    If the user answers "Yes", a template is copied to ``fname``: the
    ``config.params`` in ``~/spyking-circus`` when it exists, otherwise the
    one shipped as a resource of the ``circus`` package. ``self.params`` and
    ``self.last_log_file`` are then updated and ``self.update_params()`` is
    called. Nothing happens if the user declines.

    :param fname: path of the parameter file to create
    """
    msg = QMessageBox()
    msg.setIcon(QMessageBox.Question)
    msg.setText("Parameter file %r not found,do you want SpyKING CIRCUS to "
                "create it for you?" % fname)
    # Qt method names are case-sensitive: setWindowTitle/setInformativeText.
    msg.setWindowTitle("Generate parameter file?")
    msg.setInformativeText("This will create a parameter file from a "
                           "template file and open it in your system's "
                           "standard text editor. Fill properly before "
                           "launching the code. See the documentation "
                           "for details")
    msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
    answer = msg.exec_()
    if answer == QMessageBox.Yes:
        user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus')
        # Join with a separator; plain concatenation would look for
        # "~/spyking-circusconfig.params" and never find the user template.
        user_template = os.path.join(user_path, 'config.params')
        if os.path.exists(user_template):
            config_file = os.path.abspath(user_template)
        else:
            config_file = os.path.abspath(
                pkg_resources.resource_filename('circus', 'config.params'))
        shutil.copyfile(config_file, fname)
        self.params = fname
        self.last_log_file = fname.replace('.params', '.log')
        self.update_params()
def init_gui_layout(self):
    """Load the merge GUI from ``qt_GUI/qt_merge.ui`` on rank 0 only.

    On rank 0 the ``.ui`` file is loaded into ``self.ui``, the plotting axes
    and the current sorting index are cached on ``self``, a toolbar is
    created in pan mode and the window is shown. On every other rank
    ``self.ui`` is set to ``None``.
    """
    ui_resource = os.path.join('qt_GUI', 'qt_merge.ui')
    gui_fname = pkg_resources.resource_filename('circus', ui_resource)

    if comm.rank != 0:
        self.ui = None
        return

    self.ui = uic.loadUi(gui_fname, self)
    # print dir(self.ui)
    ui = self.ui
    self.score_ax1 = ui.score_1.axes
    self.score_ax2 = ui.score_2.axes
    self.score_ax3 = ui.score_3.axes
    self.waveforms_ax = ui.waveforms.axes
    self.detail_ax = ui.detail.axes
    self.data_ax = ui.data_overview.axes
    self.current_order = ui.cmb_sorting.currentIndex()
    self.mpl_toolbar = NavigationToolbar(ui.waveforms, None)
    self.mpl_toolbar.pan()
    ui.show()
def init_gui_layout(self):
    """Load the preview GUI and wire up its navigation buttons.

    The ``.ui`` file is loaded into ``self.ui`` and the electrode and
    raw-data axes are cached on ``self``. The next/previous buttons are
    connected when ``self.show_fit`` is true and hidden otherwise. A toolbar
    is created in pan mode and the window is shown.
    """
    # NOTE(review): the original line had an unbalanced ")" and appears to
    # have lost a nested call; the path is rebuilt the same way the merge
    # GUI builds it (os.path.join('qt_GUI', ...)) -- confirm the .ui location.
    gui_fname = pkg_resources.resource_filename('circus',
                                                os.path.join('qt_GUI',
                                                             'qt_preview.ui'))
    self.ui = uic.loadUi(gui_fname, self)
    self.electrode_ax = self.ui.electrodes.axes
    self.data_x = self.ui.raw_data.axes
    if self.show_fit:
        self.ui.btn_next.clicked.connect(self.increase_time)
        self.ui.btn_prev.clicked.connect(self.decrease_time)
    else:
        self.ui.btn_next.setVisible(False)
        self.ui.btn_prev.setVisible(False)
    # Toolbar will not be displayed
    self.mpl_toolbar = NavigationToolbar(self.ui.raw_data, None)
    self.mpl_toolbar.pan()
    self.ui.show()
def generate_pdf(card):
    """
    Make a PDF from a card

    The label is a single landscape page showing the upper-cased card name,
    a QR code of ``card.url`` and the card's label names (excluding those in
    ``SPECIAL_LABELS``).

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    import tempfile

    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)
    pdf.add_page()
    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)
    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    # mkstemp() creates the file atomically; mktemp() only returns a name and
    # is open to a race with other processes.
    handle, qrcode_file = tempfile.mkstemp(suffix='.png',
                                           prefix='trello_qr_')
    os.close(handle)
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        # Remove the temporary image even if saving or embedding fails.
        os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    # NOTE(review): only three numeric arguments are passed before the style
    # flag; this looks like a missing height argument -- confirm against the
    # FPDF rect() signature.
    pdf.rect(0, 55, 140, 'F')
    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ','.join([label.name for label in card.labels
                       if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 'R')
    return pdf.output(dest='S')
def test_installation():
    '''
    Report whether the files expected in a pynufft installation exist.

    Prints one line per expected file, then calls ``test_pkg`` for each
    optional backend in turn, stopping at the first one that returns 1.
    '''
    import pkg_resources
    import os.path

    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    DATA_PATH = pkg_resources.resource_filename('pynufft', 'src/data/')

    # (question, path) pairs, reported in this order.
    checks = (
        ('Does pynufft.py exist? ', PYNUFFT_PATH+'pynufft.py'),
        ('Does om1D.npz exist?', DATA_PATH+'om1D.npz'),
        ('Does om2D.npz exist?', DATA_PATH+'om2D.npz'),
        ('Does om3D.npz exist?', DATA_PATH+'om3D.npz'),
        ('Does phantom_3D_128_128_128.npz exist?', DATA_PATH+'phantom_3D_128_128_128.npz'),
        ('Does phantom_256_256.npz exist?', DATA_PATH+'phantom_256_256.npz'),
        ('Does example_1D.py exist?', PYNUFFT_PATH+'./tests/example_1D.py'),
        ('Does example_2D.py exist?', PYNUFFT_PATH+'./tests/example_2D.py'),
    )
    for question, candidate in checks:
        print(question, os.path.isfile(candidate))

    for pkgname in ('reikna', 'pyopencl', 'pycuda'):
        if test_pkg(pkgname) == 1:
            break
def test_installation():
    '''
    Report whether the pynufft example scripts exist in the installation.
    '''
    import os.path
    import pkg_resources
    # NOTE(review): the original resource argument was a garbled boolean
    # expression referencing an undefined DATA_PATH; the package root './' is
    # assumed here -- confirm against upstream.
    PYNUFFT_PATH = pkg_resources.resource_filename('pynufft', './')
    print('Does 1D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/1D_example.py'))
    # The 2D check previously tested the 1D file again.
    print('Does 2D_example.py exist?', os.path.isfile(PYNUFFT_PATH+'example/2D_example.py'))
def _transfer_ping_script(self, ssh):
    """
    Transfer the vping script to the VM.

    Uses SCP to copy ``ping.sh`` via the SSH client, then makes it
    executable with ``chmod 755`` and echoes the command output.

    :param ssh: the SSH client
    :return: True when the script was copied, False when the copy failed
    """
    self.logger.info("Trying to transfer ping.sh")
    scp = SCPClient(ssh.get_transport())
    ping_script = pkg_resources.resource_filename(
        'functest.opnfv_tests.openstack.vping', 'ping.sh')
    try:
        scp.put(ping_script, "~/")
    except Exception:  # pylint: disable=broad-except
        self.logger.error("Cannot SCP the file '%s'", ping_script)
        return False

    cmd = 'chmod 755 ~/ping.sh'
    # pylint: disable=unused-variable
    (stdin, stdout, stderr) = ssh.exec_command(cmd)
    for line in stdout.readlines():
        # Function-call form is valid on both Python 2 and Python 3.
        print(line)

    return True
def __init__(self):
    """Resolve the packaged refstack resources and build the CLI parser.

    The default values of ``--config`` and ``--testlist`` are the resolved
    resource paths joined onto the ``opnfv_tests`` resource directory.
    """
    locate = pkg_resources.resource_filename

    self.functest_test = locate('functest', 'opnfv_tests')
    self.conf_path = locate(
        'functest',
        'opnfv_tests/openstack/refstack_client/refstack_tempest.conf')
    self.defcore_list = locate(
        'functest', 'opnfv_tests/openstack/refstack_client/defcore.txt')

    self.confpath = os.path.join(self.functest_test, self.conf_path)
    self.defcorelist = os.path.join(self.functest_test, self.defcore_list)

    parser = argparse.ArgumentParser()
    self.parser = parser
    parser.add_argument(
        '-c', '--config',
        help='the file path of refstack_tempest.conf',
        default=self.confpath)
    parser.add_argument(
        '-t', '--testlist',
        help='Specify the file path or URL of a test list text file. '
             'This test list will contain specific test cases that '
             'should be tested.',
        default=self.defcorelist)
def test_resources():
    """Exercise ``load_resource`` three ways and check ``extensions()``."""
    pca_type = bob.bio.base.algorithm.PCA

    # by registered resource name
    loaded = bob.bio.base.load_resource("pca", "algorithm")
    assert isinstance(loaded, pca_type)

    # by configuration file (the result is not checked here)
    config_path = pkg_resources.resource_filename("bob.bio.base.config.algorithm", "pca.py")
    loaded = bob.bio.base.load_resource(config_path, bob.bio.base.algorithm.PCA)

    # by instantiation expression
    loaded = bob.bio.base.load_resource("bob.bio.base.algorithm.PCA(10,distance_function=scipy.spatial.distance.euclidean)", "algorithm", imports=['bob.bio.base', 'scipy.spatial'])
    assert isinstance(loaded, pca_type)

    # the list of extensions must contain this package
    found = bob.bio.base.extensions()
    assert isinstance(found, list)
    assert 'bob.bio.base' in found
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    # NOTE(review): this snippet was truncated after the schema lookup; the
    # remainder is reconstructed from the complete copy of the same test
    # earlier in this file -- confirm against upstream.
    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pyMeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if not filename.endswith('.whl'):
                    continue
                with ZipFile(os.path.join(dirname, filename)) as whl:
                    for entry in whl.infolist():
                        if entry.filename.endswith('/Metadata.json'):
                            pyMeta = json.loads(
                                whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pyMeta, pyMeta_schema)
                            valid += 1
    assert valid > 0, "No Metadata.json found"
def setUp(self):
    """Run one search with the packaged defaults and pick scratch paths.

    Loads ``search_google/config.json``, stores the API results on
    ``self.results``, and records the names of a temporary file and a
    temporary directory on ``self.tempfile`` and ``self.tempdir``.
    """
    file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
        defaults = json.load(in_file)

    buildargs = {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey']
    }
    cseargs = {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)

    # Only the names are kept; the temporary file itself is closed at once.
    scratch_file = TemporaryFile()
    self.tempfile = str(scratch_file.name)
    scratch_file.close()
    self.tempdir = str(TemporaryDirectory().name)
def register_adapters():
    """Register every adapter module found in the adapters directory.

    Runs at most once per process, guarded by the module-level
    ``adapters_registered`` flag. Each ``*.py`` file whose name starts with
    an underscore (other than ``__init__``) is registered under its name
    with the leading underscore removed and remaining underscores turned
    into dots. Modules that fail to import are skipped.
    """
    global adapters_registered

    if adapters_registered is True:
        return

    try:
        import pkg_resources
        packageDir = pkg_resources.resource_filename('pyamf', 'adapters')
    except Exception:
        # Best-effort fallback when pkg_resources is missing or cannot
        # resolve the package; KeyboardInterrupt/SystemExit still propagate.
        packageDir = os.path.dirname(__file__)

    for f in glob.glob(os.path.join(packageDir, '*.py')):
        mod = os.path.basename(f).split(os.path.extsep, 1)[0]

        if mod == '__init__' or not mod.startswith('_'):
            continue

        try:
            register_adapter(mod[1:].replace('_', '.'), PackageImporter(mod))
        except ImportError:
            pass

    adapters_registered = True
def _instantiate(self, rsc):
    """Load the pump, model, weights and version from resource dir ``rsc``.

    Sets ``self.pump``, ``self.model`` (with weights loaded) and
    ``self.version``.

    :param rsc: resource sub-directory, relative to this module's package
    """
    # NOTE(review): two of the original open() calls had unbalanced
    # parentheses and appear to have lost an os.path.join(rsc, ...) wrapper;
    # it is restored here to match the pump and weights lookups -- confirm.
    # NOTE: pickle.load is only safe because these files ship with the
    # package; never point this at untrusted data.

    # First, load the pump
    with open(resource_filename(__name__,
                                os.path.join(rsc, 'pump.pkl')),
              'rb') as fd:
        self.pump = pickle.load(fd)

    # Now load the model
    with open(resource_filename(__name__,
                                os.path.join(rsc, 'model_spec.pkl')),
              'rb') as fd:
        spec = pickle.load(fd)
        self.model = keras.models.model_from_config(spec)

    # And the model weights
    self.model.load_weights(resource_filename(__name__,
                                              os.path.join(rsc,
                                                           'model.h5')))

    # And the version number
    with open(resource_filename(__name__,
                                os.path.join(rsc, 'version.txt')),
              'r') as fd:
        self.version = fd.read().strip()
def populate_table(table, data_file, print_msg):
    """Write the records of a packaged JSON data file into a table.

    Args:
        table: object exposing ``put_item(item)``; receives one call per
            record
        data_file(str): file name inside the ``data`` resource directory of
            the ``manage`` package; its JSON must hold the records as an
            array under the key ``data``
        print_msg(str): message printed once all records are written

    Returns:
        None
    """
    json_path = os.path.join(resource_filename("manage", "data"), data_file)
    with open(json_path, 'rt') as df:
        payload = json.load(df)

    records = payload["data"]
    if len(records) == 1:
        # A single record is assumed to be a campaign file for now.
        print(" - Example campaign loaded: https://<your_stack>/campaign.html?id={}".format(records[0]["campaign_id"]))

    for record in records:
        table.put_item(record)

    print(print_msg)
def get_code(self):
    """Zip up the code and return bytes

    The archive contains a single entry, ``url_rewrite.js``, read from the
    ``configs`` resource directory of the ``manage`` package.

    Returns:
        bytes
    """
    import io

    source_path = os.path.join(resource_filename("manage", "configs"),
                               "url_rewrite.js")

    # Build the archive in memory: no temporary file is left behind and the
    # working directory is never changed, so nothing needs restoring if the
    # write fails.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode='w') as zfh:
        # arcname keeps the entry name the original chdir-based code produced
        zfh.write(source_path, arcname="url_rewrite.js")
    return buffer.getvalue()
def main():
    """Create or reuse a collection and upload the insurance corpus to it.

    Collection statistics are printed before and after the upload.
    """
    insurance_lib_data_dir = resource_filename('resources', 'insurance_lib_v2')
    print('Using data from {}'.format(insurance_lib_data_dir))

    # Either re-use an existing collection id by overriding the value below,
    # or leave it as is to create one.
    discovery = discoveryProxy()
    collection_id = discovery.setup_collection(
        collection_id="TestCollection-InsLibV2",
        config_id="889a08c9-cad9-4287-a87d-2f0380363bff")
    discovery.print_collection_stats(collection_id)

    # This seems to misbehave when run from Python notebooks because it uses
    # multiprocessing, so it is run from a script instead.
    corpus_path = path.join(insurance_lib_data_dir, 'document_corpus.solr.xml')
    discovery.upload_documents(collection_id=collection_id,
                               corpus=document_corpus_as_iterable(corpus_path))
    discovery.print_collection_stats(collection_id)
def test_cached_phrases_no_files(self,
                                 corpus_base_path,
                                 doc_content_stream):
    """``cached_phrases`` yields nothing for phash id ``X``."""
    from eea.corpus.processing.phrases.process import cached_phrases
    from pkg_resources import resource_filename

    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path

    # we want the B.phras.* files in fixtures
    env = {'phash_id': 'X', 'file_name': 'ignore'}
    settings = {}

    stream = cached_phrases(doc_content_stream, env, settings)

    # The builtin is spelled StopIteration; the lower-case name is undefined
    # and raised NameError before the stream was even touched.
    with pytest.raises(StopIteration):
        next(stream)
def test_cached_phrases_cached_files(self,
                                     corpus_base_path,
                                     doc_content_stream):
    """``cached_phrases`` yields phrase-joined text for phash id ``B``."""
    # TODO: this test should be improved. Text quality should be tested
    from eea.corpus.processing.phrases.process import cached_phrases
    from pkg_resources import resource_filename

    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path

    # NOTE(review): the original env/settings/stream lines were collapsed
    # into one invalid line; they are rebuilt to mirror
    # test_cached_phrases_no_files with phash_id 'B' -- confirm upstream.
    # we want the B.phras.* files in fixtures
    env = {'phash_id': 'B', 'file_name': 'ignore'}
    settings = {}

    stream = cached_phrases(doc_content_stream, env, settings)

    doc = next(stream)
    assert 'water_stress_conditions' in doc.text
    assert 'positive_development' in doc.text
def test_preview_phrases_with_cache_files(self, corpus_base_path):
    """``preview_phrases`` yields an empty stream with the fixture storage."""
    from eea.corpus.processing.phrases.process import preview_phrases
    from pkg_resources import resource_filename

    corpus_base_path.return_value = resource_filename('eea.corpus', 'tests/fixtures/')

    content = ['hello', 'world']
    # NOTE(review): env is built but never passed on -- possibly an argument
    # was lost from the preview_phrases call; confirm against upstream.
    env = {
        'file_name': 'x.csv',
        'text_column': 'text',
        'phash_id': 'B',
    }

    produced = list(preview_phrases(content, {}))
    assert produced == []
def test_preview_phrases_nocache_files_with_job(self,
                                                corpus_base_path,
                                                get_assigned_job):
    """``preview_phrases`` passes content through when a job is assigned."""
    from eea.corpus.processing.phrases.process import preview_phrases
    from pkg_resources import resource_filename

    get_assigned_job.return_value = Mock(id='job1')

    # NOTE(review): the original lines from the resource lookup to the
    # preview_phrases call were collapsed into one invalid statement; they
    # are rebuilt to mirror test_preview_phrases_with_cache_files with
    # phash_id 'X' -- confirm against upstream (env may need to be passed).
    base_path = resource_filename('eea.corpus', 'tests/fixtures/')
    corpus_base_path.return_value = base_path

    content = ['hello', 'world']
    env = {
        'file_name': 'x.csv',
        'text_column': 'text',
        'phash_id': 'X',
    }

    stream = preview_phrases(content, {})
    assert list(stream) == ['hello', 'world']
def test_produce_phrases_with_no_job(self,
                                     cached_phrases,
                                     get_pipeline_for_component,
                                     build_phrases
                                     ):
    """``produce_phrases`` builds phrases once, then serves cached output."""
    from eea.corpus.processing.phrases.process import produce_phrases
    from pkg_resources import resource_filename

    content = ['hello', 'world']
    # NOTE(review): env is built but never passed on -- possibly an argument
    # was lost from the produce_phrases call; confirm against upstream.
    env = {'phash_id': 'X', 'file_name': 'x.csv', 'text_column': 'text'}

    # NOTE(review): corpus_base_path is not a parameter of this test;
    # presumably a parameter was lost -- confirm against upstream.
    corpus_base_path.return_value = resource_filename('eea.corpus', 'tests/fixtures/')
    cached_phrases.return_value = ['something', 'else']

    produced = list(produce_phrases(content, {}))

    assert produced == ['something', 'else']
    assert corpus_base_path.call_count == 1
    assert get_pipeline_for_component.call_count == 1
    assert build_phrases.call_count == 1
    assert cached_phrases.call_count == 1
# NOTE(review): this snippet appears to have lost text during extraction. The
# statements are kept verbatim and are not valid Python as written; restore
# the missing parts from upstream before relying on this test.
def test_produce_phrases_with_ok_job(self,
build_phrases,
get_job_finish_status
):
from eea.corpus.processing.phrases.process import produce_phrases
from pkg_resources import resource_filename
content = ['hello', 'else']
get_job_finish_status.return_value = True
# NOTE(review): the call below opens with "(" but ends with "]" --
# presumably its remaining arguments and a following assertion were dropped.
stream = produce_phrases(content, 'else']
# NOTE(review): corpus_base_path, get_pipeline_for_component and
# cached_phrases are not parameters of this function -- confirm where they
# are meant to come from.
assert corpus_base_path.call_count == 1
assert get_pipeline_for_component.call_count == 0
assert build_phrases.call_count == 0
assert cached_phrases.call_count == 1
def test_phrase_model_status(self, get_assigned_job):
    """Check the three statuses reported by ``phrase_model_status``.

    ``CORPUS_STORAGE`` in the view's globals is pointed at the test
    fixtures for the duration of the test and restored afterwards.
    """
    from eea.corpus.processing.phrases.views import phrase_model_status
    from pkg_resources import resource_filename

    base_path = resource_filename('eea.corpus', 'tests/fixtures/')

    o_st = phrase_model_status.__globals__['CORPUS_STORAGE']
    phrase_model_status.__globals__['CORPUS_STORAGE'] = base_path
    try:
        req = Mock(matchdict={'phash_id': 'A'})
        assert phrase_model_status(req) == {'status': 'OK'}

        get_assigned_job.return_value = None
        req = Mock(matchdict={'phash_id': 'X'})
        assert phrase_model_status(req) == {'status': 'unavailable'}

        job = Mock()
        get_assigned_job.return_value = job
        job.get_status.return_value = '_job_status_here_'
        assert phrase_model_status(req) == {'status':
                                            'preview__job_status_here_'}
    finally:
        # Restore the patched global even when an assertion fails, so the
        # fixture path does not leak into other tests.
        phrase_model_status.__globals__['CORPUS_STORAGE'] = o_st
def test_build_pipeline_for_preview(self, upload_location):
    """A preview pipeline with a ``max_count`` of 2 yields two documents."""
    from eea.corpus.processing import build_pipeline
    from pkg_resources import resource_filename

    upload_location.return_value = resource_filename(
        'eea.corpus', 'tests/fixtures/test.csv')

    steps = [
        ('eea_corpus_processing_limit_process', 'ABC', {'max_count': 2})
    ]
    docs = list(build_pipeline('test.csv', 'text', steps,
                               preview_mode=True))

    assert len(docs) == 2
def dashboard(global_config, **settings):
    """ Wsgi entry point for the Flask app RQ Dashboard

    The Redis connection is taken from the ``REDIS_URL`` environment
    variable, defaulting to ``redis://localhost:6379/0``.

    :param global_config: accepted for the entry-point signature; unused
    :param settings: accepted for the entry-point signature; unused
    :return: the WSGI callable of the configured Flask app
    """
    redis_uri = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    p = parse.urlparse(redis_uri)

    # hostname/port tolerate URLs with credentials or without an explicit
    # port, where splitting netloc on ':' raised ValueError. The port stays
    # a string, as before.
    host = p.hostname or 'localhost'
    port = str(p.port or 6379)
    db = p.path[1:] if len(p.path) > 1 else '0'

    # Upper-case keys: the previous mixed-case "REdis_*" names did not match
    # the conventional REDIS_* setting names.
    redis_settings = {
        'REDIS_URL': redis_uri,
        'REDIS_DB': db,
        'REDIS_HOST': host,
        'REDIS_PORT': port,
    }

    app = Flask(__name__,
                static_url_path="/static",
                static_folder=resource_filename("rq_dashboard", "static")
                )
    app.config.from_object(rq_dashboard.default_settings)
    app.config.update(redis_settings)
    app.register_blueprint(rq_dashboard.blueprint)
    return app.wsgi_app
def example_audio_file():
    """Return the path of the bundled example audio file.

    Returns:
        str: Path of the example audio file.

    See also:
        :func:`example_label_file`

    Examples:
        >>> from nnmnkwii.util import example_audio_file
        >>> from scipy.io import wavfile
        >>> fs, x = wavfile.read(example_audio_file())
    """
    resource_name = '_example_data/{}.wav'.format("arctic_a0009")
    return pkg_resources.resource_filename(__name__, resource_name)
def example_question_file():
    """Return the path of the bundled example question file.

    The question file was taken from merlin_.

    .. _merlin: https://github.com/CSTR-Edinburgh/merlin

    Returns:
        str: Path of the example question file.

    Examples:
        >>> from nnmnkwii.util import example_question_file
        >>> from nnmnkwii.io import hts
        >>> binary_dict, continuous_dict = hts.load_question_set(example_question_file())
    """
    question_path = pkg_resources.resource_filename(
        __name__, '_example_data/questions-radio_dnn_416.hed')
    return question_path
def __init__(self, weights_path=None,
             vocab_path=None):
    """Load the vocabulary and build the model and its encoder.

    :param weights_path: weights file; defaults to the packaged
        ``reactionrnn_weights.hdf5``
    :param vocab_path: vocabulary JSON file; defaults to the packaged
        ``reactionrnn_vocab.json``
    """
    weights_path = (resource_filename(__name__, 'reactionrnn_weights.hdf5')
                    if weights_path is None else weights_path)
    vocab_path = (resource_filename(__name__, 'reactionrnn_vocab.json')
                  if vocab_path is None else vocab_path)

    with open(vocab_path, 'r') as json_file:
        self.vocab = json.load(json_file)

    self.tokenizer = Tokenizer(filters='', char_level=True)
    self.tokenizer.word_index = self.vocab
    self.num_classes = len(self.vocab) + 1

    self.model = reactionrnn_model(weights_path, self.num_classes)
    # The encoder exposes the output of the layer named 'rnn'.
    rnn_output = self.model.get_layer('rnn').output
    self.model_enc = Model(inputs=self.model.input,
                           outputs=rnn_output)
def main(argv=None):
    """Command-line entry point; must be run from the project's root.

    With ``--init``, creates ``./notebooks/``, copies the packaged
    ``__fabfile.py`` to ``fabfile.py``, prints a short notice and lists the
    available ``fab`` commands.

    :param argv: argument list handed to docopt
    """
    # NOTE(review): indentation was lost in this snippet; the notice and the
    # `fab -l` call are assumed to belong to the --init branch -- confirm.
    args = docopt(__doc__, argv=argv, version='1.0.3')

    entries = (p.name for p in Path().iterdir())
    assert 'config.toml' in entries, "config.toml not found in directory. Are you sure you're in the project's root?"

    if not args['--init']:
        return

    Path('./notebooks/').mkdir(exist_ok=True)

    with open(resource_filename('hugo_jupyter', '__fabfile.py')) as fp:
        Path('fabfile.py').write_text(fp.read())

    print(dedent("""
Successfully initialized. From this directory,the following commands are available.
Just remember to prepend them with `fab`
"""))

    run(('fab', '-l'))
def test_copy_config(self):
    """``copy_config`` omits the sensitive files and keeps the partial ones."""
    tempdir = tempfile.mkdtemp()
    server_root = pkg_resources.resource_filename(__name__, "testdata")
    letshelp_le_apache.copy_config(server_root, tempdir)

    temp_testdata = os.path.join(tempdir, "testdata")

    # Password, key and secret files must not be present in the copy.
    for excluded in (_PASSWD_FILE, _KEY_FILE, _SECRET_FILE):
        copied = os.path.join(temp_testdata, os.path.basename(excluded))
        self.assertFalse(os.path.exists(copied))

    # The partial config and link must be present in the copy.
    for kept in (_PARTIAL_CONF_PATH, _PARTIAL_LINK_PATH):
        self.assertTrue(os.path.exists(os.path.join(temp_testdata, kept)))
def pkg_file(path):
    """Return the filename of ``path`` inside the ``docker_utils_aba`` package.

    :param path: path relative to the ``docker_utils_aba`` directory
    """
    requirement = pkg_resources.Requirement.parse("docker-utils-aba")
    resource_name = os.path.join("docker_utils_aba", path)
    return pkg_resources.resource_filename(requirement, resource_name)
def __init__(self, *args):
    """Connect to the cluster given by ``args[0]`` and create the schema.

    Creates the ``public`` and ``internal`` keyspaces and the
    ``public.users`` and ``public.contracts`` tables if they do not exist.
    """
    self.log = logging.getLogger(resource_filename(__name__, __file__))
    self.cluster = Cluster(args[0])
    self.connection = self.cluster.connect()
    self.connection.row_factory = tuple_factory

    # Schema statements, executed in this order.
    schema_statements = (
        "CREATE KEYSPACE IF NOT EXISTS public \
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor' : 1 };",
        "CREATE KEYSPACE IF NOT EXISTS internal \
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor' : 1 };",
        "CREATE TABLE IF NOT EXISTS public.users ( \
name text PRIMARY KEY,\
n text,\
e text,\
secret text);",
        "CREATE TABLE IF NOT EXISTS public.contracts ( \
id uuid PRIMARY KEY,\
owner text,\
package text,\
template blob,\
example blob);",
    )
    for statement in schema_statements:
        self.connection.execute(statement)
def __init__(self, *args):
    """Create an engine from ``args[0]`` and open a connection on it."""
    # NOTE(review): the original signature line was garbled and looks like
    # the remains of two lines; it is rebuilt from the `args[0]` use below
    # and from the sibling constructor's logger setup -- confirm upstream.
    self.log = logging.getLogger(resource_filename(__name__, __file__))
    self.engine = create_engine(args[0])
    self.connection = self.engine.connect()
def __init__(self, round_number):
    """Record the data file names for a round and check they exist.

    :param round_number: round whose files are looked up in ``numerai.data``
    :raises IOError: if the training or the tournament file is missing
    """
    self.round_number = round_number

    prefix = 'r' + str(round_number)
    self.train_file_name = prefix + '_numerai_training_data.csv'
    self.test_file_name = prefix + '_numerai_tournament_data.csv'
    self.sorted_file_name = prefix + '_numerai_sorted_training_data.csv'

    # Only the training and tournament files are required up front.
    for required in (self.train_file_name, self.test_file_name):
        if not os.path.exists(resource_filename('numerai.data', required)):
            raise IOError('File {} not found.'.format(required))
def training_set(self):
    """Read the round's training CSV and return what ``pd.read_csv`` gives."""
    csv_path = resource_filename('numerai.data', self.train_file_name)
    return pd.read_csv(csv_path)
def test_set(self):
    """Read the round's tournament CSV and return what ``pd.read_csv`` gives."""
    csv_path = resource_filename('numerai.data', self.test_file_name)
    return pd.read_csv(csv_path)
def sorted_training_set(self):
    """Read the round's sorted training CSV via ``pd.read_csv``."""
    csv_path = resource_filename('numerai.data', self.sorted_file_name)
    return pd.read_csv(csv_path)
def build_wheel():
    """Build wheels from test distributions.

    For each name in ``test_distributions`` the function changes into the
    distribution's resource directory and executes its ``setup.py`` with
    ``bdist_wheel`` as the command line. The working directory and
    ``sys.argv`` are restored afterwards, even if the build fails.
    """
    for dist in test_distributions:
        pwd = os.path.abspath(os.curdir)
        saved_argv = sys.argv
        distdir = pkg_resources.resource_filename('wheel.test', dist)
        os.chdir(distdir)
        try:
            sys.argv = ['', 'bdist_wheel']
            # Read via a context manager so the handle is closed promptly.
            with open('setup.py') as setup_file:
                setup_source = setup_file.read()
            exec(compile(setup_source, 'setup.py', 'exec'))
        finally:
            # Do not leak the fake command line or the changed directory.
            sys.argv = saved_argv
            os.chdir(pwd)