在MongoDB中使用聚合管道存储前N个结果

问题描述

我正在使用pymongo在MongoDB上进行一些分析。

在MongoDB中,有480000个json对象,代表2020年3月至2020年4月之间关于Covid-19病毒的推文。

尤其是,这些对象包含两个字段:

1)“ created_at”,代表日期类型的tweet的创建时间戳(例如created_at:2020-03-20T10:57:57.000 + 00:00);

2)“ retweet_count”,表示转发了该推文多少次(例如“ retweet_count:30”);

我将创建一条聚合管道,该管道每天都要使用retweet_count的最大值的前5000个json对象。

问题在于,如果我必须使用group子句,match子句或project子句(我是新手),我也不理解。

这里有我尝试过的尝试:

import pymongo
from datetime import datetime,tzinfo,timezone
from pymongo import MongoClient

client['Covid19']['tweets'].aggregate([
    {
        '$match' : {
            "created_at": { '$gte': datetime(2020,3,20),'$lt':  datetime(2020,21) } 
        }

    },{
        '$merge': {
            'into': 'tweets_filtered'
        }
    }
])

print(client['Covid19']['tweets_filtered'].count_documents({}))

此管道最终提供了从3月20日到3月21日的推文,但我将对此过程进行概括,并每天获取前5000条推文,其中retweet_count的值最高。

解决方法

您可以通过编程生成所需的边界并使用$ bucket。

  1. $ sort输入以实现所需的排序(大多数转发为第一个)。
  2. $ bucket按天划分集合。
  3. $ push在相应的日期下移动每个文档。
  4. $ project和$ slice一起获得前X个结果。

使用时间,计数和消息字段的Ruby示例:

require 'mongo'

Mongo::Logger.logger.level = Logger::WARN

client = Mongo::Client.new(['localhost:14420'])
c = client['foo']


c.delete_many

10.times do |i|
  day_time = Time.now - i*86400
  100.times do |j|
    time = day_time + j*100
    count = rand*1000
    message = "message #{count}"

    c.insert_one(time: time,count: count,message: message)
  end
end


days = (-1..10).map { |i| Time.now - i*86400 }.reverse
pp c.aggregate([
  {'$sort' => {count: -1}},{'$bucket' => {groupBy: '$time',boundaries: days,output: {messages: {'$push' => '$$ROOT'}},}},{'$project' => {top_messages: {'$slice' => ['$messages',5]}}},]).to_a
,

Pymongo使用熊猫回答:

from pymongo import MongoClient
from datetime import datetime
import pandas as pd

TOP_N_PER_DAY = 5000

# Perform the find with a filter; strip out the _id
tweets = db.tweets.find({ 'created_at': {'$gte': datetime(2020,3,20),'$lt':  datetime(2020,22) }},{'_id': 0})

# Create a dataframe from the find
df = pd.DataFrame(list(tweets))

# Convert the datetime to a date only timeseries
df['date'] = df['created_at'].dt.date

# Group by date and sort by retweet count
df = df.groupby('date').apply(lambda x: x.sort_values('retweet_count',ascending = False)).reset_index(drop=True)

# Take the top n per day
df = df.groupby('date').head(TOP_N_PER_DAY)

# Convert the pandas timeseries back to a datetime
df['date'] = pd.to_datetime(df['date'])

# Convert the dataframe into a list of dicts
records = df.to_dict('records')

# Insert the filtered tweets into a new collection
db.tweets_filtered.insert_many(records)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...