Server-Sent Events (SSE)异步客户端实现

最近遇到了一个解析和转发基于HTTP Server-Sent Events (SSE)应用协议的需求,其实现方法和普通HTTP接口调用差异较大。 本文将讨论该协议的异步客户端实现方法,希望能帮助到遇到类似需求的人。

1. SSE简介

SSE是一种基于HTTP的服务端向客户端推送消息的格式,其可以基于HTTP 1实现,比WebSocket简单。 SSE本质上是服务端通过一个长链接向客户端不停地推送消息,如下图所示:

sse.jpg

服务器发送的SSE响应格式如下:

  1. header: Content-Type必须为text/event-stream
  2. body由多个event构成,event间由​\n\n​分隔,这些event不必被一次发送完,而是如上图般被多次发送

中转SSE协议时不能使用普通的同步http client, 因为其仅当收到完整的http响应后才会返回,而SSE协议一般要求每收到一个SSE event后就做相应的处理。 因此,SSE协议中转时一般需要使用异步http client.

目前,如腾讯元宝等LLM大模型在聊天回复时就使用了SSE,以便流式地快速响应客户端请求。

2. 基于Apache HttpAsyncClient的异步SSE客户端实现

正如前文所言,中转SSE协议时不能使用普通的同步Apache HttpClient, 而是需要使用异步的Apache HttpAsyncClient,其可以实现每收到一点服务端数据后就回调某个用户自定义函数来处理之。

2.1. HttpAsyncClient接口简介

Apache HttpAsyncClient底层基于Java NIO, 其提供了异步接口来接收HTTP响应,该接口实现了Reactor语义,如下:

<T> Future<T> execute(HttpAsyncRequestProducer requestProducer, HttpAsyncResponseConsumer<T> responseConsumer, FutureCallback<T> callback);

public abstract class AsyncCharConsumer<T> extends AbstractAsyncResponseConsumer<T> implements HttpAsyncResponseConsumer<T> {
    protected abstract void onCharReceived(CharBuffer buf, IOControl ioControl) throws IOException;

    protected abstract void onResponseReceived(HttpResponse response) throws HttpException, IOException;

    protected abstract T buildResult(HttpContext context) throws Exception;
}

public interface FutureCallback<T> {
    void completed(T var1);

    void failed(Exception var1);

    void cancelled();
}

AsyncCharConsumer#onCharReceived(CharBuffer buf, IOControl ioControl)将在收到响应时被回调,其中的IOControl入参可被用于操作连接,比如关闭连接等。 FutureCallback#completed/failed()则分别在收到完整响应/出错时被回调。 这些回调函数都在HttpAsyncClient的IO Dispatcher Worker线程中执行,而其线程数可以通过IOReactorConfig.setIoThreadCount()来设置。

2.2. SSE粘包与拆包

Apache HttpAsyncClient AsyncCharConsumer#onCharReceived()收到只是数据片段,而非一个完整的SSE event, 因此会遇到类似于TCP粘包的问题。 为了解析出SSE event,需要对这些数据片段进行累积,其可以通过如下的SseParser来实现:

public class SseParser {
    private final StringBuilder sb = new StringBuilder();

    private int last = 0;

    public void append(String s) {
        sb.append(s);
    }

    /**
     * @return 返回一个SSE event
     */
    public String next() {
        for (int i = last; i < sb.length() - 1; ++i) {
            // SSE event以"\n\n"分割
            if (sb.charAt(i) == '\n' && sb.charAt(i + 1) == '\n') {
                String s = sb.substring(last, i);
                last = i + 2;
                return s;
            }
        }
        return null;
    }

    public boolean isEmpty() {
        return sb.isEmpty();
    }
}

2.3. SSE异步回调接口

一般而言,异步客户端在接收消息的不同阶段(如每收到一块数据,收到全部数据,数据接收时发送错误)一般需要回调不同的函数,因此可以仿照RxJava Observer接口定义一下异步SSE客户端回调接口,如下:

public interface AsyncSseCallback {
    /**
     * 收到一个SSE event时调用
     *
     * @param event
     * @return 是否成功处理event, 返回false将导致终止接收数据
     * @throw 可以抛出异常,异常将导致终止接收数据
     */
    boolean onNext(String event);

    /**
     * 收到全部数据时可以抛出异常,异常发生时将自动调用onError()
     *
     * @throw 可以抛出异常,异常发生时将自动调用onError()
     */
    void onComplete();

    /**
     * 出错时调用
     *
     * @throw 不能抛出异常
     * @param t
     */
    void onError(Throwable t);
}

onNext()返回类型为boolean, 目的是用户可以在接收过程中终止接收后续SSE event, 比如当用户处理当前event出错后可能就不再愿意处理后续event。 上述接口中除onError()外都可以抛出异常,SSE异步客户端在实现时会恰当地处理这些异常。

2.4. 核心实现

在上述内容的基础上,可以比较直观地实现异步SSE客户端,核心逻辑如下:

AsyncSseCallback consumer;
client.execute(
    new BasicAsyncRequestProducer(host, request),
    new AsyncCharConsumer<Void>() {
        @Override
        protected void onCharReceived(CharBuffer buf, IOControl ioControl) throws IOException {
            parser.append(buf.toString());
            String next;
            while ((next = parser.next()) != null) {
                try {
                    if (!consumer.onNext(next)) {
                        ioControl.shutdown();
                    }
                } catch (Throwable t) {
                    // client.execute()将调用FutureCallback#failed(), 最终将调用consumer#fail()
                    ioControl.shutdown();
                }
            }
        }

        @Override
        protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {}

        @Override
        protected Void buildResult(HttpContext context) throws Exception {
            return null;
        }
    }, new FutureCallback<Void>() {
        @Override
        public void completed(Void unused) {
            try {
                consumer.onComplete();
            } catch (Throwable t) {
                consumer.onError(t);
            }
        }

        @Override
        public void failed(Exception e) {
            try {
                consumer.onError(e);
            } catch (Throwable t) {
                t.printStackTrace();
                //log.error("onError() should not throw exceptions", e);
            }
        }

        @Override
        public void cancelled() {}
 });

3. 基于Sogou Workflow的异步SSE客户端实现方法概述

在程序的某部分采用了异步接口后,代码中将充斥着大量callback (除非其它模块对将其转为同步), 而这些callback相比同步代码而言可读性一般较差。 Sogou Workflow提供了任务流来封装这些回调,使其可读性大为提升。 此外,Workflow提供的异步资源管理调度能力使其很适合异步程序编写。 因此,异步SSE客户端很适合采用Workflow来实现。

Workflow虽然原生支持HTTP协议,但未提供SSE协议支持。 使用Workflow来实现异步SSE客户端的几个要点如下:

  1. 基于已实现有的HTTP协议来实现SSE.
  2. Communicator handler thread中可以拿到connection fd, 可以基于其实现连接控制,以给上层业务提供类似于HttpAsyncClient onCharReceived() IOControl的回调入参。
  3. Workflow交付的是完整的协议消息,其在语义比Proactor走得更远。 因此,在SSE客户端实现需要注意:应该交付SSE event, 而不是完整的HTTP协议消息。

4. 总结

本文介绍了SSE协议以及其异步客户端实现方法,希望能对大家有所帮助。