Python os 模块,urandom() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.urandom()。
def new_session(account):
if account.get('session',None) is None:
session = requests.session()
session.verify = True
session.headers.update({'User-Agent': 'Niantic App'}) # session.headers.update({'User-Agent': 'niantic'})
if not account['proxy'] is None:
session.proxies.update(account['proxy'])
account['session'] = session
else:
account['session'].close()
account['session'].cookies.clear()
account['session_time'] = get_time()
account['session_hash'] = os.urandom(32)
account['api_url'] = API_URL
account['auth_ticket'] = None
def test_copy(tmpdir, storages):
f = tmpdir.join('alpha')
f.write_binary(os.urandom(1024 * 40))
f_http = Resource(storages['http']('index.html'))
f_file = Resource(storages['file'](f.strpath))
f_sftp = Resource(storages['sftp']('/gamma'))
f_ftp = Resource(storages['ftp']('/beta'))
assert f_http.exists()
delete_files(f_file, f_sftp, f_ftp)
assert not f_file.exists()\
and not f_sftp.exists()\
and not f_ftp.exists()
transfert.actions.copy(f_http, f_ftp, size=40960)
assert f_ftp.exists() and f_http.exists()
transfert.actions.copy(f_ftp, size=40960)
assert f_ftp.exists() and f_sftp.exists()
transfert.actions.copy(f_sftp, f_file, size=40960)
assert f_sftp.exists() and f_file.exists()
def encrypt(self, data, offset=None, length=None):
"""Encrypts the given data with the current key"""
if offset is None:
offset = 0
if length is None:
length = len(data)
with BinaryWriter() as writer:
# Write SHA
writer.write(sha1(data[offset:offset + length]).digest())
# Write data
writer.write(data[offset:offset + length])
# Add padding if required
if length < 235:
writer.write(os.urandom(235 - length))
result = int.from_bytes(writer.get_bytes(), byteorder='big')
result = pow(result, self.e, self.m)
# If the result byte count is less than 256,since the byte order is big,
# the non-used bytes on the left will be 0 and act as padding,
# without need of any additional checks
return int.to_bytes(
result, length=256, byteorder='big', signed=False)
def seed(self, a=None):
"""Initialize internal state from hashable object.
None or no argument seeds from current time or from an operating
system specific randomness source if available.
If a is not None or an int or long,hash(a) is used instead.
"""
if a is None:
try:
a = long(_hexlify(_urandom(16)), 16)
except NotImplementedError:
import time
a = long(time.time() * 256) # use fractional seconds
super(Random, self).seed(a)
self.gauss_next = None
def uuid4():
"""Generate a random UUID."""
# When the system provides a version-4 UUID generator,use it.
if _uuid_generate_random:
_buffer = ctypes.create_string_buffer(16)
_uuid_generate_random(_buffer)
return UUID(bytes=_buffer.raw)
# Otherwise,get randomness from urandom or the 'random' module.
try:
import os
return UUID(bytes=os.urandom(16), version=4)
except:
import random
bytes = [chr(random.randrange(256)) for i in range(16)]
return UUID(bytes=bytes, version=4)
def _key():
global __key
if __key:
return __key
data_dir = _key_dir()
key_file = os.path.join(data_dir, 'key')
if os.path.isfile(key_file):
with open(key_file, 'rb') as f:
__key = base64.b64decode(f.read())
return __key
__key = base64.b64encode(os.urandom(16))
try:
os.makedirs(data_dir)
except OSError as e:
# errno17 == dir exists
if e.errno != 17:
raise
with open(key_file, 'wb') as f:
f.write(__key)
return __key
def yandex(url):
try:
cookie = client.request(url, output='cookie')
r = client.request(url, cookie=cookie)
r = re.sub(r'[^\x00-\x7F]+', ' ', r)
sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0]
idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0]
idclient = binascii.b2a_hex(os.urandom(16))
post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
post = urllib.urlencode(post)
r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
r = json.loads(r)
url = r['models'][0]['data']['file']
return url
except:
return
def create_file(path, size):
"""Create a file with random contents and a given size"""
print('Creating file: {} having a size of {} Bytes'.format(path, size))
t_start = time.time()
with open(path, mode='wb') as fp:
# Write integer number of blocks
data_left = size
for _ in range(size // DATA_BLOCK):
fp.write(os.urandom(DATA_BLOCK))
data_left -= DATA_BLOCK
# Write the remainder (if any)
if data_left > 0:
fp.write(os.urandom(data_left))
print('File created in {} sec.'.format(time.time()-t_start))
def check_for_wildcards(args, server, name, rectype, tries=4):
"""
Verify that the DNS server doesn't return wildcard results for domains
which don't exist,it should correctly return NXDOMAIN.
"""
resolver = Resolver()
resolver.timeout = args.timeout
resolver.lifetime = args.timeout
resolver.nameservers = [server]
nx_names = [base64.b32encode(
os.urandom(
random.randint(8, 10))
).strip('=').lower() + name
for _ in range(0, tries)]
correct_result_count = 0
for check_nx_name in nx_names:
try:
result = resolver.query(check_nx_name, rectype)
return False # Any valid response = immediate fail!
except (NXDOMAIN, NoNameservers):
correct_result_count += 1
except DNSException:
continue
return correct_result_count > (tries / 2.0)
def __init__(self, auth_provider, device_info=None):
self.log = logging.getLogger(__name__)
self._auth_provider = auth_provider
# mystical unkNown6 - resolved by PokemonGoDev
self._signal_agglom_gen = False
self._signature_lib = None
if RpcApi.START_TIME == 0:
RpcApi.START_TIME = get_time(ms=True)
if RpcApi.RPC_ID == 0:
RpcApi.RPC_ID = int(random.random() * 10 ** 18)
self.log.debug('Generated new random RPC Request id: %s', RpcApi.RPC_ID)
# data fields for unkNown6
self.session_hash = os.urandom(32)
self.token2 = random.randint(1,59)
self.course = random.uniform(0, 360)
self.device_info = device_info
def read_random_bits(nbits):
'''Reads 'nbits' random bits.
If nbits isn't a whole number of bytes,an extra byte will be appended with
only the lower bits set.
'''
nbytes, rbits = divmod(nbits, 8)
# Get the random bytes
randomdata = os.urandom(nbytes)
# Add the remaining random bits
if rbits > 0:
randomvalue = ord(os.urandom(1))
randomvalue >>= (8 - rbits)
randomdata = byte(randomvalue) + randomdata
return randomdata
def _make_flow(request, scopes, return_url=None):
"""Creates a Web Server Flow"""
# Generate a CSRF token to prevent malicIoUs requests.
csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest()
request.session[_CSRF_KEY] = csrf_token
state = json.dumps({
'csrf_token': csrf_token,
'return_url': return_url,
})
flow = client.OAuth2WebServerFlow(
client_id=django_util.oauth2_settings.client_id,
client_secret=django_util.oauth2_settings.client_secret,
scope=scopes,
state=state,
redirect_uri=request.build_absolute_uri(
urlresolvers.reverse("google_oauth:callback")))
flow_key = _FLOW_KEY.format(csrf_token)
request.session[flow_key] = pickle.dumps(flow)
return flow
def init_field(self, startpoint):
'''Place the mines and reveal the starting point.
Internal details:
It will wrap in `init_field2` in guessless mode.
It will place the mines by itself when not in guessless
mode.
'''
if self.guessless:
# Wrap in the best version.
self.init_field2(startpoint)
else:
safe = self.field.get_neighbours(startpoint) + [startpoint]
cells = list(filter(
lambda x: x not in safe,
self.field.all_cells()
))
# Choose self.n_mines randomly selected mines.
cells.sort(key=lambda x: os.urandom(1))
mines = cells[:self.n_mines]
self.field.clear()
self.field.fill(mines)
self.field.reveal(startpoint)
def newcursor(self, dictcursor=False):
'''
This creates a DB cursor for the current DB connection using a
randomly generated handle. Returns a tuple with cursor and handle.
dictcursor = True -> use a cursor where each returned row can be
addressed as a dictionary by column name
'''
handle = hashlib.sha256(os.urandom(12)).hexdigest()
if dictcursor:
self.cursors[handle] = self.connection.cursor(
cursor_factory=psycopg2.extras.DictCursor
)
else:
self.cursors[handle] = self.connection.cursor()
return (self.cursors[handle], handle)
def cluster_config(self):
"""
Provide the default configuration for a cluster
"""
if self.cluster:
cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
if not os.path.isdir(cluster_dir):
_create_dirs(cluster_dir, self.root_dir)
filename = "{}/cluster.yml".format(cluster_dir)
contents = {}
contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, os.urandom(32)))
public_networks_str = ",".join([str(n) for n in self.public_networks])
cluster_networks_str = ",".join([str(n) for n in self.cluster_networks])
contents['public_network'] = public_networks_str
contents['cluster_network'] = cluster_networks_str
contents['available_roles'] = self.available_roles
self.writer.write(filename, contents)
def test_send_receive(self):
random.shuffle(self.swarm)
senders = self.swarm[:len(self.swarm)/2]
receivers = self.swarm[len(self.swarm)/2:]
for sender, receiver in zip(senders, receivers):
message = binascii.hexlify(os.urandom(64))
# check queue prevIoUsly empty
self.assertFalse(bool(receiver.message_list()))
# send message
self.assertTrue(sender.message_send(receiver.dht_id(), message))
# check received
received = receiver.message_list()
self.assertTrue(sender.dht_id() in received)
messages = received[sender.dht_id()]
self.assertTrue(len(messages) == 1)
self.assertEqual(messages[0], message)
# check queue empty after call to message_list
self.assertFalse(bool(receiver.message_list()))
def test_ordering(self):
random.shuffle(self.swarm)
sender = self.swarm[0]
receiver = self.swarm[-1]
# send messages
message_alpha = binascii.hexlify(os.urandom(64))
message_beta = binascii.hexlify(os.urandom(64))
message_gamma = binascii.hexlify(os.urandom(64))
self.assertTrue(sender.message_send(receiver.dht_id(), message_alpha))
self.assertTrue(sender.message_send(receiver.dht_id(), message_beta))
self.assertTrue(sender.message_send(receiver.dht_id(), message_gamma))
# check received in order
received = receiver.message_list()
self.assertTrue(sender.dht_id() in received)
messages = received[sender.dht_id()]
self.assertEqual(messages[0], message_alpha)
self.assertEqual(messages[1], message_beta)
self.assertEqual(messages[2], message_gamma)
def test_flood(self):
# every node subscribes and should receive the event
topic = "test_flood_{0}".format(binascii.hexlify(os.urandom(32)))
for peer in self.swarm:
peer.pubsub_subscribe(topic)
# wait until subscriptions propagate
time.sleep(SLEEP_TIME)
# send event
peer = random.choice(self.swarm)
event = binascii.hexlify(os.urandom(32))
peer.pubsub_publish(topic, event)
# wait until event propagates
time.sleep(SLEEP_TIME)
# check all peers received the event
for peer in self.swarm:
events = peer.pubsub_events(topic)
self.assertEqual(events, [event])
def test_multihop(self):
random.shuffle(self.swarm)
senders = self.swarm[:len(self.swarm) / 2]
receivers = self.swarm[len(self.swarm) / 2:]
for sender, receivers):
# receiver subscribes to topic
topic = "test_miltihop_{0}".format(binascii.hexlify(os.urandom(32)))
receiver.pubsub_subscribe(topic)
# wait until subscriptions propagate
time.sleep(SLEEP_TIME)
# send event
event = binascii.hexlify(os.urandom(32))
sender.pubsub_publish(topic, event)
# wait until event propagates
time.sleep(SLEEP_TIME)
# check all peers received the event
events = receiver.pubsub_events(topic)
self.assertEqual(events, [event])
def server_udp_pre_encrypt(self, buf, uid):
if uid in self.server_info.users:
user_key = self.server_info.users[uid]
else:
uid = None
if not self.server_info.users:
user_key = self.server_info.key
else:
user_key = self.server_info.recv_iv
authdata = os.urandom(7)
mac_key = self.server_info.key
md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()
rand_len = self.udp_rnd_data_len(md5data, self.random_server)
encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
out_buf = encryptor.encrypt(buf)
buf = out_buf + os.urandom(rand_len) + authdata
return buf + hmac.new(user_key, self.hashfunc).digest()[:1]
def __init__(self, method, hashfunc):
super(auth_aes128_sha1, self).__init__(method)
self.hashfunc = hashfunc
self.recv_buf = b''
self.unit_len = 8100
self.raw_trans = False
self.has_sent_header = False
self.has_recv_header = False
self.client_id = 0
self.connection_id = 0
self.max_time_dif = 60 * 60 * 24 # time dif (second) setting
self.salt = hashfunc == hashlib.md5 and b"auth_aes128_md5" or b"auth_aes128_sha1"
self.no_compatible_method = hashfunc == hashlib.md5 and "auth_aes128_md5" or 'auth_aes128_sha1'
self.extra_wait_size = struct.unpack('>H', os.urandom(2))[0] % 1024
self.pack_id = 1
self.recv_id = 1
self.user_id = None
self.user_key = None
self.last_rnd_len = 0
self.overhead = 9
def writeConfiguredCookieFile(self, cookie_string = None):
'''
Write a random 32-byte value to the configured cookie file.
If cookie_string is not None,use that value.
Return the value written to the file,or None if there is no cookie
file,or if writing the file fails.
'''
cookie_file = self.getConfiguredCookieFile()
if cookie_file is not None:
if cookie_string is None:
cookie_string = urandom(TorControlProtocol.SAFECOOKIE_LENGTH)
try:
with open(cookie_file, 'w') as f:
f.write(cookie_string)
except IOError as e:
logging.warning("disabling SAFECOOKIE authentication,writing cookie file '{}' Failed with error: {}"
.format(cookie_file, e))
return None
# sanity check: this will fail in write-only environments
assert cookie_string == TorControlProtocol.readCookieFile(
cookie_file)
return cookie_string
else:
return None
def test_io_and_many_files(self):
import os
from time import time
start = time()
for i in range(10000): # ?????????????
obj = os.urandom(16 * 1024)
self.cache.put_obj(i, obj, info_dict={"bin": obj})
for i in range(10000):
info = self.cache.get_info(i)
obj = self.cache.get_obj(i)
self.assertEqual(info['bin'], obj)
print("test_io_and_many_files IO total time:", time() - start)
# test clean delete
all_cache_file_path = [v[0] for v in self.cache.items_dict.values()]
start = time()
del self.cache
print("test_io_and_many_files DELETE ALL total time:", time() - start)
for path in all_cache_file_path:
self.assertFalse(os.path.exists(path), msg=path)
def test_base64io_encode_file(tmpdir):
source_plaintext = os.urandom(1024 * 1024)
plaintext_b64 = base64.b64encode(source_plaintext)
plaintext = tmpdir.join('plaintext')
b64_plaintext = tmpdir.join('base64_plaintext')
with open(str(plaintext), 'wb') as file:
file.write(source_plaintext)
with open(str(plaintext), 'rb') as source, open(str(b64_plaintext), 'wb') as target:
with Base64IO(target) as encoder:
for chunk in source:
encoder.write(chunk)
with open(str(b64_plaintext), 'rb') as file2:
encoded = file2.read()
assert encoded == plaintext_b64
def test_encrypt_with_Metadata_output_write_to_file(tmpdir):
plaintext = tmpdir.join('source_plaintext')
plaintext.write_binary(os.urandom(1024))
ciphertext = tmpdir.join('ciphertext')
Metadata = tmpdir.join('Metadata')
encrypt_args = encrypt_args_template(Metadata=True).format(
source=str(plaintext),
target=str(ciphertext),
Metadata='--Metadata-output ' + str(Metadata)
)
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
raw_Metadata = Metadata.read()
output_Metadata = json.loads(raw_Metadata)
for key, value in (('a', 'b'), ('c', 'd')):
assert output_Metadata['header']['encryption_context'][key] == value
assert output_Metadata['mode'] == 'encrypt'
assert output_Metadata['input'] == str(plaintext)
assert output_Metadata['output'] == str(ciphertext)
def test_encrypt_with_Metadata_full_file_path(tmpdir):
plaintext_filename = 'source_plaintext'
plaintext_file = tmpdir.join(plaintext_filename)
plaintext_file.write_binary(os.urandom(1024))
plaintext_file_full_path = str(plaintext_file)
ciphertext_filename = 'ciphertext'
ciphertext_file = tmpdir.join(ciphertext_filename)
ciphertext_file_full_path = str(ciphertext_file)
Metadata = tmpdir.join('Metadata')
encrypt_args = encrypt_args_template(Metadata=True).format(
source=plaintext_filename,
target=ciphertext_filename,
Metadata='--Metadata-output ' + str(Metadata)
)
with tmpdir.as_cwd():
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
raw_Metadata = Metadata.read()
output_Metadata = json.loads(raw_Metadata)
assert output_Metadata['input'] == plaintext_file_full_path
assert output_Metadata['output'] == ciphertext_file_full_path
def test_encrypt_with_Metadata_output_write_to_stdout(tmpdir, capsys):
plaintext = tmpdir.join('source_plaintext')
plaintext.write_binary(os.urandom(1024))
ciphertext = tmpdir.join('ciphertext')
encrypt_args = encrypt_args_template(Metadata=True).format(
source=str(plaintext),
Metadata='--Metadata-output -'
)
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
out, _err = capsys.readouterr()
output_Metadata = json.loads(out)
for key, 'd')):
assert output_Metadata['header']['encryption_context'][key] == value
assert output_Metadata['mode'] == 'encrypt'
assert output_Metadata['input'] == str(plaintext)
assert output_Metadata['output'] == str(ciphertext)
def test_file_to_file_decrypt_required_encryption_context_success(tmpdir, required_encryption_context):
plaintext = tmpdir.join('source_plaintext')
ciphertext = tmpdir.join('ciphertext')
decrypted = tmpdir.join('decrypted')
with open(str(plaintext), 'wb') as f:
f.write(os.urandom(1024))
encrypt_args = encrypt_args_template().format(
source=str(plaintext),
target=str(ciphertext)
)
decrypt_args = decrypt_args_template().format(
source=str(ciphertext),
target=str(decrypted)
) + ' --encryption-context ' + required_encryption_context
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))
assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_file_decrypt_required_encryption_context_fail(tmpdir, required_encryption_context):
plaintext = tmpdir.join('source_plaintext')
plaintext.write_binary(os.urandom(1024))
ciphertext = tmpdir.join('ciphertext')
Metadata_file = tmpdir.join('Metadata')
decrypted = tmpdir.join('decrypted')
encrypt_args = encrypt_args_template().format(
source=str(plaintext),
target=str(ciphertext)
)
decrypt_args = decrypt_args_template(Metadata=True).format(
source=str(ciphertext),
target=str(decrypted),
Metadata=' --Metadata-output ' + str(Metadata_file)
) + ' --encryption-context ' + required_encryption_context
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
assert not decrypted.isfile()
raw_Metadata = Metadata_file.read()
parsed_Metadata = json.loads(raw_Metadata)
assert parsed_Metadata['skipped']
assert parsed_Metadata['reason'] == 'Missing encryption context key or value'
def test_file_to_file_cycle(tmpdir):
plaintext = tmpdir.join('source_plaintext')
ciphertext = tmpdir.join('ciphertext')
decrypted = tmpdir.join('decrypted')
with open(str(plaintext),
target=str(decrypted)
)
aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, str(decrypted))
def test_file_to_file_cycle_target_through_symlink(tmpdir):
plaintext = tmpdir.join('source_plaintext')
output_dir = tmpdir.mkdir('output')
os.symlink(str(output_dir), str(tmpdir.join('output_link')))
ciphertext = tmpdir.join('output_link', 'ciphertext')
decrypted = tmpdir.join('decrypted')
with open(str(plaintext), str(decrypted))
def test_file_overwrite_source_file_to_file_custom_empty_prefix(tmpdir):
plaintext_source = os.urandom(2014)
plaintext = tmpdir.join('source_plaintext')
with open(str(plaintext), 'wb') as f:
f.write(plaintext_source)
encrypt_args = encrypt_args_template().format(
source=str(plaintext),
target=str(plaintext)
) + ' --suffix'
test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
assert test_result == 'Destination and source cannot be the same'
with open(str(plaintext), 'rb') as f:
assert f.read() == plaintext_source
def test_file_overwrite_source_dir_to_dir_custom_empty_prefix(tmpdir):
plaintext_source = os.urandom(2014)
plaintext = tmpdir.join('source_plaintext')
with open(str(plaintext), 'wb') as f:
f.write(plaintext_source)
encrypt_args = encrypt_args_template().format(
source=str(tmpdir),
target=str(tmpdir)
) + ' --suffix'
test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, 'rb') as f:
assert f.read() == plaintext_source
def test_file_overwrite_source_file_to_dir_custom_empty_prefix(tmpdir):
plaintext_source = os.urandom(2014)
plaintext = tmpdir.join('source_plaintext')
with open(str(plaintext), posix=not is_windows()))
assert test_result is None
with open(str(plaintext), 'rb') as f:
assert f.read() == plaintext_source
def test_file_to_dir_cycle(tmpdir):
inner_dir = tmpdir.mkdir('inner')
plaintext = tmpdir.join('source_plaintext')
ciphertext = inner_dir.join('source_plaintext.encrypted')
decrypted = tmpdir.join('decrypted')
with open(str(plaintext),
target=str(inner_dir)
)
decrypt_args = decrypt_args_template().format(
source=str(ciphertext), posix=not is_windows()))
assert os.path.isfile(str(ciphertext))
aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, str(decrypted))
def test_stdin_to_file_to_stdout_cycle(tmpdir):
ciphertext_file = tmpdir.join('ciphertext')
plaintext = os.urandom(1024)
encrypt_args = 'aws-encryption-cli ' + encrypt_args_template(decode=True).format(
source='-',
target=str(ciphertext_file)
)
decrypt_args = 'aws-encryption-cli ' + decrypt_args_template(encode=True).format(
source=str(ciphertext_file),
target='-'
)
proc = Popen(shlex.split(encrypt_args, posix=not is_windows()), stdout=PIPE, stdin=PIPE, stderr=PIPE)
_stdout, _stderr = proc.communicate(input=base64.b64encode(plaintext))
proc = Popen(shlex.split(decrypt_args, stderr=PIPE)
decrypted_stdout, _stderr = proc.communicate()
assert base64.b64decode(decrypted_stdout) == plaintext
def encryptPassword(plaintext):
try:
letters = 'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890????????????????????????????????????????????????????????????????'
SALT_SIZE = 8
salt = urandom(SALT_SIZE)
randomText = ''.join([random.choice(letters) for i in range(8)])
plaintext = "%3d" % len(plaintext) + plaintext + randomText
plaintext = plaintext.encode('utf-8')
arc4 = ARC4.new(salt)
crypted = arc4.encrypt(plaintext)
res = salt + crypted
# print('salt=',salt)
# print('cr=',crypted)
return b64encode(res).decode('ascii')
except:
return ""
def _maybe_seed(self):
if not self.seeded:
try:
seed = os.urandom(16)
except:
try:
r = open('/dev/urandom', 'rb', 0)
try:
seed = r.read(16)
finally:
r.close()
except:
seed = str(time.time())
self.seeded = True
seed = bytearray(seed)
self.stir(seed, True)
def seed(self,hash(a) is used instead.
"""
if a is None:
try:
# Seed with enough bytes to span the 19937 bit
# state space for the Mersenne Twister
a = long(_hexlify(_urandom(2500)), self).seed(a)
self.gauss_next = None
def touch(self, path, size=None, random=False, perm=None, time=None):
"""Simplify the dynamic creation of files or the updating of their
modified time. If a size is specified,then a file of that size
will be created on the disk. If the file already exists,then the
size= attribute is ignored (for safey reasons).
if random is set to true,then the file created is actually
created using tons of randomly generated content. This is MUCH
slower but nessisary for certain tests.
"""
path = abspath(path)
if not isdir(dirname(path)):
mkdir(dirname(path), 0700)
if not exists(path):
size = strsize_to_bytes(size)
if not random:
f = open(path, "wb")
if isinstance(size, int) and size > 0:
f.seek(size-1)
f.write("\0")
f.close()
else: # fill our file with randomly generaed content
with open(path, 'wb') as f:
# Fill our file with garbage
f.write(urandom(size))
# Update our path
utime(path, time)
if perm is not None:
# Adjust permissions
chmod(path, perm)
# Return True
return True
def initialize(self):
with open(self.manifest_file, 'wb', 4096) as f:
f.write(os.urandom(20))
def dsa_test():
import os
msg = str(random.randint(q,q+q)).encode('utf-8')
sk = os.urandom(32)
pk = publickey(sk)
sig = signature(msg, sk, pk)
return checkvalid(sig, msg, pk)
def crypto_sign_keypair(seed=None):
"""Return (verifying,secret) key from a given seed,or os.urandom(32)"""
if seed is None:
seed = os.urandom(PUBLICKEYBYTES)
else:
warnings.warn("ed25519ll should choose random seed.",
RuntimeWarning)
if len(seed) != 32:
raise ValueError("seed must be 32 random bytes or None.")
skbytes = seed
vkbytes = djbec.publickey(skbytes)
return Keypair(vkbytes, skbytes+vkbytes)
def random(self):
"""Get the next random number in the range [0.0,1.0)."""
return (int.from_bytes(_urandom(7), 'big') >> 3) * RECIP_BPF
def getrandbits(self, k):
"""getrandbits(k) -> x. Generates an int with k random bits."""
if k <= 0:
raise ValueError('number of bits must be greater than zero')
if k != int(k):
raise TypeError('number of bits should be an integer')
numbytes = (k + 7) // 8 # bits / 8 and rounded up
x = int.from_bytes(_urandom(numbytes), 'big')
return x >> (numbytes * 8 - k) # trim excess bits
def generate_signature(signature_plain, signature_lib):
iv = os.urandom(32)
output_size = ctypes.c_size_t()
signature_lib.encrypt(signature_plain, len(signature_plain), iv, 32, None, ctypes.byref(output_size))
output = (ctypes.c_ubyte * output_size.value)()
signature_lib.encrypt(signature_plain, ctypes.byref(output), ctypes.byref(output_size))
signature = b''.join(list(map(lambda x: six.int2byte(x), output)))
return signature
def start(self):
self.logger.info("starting web server listening at https port {0}".format(self.httpsPort))
dir = os.path.dirname(os.path.realpath(__file__))
handlersArgs = dict(iotManager=self.iotManager)
application = [
(r'/(favicon.ico)', tornado.web.staticfilehandler, {'path': dir + '/img'}),
(r'/static/(.*)', {'path': dir + '/static'}),
(r'/upload/(.*)', handlers.AuthFileHandler, {'path': self.uploadDir}),
(r'/img/(.*)',
(r'/login', handlers.LoginWebHandler, dict(adminPasswordHash=self.adminPasswordHash)),
(r'/logout', handlers.logoutWebHandler),
(r'/ws', handlers.WSHandler, handlersArgs),
(r'/device/(.*)', handlers.DeviceWebHandler,
(r'/RSS', handlers.RSSWebHandler,
(r'/history', handlers.HistoryWebHandler,
(r'/devices', handlers.DevicesWebHandler,
(r'/video', handlers.VideoWebHandler, dict(localVideoPort=self.localVideoPort)),
(r'/', handlers.HomeWebHandler,
]
self.logger.info("starting web server listening at http {0} (plain)".format(self.httpPort))
self.httpsApp = tornado.web.Application(application, cookie_secret=os.urandom(32), compiled_template_cache=True)
sslOptions={ "certfile": self.httpsCertFile, "keyfile": self.httpsKeyFile, "ssl_version": ssl.PROTOCOL_TLSv1 }
if self.httpsChainFile:
sslOptions["certfile"] = self.httpsChainFile
self.logger.info("Using certificate file at {0}".format(sslOptions["certfile"]))
self.httpsServer = tornado.httpserver.HTTPServer(self.httpsApp, ssl_options=sslOptions)
self.httpsServer.listen(self.httpsPort)
httpApplication = [
(r'/RSS',
(r'/', handlers.RedirectorHandler, dict(manager = self)),
(r'/(.*)', {'path': dir + '/plain' })
]
self.httpApp = tornado.web.Application(httpApplication)
self.httpServer = tornado.httpserver.HTTPServer(self.httpApp)
self.httpServer.listen(self.httpPort)
tornado.ioloop.IOLoop.current().start()
def _temporary_keychain():
"""
This function creates a temporary Mac keychain that we can use to work with
credentials. This keychain uses a one-time password and a temporary file to
store the data. We expect to have one keychain per socket. The returned
SecKeychainRef must be freed by the caller,including calling
SecKeychainDelete.
Returns a tuple of the SecKeychainRef and the path to the temporary
directory that contains it.
"""
# Unfortunately,SecKeychainCreate requires a path to a keychain. This
# means we cannot use mkstemp to use a generic temporary file. Instead,
# we're going to create a temporary directory and a filename to use there.
# This filename will be 8 random bytes expanded into base64. We also need
# some random bytes to password-protect the keychain we're creating,so we
# ask for 40 random bytes.
random_bytes = os.urandom(40)
filename = base64.b64encode(random_bytes[:8]).decode('utf-8')
password = base64.b64encode(random_bytes[8:]) # Must be valid UTF-8
tempdirectory = tempfile.mkdtemp()
keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')
# We Now want to create the keychain itself.
keychain = Security.SecKeychainRef()
status = Security.SecKeychainCreate(
keychain_path,
len(password),
password,
False,
None,
ctypes.byref(keychain)
)
_assert_no_error(status)
# Having created the keychain,we want to pass it off to the caller.
return keychain, tempdirectory