从流式输出到服务端推送技术再到 Java 的 WebFlux
大模型流式输出
像 ChatGPT 这样的网页,我们不难发现问出问题后,大模型吐字是一段接一段的,但我们传统的 Http 请求,一般是每次获取一段数据就要再次发起请求一次。这是一种耗费资源的方式,简言之就是 Http 轮询(短轮询和长轮询 Comet),所以服务端主动推送数据的计数就应运而生。
服务端主动推送技术
这里抛开 Http 轮询计数,主要涉及到了 SSE 和 WebSocket,其实 SSE 和 WebSocket 都是服务于 “实时” 二字的。
SSE 协议
SSE(Server Send Events),顾名思义,服务端发送事件,是指服务端能够主动给客户端发送消息。其基于 Http 协议,需要按照 SSE 协议规范在消息响应体中填充数据,如果需要 SSE 协议,则需要 Http 长连接(默认),并且将请求中的 content-type 设置为 text/event-stream。
其原理实际上是在建立好的 Http 连接上,于客户端协商,返回的类型不为一次性的数据包,而是返回一个 Stream。而基于这个 Strem,服务器可以不断的往内部填入数据,客户端也可以依次接受数据。
其实 SSE 是比较常见的,因为很多时候,只需要服务器推送给客户端,而客户端不需要给服务器发送内容,比如说在一个常用开源容器监控系统 Dozzle 中,就能看到其身影。可以类比,如果一个系统,类似比赛的看板或者日志的看板,就比较适合用 SSE 协议。
因为 SSE 并非一个完全新的协议,而是使用了 Http 协议的功能,并定义一系列规范,所以 SSE 的优点就是:
- 轻量级(并非全新协议)(相较于 WebSocket)
- 基于 Http,基本上所有的浏览器都支持、
- 支持断开重连
缺点:
- 单工通讯,建立的连接只支持服务器向客户端推送
WebSocket 协议
WebSocket 是一个 Http 协议之外的独立协议,若要想将一个 Http 协议转化为 WebSocket 协议,需要将 Http 请求中的 upgrade 字段改为 websocket,并且将 connect 字段改为 upgrade,这样就能使用 WebSocket 协议。其需要在 Http 握手的基础上,再进行一次 WebSocket 握手。也就是说,WebSocket 只是在建立连接的时候,用到了 Http 协议。 WebSocket 建立了全双工的通讯管道,能实现真正的客户端服务器对等通讯。其优点:
- 全双工,而 SSE 只是半双工
- 独立协议,不依赖于 Http
- 支持跨域
缺点:
- 独立协议,协议栈复杂
- 不支持自动重连,需要在代码中考虑断开重连的情况
服务端实现 SSE 举例
Java 实现
这里直接举例一个大模型平台的 demo:
package com.lei.sse_test.controller;
import java.time.Duration;
import java.time.LocalTime;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@RestController
public class FluxController {
private String I = "1";
@GetMapping("/getStream")
public Flux<ServerSentEvent<SseResponse>> stremFlux() {
Flux<ServerSentEvent<SseResponse>> flux = Flux.interval(Duration.ofSeconds(1))
.map(seq -> {
SseResponse sseResponse = new SseResponse();
String data = I + I;
I = I + I;
sseResponse.setData(data);
return sseResponse;
})
.map(response -> {
return ServerSentEvent.<SseResponse>builder()
.id(LocalTime.now().toString())
.event("complete")
.data(response)
.build();
})
.startWith(Mono.fromCallable(() -> {
SseResponse sseResponse = new SseResponse();
sseResponse.setData("Begin");
return ServerSentEvent.<SseResponse>builder()
.id(LocalTime.now().toString())
.event("messages")
.data(sseResponse)
.build();
}))
.doOnNext(response -> {
if (response.data().getData().equals("11")) {
System.out.println("doOnNext");
}
})
.doOnCancel(() -> {
I = "1";
System.out.println("cancel");
});
return flux;
}
}
这里后台是模拟大模型的输出,用每秒发送一个通知的形式来模拟。启动后台,发送请求可以看到:
服务端:
上面的代码内容也很简单,就是在请求到来的时候,创建一个 Flux 对象,然后注册了一系列回调函数,例如:
- doOnNext(),当数据流经过的时候,检查触发
- doOnCancel(),当订阅者取消订阅的时候,检查触发 等等在 Flux 中还有许多这样的回调可以注册,非常的方便。 这就是 Flux 的用法,所以 Flux 是什么呢?
Reactor
三大概念:
- Publisher:数据的来源
- Subscriber:数据的消费者
- Subscription:数据流中间的一些规定,比如说背压或者其他注册一些自定义回调也算
Flux & Mono
按照官方的说法,Flux 代表了 0…N 个元素的 Publisher,而 Mono 代表了 0…1 个元素的 Publisher。Mono 可以转化为 Flux,可以说 Mono 是 Flux 的一个子集。