可读流
ReadableStream 表示一个 可读的数据流(data stream),用于逐步读取数据而不是一次性加载全部内容。
在 Scripting app 中,ReadableStream<Data> 通常用于:
- 处理网络响应中的流式数据(如
Response.body)
- 实现大文件的分块读取或实时下载
- 支持长连接或持续推送的数据(如 SSE、分块 JSON、日志流)
与标准 Web API 一致,Scripting 的 ReadableStream 允许异步迭代(for await...of)以及通过读取器 (ReadableStreamDefaultReader) 手动读取流内容。
定义
1class ReadableStream<T = any> {
2 constructor(underlyingSource?: UnderlyingSource<T>)
3
4 get locked(): boolean
5 cancel(reason?: any): Promise<void>
6 getReader(): ReadableStreamDefaultReader<T>
7 tee(): [ReadableStream<T>, ReadableStream<T>]
8}
基本概念
ReadableStream 代表一个“流式可消费的数据源”。
- 它不会立即持有全部数据,而是按需从源(网络、文件、生成器等)获取。
- 每个流只能由一个读取器(reader)读取,一旦被锁定 (
locked = true),必须释放或取消后才能再次读取。
属性说明
locked: boolean
指示当前流是否已被读取器(reader)锁定。
若为 true,则其他代码无法再调用 getReader() 或消费该流。
示例
1const reader = response.body.getReader()
2console.log(response.body.locked) // true
方法说明
getReader(): ReadableStreamDefaultReader<T>
返回一个 ReadableStreamDefaultReader 实例,用于逐步读取流中的数据块(chunk)。
每次调用 reader.read() 会返回一个 Promise,解析为 { value, done } 对象。
示例
1const reader = response.body.getReader()
2
3while (true) {
4 const { done, value } = await reader.read()
5 if (done) break
6 console.log("Received chunk:", value)
7}
cancel(reason?: any): Promise<void>
取消流的读取操作。
传入的 reason 可用于描述取消原因。
示例
1const reader = response.body.getReader()
2await response.body.cancel("User aborted reading")
tee(): [ReadableStream<T>, ReadableStream<T>]
将当前流复制成两个新的流。
每个分支都可独立消费数据,但需注意内存开销。
示例
1const [stream1, stream2] = response.body.tee()
2
3const reader1 = stream1.getReader()
4const reader2 = stream2.getReader()
ReadableStreamDefaultReader(读取器)
当通过 getReader() 获取读取器后,你可以手动控制数据的读取过程。
读取器定义
1interface ReadableStreamDefaultReader<T> {
2 read(): Promise<{ value: T; done: boolean }>
3 releaseLock(): void
4 cancel(reason?: any): Promise<void>
5}
方法说明:
| 方法 |
说明 |
| read() |
读取下一个数据块(chunk),返回 { value, done }。当 done = true 时,流已结束。 |
| releaseLock() |
释放读取器,使流可被其他消费者重新读取。 |
| cancel(reason?) |
取消流读取。 |
示例:读取响应流数据
1const reader = response.body.getReader()
2
3while (true) {
4 const { done, value } = await reader.read()
5 if (done) break
6
7 // 处理每个 Data 对象(chunk)
8 const text = value.toRawString()
9 console.log("Chunk:", text)
10}
11
12reader.releaseLock()
与 Response 的关系
Response.body 属性是一个 ReadableStream<Data>,可用于流式读取响应内容。
示例:实时处理网络响应
1const response = await fetch("https://example.com/stream")
2
3const reader = response.body.getReader()
4while (true) {
5 const { done, value } = await reader.read()
6 if (done) break
7 console.log("Received:", value.toRawString())
8}
这种方式可在 响应尚未完全结束时 实时处理部分数据,非常适合:
- 实时日志输出
- 大文件下载进度控制
- AI/LLM 流式生成内容(如 ChatGPT 的逐字输出)
与 Data 的关系
在 Scripting 中,流的每个块(chunk)通常是一个 Data 实例。
你可以使用 Data 提供的方法(如 .toRawString()、.toUint8Array())读取或转换二进制内容。
示例:将流数据保存到文件
1const reader = response.body.getReader()
2const chunks: Data[] = []
3
4while (true) {
5 const { done, value } = await reader.read()
6 if (done) break
7 chunks.push(value)
8}
9
10const fileData = Data.combine(chunks)
11FileManager.write(fileData, "/local/download.bin")
示例:使用异步迭代器读取流
ReadableStream 支持异步迭代 (for await...of),可简化读取逻辑:
1for await (const chunk of response.body) {
2 console.log("Chunk size:", chunk.size)
3}
此语法会自动处理 done 状态,代码更简洁直观。
使用场景
| 场景 |
示例 |
| 大文件下载 |
按块读取网络响应并写入本地文件,避免内存占用过大。 |
| AI 输出流接收 |
实时接收服务器的推送内容(如 ChatGPT 流式响应)。 |
| 本地流式处理 |
对本地文件或输入流实现增量读取或实时处理。 |
注意事项
- 单次锁定:一个
ReadableStream 在被读取器锁定后,不能被多个消费者同时读取。
- 内存管理:流式处理有助于降低内存占用,但应及时释放或取消读取器以防资源泄漏。
- 错误处理:读取过程中出现错误(如网络断开)会导致
read() Promise 拒绝,应使用 try...catch 捕获。
- Data 类型约定:在
Response.body 中,流的每个块类型为 Data,而不是普通字符串或字节数组。
小结
ReadableStream 是 Scripting 数据流架构的核心组件,为开发者提供了高效的流式数据读取方式:
- 支持异步逐块读取
- 可与
fetch()、Response、Data 无缝集成
- 适用于实时处理、分块下载、长连接流等高级场景
- 完全兼容 Web 标准的 Streams API