由 neevop 三月 12, 2023
聚合管道
mongosh
为调用聚合框架就要定义一个管道,聚合管道(aggregation pipeline)中的每一步输出都作为下一步的输入。聚合框架支持10个管道操作符:
-
$project —— 指定要处理的字段
// 返回字段的投影管道操作符 db.users.aggregate([ {$match: {username: 'kbanker', hashed_password: 'xxxxx'}}, {$project: {first_name: 1, last_name: 1}} ])
-
$group —— 根据指定的key进行分组,等价于SQL的
GROUP BY
语句,提供min
,max
,average
等统计功能。// 通过月和年统计销售信息 db.orders.aggregate([ {$match: {purchase_date: {$gte: new Date(2010, 0, 1)}}}, {$group: { _id: {year: {$year: '$purchase_data'}, month: {$month: '$purchase_data'}}, count: {$sum: 1}, total: {$sum: '$sub_total'}}}, {$sort: {_id: -1}} ]);
这个例子演示了2个重塑函数:
$year
,$month
,只有_id
字段可以使用重塑功能(reshaping
)。// $group函数 $addToSet —— 为组里唯一的值创建一个数组,集合中的元素必须确保唯一 $first —— 组里的第一个值。 只有前缀`$sort`才有意义 $last —— 组里的最后一个值。只有前缀`$sort`才有意义 $max —— 组里某个字段的最大值 $min —— 组里某个字段的最小值 $avg —— 组里某个字段的平均值 $push —— 返回组内所有值的数组 $sum —— 求组内所有值的和
-
$match —— 选择要处理的文档,与
find()
类似 -
$limit —— 限制传递到下一步的文档数量
-
$skip —— 不传递给下一步的文档数量
-
$sort —— 对文档进行排序
// 分页查询的例子 review2 = db.reviews.aggregate([ {$match: {'product_id': product['_id']}}, {$skip: (page_num - 1) * 12}, {$limit: 12}, {$sort: {'helpful_votes': -1}} ]).toArray();
一个意外之处是
find() $where
函数,允许我们使用Javascript表达式选择文档。$where
不能使用聚合框架的$match
操作符。 -
$unwind —— 扩展数组,为每个数组元素生成一个输出文档
db.products.findOne({}, {category_ids: 1}) //{"_id": ObjectId("xxx"), "category_ids": [ObjectId("xxx2"), ObjectId("xxx3")]} db.products.aggregate([ {$project: {category_ids: 1}}, {$unwind: '$category_ids'}, {$limit: 2} ]); // {"_id": ObjectId("xxx"), "category_ids": ObjectId("xxx2")} // {"_id": ObjectId("xxx1"), "category_ids": ObjectId("xxx3")}
-
$out —— 把管道的结果输入一个集合里
// 使用`$out`管道操作符保存结果在 targetedCustomers 集合里 // 选择曼哈顿上城区的订单 upperManhattanOrders = {'shopping_address.zip': {$gte: 10019, $lt: 10040}}; // 根据客户求和 sumByUserId = {_id: '$user_id', total: {$sum: '$sub_total'}}; // 筛选总额大于1000美元的客户 orderTotalLarge = {total: {$gt: 1000}}; // 降序排列订单总额 sortTotalDesc = {total: -1}; // 整个管道调用如下 db.orders.aggregate([ {$match: upperManhattanOrders}, {$group: sumByUserId}, {$match: orderTotalLarge}, {$sort: sortTotalDesc} {$out: 'targetedCustemers'} ]);
-
$redact —— 控制特定数据的访问
-
$geoNear —— 地理位置附近
重塑文档
最简单的重塑功能就是把一个字段进行重命名,生成一个新的字段。也可以通过修改或者创建一个新文档来重塑一个文档。
除了上述重构之外,我们也可以使用不同的重塑函数来创建新的字段。重塑函数根据处理数据类型的不同进行分组:字符串、算数运算、日期、逻辑、集合、其他类型。
字符串函数:
$concat
—— 连接2个或者跟多的字符串为一个字符串$strcasecmp
—— 大小写铭感的比较,返回数字$substr
—— 获取字符串的子串$toLower
—— 转换为小写字符串$toUpper
—— 转换为大写字符串
db.users.aggregate([
{$match: {username: 'kbanker'}},
{$project:
{name: {$concat: ['$first_name', ' ', '$last_name']},
firstInitial: {$substr: ['$first_name', 0, 1]},
usernameUppercase: {$toUpper: '$username'}}}
]);
算数运算函数:
$add
—— 求和$devide
—— 除法$mod
—— 求余数$multiply
—— 乘法subtract
—— 减法
日期函数:
dayOfYear
dayOfMonth
dayOfWeek
—— 1表示周日year
month
week
hour
minute
second
millisecond
—— 0~999
逻辑函数:
$and true
—— 如果数组的所有值都为true, 则返回true$cmp
—— 两个数相等返回0$cond if ... then ... else
—— 三元操作符$eq
$gt
$gte
$ifNull
—— 把null值/表达式转换为特定的值$lt
$lte
$ne
$not
$or
集合运算符:
-
$setEquals
—— 两个集合元素完全相同,返回true -
$setIntersection
—— 返回两个集合的公共元素 -
$setDifference
—— 返回第一个集合中与第二个集合不同的元素 -
$setUnion
—— 合并集合 -
$setIsSubset
—— 如果第二个集合是第一个集合的子集,返回true -
$anyElementTrue
—— 如果某个集合元素为true -
allElementTrue
—— 如果所有集合元素都为true
其他函数:
$meta
—— 文本搜索$size
—— 返回数组大小, 可以用于判断数组是否包含某个元素,或者是否为空$map
—— 对数组的每个成员应用表达式,生成新的数组$let
—— 定义表达式内使用的变量, 允许我们使用临时变量,而不需要使用$project
步骤$literal
—— 返回表达式的值,允许我们避免初始化字段值为0
,1
,$
的问题
聚合管道性能:
以下是主要影响聚合管道性能的关键点:
- 尽早在管道里尝试减少文档的数量和大小
- 索引只能用于
$match
和$sort
操作,而且可以大大加速查询 - 在管道使用
$match
和$sort
之外的操作符后不能使用索引 - 如果使用分片,则
$match
和$sort
会在单独的片上执行,其余的管道将会在主要片上执行
索引可以大大加快大集合选择性查询和排序,有时候特别是当使用聚合框架的时候要处理大量的数据,此时索引不是恰当的方式。通过聚合框架的explain()
函数来研究一下如何判断查询是否使用了索引机制。
聚合管道选项
// 格式
db.collection.aggregate(pipeline, additionalOptions)
// `additionalOptions`是一个可选JSON对象
{explain: true, allowDiskUse: true, curser: {batchSize: n}}
我们可以为aggregate()
传递第二个参数来指定聚合调用。选项参数如下:
-
explain()
—— 运行管道并且只返回管道处理详细信息// 索引查询的`explain()`函数 db.numbers.find({num: {"$gt": 19995}}).explain('executionStats') // 聚合函数的`explain()`函数 countsByRating = db.reviews.aggregate([ {$match: {'product_id': product['_id']}}, {$group: {_id: '$rating', count: {$sum: 1}}} ], {explain: true});
-
allowDiskUse
—— 使用磁盘存储数据处理超大型集合数据,可能会由于管道返回了超过MongoDB RAM内存限制的100MB数据而中途失败,返回错误
aggregate failed
。设置allowDiskUse: true
参数就可以修复。该参数可能降低管道性能,只推荐在需要的时候使用。 -
cursor
—— 指定初始批处理大小// 为避免程序崩溃,限制管道返回文档大小为16MB countsByRating = db.reviews.aggregate([ {$match: {'producti_id': product['_id']}}, {$group: {_id: '$rating', count: {$sum: 1}}} ], {cursor: {}});
聚合管道返回的光标(cursor)结果支持如下调用:
cursor.hasNext()
—— 确定结果集合是否包含下一个元素cursor.next()
—— 返回结果集合的下一个文档cursor.toArray()
—— 以数组返回结果cursor.forEach()
—— 遍历结果集合的每一行cursor.map()
—— 遍历结果集合的每一行,返回一个结果数组cursor.itcount()
—— 返回结果数量(仅作测试)cursor.pretty()
—— 显示格式化结果的数组
光标允许我们处理大规模数据流,让我们在返回少量文档结果的时候处理大的结果集,因此可以减少一次性处理数据所需的内存。
其他聚合功能
聚合管道是处理MongoDB聚合查询数据的首选方式,也还有一些替代方式,例如:
-
.count()
product = db.products.findOne({'slug': 'WheelBarrow-9092'}) reviews_count = db.reviews.count({'product_id': product['_id']}) // 统计商品的评价
-
.distinct()
// 返回一串快递订单的邮政编码信息,且不重复 db.orders.distinct('shopping_address.zip')
-
.mapReduce()
使用
map-reduce
就可以使用Javascript定义整个处理流程,提供了很大的灵活性,但是性能比聚合框架要低很多。编写过程十分复杂,而且比之前构建的聚合框架更加难以理解。虽然map-reduce
没有淘汰,但是未来的改进都会在聚合框架上进行。