Python os 模块:rename() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.rename()。
def client_proc(job_id, data_file, rtask, task=None):
    """Send an input file to a remote task and fetch the result file it produces.

    This is a generator: each ``yield`` hands an operation to the scheduler
    and resumes with that operation's outcome.

    :param job_id: identifier forwarded to ``C`` for this job.
    :param data_file: path of the input file sent to ``rtask.location``.
    :param rtask: remote task that receives the input and replies with a result.
    :param task: the task running this generator; used to receive the reply.
    :raises StopIteration: with value ``-1`` when sending or processing fails.
    """
    # NOTE(review): on Python 3.7+ (PEP 479) ``raise StopIteration`` inside a
    # generator surfaces as RuntimeError; it is kept because ``return -1`` is
    # a syntax error in Python 2 generators.

    # Send the input file to rtask.location; it is saved in the remote
    # process's working directory.  The file to send must be passed
    # explicitly -- without it no file is named in the call.
    if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0:
        print('Could not send input data to %s' % rtask.location)
        # terminate remote task
        rtask.send(None)
        raise StopIteration(-1)
    # send info about input
    obj = C(job_id, random.uniform(5, 8), task)
    if (yield rtask.deliver(obj)) != 1:
        print('Could not send input to %s' % rtask.location)
        raise StopIteration(-1)
    # rtask sends result to this task as message
    result = yield task.receive()
    if not result.result_file:
        print('Processing %s failed' % obj.i)
        raise StopIteration(-1)
    # The result file is saved under pycos's dest_path, not the current
    # working directory, so move it into the cwd.
    result_file = os.path.join(pycos.Pycos().dest_path, result.result_file)
    target = os.path.join(os.getcwd(), os.path.basename(result_file))
    os.rename(result_file, target)
    print(' job %s output is in %s' % (obj.i, target))
def atomic_writer(file_path, mode):
    """Atomic file writer.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: same as for :func:`open`
    :type mode: string

    .. versionadded:: 1.12

    Generator intended to be driven as a context manager: it yields a file
    object backed by a temporary file next to ``file_path``.  The temporary
    file replaces ``file_path`` only if the caller's block finishes without
    an exception; otherwise it is discarded.
    """
    temp_file_path = file_path + '.aw.temp'
    try:
        with open(temp_file_path, mode) as file_obj:
            yield file_obj
        # Rename only after the file is closed, so that all data is flushed
        # and the rename also works where open files cannot be renamed.
        os.rename(temp_file_path, file_path)
    finally:
        # After a successful rename the temp file is gone; ignore that.
        try:
            os.remove(temp_file_path)
        except (OSError, IOError):
            pass
def _atomic_write(filename):
    """Yield a temporary file that replaces ``filename`` on success.

    Generator intended to be driven as a context manager.  The temporary
    file is created in the same directory as ``filename``; after the caller's
    block completes it is flushed, synced to disk, closed and renamed over
    ``filename``.  If anything fails the temporary file is removed.

    :param filename: final path of the file being written.
    :raises OSError: if cleanup fails for a reason other than the temporary
        file already being gone.
    """
    import errno

    path = os.path.dirname(filename)
    # Created outside the ``try`` so the cleanup below never sees an
    # unbound name when creation itself fails.
    tmp = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+")
    try:
        yield tmp
        tmp.flush()
        os.fsync(tmp.fileno())
        tmp.close()
        os.rename(tmp.name, filename)
    finally:
        tmp.close()  # no-op when already closed; avoids leaking the handle
        try:
            os.remove(tmp.name)
        except OSError as e:
            # The file is expected to be missing after a successful rename.
            if e.errno != errno.ENOENT:
                raise
def fetch_increment_and_clean(uuid):
    """Collect and reset the accumulated usage records for ``uuid``.

    The ``usage`` file under ``system_manager.db_prefix/<uuid>`` is first
    renamed to a timestamped name, then summed line by line (each line holds
    two whitespace-separated numbers: cpu and mem) and finally deleted.

    Failures are deliberately best-effort: on any error the totals gathered
    so far are returned.

    :param uuid: identifier used as the directory name under the db prefix.
    :returns: dict with keys ``cpu_acct``, ``mem_acct`` and ``cnt_acct``.
    """
    cpu_acct = 0.0
    mem_acct = 0.0
    cnt_acct = 0
    try:
        fetch_path = '%s/%s/%f' % (system_manager.db_prefix, uuid, time.time())
        os.rename('%s/%s/usage' % (system_manager.db_prefix, uuid), fetch_path)
        with open(fetch_path, 'r') as fp:
            for line in fp:
                cpu, mem = line.split()
                cnt_acct += 1
                cpu_acct += float(cpu)
                mem_acct += float(mem)
        os.remove(fetch_path)
    except Exception:
        # Best effort, but no longer swallows KeyboardInterrupt/SystemExit.
        pass
    return {"cpu_acct": cpu_acct, "mem_acct": mem_acct, "cnt_acct": cnt_acct}
def zap_pyfiles(self):
    """Delete ``.py`` files under ``self.bdist_dir`` and hoist cached bytecode.

    Every ``.py`` file found by ``walk_egg`` is removed.  Files inside a
    ``__pycache__`` directory named ``<name>.<magic>.pyc`` are moved to the
    parent directory as ``<name>.pyc``, replacing any file already there.
    """
    log.info("Removing .py files from temporary directory")
    pyc_pattern = re.compile(r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc')
    for base, dirs, files in walk_egg(self.bdist_dir):
        for name in files:
            path = os.path.join(base, name)
            if name.endswith('.py'):
                log.debug("Deleting %s", path)
                os.unlink(path)
            if base.endswith('__pycache__'):
                m = pyc_pattern.match(name)
                if m is None:
                    # Not a tagged .pyc file; nothing to rename.
                    continue
                path_old = path
                path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc')
                log.info("Renaming file from [%s] to [%s]" % (path_old, path_new))
                try:
                    os.remove(path_new)
                except OSError:
                    pass
                os.rename(path_old, path_new)
def trash_old_stuff(trashlist, trashpath, newpath):
    """Move each path in ``trashlist`` under ``newpath``, keeping its layout.

    :param trashlist: list of existing paths to move.
    :param trashpath: root that the entries are taken relative to.
    :param newpath: existing directory that receives the entries.

    Problems are reported through ``logging`` rather than raised.
    """
    if not isinstance(trashlist, list):
        logging.error("%s is not a valid list\n" % trashlist)
        return
    for source in trashlist:
        # Mirror the position of the entry below trashpath under newpath.
        relative = os.path.relpath(source, trashpath)
        destination = os.path.join(newpath, relative)
        parent = os.path.dirname(destination)
        if not os.path.exists(parent):
            os.makedirs(parent)
        # Final sanity check before touching the file system.
        if not (os.path.exists(source) and os.path.isdir(newpath)):
            logging.error("One of %s or %s does not exist\n" % (source, destination))
            continue
        os.rename(source, destination)
        logging.info("Moving %s to %s\n" % (source, destination))
# Function that checks paths are writable
def main(args):
    """Print a YAML file, or update keys in it in place.

    Without ``args.set`` the file ``args.file`` is loaded and dumped to
    standard output.  With ``args.set`` (an iterable of ``(key, value)``
    pairs) every value is parsed as YAML and stored under its key, where dots
    in the key descend into nested mappings; the result is written to a
    ``.tmp`` sibling and renamed over the original file.
    """
    if not args.set:
        with open(args.file) as f:
            config = ruamel.yaml.safe_load(f)
        print(ruamel.yaml.dump(config), end='')
        return

    with open(args.file) as f:
        config = ruamel.yaml.load(f, ruamel.yaml.RoundTripLoader)
    for dotted_key, raw_value in args.set:
        value = ruamel.yaml.safe_load(raw_value)
        parts = dotted_key.split('.')
        # Walk down to the mapping that owns the final key component.
        node = config
        for part in parts[:-1]:
            node = node[part]
        node[parts[-1]] = value
    tmpfile = args.file + '.tmp'
    with open(tmpfile, 'w') as f:
        print(ruamel.yaml.dump(config, Dumper=ruamel.yaml.RoundTripDumper), end='', file=f)
    os.rename(tmpfile, args.file)
def doRollover(self):
    """
    Do a rollover, as described in __init__().

    Closes the current stream, shifts ``<base>.1 .. <base>.(backupCount-1)``
    up by one, moves the base file to ``<base>.1`` and reopens the base file
    in write mode.
    """
    if self.stream:
        self.stream.close()
        self.stream = None
    if self.backupCount > 0:
        def shift(source, destination):
            # Clear the destination first, then move the source onto it.
            if os.path.exists(destination):
                os.remove(destination)
            os.rename(source, destination)

        # Oldest backups first so nothing is overwritten prematurely.
        for index in reversed(range(1, self.backupCount)):
            current = "%s.%d" % (self.baseFilename, index)
            if os.path.exists(current):
                shift(current, "%s.%d" % (self.baseFilename, index + 1))
        shift(self.baseFilename, self.baseFilename + ".1")
    self.mode = 'w'
    self.stream = self._open()
def write_file(self, new_text, filename, old_text, encoding):
    """Write ``new_text`` to ``filename``, keeping a ``.bak`` backup.

    Unless ``self.nobackups`` is set, any existing backup is removed, the
    current file is renamed to ``filename + ".bak"`` and, after writing, the
    permission bits of the backup are copied onto the new file.  Backup
    failures are logged through ``self.log_message`` and do not stop the
    write.
    """
    if not self.nobackups:
        # Make backup
        backup = filename + ".bak"
        if os.path.lexists(backup):
            try:
                os.remove(backup)
            except os.error:
                self.log_message("Can't remove backup %s", backup)
        try:
            os.rename(filename, backup)
        except os.error:
            # Both placeholders need a value: source first, then target.
            self.log_message("Can't rename %s to %s", filename, backup)
    # Actually write the new file, forwarding every argument received.
    write = super(StdoutRefactoringTool, self).write_file
    write(new_text, filename, old_text, encoding)
    if not self.nobackups:
        shutil.copymode(backup, filename)
def mimify(infile, outfile):
    """Convert 8bit parts of a MIME mail message to quoted-printable.

    ``infile`` and ``outfile`` may each be a file name (``str``) or an
    already open file object.  When both are the same file name, the input
    is renamed to ``,<name>`` after it has been opened, so the output can be
    written under the original name.
    """
    infile_is_name = type(infile) is str
    outfile_is_name = type(outfile) is str

    if infile_is_name:
        ifile = open(infile)
        if outfile_is_name and infile == outfile:
            import os
            directory, basename = os.path.split(infile)
            os.rename(infile, os.path.join(directory, ',' + basename))
    else:
        ifile = infile

    ofile = open(outfile, 'w') if outfile_is_name else outfile

    mimify_part(File(ifile, None), ofile, 0)
    ofile.flush()
def removemessages(self, list):
    """Remove one or more messages -- may raise os.error.

    Each message file is renamed to its comma-prefixed name (any existing
    comma file is deleted first).  Messages that were renamed are removed
    from all sequences.  Rename failures are collected and raised at the
    end: a single failure is re-raised as-is, several are wrapped in one
    ``os.error``.
    """
    errors = []
    deleted = []
    for n in list:
        path = self.getmessagefilename(n)
        commapath = self.getmessagefilename(',' + str(n))
        try:
            os.unlink(commapath)
        except os.error:
            pass
        try:
            os.rename(path, commapath)
        except os.error as msg:
            errors.append(msg)
        else:
            deleted.append(n)
    if deleted:
        self.removefromallsequences(deleted)
    if errors:
        if len(errors) == 1:
            raise errors[0]
        raise os.error('multiple errors:', errors)
def copymessage(self, n, tofolder, ton):
    """Copy one message over a specific destination message,
    which may or may not already exist.

    An existing destination is first renamed to its comma-prefixed backup
    name.  If the copy fails, the partially written destination is removed
    and the error propagates.
    """
    source = self.getmessagefilename(n)
    # Opening (and immediately closing) the source proves that it exists.
    open(source).close()
    destination = tofolder.getmessagefilename(ton)
    backup = tofolder.getmessagefilename(',%d' % ton)
    try:
        os.rename(destination, backup)
    except os.error:
        pass
    copied = False
    try:
        tofolder.setlast(None)
        shutil.copy2(source, destination)
        copied = True
    finally:
        if not copied:
            try:
                os.unlink(destination)
            except os.error:
                pass
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The new message is first added under a temporary key, the old one is
    discarded, and the new file is then renamed so that it is stored under
    ``key``.
    """
    previous_subpath = self._lookup(key)
    staged_key = self.add(message)
    staged_subpath = self._lookup(staged_key)
    # A MaildirMessage dictates its own subdir and suffix; for anything else
    # add() used defaults, so the replaced message's values win.
    if isinstance(message, MaildirMessage):
        governing_subpath = staged_subpath
    else:
        governing_subpath = previous_subpath
    subdir = os.path.dirname(governing_subpath)
    suffix = ''
    if self.colon in governing_subpath:
        suffix = self.colon + governing_subpath.split(self.colon)[-1]
    self.discard(key)
    destination = os.path.join(self._path, subdir, key + suffix)
    os.rename(os.path.join(self._path, staged_subpath), destination)
    if isinstance(message, MaildirMessage):
        # Keep the access time, set the modification time from the message.
        access_time = os.path.getatime(destination)
        os.utime(destination, (access_time, message.get_date()))
def pack(self):
    """Re-name messages to eliminate numbering gaps. Invalidates keys.

    Message files under ``self._path`` are renumbered consecutively from 1,
    ``self._next_key`` is updated, and every renumbered key is rewritten in
    the stored sequences.
    """
    sequences = self.get_sequences()
    prev = 0
    changes = []
    for key in self.iterkeys():
        if key - 1 != prev:
            changes.append((key, prev + 1))
            old_path = os.path.join(self._path, str(key))
            new_path = os.path.join(self._path, str(prev + 1))
            if hasattr(os, 'link'):
                # Link-then-unlink never overwrites an existing target.
                os.link(old_path, new_path)
                os.unlink(old_path)
            else:
                os.rename(old_path, new_path)
        prev += 1
    self._next_key = prev + 1
    if not changes:
        return
    for name, key_list in sequences.items():
        for old, new in changes:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def munge(src_dir):
    """Move every file in ``src_dir`` into a two-level ``MCG/mat`` tree.

    stored as: ./MCG-COCO-val2014-boxes/COCO_val2014_000000193401.mat
    want:      ./MCG/mat/COCO_val2014_0/COCO_val2014_000000141/COCO_val2014_000000141334.mat

    The first directory level is the first 14 characters of the file's base
    name, the second level the first 22 characters.  The tree is created
    relative to the current working directory.
    """
    for fn in os.listdir(src_dir):
        base, ext = os.path.splitext(fn)
        # first 14 chars / first 22 chars / all chars + .mat
        dst_dir = os.path.join('MCG', 'mat', base[:14], base[:22])
        if not os.path.exists(dst_dir):
            os.makedirs(dst_dir)
        src = os.path.join(src_dir, fn)
        dst = os.path.join(dst_dir, fn)
        # Single-argument call form prints identically on Python 2 and 3.
        print('MV: {} -> {}'.format(src, dst))
        os.rename(src, dst)
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
    """Finish an upgrade: install the downloaded binary or discard it.

    :param binary_ok: truthy when the file at ``temp_path`` should be installed.
    :param binary_path: location of the currently installed binary.
    :param path: directory used to park the old binary as ``<path>/old``.
    :param temp_path: location of the downloaded file.

    On success the current process is replaced by the new binary, so the
    call does not return in that case.
    """
    if not binary_ok:
        os.remove(temp_path)
        print("couldn't download the latest binary")
        return

    import stat

    # Remember the permissions of the binary being replaced.
    previous_mode = os.stat(binary_path).st_mode
    parked = path + "/old"
    # Park the current binary, then move the new one into its place.
    os.rename(binary_path, parked)
    os.rename(temp_path, binary_path)
    # Same permissions as before, plus the owner-execute bit.
    os.chmod(binary_path, previous_mode | stat.S_IEXEC)
    os.remove(parked)
    print("mongoaudit updated,restarting...")
    os.execl(binary_path, *sys.argv)
def download_driver_file(whichbin, url, base_path):
    """Download a driver archive, unpack it into ``base_path`` and chmod it.

    :param whichbin: ``'wires'`` selects ``geckodriver``; anything else
        selects ``chromedriver``.
    :param url: archive location; a ``.tar.gz`` suffix selects tar handling,
        everything else is treated as a zip file.
    :param base_path: directory that receives the extracted files.
    """
    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    archive = '/tmp/pwr_temp{}'.format(ext)
    target_dir = '{}/'.format(base_path)
    print("Downloading from: {}".format(url))
    download_file(url, archive)
    # NOTE(review): extractall() trusts the member paths of a downloaded
    # archive -- confirm the URLs only ever point at trusted hosts.
    if ext == '.tar.gz':
        import tarfile
        # Context manager closes the archive even if extraction fails.
        with tarfile.open(archive, "r:gz") as tar:
            tar.extractall(target_dir)
    else:
        import zipfile
        with zipfile.ZipFile(archive, "r") as z:
            z.extractall(target_dir)
    driver = 'geckodriver' if whichbin == 'wires' else 'chromedriver'
    os.chmod('{}/{}'.format(base_path, driver), 0o775)
def operate_file_style(file_style = "csv", bases_dir = "../season_1/", is_add = True):
    """Add or remove a style suffix on the files below ``bases_dir``.

    Directories are processed recursively.  A file is renamed only when
    ``is_required_file`` accepts it; the new name comes from
    ``add_style_suffix`` when ``is_add`` is true, otherwise from
    ``remove_style_suffix``.

    :raises IOError: if ``bases_dir`` does not exist or is not a directory.
    """
    if not os.path.exists(bases_dir):
        raise IOError("Path is not existed!...")
    if not os.path.isdir(bases_dir):
        raise IOError("This is not a dir!...")
    for entry in os.listdir(bases_dir):
        entry_path = os.path.join(bases_dir, entry)
        if os.path.isdir(entry_path):
            operate_file_style(file_style, entry_path, is_add)
            continue
        if not is_required_file(entry, file_style, is_add):
            continue
        if is_add:
            renamed = add_style_suffix(entry, file_style)
        else:
            renamed = remove_style_suffix(entry, file_style)
        os.rename(entry_path, os.path.join(bases_dir, renamed))
def upload_file(upload_file_name, temp):
    """Copy an image into ``saveas``, upload it, and return its URL.

    The user is prompted for a picture name; the stored key is that name
    plus ``_``, eight hex characters of a random UUID and ``.png``.  After
    the upload has been verified the local file is renamed to
    ``<upload_file_name>.old``.

    :param upload_file_name: path of the local file to upload.
    :param temp: not used by this function.
    :returns: ``domain + '/' + key``.
    """
    # NOTE(review): the original comments in this function were lost to an
    # encoding error; the notes below describe only what the code shows.
    #key = md5(str(time.time())+''.join(random.sample(string.letters,12))).hexdigest()
    # Prompt for the picture name (trailing comma keeps the cursor on the line).
    print u"??????: ",
    pic_name = raw_input()
    uuid_6 = uuid.uuid4().get_hex()[:8] # first 8 hex characters of a random UUID
    key = pic_name+"_"+uuid_6+".png"
    # Keep a local copy under the generated key.
    copyfile(upload_file_name,join(saveas,key))
    mime_type = 'image/png'
    token = q.upload_token(bucket, key)
    ret, info = put_file(token, upload_file_name, mime_type=mime_type, check_crc=True)
    print 'upload qiniu result:', info
    # Verify that the service stored the expected key and content hash.
    assert ret['key'] == key
    assert ret['hash'] == etag(upload_file_name)
    os.rename(upload_file_name, upload_file_name+'.old')
    return domain+'/'+key
def download_celeb_a(base_path):
    """Download and unpack the Celeb-A images below ``base_path``.

    Nothing happens when ``<base_path>/celebA`` already exists.  Otherwise
    the zip archive is downloaded (unless already present), extracted into
    ``base_path``, its ``img_align_celeba`` directory is moved to
    ``<base_path>/celebA/images``, the archive is deleted and the attribute
    file is downloaded.
    """
    data_path = os.path.join(base_path, 'celebA')
    if os.path.exists(data_path):
        print('[!] Found celeb-A - skip')
        return

    images_path = os.path.join(data_path, 'images')
    filename = "img_align_celeba.zip"
    drive_id = "0B7EVK8r0v71pZjFTYXZWM3FlRnM"
    save_path = os.path.join(base_path, filename)

    if not os.path.exists(save_path):
        download_file_from_google_drive(drive_id, save_path)
    else:
        print('[*] {} already exists'.format(save_path))

    with zipfile.ZipFile(save_path) as archive:
        # Name of the first archive member; read but not used afterwards.
        zip_dir = archive.namelist()[0]
        archive.extractall(base_path)
    if not os.path.exists(data_path):
        os.mkdir(data_path)
    os.rename(os.path.join(base_path, "img_align_celeba"), images_path)
    os.remove(save_path)
    download_attr_file(data_path)
def _rename_atomic(src, dst):
    """Move ``src`` over ``dst`` inside a transaction, retrying on failure.

    Up to 100 move attempts are made, sleeping one millisecond after each
    failed one.  The transaction handle is always closed.

    :returns: ``False`` if no transaction could be created; otherwise the
        result of committing, or the last (falsy) move result.
    """
    transaction = _CreateTransaction(None, 1000, 'Werkzeug rename')
    if transaction == -1:
        return False
    move_flags = _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH
    try:
        outcome = False
        for _attempt in range(100):
            outcome = _MoveFileTransacted(src, dst, None, move_flags, transaction)
            if outcome:
                outcome = _CommitTransaction(transaction)
                break
            time.sleep(0.001)
        return outcome
    finally:
        _CloseHandle(transaction)
def rename(src, dst):
    """Rename ``src`` to ``dst``, replacing ``dst`` if it already exists.

    The atomic helper ``_rename`` is tried first.  If it reports failure a
    plain ``os.rename`` is attempted; when that fails because ``dst``
    exists, ``dst`` is moved aside under a random suffix, ``src`` is moved
    into place and the old file is deleted on a best-effort basis.

    :raises OSError: for rename failures other than "destination exists".
    """
    # Try atomic or pseudo-atomic rename
    if _rename(src, dst):
        return
    # Fall back to "move away and replace"
    try:
        os.rename(src, dst)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        # sys.maxsize exists on Python 2.6+ and 3; sys.maxint is 2-only.
        old = "%s-%08x" % (dst, random.randint(0, sys.maxsize))
        os.rename(dst, old)
        os.rename(src, dst)
        try:
            os.unlink(old)
        except Exception:
            pass
def _sync(self):
    """Write the index, then persist the get and put caches to disk.

    Each cache is dumped to ``self.temp_file`` and then moved to the file
    named after ``self.head`` (get cache) or ``self.tail`` (put cache)
    inside the ``self.name`` directory, replacing any existing file.
    """
    self._sync_index()

    def flush(cache, slot):
        # Dump to the scratch file first, then swap it into place.
        destination = os.path.join(self.name, str(slot))
        handle = open(self.temp_file, 'wb')
        self.marshal.dump(cache, handle)
        handle.close()
        if os.path.exists(destination):
            os.remove(destination)
        os.rename(self.temp_file, destination)

    flush(self.get_cache, self.head)
    flush(self.put_cache, self.tail)
def save_pickle(self, dumpfile=DUMPFILE):
    """Pickle this object to ``dumpfile`` if it has unsaved changes.

    The pickle is written to ``dumpfile + ".new"`` first and then renamed
    over ``dumpfile``.

    :returns: ``1`` after a checkpoint was written, otherwise ``None``.
    """
    if not self.changed:
        self.note(0, "\nNo need to save checkpoint")
    elif not dumpfile:
        self.note(0, "No dumpfile,won't save checkpoint")
    else:
        self.note(0, "\nSaving checkpoint to %s ...", dumpfile)
        newfile = dumpfile + ".new"
        # ``with`` closes the file even when pickling fails.
        with open(newfile, "wb") as f:
            pickle.dump(self, f)
        try:
            os.unlink(dumpfile)
        except os.error:
            pass
        os.rename(newfile, dumpfile)
        self.note(0, "Done.")
        return 1
def makedirs(dir):
    """Create ``dir`` and any missing parents.

    If ``dir`` exists as a regular file, it is converted on a best-effort
    basis into a directory holding that file as ``index.html``.  An empty
    ``dir`` is ignored.
    """
    if not dir:
        return
    if os.path.exists(dir):
        if not os.path.isdir(dir):
            try:
                os.rename(dir, dir + ".bak")
                os.mkdir(dir)
                os.rename(dir + ".bak", os.path.join(dir, "index.html"))
            except os.error:
                pass
        return
    head, tail = os.path.split(dir)
    if not tail:
        # Formatted into one string so the output is the same on 2 and 3.
        print("Huh? Don't know how to make dir %s" % (dir,))
        return
    makedirs(head)
    os.mkdir(dir, 0o777)
def restore_backup(self):
    """Replace the addon folder with the stored backup and reload the addon.

    The ``backup`` directory under ``self._updater_path`` is moved next to
    the addon root, the addon root is deleted, and the moved backup is
    renamed into its place.  The updater JSON flags are then updated and
    saved.
    """
    if self._verbose:
        print("Restoring backup")
        print("Backing up current addon folder")
    backup_dir = os.path.join(self._updater_path, "backup")
    staging_name = self._addon + "_updater_backup_temp"
    staging_dir = os.path.abspath(
        os.path.join(self._addon_root, os.pardir, staging_name))
    # Stage the backup outside the addon root before deleting the root.
    shutil.move(backup_dir, staging_dir)
    shutil.rmtree(self._addon_root)
    os.rename(staging_dir, self._addon_root)
    self._json["backup_date"] = ""
    self._json["just_restored"] = True
    self._json["just_updated"] = True
    self.save_updater_json()
    self.reload_addon()
def deepMergeDirectory(self, base, merger):
    """Move every file below ``merger`` into the same place below ``base``.

    Missing destination directories are created and existing destination
    files are replaced.

    :param base: existing directory that receives the files.
    :param merger: existing directory whose files are moved.
    :returns: ``-1`` if either path does not exist, otherwise ``None``.
    """
    if not os.path.exists(base):
        if self._verbose:
            print("Base path does not exist")
        return -1
    elif not os.path.exists(merger):
        if self._verbose:
            print("Merger path does not exist")
        return -1
    # this should have better error handling
    # and also avoid the addon dir
    # Could also do error handling outside this function
    # os.walk yields (dirpath, dirnames, filenames); all three are unpacked.
    for path, dirs, files in os.walk(merger):
        relPath = os.path.relpath(path, merger)
        destPath = os.path.join(base, relPath)
        if not os.path.exists(destPath):
            os.makedirs(destPath)
        for file in files:
            destFile = os.path.join(destPath, file)
            if os.path.isfile(destFile):
                os.remove(destFile)
            srcFile = os.path.join(path, file)
            os.rename(srcFile, destFile)
def _download(args):
    """Download one image and give it an extension matching its content.

    :param args: tuple ``(url, folderName, index)``; the image is stored in
        ``folderName`` under the name ``str(index)``.

    Files whose type ``imghdr`` cannot identify are deleted.  Any failure is
    reported on standard output instead of being raised.
    """
    url, folderName, index = args
    session = setupSession()
    try:
        # time out is another parameter tuned
        # fit for the network about 10Mb
        image = session.get(url, timeout = 5)
        imageName = str(index)
        imagePath = os.path.join(folderName, imageName)
        with open(imagePath, 'wb') as fout:
            fout.write(image.content)
        fileExtension = imghdr.what(imagePath)
        if fileExtension is None:
            os.remove(imagePath)
        else:
            newName = imageName + '.' + str(fileExtension)
            # Rename needs both the current path and the new path.
            os.rename(imagePath, os.path.join(folderName, newName))
    except Exception:
        print ("failed to download one pages with url of " + str(url))
# basic funciton to get id list
def _download(url, imageName, folderName):
session = _setupSession()
try:
# time out is another parameter tuned
# fit for the network about 10Mb
image = session.get(url, timeout = 5)
with open(os.path.join(folderName, newName))
except Exception as e:
print ("failed to download one pages with url of " + str(url))
# wrapper for map function
def Set(self,key,data):
    """Store ``data`` in the cache file for ``key``.

    The data is written to a temporary file in the destination directory
    and then renamed over the final path, replacing any existing entry.

    :raises _FileCacheError: if the destination directory is not a
        directory, or the path is not located under ``self._root_directory``.
    """
    path = self._GetPath(key)
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        raise _FileCacheError('%s exists but is not a directory' % directory)
    # Validate before creating the temp file so a rejected path leaves no
    # stray temporary file behind.
    if not path.startswith(self._root_directory):
        raise _FileCacheError('%s does not appear to live under %s' %
                              (path, self._root_directory))
    # Same directory as the target keeps the rename on one file system.
    temp_fd, temp_path = tempfile.mkstemp(dir=directory)
    temp_fp = os.fdopen(temp_fd, 'w')
    try:
        temp_fp.write(data)
    finally:
        temp_fp.close()
    if os.path.exists(path):
        os.remove(path)
    os.rename(temp_path, path)
def setUp(self):
    """Install a test configuration in the user's home directory.

    An existing ``~/.letsdo`` is moved to ``~/.letsdo.bak`` first.  After
    the test configuration is written and loaded, any data and task files
    it points at are deleted.
    """
    test_configuration = (
        "DATA_DIRECTORY: ~/\n"
        "TODO_FILE: ~/footodo.txt\n"
        "TODO_START_TAG: ''\n"
        "TODO_STOP_TAG: ''\n"
    )
    home = '~'
    self.test_conf_file = os.path.expanduser(os.path.join(home, '.letsdo'))
    self.user_conf_bak = os.path.expanduser(os.path.join(home, '.letsdo.bak'))
    if os.path.exists(self.test_conf_file):
        # Keep the user's own configuration safe while the tests run.
        os.rename(self.test_conf_file, self.user_conf_bak)
    with open(self.test_conf_file, 'w') as fconf:
        fconf.write(test_configuration)
    self.conf = Configuration()
    if os.path.exists(self.conf.data_fullpath):
        os.remove(self.conf.data_fullpath)
    if os.path.exists(self.conf.task_fullpath):
        os.remove(self.conf.task_fullpath)
def prepare_inception_data(o_dir, i_dir):
    """Copy every file whose name contains ``jpg`` from ``i_dir`` to ``o_dir``.

    ``i_dir`` is walked recursively.  Each copied file is renamed to
    ``<counter>_ins.<extension>``, with the counter starting at 0.  A
    progress bar is updated per file and the total is printed at the end.
    """
    if not os.path.exists(o_dir):
        os.makedirs(o_dir)
    cnt = 0
    bar = progressbar.ProgressBar(redirect_stdout=True,
                                  max_value=progressbar.UnknownLength)
    for root, subFolders, files in os.walk(i_dir):
        for name in files:
            if 'jpg' not in name:
                continue
            numbered_name = str(cnt) + '_ins.' + name.split('.')[-1]
            cnt += 1
            source = os.path.join(root, name)
            # copy() keeps the original name; the rename applies the number.
            copy(source, o_dir)
            os.rename(os.path.join(o_dir, name), os.path.join(o_dir, numbered_name))
            bar.update(cnt)
    bar.finish()
    print('Total number of files: {}'.format(cnt))
def download(url, target, expected_hash):
    """Download ``url`` to ``target`` unless it already has ``expected_hash``.

    The data is fetched into ``TEMP_FILE`` until its hash matches.  After
    five unsuccessful attempts the temporary file is deleted and the
    download is abandoned.

    :raises OSError: when no attempt produced the expected hash.
    """
    if filesystem.calculate_hash(target) == expected_hash:
        return
    attempts = 0
    while True:
        if filesystem.calculate_hash(TEMP_FILE) == expected_hash:
            break
        attempts += 1
        if attempts > 5:
            os.remove(TEMP_FILE)
            raise OSError("Aborting download of %s after 5 unsuccessful attempts" % url)
        try:
            http_client.get(url).raise_for_status().download_to(TEMP_FILE)
        except OSError:
            # A failed attempt is retried on the next pass of the loop.
            pass
    # Remove an existing target first so the rename cannot fail on it.
    try:
        os.remove(target)
    except OSError:
        pass
    os.rename(TEMP_FILE, target)
def get_pkg_dir(self, pkgname, pkgver, subdir):
    """Return the cached path for ``subdir``, downloading it when missing.

    The package directory is ``<distnet cache>/<pkgver>``.  When the target
    is absent, the package is downloaded to a temporary file there; for the
    ``REQUIRES`` entry that file becomes the target, otherwise it is handed
    to ``self.extract_tar``.  The temporary file is always removed.

    NOTE(review): ``pkgname`` is not part of the cache path here -- confirm
    that is intended.
    """
    pkgdir = os.path.join(get_distnet_cache(), pkgver)
    if not os.path.isdir(pkgdir):
        os.makedirs(pkgdir)
    target = os.path.join(pkgdir, subdir)
    if os.path.exists(target):
        return target
    handle, scratch = tempfile.mkstemp(dir=pkgdir)
    try:
        os.close(handle)
        self.download_to_file(pkgname, scratch)
        if subdir != REQUIRES:
            self.extract_tar(subdir, pkgdir, scratch)
        else:
            os.rename(scratch, target)
    finally:
        # Already gone when it was renamed into place.
        try:
            os.remove(scratch)
        except OSError:
            pass
    return target
def get_pkg_dir(self, tmp)
finally:
try:
os.remove(tmp)
except OSError:
pass
return target
def _rename_atomic(src, ta)
if rv:
rv = _CommitTransaction(ta)
break
else:
time.sleep(0.001)
retry += 1
return rv
finally:
_CloseHandle(ta)
def rename(src, dst)
try:
os.unlink(old)
except Exception:
pass
def create_seed_and_test_random(factor, start_id):
    """Normalise crop file names and pick a random subset of image ids.

    Every ``*.png`` in ``crop_dir`` whose numeric id is at least
    ``start_id`` is renamed to ``<id>.png``.  Each such id is selected when
    ``random.randint(0, factor)`` returns 0, so a factor of 0 selects all of
    them.  The selected ids are pickled to both ``seed_image_ids.pickle``
    and ``test_image_ids.pickle`` in ``data_dir``.

    :param factor: upper bound (inclusive) of the random draw.
    :param start_id: ids below this value are skipped.
    """
    # Snapshot the listing first: files are renamed while looping.
    crops = list(glob.iglob(crop_dir + '*.png'))
    image_ids = []
    for filename in crops:
        renamed = filename.replace("_", "")
        image_id = int(renamed.replace('.png', '').replace('/home/pkrush/cents/', ''))
        if image_id < start_id:
            continue
        renamed = crop_dir + str(image_id) + '.png'
        os.rename(filename, renamed)
        if random.randint(0, factor) == 0:
            image_ids.append(image_id)
    for pickle_name in ('seed_image_ids.pickle', 'test_image_ids.pickle'):
        # ``with`` makes sure each pickle is flushed and closed.
        with open(data_dir + pickle_name, "wb") as handle:
            pickle.dump(image_ids, handle)
def munge(src_dir):
# stored as: ./MCG-COCO-val2014-boxes/COCO_val2014_000000193401.mat
# want: ./MCG/mat/COCO_val2014_0/COCO_val2014_000000141/COCO_val2014_000000141334.mat
files = os.listdir(src_dir)
for fn in files:
base, dst)
def rander(self):
    """Randomly move training files into the test set, per writer.

    For every writer directory under ``conf.train_path`` each file is moved
    to the matching directory under ``conf.test_path`` when a random draw
    falls below ``self.split``.

    :returns: ``self``.
    """
    for writer in os.listdir(conf.train_path):
        test_dir = '{:s}/{:s}'.format(conf.test_path, writer)
        train_dir = '{:s}/{:s}'.format(conf.train_path, writer)
        if not os.path.isdir(train_dir):
            continue
        # The per-writer test directory must exist before files move in.
        if not os.path.exists(test_dir):
            os.mkdir(test_dir)
        for name in os.listdir('{:s}/{:s}'.format(conf.train_path, writer)):
            if random.random() >= self.split:
                continue
            os.rename('{:s}/{:s}'.format(train_dir, name),
                      '{:s}/{:s}'.format(test_dir, name))
    return self
def recover(self):
    """Move every test file back into the training set, per writer.

    For every writer directory under ``conf.test_path`` all files are moved
    to the matching directory under ``conf.train_path``, which is created
    when missing.

    :returns: ``self``.
    """
    writers = os.listdir(conf.test_path)
    for writer in writers:
        test_writer_path = '{:s}/{:s}'.format(conf.test_path, writer)
        train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer)
        # Only directories in the test set hold files to move back.
        if not os.path.isdir(test_writer_path): continue
        # make sure path: {train_path}/{writer} exist
        if not os.path.exists(train_writer_path):
            os.mkdir(train_writer_path)
        # move test file to train file
        files = os.listdir(test_writer_path)
        for file in files:
            os.rename('{:s}/{:s}'.format(test_writer_path, file),
                      '{:s}/{:s}'.format(train_writer_path, file))
    return self
def build_identify(self, writer):
    """Rename a writer's training images after their predicted letter.

    Every readable file in ``conf.train_path/<writer>`` is passed to
    ``self.predict`` and renamed to ``<letter>_<n>.jpg``, where ``n`` counts
    from 0 per predicted letter.  Files that ``self.read`` cannot load are
    skipped.

    :returns: ``self``.
    """
    dict_map = {}
    train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer)
    files = os.listdir(train_writer_path)
    for file in files:
        tmp_path = '{:s}/{:s}'.format(train_writer_path, file)
        img_data = self.read(tmp_path)
        if img_data is None: continue
        letter = self.predict(np.array(img_data))[0]
        if letter not in dict_map:
            dict_map[letter] = 0
        else:
            dict_map[letter] += 1
        print('=> rename: {:s} => {:s}_{:d}.jpg'.format(file, letter, dict_map[letter]))
        # Three fields: directory, letter and per-letter counter.
        os.rename(tmp_path, '{:s}/{:s}_{:d}.jpg'.format(train_writer_path, letter, dict_map[letter]))
    return self
def write_file(path, fcont, uid=None, gid=None, atomic=False):
    """Write ``fcont`` to ``path``, optionally via an atomic rename.

    :param path: destination file.
    :param fcont: content to write.
    :param uid: forwarded to ``_write_file``.
    :param gid: forwarded to ``_write_file``.
    :param atomic: when true, write to a uniquely named temporary file next
        to ``path`` and rename it over ``path``.
    :raises EnvironmentError: re-raised if the rename fails, after the
        temporary file has been removed.
    """
    if not atomic:
        return _write_file(path, fcont, uid, gid)
    # pid + timestamp keep concurrent writers from sharing a temp name.
    tmp_path = '{path}._tmp_.{pid}_{timestamp}'.format(
        path=path,
        pid=os.getpid(),
        timestamp=timeutil.ns(),
    )
    _write_file(tmp_path, fcont, uid, gid)
    try:
        os.rename(tmp_path, path)
    except EnvironmentError:
        os.remove(tmp_path)
        raise
def removeall_clicked(self, button, store):
    """
    @description: Same as the past function but remove all lines of the
    treeview

    The existing ``mama.xml`` in ``~/.config/mama`` is first renamed to
    ``.mama.bak``; then every entry is removed from ``store`` and the empty
    tree is saved.
    """
    config_dir = expanduser('~') + '/.config/mama/'
    current = config_dir + 'mama.xml'
    backup = config_dir + '.mama.bak'
    if os.path.exists(current):
        os.rename(current, backup)
    # Only touch the model when it still holds entries.
    if len(store) != 0:
        self.labelState.set_text('Remove all commands')
        for _ in range(len(store)):
            # Always drop the first row; later rows move up.
            first_row = store.get_iter(0)
            store.remove(first_row)
        self.saveTree(store)
    print("Empty list")
def set_docker_compose_version_to(dir, repo_docker, tag):
    """Modifies docker-compose files in the given directory so that repo_docker
    image points to the given tag.

    Each file is rewritten through a ``.tmp`` sibling that is renamed over
    the original once every line has been processed.
    """
    # Built once per call; the pattern is identical for every line.
    image_line = re.compile(
        r"^(\s*image:\s*mendersoftware/%s:)\S+(\s*)$" % re.escape(repo_docker))
    replacement = r"\g<1>%s\2" % tag
    for filename in docker_compose_files_list(dir):
        tmp_name = filename + ".tmp"
        # ``with`` closes both files even if reading or writing fails.
        with open(filename) as old, open(tmp_name, "w") as new:
            for line in old:
                # Replace build tag with a new one.
                new.write(image_line.sub(replacement, line))
        os.rename(tmp_name, filename)
def save_yaml(self, filename: str=None):
    """
    Serialize the hierarchy to a file.

    The YAML is written to a temporary file in the target's directory and
    then renamed over the target; any cached entry for the file name is
    dropped afterwards.

    :param filename: Target filename, autogenerated if None
    """
    if filename is None:
        filename = self.config_filename
    target_dir = os.path.dirname(filename)
    with tempfile.NamedTemporaryFile('w', dir=target_dir, delete=False) as scratch:
        header = self._yaml_header()
        if header is not None:
            scratch.write(header)
        yaml.round_trip_dump(self, stream=scratch)
        scratch_name = scratch.name
    os.rename(scratch_name, filename)
    cache = self.__class__._yaml_cache
    if filename in cache:
        del cache[filename]
# YAML library configuration
def download_celeb_a(dirpath):
    """Download and unpack the Celeb-A archive into ``<dirpath>/celebA``.

    Nothing happens when that directory already exists.  Otherwise the zip
    archive is downloaded (unless already present) and extracted into
    ``dirpath``, the archive is deleted, and the archive's first member is
    renamed to ``celebA``.
    """
    data_dir = 'celebA'
    if os.path.exists(os.path.join(dirpath, data_dir)):
        print('Found Celeb-A - skip')
        return
    # Both names are needed below: the local archive name and the drive id.
    filename, drive_id = "img_align_celeba.zip", "0B7EVK8r0v71pZjFTYXZWM3FlRnM"
    save_path = os.path.join(dirpath, filename)
    if os.path.exists(save_path):
        print('[*] {} already exists'.format(save_path))
    else:
        download_file_from_google_drive(drive_id, save_path)
    zip_dir = ''
    with zipfile.ZipFile(save_path) as zf:
        # The first archive member is taken as the extracted directory name.
        zip_dir = zf.namelist()[0]
        zf.extractall(dirpath)
    os.remove(save_path)
    os.rename(os.path.join(dirpath, zip_dir), os.path.join(dirpath, data_dir))