Python os 模块：rmdir() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.rmdir()。
def rollback(self):
    """Undo everything recorded so far, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted and every directory in ``self.dirs_created`` is
    removed.  ``self._init_record()`` is called in either case.

    Raises:
        AssertionError: if a created directory holds anything other than a
            single ``__pycache__`` entry.
        OSError: if a directory (or its ``__pycache__``) is not empty.
    """
    if not self.dry_run:
        for written in list(self.files_written):
            if os.path.exists(written):
                os.remove(written)
        # Reverse-sorted order visits sub-directories before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                # The only tolerated leftover is a __pycache__ sub-directory.
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # fails if anything is still inside
    self._init_record()
def _test_NoAccessDir(self, nodeName):
    """Check that loading from an inaccessible directory raises ``LoadFail``.

    Launches the device manager for ``nodeName``, creates (or re-modes) the
    ``dom/noaccess`` directory under the SDR path with mode 0, and asserts
    that ``device.load`` on it raises ``CF.LoadableDevice.LoadFail``.  The
    directory is removed afterwards.
    """
    _booter, devMgr = self.launchdeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # Whether it already exists or not, the directory is given mode 0.
    if os.path.exists(testdir):
        os.chmod(testdir, 0)
    else:
        os.mkdir(testdir, 0)

    try:
        can_access = os.access(testdir, os.R_OK | os.X_OK)
        self.assertFalse(can_access, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr,
                          dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_comp_macro_directories_config_python(self):
    """Launch the python implementation and check ``foo/bar/test.log`` exists.

    The component is launched with an ``execparams`` entry pointing at
    ``logconfig.cfg`` in the current working directory.  The log file and the
    ``foo/bar`` / ``foo`` directories are removed on a best-effort basis before
    the final assertion, so a failing run does not leave them behind.
    """
    file_loc = os.getcwd()
    # NOTE(review): the key's capitalisation is kept verbatim from the
    # original; confirm it matches what the component expects.
    self.comp = sb.launch(self.cname, impl="python",
                          execparams={'LOGGING_CONfig_URI': 'file://' + file_loc + '/logconfig.cfg'})

    # Probe for the log file and close the handle straight away instead of
    # leaking it for the rest of the test run.
    log_created = False
    try:
        with open('foo/bar/test.log', 'r'):
            log_created = True
    except (IOError, OSError):
        pass

    # Best-effort cleanup: each step may legitimately fail when the launch
    # did not create the corresponding path.
    for action, target in ((os.remove, 'foo/bar/test.log'),
                           (os.rmdir, 'foo/bar'),
                           (os.rmdir, 'foo')):
        try:
            action(target)
        except OSError:
            pass

    self.assertTrue(log_created)
def test_comp_macro_directories_config_cpp(self):
    """Launch the cpp implementation and check ``foo/bar/test.log`` exists.

    NOTE(review): the original body was truncated (``sb.launch(...,
    impl="cpp", None)`` is a syntax error).  It has been rebuilt to mirror
    ``test_comp_macro_directories_config_python`` with only ``impl`` changed;
    confirm against the upstream test.
    """
    file_loc = os.getcwd()
    self.comp = sb.launch(self.cname, impl="cpp",
                          execparams={'LOGGING_CONfig_URI': 'file://' + file_loc + '/logconfig.cfg'})

    # Probe for the log file without leaking the handle.
    log_created = False
    try:
        with open('foo/bar/test.log', 'r'):
            log_created = True
    except (IOError, OSError):
        pass

    # Best-effort cleanup of whatever the launch created.
    for action, target in ((os.remove, 'foo/bar/test.log'),
                           (os.rmdir, 'foo/bar'),
                           (os.rmdir, 'foo')):
        try:
            action(target)
        except OSError:
            pass

    self.assertTrue(log_created)
def test_comp_macro_directories_config_java(self):
    """Launch the java implementation and check ``foo/bar/test.log`` exists.

    NOTE(review): the original body was truncated (``sb.launch(...,
    impl="java", None)`` is a syntax error).  It has been rebuilt to mirror
    ``test_comp_macro_directories_config_python`` with only ``impl`` changed;
    confirm against the upstream test.
    """
    file_loc = os.getcwd()
    self.comp = sb.launch(self.cname, impl="java",
                          execparams={'LOGGING_CONfig_URI': 'file://' + file_loc + '/logconfig.cfg'})

    # Probe for the log file without leaking the handle.
    log_created = False
    try:
        with open('foo/bar/test.log', 'r'):
            log_created = True
    except (IOError, OSError):
        pass

    # Best-effort cleanup of whatever the launch created.
    for action, target in ((os.remove, 'foo/bar/test.log'),
                           (os.rmdir, 'foo/bar'),
                           (os.rmdir, 'foo')):
        try:
            action(target)
        except OSError:
            pass

    self.assertTrue(log_created)
def test_ExistsException(self):
    """Check that ``fileMgr.exists`` raises ``CF.InvalidFileName`` for a file
    inside a directory the current user cannot access.

    NOTE(review): the original lines were garbled (an ``else`` without its
    ``if`` and an unbalanced ``os.access`` call).  The directory setup and the
    ``R_OK | X_OK`` access mask were rebuilt to mirror ``_test_NoAccessDir``;
    confirm against the upstream test.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # 0o644 is the same value as the legacy ``0644`` literal but is accepted
    # by both Python 2.6+ and Python 3.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK),
                         'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists,
                          os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def remove_folder(self, folder):
    """Delete the named folder, which must be empty.

    The folder lives at ``<self._path>/.<folder>``.  It counts as empty when
    ``new`` and ``cur`` hold only dot-prefixed entries and there is no
    sub-directory other than ``new``, ``cur`` and ``tmp``.

    Raises:
        NotEmptyError: if the folder holds a message or a sub-directory.
    """
    path = os.path.join(self._path, '.' + folder)

    delivered = os.listdir(os.path.join(path, 'new'))
    delivered += os.listdir(os.path.join(path, 'cur'))
    for entry in delivered:
        # Anything not starting with a dot is treated as a message.
        if not entry.startswith('.'):
            raise NotEmptyError('Folder contains message(s): %s' % folder)

    for entry in os.listdir(path):
        if entry in ('new', 'cur', 'tmp'):
            continue
        if os.path.isdir(os.path.join(path, entry)):
            raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                (folder, entry))

    # Bottom-up walk so every directory is empty by the time it is removed.
    for root, dirs, files in os.walk(path, topdown=False):
        for entry in files:
            os.remove(os.path.join(root, entry))
        for entry in dirs:
            os.rmdir(os.path.join(root, entry))
    os.rmdir(path)
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Reformats sourcefile according to configfile and writes it to destfile.

    The source and config files are copied into a fresh temporary directory,
    ``self.exe`` is run on the copied source, and its stdout is written to
    ``destfile``.  The temporary directory is removed even when one of those
    steps raises.

    This method is only used for testing.
    """
    import shutil

    tmpdir = tempfile.mkdtemp(prefix='whatstyle_')
    try:
        cfg = os.path.join(tmpdir, self.configfilename)
        copyfile(configfile, cfg)
        tmpfilename = os.path.join(tmpdir, os.path.basename(sourcefile))
        copyfile(sourcefile, tmpfilename)
        exeresult = run_executable(self.exe, [tmpfilename])
        writebinary(destfile, exeresult.stdout)
    finally:
        # Previously a failure above leaked the directory and its copies.
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_replace_loggroup_section(mocked_configparser_class):
    """Check that ``replace_loggroup_section`` drops the log-groups section
    and that the mocked parser ends up with the CLI favorites section.

    The config directory is created when neither the config file nor the
    directory exists; both are removed on a best-effort basis at the end.
    """
    config_dir = api_utils.user_config_dir(cli.lecli.__name__)
    config_missing = not os.path.exists(api_utils.CONfig_FILE_PATH)
    if config_missing and not os.path.exists(config_dir):
        os.makedirs(config_dir)

    section = api_utils.LOGGROUPS_SECTION
    parser_mock = mocked_configparser_class.return_value
    parser_mock.add_section(section)

    fake_items = [('test-log-group-favs', ["test-log-key1", "test-log-key2"])]
    with patch.object(api_utils.CONfig, 'items', return_value=fake_items):
        api_utils.replace_loggroup_section()
        assert not api_utils.CONfig.has_section(section)
        assert parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION)

    # Cleanup is best-effort: the file or directory may not exist.
    try:
        os.remove(api_utils.CONfig_FILE_PATH)
        os.rmdir(config_dir)
    except OSError:
        pass
def cleanup(self):
    """Close and remove all created temporary files.

    For the purposes of debugging, files are not removed if BAP finished
    with a positive nonzero code, i.e. they are removed only if BAP
    terminated normally or was killed by a signal.  Nothing is removed when
    ``self.DEBUG`` is set.

    A return code of ``None`` is treated like "no process": the files are
    removed.  This matches the Python 2 ordering of ``None <= 0`` and avoids
    a ``TypeError`` on Python 3.

    All opened file descriptors are closed in any case.
    """
    for desc in self.fds:
        desc.close()

    if self.DEBUG:
        return

    returncode = None if self.proc is None else self.proc.returncode
    if returncode is None or returncode <= 0:
        for name in os.listdir(self.tmpdir):
            os.remove(os.path.join(self.tmpdir, name))
        os.rmdir(self.tmpdir)
def remove_directory_files(path, remove_directory=False, remove_path=False):
    """Remove the files below ``path``, respecting the flags.

    The tree is walked bottom-up.  Files directly inside ``path`` are left
    alone unless ``remove_path`` is true.  When ``remove_directory`` is true,
    each processed directory is removed as well.  Every removal is
    best-effort: an ``OSError`` is ignored.

    NOTE(review): the ``os.walk`` header was garbled in the original
    (``for root, False):``) and has been rebuilt as a bottom-up walk.
    """
    for root, _dirs, files in os.walk(path, topdown=False):
        # Ignore path if remove_path is false.
        if not remove_path and root == path:
            continue

        for name in files:
            try:
                os.remove(os.path.join(root, name))
            except OSError:
                pass

        # Ignore directory if remove_directory is false.
        if remove_directory:
            try:
                os.rmdir(root)
            except OSError:
                pass
def unlock(self):
    '''Release the lock directories if this object still owns them.

    Does not raise an exception; safe to call as often as you want, it may
    just do nothing.

    Returns 1 when ``self.d2`` and ``self.d`` were both removed, 0 otherwise.
    In every case where ``self.d2`` was recorded in ``self.locked`` the
    record is dropped.
    '''
    if self.d2 not in self.locked:
        return 0

    # The stat call used to sit outside any handler, so a vanished lock
    # directory broke the "does not raise" promise.
    try:
        current_stamp = os.stat(self.d2)[8]
    except OSError:
        current_stamp = None

    recorded_stamp = self.locked.pop(self.d2)
    # We only own the lock if the recorded timestamp still matches.
    if current_stamp is None or recorded_stamp != current_stamp:
        return 0

    try:
        os.rmdir(self.d2)
        os.rmdir(self.d)
        return 1
    except OSError:
        return 0
def prepare_working_dir(directory_path, purge=False):
    """Create or validate the working-directory layout.

    With ``purge`` set, everything below ``directory_path`` is deleted first,
    along with ``/dev/shm/kafl_filter0`` and ``/dev/shm/kafl_tfilter`` if
    they exist.

    If ``directory_path`` is empty the expected sub-folders are created;
    otherwise they must all already be present.

    Returns:
        True when the layout exists on return, False when the directory is
        non-empty and one of the expected sub-folders is missing.
    """
    folders = ["/findings/", "/corpus", "/findings/panic", "/findings/kasan",
               "/findings/timeout", "/rbuf", "/evaluation"]

    if purge:
        if os.path.isdir(directory_path):
            # os.walk yields (root, dirs, files); the original unpacked only
            # two names and then referenced an undefined ``dirs``.
            for root, dirs, files in os.walk(directory_path, topdown=False):
                for name in files:
                    os.remove(os.path.join(root, name))
                for name in dirs:
                    os.rmdir(os.path.join(root, name))
        for shm_file in ("/dev/shm/kafl_filter0", "/dev/shm/kafl_tfilter"):
            if os.path.exists(shm_file):
                os.remove(shm_file)

    if not os.listdir(directory_path):
        for folder in folders:
            os.makedirs(directory_path + folder)
        return True

    return all(os.path.isdir(directory_path + folder) for folder in folders)
def test_log_file_with_timed_rotating(self):
    """Log one error with ``log_rotate_mode = 'time'`` and check the file.

    Exactly one ``test_log*`` file must appear in a fresh temporary
    directory, and its content must match the ``[E ...] hello`` pattern.
    Handlers are closed and the directory is emptied and removed afterwards.
    """
    tmpdir = tempfile.mkdtemp()
    log_pattern = tmpdir + '/test_log*'
    try:
        self.options.log_file_prefix = tmpdir + '/test_log'
        self.options.log_rotate_mode = 'time'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.logger.error('hello')
        self.logger.handlers[0].flush()

        filenames = glob.glob(log_pattern)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as f:
            content = f.read()
            self.assertRegexpMatches(content, r'^\[E [^]]*\] hello$')
    finally:
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_pattern):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def test_log_file(self):
    """Log one error to a file-prefixed logger and check the file.

    After ``enable_pretty_logging`` exactly one handler must be installed,
    exactly one ``test_log*`` file must appear in a fresh temporary
    directory, and its content must match the ``[E ...] hello`` pattern.
    Handlers are closed and the directory is emptied and removed afterwards.
    """
    tmpdir = tempfile.mkdtemp()
    log_pattern = tmpdir + '/test_log*'
    try:
        self.options.log_file_prefix = tmpdir + '/test_log'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.assertEqual(1, len(self.logger.handlers))
        self.logger.error('hello')
        self.logger.handlers[0].flush()

        filenames = glob.glob(log_pattern)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as f:
            content = f.read()
            self.assertRegexpMatches(content, r'^\[E [^]]*\] hello$')
    finally:
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_pattern):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def test_log_file_with_timed_rotating(self):
    """Log one error with ``log_rotate_mode = 'time'`` and check the file.

    NOTE(review): this copy was truncated in the middle of the
    ``enable_pretty_logging`` call (a positional argument followed a keyword
    argument, which is a syntax error).  The missing statements were restored
    from the intact test of the same name elsewhere in this file; confirm
    against upstream.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        self.options.log_file_prefix = tmpdir + '/test_log'
        self.options.log_rotate_mode = 'time'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(tmpdir + '/test_log*')
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as f:
            self.assertRegexpMatches(
                f.read(),
                r'^\[E [^]]*\] hello$')
    finally:
        # Close handlers first so the log files can be unlinked everywhere.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for filename in glob.glob(tmpdir + '/test_log*'):
            os.unlink(filename)
        os.rmdir(tmpdir)
def test_log_file(self):
    """Log one error to a file-prefixed logger and check the file.

    NOTE(review): this copy was truncated in the middle of the
    ``enable_pretty_logging`` call (a positional argument followed a keyword
    argument, which is a syntax error).  The missing statements were restored
    from the intact test of the same name elsewhere in this file; confirm
    against upstream.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        self.options.log_file_prefix = tmpdir + '/test_log'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.assertEqual(1, len(self.logger.handlers))
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(tmpdir + '/test_log*')
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as f:
            self.assertRegexpMatches(f.read(), r'^\[E [^]]*\] hello$')
    finally:
        # Close handlers first so the log files can be unlinked everywhere.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for filename in glob.glob(tmpdir + '/test_log*'):
            os.unlink(filename)
        os.rmdir(tmpdir)
def test_log_file_with_timed_rotating(self):
    """Log one error with ``log_rotate_mode = 'time'`` and check the file.

    NOTE(review): this copy was truncated in the middle of the
    ``enable_pretty_logging`` call (a positional argument followed a keyword
    argument, which is a syntax error).  The missing statements were restored
    from the intact test of the same name elsewhere in this file; confirm
    against upstream.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        self.options.log_file_prefix = tmpdir + '/test_log'
        self.options.log_rotate_mode = 'time'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(tmpdir + '/test_log*')
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as f:
            self.assertRegexpMatches(
                f.read(),
                r'^\[E [^]]*\] hello$')
    finally:
        # Close handlers first so the log files can be unlinked everywhere.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for filename in glob.glob(tmpdir + '/test_log*'):
            os.unlink(filename)
        os.rmdir(tmpdir)
def retrive_osd_details(device_name):
    """Mount ``device_name`` on a temporary directory and read OSD details.

    Returns:
        None when ``device_name`` is None; otherwise the result of
        ``_retrive_osd_details_from_dir`` if the mount command reported
        ``retcode`` 0, or an empty dict if it did not.

    The unmount is attempted and the temporary directory removed whether or
    not the mount succeeded.
    """
    if device_name is None:
        return None

    osd_details = {}
    # Created outside the try block: if mkdtemp itself fails there is nothing
    # to clean up, and the finally clause must not touch an unbound name.
    tmpd = tempfile.mkdtemp()
    try:
        log.info("Create temp directory %s" %(tmpd))
        try:
            out_mnt = utils.execute_local_command(['mount',device_name,tmpd])
            if out_mnt['retcode'] == 0:
                osd_details = _retrive_osd_details_from_dir(tmpd)
        finally:
            utils.execute_local_command(['umount',tmpd])
    finally:
        log.info("Destroy temp directory %s" %(tmpd))
        os.rmdir(tmpd)
    return osd_details
def paired_files():
    """Yield ``(root, files)`` for a temporary directory of paired files.

    Eight empty files are created in a fresh temporary directory: an audio
    file and a ``.jams`` file for each of ``file1`` .. ``file4``.  After the
    consumer resumes the generator (or closes it), the files that still exist
    are deleted and the directory is removed.

    NOTE(review): the list literal was garbled in the original; the missing
    ``[root, ...]`` pairs were restored following the first two entries.
    """
    root = tempfile.mkdtemp()
    names = ['file1.mp3', 'file1.jams',
             'file2.ogg', 'file2.jams',
             'file3.wav', 'file3.jams',
             'file4.flac', 'file4.jams']
    files = [os.sep.join([root, name]) for name in names]

    for fname in files:
        with open(fname, 'w'):
            pass

    try:
        yield root, files
    finally:
        # Runs even if the consumer raised or closed the generator early.
        for fname in files:
            try:
                os.remove(fname)
            except FileNotFoundError:
                pass
        os.rmdir(root)
def symlinks_supported():
    """
    Report whether a symlink can be created on this host.

    A scratch directory is created, a symlink to a sub-directory inside it
    is attempted, and everything is removed again.  Returns True when the
    symlink was created, False when ``os.symlink`` raised ``OSError``,
    ``NotImplementedError`` or ``AttributeError`` (e.g. the platform lacks
    symlinks, or creating them requires permissions the user does not have).
    """
    scratch = tempfile.mkdtemp()
    target = os.path.join(scratch, 'original')
    link = os.path.join(scratch, 'symlink')
    os.makedirs(target)
    try:
        try:
            os.symlink(target, link)
        except (OSError, NotImplementedError, AttributeError):
            return False
        # The link exists, so it must go before the directories can.
        os.remove(link)
        return True
    finally:
        os.rmdir(target)
        os.rmdir(scratch)
def test_forge_globus_download():
    """Exercise ``Forge.globus_download`` in three configurations.

    Covers the default destination, an explicit destination with
    ``preserve_dir=True``, and a result set with several files.  Every
    downloaded file is removed again so that repeated runs start clean.
    """
    f = forge.Forge()

    # Simple case
    f.globus_download(example_result1)
    assert os.path.exists("./test_fetch.txt")
    os.remove("./test_fetch.txt")

    # With dest and preserve_dir
    dest_path = os.path.expanduser("~/mdf")
    f.globus_download(example_result1, dest=dest_path, preserve_dir=True)
    preserved_file = os.path.join(dest_path, "test", "test_fetch.txt")
    assert os.path.exists(preserved_file)
    # Remove the file that was just asserted on.  The original removed
    # dest_path/test_fetch.txt, which left the "test" directory non-empty
    # for the rmdir below.
    os.remove(preserved_file)
    os.rmdir(os.path.join(dest_path, "test"))

    # With multiple files
    f.globus_download(example_result2, dest=dest_path)
    fetched = [os.path.join(dest_path, "test_fetch.txt"),
               os.path.join(dest_path, "test_multifetch.txt")]
    for path in fetched:
        assert os.path.exists(path)
    # Remove both downloads; the original left test_fetch.txt behind.
    for path in fetched:
        os.remove(path)
def remove(self, rec=1, ignore_errors=False):
    """ remove a file or directory (or a directory tree if rec=1).

    ``ignore_errors`` is forwarded to ``shutil.rmtree`` and therefore only
    affects the recursive directory case.
    """
    is_real_dir = self.check(dir=1, link=0)

    if not is_real_dir:
        if iswin32:
            self.chmod(448)  # octal 0700
        py.error.checked_call(os.remove, self.strpath)
        return

    if not rec:
        py.error.checked_call(os.rmdir, self.strpath)
        return

    # force remove of readonly files on windows
    if iswin32:
        self.chmod(448, rec=1)  # octal 0700
    py.error.checked_call(py.std.shutil.rmtree, self.strpath,
                          ignore_errors=ignore_errors)
def testFindBrainClasses(self):
    """Check that ``findBrainClasses`` reports one class per module file.

    Two near-empty module files are created in a scratch directory; the
    expected result is ``<module>.<Class>`` for each, compared after
    sorting.  The scratch files and directory are removed afterwards, even
    if creating them fails part-way.
    """
    brainDir = "tmp-test-brains"
    brainModule1 = "barBrain"
    brainClass1 = "BarBrain"
    brainModule2 = "fooBrain"
    brainClass2 = "FooBrain"
    brainFile1 = os.path.join(brainDir, "%s.py" % brainModule1)
    brainFile2 = os.path.join(brainDir, "%s.py" % brainModule2)
    expected = [brainModule1 + "." + brainClass1, brainModule2 + "." + brainClass2]

    os.mkdir(brainDir)
    try:
        for brainFile in (brainFile1, brainFile2):
            # ``with`` closes the handle deterministically; the original
            # relied on garbage collection to flush and close it.
            with open(brainFile, "w+") as handle:
                handle.write("\n")
        self.assertEqual(sorted(expected),
                         sorted(simLauncher.SimulatorLauncher.findBrainClasses(brainDir)))
    finally:
        for brainFile in (brainFile1, brainFile2):
            if os.path.exists(brainFile):
                os.unlink(brainFile)
        os.rmdir(brainDir)
def testWriteMapFile_writesstringIntoFile(self):
    """Check that ``writeMapFile`` writes exactly what ``toText`` returns.

    A map is loaded from ``maps/test-room2-l-shape.txt`` and written to a
    scratch file, which is then read back and compared with ``toText()``.
    The scratch file and directory are removed afterwards, even if loading
    or writing the map raises.
    """
    dirname = "testWriteMapFile"
    filename = "test.txt"
    filepath = os.path.join(dirname, filename)

    os.mkdir(dirname)
    try:
        mObj = gameMap.GameMap()
        mObj.loadMapFile("maps/test-room2-l-shape.txt")
        expected = mObj.toText()
        mObj.writeMapFile(filepath)

        actual = None
        if os.path.exists(filepath):
            # Close the handle explicitly instead of leaking it.
            with open(filepath, "r") as written:
                actual = written.read()
        self.assertEqual(expected, actual)
    finally:
        if os.path.exists(filepath):
            os.unlink(filepath)
        os.rmdir(dirname)
def cleanAll(path=None):
    """Erase every file in the trash folder for ``path``.

    Files are handed to ``enigma.eBackgroundFileEraser``; a failure is
    printed and does not stop the run.  Directories are removed when
    possible (``os.rmdir`` only succeeds on an empty directory).

    Returns:
        0 when the trash folder does not exist, otherwise None.
    """
    trash = getTrashFolder(path)
    if not os.path.isdir(trash):
        # Single pre-formatted argument prints the same text on Python 2
        # and Python 3.
        print("[Trashcan] No trash. %s" % trash)
        return 0

    # os.walk yields (root, dirs, files); the original unpacked only two
    # names and then referenced an undefined ``dirs``.
    for root, dirs, files in os.walk(trash, topdown=False):
        for name in files:
            fn = os.path.join(root, name)
            try:
                enigma.eBackgroundFileEraser.getInstance().erase(fn)
            except Exception as e:
                print("[Trashcan] Failed to erase %s: %s" % (name, e))
        # Remove empty directories if possible
        for name in dirs:
            try:
                os.rmdir(os.path.join(root, name))
            except OSError:
                pass
def destroy(self):
    """Removes any files in this storage object and then removes the
    storage object's directory. What happens if any of the files or the
    directory are in use depends on the underlying platform.

    A directory that is already gone is not an error.  Any other failure of
    ``os.rmdir`` (for example a non-empty directory) is re-raised.
    """
    # Remove all files
    self.clean()
    try:
        # Try to remove the directory
        os.rmdir(self.folder)
    except (IOError, OSError):
        # os.rmdir raises OSError; on Python 2 that is not an IOError, so
        # catching only IOError let the ENOENT case escape.
        e = sys.exc_info()[1]
        if e.errno != errno.ENOENT:
            # Bare raise keeps the original traceback.
            raise
def __del__(self):
    """Delete the known leftover files in ``self.rootdir``, then the
    directory itself.

    Only regular files with the listed names are deleted.  ``os.rmdir`` is
    attempted whenever ``self.rootdir`` is a directory, so it raises if
    anything else is still inside.
    """
    leftovers = (
        'GAMIT.status',
        'GAMIT.fatal',
        'Grdtab.out',
        'harpos.' + self.StationCode,
        'otl.grid',
        'ufile.' + self.StationCode,
    )
    for leftover in leftovers:
        target = os.path.join(self.rootdir, leftover)
        if os.path.isfile(target):
            os.remove(target)

    if os.path.isdir(self.rootdir):
        os.rmdir(self.rootdir)
def remove_empty_folders(folder):
    """Remove empty directories below ``folder``; ``folder`` itself is kept.

    The tree is walked bottom-up.  Files whose name ends in ``DS_Store`` are
    deleted first so that they do not keep a directory alive.  All removals
    are best-effort: a directory that still has content is simply left in
    place.
    """
    for dirpath, _, files in os.walk(folder, topdown=False):
        for name in files:
            if name.endswith('DS_Store'):
                # Delete the macOS metadata file.
                try:
                    os.remove(os.path.join(dirpath, name))
                except OSError:
                    # sys.exc_clear() no longer exists on Python 3; ignoring
                    # the error is all that is needed.
                    pass
        if dirpath == folder:
            # The top folder comes last in a bottom-up walk and is kept.
            break
        try:
            os.rmdir(dirpath)
        except OSError:
            pass
    return
def contain(command, image_name, image_dir, container_id, container_dir):
    """Set up a private mount namespace, pivot into a new root and exec.

    Steps, in order: unshare the mount namespace, remount ``/`` with
    ``MS_PRIVATE | MS_REC``, create the container root and its mounts,
    ``pivot_root`` into it, detach and remove the old root, then replace the
    current process with ``command``.  Does not return on success.

    NOTE(review): ``image_dir`` and ``container_id`` are accepted but not
    used in this body; confirm whether ``create_container_root`` should
    receive them.
    """
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace
    propagation = linux.MS_PRIVATE | linux.MS_REC
    linux.mount(None, '/', None, propagation, None)

    new_root = create_container_root(image_name, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))
    _create_mounts(new_root)

    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)
    os.chdir('/')

    # After the pivot the previous root is visible at /old_root.
    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root')  # rmdir the old_root dir

    program = command[0]
    os.execvp(program, command)
def contain(command, container_dir):
    """Unshare the mount namespace, remove ``/old_root`` and exec ``command``.

    NOTE(review): this variant looks truncated.  ``linux.mount`` is called
    with ``linux.MNT_DETACH`` as its second argument and ``container_dir`` is
    never used.  The statements are reproduced unchanged; confirm against
    the original source before relying on it.
    """
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace
    # Todo: switch to a new UTS namespace, change hostname to container_id
    # HINT: use linux.sethostname()
    linux.mount(None, linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root')  # rmdir the old_root dir
    program = command[0]
    os.execvp(program, command)
def contain(command, container_dir):
    """Set the hostname, then call ``linux.mount(None, command)``.

    NOTE(review): this variant looks truncated.  ``container_id`` is not a
    parameter of this function, so it must come from an enclosing scope or
    the call fails with ``NameError``; ``container_dir`` is never used.  The
    statements are reproduced unchanged; confirm against the original source.
    """
    linux.sethostname(container_id)  # change hostname to container_id
    mount_source = None
    linux.mount(mount_source, command)
def clear_cache_directory():
    """Prune the download cache, keeping only ``.h`` and ``.m`` files.

    Walks ``download_cache_directory``.  Every other file is deleted, and an
    ``os.rmdir`` is attempted on each ``.git`` directory (which only succeeds
    once it is empty).  All removals are best-effort: an ``OSError`` is
    ignored.
    """
    # os.walk yields (root, dirs, files); the original unpacked only two
    # names and then referenced an undefined ``dirs``.
    for root, dirs, files in os.walk(download_cache_directory):
        for d in dirs:
            if d == '.git':
                try:
                    os.rmdir(os.path.join(root, d))
                except OSError:
                    pass
        for f in files:
            if f.endswith(('.h', '.m')):
                continue
            try:
                os.remove(os.path.join(root, f))
            except OSError:
                pass
def tree_delete_selected(self):
    """Delete the file or directory behind the focused tree item.

    The focused item's ``tags[0]`` is removed as a file.  If that path does
    not exist, ``tags[1]`` is emptied bottom-up and removed as a directory.
    Each deletion is printed with an ``HH:MM:SS`` timestamp, and the tree is
    refreshed at the end.

    NOTE(review): assumes ``tags[0]`` is a file path, ``tags[1]`` a directory
    path, and that ``datetime`` is the ``datetime.datetime`` class; confirm
    against the code that populates the tree and the file's imports.
    """
    tree = self.parent.widget_tree
    tags = tree.item(tree.focus())["tags"]

    def _report(target):
        # "%S" is seconds; the original "%s" is a platform-specific epoch
        # directive, and "datetime.Now" does not exist.
        print("{} | Deleting: {}".format(datetime.now().strftime("%H:%M:%S"), target))

    try:
        item = tags[0]
        os.remove(item)
        _report(item)
    except FileNotFoundError:
        item = tags[1]
        # os.walk yields (root, dirs, files); the original unpacked only two.
        for root, dirs, files in os.walk(item, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
                _report(name)
            # Sub-directories are empty by now in a bottom-up walk; removing
            # them lets the final rmdir succeed on nested trees.
            for name in dirs:
                os.rmdir(os.path.join(root, name))
        os.rmdir(item)
        _report(item)
    self.parent.cmd.tree_refresh()
def run(parser, args):
    """Demultiplex ``args.fasta`` with porechop and unpack the results.

    porechop writes its bins into a fresh temporary directory below the
    current directory, and its report to ``<fasta>.demultiplexreport.txt``.
    Each produced file is moved to ``<fasta stem>-<name>`` in the current
    directory and gunzipped.  The temporary directory is removed unless
    ``args.no_remove_directory`` is set.

    ``parser`` is unused.

    NOTE(review): the commands are still run through the shell.  Paths are
    wrapped in double quotes so names with spaces work, but this is not a
    defence against hostile file names.
    """
    tmpdir = tempfile.mkdtemp(dir='.')
    cmd = ("porechop --verbosity 2 --untrimmed -i \"%s\" -b %s --barcode_threshold 80 --threads %s --check_reads 10000 --barcode_diff 5 --require_two_barcodes > \"%s.demultiplexreport.txt\"" % (args.fasta, tmpdir, args.threads, args.fasta))
    # Echo the command to stderr in a way that works on Python 2 and 3.
    sys.stderr.write(cmd + "\n")
    os.system(cmd)

    prefix = os.path.splitext(os.path.basename(args.fasta))[0]
    for fn in os.listdir(tmpdir):
        newfn = "%s-%s" % (prefix, os.path.basename(fn))
        shutil.move(os.path.join(tmpdir, fn), newfn)
        os.system("gunzip -f \"%s\"" % (newfn,))

    if not args.no_remove_directory:
        os.rmdir(tmpdir)
def get_key(self, name):
    """Fetch the key pair stored under ``name`` and return it as ``(n, e)``.

    ``self.api.get`` is asked for ``<root_hash>/names/<name>``; the files
    ``n`` and ``e`` are then loaded from ``<cwd>/<name>`` and that directory
    is removed.

    Returns:
        ``(n, e)`` on success, ``False`` if any step fails.

    NOTE(review): ``os.rmdir`` only succeeds on an empty directory, so this
    presumably relies on ``file_to_memory`` deleting each file after reading
    it; confirm.
    """
    try:
        # pull it down from ipfs
        self.api.get('{}/names/{}'.format(self.root_hash, name))
        # stores in cwdir, so load the files into memory and delete them
        key_dir = '{}/{}'.format(os.getcwd(), name)
        n = self.file_to_memory('{}/n'.format(key_dir))
        e = self.file_to_memory('{}/e'.format(key_dir))
        os.rmdir(key_dir)
        return (n, e)
    except Exception:
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return False