从流式输出到服务端推送技术再到 Java 的 WebFlux

大模型流式输出

像 ChatGPT 这样的网页,我们不难发现问出问题后,大模型吐字是一段接一段的,但我们传统的 Http 请求,一般是每次获取一段数据就要再次发起请求一次。这是一种耗费资源的方式,简言之就是 Http 轮询(短轮询和长轮询 Comet),所以服务端主动推送数据的计数就应运而生。

服务端主动推送技术

这里抛开 Http 轮询计数,主要涉及到了 SSE 和 WebSocket,其实 SSE 和 WebSocket 都是服务于 “实时” 二字的。

SSE 协议

SSE(Server Send Events),顾名思义,服务端发送事件,是指服务端能够主动给客户端发送消息。其基于 Http 协议,需要按照 SSE 协议规范在消息响应体中填充数据,如果需要 SSE 协议,则需要 Http 长连接(默认),并且将请求中的 content-type 设置为 text/event-stream。 其原理实际上是在建立好的 Http 连接上,于客户端协商,返回的类型不为一次性的数据包,而是返回一个 Stream。而基于这个 Strem,服务器可以不断的往内部填入数据,客户端也可以依次接受数据。 其实 SSE 是比较常见的,因为很多时候,只需要服务器推送给客户端,而客户端不需要给服务器发送内容,比如说在一个常用开源容器监控系统 Dozzle 中,就能看到其身影。可以类比,如果一个系统,类似比赛的看板或者日志的看板,就比较适合用 SSE 协议。 image.png

因为 SSE 并非一个完全新的协议,而是使用了 Http 协议的功能,并定义一系列规范,所以 SSE 的优点就是:

  1. 轻量级(并非全新协议)(相较于 WebSocket)
  2. 基于 Http,基本上所有的浏览器都支持、
  3. 支持断开重连

缺点:

  1. 单工通讯,建立的连接只支持服务器向客户端推送

WebSocket 协议

WebSocket 是一个 Http 协议之外的独立协议,若要想将一个 Http 协议转化为 WebSocket 协议,需要将 Http 请求中的 upgrade 字段改为 websocket,并且将 connect 字段改为 upgrade,这样就能使用 WebSocket 协议。其需要在 Http 握手的基础上,再进行一次 WebSocket 握手。也就是说,WebSocket 只是在建立连接的时候,用到了 Http 协议。 WebSocket 建立了全双工的通讯管道,能实现真正的客户端服务器对等通讯。其优点:

  1. 全双工,而 SSE 只是半双工
  2. 独立协议,不依赖于 Http
  3. 支持跨域

缺点:

  1. 独立协议,协议栈复杂
  2. 不支持自动重连,需要在代码中考虑断开重连的情况

服务端实现 SSE 举例

Java 实现

这里直接举例一个大模型平台的 demo:

package com.lei.sse_test.controller;

import java.time.Duration;
import java.time.LocalTime;

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@RestController
public class FluxController {

   private String I = "1";

   @GetMapping("/getStream")
   public Flux<ServerSentEvent<SseResponse>> stremFlux() {
       Flux<ServerSentEvent<SseResponse>> flux = Flux.interval(Duration.ofSeconds(1))
               .map(seq -> {
                   SseResponse sseResponse = new SseResponse();
                   String data = I + I;
                   I = I + I;
                   sseResponse.setData(data);
                   return sseResponse;
               })
               .map(response -> {
                   return ServerSentEvent.<SseResponse>builder()
                           .id(LocalTime.now().toString())
                           .event("complete")
                           .data(response)
                           .build();
               })
               .startWith(Mono.fromCallable(() -> {
                   SseResponse sseResponse = new SseResponse();
                   sseResponse.setData("Begin");
                   return ServerSentEvent.<SseResponse>builder()
                           .id(LocalTime.now().toString())
                           .event("messages")
                           .data(sseResponse)
                           .build();
               }))
               .doOnNext(response -> {
                   if (response.data().getData().equals("11")) {
                       System.out.println("doOnNext");
                   }
               })
               .doOnCancel(() -> {
                   I = "1";
                   System.out.println("cancel");
               });
                               
       return flux;
   }
}

这里后台是模拟大模型的输出,用每秒发送一个通知的形式来模拟。启动后台,发送请求可以看到: image.png 服务端: image.png 上面的代码内容也很简单,就是在请求到来的时候,创建一个 Flux 对象,然后注册了一系列回调函数,例如:

  1. doOnNext(),当数据流经过的时候,检查触发
  2. doOnCancel(),当订阅者取消订阅的时候,检查触发 等等在 Flux 中还有许多这样的回调可以注册,非常的方便。 这就是 Flux 的用法,所以 Flux 是什么呢?

Reactor

三大概念:

  1. Publisher:数据的来源
  2. Subscriber:数据的消费者
  3. Subscription:数据流中间的一些规定,比如说背压或者其他注册一些自定义回调也算

Flux & Mono

按照官方的说法,Flux 代表了 0…N 个元素的 Publisher,而 Mono 代表了 0…1 个元素的 Publisher。Mono 可以转化为 Flux,可以说 Mono 是 Flux 的一个子集。