Python os 模块,getcwd() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.getcwd()。
def _git_update_requirements(venv, package_dir, reqs_dir):
"""
Update from global requirements.
Update an OpenStack git directory's requirements.txt and
test-requirements.txt from global-requirements.txt.
"""
orig_dir = os.getcwd()
os.chdir(reqs_dir)
python = os.path.join(venv, 'bin/python')
cmd = [python, 'update.py', package_dir]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
package = os.path.basename(package_dir)
error_out("Error updating {} from "
"global-requirements.txt".format(package))
os.chdir(orig_dir)
def client_proc(job_id, data_file, rtask, task=None):
# send input file to rtask.location; this will be saved to dispycos process's
# working directory
if (yield pycos.Pycos().send_file(rtask.location, timeout=10)) < 0:
print('Could not send input data to %s' % rtask.location)
# terminate remote task
rtask.send(None)
raise stopiteration(-1)
# send info about input
obj = C(job_id, random.uniform(5, 8), task)
if (yield rtask.deliver(obj)) != 1:
print('Could not send input to %s' % rtask.location)
raise stopiteration(-1)
# rtask sends result to this task as message
result = yield task.receive()
if not result.result_file:
print('Processing %s Failed' % obj.i)
raise stopiteration(-1)
# rtask saves results file at this client,which is saved in pycos's
# dest_path,not current working directory!
result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
# move file to cwd
target = os.path.join(os.getcwd(), os.path.basename(result_file))
os.rename(result_file, target)
print(' job %s output is in %s' % (obj.i, target))
def build(self, file):
if self.built:
raise PermissionError("You cannot build multiple times!")
if not self.loaded:
self.load(file)
old = os.getcwd()
sys.path.append(os.path.dirname(os.path.abspath(file))) # for module import that aren't "include" call
try:
content = open(file, "rb").read()
os.chdir(os.path.dirname(os.path.abspath(file))) # set the current working directory,for open() etc.
exec(compile(content, file, 'exec'), self.user_functions)
except Exception as err:
print("An exception occured while building: ", file=sys.stderr)
lines = traceback.format_exc(None, err).splitlines()
print(" " + lines[-1], file=sys.stderr)
for l in lines[3:-1]:
print(l, file=sys.stderr)
exit(1)
os.chdir(old)
sys.path.remove(os.path.dirname(os.path.abspath(file)))
self.built = True
def load(self, file):
if self.loaded:
return
sys.path.append(os.path.dirname(os.path.abspath(file))) # for module import that aren't "include" call
old = os.getcwd()
try:
content = open(file, self.user_functions)
except Exception as err:
print("An exception occured while loading: ", file=sys.stderr)
exit(1)
os.chdir(old)
sys.path.remove(os.path.dirname(os.path.abspath(file)))
self.loaded = True
self.mem_offset = 0
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with get_zip_class()(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def StartFileServer(fileServerDir):
"""
Start file server.
"""
if not fileServerDir:
message = \
"The PYUPDATER_FILESERVER_DIR environment variable is not set."
if hasattr(sys, "frozen"):
logger.error(message)
return None
else:
fileServerDir = os.path.join(os.getcwd(), 'pyu-data', 'deploy')
message += "\n\tSetting fileServerDir to: %s\n" % fileServerDir
logger.warning(message)
fileServerPort = GetEphemeralPort()
thread = threading.Thread(target=RunFileServer,
args=(fileServerDir, fileServerPort))
thread.start()
WaitForFileServerToStart(fileServerPort)
return fileServerPort
def save_users_and_groups_to_csv(user_data, csv_output_filepath):
"""
Creates a CSV file with exported user data
:param user_data: The exported user data
:param csv_output_filepath: The output file to save
:return: None
"""
full_output_path = os.path.join(os.getcwd(), csv_output_filepath)
with open(full_output_path, 'wb') as f:
fields = ['email', 'lastname', 'firstname', 'groups']
w = csv.DictWriter(f, fields)
w.writeheader()
for key, val in sorted(user_data.items()):
val['groups'] = ",".join(val['groups'][0::2])
row = {'email': key}
row.update(val)
w.writerow(row)
def configure(logger, path_to_config_file, export_formats):
"""
instantiate and configure logger,load config settings from file,instantiate SafetyCulture SDK
:param logger: the logger
:param path_to_config_file: path to config file
:param export_formats: desired export formats
:return: instance of SafetyCulture SDK object,config settings
"""
config_settings = load_config_settings(logger, path_to_config_file)
config_settings[EXPORT_FORMATS] = export_formats
sc_client = sp.SafetyCulture(config_settings[API_TOKEN])
if config_settings[EXPORT_PATH] is not None:
create_directory_if_not_exists(logger, config_settings[EXPORT_PATH])
else:
logger.info('Invalid export path was found in ' + path_to_config_file + ',defaulting to /exports')
config_settings[EXPORT_PATH] = os.path.join(os.getcwd(), 'exports')
create_directory_if_not_exists(logger, config_settings[EXPORT_PATH])
return sc_client, config_settings
def abspath(path):
"""Return the absolute version of a path."""
if path: # Empty path must return current working directory.
path = os.fspath(path)
try:
path = _getfullpathname(path)
except OSError:
pass # Bad path - return unchanged.
elif isinstance(path, bytes):
path = os.getcwdb()
else:
path = os.getcwd()
return normpath(path)
# realpath is a no-op on systems without islink support
def _candidate_tempdir_list():
"""Generate a list of candidate temporary directories which
_get_default_tempdir will try."""
dirlist = []
# First,try the environment.
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = _os.getenv(envname)
if dirname: dirlist.append(dirname)
# Failing that,try OS-specific locations.
if _os.name == 'nt':
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
# As a last resort,the current directory.
try:
dirlist.append(_os.getcwd())
except (AttributeError, OSError):
dirlist.append(_os.curdir)
return dirlist
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def init_work_dir(self):
retval = os.getcwd()
print '#current dir is : ' + retval
# ??????
store_dir = retval + os.sep + 'tmp'
print '#all imgs are going to be stored in dir :' + store_dir
if not os.path.exists(store_dir):
print '#tmp dir does not exist,attemp to mkdir'
os.mkdir(store_dir)
print '#mkdir sucessfully'
else:
print '#tmp dir is already exist'
self.store_dir = store_dir
# print '#Now change current dir to tmp'
# os.chdir(store_dir) #no neccessary
# print os.getcwd()
def init_work_dir(self):
retval = os.getcwd()
print '#current dir is : ' + retval
# ??????
store_dir = retval + os.sep + 'tmp'
print '#all imgs are going to be stored in dir :' + store_dir
if not os.path.exists(store_dir):
print '#tmp dir does not exist,attemp to mkdir'
os.mkdir(store_dir)
print '#mkdir sucessfully'
else:
print '#tmp dir is already exist'
self.store_dir = store_dir
# print '#Now change current dir to tmp'
# os.chdir(store_dir) #no neccessary
# print os.getcwd()
def _run_local_lambda(self, lambda_config):
prev_folder = os.getcwd()
os.chdir(self.config.get_projectdir())
sys.path.append(self.config.get_projectdir())
lambda_name = lambda_config["FunctionName"]
lambda_handler = self.import_function(lambda_config["Handler"])
# Run and set a counter
start = time.time()
results = lambda_handler({}, MockContext(lambda_name))
end = time.time()
# restore folder
os.chdir(prev_folder)
# Print results
logger.info("{0}".format(results))
logger.info("\nexecution time: {:.8f}s\nfunction execution "
"timeout: {:2}s".format(end - start, lambda_config["Timeout"]))
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
"""
Search in increasingly higher folders for the given file
Returns path to the file if found,or an empty string otherwise
"""
if usecwd or '__file__' not in globals():
# should work without __file__,e.g. in REPL or IPython notebook
path = os.getcwd()
else:
# will work for .py files
frame_filename = sys._getframe().f_back.f_code.co_filename
path = os.path.dirname(os.path.abspath(frame_filename))
for dirname in _walk_to_root(path):
check_path = os.path.join(dirname, filename)
if os.path.exists(check_path):
return check_path
if raise_error_if_not_found:
raise IOError('File not found')
return ''
def _changing_cd(f, *args, **kwargs):
def inner(*args, **kwargs):
try:
state = State(args[0].view)
except AttributeError:
state = State(args[0].window.active_view())
old = os.getcwd()
try:
# FIXME: Under some circumstances,like when switching projects to
# a file whose _cmdline_cd has not been set,_cmdline_cd might
# return 'None'. In such cases,change to the actual current
# directory as a last measure. (We should probably fix this anyway).
os.chdir(state.settings.vi['_cmdline_cd'] or old)
f(*args, **kwargs)
finally:
os.chdir(old)
return inner
def test_comp_macro_directories_config_python(self):
file_loc = os.getcwd()
self.comp = sb.launch(self.cname, impl="python", execparams={'LOGGING_CONfig_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
fp = None
try:
fp = open('foo/bar/test.log','r')
except:
pass
try:
os.remove('foo/bar/test.log')
except:
pass
try:
os.rmdir('foo/bar')
except:
pass
try:
os.rmdir('foo')
except:
pass
self.assertNotEquals(fp, None)
def test_comp_macro_directories_config_cpp(self):
file_loc = os.getcwd()
self.comp = sb.launch(self.cname, impl="cpp", None)
def test_comp_macro_directories_config_java(self):
file_loc = os.getcwd()
self.comp = sb.launch(self.cname, impl="java", None)
def setUp(self):
cfg = "log4j.rootLogger=TRACE,CONSOLE,FILE\n" + \
"log4j.debug=false\n" + \
"# Direct log messages to FILE\n" + \
"log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + \
"log4j.appender.CONSOLE.File=stdout\n" + \
"log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \
"log4j.appender.FILE.File="+os.getcwd()+"/tmp_logfile.log\n" + \
"log4j.appender.CONSOLE.threshold=TRACE\n" + \
"log4j.appender.FILE.threshold=TRACE\n" + \
"log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \
"log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \
"log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \
"log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n"
fp = open('tmp_logfile.config','w')
fp.write(cfg)
fp.close()
nodebooter, self._domMgr = self.launchdomainManager()
self._domBooter = nodebooter
def setUp(self):
cfg = "log4j.rootLogger=DEBUG,STDOUT,FILE\n " + \
"# Direct log messages to FILE\n" + \
"log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender\n" + \
"log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout\n" + \
"log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \
"log4j.appender.FILE.File=tmp_logfile.log\n" + \
"log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \
"log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \
"log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \
"log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n"
fp = open('tmp_logfile.config','w')
fp.write(cfg)
fp.close()
self.domBooter, self._domMgr = self.launchdomainManager(loggingURI=os.getcwd()+'/tmp_logfile.config')
self.devBooter, self._devMgr = self.launchdeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
self._app = None
def _git_update_requirements(venv, package_dir]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
package = os.path.basename(package_dir)
error_out("Error updating {} from "
"global-requirements.txt".format(package))
os.chdir(orig_dir)
def _git_update_requirements(venv, package_dir]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
package = os.path.basename(package_dir)
error_out("Error updating {} from "
"global-requirements.txt".format(package))
os.chdir(orig_dir)
def get_json(org):
d = {"nodes":[],"links":[]}
for i in graph:
dt = {}
dt["id"]=i
dt["group"]=1
d["nodes"].append(dt)
for i in graph:
for j in graph[i]:
for k in j:
dt={}
dt["source"]=i
dt["target"]=k[1::]
dt["value"]=10
d["links"].append(dt)
string_json = json.dumps(d)
filename = org + ".json"
f = open(os.path.join(os.getcwd(), "static", filename), "w")
f.write(string_json)
f.close()
def find_dotenv(filename='.env', filename)
if os.path.exists(check_path):
return check_path
if raise_error_if_not_found:
raise IOError('File not found')
return ''
def __init__(self, additional_compose_file=None, additional_services=None):
# To resolve docker client server version mismatch issue.
os.environ["COMPOSE_API_VERSION"] = "auto"
dir_name = os.path.split(os.getcwd())[-1]
self.project = "{}{}".format(
re.sub(r'[^a-z0-9]', '', dir_name.lower()),
getpass.getuser()
)
self.additional_compose_file = additional_compose_file
self.services = ["zookeeper", "schematizer", "kafka"]
if additional_services is not None:
self.services.extend(additional_services)
# This variable is meant to capture the running/not-running state of
# the dependent testing containers when tests start running. The idea
# is,we'll only start and stop containers if they aren't already
# running. If they are running,we'll just use the ones that exist.
# It takes a while to start all the containers,so when running lots of
# tests,it's best to start them out-of-band and leave them up for the
# duration of the session.
self.containers_already_running = self._are_containers_already_running()
def archive_context(filename):
"""
Unzip filename to a temporary directory,set to the cwd.
The unzipped target is cleaned up after.
"""
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with ContextualZipFile(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def startRPC(self, port, eventListenerPort):
logging.basicConfig(filename='worldpay-within-wrapper.log', level=logging.DEBUG)
reqOS = ["darwin", "win32", "windows", "linux"]
reqArch = ["x64", "ia32"]
cfg = launcher.Config(reqOS, reqArch)
launcherLocal = launcher.launcher()
# define log file name for rpc agent,so e.g
# for "runconsumerOWP.py" it will be: "rpc-wpwithin-runconsumerOWP.log"
logfilename = os.path.basename(sys.argv[0])
logfilename = "rpc-wpwithin-" + logfilename.rsplit(".", 1)[0] + ".log"
args = []
if eventListenerPort > 0:
logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info" + " -callbackport " + str(eventListenerPort))
args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,info', '-callbackport', str(eventListenerPort)]
else:
logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,info")
args = ['-port',info']
process = launcherLocal.launch(cfg, os.getcwd() + "", args)
return process
def compose(env):
"""Compose the configuration."""
offset = _get_utils(env)
rsync = ["rsync",
"-aczv",
USER_FOLDER,
os.path.join(offset, USER_FOLDER),
"--delete-after",
_get_exclude("*.pyc"),
_get_exclude("README.md"),
_get_exclude("__init__.py"),
_get_exclude("__config__.py")]
call(rsync, "rsync user deFinitions")
here = os.getcwd()
composition = ["python2.7",
"config_compose.py",
"--output", os.path.join(here, FILE_NAME)]
call(composition, "compose configuration", working_dir=offset)
def test_RTagsClientUpdateBuffers(self):
try:
os.chdir("dirty")
except OSError:
print("Test Error: Couldn't cd into 'dirty' test directory.")
raise
self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
self.plugin.setup_rtags_daemon()
self.plugin.connect_rtags_client()
self.plugin.update_rtags_buffers(
[str(src_info["test_cpp"]),
str(src_info["cpp"])])
try:
rtags_client_status = subprocess.check_output(
self.cmake_cmd_info["rtags_buffers"])
except subprocess.CalledProcessError as e:
print(e.output)
filepath = os.getcwd() + str(src_info["test_cpp"])
self.assertTrue(str(rtags_client_status).find(filepath))
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def load_dynamic_config(configurations, config_dir=getcwd()):
"""Load and parse dynamic config"""
# Create full path of config
config_file = '{path}/config.py'.format(path=config_dir)
# Insert config path so we can import it
sys.path.insert(0, path.dirname(path.abspath(config_file)))
try:
config_module = __import__('config')
for key, value in config_module.CONfig.items():
LOG.debug('Importing %s with key %s', key, value)
# Update configparser object
configurations.update({key: value})
except ImportError:
# Provide a default if config not found
configurations = {}
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def pquery(command, stdin=None, **kwargs):
if very_verbose:
info('Query "'+' '.join(command)+'" in '+getcwd())
try:
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
except OSError as e:
if e[0] == errno.ENOENT:
error(
"Could not execute \"%s\".\n"
"Please verify that it's installed and accessible from your current path by executing \"%s\".\n" % (command[0], command[0]), e[0])
else:
raise e
stdout, _ = proc.communicate(stdin)
if very_verbose:
log(str(stdout).strip()+"\n")
if proc.returncode != 0:
raise ProcessException(proc.returncode, command[0], ' '.join(command), getcwd())
return stdout
def seturl(url):
info("Setting url to \"%s\" in %s" % (url, getcwd()))
hgrc = os.path.join('.hg', 'hgrc')
tagpaths = '[paths]'
remote = 'default'
lines = []
try:
with open(hgrc) as f:
lines = f.read().splitlines()
except IOError:
pass
if tagpaths in lines:
idx = lines.index(tagpaths)
m = re.match(r'^([\w_]+)\s*=\s*(.*)$', lines[idx+1])
if m:
remote = m.group(1)
del lines[idx+1]
lines.insert(idx, remote+' = '+url)
else:
lines.append(tagpaths)
lines.append(remote+' = '+url)
def unignore(dest):
Hg.ignore_file = os.path.join('.hg', 'hgignore')
try:
with open(Hg.ignore_file) as f:
lines = f.read().splitlines()
except IOError:
lines = []
if dest in lines:
lines.remove(dest)
try:
with open(Hg.ignore_file, 'w') as f:
f.write('\n'.join(lines) + '\n')
except IOError:
error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Hg.ignore_file), 1)
# pylint: disable=no-self-argument,no-method-argument,no-member,no-self-use,unused-argument
def checkout(rev, clean=False):
if not rev:
return
info("Checkout \"%s\" in %s" % (rev, os.path.basename(getcwd())))
branch = None
refs = Git.getbranches(rev)
for ref in refs: # re-associate with a local or remote branch (rev is the same)
m = re.match(r'^(.*?)\/(.*?)$', ref)
if m and m.group(2) != "HEAD": # matches origin/<branch> and isn't HEAD ref
if not os.path.exists(os.path.join('.git', 'refs', 'heads', m.group(2))): # okay only if local branch with that name doesn't exist (git will checkout the origin/<branch> in that case)
branch = m.group(2)
elif ref != "HEAD":
branch = ref # matches local branch and isn't HEAD ref
if branch:
info("Revision \"%s\" matches a branch \"%s\" reference. Re-associating with branch" % (rev, branch))
popen([git_cmd, 'checkout', branch] + ([] if very_verbose else ['-q']))
break
if not branch:
popen([git_cmd, rev] + (['-f'] if clean else []) + ([] if very_verbose else ['-q']))
def update(rev=None, clean=False, clean_files=False, is_local=False):
if not is_local:
Git.fetch()
if clean:
Git.discard(clean_files)
if rev:
Git.checkout(rev, clean)
else:
remote = Git.getremote()
branch = Git.getbranch()
if remote and branch:
try:
Git.merge('%s/%s' % (remote, branch))
except ProcessException:
pass
else:
err = "Unable to update \"%s\" in \"%s\"." % (os.path.basename(getcwd()), getcwd())
if not remote:
info(err+" The local repository is not associated with a remote one.")
if not branch:
info(err+" Working set is not on a branch.")
def ignore(dest):
try:
with open(Git.ignore_file) as f:
exists = dest in f.read().splitlines()
except IOError:
exists = False
if not exists:
try:
ignore_file_parent_directory = os.path.dirname(Git.ignore_file)
if not os.path.exists(ignore_file_parent_directory):
os.mkdir(ignore_file_parent_directory)
with open(Git.ignore_file, 'a') as f:
f.write(dest.replace("\\", "/") + '\n')
except IOError:
error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Git.ignore_file), 1)
def fromrepo(cls, path=None):
repo = cls()
if path is None:
path = Repo.findparent(getcwd())
if path is None:
error(
"Could not find mbed program in current path \"%s\".\n"
"You can fix this by calling \"mbed new .\" or \"mbed config root .\" in the root of your program." % getcwd())
repo.path = os.path.abspath(path)
repo.name = os.path.basename(repo.path)
cache_cfg = Global().get_cfg('CACHE', '')
if cache_repositories and cache_cfg and cache_cfg != 'none' and cache_cfg != 'off' and cache_cfg != 'disabled':
loc = cache_cfg if (cache_cfg and cache_cfg != 'on' and cache_cfg != 'enabled') else None
repo.cache = loc or os.path.join(tempfile.gettempdir(), 'mbed-repo-cache')
repo.sync()
if repo.scm is None:
warning(
"Program \"%s\" in \"%s\" does not use source control management.\n"
"To fix this you should use \"mbed new .\" in the root of your program." % (repo.name, repo.path))
return repo
def pathtype(cls, path=None):
path = os.path.abspath(path or getcwd())
depth = 0
while cd(path):
tpath = path
path = Repo.findparent(path)
if path:
depth += 1
path = os.path.split(path)[0]
if tpath == path: # Reached root.
break
else:
break
return "directory" if depth == 0 else ("program" if depth == 1 else "library")
def __init__(self, path=None, print_warning=False):
path = os.path.abspath(path or getcwd())
self.path = path
self.is_cwd = True
while cd(path):
tpath = path
if os.path.isfile(os.path.join(path, Cfg.file)):
self.path = path
self.is_cwd = False
break
path = os.path.split(path)[0]
if tpath == path: # Reached root.
break
self.name = os.path.basename(self.path)
self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld'))
# is_cwd flag indicates that current dir is assumed to be root,not root repo
if self.is_cwd and print_warning:
warning(
"Could not find mbed program in current path \"%s\".\n"
"You can fix this by calling \"mbed new .\" in the root of your program." % self.path)
def main():
# Add examples to path.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
sys.path.insert(0, parent_dir)
# Grab all example files.
example_dir = os.path.join(os.getcwd(), "..", "examples")
example_files = [f for f in os.listdir(example_dir) if os.path.isfile(os.path.join(example_dir, f)) and "py" == f.split(".")[-1] and "init" not in f and "viz_exam" not in f]
print("\n" + "="*32)
print("== Running", len(example_files), "simple_rl tests ==")
print("="*32 + "\n")
total_passed = 0
for i, ex in enumerate(example_files):
print("\t [Test", str(i + 1) + "] ", ex + ": ",)
result = run_example(os.path.join(example_dir, ex))
if result:
total_passed += 1
print("\t\tPASS.")
else:
print("\t\tFAIL.")
print("\nResults:", total_passed, "/", "passed.")
def create(ctx, maintype, subtype, app_name, factory, args):
args = args or []
maintype = maintype or ctx.obj['SETTINGS'].get(
'default_maintype', 'python')
subtype = subtype or ctx.obj['SETTINGS'].get(
'default_subtype', 'app')
click.echo('Type: {};\t Subtype: {};\t App name: {};'.format(
maintype, app_name))
current_dir = os.getcwd()
additional_dirs = ctx.obj['SETTINGS'].get('templates', [])
factory_module, path = get_factory(maintype, additional_dirs)
if not factory_module:
click.echo('ERROR: factory not found:{}'.format(maintype), err=True)
exit(1)
app_factory = factory_module.AppFactory(path, args)
app_factory.setup(subtype, current_dir)
app_factory.set_context(ctx.obj['SETTINGS'].get('context', {}))
app_factory.run()
click.echo('Done!')
def chdir(directory):
"""Change the current working directory to a different directory for a code
block and return the prevIoUs directory after the block exits. Useful to
run commands from a specificed directory.
:param str directory: The directory path to change to for this context.
"""
cur = os.getcwd()
try:
yield os.chdir(directory)
finally:
os.chdir(cur)
def _add_services(self, this_service, other_services):
"""Add services.
Add services to the deployment where this_service is the local charm
that we're testing and other_services are the other services that
are being used in the local amulet tests.
"""
if this_service['name'] != os.path.basename(os.getcwd()):
s = this_service['name']
msg = "The charm's root directory name needs to be {}".format(s)
amulet.raise_status(amulet.FAIL, msg=msg)
if 'units' not in this_service:
this_service['units'] = 1
self.d.add(this_service['name'], units=this_service['units'],
constraints=this_service.get('constraints'))
for svc in other_services:
if 'location' in svc:
branch_location = svc['location']
elif self.series:
branch_location = 'cs:{}/{}'.format(self.series, svc['name']),
else:
branch_location = None
if 'units' not in svc:
svc['units'] = 1
self.d.add(svc['name'], charm=branch_location, units=svc['units'],
constraints=svc.get('constraints'))
def show_io(input_dir, output_dir):
''' show directory structure and inputs and autputs to scoring program'''
swrite('\n=== DIRECTORIES ===\n\n')
# Show this directory
swrite("-- Current directory " + pwd() + ":\n")
write_list(ls('.'))
write_list(ls('./*'))
write_list(ls('./*/*'))
swrite("\n")
# List input and output directories
swrite("-- Input directory " + input_dir + ":\n")
write_list(ls(input_dir))
write_list(ls(input_dir + '/*'))
write_list(ls(input_dir + '/*/*'))
write_list(ls(input_dir + '/*/*/*'))
swrite("\n")
swrite("-- Output directory " + output_dir + ":\n")
write_list(ls(output_dir))
write_list(ls(output_dir + '/*'))
swrite("\n")
# write Meta data to sdterr
swrite('\n=== MetaDATA ===\n\n')
swrite("-- Current directory " + pwd() + ":\n")
try:
Metadata = yaml.load(open('Metadata', 'r'))
for key,value in Metadata.items():
swrite(key + ': ')
swrite(str(value) + '\n')
except:
swrite("none\n");
swrite("-- Input directory " + input_dir + ":\n")
try:
Metadata = yaml.load(open(os.path.join(input_dir, 'Metadata'),value in Metadata.items():
swrite(key + ': ')
swrite(str(value) + '\n')
swrite("\n")
except:
swrite("none\n");