云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

使用 Spring Cloud Gateway 实现 SSE 协议的支持

jxf315 2025-05-27 15:21:25 教程文章 4 ℃

在使用 Spring Cloud Gateway 时,虽然其本身支持 SSE(Server-Sent Events)协议,但如果不进行特殊处理,数据通常会一次性发送到客户端,而无法实现流式输出。本文整理了一套完整的解决方案,帮助开发者解决这一问题。


问题分析

  1. 流式输出中断 默认情况下,Spring Cloud Gateway 在代理 SSE 请求时,可能会将数据缓冲,导致数据无法流式发送。
  2. 立即刷新的必要性 SSE 协议要求数据即时推送到客户端,因此每次写入数据后需要调用 flush(),否则数据会堆积在缓冲区。

解决方案

方案 1:在 Gateway 层处理 SSE 请求

通过自定义响应过滤器,拦截和处理 SSE 响应,确保每次写入后立即刷新数据。

@Component
public class SseResponseFilter extends AbstractGatewayFilterFactory<Object> {

    @Override
    public GatewayFilter apply(Object config) {
        return (exchange, chain) -> {
            ServerHttpResponse response = exchange.getResponse();
            DataBufferFactory bufferFactory = response.bufferFactory();

            return chain.filter(exchange).then(Mono.defer(() -> {
                Flux<? extends DataBuffer> fluxBody = response.getBody();

                if (fluxBody != null) {
                    return response.writeWith(
                            fluxBody.map(dataBuffer -> {
                                byte[] content = new byte[dataBuffer.readableByteCount()];
                                dataBuffer.read(content);
                                String responseData = new String(content, StandardCharsets.UTF_8);

                                // 日志记录或数据拦截
                                if (StringUtils.isNotBlank(responseData)) {
                                    log.info("Intercepted Response: " + responseData);
                                }

                                // 确保每次写入后立即刷新
                                try {
                                    exchange.getResponse().flush();
                                } catch (IOException e) {
                                    log.error("Flush error", e);
                                }

                                return bufferFactory.wrap(content);
                            })
                    );
                } else {
                    return Mono.empty();
                }
            }));
        };
    }
}

关键点

  • 使用 Flux 处理流式数据,确保每段数据都可以被实时处理。
  • 显式调用 flush() 确保数据被即时发送到客户端。

方案 2:在应用服务器中处理 SSE 请求

对于直接处理 SSE 的应用服务器,可以通过自定义过滤器实现立即刷新的功能。

public class SseFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(
            HttpServletRequest request,
            HttpServletResponse response,
            FilterChain chain) throws ServletException, IOException {
        if ("text/event-stream".equals(request.getHeader("Accept"))) {
            chain.doFilter(request, new SseResponseWrapper(response));
        } else {
            chain.doFilter(request, response);
        }
    }

    static class SseResponseWrapper extends HttpServletResponseWrapper {

        public SseResponseWrapper(HttpServletResponse response) {
            super(response);
        }

        @Override
        public ServletOutputStream getOutputStream() throws IOException {
            ServletOutputStream sos = super.getOutputStream();
            return new ServletOutputStream() {
                @Override
                public boolean isReady() {
                    return sos.isReady();
                }

                @Override
                public void setWriteListener(WriteListener listener) {
                    sos.setWriteListener(listener);
                }

                @Override
                public void write(int b) throws IOException {
                    sos.write(b);
                    flush();
                }

                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    sos.write(b, off, len);
                    flush();
                }

                @Override
                public void flush() throws IOException {
                    sos.flush();
                }

                @Override
                public void close() throws IOException {
                    sos.close();
                }
            };
        }
    }
}


关键点

  • 自定义 HttpServletResponseWrapper,确保每次写入数据后立即调用 flush()。
  • 通过过滤器判断请求头中的 Accept 是否为 text/event-stream,以区分 SSE 请求。

示例代码:服务端实现流式响应

以下是一个简单的 SSE 响应示例:

@RestController
@RequestMapping("/sse")
public class SseController {

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public void streamEvents(HttpServletResponse response) throws IOException {
        PrintWriter writer = response.getWriter();
        for (int i = 0; i < 10; i++) {
            writer.write("data: Event " + i + "\n\n");
            writer.flush();
            try {
                Thread.sleep(1000); // 模拟事件生成
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

验证效果

  • 使用浏览器 打开浏览器访问 /sse 接口,即可看到实时输出的事件流。
  • http://localhost:8080/sse

总结

  1. Gateway 处理:通过过滤器对响应体进行拦截和处理,适合代理流式请求。
  2. 应用服务器处理:通过自定义 HttpServletResponseWrapper 实现实时数据推送。
  3. 即时性保障:无论在哪层处理,都需要显式调用 flush() 以满足 SSE 协议要求。
最近发表
标签列表