问题描述
我有一个 discord 机器人,可以向 Reddit API 发出请求。现在我已将其设置为每分钟发出 1 个请求以进行测试。
这段代码运行得非常好一次,直到它在某个时间点停止记录到 Heroku 日志,从那时起,无论我做什么(重置 dyno,再次部署)它都不会' t 运行计划的作业,它只是挂起,没有输出到日志。
主文件:
bot.py
import os
import discord
from discord.ext import commands
from dotenv import load_dotenv
from db_manager import Session,engine,Base
import datetime
import scheduler
from sqlalchemy import and_
import reddit_services
from user import User
from post import Post
from scraped_post import ScrapedPost
# posts id {851944236562776077}
# guild id {851944236562776074}
load_dotenv()
TOKEN = os.getenv('disCORD_TOKEN')
GUILD = os.getenv('disCORD_GUILD')
DB_PW = os.getenv('DB_PW')
# emojis
ARROW_UP = "\u2B06"
ARROW_DOWN = "\u2B07"
STAR = "\u2B50"
bot = commands.Bot(command_prefix='.')
intents = discord.Intents.default()
intents.members = True
Base.Metadata.create_all(engine)
session = Session()
# cron job - send scraped post to channel
async def send_posts_to_channel():
matching_posts_urls = await reddit_services.get_matching_posts_urls('askreddit')
guild = discord.utils.get(bot.guilds,name='Creepy Reddit')
posts = discord.utils.get(guild.text_channels,name='posts')
for url in matching_posts_urls:
print('Sending post to channel: '+url)
msg = await posts.send(url)
await msg.add_reaction(ARROW_UP)
await msg.add_reaction(ARROW_DOWN)
# cron job - post weekly top to channel
async def send_weekly_top():
today = datetime.date.today()
week_ago = today - datetime.timedelta(days=7)
session = ScrapedPost.get_session()
top_posts = session.query(ScrapedPost).filter(and_(
ScrapedPost.created >= week_ago,ScrapedPost.created <= today)).order_by(ScrapedPost.Votes.desc()).limit(3).all()
guild = discord.utils.get(bot.guilds,name='Creepy Reddit')
weekly_top = discord.utils.get(guild.text_channels,name='weekly-top')
text = "Here are the top posts of this week!"
await weekly_top.send(text)
for post in top_posts:
await weekly_top.send(post.url)
@bot.event
async def on_ready():
await reddit_services.start()
await scheduler.start(send_posts_to_channel,send_weekly_top)
bot.run(TOKEN)
reddit_services.py
import os
from dotenv import load_dotenv
import asyncpraw
from scraped_post import ScrapedPost
from datetime import datetime
load_dotenv()
CLIENT_ID = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')
USER_AGENT = os.getenv('USER_AGENT')
USER_NAME = os.getenv('USER_NAME')
PASSWORD = os.getenv('PASSWORD')
# only 1 for testing
keywords = [
"what"
]
reddit = None
async def start():
global reddit
reddit = asyncpraw.Reddit(
client_id = CLIENT_ID,client_secret = CLIENT_SECRET,user_agent = USER_AGENT,username = USER_NAME,password = PASSWORD
)
user = await reddit.user.me()
if user:
print("Succesfully authenticated to the Reddit API")
async def get_matching_posts_urls(subreddit):
askreddit = await reddit.subreddit(subreddit)
askreddit_hot = askreddit.hot(limit=26)
matching_posts = []
async for submission in askreddit_hot:
lower_title = submission.title.lower()
if any(word in lower_title for word in keywords):
already_scraped = await post_already_scraped(submission.id)
if not already_scraped:
await save_record_of_post(submission)
matching_posts.append(submission.url)
else:
print('Already scraped')
return matching_posts
async def save_record_of_post(submission):
post_id = submission.id
title = submission.title
url = submission.url
created = datetime.fromtimestamp(submission.created)
scraped_post = ScrapedPost(post_id,title,url,created)
try:
scraped_post.save()
return scraped_post
except:
print('Could not save scraped post to DB with id: {post_id}').format(post_id=post_id)
async def post_already_scraped(post_id):
session = ScrapedPost.get_session()
post = session.query(ScrapedPost).filter(ScrapedPost.id == post_id).first()
return post is not None
scheduler.py
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import datetime
async def start(send_posts,send_weekly_top):
scheduler = AsyncIOScheduler()
current_time = datetime.datetime.Now()
# scheduler.add_job(send_posts,'cron',hour='0,3,6,12,21',minute='*',second='*')
scheduler.add_job(send_posts,hour='*',second='0')
scheduler.add_job(send_weekly_top,day='6',hour='18')
try:
print('Starting scheduler...')
scheduler.start()
except Exception as e: print(e)
一旦我在 Heroku 上部署,代码就会到达“正在启动调度程序...”并无限期地停在那里。我昨天把它放了一夜,它超时了。
如果我的代码没有明显问题,我能得到一些想法来尝试调试吗?我认为出了点问题,必须有一种方法可以记录一些东西,以便向正确的方向发送。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)