Python discord 模块，ConnectionClosed() 实例源码
我们从 Python 开源项目中，提取了以下 6 个代码示例，用于说明如何使用 discord.ConnectionClosed()。
async def status_clockwork(ev):
    """Rotate the bot's presence text once a minute, forever.

    While ``ev.bot.cfg.pref.status_rotation`` is truthy, each pass pops a
    random entry from the module-level ``status_cache`` and publishes it
    through ``ev.bot.change_presence``. An empty cache is first refilled
    from the ``StatusFiles`` collection. About one pass in ten replaces the
    text with a randomly capitalised 10-character slice of its MD5 hex
    digest.

    Must be a coroutine: the body awaits the database, the presence update
    and the sleep.

    :param ev: object exposing ``bot`` (config and presence) and ``db``.
    """
    while True:
        if ev.bot.cfg.pref.status_rotation:
            if not status_cache:
                # Refill the shared cache in place; other code may hold a
                # reference to the same list.
                collection = ev.db[ev.db.db_cfg.database].StatusFiles
                status_files = await collection.find().to_list(None)
                status_cache.extend(
                    status_file.get('Text') for status_file in status_files
                )
            if status_cache:
                status = status_cache.pop(secrets.randbelow(len(status_cache)))
                mode_roll = secrets.randbelow(10)
                if mode_roll == 0:
                    hgen = hashlib.new('md5')
                    hgen.update(status.encode('utf-8'))
                    digest = hgen.hexdigest()
                    # Pick a start offset that leaves room for a
                    # 10-character window inside the digest.
                    max_end = abs(len(digest) - 10)
                    cut = secrets.randbelow(max_end)
                    cut_text = digest[cut:(cut + 10)]
                    status = random_capitalize(cut_text)
                game = discord.Game(name=status)
                try:
                    await ev.bot.change_presence(game=game)
                except discord.ConnectionClosed:
                    # Best effort: a dropped gateway connection just skips
                    # this rotation; the next pass tries again.
                    pass
        # Sleep on every pass, including when rotation is disabled, so the
        # loop never spins without yielding to the event loop.
        await asyncio.sleep(60)
async def summon(self, ctx):
    """Summons the bot to join your voice channel."""
    # NOTE(review): the docstring above is kept verbatim in case it is
    # surfaced to users as command help text.
    #
    # Coroutine (it awaits bot/voice calls). Returns False when the author
    # is not in a voice channel or the connection attempt fails; otherwise
    # returns whatever create_voice_client() returned.
    #
    # This method will be invoked by other commands, so we should return
    # True or False instead of just returning.

    # First check if the author is even in a voice_channel
    summoned_channel = ctx.message.author.voice_channel
    if summoned_channel is None:
        await self.bot.say('You are not in a voice channel.')
        return False

    # Then simply create a voice client
    try:
        success = await self.create_voice_client(summoned_channel)
    except (asyncio.TimeoutError, discord.ConnectionClosed):
        await self.bot.say("I Failed to connect! This usually happens if I don't have permission to join the"
                           " channel,but can sometimes be caused by your server region being far away."
                           " Otherwise this is an issue on discord's end,causing the connect to timeout!")
        # Drop whatever half-created voice client is left for this server.
        await self.remove_voice_client(summoned_channel.server)
        return False

    if success:
        try:
            await self.bot.say('Ready to play audio in ' + summoned_channel.name)
        except discord.Forbidden:
            # Not being allowed to post the confirmation must not undo a
            # successful voice connection.
            pass
    return success
async def status_clockwork(ev):
    """Rotate the bot's presence text every three minutes, forever.

    While ``ev.bot.cfg.pref.status_rotation`` is truthy, each pass pops a
    random entry from the module-level ``status_cache`` and publishes it
    through ``ev.bot.change_presence``. An empty cache is first refilled
    from the ``StatusFiles`` collection (queried without awaiting here).

    Must be a coroutine: the body awaits the presence update and the sleep.

    :param ev: object exposing ``bot`` (config and presence) and ``db``.
    """
    while True:
        if ev.bot.cfg.pref.status_rotation:
            if not status_cache:
                # Refill the shared cache in place; other code may hold a
                # reference to the same list.
                status_files = ev.db[ev.db.db_cfg.database].StatusFiles.find()
                status_cache.extend(
                    status_file.get('Text') for status_file in status_files
                )
            if status_cache:
                status = status_cache.pop(secrets.randbelow(len(status_cache)))
                game = discord.Game(name=status)
                try:
                    await ev.bot.change_presence(game=game)
                except discord.ConnectionClosed:
                    # Best effort: a dropped gateway connection just skips
                    # this rotation; the next pass tries again.
                    pass
        # Sleep on every pass, including when rotation is disabled, so the
        # loop never spins without yielding to the event loop.
        await asyncio.sleep(180)
async def dispatch_timers(self):
    """Run timers as they come due until the bot is closed.

    Each pass waits for the active timers (looking at most 40 days ahead),
    takes the first one, sleeps until its ``expires`` moment if that is not
    yet in the past, then hands it to ``call_timer``.

    Cancellation ends the loop quietly. On ``OSError``,
    ``discord.ConnectionClosed`` or ``asyncpg.PostgresConnectionError`` the
    task stored in ``self._task`` is cancelled and replaced by a fresh
    ``dispatch_timers`` task.
    """
    try:
        while not self.bot.is_closed():
            # can only asyncio.sleep for up to ~48 days reliably
            # so we're gonna cap it off at 40 days
            # see: http://bugs.python.org/issue20493
            timers = await self.wait_for_active_timers(days=40)
            timer = self._current_timer = timers[0]
            # NOTE(review): utcnow() is naive; presumably timer.expires is a
            # naive UTC datetime as well — confirm against the timer store.
            now = datetime.datetime.utcnow()
            if timer.expires >= now:
                to_sleep = (timer.expires - now).total_seconds()
                await asyncio.sleep(to_sleep)
            await self.call_timer(timer)
    except asyncio.CancelledError:
        pass
    except (OSError, discord.ConnectionClosed, asyncpg.PostgresConnectionError):
        # Connection-level failure: restart the dispatcher from scratch.
        self._task.cancel()
        self._task = self.bot.loop.create_task(self.dispatch_timers())
def run_bot():
    """Run the Discord client, retrying after a dropped connection.

    Blocks until the client stops. A ``discord.ConnectionClosed`` is logged
    and followed by a new start attempt after 5 seconds. A
    ``KeyboardInterrupt`` logs the client out, cancels the remaining tasks
    and ends the loop. Any other exception propagates to the caller.

    The event loop is closed exactly once, when this function exits;
    closing it after every attempt would make the retry impossible because
    a closed loop cannot run anything.
    """
    loop = client.loop
    try:
        is_running = True
        while is_running:
            try:
                main_logger.info('Starting bot...')
                loop.run_until_complete(client.start(token))  # Start the bot
            except KeyboardInterrupt:  # It has been stopped?
                loop.run_until_complete(client.logout())
                # asyncio.Task.all_tasks() was removed in Python 3.9; prefer
                # asyncio.all_tasks() and fall back on older interpreters.
                # The loop is passed explicitly because no loop is running
                # at this point.
                all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
                pending = all_tasks(loop)
                gathered = asyncio.gather(*pending)
                try:
                    gathered.cancel()
                    loop.run_until_complete(gathered)
                    # Retrieve the exception so asyncio does not complain
                    # about it never being read.
                    gathered.exception()
                except asyncio.CancelledError:
                    # Expected result of the cancel() above.
                    pass
                finally:
                    is_running = False
            except discord.ConnectionClosed as e:  # We lost connection?
                main_logger.exception(e)
                main_logger.info('retrying connection in 5 seconds...')
                time.sleep(5)
            else:
                # client.start() returned normally: the client was shut
                # down on purpose, so do not start it again.
                is_running = False
    finally:
        loop.close()
async def join(self, *, channel: discord.Channel):
    """Joins a voice channel."""
    # NOTE(review): the docstring above is kept verbatim in case it is
    # surfaced to users as command help text.
    #
    # Coroutine (it awaits bot/voice calls). Reports the outcome to the
    # user through self.bot.say() and returns nothing.
    try:
        await self.create_voice_client(channel)
    # Check if the channel given was an actual voice channel
    except discord.InvalidArgument:
        await self.bot.say('This is not a voice channel...')
    # NOTE(review): this handler was truncated in the source; it is
    # rebuilt to match the identical failure handling in summon().
    except (asyncio.TimeoutError, discord.ConnectionClosed):
        await self.bot.say("I Failed to connect! This usually happens if I don't have permission to join the"
                           " channel,but can sometimes be caused by your server region being far away."
                           " Otherwise this is an issue on discord's end,causing the connect to timeout!")
        # Drop whatever half-created voice client is left for this server.
        await self.remove_voice_client(channel.server)
    else:
        await self.bot.say('Ready to play audio in ' + channel.name)