网站首页 > 教程文章 正文
在使用 Spring Cloud Gateway 时,虽然其本身支持 SSE(Server-Sent Events)协议,但如果不进行特殊处理,数据通常会一次性发送到客户端,而无法实现流式输出。本文整理了一套完整的解决方案,帮助开发者解决这一问题。
问题分析
- 流式输出中断 默认情况下,Spring Cloud Gateway 在代理 SSE 请求时,可能会将数据缓冲,导致数据无法流式发送。
- 立即刷新的必要性 SSE 协议要求数据即时推送到客户端,因此每次写入数据后需要调用 flush(),否则数据会堆积在缓冲区。
解决方案
方案 1:在 Gateway 层处理 SSE 请求
通过自定义响应过滤器,拦截和处理 SSE 响应,确保每次写入后立即刷新数据。
@Component
public class SseResponseFilter extends AbstractGatewayFilterFactory<Object> {
@Override
public GatewayFilter apply(Object config) {
return (exchange, chain) -> {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return chain.filter(exchange).then(Mono.defer(() -> {
Flux<? extends DataBuffer> fluxBody = response.getBody();
if (fluxBody != null) {
return response.writeWith(
fluxBody.map(dataBuffer -> {
byte[] content = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(content);
String responseData = new String(content, StandardCharsets.UTF_8);
// 日志记录或数据拦截
if (StringUtils.isNotBlank(responseData)) {
log.info("Intercepted Response: " + responseData);
}
// 确保每次写入后立即刷新
try {
exchange.getResponse().flush();
} catch (IOException e) {
log.error("Flush error", e);
}
return bufferFactory.wrap(content);
})
);
} else {
return Mono.empty();
}
}));
};
}
}
关键点
- 使用 Flux 处理流式数据,确保每段数据都可以被实时处理。
- 显式调用 flush() 确保数据被即时发送到客户端。
方案 2:在应用服务器中处理 SSE 请求
对于直接处理 SSE 的应用服务器,可以通过自定义过滤器实现立即刷新的功能。
public class SseFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(
HttpServletRequest request,
HttpServletResponse response,
FilterChain chain) throws ServletException, IOException {
if ("text/event-stream".equals(request.getHeader("Accept"))) {
chain.doFilter(request, new SseResponseWrapper(response));
} else {
chain.doFilter(request, response);
}
}
static class SseResponseWrapper extends HttpServletResponseWrapper {
public SseResponseWrapper(HttpServletResponse response) {
super(response);
}
@Override
public ServletOutputStream getOutputStream() throws IOException {
ServletOutputStream sos = super.getOutputStream();
return new ServletOutputStream() {
@Override
public boolean isReady() {
return sos.isReady();
}
@Override
public void setWriteListener(WriteListener listener) {
sos.setWriteListener(listener);
}
@Override
public void write(int b) throws IOException {
sos.write(b);
flush();
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
sos.write(b, off, len);
flush();
}
@Override
public void flush() throws IOException {
sos.flush();
}
@Override
public void close() throws IOException {
sos.close();
}
};
}
}
}
关键点
- 自定义 HttpServletResponseWrapper,确保每次写入数据后立即调用 flush()。
- 通过过滤器判断请求头中的 Accept 是否为 text/event-stream,以区分 SSE 请求。
示例代码:服务端实现流式响应
以下是一个简单的 SSE 响应示例:
@RestController
@RequestMapping("/sse")
public class SseController {
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void streamEvents(HttpServletResponse response) throws IOException {
PrintWriter writer = response.getWriter();
for (int i = 0; i < 10; i++) {
writer.write("data: Event " + i + "\n\n");
writer.flush();
try {
Thread.sleep(1000); // 模拟事件生成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
验证效果
- 使用浏览器 打开浏览器访问 /sse 接口,即可看到实时输出的事件流。
- http://localhost:8080/sse
总结
- Gateway 处理:通过过滤器对响应体进行拦截和处理,适合代理流式请求。
- 应用服务器处理:通过自定义 HttpServletResponseWrapper 实现实时数据推送。
- 即时性保障:无论在哪层处理,都需要显式调用 flush() 以满足 SSE 协议要求。
猜你喜欢
- 2025-05-27 还在为 Spring Boot3 整合 Easy Excel 发愁?一文搞定!
- 2025-05-27 7种方式,教你提升 SpringBoot 项目的吞吐量
- 2025-05-27 Tomcat处理HTTP请求流程解析
- 2025-05-27 SpringBoot中六种设计模式应用案例二
- 2025-05-27 生产实战:循环继承依赖 导致web 项目启动失败,总是超时
- 2025-05-27 Java Web应用调优线程池:没你想的那么复杂
- 2025-05-27 SpringMVC异常处理句柄这些细节你知道吗?
- 2025-05-27 惊呆了!Controller接口返回值支持17种逆天类型
- 2025-05-27 【干货】EasyExcel确实好用,Springboot+EasyExcel实操
- 2025-05-27 Spring Boot 应用如何防护 XSS 攻击
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- mybatis plus page (35)
- vue @scroll (38)
- 堆栈区别 (33)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)
- redis aof rdb 区别 (33)
- 302跳转 (33)
- http method (35)
- js array splice (33)