Mongodb聚合

neevop 三月 12, 2023

聚合管道

mongosh为调用聚合框架就要定义一个管道,聚合管道(aggregation pipeline)中的每一步输出都作为下一步的输入。聚合框架支持10个管道操作符:

  • $project —— 指定要处理的字段

    // 返回字段的投影管道操作符
    db.users.aggregate([
      {$match: {username: 'kbanker',
      					hashed_password: 'xxxxx'}},
      {$project: {first_name: 1, last_name: 1}}
    ])
    
  • $group —— 根据指定的key进行分组,等价于SQL的GROUP BY语句,提供min, max, average等统计功能。

    // 通过月和年统计销售信息
    db.orders.aggregate([
      {$match: {purchase_date: {$gte: new Date(2010, 0, 1)}}},
      {$group: {
        _id: {year: {$year: '$purchase_data'},
             month: {$month: '$purchase_data'}},
        count: {$sum: 1},
        total: {$sum: '$sub_total'}}},
      {$sort: {_id: -1}}
    ]);
    

    这个例子演示了2个重塑函数:$year, $month,只有_id字段可以使用重塑功能(reshaping)。

    // $group函数
    $addToSet —— 为组里唯一的值创建一个数组集合中的元素必须确保唯一
    $first    —— 组里的第一个值 只有前缀`$sort`才有意义
    $last     —— 组里的最后一个值只有前缀`$sort`才有意义
    $max      —— 组里某个字段的最大值
    $min      —— 组里某个字段的最小值
    $avg      —— 组里某个字段的平均值
    $push     —— 返回组内所有值的数组
    $sum      —— 求组内所有值的和
    
  • $match —— 选择要处理的文档,与find()类似

  • $limit —— 限制传递到下一步的文档数量

  • $skip —— 不传递给下一步的文档数量

  • $sort —— 对文档进行排序

    // 分页查询的例子
    review2 = db.reviews.aggregate([
      {$match: {'product_id': product['_id']}},
      {$skip: (page_num - 1) * 12},
      {$limit: 12},
      {$sort: {'helpful_votes': -1}}
    ]).toArray();
    

    一个意外之处是find() $where函数,允许我们使用Javascript表达式选择文档。$where不能使用聚合框架的$match操作符。

  • $unwind —— 扩展数组,为每个数组元素生成一个输出文档

    db.products.findOne({}, {category_ids: 1})
    //{"_id": ObjectId("xxx"), "category_ids": [ObjectId("xxx2"), ObjectId("xxx3")]}
    db.products.aggregate([
      {$project: {category_ids: 1}},
      {$unwind: '$category_ids'},
      {$limit: 2}
    ]);
    // {"_id": ObjectId("xxx"), "category_ids": ObjectId("xxx2")}
    // {"_id": ObjectId("xxx1"), "category_ids": ObjectId("xxx3")}
    
  • $out —— 把管道的结果输入一个集合里

    // 使用`$out`管道操作符保存结果在 targetedCustomers 集合里
    // 选择曼哈顿上城区的订单
    upperManhattanOrders = {'shopping_address.zip': {$gte: 10019, $lt: 10040}};
    // 根据客户求和
    sumByUserId = {_id: '$user_id', total: {$sum: '$sub_total'}};
    // 筛选总额大于1000美元的客户
    orderTotalLarge = {total: {$gt: 1000}};
    // 降序排列订单总额
    sortTotalDesc = {total: -1};
    // 整个管道调用如下
    db.orders.aggregate([
      {$match: upperManhattanOrders},
      {$group: sumByUserId},
      {$match: orderTotalLarge},
    	{$sort: sortTotalDesc}
      {$out: 'targetedCustemers'}
    ]);
    
  • $redact —— 控制特定数据的访问

  • $geoNear —— 地理位置附近

重塑文档

最简单的重塑功能就是把一个字段进行重命名,生成一个新的字段。也可以通过修改或者创建一个新文档来重塑一个文档。

除了上述重构之外,我们也可以使用不同的重塑函数来创建新的字段。重塑函数根据处理数据类型的不同进行分组:字符串、算数运算、日期、逻辑、集合、其他类型。

字符串函数:

  • $concat —— 连接2个或者跟多的字符串为一个字符串
  • $strcasecmp —— 大小写铭感的比较,返回数字
  • $substr —— 获取字符串的子串
  • $toLower —— 转换为小写字符串
  • $toUpper —— 转换为大写字符串
db.users.aggregate([
  {$match: {username: 'kbanker'}},
  {$project: 
  	{name: {$concat: ['$first_name', ' ', '$last_name']},
    firstInitial: {$substr: ['$first_name', 0, 1]},
    usernameUppercase: {$toUpper: '$username'}}}
]);

算数运算函数:

  • $add —— 求和
  • $devide —— 除法
  • $mod —— 求余数
  • $multiply —— 乘法
  • subtract —— 减法

日期函数:

  • dayOfYear
  • dayOfMonth
  • dayOfWeek —— 1表示周日
  • year
  • month
  • week
  • hour
  • minute
  • second
  • millisecond —— 0~999

逻辑函数:

  • $and true —— 如果数组的所有值都为true, 则返回true
  • $cmp —— 两个数相等返回0
  • $cond if ... then ... else —— 三元操作符
  • $eq
  • $gt
  • $gte
  • $ifNull —— 把null值/表达式转换为特定的值
  • $lt
  • $lte
  • $ne
  • $not
  • $or

集合运算符:

  • $setEquals —— 两个集合元素完全相同,返回true

  • $setIntersection —— 返回两个集合的公共元素

  • $setDifference —— 返回第一个集合中与第二个集合不同的元素

  • $setUnion —— 合并集合

  • $setIsSubset —— 如果第二个集合是第一个集合的子集,返回true

  • $anyElementTrue —— 如果某个集合元素为true

  • allElementTrue —— 如果所有集合元素都为true

其他函数:

  • $meta —— 文本搜索
  • $size —— 返回数组大小, 可以用于判断数组是否包含某个元素,或者是否为空
  • $map —— 对数组的每个成员应用表达式,生成新的数组
  • $let —— 定义表达式内使用的变量, 允许我们使用临时变量,而不需要使用$project步骤
  • $literal —— 返回表达式的值,允许我们避免初始化字段值为0, 1, $的问题

聚合管道性能:

以下是主要影响聚合管道性能的关键点:

  • 尽早在管道里尝试减少文档的数量和大小
  • 索引只能用于$match$sort操作,而且可以大大加速查询
  • 在管道使用$match$sort之外的操作符后不能使用索引
  • 如果使用分片,则$match$sort会在单独的片上执行,其余的管道将会在主要片上执行

索引可以大大加快大集合选择性查询和排序,有时候特别是当使用聚合框架的时候要处理大量的数据,此时索引不是恰当的方式。通过聚合框架的explain()函数来研究一下如何判断查询是否使用了索引机制。

聚合管道选项

// 格式
db.collection.aggregate(pipeline, additionalOptions)
// `additionalOptions`是一个可选JSON对象
{explain: true, allowDiskUse: true, curser: {batchSize: n}}

我们可以为aggregate()传递第二个参数来指定聚合调用。选项参数如下:

  • explain() —— 运行管道并且只返回管道处理详细信息

    // 索引查询的`explain()`函数
    db.numbers.find({num: {"$gt": 19995}}).explain('executionStats')
    // 聚合函数的`explain()`函数
    countsByRating = db.reviews.aggregate([
      {$match: {'product_id': product['_id']}},
      {$group: {_id: '$rating', count: {$sum: 1}}}
    ], {explain: true});
    
  • allowDiskUse —— 使用磁盘存储数据

    处理超大型集合数据,可能会由于管道返回了超过MongoDB RAM内存限制的100MB数据而中途失败,返回错误aggregate failed。设置allowDiskUse: true参数就可以修复。该参数可能降低管道性能,只推荐在需要的时候使用。

  • cursor —— 指定初始批处理大小

    // 为避免程序崩溃,限制管道返回文档大小为16MB
    countsByRating = db.reviews.aggregate([
      {$match: {'producti_id': product['_id']}},
      {$group: {_id: '$rating', count: {$sum: 1}}}
    ], {cursor: {}})
    

    聚合管道返回的光标(cursor)结果支持如下调用:

    • cursor.hasNext() —— 确定结果集合是否包含下一个元素
    • cursor.next() —— 返回结果集合的下一个文档
    • cursor.toArray() —— 以数组返回结果
    • cursor.forEach() —— 遍历结果集合的每一行
    • cursor.map() —— 遍历结果集合的每一行,返回一个结果数组
    • cursor.itcount() —— 返回结果数量(仅作测试)
    • cursor.pretty() —— 显示格式化结果的数组

    光标允许我们处理大规模数据流,让我们在返回少量文档结果的时候处理大的结果集,因此可以减少一次性处理数据所需的内存。

其他聚合功能

聚合管道是处理MongoDB聚合查询数据的首选方式,也还有一些替代方式,例如:

  • .count()

    product = db.products.findOne({'slug': 'WheelBarrow-9092'})
    reviews_count = db.reviews.count({'product_id': product['_id']})  // 统计商品的评价
    
  • .distinct()

    // 返回一串快递订单的邮政编码信息,且不重复
    db.orders.distinct('shopping_address.zip')
    
  • .mapReduce()

    使用map-reduce就可以使用Javascript定义整个处理流程,提供了很大的灵活性,但是性能比聚合框架要低很多。编写过程十分复杂,而且比之前构建的聚合框架更加难以理解。虽然map-reduce没有淘汰,但是未来的改进都会在聚合框架上进行。