ThinkORM4流式查询最佳实践指南
流年 · 3个月前

ThinkORM 4.0 内置了大数据集的高效处理方法,可以有效避免大数据查询的内存溢出问题,提供了包括游标查询、惰性查询以及分段查询等方法。
## 概述
当处理大量数据时,传统的 `select()` 方法会将所有结果一次性加载到内存,可能导致内存溢出。
ThinkORM4 提供了以下解决方案:
1. cursor() - 游标查询,逐行返回数据
2. lazy() - 惰性加载,分批查询数据
以及上述两个方法的衍生方法
1. steam():内部调用cursor方法实现并支持回调
2. chunk():内部调用lazy方法实现并支持回调
## 使用方法
### 游标查询 (Cursor)
<strong>工作原理</strong>:
* 每次从数据库取一条数据
* 处理完这条后,再取下一条
* 直到所有数据处理完毕
<strong>适用场景</strong>:
* 导出大量数据到文件
* 数据清洗和转换
* 不需要随机访问数据的顺序处理
游标查询使用 PHP Generator 逐行返回数据,内存占用极低:
```
use think\facade\Db;
// 基础游标查询
foreach (Db::table('user')->cursor() as $user) {
// 处理单条记录
echo $user['name'] . PHP_EOL;
}
// 带条件的游标查询
$cursor = Db::table('order')
->where('status', 1)
->where('created_time', '>', '2024-01-01')
->cursor();
foreach ($cursor as $order) {
// 逐行处理订单
processOrder($order);
}
```
如果需要使用回调的话,可以直接使用`stream`方法
```
// 返回处理的数据数量
$count = Db::table('user')->stream(function($user) {
// 处理单条记录
echo $user['name'] . PHP_EOL;
// 返回 false 可以中断处理
// return false;
});
echo "总共处理了 {$count} 条数据";
// 带条件的游标查询
$count = Db::table('order')
->where('status', 1)
->where('created_time', '>', '2025-01-01')
->stream(function($order){
// 逐行处理订单
processOrder($order);
});
echo '处理订单数据:'. $count;
```
`stream`方法返回处理的数据数量。
> 如果需要在处理过程中进行中断操作,可以在回调方法中返回`false`即可。
### 惰性查询 (Lazy)
<strong>工作原理</strong>:
* 第一批:例如查询 1-1000 条数据
* 处理完第一批后
* 第二批:查询 1001-2000 条数据
* 以此类推...
<strong>优势</strong>:
* 支持关联查询(预加载)
* 支持链式操作(filter、map 等)
分批查询,每批数据单独查询:
```
// 默认每次查询1000条
foreach (Db::table('user')->lazy() as $user) {
// 处理用户
}
// 使用自定义字段分批
// 默认使用主键进行排序处理
// 如果没有主键需要指定一个具备连续值的字段
foreach (Db::table('order')->lazy(500, 'order_no', 'asc') as $order) {
// 处理订单
}
```
同样,`chunk`方法内部调用了`lazy`方法并提供了回调机制更方便处理数据
```
Db::table('user')->chunk(500, function($users){
foreach($users as $user) {
// 处理用户数据
}
});
```
需要注意的是,`chunk`回调方法的参数是批次数据,而不是单个数据。
> lazy查询方法最终会根据每批的数据数量来自动处理,对大数据量的查询可能会分成多条SQL来执行。
cursor和lazy方法都会返回LazyCollection数据集对象,关于更多LazyCollection的用法可以参考官方手册的[惰性数据集](https://doc.thinkphp.cn/@think-orm/v4_0/lazy-collection.html)部分。
## 注意事项
### 缓存与流式查询
流式查询不适合与查询缓存一起使用,因为:
* 游标查询使用了 Generator机制,无法序列化缓存
* 流式查询的目的是处理大数据集,缓存会失去意义
如果需要使用查询缓存,应该使用 `select()` 方法:
```
// 正确:使用 select 进行缓存
$users = Db::table('user')
->where('status', 1)
->cache(600)
->select();
// 错误:cursor 不支持缓存
$cursor = Db::table('user')
->cache(600) // 这个会被忽略
->cursor();
```
### 关联查询与流式处理
在使用模型的流式查询时,如果需要使用预载入关联查询,可以使用`lazy`方法
```
User::with(['profile'])->lazy()
->each(function($user) {
echo $user->profile->bio;
});
```
`cursor`查询默认不支持关联,但可以调用`LazyCollection`数据集对象的load方法实现:
```
User::cursor()->load(['profile'])
->each(function($user) {
echo $user->profile->bio;
});
```
### 模型中使用
在模型中同样可以使用流式查询:
```
use app\model\User;
// 模型游标查询
foreach (User::cursor() as $user) {
// $user 是 User 模型实例
echo $user->name;
}
// 带条件的流式处理
User::where('status', 1)->cursor()
->each(function($user) {
$user->last_visit = time();
$user->save();
});
```
### 无缓冲查询(MySQL专用)
对于超大数据集,MySQL数据库可以使用无缓冲查询模式。
```
// 在非MySQL数据库上,无缓冲查询模式选项会被忽略
// 不会抛出异常,只是使用普通游标
$cursor = Db::table('big_table')->cursor(true);
foreach ($cursor as $row) {
// 处理数据
// 注意:无缓冲查询模式下,必须读取完所有结果才能执行下一个查询
}
```
### 中断处理
所有流式查询都支持中断:
```
// cursor/stream 返回 false 中断
Db::table('user')->stream(function($user) {
if ($user['id'] == 100) {
return false; // 停止处理
}
// 处理数据
});
// chunk 返回 false 中断
Db::table('user')->chunk(100, function($users) {
// 处理批次
if (time() > $endTime) {
return false; // 超时停止
}
});
```
### 使用 LazyCollection 的强大功能
`cursor()` 和 `lazy()` 都返回 `LazyCollection`,支持丰富的集合操作:
```
Db::table('orders')
->lazy()
->filter(fn($order) => $order['amount'] > 100)
->map(fn($order) => [
'amount' => $order['amount'] * 1.1 // 加价 10%
])
->take(1000) // 只处理前 1000 条
->each(function ($chunk) {
// 处理数据块
});
```
## 使用场景
### 场景1:导出大量数据到CSV
```
// 使用 cursor - 内存占用最小
$file = fopen('export.csv', 'w');
fputcsv($file, ['id', 'name', 'email', 'register_time']);
Db::table('users')
->field('id,name,email,register_time')
->cursor(true) // 无缓冲查询
->each(function ($user) use ($file) {
fputcsv($file, [
$user['id'],
$user['name'],
$user['email'],
$user['register_time']
]);
});
fclose($file);
```
### 场景2:数据清洗和转换
```
// 使用 lazy - 支持链式操作
$processedCount = Db::table('raw_data')
->lazy(500)
->filter(function ($row) {
// 过滤无效数据
return !empty($row['email']) && filter_var($row['email'], FILTER_VALIDATE_EMAIL);
})
->map(function ($row) {
// 数据转换
return [
'email' => strtolower($row['email']),
'name' => ucwords($row['name']),
'processed_at' => date('Y-m-d H:i:s')
];
})
->each(function ($data) {
// 保存处理后的数据
Db::table('clean_data')->insert($data);
})
->count();
echo "Processed $processedCount records\n";
```
### 场景3:处理关联数据
```
// 使用 lazy - 支持关联查询
use app\model\User;
// 批量加载用户及其订单
User::with(['orders'])
->lazy(100)
->each(function ($user) {
echo "User: {$user->name}\n";
foreach ($user->orders as $order) {
echo " Order #{$order->id}: {$order->total}\n";
}
});
// 使用 chunk 处理关联数据
User::with(['orders', 'profile'])
->chunk(50, function ($users) {
foreach ($users as $user) {
// 导出用户完整信息
$data = [
'user' => $user->toArray(),
'orders' => $user->orders->toArray(),
'profile' => $user->profile->toArray()
];
file_put_contents(
"user_{$user->id}.json",
json_encode($data)
);
}
});
```
## 总结
流式查询是处理大数据的利器,选择合适的方法可以让你的应用在处理大量数据时依然保持高效。记住:
* **简单导出** → `cursor()` / `stream()`
* **需要关联** → `lazy()`
* **批量操作** → `chunk()`
* **小数据量** → `select()`
### 性能优化建议
1. <strong>选择合适的方法</strong>:根据数据量大小和需求选择最合适的方法
2. <strong>合理设置批次大小</strong>:`lazy()` 和 `chunk()` 的批次大小一般在 500-2000 之间
3. <strong>避免在循环中执行查询</strong>:这会导致 N+1 查询问题
掌握这些方法,你就能轻松应对各种大数据处理场景!
<br>
资讯来源:https://doc.thinkphp.cn/@wiki/thinkorm-steam-query-guide.html
推荐资讯
-
你和专业文档手册之间,只差一个“录制”按钮
2025年11月12日
-
客服团队的效率革命:培训文档制作时间立省80%,告别无效内耗
2025年11月12日
-
从AI焦虑到AI从容:给企业的AI转型心理指南
2025年11月12日
-
从"文档"到"知识资产":企业知识管理的三个进化阶段
2025年11月12日
-
ThinkWiki上线智写流程,一键生成用户手册
2025年10月24日
最新资讯
-
你和专业文档手册之间,只差一个“录制”按钮
2025年11月12日
-
客服团队的效率革命:培训文档制作时间立省80%,告别无效内耗
2025年11月12日
-
从AI焦虑到AI从容:给企业的AI转型心理指南
2025年11月12日
-
从"文档"到"知识资产":企业知识管理的三个进化阶段
2025年11月12日
-
ThinkWiki上线智写流程,一键生成用户手册
2025年10月24日