Python os 模块,SEEK_END 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.SEEK_END。
def create_media(media):
"""Download media link"""
if is_valid_url(media.data_value):
filename = media.data_value.split('/')[-1]
data_file = NamedTemporaryFile()
content_type = mimetypes.guess_type(filename)
with closing(requests.get(media.data_value, stream=True)) as r:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
data_file.write(chunk)
data_file.seek(os.SEEK_SET, os.SEEK_END)
size = os.path.getsize(data_file.name)
data_file.seek(os.SEEK_SET)
media.data_value = filename
media.data_file = InMemoryUploadedFile(
data_file, 'data_file', filename, content_type,
size, charset=None)
return media
return None
def seek(self, pos, whence=os.SEEK_SET):
"""Seek to a position in the file.
"""
if self.closed:
raise ValueError("I/O operation on closed file")
if whence == os.SEEK_SET:
self.position = min(max(pos, 0), self.size)
elif whence == os.SEEK_CUR:
if pos < 0:
self.position = max(self.position + pos, 0)
else:
self.position = min(self.position + pos, self.size)
elif whence == os.SEEK_END:
self.position = max(min(self.size + pos, self.size), 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = ""
self.fileobj.seek(self.position)
def map_file(cls, fileobj, offset = 0, size = None):
# If no size is given,it's the whole file by default
if size is None:
fileobj.seek(0, os.SEEK_END)
size = fileobj.tell() - offset
# Read the footer
fileobj.seek(offset + size - cls._Footer.size)
values_pos, = cls._Footer.unpack(fileobj.read(cls._Footer.size))
fileobj.seek(offset)
# Map everything
id_mapper = cls.IdMapper.map_file(fileobj, offset, size = values_pos)
value_array = cls.ValueArray.map_file(fileobj, offset + values_pos,
size = size - cls._Footer.size - values_pos)
return cls(value_array, id_mapper)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def md5sum2(filename, offset=0, partsize=0):
m = get_md5()
fp = open(filename, 'rb')
if offset > os.path.getsize(filename):
fp.seek(os.SEEK_SET, os.SEEK_END)
else:
fp.seek(offset)
left_len = partsize
BufferSize = BUFFER_SIZE
while True:
if left_len <= 0:
break
elif left_len < BufferSize:
buffer_content = fp.read(left_len)
else:
buffer_content = fp.read(BufferSize)
m.update(buffer_content)
left_len = left_len - len(buffer_content)
md5sum = m.hexdigest()
return md5sum
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def try_enc():
import os
inFile = open('x.wav', 'rb')
inFile.seek(0, os.SEEK_END)
wavFileSize = inFile.tell()
inFile.seek(44) # skip wav header
outFile = open('x.mp3', 'wb')
lame = LameEncoder(44100,1,128)
while(1):
inBytes = inFile.read(512)
if inBytes == '':
break
#inBuf = ctypes.create_string_buffer(inBytes,512)
sample_count = len(inBytes) /2
output_buff_len = int(1.25 * sample_count + 7200)
output_buff = (ctypes.c_char*output_buff_len)()
lame.dll.lame_encode_buffer.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.POINTER(ctypes.c_char), ctypes.c_int];
output_size = lame.dll.lame_encode_buffer(lame.lame, inBytes, 0, len(inBytes)/2, output_buff, output_buff_len);
outFile.write(output_buff[0:output_size])
outFile.close()
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = ""
self.fileobj.seek(self.position)
def test_stream(self):
# guess content types from extension
descriptor = AttachableDescriptor(io.BytesIO(b'Simple text'), extension='.txt')
self.assertIsInstance(descriptor, StreamDescriptor)
self.assertEqual(descriptor.content_type, 'text/plain')
descriptor.seek(2)
self.assertEqual(descriptor.tell(), 2)
descriptor.seek(0, os.SEEK_END)
self.assertEqual(descriptor.tell(), 11)
# guess extension from original filename
descriptor = AttachableDescriptor(io.BytesIO(b'Simple text'), original_filename='letter.pdf')
self.assertEqual(descriptor.extension, '.pdf')
# guess extension from content type
descriptor = AttachableDescriptor(io.BytesIO(b'Simple text'), content_type='application/json')
self.assertEqual(descriptor.extension, '.json')
self.assertRaises(DescriptorOperationError, lambda: descriptor.filename)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def reopen(self):
if self.inner_file is not None:
self.inner_file.close()
open(self.filename, 'a').close()
f = open(self.filename, 'rb')
f.seek(0, os.SEEK_END)
length = f.tell()
if length > 100*1000*1000:
f.seek(-1000*1000, os.SEEK_END)
while True:
if f.read(1) in ('', '\n'):
break
data = f.read()
f.close()
f = open(self.filename, 'wb')
f.write(data)
f.close()
self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def xor_file(input_file, output_file, xorkey):
number_added = 0
while True:
some_bytes = input_file.read(4)
if len(some_bytes) == 0:
break
if len(some_bytes) % 4 != 0:
number_added = 4 - len(some_bytes)
some_bytes = some_bytes + "\x00" * (number_added)
writable_bytes = struct.pack("<I", (struct.unpack("<I", some_bytes)[0]) ^ xorkey)
output_file.write(writable_bytes)
if number_added != 0:
number_added = 0 - number_added
output_file.seek(number_added, os.SEEK_END)
output_file.truncate()
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def read_random_line(f):
import os
chunk_size = 16
with open(f, 'rb') as f_handle:
f_handle.seek(0, os.SEEK_END)
size = f_handle.tell()
# i = random.randint(0,size)
i = u_randint(0, size)
while True:
i -= chunk_size
if i < 0:
chunk_size += i
i = 0
f_handle.seek(i, os.SEEK_SET)
d = f_handle.read(chunk_size)
i_newline = d.rfind(b'\n')
if i_newline != -1:
i += i_newline + 1
break
if i == 0:
break
f_handle.seek(i, os.SEEK_SET)
return f_handle.readline()
def _is_ascii_file(data):
"""
This function returns True if the data represents an ASCII file.
Please note that a False value does not necessary means that the data
represents a binary file. It can be a (very *RARE* in real life,but
can easily be forged) ascii file.
"""
# Skip header...
data.seek(BINARY_HEADER)
size = struct.unpack('<I', data.read(4))[0]
# Use seek() method to get size of the file.
data.seek(0, os.SEEK_END)
file_size = data.tell()
# Reset to the start of the file.
data.seek(0)
if size == 0: # Odds to get that result from an ASCII file are null...
print("WARNING! Reported size (facet number) is 0,assuming invalid binary STL file.")
return False # Assume binary in this case.
return (file_size != BINARY_HEADER + 4 + BINARY_STRIDE * size)
def uuid_from_file(fn, block_size=1 << 20):
"""
Returns an arbitrary sized unique ASCII string based on the file contents.
(exact hashing method may change).
"""
with open(fn, 'rb') as f:
# first get the size
import os
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
del os
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
# skip the '0x'
return hex(size)[2:] + sha1.hexdigest()
def _uuid_from_file(fn, block_size=1 << 20):
with open(fn, 'rb') as f:
# first get the size
f.seek(0, os.SEEK_SET)
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
return (hex(size)[2:] + sha1.hexdigest()).encode()
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def __init__(self, f, summary=True):
"""Args:
+ f: Either a file name or a seekable binary stream.
+ summary: If True,call self.read_summary().
"""
super().__init__(0)
if isinstance(f, IOBase):
self.stream = f
else:
self.stream = open(f, 'rb')
self.stream.seek(0, SEEK_END)
self.stream_size = self.stream.tell()
self.stream.seek(0, SEEK_SET)
if summary:
self.read_summary()
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, whence=os.SEEK_SET):
"""Set the file's current position.
Args:
offset: seek offset as number.
whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
and os.SEEK_CUR (seek relative to the current position) and os.SEEK_END
(seek relative to the end,offset should be negative).
"""
self._verify_read_mode()
if whence == os.SEEK_SET:
self._offset = offset
elif whence == os.SEEK_CUR:
self._offset += offset
elif whence == os.SEEK_END:
file_stat = self.stat()
self._offset = file_stat.st_size + offset
else:
raise InvalidArgumentError('Whence mode %d is not supported', whence)
def seek(self,
os.SEEK_CUR (seek relative to the current position),and os.SEEK_END
(seek relative to the end,offset should be negative).
"""
if whence == os.SEEK_SET:
self._position = offset
elif whence == os.SEEK_CUR:
self._position += offset
elif whence == os.SEEK_END:
file_stat = stat(self._filename)
self._position = file_stat.st_size + offset
else:
raise InvalidArgumentError('Whence mode %d is not supported', whence)
self._buffer = ''
self._buffer_pos = 0
self._eof = False
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def test_strip_header1(self):
test_input_file = io.BytesIO(b"\x46\x2E\x6C\x6F\x61\x64\x2E\x41"
b"\x43\x00\x80\xF9\x06\x00\x07\xB5"
b"\x50\x00\x00\x3B\x20\x4C\x4F\x41"
b"\x44\x45\x52\x20\x66\x6F\x72")
temp_output_path, bytes_count = self.strip_header(
test_input_file, True)
temp_output_file = open(temp_output_path, "rb")
temp_output_file.seek(0, os.SEEK_END)
try:
self.assertEqual(temp_output_file.tell(), 14)
self.assertEqual(bytes_count, 14)
finally:
temp_output_file.close()
os.remove(temp_output_path)
def test_strip_header2(self):
test_input_file = io.BytesIO(b"\x46\x2E\x6C\x6F\x61\x64\x2E\x41"
b"\x43\x00\x80\xF9\x06\x00\x07\xB5"
b"\x50\x00\x00\x3B\x20\x4C\x4F\x41"
b"\x44\x45\x52\x20\x66\x6F\x72\x20\x20")
temp_output_path, False)
temp_output_file = open(temp_output_path, 16)
self.assertEqual(bytes_count, 16)
finally:
temp_output_file.close()
os.remove(temp_output_path)
def test_strip_header3(self):
test_input_file = io.BytesIO(b"\x46\x2E\x6C\x6F\x61\x64\x2E\x41"
b"\x43\x00\x80\x0A\x00\x00\x07\xB5"
b"\x50\x00\x00\x3B\x20\x4C\x4F\x41"
b"\x44\x45\x52\x20\x66\x6F\x72\x20")
temp_output_path, 10)
self.assertEqual(bytes_count, 10)
finally:
temp_output_file.close()
os.remove(temp_output_path)
def reopen(self):
if self.inner_file is not None:
self.inner_file.close()
open(self.filename, 'utf-8')
def seek(self, whence)
self._buffer = ''
self._buffer_pos = 0
self._eof = False
def write_org_json(self, date=(datetime.date.today()),
organization='llnl',dict_to_write={}, path_ending_type='',
is_list=False):
"""
Writes stats from the organization to JSON.
"""
path = ('../github-data/' + organization + '-org/'
+ path_ending_type + '/' + str(date) + '.json')
self.checkDir(path)
with open(path, 'w') as out_clear:#clear old data
out_clear.close()
with open(path, 'a') as out:
if is_list:#used for list of items
out.write('[')
for item in dict_to_write:
out.write(json.dumps(dict_to_write[item], sort_keys=True,
indent=4, separators=(',', ': ')) + ',')
out.seek(-1, os.SEEK_END)#kill last comma
out.truncate()
if is_list:
out.write(']')
out.close()
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)
def seek(self, 0)
else:
raise ValueError("Invalid argument")
self.buffer = b""
self.fileobj.seek(self.position)