Python os 模块：utime() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.utime()。
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The replacement is first stored under a temporary key via ``add()`` and
    then renamed onto ``key``. The subdirectory and info suffix come from
    the new entry when ``message`` is a ``MaildirMessage`` and from the old
    entry otherwise.
    """
    old_subpath = self._lookup(key)
    temp_key = self.add(message)
    temp_subpath = self._lookup(temp_key)
    is_maildir_message = isinstance(message, MaildirMessage)
    if is_maildir_message:
        # temp's subdir and suffix were specified by message.
        dominant_subpath = temp_subpath
    else:
        # temp's subdir and suffix were defaults from add().
        dominant_subpath = old_subpath
    subdir = os.path.dirname(dominant_subpath)
    if self.colon in dominant_subpath:
        suffix = self.colon + dominant_subpath.split(self.colon)[-1]
    else:
        suffix = ''
    self.discard(key)
    tmp_path = os.path.join(self._path, temp_subpath)
    new_path = os.path.join(self._path, subdir, key + suffix)
    if is_maildir_message:
        # Stamp the file while it still carries its temporary name: nothing
        # should touch it once it sits at its final position, where other
        # programs may already be reading or moving it.
        os.utime(tmp_path, (os.path.getatime(tmp_path), message.get_date()))
    os.rename(tmp_path, new_path)
def _caches_to_file(cache_path, start, end, name, cb, concat):
    """Finalize the numbered ``<i>.jb`` cache chunks, then invoke ``cb``.

    :param cache_path: directory holding the chunk files
    :param start: first chunk index (inclusive)
    :param end: last chunk index (exclusive)
    :param name: target name; its last three characters are stripped to form
        the directory name used when ``concat`` is false
    :param cb: zero-argument callable invoked once saving is finished
    :param concat: when true, load every chunk, concatenate them and dump the
        result in one go; otherwise move the chunks into a directory and
        drop a ``.finished`` marker file there
    """
    start_time = time()
    if concat:
        all_data = []
        for i in range(start, end):
            all_data.extend(load(os.path.join(cache_path, "{0}.jb".format(i))))
        # NOTE(review): the call arrived as ``dump(all_data, 3)`` with no
        # destination; presumably the merged data belongs at
        # cache_path/name -- confirm against the upstream project.
        dump(all_data, os.path.join(cache_path, name), 3)
    else:
        target_path = os.path.join(cache_path, name[:-3])
        if not os.path.exists(target_path):
            os.makedirs(target_path)
        for i in range(start, end):
            chunk_name = "{0}.jb".format(i)
            shutil.move(os.path.join(cache_path, chunk_name),
                        os.path.join(target_path, chunk_name))
        # Touch a marker file so readers can tell the directory is complete.
        finished_flag = os.path.join(target_path, '.finished')
        with open(finished_flag, 'a'):
            os.utime(finished_flag, None)
    logging.debug("Finished saving data to {0}. Took {1}s".format(name, time()-start_time))
    cb()
def run_if_needed(base_file, then_file, now_file):
    """Refresh the ``now``/``then`` stamp files when ``base_file`` changed.

    Nothing happens when both stamp files exist and ``then_file`` already
    carries the (rounded-up) mtime of ``base_file``. Otherwise ``now_file``
    is stamped with the current time and ``then_file`` with the rounded-up
    access/modification times of ``base_file``.
    """
    # Python stores mtime as a double, which cannot represent Linux's
    # nanosecond precision exactly. Rounding up to the next whole second
    # gives a stable value that is guaranteed to be >= the real timestamp,
    # and sidesteps mismatches between file systems with different
    # timestamp resolution.
    reference = os.stat(base_file)
    mtime = math.ceil(reference.st_mtime)
    atime = math.ceil(reference.st_atime)

    up_to_date = (os.path.exists(then_file)
                  and os.path.exists(now_file)
                  and os.stat(then_file).st_mtime == mtime)
    if not up_to_date:
        createIfNeeded(now_file)
        os.utime(now_file, None)  # None means "now"
        createIfNeeded(then_file)
        os.utime(then_file, (atime, mtime))
def test_file_save_outside_roamer(self):
    """A file edited outside the session keeps its edit after reprocessing."""
    egg_digest = self.session.get_digest('egg.txt')
    source_path = os.path.join(TEST_DIR, 'egg.txt')
    with open(source_path, 'a') as handle:
        handle.write(' extra content')
    # Pin fixed access/modification times on the externally edited file.
    os.utime(source_path, (1330712280, 1330712292))
    self.session.process()

    follow_up = MockSession(DOC_DIR)
    follow_up.add_entry('egg.txt', egg_digest)
    follow_up.process()

    doc_path = os.path.join(DOC_DIR, 'egg.txt')
    self.assertTrue(os.path.exists(doc_path))
    with open(doc_path, 'r') as handle:
        self.assertEqual(handle.read(), 'egg file content extra content')
def copystat(cls, src, dest, copy_own=True, copy_xattr=True):
    """
    Mirror the stat information of `src` onto `dest`.

    Permission bits, access/modification times (nanosecond values) and,
    where the stat result exposes them, file flags are always copied.
    With `copy_own=True` the uid and gid are copied as well; with
    `copy_xattr=True` the work is handed on to `cls.copyxattr`.
    """
    info = os.stat(src)
    os.chmod(dest, mode=stat.S_IMODE(info.st_mode))
    os.utime(dest, ns=(info.st_atime_ns, info.st_mtime_ns))
    # st_flags is only present on some platforms.
    if hasattr(info, "st_flags"):
        os.chflags(dest, flags=info.st_flags)
    if copy_own:
        os.chown(dest, uid=info.st_uid, gid=info.st_gid)
    if copy_xattr:
        cls.copyxattr(src, dest)
def copystat(src, dst):
    """Copy stat info (atime, mtime, mode bits, flags) from src to dst.

    Each step is skipped when the platform's ``os`` module lacks the
    corresponding function. A failure to set file flags is ignored only when
    the error is EOPNOTSUPP/ENOTSUP; any other OSError propagates.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:  # "as" form works on Python 2.6+ and 3
            unsupported = [getattr(errno, err)
                           for err in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, err)]
            if why.errno not in unsupported:
                raise
def blankAFile(file_path):
    '''
    Truncate a file to zero bytes while preserving its original timestamps.

    Adapted from 'Keeping Large intermediate files' (http://www.ruffus.org.uk/faq.html)

    :param file_path: path of the file to blank
    :return: None; the process exits with status 1 when the file is missing
    '''
    if not os.path.exists(file_path):
        print('blankAFile: ' + file_path + ' not found.')
        sys.exit(1)

    timeInfo = os.stat(file_path)  # remember the timestamps before truncating
    try:
        # Opening in 'w' mode already truncates; the context manager makes
        # sure the handle is closed.
        with open(file_path, 'w') as f:
            f.truncate(0)
    except IOError:
        # Best effort: a file that cannot be opened is left untouched.
        return
    # Put the access/modification times back to what they were.
    os.utime(file_path, (timeInfo.st_atime, timeInfo.st_mtime))
    print(file_path + ' blanked to save disk-space.')
def try_utime(self, filename, last_modified_hdr):
    """Best-effort: set the file's mtime from a Last-Modified header value.

    Returns the converted timestamp when one was applied (or attempted),
    and None when the header is missing, the file does not exist, or the
    header converts to nothing or to 0.
    """
    if last_modified_hdr is None:
        return None
    if not os.path.isfile(encodeFilename(filename)):
        return None
    filetime = timeconvert(last_modified_hdr)
    # None means the header was not convertible; 0 is an obviously
    # invalid date.
    if filetime is None or filetime == 0:
        return None
    try:
        os.utime(filename, (time.time(), filetime))
    except Exception:
        # Deliberately ignored: failing to set the time is not fatal.
        pass
    return filetime
def handle(self):
    """Download the remote item to a temp file, verify it, and move it in place.

    The file is downloaded next to its destination under a temporary name.
    If the remote item exposes a SHA-1 hash and it differs from the local
    one, the error is logged and the method returns without renaming.
    Otherwise the temp file is renamed to ``self.local_path``, its times are
    set from the item's modified time, its owner is set, and the item is
    recorded as downloaded. I/O and API errors are logged, not raised.
    """
    local_item_tmp_path = self.local_parent_path + get_tmp_filename(self.item_name)
    try:
        with open(local_item_tmp_path, 'wb') as f:
            self.drive.download_file(file=f, size=self._item.size, item_id=self._item.id)
        local_sha1 = hasher.hash_value(local_item_tmp_path)
        item_sha1 = None
        if self._item.file_props is not None and self._item.file_props.hashes is not None:
            item_sha1 = self._item.file_props.hashes.sha1
        if item_sha1 is None:
            # ``warning`` replaces the deprecated ``warn`` alias.
            self.logger.warning('Remote file %s has not sha1 property,we keep the file but cannot check correctness of it',
                                self.local_path)
        elif local_sha1 != item_sha1:
            self.logger.error('Mismatch hash of download file %s : remote:%s,%d local:%s %d', self.local_path,
                              self._item.file_props.hashes.sha1, self._item.size, local_sha1, os.path.getsize(local_item_tmp_path))
            return
        os.rename(local_item_tmp_path, self.local_path)
        t = datetime_to_timestamp(self._item.modified_time)
        os.utime(self.local_path, (t, t))
        os.chown(self.local_path, OS_USER_ID, OS_USER_GID)
        self.items_store.update_item(self._item, ItemRecordStatuses.DOWNLOADED)
    except (IOError, OSError):
        # The format has two placeholders: the path and the traceback.
        self.logger.error('An IO error occurred when downloading "%s":\n%s.',
                          self.local_path, traceback.format_exc())
    except errors.OneDriveError:
        self.logger.error('An API error occurred when downloading "%s":\n%s.',
                          self.local_path, traceback.format_exc())
def Main():
    """Delete the files named on the command line, then touch a stamp file.

    Command line: ``files...`` (one or more), ``-f/--force`` and the
    required ``--stamp``.

    :return: 1 as soon as a file cannot be removed and ``--force`` was not
        given (the stamp is not written in that case), otherwise 0
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('files', nargs='+')
    parser.add_argument('-f', '--force', action='store_true',
                        help="don't err on missing")
    parser.add_argument('--stamp', required=True, help='touch this file')
    args = parser.parse_args()
    for f in args.files:
        try:
            os.remove(f)
        except OSError:
            if not args.force:
                # sys.stderr.write works on both Python 2 and 3, unlike the
                # ``print >>sys.stderr`` statement.
                sys.stderr.write("'%s' does not exist\n" % f)
                return 1
    with open(args.stamp, 'w'):
        os.utime(args.stamp, None)
    return 0
def setUp(self):
    """Install a copy of the fake hook for every name in ``self.fake_hooks``."""
    super(HeatConfigTest, self).setUp()

    self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
    self.heat_config_path = self.relative_path(
        __file__,
        '..',
        'heat-config/os-refresh-config/configure.d/55-heat-config')

    self.hooks_dir = self.useFixture(fixtures.TempDir())
    self.deployed_dir = self.useFixture(fixtures.TempDir())

    with open(self.fake_hook_path) as template:
        fake_hook = template.read()

    for name in self.fake_hooks:
        hook_name = self.hooks_dir.join(name)
        with open(hook_name, 'w') as target:
            os.utime(hook_name, None)
            target.write(fake_hook)
            target.flush()
        # Make the installed hook executable.
        os.chmod(hook_name, 0o755)

    self.env = os.environ.copy()
def setUp(self):
    """Install a copy of the fake hook for every name in ``self.fake_hooks``.

    NOTE(review): this listing arrived truncated (the ``super()`` call,
    the path arguments and the hook-writing loop were collapsed). The body
    below follows the intact HeatConfigTest.setUp pattern -- confirm against
    the upstream test module.
    """
    super(HeatConfigDockerComposeORCTest, self).setUp()

    self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
    self.heat_config_docker_compose_path = self.relative_path(
        __file__,
        '..',
        'heat-config-docker-compose/os-refresh-config/configure.d/'
        '50-heat-config-docker-compose')

    self.docker_compose_dir = self.useFixture(fixtures.TempDir())

    with open(self.fake_hook_path) as f:
        fake_hook = f.read()

    for hook in self.fake_hooks:
        hook_name = self.docker_compose_dir.join(hook)
        with open(hook_name, 'w') as f:
            os.utime(hook_name, None)
            f.write(fake_hook)
            f.flush()
        # Make the installed hook executable.
        os.chmod(hook_name, 0o755)
def setUp(self):
    """Install a copy of the fake hook for every name in ``self.fake_hooks``.

    NOTE(review): this listing arrived truncated (the ``super()`` call,
    the path arguments and the hook-writing loop were collapsed). The body
    below follows the intact HeatConfigTest.setUp pattern -- confirm against
    the upstream test module.
    """
    super(HeatConfigKubeletORCTest, self).setUp()

    self.fake_hook_path = self.relative_path(__file__, 'hook-fake.py')
    self.heat_config_kubelet_path = self.relative_path(
        __file__,
        '..',
        'heat-config-kubelet/os-refresh-config/configure.d/'
        '50-heat-config-kubelet')

    self.manifests_dir = self.useFixture(fixtures.TempDir())

    with open(self.fake_hook_path) as f:
        fake_hook = f.read()

    for hook in self.fake_hooks:
        hook_name = self.manifests_dir.join(hook)
        with open(hook_name, 'w') as f:
            os.utime(hook_name, None)
            f.write(fake_hook)
            f.flush()
        # Make the installed hook executable.
        os.chmod(hook_name, 0o755)
def execute_run_box_basics(output="default"):
    """Run the 'makkus.box-basics' task once and leave a marker on success.

    :param output: output format handed to the runner
    :return: ``{"return_code": -1}`` when the marker file already exists,
        otherwise the runner's result dict
    """
    marker = DEFAULT_LOCAL_FRECKLES_BOX_BASICS_MARKER
    if os.path.exists(marker):
        return {"return_code": -1}

    task_config = [{'tasks': ['makkus.box-basics']}]
    result = create_and_run_nsbl_runner(
        task_config,
        task_metadata={},
        output_format=output,
        ask_become_pass=True,
        run_box_basics=False,
    )

    # Touch the marker only after a successful run.
    if result["return_code"] == 0:
        with open(marker, 'a'):
            os.utime(marker, None)

    return result
def touch(path, times=None, dirs=False):
    ''' perform UNIX touch with extras

    based on: http://stackoverflow.com/questions/12654772/create-empty-file-using-python

    Args:
        path: file path to touch
        times: a 2-tuple of the form (atime, mtime) where each member is an
               int or float expressing seconds. Defaults to the current time.
        dirs: if set, create the parent folders when they do not exist
    '''
    if dirs:
        basedir = os.path.dirname(path)
        # A bare filename has an empty dirname, and os.makedirs('') raises.
        if basedir and not os.path.exists(basedir):
            try:
                os.makedirs(basedir)
            except OSError:
                # Another process may have created it in the meantime.
                if not os.path.isdir(basedir):
                    raise
    with open(path, 'a'):
        # Positional form: Python 2's os.utime takes no keyword arguments.
        os.utime(path, times)
def touch(self, mode=0o666, exist_ok=True):
    """
    Create this file with the given access mode, if it doesn't exist.

    With ``exist_ok`` true, bumping the modification time is tried first and
    the file is created only if that fails; with ``exist_ok`` false the
    file is opened with O_EXCL.
    """
    if self._closed:
        self._raise_closed()
    if exist_ok:
        # First try to bump the modification time.
        # Implementation note: GNU touch uses the UTIME_NOW option of
        # the utimensat() / futimens() functions.
        try:
            self._accessor.utime(self, None)
            return
        except OSError:
            # Fall through to creation; avoids exception chaining.
            pass
    if exist_ok:
        flags = os.O_CREAT | os.O_WRONLY
    else:
        flags = os.O_CREAT | os.O_WRONLY | os.O_EXCL
    os.close(self._raw_open(flags, mode))
def run(self):
    """Run ``npm install`` when npm is available, then verify the targets.

    Raises ValueError for the first entry of ``self.targets`` that does
    not exist on disk.
    """
    if self.has_npm():
        env = os.environ.copy()
        env['PATH'] = npm_path
        log.info("Installing build dependencies with npm. This may take a while...")
        check_call(["npm", "install"], cwd=jsroot, stdout=sys.stdout, stderr=sys.stderr, **prckws)
        os.utime(node_modules, None)
    else:
        log.error("`npm` unavailable. If you're running this command using sudo,make sure `npm` is available to sudo")

    for target in self.targets:
        if os.path.exists(target):
            continue
        msg = "Missing file: %s" % target
        if not self.has_npm():
            msg += "\nnpm is required to build a development version of jupyter-" + name
        raise ValueError(msg)

    # update package data in case this created new files
    update_package_data(self.distribution)
def try_utime(self, filename, last_modified_hdr):
    """Try to set the last-modified time of the given file.

    NOTE(review): this listing arrived truncated to its header and tail; the
    body is restored to match the complete ``try_utime`` shown earlier in
    this collection -- confirm against the upstream project.

    Returns the converted timestamp, or None when nothing was applied.
    """
    if last_modified_hdr is None:
        return
    if not os.path.isfile(encodeFilename(filename)):
        return
    filetime = timeconvert(last_modified_hdr)
    if filetime is None:
        return filetime
    # Ignore obviously invalid dates
    if filetime == 0:
        return
    try:
        os.utime(filename, (time.time(), filetime))
    except Exception:
        # Best effort: failing to set the time is not fatal.
        pass
    return filetime
def set_file_utime(filename, desired_time):
    """
    Set atime and mtime of a file, translating permission failures.

    :param filename: the file to modify
    :param desired_time: the epoch timestamp to set for atime and mtime.
    :raises: SetFileUtimeError: if you do not have permission (errno 1)
    :raises: OSError: for all errors other than errno 1
    """
    stamp = (desired_time, desired_time)
    try:
        os.utime(filename, stamp)
    except OSError as e:
        # Anything that is not a permission problem goes out unchanged.
        if e.errno != errno.EPERM:
            raise e
        explanation = ("The file was downloaded,but attempting to modify the "
                       "utime of the file failed. Is the file owned by another user?")
        raise SetFileUtimeError(explanation)
def check_for_update():
    """Fetch version information at most once per (UTC) day.

    The modification date of FILE_UPDATE records the last check. When it is
    already today's date nothing happens; otherwise the file is touched, the
    version URL is requested and the response body is written to
    FILE_UPDATE. HTTP and URL errors are ignored.
    """
    day_format = '%Y-%m-%d'
    if os.path.exists(FILE_UPDATE):
        checked_at = os.path.getmtime(FILE_UPDATE)
        last = datetime.utcfromtimestamp(checked_at).strftime(day_format)
        today = datetime.utcnow().strftime(day_format)
        if last == today:
            return
    try:
        # Touch first, so the attempt is recorded before the request is made.
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        payload = urllib.urlencode({'version': main.__version__})
        request = urllib2.Request(CORE_VERSION_URL, payload)
        response = urllib2.urlopen(request)
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(response.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def main(reactor, *args):
    """Check that re-requesting an unmodified URI leaves the saved file alone.

    Generator yielding the results of ``httpRequest``: downloads URI to a
    fresh file under ``tmp``, moves the file's times 5 minutes into the past,
    requests again and asserts the mtime did not change. Removes ``tmp``
    when done.
    """
    client = http.HTTPClient()
    fname = os.path.join(tmp, str(uuid.uuid4()))
    yield httpRequest(client._agent, URI, method='GET', saveto=fname)
    filesize = os.path.getsize(fname)
    assert filesize > 1
    # touch file to 5 minutes in the past
    past = int(os.path.getmtime(fname)) - 300
    # %-formatting inside print() gives identical output on Python 2 and 3.
    print("PAST MTIME %s" % past)
    os.utime(fname, (past, past))
    assert os.path.getmtime(fname) == past
    # NOTE(review): this call arrived without a URI argument; presumably it
    # repeats the first request -- confirm against the upstream script.
    yield httpRequest(client._agent, URI, method='GET', saveto=fname)
    # it was not modified
    current = os.path.getmtime(fname)
    print("CURRENT MTIME %s" % current)
    assert int(current) == past
    print('OK')
    shutil.rmtree(tmp)
def _create_dummy_file(self, base, size):
    """Create a ``size``-byte ``.dat`` file under ``base`` with random times.

    :return: tuple ``(filepath, atime, mtime, st_ctime)``
    """
    random_name = self._gen_rand_str(5)
    filepath = '{base}/{name}.dat'.format(base=base, name=random_name)
    with open(filepath, 'wb') as handle:
        handle.truncate(size)

    # Stamp the file with a random access/modification time pair
    atime, mtime = self.get_rand_time_pair()
    os.utime(filepath, times=(atime, mtime), follow_symlinks=False)

    # Change st_birthtime on MacOS
    if platform == 'darwin':
        created_str = datetime.fromtimestamp(atime).strftime('%m/%d/%Y %H:%M:%S')
        # subprocess.run with list arguments and shell=False behaves very
        # strangely here and sets st_birthtime earlier than the given
        # timestamp, hence the shell command string.
        command = 'SetFile -d "{0}" {1}'.format(created_str, filepath)
        self._tasks_birthtime_flags.append(
            subprocess.Popen(command,
                             shell=True, close_fds=True,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE))

    self._total_size += size
    ctime = os.stat(filepath).st_ctime
    return filepath, atime, mtime, ctime
def check_for_update():
    """Fetch version information at most once per (UTC) day.

    NOTE(review): this listing arrived truncated inside the ``try`` block;
    the body is restored to match the complete ``check_for_update`` shown
    earlier in this collection -- confirm against the upstream project.
    """
    if os.path.exists(FILE_UPDATE):
        mtime = os.path.getmtime(FILE_UPDATE)
        last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
        today = datetime.utcnow().strftime('%Y-%m-%d')
        if last == today:
            return
    try:
        # Touch first, so the attempt is recorded before the request is made.
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        request = urllib2.Request(
            CORE_VERSION_URL,
            urllib.urlencode({'version': main.__version__}),
        )
        response = urllib2.urlopen(request)
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(response.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def extractall(self, path=".", members=None):
    """Extract every member of the archive below `path`.

    `path' defaults to the current working directory. `members' is
    optional and must be a subset of the list returned by getmembers();
    when omitted, the archive itself is iterated. Owner, modification
    time and permissions of directories are applied only after all members
    have been extracted.
    """
    if members is None:
        members = self

    pending_dirs = []
    for member in members:
        if member.isdir():
            # Extract directories with a safe mode; the real attributes
            # are applied in the second pass below.
            pending_dirs.append(member)
            member = copy.copy(member)
            member.mode = 0o700
        # Directories get their attributes further down, not here.
        self.extract(member, path, set_attrs=not member.isdir())

    # Process directories in reverse name order.
    pending_dirs.sort(key=lambda a: a.name)
    pending_dirs.reverse()

    # Set correct owner, mtime and filemode on directories.
    for member in pending_dirs:
        dirpath = os.path.join(path, member.name)
        try:
            self.chown(member, dirpath)
            self.utime(member, dirpath)
            self.chmod(member, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % e)
def utime(self, tarinfo, targetpath):
    """Apply tarinfo's mtime to targetpath as both access and modification time.

    Does nothing on platforms whose ``os`` module has no ``utime``; raises
    ExtractError when the times cannot be changed.
    """
    if hasattr(os, 'utime'):
        stamp = (tarinfo.mtime, tarinfo.mtime)
        try:
            os.utime(targetpath, stamp)
        except EnvironmentError:
            raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def copystat(src, dst):
    """Copy stat info (atime, mtime, mode bits, flags) from src to dst.

    NOTE(review): this listing arrived with its header and most of its body
    collapsed; it is restored to the standard ``shutil.copystat`` shape that
    the surviving ``except`` clause belongs to -- confirm against upstream.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Only "operation not supported" is tolerated.
            if (not hasattr(errno, 'EOPNOTSUPP') or
                    why.errno != errno.EOPNOTSUPP):
                raise
def extractall(self, path=".", members=None, *, numeric_owner=False):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers(). If `numeric_owner` is True, only
       the numbers for user/group names are used and not the names.

       NOTE(review): this listing arrived truncated (the `path' parameter,
       the extract call and the directory pass were collapsed); the body is
       restored to the shape the surviving fragments and docstring
       describe -- confirm against upstream.
    """
    directories = []

    if members is None:
        members = self

    for tarinfo in members:
        if tarinfo.isdir():
            # Extract directories with a safe mode.
            directories.append(tarinfo)
            tarinfo = copy.copy(tarinfo)
            tarinfo.mode = 0o700
        # Do not set_attrs directories, as we will do that further down
        self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(),
                     numeric_owner=numeric_owner)

    # Reverse sort directories.
    directories.sort(key=lambda a: a.name)
    directories.reverse()

    # Set correct owner, mtime and filemode on directories.
    for tarinfo in directories:
        dirpath = os.path.join(path, tarinfo.name)
        try:
            self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            else:
                self._dbg(1, "tarfile: %s" % e)
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    NOTE(review): this listing arrived with its header and body collapsed;
    restored to match the complete ``utime`` shown earlier in this
    collection -- confirm against upstream.
    """
    if not hasattr(os, 'utime'):
        return
    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except OSError:
        raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    NOTE(review): this listing arrived with its header and body collapsed;
    restored to match the complete ``utime`` shown earlier in this
    collection -- confirm against upstream.
    """
    if not hasattr(os, 'utime'):
        return
    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except EnvironmentError:
        raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def copystat(src, dst):
    """Copy stat info (atime, mtime, mode bits, flags) from src to dst.

    NOTE(review): this listing arrived with its header and most of its body
    collapsed; it is restored to the standard ``shutil.copystat`` shape that
    the surviving errno check belongs to -- confirm against upstream.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Only "operation not supported" is tolerated.
            if (not hasattr(errno, 'EOPNOTSUPP') or
                    why.errno != errno.EOPNOTSUPP):
                raise
def extractall(self, path=".", members=None):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers().

       NOTE(review): this listing arrived truncated (signature, docstring
       opening and the directory pass were collapsed); restored to the
       shape the surviving fragments describe -- confirm against upstream.
    """
    directories = []

    if members is None:
        members = self

    for tarinfo in members:
        if tarinfo.isdir():
            # Extract directories with a safe mode.
            directories.append(tarinfo)
            tarinfo = copy.copy(tarinfo)
            # 0o700 is the octal spelling accepted by Python 2.6+ and 3.
            tarinfo.mode = 0o700
        self.extract(tarinfo, path)

    # Reverse sort directories.
    directories.sort(key=operator.attrgetter('name'))
    directories.reverse()

    # Set correct owner, mtime and filemode on directories.
    for tarinfo in directories:
        dirpath = os.path.join(path, tarinfo.name)
        try:
            self.chown(tarinfo, dirpath)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            else:
                self._dbg(1, "tarfile: %s" % e)
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    NOTE(review): this listing arrived with its header and body collapsed;
    restored to match the complete ``utime`` shown earlier in this
    collection -- confirm against upstream.
    """
    if not hasattr(os, 'utime'):
        return
    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except EnvironmentError:  # no binding needed; avoids Py2-only "X, e"
        raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def add(self, message):
    """Add message and return assigned key.

    The message is dumped to a temporary file, which is then linked (or
    renamed, where ``os.link`` is unavailable) into the subdirectory chosen
    by the message: a ``MaildirMessage`` supplies its own subdir and info
    suffix, anything else goes to ``new`` with no suffix.

    Raises ExternalClashError when the destination name already exists.
    """
    tmp_file = self._create_tmp()
    try:
        self._dump_message(message, tmp_file)
    except BaseException:
        tmp_file.close()
        os.remove(tmp_file.name)
        raise
    _sync_close(tmp_file)
    if isinstance(message, MaildirMessage):
        subdir = message.get_subdir()
        suffix = self.colon + message.get_info()
        if suffix == self.colon:
            suffix = ''
    else:
        subdir = 'new'
        suffix = ''
    uniq = os.path.basename(tmp_file.name).split(self.colon)[0]
    # The destination must include the chosen subdirectory.
    dest = os.path.join(self._path, subdir, uniq + suffix)
    if isinstance(message, MaildirMessage):
        # Stamp the temp file before it is moved: nothing should modify the
        # message once it is visible at its final position.
        os.utime(tmp_file.name,
                 (os.path.getatime(tmp_file.name), message.get_date()))
    try:
        if hasattr(os, 'link'):
            os.link(tmp_file.name, dest)
            os.remove(tmp_file.name)
        else:
            os.rename(tmp_file.name, dest)
    except OSError as e:
        os.remove(tmp_file.name)
        if e.errno == errno.EEXIST:
            raise ExternalClashError('Name clash with existing message: %s'
                                     % dest)
        else:
            raise
    return uniq
def setModificationTime(self):
    """
    Stamp the UFO at ``self._path`` with the current time.

    Nothing calls this automatically; the caller is expected to invoke
    it once it has finished working on the UFO.
    """
    ufo_path = self._path
    os.utime(ufo_path, None)  # None means "now"
# metainfo.plist
def touch(self, path):
    """Mark ``path`` as changed.

    When ``self.touch_only.value`` is truthy only the timestamps are
    bumped; otherwise the file is rewritten with a ``#line 1`` line
    prepended to its previous content.
    """
    if self.touch_only.value:
        os.utime(path, None)
        return
    with open(path) as source:
        original = source.read()
    with open(path, "w") as target:
        target.write("#line 1\n" + original)
def touch(filename):
    """Create ``filename`` if it is missing and set its times to now."""
    handle = open(filename, 'a')
    with handle:
        # TODO: possibly pass None as the 2nd parameter
        os.utime(filename, None)
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    NOTE(review): this listing arrived with its header and body collapsed;
    restored to match the complete ``utime`` shown earlier in this
    collection -- confirm against upstream.
    """
    if not hasattr(os, 'utime'):
        return
    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except EnvironmentError:
        raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def copystat(src, dst):
    """Copy stat info (atime, mtime, mode bits, flags) from src to dst.

    NOTE(review): this listing arrived with its header and most of its body
    collapsed; it is restored to the standard ``shutil.copystat`` shape that
    the surviving errno check belongs to -- confirm against upstream.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Only "operation not supported" is tolerated.
            if (not hasattr(errno, 'EOPNOTSUPP') or
                    why.errno != errno.EOPNOTSUPP):
                raise
def utime(self, tarinfo, targetpath):
    """Set modification time of targetpath according to tarinfo.

    NOTE(review): this listing arrived with its header and body collapsed;
    restored to match the complete ``utime`` shown earlier in this
    collection -- confirm against upstream.
    """
    if not hasattr(os, 'utime'):
        return
    try:
        os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
    except EnvironmentError:
        raise ExtractError("could not change modification time")
#--------------------------------------------------------------------------
def copystat(src, dst):
    """Copy stat info (atime, mtime, mode bits, flags) from src to dst.

    NOTE(review): this listing arrived with its header and most of its body
    collapsed; it is restored to the standard ``shutil.copystat`` shape that
    the surviving errno check belongs to -- confirm against upstream.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Only "operation not supported" is tolerated.
            if (not hasattr(errno, 'EOPNOTSUPP') or
                    why.errno != errno.EOPNOTSUPP):
                raise
def redate_by_exif(fn):
    """Read EXIF from a jpg/jpeg, re-date the file if needed and file it by date.

    The EXIF time is read with ``get_exif_date_pil`` and, if that raises,
    with ``get_exif_date_exif``. When a time is found, a ``YYYY/YYYY_MM_DD``
    directory is created, the file's times are set to the EXIF time if they
    differ by more than MAX_DIFF seconds, and the file is moved into that
    directory. Updates the ALL_CNT and CHANGED_CNT module counters.
    """
    global ALL_CNT, CHANGED_CNT
    ALL_CNT += 1
    exif_time = None
    s = os.stat(fn)
    file_time = s[8]  # index 8 of a stat result is the integer mtime
    if DEBUG:
        print(fn)
    try:
        exif_time = get_exif_date_pil(fn)
    except Exception:  # not a bare except: let KeyboardInterrupt/SystemExit through
        s1 = traceback.format_exc()
        try:
            exif_time = get_exif_date_exif(fn)
        except Exception:
            s2 = traceback.format_exc()
            print('Something is terribly wrong! Both PIL and exifread raises exception')
            print('-' * 20)
            print(s1)
            print('-' * 20)
            print(s2)
            print('-' * 20)
            print('-' * 20)
    if exif_time:
        dir_n = time.strftime("%Y/%Y_%m_%d", time.gmtime(exif_time))
        try:
            os.makedirs(dir_n)
        except OSError:
            # An already existing directory is fine; anything else is not.
            if not os.path.isdir(dir_n):
                raise
        secs_diff = file_time - exif_time
        print("%s %s -> %s (%s)" % (fn, show_fdt(file_time), show_fdt(exif_time), dir_n))
        if abs(secs_diff) > MAX_DIFF:
            os.utime(fn, (exif_time, exif_time))
            CHANGED_CNT += 1
        shutil.move(fn, dir_n)
def touch(path):
    """Set the times of ``path`` to now, creating it (and its parent
    directory) when it does not exist yet."""
    import os
    import time
    now = time.time()
    try:
        # assume it's there
        os.utime(path, (now, now))
    except OSError:
        # if it isn't, try creating the directory and
        # a file with that name
        parent = os.path.dirname(path)
        # A bare filename has no parent to create, and an existing parent
        # must not make makedirs fail.
        if parent and not os.path.isdir(parent):
            os.makedirs(parent)
        # Append mode creates the file without wiping existing content.
        open(path, "a").close()
        os.utime(path, (now, now))
def utimens(self, path, times=None):
    """Set access/modification times of ``path`` via ``os.utime``.

    :param path: path as seen by the caller; mapped through
        ``self._full_path`` before use
    :param times: ``(atime, mtime)`` tuple, or None for the current time
    :return: the result of ``os.utime`` (None)
    """
    # ``path`` was used in the body but missing from the signature.
    return os.utime(self._full_path(path), times)
# File methods
# ============