Python os 模块：fstat() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.fstat()。
def commonprefix(m):
    """Return the longest common leading component of a list of pathnames.

    The comparison is element-by-element (characters for strings, items for
    lists/tuples); it is not aware of path separators. An empty input
    yields ``''``.
    """
    if not m:
        return ''
    # Callers that pass pre-split pathname parts (lists/tuples) are already
    # operating in an OS-agnostic fashion, so only path-like/str items are
    # normalised through os.fspath.
    first_item = m[0]
    if not isinstance(first_item, (list, tuple)):
        m = tuple(os.fspath(path) for path in m)
    lowest, highest = min(m), max(m)
    # The smallest and largest items bracket every other item, so whatever
    # prefix they share is shared by all of them.
    for index, (low_item, high_item) in enumerate(zip(lowest, highest)):
        if low_item != high_item:
            return lowest[:index]
    return lowest
# Are two stat buffers (obtained from stat, fstat or lstat)
# describing the same file?
def _read_from_header(self):
    """Read recording metadata and derive the data size from the first file.

    Sets ``self.all_channels``, ``self.all_files``, ``self.header``,
    ``self.size`` and ``self._shape`` as side effects.

    :returns: dict with ``sampling_rate``, ``nb_channels`` and ``gain``.
    """
    folder_path = os.path.dirname(os.path.realpath(self.file_name))
    self.all_channels = self._get_sorted_channels_()
    self.all_files = [os.path.join(folder_path, x) for x in self.all_channels]
    self.header = self._read_header_(self.all_files[0])

    header = {}
    header['sampling_rate'] = float(self.header['sampleRate'])
    header['nb_channels'] = len(self.all_files)
    header['gain'] = float(self.header['bitVolts'])

    # Use a context manager so the handle is released even if fstat fails.
    with open(self.all_files[0], 'rb') as g:
        file_size = os.fstat(g.fileno()).st_size

    # Whole records after the header, minus one, times samples per record.
    n_records = (file_size - self.NUM_HEADER_BYTES) // self.RECORD_SIZE
    self.size = (n_records - 1) * self.SAMPLES_PER_RECORD
    self._shape = (self.size, header['nb_channels'])
    return header
def _set_content_length(self, body):
    """Send a Content-Length header derived from ``body`` when possible.

    The length is ``len(body)`` if that works; otherwise the size reported
    by ``os.fstat`` for ``body.fileno()``. If neither works no header is
    sent.
    """
    thelen = None
    try:
        thelen = str(len(body))
    except TypeError:
        # If this is a file-like object, try to
        # fstat its file descriptor
        try:
            thelen = str(os.fstat(body.fileno()).st_size)
        except (AttributeError, OSError):
            # Don't send a length if this failed
            if self.debuglevel > 0:
                print("Cannot stat!!")

    if thelen is not None:
        self.putheader('Content-Length', thelen)
def super_len(o):
    """Return the length of ``o`` using the first strategy that applies.

    Tried in order: ``len(o)``, an ``o.len`` attribute, the size reported
    by ``os.fstat`` for ``o.fileno()``, and ``len(o.getvalue())``.
    Returns ``None`` when none of them applies.
    """
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    if hasattr(o, 'fileno'):
        have_descriptor = True
        try:
            descriptor = o.fileno()
        except io.UnsupportedOperation:
            # In-memory streams expose fileno() but cannot provide one.
            have_descriptor = False
        if have_descriptor:
            return os.fstat(descriptor).st_size

    if hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringI
        return len(o.getvalue())
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Release the descriptor even when fstat fails.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
def get_last_offset(self, f):
    """Return the saved read offset for ``f`` if it is still applicable.

    The offset from ``self.read_last_stat()`` is returned only when its
    recorded inode equals the inode of ``f`` and the offset does not exceed
    the current file size; otherwise 0 is returned.
    """
    info = os.fstat(f.fileno())
    current_inode = info[stat.ST_INO]
    current_size = info[stat.ST_SIZE]

    try:
        saved = self.read_last_stat()
    except IOError:
        # no such file: behave as if nothing was recorded
        saved = {'inode': 0, 'offset': 0}

    same_file = saved['inode'] == current_inode
    if same_file and saved['offset'] <= current_size:
        return saved['offset']
    return 0
def readData(self, filename, file_index):
    """Read ``filename`` into an unsigned-byte array.

    At most ``self.max_size`` bytes are read when that attribute is truthy;
    a warning is logged through ``self.warning`` if fewer bytes than the
    file size were read. ``file_index`` is not used by this method.

    :raises ValueError: if the file is empty.
    :returns: ``array('B', ...)`` holding the bytes that were read.
    """
    # Open file and read file size
    self.warning("Load input file: %s" % filename)
    # The context manager guarantees the handle is closed on every path.
    with open(filename, 'rb') as input_file:
        orig_filesize = fstat(input_file.fileno())[ST_SIZE]
        if not orig_filesize:
            raise ValueError("Input file (%s) is empty!" % filename)

        # Read bytes
        if self.max_size:
            data = input_file.read(self.max_size)
        else:
            data = input_file.read()

    # Display message if input is truncated
    if len(data) < orig_filesize:
        percent = len(data) * 100.0 / orig_filesize
        self.warning("Truncate file to %s bytes (%.2f%% of %s bytes)"
                     % (len(data), percent, orig_filesize))

    # Convert to Python array object
    return array('B', data)
def multipart_encode(vars, files, boundary=None, buffer=None):
    """Append a multipart/form-data body to ``buffer`` and return it.

    ``vars`` is an iterable of ``(name, value)`` text fields and ``files``
    an iterable of ``(name, file_object)`` pairs. A boundary is generated
    with ``mimetools.choose_boundary()`` when none is supplied.

    :returns: ``(boundary, buffer)``.
    """
    if boundary is None:
        boundary = mimetools.choose_boundary()
    if buffer is None:
        buffer = ''
    delimiter = '--%s\r\n' % boundary

    for field_name, field_value in vars:
        buffer += delimiter
        buffer += 'Content-Disposition: form-data; name="%s"' % field_name
        buffer += '\r\n\r\n' + field_value + '\r\n'

    for field_name, fd in files:
        # The size is looked up (so objects without a usable descriptor
        # still fail here) but no per-part Content-Length is emitted.
        os.fstat(fd.fileno())[stat.ST_SIZE]
        filename = fd.name.split('/')[-1]
        guessed_type = mimetypes.guess_type(filename)[0]
        contenttype = guessed_type or 'application/octet-stream'
        buffer += delimiter
        buffer += 'Content-Disposition: form-data; name="%s"; filename="%s"\r\n' % (field_name, filename)
        buffer += 'Content-Type: %s\r\n' % contenttype
        fd.seek(0)
        buffer += '\r\n' + fd.read() + '\r\n'

    buffer += '--%s--\r\n\r\n' % boundary
    return boundary, buffer
def follow_file(filename):
    """Generator that keeps a descriptor open on ``filename`` across replacement.

    Yields ``None`` each time the file cannot be opened. While it is open,
    yields ``(first, timestamp, fd)`` tuples, where ``first`` is true only
    for the first tuple after each (re)open. When the path disappears or
    its inode no longer matches the opened descriptor, the descriptor is
    closed and the file is reopened on the next iteration.
    """
    while True:
        try:
            fd = os.open(filename, os.O_RDONLY | os.O_NONBLOCK)
        except OSError:
            yield None
            continue

        try:
            opened_inode = os.fstat(fd).st_ino
            is_first = True
            while True:
                # Sample the path *before* yielding so the consumer gets one
                # last chance to drain the old descriptor after a rotation.
                try:
                    path_info = os.stat(filename)
                except OSError:
                    path_info = None

                yield is_first, time.time(), fd

                if path_info is None or path_info.st_ino != opened_inode:
                    break
                is_first = False
        finally:
            os.close(fd)
def __init__(self, devname=None):
    """Open the random device and verify it is a character special device.

    :param devname: path of the device; defaults to ``/dev/urandom``.
    :raises TypeError: if the opened path is not a character device.
    """
    self.name = "/dev/urandom" if devname is None else devname

    # Test that /dev/urandom is a character special device
    f = open(self.name, "rb", 0)
    is_char_device = stat.S_ISCHR(os.fstat(f.fileno())[stat.ST_MODE])
    if not is_char_device:
        f.close()
        raise TypeError("%r is not a character special device" % (self.name,))

    self.__file = f

    BaseRNG.__init__(self)
def start(self):
    """Redirect ``self.targetfd`` to the capture file (or to devnull for stdin).

    :raises ValueError: if the saved descriptor ``self._savefd`` is no
        longer valid.
    """
    try:
        os.fstat(self._savefd)
    except OSError:
        raise ValueError("saved filedescriptor not valid,"
                         "did you call start() twice?")
    if self.targetfd == 0 and not self.tmpfile:
        fd = os.open(devnullpath, os.O_RDONLY)
        try:
            os.dup2(fd, 0)
        finally:
            # The temporary descriptor is only needed for the dup2 call.
            os.close(fd)
        if hasattr(self, '_oldsys'):
            setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
    else:
        os.dup2(self.tmpfile.fileno(), self.targetfd)
        # NOTE(review): the source line here was garbled
        # ("if hasattr(self, self.tmpfile)"); reconstructed by symmetry with
        # the branch above -- confirm against upstream.
        if hasattr(self, '_oldsys'):
            setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
def startedConnecting(self, connector):
    """Abort the connection unless the socket file passes ownership checks.

    The path ``connector.transport.addr`` must have mode 0600 and be owned
    by the current uid and gid; otherwise (or if it cannot be stat'ed) the
    connection attempt is stopped.
    """
    fd = connector.transport.fileno()
    stats = os.fstat(fd)
    try:
        filestats = os.stat(connector.transport.addr)
    except Exception:
        # Best effort: any failure to stat the path aborts the connection.
        connector.stopConnecting()
        return
    # NOTE(review): the checks below test ``filestats`` (the path) while the
    # messages report ``stats`` (the descriptor) -- confirm which is intended.
    if stat.S_IMODE(filestats[0]) != 0o600:
        log.msg("socket mode is not 0600: %s" % oct(stat.S_IMODE(stats[0])))
    elif filestats[4] != os.getuid():
        log.msg("socket not owned by us: %s" % stats[4])
    elif filestats[5] != os.getgid():
        log.msg("socket not owned by our group: %s" % stats[5])
    # XXX reenable this when i can fix it for cygwin
    #elif filestats[-3:] != stats[-3:]:
    #    log.msg("socket doesn't have same create times")
    else:
        log.msg('conecting OK')
        return
    connector.stopConnecting()
def maybeParseConfig(self):
    """Re-parse ``self.resolv`` if its mtime changed, then reschedule itself.

    Does nothing when ``self.resolv`` is ``None``. A missing file is parsed
    as an empty configuration; other I/O errors propagate.
    """
    if self.resolv is None:
        # Don't try to parse it, don't set up a call loop
        return

    try:
        resolvConf = open(self.resolv)
    except IOError as e:
        if e.errno == errno.ENOENT:
            # Missing resolv.conf is treated the same as an empty resolv.conf
            self.parseConfig(())
        else:
            raise
    else:
        # Close the file once it has been inspected/parsed.
        with resolvConf:
            mtime = os.fstat(resolvConf.fileno()).st_mtime
            if mtime != self._lastResolvTime:
                log.msg('%s changed,reparsing' % (self.resolv,))
                self._lastResolvTime = mtime
                self.parseConfig(resolvConf)

    # Check again in a little while
    from twisted.internet import reactor
    self._parseCall = reactor.callLater(self._resolvReadInterval, self.maybeParseConfig)
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Set up a read transport on ``pipe`` and schedule its start-up callbacks.

    :raises ValueError: if the descriptor is not a FIFO, socket or
        character device.
    """
    super().__init__(extra)
    self._extra['pipe'] = pipe
    self._loop = loop
    self._pipe = pipe
    self._fileno = pipe.fileno()

    file_mode = os.fstat(self._fileno).st_mode
    accepted_kinds = (stat.S_ISFIFO, stat.S_ISSOCK, stat.S_ISCHR)
    if not any(is_kind(file_mode) for is_kind in accepted_kinds):
        raise ValueError("Pipe transport is for pipes/sockets only.")

    _set_nonblocking(self._fileno)
    self._protocol = protocol
    self._closing = False

    # Callbacks are queued in this order: connection_made first, then the
    # reader registration, then (optionally) the waiter wake-up.
    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._loop.add_reader, self._fileno, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(waiter._set_result_unless_cancelled, None)
def tail(fn):
    """Yield lines appended to ``fn``, reopening it when its inode changes.

    Behaves like ``tail -F``: reading starts at the current end of file,
    and after a rotation is detected the old handle is drained before the
    new file is opened.
    """
    inode = stat_inode(fn)
    f = open_wait(fn)
    f.seek(0, 2)
    # Has the file inode changed
    rotated = False
    while True:
        line = f.readline()
        if line:
            yield line
            continue

        if rotated:
            # Old handle is exhausted: switch over to the new file.
            f.close()
            f = open_wait(fn)
            inode = os.fstat(f.fileno()).st_ino
            rotated = False
            continue

        if stat_inode(fn) != inode:
            # Mark the rotation, but keep trying to read from the old file
            # in case it was written to after it was rotated.
            rotated = True
            continue

        time.sleep(1)
def stream(data, writer):
    """Write ``data`` as a msgpack map through ``writer``, streaming file values.

    File values are sent as a ``msgpackHeader(size)`` followed by the file
    content in 64 KiB chunks, where ``size`` is the smaller of the bytes
    remaining in the file and ``val.read_bytes``. Other values are packed
    normally.

    :returns: the streamed size of the last file value, or 0 if ``data``
        contained no file value.
    """
    import io

    try:
        file_types = (file, io.IOBase)  # Python 2 builtin file type
    except NameError:
        file_types = (io.IOBase,)

    packer = msgpack.Packer()
    writer(packer.pack_map_header(len(data)))
    # Defined up front so the return below cannot hit an unbound name.
    size = 0
    for key, val in data.items():
        writer(packer.pack(key))
        if isinstance(val, file_types):  # File obj
            max_size = os.fstat(val.fileno()).st_size - val.tell()
            size = min(max_size, val.read_bytes)
            bytes_left = size
            writer(msgpackHeader(size))
            buff = 1024 * 64
            while 1:
                writer(val.read(min(bytes_left, buff)))
                bytes_left = bytes_left - buff
                if bytes_left <= 0:
                    break
        else:  # Simple
            writer(packer.pack(val))
    return size
def encode_file(path):
    """Base64-encode ``path`` line by line into ``<basename>.base64``.

    The output file is created in the current working directory. If
    ``path`` is not a regular file a popup is shown and nothing is written.

    :returns: True on success, False if ``path`` is not a regular file.
    """
    # Context managers close both files on every path, including the early
    # return and any exception raised while encoding.
    with open(path, 'r') as f:
        if not S_ISREG(os.fstat(f.fileno()).st_mode):
            popup('Not a regular file ! Skipped ','error')
            return False
        base = os.path.basename(path)
        with open(base + '.base64', 'wb') as f2:
            for line in f:
                f2.write(b64encode(line.encode()))
    return True
def __init__ (self, fd, args=None, timeout=30, maxread=2000, searchwindowsize=None,
              logfile=None, encoding=None, codec_errors='strict'):
    """Wrap an existing file descriptor.

    ``fd`` is either an int or an object with a ``fileno()`` method that
    returns one. The descriptor is validated with ``os.fstat`` and is not
    owned by this object (``own_fd`` is False).

    :raises ExceptionPexpect: if ``fd`` is not an int or is not valid.
    """
    # Resolve file-like objects to their underlying descriptor first.
    if type(fd) is not int and hasattr(fd, 'fileno'):
        fd = fd.fileno()

    if type(fd) is not int:
        raise ExceptionPexpect('The fd argument is not an int. If this is a command string then maybe you want to use pexpect.spawn.')

    # make sure fd is a valid file descriptor
    try:
        os.fstat(fd)
    except OSError:
        raise ExceptionPexpect('The fd argument is not a valid file descriptor.')

    self.args = None
    self.command = None
    SpawnBase.__init__(self, timeout, maxread, searchwindowsize, logfile,
                       encoding=encoding, codec_errors=codec_errors)
    self.child_fd = fd
    self.own_fd = False
    self.closed = False
    self.name = '<file descriptor %d>' % fd
def _lock_success(self, fd):
    """Report whether the locked descriptor still matches the file on disk.

    Because the lock file is deleted on release, another process may have
    removed and recreated it between our open and our lock; in that case
    the path and the descriptor refer to different files.

    :param int fd: file descriptor of the opened file to lock
    :returns: True if the lock was successfully acquired
    :rtype: bool
    """
    try:
        on_disk = os.stat(self._path)
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
        # The path vanished, so whatever we locked is no longer the lock file.
        return False

    held = os.fstat(fd)
    # Same device and same inode means it is the same file.
    return (on_disk.st_dev, on_disk.st_ino) == (held.st_dev, held.st_ino)
def commonprefix(m):
"Given a list of pathnames,fstat or lstat)
# describing the same file?
def __init__(self,))
self.__file = f
BaseRNG.__init__(self)
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def request_upload_token(self, file):
    """
    Request an upload token, then hand the response to ``self.upload``.

    The file's base name and its size (from ``os.fstat``) are stored in
    ``self.kwargs`` before the POST request is issued.

    :param file: A file handler pointing to the file to upload.
    :returns: whatever ``self.upload(response, file)`` returns.
    """
    upload_details = self.kwargs
    upload_details['name'] = os.path.basename(file.name)
    upload_details['size'] = os.fstat(file.fileno()).st_size

    encoded_kwargs = combine_kwargs(**self.kwargs)
    response = self._requester.request('POST', self.url, _kwargs=encoded_kwargs)

    return self.upload(response, file)
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def __init__(self,))
self.__file = f
BaseRNG.__init__(self)
def _set_content_length(self, body, method):
    """Send a Content-Length header appropriate for ``body`` and ``method``.

    If the body is "empty", Content-Length: 0 is set for methods that
    expect a body (RFC 7230, Section 3.3.2). If a body is given, the header
    is set whenever its length can be determined, via ``len()`` or by
    fstat-ing the body's file descriptor.
    """
    thelen = None
    if body is None and method.upper() in _METHODS_EXPECTING_BODY:
        thelen = '0'
    elif body is not None:
        try:
            thelen = str(len(body))
        except (TypeError, AttributeError):
            # If this is a file-like object, try to
            # fstat its file descriptor
            try:
                thelen = str(os.fstat(body.fileno()).st_size)
            except (AttributeError, OSError):
                # Don't send a length if this failed
                if self.debuglevel > 0:
                    print("Cannot stat!!")

    if thelen is not None:
        self.putheader('Content-Length', thelen)
def GetFileLength(fh):
    """Return the total length of the seekable stream ``fh``.

    Works by seeking to the end and reading the position, so it handles any
    seekable stream rather than only descriptor-backed files. The original
    position is restored before returning.

    Args:
      fh: The stream to get the length of.

    Returns:
      The length of the stream.
    """
    original_position = fh.tell()
    fh.seek(0, 2)  # whence=2: offset is relative to the end of the stream
    end_position = fh.tell()
    fh.seek(original_position, 0)  # whence=0: absolute offset
    return end_position
def super_len(o):
if hasattr(o,cStringIO.StringI
return len(o.getvalue())
def get_path_uid(path):
"""
Return path's uid.
Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003
Placed this function in backwardcompat due to differences on AIX and Jython,but best we can do w/o NOFOLLOW
if not os.path.islink(path):
# older versions of Jython don't have `os.fstat`
file_uid = os.stat(path).st_uid
else:
# raise OSError for parity with os.O_NOFOLLOW above
raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
return file_uid
def super_len(o):
if hasattr(o,cStringIO.StringI
return len(o.getvalue())
def GetFileLength(fh):
"""Returns the length of the file represented by fh.
This function is capable of finding the length of any seekable stream, 0)
return length
def _set_content_length(self, body):
# Set the content-length based on the body.
thelen = None
try:
thelen = str(len(body))
except TypeError as te:
# If this is a file-like object, OSError):
# Don't send a length if this failed
if self.debuglevel > 0: print("Cannot stat!!")
if thelen is not None:
self.putheader('Content-Length', thelen)
def __init__(self,))
self.__file = f
BaseRNG.__init__(self)
def encode(self, params, boundary=None, buffer=None, files=()):
    """Build a multipart/form-data body from fields and files.

    :param params: iterable of ``(name, value)`` text fields.
    :param boundary: boundary string; generated with
        ``mimetools.choose_boundary()`` when omitted.
    :param buffer: writable object with ``getvalue()``; a new
        ``cStringIO.StringIO`` is created when omitted.
    :param files: iterable of ``(name, file_object)`` pairs. Added as a
        trailing optional parameter because the body iterated an undefined
        ``files`` name. NOTE(review): upstream probably took ``files`` as a
        positional argument -- confirm.
    :returns: ``(boundary, body)``.
    """
    if boundary is None:
        boundary = mimetools.choose_boundary()
    if buffer is None:
        buffer = cStringIO.StringIO()
    for (key, value) in params:
        buffer.write('--%s\r\n' % boundary)
        buffer.write('Content-Disposition: form-data; name="%s"' % key)
        buffer.write('\r\n\r\n%s\r\n' % value)
    for (key, fd) in files:
        filename = fd.name.split('/')[-1]
        content_type = mimetypes.guess_type(filename)[0]
        content_type = content_type or 'application/octet-stream'
        file_size = os.fstat(fd.fileno())[stat.ST_SIZE]
        buffer.write('--%s\r\n' % boundary)
        c_dis = 'Content-Disposition: form-data; name="%s"; filename="%s"%s'
        # Three placeholders need three values; the filename was missing.
        content_disposition = c_dis % (key, filename, '\r\n')
        buffer.write(content_disposition)
        buffer.write('Content-Type: %s\r\n' % content_type)
        buffer.write('Content-Length: %s\r\n' % file_size)
        fd.seek(0)
        buffer.write('\r\n%s\r\n' % fd.read())
    buffer.write('--%s--\r\n\r\n' % boundary)
    buffer = buffer.getvalue()
    return boundary, buffer
def total_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Release the descriptor even when fstat fails.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid
def sameopenfile(fp1, fp2):
    """Test whether two open files reference the same file.

    Both arguments are passed directly to ``os.fstat`` and the resulting
    stat buffers are compared with ``samestat``.
    """
    return samestat(os.fstat(fp1), os.fstat(fp2))
# Split a path in root and extension.
# The extension is everything starting at the last dot in the last
# pathname component; the root is everything before that.
# It is always true that root + ext == p.
# Generic implementation of splitext, to be parametrized with
# the separators.
def read_qstring(fid):
    """Read Qt style QString.

    The first 32-bit unsigned number indicates the length of the string (in bytes).
    If this number equals 0xFFFFFFFF, the string is null.

    Strings are stored as unicode (read here as little-endian 16-bit units).

    :raises Exception: if the declared length exceeds the bytes remaining
        in the file.
    """
    length, = struct.unpack('<I', fid.read(4))
    if length == 0xFFFFFFFF:
        return ""

    remaining = os.fstat(fid.fileno()).st_size - fid.tell()
    if length > remaining + 1:
        raise Exception('Length too long.')

    # convert length from bytes to 16-bit Unicode words; integer division
    # keeps the count exact instead of going through a float.
    n_units = length // 2
    # One read and one unpack instead of one call pair per character.
    code_units = struct.unpack('<%dH' % n_units, fid.read(2 * n_units))

    to_char = chr if sys.version_info >= (3, 0) else unichr
    return ''.join(to_char(c) for c in code_units)
def _read_from_header(self):
    """Read recording metadata from the first channel file.

    Sets ``self.all_files``, ``self.all_channels``, ``self.header`` and
    ``self.inverse`` as side effects. The gain is scaled by 1e6 and negated
    when the header marks the input as inverted.

    :returns: dict with ``sampling_rate``, ``nb_channels`` and ``gain``.
    """
    folder_path = os.path.dirname(os.path.abspath(self.file_name))
    self.all_files = self._get_sorted_channels_()
    regexpr = re.compile(r'\d+')
    # The channel id is the first run of digits found in each file name.
    self.all_channels = [int(regexpr.findall(f)[0]) for f in self.all_files]

    self.header = self._read_header_(self.all_files[0])

    header = {}
    header['sampling_rate'] = float(self.header['SamplingFrequency'])
    header['nb_channels'] = len(self.all_files)
    header['gain'] = float(self.header['ADBitVolts'])*1000000

    self.inverse = ('InputInverted' in self.header) and (self.header['InputInverted'] == 'True')
    if self.inverse:
        header['gain'] *= -1

    # NOTE(review): the source line here was garbled -- it passed
    # header['nb_channels'] as the open() mode, which raises TypeError. Any
    # size/shape computation that used this handle is not visible and must
    # be restored from upstream; for now the file is only opened read-only
    # to confirm it is accessible.
    with open(self.all_files[0], 'rb'):
        pass

    return header
def get_path_uid(path):
"""
Return path's uid.
Does not follow symlinks:
https://github.com/pypa/pip/pull/935#discussion_r5307003
Placed this function in compat due to differences on AIX and
Jython,but best we can do w/o NOFOLLOW
if not os.path.islink(path):
# older versions of Jython don't have `os.fstat`
file_uid = os.stat(path).st_uid
else:
# raise OSError for parity with os.O_NOFOLLOW above
raise OSError(
"%s is a symlink; Will not return uid for symlinks" % path
)
return file_uid
def send_head(self):
    """Common code for GET and HEAD commands.

    This sends the response code and MIME headers.

    Return value is either a file object (which has to be copied
    to the outputfile by the caller unless the command was HEAD,
    and must be closed by the caller under all circumstances), or
    None, in which case the caller has nothing further to do.
    """
    path = self.translate_path(self.path)
    f = None
    if os.path.isdir(path):
        if not self.path.endswith('/'):
            # redirect browser - doing basically what apache does
            self.send_response(301)
            self.send_header("Location", self.path + "/")
            self.end_headers()
            return None
        for index in "index.html", "index.htm":
            index = os.path.join(path, index)
            if os.path.exists(index):
                path = index
                break
        else:
            return self.list_directory(path)
    ctype = self.guess_type(path)
    try:
        # Always read in binary mode. Opening files in text mode may cause
        # newline translations, making the actual size of the content
        # transmitted *less* than the content-length!
        f = open(path, 'rb')
    except IOError:
        self.send_error(404, "File not found")
        return None
    try:
        self.send_response(200)
        self.send_header("Content-type", ctype)
        fs = os.fstat(f.fileno())
        self.send_header("Content-Length", str(fs.st_size))
        self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
    except BaseException:
        # The caller only closes the file it receives; if we fail before
        # returning it, nobody else can, so close it here and re-raise.
        f.close()
        raise
    return f
def send_head(self):
"""Common code for GET and HEAD commands.
This sends the response code and MIME headers.
Return value is either a file object (which has to be copied
to the outputfile by the caller unless the command was HEAD, "*")
self.end_headers()
return f
def sameopenfile(fp1, s2)
# Are two stat buffers (obtained from stat,fstat or lstat)
# describing the same file?
def askUserForEncryptionKey():
    """Prompt for an encryption key and return what the user typed.

    Returns the fixed key ``'ipwb'`` without prompting when ``DEBUG`` is
    set. When the stat results for descriptors 0 and 1 differ (output is
    treated as redirected), the inline prompt is suppressed.
    """
    if DEBUG:  # Allows testing instead of requiring a user prompt
        return 'ipwb'

    outputRedirected = os.fstat(0) != os.fstat(1)
    promptString = 'Enter a key for encryption: '
    if outputRedirected:  # Prevents prompt in redir output
        promptString = ''
        print(promptString, file=sys.stderr)

    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    key = read_line(promptString)
    return key
def _open(self):
    """Identify the JPEG 2000 container, read size/mode and set up the tile.

    A raw codestream is recognised by its 4-byte signature and a JP2 file
    by its 12-byte signature; anything else is rejected.

    :raises SyntaxError: if the signature is unknown or size/mode could not
        be determined.
    """
    sig = self.fp.read(4)
    if sig == b'\xff\x4f\xff\x51':
        self.codec = "j2k"
        self.size, self.mode = _parse_codestream(self.fp)
    else:
        sig = sig + self.fp.read(8)

        if sig == b'\x00\x00\x00\x0cjP  \x0d\x0a\x87\x0a':
            self.codec = "jp2"
            self.size, self.mode = _parse_jp2_header(self.fp)
        else:
            raise SyntaxError('not a JPEG 2000 file')

    if self.size is None or self.mode is None:
        raise SyntaxError('unable to determine size/mode')

    self.reduce = 0
    self.layers = 0

    fd = -1
    length = -1

    # Prefer the descriptor's size; fall back to seeking to the end. Both
    # attempts are best effort, but must not swallow KeyboardInterrupt or
    # SystemExit, hence ``Exception`` rather than a bare except.
    try:
        fd = self.fp.fileno()
        length = os.fstat(fd).st_size
    except Exception:
        fd = -1
        try:
            pos = self.fp.tell()
            self.fp.seek(0, 2)
            length = self.fp.tell()
            self.fp.seek(pos, 0)
        except Exception:
            length = -1

    self.tile = [('jpeg2k', (0, 0) + self.size, 0,
                  (self.codec, self.reduce, self.layers, length))]