Python os module: os.SEEK_SET code examples
The code examples below were extracted from open-source Python projects and illustrate how os.SEEK_SET is used in practice.
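As a quick primer before the project code, here is a minimal, self-contained sketch (not taken from any of the projects below) of how os.SEEK_SET relates to the other whence constants accepted by seek():

import os
import tempfile

# Minimal demo: write ten bytes, then seek with each whence constant.
with tempfile.TemporaryFile() as f:
    f.write(b"0123456789")
    f.seek(0, os.SEEK_SET)     # absolute position (SEEK_SET is the default whence)
    assert f.read(3) == b"012"
    f.seek(2, os.SEEK_CUR)     # relative to the current position: 3 + 2 = 5
    assert f.read(1) == b"5"
    f.seek(-2, os.SEEK_END)    # relative to the end: 10 - 2 = 8
    assert f.read() == b"89"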
def create_media(media):
"""Download media link"""
if is_valid_url(media.data_value):
filename = media.data_value.split('/')[-1]
data_file = NamedTemporaryFile()
content_type = mimetypes.guess_type(filename)
with closing(requests.get(media.data_value, stream=True)) as r:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
data_file.write(chunk)
        data_file.seek(0, os.SEEK_END)  # jump to the end of the written data
        size = os.path.getsize(data_file.name)
        data_file.seek(0, os.SEEK_SET)  # rewind before wrapping the file for upload
media.data_value = filename
media.data_file = InMemoryUploadedFile(
data_file, 'data_file', filename, content_type,
size, charset=None)
return media
return None
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
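The seek() implementation above clamps every target offset into the range [0, size] instead of raising for out-of-range values, which is why each branch wraps the arithmetic in min()/max(). A standalone sketch of that clamping rule, with a function name and asserts of my own rather than the project's:

import os

def clamp_seek(position, size, pos, whence=os.SEEK_SET):
    # Pure-function version of the clamping done by the wrapper's seek().
    if whence == os.SEEK_SET:
        return min(max(pos, 0), size)
    if whence == os.SEEK_CUR:
        return min(max(position + pos, 0), size)
    if whence == os.SEEK_END:
        return max(min(size + pos, size), 0)
    raise ValueError("Invalid argument")

assert clamp_seek(0, 10, -5) == 0                   # absolute seek never goes below 0
assert clamp_seek(4, 10, 100, os.SEEK_CUR) == 10    # ...and never past the end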
def setup(self):
# Reset everything.
self.f.seek(self.beginning, os.SEEK_SET)
self.whereToWriteNewIFDOffset = None
self.offsetOfNewPage = 0
self.IIMM = IIMM = self.f.read(4)
if not IIMM:
# empty file - first page
self.isFirst = True
return
self.isFirst = False
if IIMM == b"II\x2a\x00":
self.setEndian("<")
elif IIMM == b"MM\x00\x2a":
self.setEndian(">")
else:
raise RuntimeError("Invalid TIFF file header")
self.skipIFDs()
self.goToEnd()
def md5sum2(filename, offset=0, partsize=0):
m = get_md5()
fp = open(filename, 'rb')
if offset > os.path.getsize(filename):
        fp.seek(0, os.SEEK_END)  # requested offset is past EOF, so just move to the end
else:
fp.seek(offset)
left_len = partsize
BufferSize = BUFFER_SIZE
while True:
if left_len <= 0:
break
elif left_len < BufferSize:
buffer_content = fp.read(left_len)
else:
buffer_content = fp.read(BufferSize)
m.update(buffer_content)
left_len = left_len - len(buffer_content)
md5sum = m.hexdigest()
return md5sum
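md5sum2 depends on get_md5 and BUFFER_SIZE defined elsewhere in that project. Here is a self-contained sketch of the same idea using hashlib directly; the function name and constants are illustrative, not from the project:

import hashlib
import os

def md5_of_range(path, offset=0, length=0):
    # Hash `length` bytes starting at `offset`, clamping the offset to the file size.
    m = hashlib.md5()
    with open(path, 'rb') as fp:
        fp.seek(min(offset, os.path.getsize(path)), os.SEEK_SET)
        remaining = length
        while remaining > 0:
            chunk = fp.read(min(remaining, 64 * 1024))
            if not chunk:
                break
            m.update(chunk)
            remaining -= len(chunk)
    return m.hexdigest()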
def _get_org(self, ipnum):
"""
Seek and return organization or ISP name for ipnum.
Return org/isp name.
:arg ipnum: Result of ip2long conversion
"""
seek_org = self._seek_country(ipnum)
if seek_org == self._databaseSegments:
return None
read_length = (2 * self._recordLength - 1) * self._databaseSegments
try:
self._lock.acquire()
self._fp.seek(seek_org + read_length, os.SEEK_SET)
buf = self._fp.read(const.MAX_ORG_RECORD_LENGTH)
finally:
self._lock.release()
if PY3 and type(buf) is bytes:
buf = buf.decode(ENCODING)
return buf[:buf.index(chr(0))]
def chunk_generator(chunk_count, chunk_size, file_data):
"""
Generic chunk generator logic
:param chunk_count: Number of chunks wanted
:param chunk_size: Size of each chunk
:param file_data: bytes to be split into chunk
:return:
"""
try:
total_len = len(file_data)
is_fp = False
except TypeError:
total_len = get_file_size(file_data)
is_fp = True
for i in range(chunk_count):
start_range = i * chunk_size
end_range = (start_range + chunk_size) if i < (chunk_count - 1) else total_len
chunk_info = Chunk(i, start_range, end_range, chunk_count)
if is_fp:
file_data.seek(chunk_info.start, os.SEEK_SET)
yield chunk_info, file_data.read(chunk_info.length)
else:
yield chunk_info, file_data[chunk_info.start: chunk_info.end]
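A usage sketch, assuming the Chunk helper and get_file_size from the surrounding module: splitting ten bytes into three chunks of four bytes, with the last chunk absorbing the remainder.

for info, payload in chunk_generator(chunk_count=3, chunk_size=4, file_data=b"0123456789"):
    print(info.start, info.end, payload)
# 0 4 b'0123'
# 4 8 b'4567'
# 8 10 b'89'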
def read_random_line(f):
import os
chunk_size = 16
with open(f, 'rb') as f_handle:
f_handle.seek(0, os.SEEK_END)
size = f_handle.tell()
# i = random.randint(0,size)
i = u_randint(0, size)
while True:
i -= chunk_size
if i < 0:
chunk_size += i
i = 0
f_handle.seek(i, os.SEEK_SET)
d = f_handle.read(chunk_size)
i_newline = d.rfind(b'\n')
if i_newline != -1:
i += i_newline + 1
break
if i == 0:
break
f_handle.seek(i, os.SEEK_SET)
return f_handle.readline()
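A usage sketch; u_randint is the project's random helper, and 'wordlist.txt' is a placeholder path:

line = read_random_line('wordlist.txt')               # returns raw bytes, newline included
print(line.rstrip(b'\n').decode('utf-8', errors='replace'))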
def uuid_from_file(fn, block_size=1 << 20):
"""
Returns an arbitrary sized unique ASCII string based on the file contents.
(exact hashing method may change).
"""
with open(fn, 'rb') as f:
# first get the size
import os
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
del os
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
# skip the '0x'
return hex(size)[2:] + sha1.hexdigest()
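A usage sketch with a placeholder path; the result is the hex-encoded file size followed by the SHA-512 hex digest of the contents:

fingerprint = uuid_from_file('photo.jpg')
print(fingerprint)   # '<hex size>' + 128 hex digits of SHA-512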
def _uuid_from_file(fn, block_size=1 << 20):
with open(fn, 'rb') as f:
# first get the size
        f.seek(0, os.SEEK_END)   # measure the file size first
        size = f.tell()
        f.seek(0, os.SEEK_SET)   # then rewind for hashing
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
return (hex(size)[2:] + sha1.hexdigest()).encode()
def close(self):
"""
Close the blend file
writes the blend file to disk if changes has happened
"""
handle = self.handle
if self.is_modified:
if self.is_compressed:
log.debug("close compressed blend file")
            handle.seek(0, os.SEEK_SET)  # rewind the uncompressed handle before re-compressing
log.debug("compressing started")
fs = gzip.open(self.filepath_orig, "wb")
data = handle.read(FILE_BUFFER_SIZE)
while data:
fs.write(data)
data = handle.read(FILE_BUFFER_SIZE)
fs.close()
log.debug("compressing finished")
handle.close()
def get(self, path,
default=...,
sdna_index_refine=None,
use_nil=True, use_str=True,
base_index=0,
):
ofs = self.file_offset
if base_index != 0:
assert(base_index < self.count)
ofs += (self.size // self.count) * base_index
self.file.handle.seek(ofs, os.SEEK_SET)
if sdna_index_refine is None:
sdna_index_refine = self.sdna_index
else:
self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
dna_struct = self.file.structs[sdna_index_refine]
return dna_struct.field_get(
        self.file.header, self.file.handle, path,
default=default,
use_nil=use_nil, use_str=use_str,
)
def set(self, path, value,
        sdna_index_refine=None,
        ):
if sdna_index_refine is None:
sdna_index_refine = self.sdna_index
else:
self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
dna_struct = self.file.structs[sdna_index_refine]
self.file.handle.seek(self.file_offset, os.SEEK_SET)
self.file.is_modified = True
    return dna_struct.field_set(
        self.file.header, self.file.handle, path, value)
# ---------------
# Utility get/set
#
# avoid inline pointer casting
def iter_array(block, length=-1):
assert(block.code == b'DATA')
from . import blendfile
import os
handle = block.file.handle
header = block.file.header
for i in range(length):
block.file.handle.seek(block.file_offset + (header.pointer_size * i), os.SEEK_SET)
offset = blendfile.DNA_IO.read_pointer(handle, header)
sub_block = block.file.find_block_from_offset(offset)
yield sub_block
# -----------------------------------------------------------------------------
# ID Expand
def rar_v4(f):
# based on http://www.forensicswiki.org/wiki/RAR
# and http://acritum.com/winrar/rar-format
while True:
pos = f.tell()
crc, typ, flags, size = struct.unpack('<HBHH', f.read(7))
if flags & 0x8000:
size += struct.unpack('<L', f.read(4))[0]
if not 0x72 <= typ <= 0x7b:
raise FileCorrupted
f.try_seek(pos + size, os.SEEK_SET)
# f.try_seek(size,os.SEEK_CUR)
f.update_pos()
if typ == 0x7b:
break
elif typ == 0x73 and flags & 0x80:
            return Ellipsis  # encrypted, assume bad faith
return True
def _get_org(self, ipnum):
"""
Seek and return organization (or ISP) name for converted IP addr.
@param ipnum: Converted IP address
@type ipnum: int
@return: org/isp name
@rtype: str
"""
seek_org = self._seek_country(ipnum)
if seek_org == self._databaseSegments:
return None
record_pointer = seek_org + (2 * self._recordLength - 1) * self._databaseSegments
self._filehandle.seek(record_pointer, os.SEEK_SET)
org_buf = self._filehandle.read(const.MAX_ORG_RECORD_LENGTH)
return org_buf[:org_buf.index(chr(0))]
def write(self, stream, seekfirst=True):
"""Write header and child elements.
This checks first whether the Element is in a consistent state.
Args:
+ stream: As in Element.write().
+ seekfirst: As in Element.write().
Raises:
    + EbmlException, if the write fails.
    + Inconsistent, if the Element is not in a consistent state.
"""
self.check_consistency()
if seekfirst:
stream.seek(self.pos_absolute, SEEK_SET)
stream.write(self.header.encode())
Container._write(self, False)
def __init__(self, f, summary=True):
"""Args:
+ f: Either a file name or a seekable binary stream.
    + summary: If True, call self.read_summary().
"""
super().__init__(0)
if isinstance(f, IOBase):
self.stream = f
else:
self.stream = open(f, 'rb')
self.stream.seek(0, SEEK_END)
self.stream_size = self.stream.tell()
self.stream.seek(0, SEEK_SET)
if summary:
self.read_summary()
def parse_SeekHead(self, child, stream): #pylint: disable=invalid-name
"Parse SeekHead element and recursively read elements."
LOG.debug("Segment: parsed {}".format(child))
recursed = False
for seek_entry in child.children_named('Seek'):
try:
self.find(seek_entry.seek_pos)
except ValueError:
# Recurse if this is the first time we've seen this seek entry
LOG.debug("Segment: adding seek entry {}".format(seek_entry))
if seek_entry.seek_id_name != 'Cluster' and stream:
# This recursively reads any elements this seek entry
# points to that haven't been read already.
self.read_element(stream, seek_entry.seek_pos,
summary=True, seekfirst=True)
recursed = True
if recursed:
stream.seek(child.pos_end_absolute, SEEK_SET)
def write(self, stream, seekfirst=True):
"""Write this element to a binary stream.
    If this element is not dirty, this method should reproduce the byte
    stream used to read it.
Raises:
+ ValueError: if the object's data cannot be encoded in self.size
bytes.
"""
if seekfirst:
stream.seek(self.pos_absolute, SEEK_SET)
stream.write(self.header.encode())
stream.write(self.encode(self.value, self.size))
# Virtual
def seek(self, offset, whence=os.SEEK_SET):
"""Set the file's current position.
Args:
offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
        (seek relative to the end; the offset should be negative).
"""
self._verify_read_mode()
if whence == os.SEEK_SET:
self._offset = offset
elif whence == os.SEEK_CUR:
self._offset += offset
elif whence == os.SEEK_END:
file_stat = self.stat()
self._offset = file_stat.st_size + offset
else:
        raise InvalidArgumentError('Whence mode %d is not supported' % whence)