Thrift Streaming
介绍
许多业务场景(例如 LLM 的流式响应、大量数据传输)需要 Streaming API:先在 client、server instance 之间建立一个 Stream,然后基于该 Stream 进行消息的单向或双向收发。
名词解释
PingPong API (KitexThrift)
Kitex 默认的 Thrift API 模式:
- 只支持 PingPong,不支持 Streaming API
- Thrift Payload 可能包含前缀 TTHeader、Framed,或二者的组合(TTHeader + Framed + Payload)
Unary API
从 gRPC 借用的词汇,特指在 Stream 上的 PingPong API。
基于 Stream 的(例如 HTTP2 stream)的 PingPong 请求,Client 发送一个 Message,Server 返回一个 Message,然后关闭 Stream。 因为有性能损失,不建议使用 Unary API,如无特殊需求请直接使用 KitexThrift PingPong API。 如确有需要,可通过 Thrift IDL 里的注解开启(详见后文)。
Streaming API
Streaming API 分成三类(参考 gRPC core concepts)。
Server Streaming
Client 发送一个 Message,Server 返回多个 Message,然后关闭 Stream。
Client Streaming
Client 发送多个 Message,Server 返回一个 Message,然后关闭 Stream。
Bidirectional Streaming
Client/Server 的收、发均为独立的流,可根据业务需求,按任意顺序执行 Recv、Send。
Streaming over HTTP2
该方案是基于 gRPC/HTTP2 实现的,将 Payload 的编码由 Protobuf 替换成 Thrift。
Getting Started
环境准备
安装支持 Thrift Streaming 的 Kitex (不低于 v0.9.0) 和 Thriftgo (不低于 v0.3.6):
go install github.com/cloudwego/thriftgo@latest
go install github.com/cloudwego/kitex/tool/cmd/kitex@latest
编写 IDL
Kitex 通过 streaming.mode
注解判断方法的 Streaming 类型。
取值 | 含义 | 说明 |
---|---|---|
bidirectional | Bidirectional streaming | 收、发是独立的流,业务可以按需处理 |
client | Client Side Streaming | Client 发送多个 Message,Server 返回一个 Message |
server | Server Side Streaming | Client 发送一个 Message,Server 返回多个 Message |
unary | Unary over Streaming | Client 发送一个 Message,Server 返回一个 Message (不建议使用, 性能损失较大) |
(其他值) | 无效,报错 |
注意:
-
Streaming API 有且只有 一个 request 和一个 response,否则 Kitex 会报错
-
Kitex 支持 在同一个 Service 里同时定义 PingPong API(非Streaming) 和 Streaming API
- Server 会自动探测协议、路由请求
-
不建议使用 Unary over HTTP2(性能损失较大),建议使用 PingPong API(KitexThrift )
-
streaming.mode 只能出现最多一次(不支持指定多个值),否则 Kitex 会报错
示例 IDL(下文的示例均基于该 IDL):
namespace go echo
struct Request {
1: optional string message,
}
struct Response {
1: optional string message,
}
service TestService {
Response Echo (1: Request req) (streaming.mode="bidirectional"),
Response EchoClient (1: Request req) (streaming.mode="client"),
Response EchoServer (1: Request req) (streaming.mode="server"),
// Response EchoUnary (1: Request req) (streaming.mode="unary"), // not recommended
Response EchoPingPong (1: Request req), // KitexThrift, non-streaming
}
生成项目脚手架
对于新增项目,先初始化项目目录:
mkdir demo-project && cd demo-project
module=demo
go mod init $module
Kitex 的使用与原 KitexThrift 项目一致,例如:
kitex -module $module -service demo-server api.thrift
注意:对于现有项目,也需要重新生成代码,并更新 go.mod 里的 Kitex 版本
然后执行:
go mod tidy
编写业务代码
创建 Stream Client(调用端)
注意:
- 对于 Streaming API,需要创建 StreamClient
- 创建 StreamClient 时应指定
streamclient.Option
(不是 client.Option) - 调用 Streaming API 时应指定
streamcall.Option
(不是 callopt.Option)
示例代码:
import "github.com/cloudwego/kitex/client/streamclient"
import "github.com/cloudwego/kitex/client/callopt/streamcall"
var streamClient = testservice.MustNewStreamClient(
"demo-server", // Service Name
streamclient.WithHostPorts("127.0.0.1:8888"), // streamclient.Option...
)
stream, err := streamClient.Echo(ctx, streamcall.WithHostPorts("127.0.0.1:8888"))
// business logic
Bidirectional Streaming API
注意:
-
请求双方应协商好关闭 Stream 的条件,否则可能导致双方都一直等待下去(goroutine 泄漏)
-
示例展示了全双工模式(Recv 和 Send 完全独立)
- 可按业务需求调整处理逻辑,例如(半双工模式)Server 总是在收到一个 Message 处理完再将结果发送给 Client
Server Handler
注意:
- method handler 结束后,Kitex 会写 Trailer Frame(等同于关闭 stream);业务代码不需要主动调用
stream.Close()
- 新启动的 goroutine 应当自行 recover
- 「Recv 返回
io.EOF
」表示 client 已发送结束
示例代码:kitex-examples:thrift_streaming/handler.go#L34
Stream Client
注意:
-
新启动的 goroutine 应当自行 recover
-
Client 发送结束后应及时调用 stream.Close() 告知 server
-
「Recv 返回
io.EOF
或其他 non-nil error」表示 server 已发送结束(或出错)- 此时 Kitex 才会记录 RPCFinish 事件(Tracer 依赖该事件)
- 如 client 和 server 约定了其他结束方式,应主动调用 streaming.FinishStream(stream, err) 记录 RPCFinish 事件
示例代码:kitex-examples:thrift_streaming/client/demo_client.go#L119
Server Streaming API
Server Handler
注意:method handler 结束后,Kitex 会写 Trailer Frame(等同于关闭 stream);业务代码不需要主动调用 stream.Close()
示例代码:kitex-examples:thrift_streaming/handler.go#L94
Stream Client
注意:「Recv 返回 io.EOF
或其他 non-nil error」表示 server 已发送结束(或出错)
- 此时 Kitex 才会记录 RPCFinish 事件(Tracer 依赖该事件)
- 如 client 和 server 约定了其他结束方式,应主动调用 streaming.FinishStream(stream, err) 记录 RPCFinish 事件
示例代码:kitex-examples:thrift_streaming/client/demo_client.go#L185
Client Streaming API
Server Handler
注意:「Recv 返回 io.EOF
」表示 client 已发送结束
示例代码:kitex-examples:thrift_streaming/handler.go#L82
Stream Client
示例代码:kitex-examples:thrift_streaming/client/demo_client.go#L162
Options
StreamClient Options
Kitex 在设计上区分了 Client(for KitexThrift PingPong API)和 StreamClient(for Streaming API),并且要求 StreamClient 使用另一套 Option(类型不同),避免用户给 StreamClient 指定了不支持的 Option。
注意:
- 如果某个 client/callopt Option 没有对应的 streamclient/streamcall Option(例如 WithRPCTimeout),说明 StreamClient 不支持该能力
- 如果你认为 StreamClient 应当支持该能力,可以给 Kitex 提 issue
streamclient.Option
- 在 NewStreamClient 时指定;
- 新增的 Option:
WithRecvMiddleware
、WithRecvMiddlewareBuilder
:详见 Recv/Send 中间件WithSendMiddleware
、WithSendMiddlewareBuilder
:详见 Recv/Send 中间件
示例代码:
import "github.com/cloudwego/kitex/client/streamclient"
var streamClient = testservice.MustNewStreamClient(
"demo-server", // Service Name
streamclient.WithHostPorts("127.0.0.1:8888"), // streamclient.Option...
)
streamcall.Option
- 在创建 Stream 时指定
- 优先级高于同名(如果有的话)的 streamclient.Option
示例代码:
import "github.com/cloudwego/kitex/client/callopt/streamcall"
stream, err := streamClient.Echo(
context.Background(),
streamcall.WithHostPort("127.0.0.1:8888"),
)
Server Options
由于 Server 支持自动探测协议,可以同时支持 Streaming API 和 KitexThrift API,因此无法像 StreamClient 一样使用不同的 Option 类型。
- 大部分 Server Option 对 Streaming API 也有效
- 对于不确定的 Option,请确保在验证有效后再部署到生产环境
- 对 Streaming API 无效的 Option 包括:
- WithReadWriteTimeout
- WithBoundHandler
- …
- 新增 Recv/Send Middleware 相关的几个 Option
- WithRecvMiddleware、WithRecvMiddlewareBuilder
- 详见后文「Recv/Send 中间件」
- WithSendMiddleware、WithSendMiddlewareBuilder
- 详见后文「Recv/Send 中间件」
- WithCompatibleMiddlewareForUnary
- 该 Option 主要是允许 gRPC/Protobuf Streaming 的 Unary API 使用与 PingPong API 相同的 Server Middleware(统一了入参)
- 对于 Thrift Streaming,用户无需关注(不推荐使用 Unary API,且 Kitex 已默认指定该 Option)
- WithRecvMiddleware、WithRecvMiddlewareBuilder
服务治理 | Governance
超时 | Timeout
连接超时
支持通过 option 指定:
- streamclient.WithConnectTimeout
- streamcall.WithConnectTimeout (优先级高于前者)
请求超时(不支持)
没有对应的 Option。 对于 Streaming API,Kitex 的 Timeout 中间件会直接调用 next。
Stream 超时
可通过 context.WithTimeout
或 context.WithDeadline
创建带有 Deadline 的 context,并在创建 Stream 时指定该 context,用于控制 Stream 的整体执行时间:
- Kitex Client
- 通过 header
grpc-timeout
发送给服务端 - 超时后 Recv/Send 会直接返回
rpc error: code = 4 desc = context deadline exceeded
- 通过 header
- Kitex Server
- 读取
grpc-timeout
并设置到 request context 中 - 超时后 Recv/Send 会直接返回
rpc error: code = 4 desc = context deadline exceeded
- 读取
示例代码:
// inject deadline into context BEFORE creating a stream
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := cli.Echo(ctx)
Recv/Send 超时
可使用 Kitex 提供的 streaming.CallWithTimeout
方法。
注意:
-
需要 在创建 Stream 之前 给 ctx 注入 cancel(用 WithCancel 或 WithTimeout 都可以,取决于需求)
-
将 cancel 方法作为
streaming.CallWithTimeout
的第二个参数- 否则 Send/Recv 可能会长时间阻塞等待(取决于 server 端),导致 goroutine 泄漏
示例代码:
import "github.com/cloudwego/kitex/pkg/streaming"
// Add a cancel func to the context BEFORE creating a stream
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := cli.Echo(ctx)
if err != nil {
// ...
}
// Send with timeout
err = streaming.CallWithTimeout(time.Second, cancel, func() error {
return stream.Send(&test.Request{Message: "hello"})
})
// Recv with timeout
var resp *test.Response
err = streaming.CallWithTimeout(time.Second, cancel, func() (err error) {
resp, err = stream.Recv()
return err
})
熔断 | CircuitBreak
只支持 创建连接(Stream)时 的错误率熔断。 不支持 Recv/Send 的熔断。
重试 | Retry (不支持)
不支持重试。
Fallback (不支持)
Streaming API 不支持 fallback。
负载均衡 | LoadBalancer
- 仅支持创建 Stream 时(等同于创建网络连接)的负载均衡
- 如已经创建 Stream,后续的 Send/Recv 只会发往该 Stream 的对端
- 业务需自行处理流量倾斜问题,避免造成负载不均
一致性哈希
注意:由于在中间件中获取的 Request 总是 nil,因此 keyFunc 不能直接读取 request
参考方案:
- 在创建 Stream 前,先计算好 hashKey,放入 ctx 中
- 在 keyFunc 里从 ctx 读取并返回
示例代码:
streamClient := testservice.MustNewStreamClient(
"demo-server",
streamclient.WithLoadBalancer(loadbalance.NewConsistBalancer(
loadbalance.NewConsistentHashOption(
func(ctx context.Context, request interface{}) string {
hashKey, _ := ctx.Value("MY_HASH_KEY")
if hashKey == "" {
// if necessary, return a random string, but never an empty string
panic("invalid hashKey for consistent hash")
}
return hashKey
},
),
))
)
request := &echo.Request{Message: "hello"}
ctx := context.WithKey(context.background(), "MY_HASH_KEY", keyFunc(request))
stream, err := streamClient.Echo(ctx, request)
Server 端限流 | Limit (QPS/Concurrency)
- 支持在创建 Stream 时限流
- 创建 Stream 后对 Recv/Send 的调用无限制,需要业务自行实现
中间件 | Middleware
Client/Server 中间件
Client Middleware
说明:
- Client 中间件的执行仅覆盖「创建 Stream」的环节(详见后附示意图)
- Stream 创建完成、返回业务代码后,中间件就执行完成了
- request 总是 nil(包括 Server Streaming API)
- response 总是 *streaming.Result 类型,该类型包含的 Stream 最终会返回给业务代码
- 如有需要,可替换该 Stream,加入自定义逻辑(基于 decorator 模式)
- 如获取到的 err != nil,说明创建 Stream 失败
- Client 中间件无法获取到 Recv、Send 的 Message
执行流示意图:
注意:
- 下图为 Bidirectional API 流程(Client Streaming API 也类似)
- 但 Server Streaming API 略有不同,在生成代码(kitex_gen)里调用了 Send 及 Close,然后才返回业务代码
获取到的 Request/Response 字段类型
request | response | |
---|---|---|
Bidirectional | interface{} = nil | *streaming.Result |
Client Streaming | interface{} = nil | *streaming.Result |
Server Streaming | interface{} = nil | *streaming.Result |
注:
- Server Streaming API 请求也无法在中间件中读取 request,请使用 Client Send Middleware 获取
- Thrift Streaming Unary API 的参数类型和 PingPong API 一致
识别 Streaming/Non-Streaming 请求
Client middleware 可以通过 response 的类型来判断是否是 streaming 请求:
func clientMW(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
if _, ok := resp.(*streaming.Result); ok {
// it's a streaming request
return next(ctx, req, resp)
} else {
// it's a non-streaming request
return next(ctx, req, resp)
}
}
}
Server Middleware
说明:
- Server 中间件的 next 方法里涵盖了整个 server handler 的处理过程
- request 总是 *streaming.Args 类型,该类型包含的 Stream 最终会返回给业务代码
- 如有需要,可替换该 Stream,加入自定义逻辑(基于 decorator 模式)
- response 总是 nil(包括 Client Streaming API)
- 如获取到的 err != nil,说明内层 Server 中间件或 handler 返回了 error
- 对于 Streaming API(不含Unary),Server 中间件无法获取到 Recv、Send 的 Message
执行流示意图
- Bidirectional Streaming API
- Server Streaming API
- Client Streaming API
获取到的 Request/Response 字段类型
request | response | |
---|---|---|
Bidirectional | *streaming.Args | interface{} = nil |
Client Streaming | *streaming.Args | interface{} = nil |
Server Streaming | *streaming.Args | interface{} = nil |
注意:
- Server Streaming API 请求也无法在中间件中读取 client request,请使用 Server Recv Middleware 获取
- Thrift Streaming Unary API 的参数类型和 PingPong API 一致
识别 Streaming/Non-Streaming 请求
Server middleware 应通过 request 参数的类型来判断是否是 Streaming 请求:
func serverMW(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
if _, ok := req.(*streaming.Args); ok {
// it's a streaming request
return next(ctx, req, resp)
} else {
// it's a non-streaming request
return next(ctx, req, resp)
}
}
}
在 Client/Server 和 Recv/Send 中间件之间交换数据
我们可以通过给用于创建 Stream 的 ctx 注入 key,实现在 middleware 之间共享数据的能力。
Kitex 提供了一组简单的工具方法,通过给 ctx 注入一个 sync.Map ,以便在各 middleware 之间交换数据:
注意:因为 Kitex 内部经常需要从 ctx 读取信息(例如 RPCInfo),每注入一个 key 就增加读取链表的深度,会有一点性能损失,因此 Kitex 默认不注入该 key
Client 示例代码
在 Client Middleware 里,调用 next
之前尚未创建 Stream,因此可以通过往 ctx 里注入 map,再调用 next
,就可以在 Recv/Send middleware 里从 stream.Context()
获取,用于读写:
import "github.com/cloudwego/kitex/pkg/utils/contextmap"
streamClient = testservice.MustNewStreamClient(
"server",
streamclient.WithHostPorts("127.0.0.1:8888"),
streamclient.WithMiddleware(func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
// inject sync.Map in client middleware
ctx = contextmap.WithContextMap(ctx)
if m, ok := contextmap.GetContextMap(ctx); ok {
m.Store("hello", "world")
}
return next(ctx, req, resp)
}
}),
streamclient.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
return func(stream streaming.Stream, resp interface{}) (err error) {
// get the map in Recv/Send middleware for read/write
if m, ok := contextmap.GetContextMap(stream.Context()); ok {
if value, ok := m.Load("hello"); ok {
klog.Infof("[Recv Middleware] hello = %v", value)
}
}
return next(stream, resp)
}
}),
)
Server 示例代码
Server 端与 Client 端不同,进入 Middleware 时已经创建好了 Stream,因此不能在 Server Middleware 里注入,但可以通过如下方式实现:
- 用
server.WithMetaHandler
Option 指定一个MetaHandler
- 该
MetaHandler
需要实现StreamingMetaHandler
接口 - 在该 handler 的
OnReadStream
里给 ctx 注入sync.Map
,返回新的 ctx
package main
import (
"context"
"demo/kitex_gen/echo/testservice"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/pkg/utils/contextmap"
"github.com/cloudwego/kitex/server"
)
var _ remote.MetaHandler = &serverMetaHandler{}
var _ remote.StreamingMetaHandler = &serverMetaHandler{}
type serverMetaHandler struct{}
func (s *serverMetaHandler) OnReadStream(ctx context.Context) (context.Context, error) {
// inject a sync.Map before creating a stream
return contextmap.WithContextMap(ctx), nil
}
func (s *serverMetaHandler) OnConnectStream(ctx context.Context) (context.Context, error) {
return ctx, nil // it's only used by the client side
}
func (s *serverMetaHandler) WriteMeta(ctx context.Context, msg remote.Message) (context.Context, error) {
return ctx, nil
}
func (s *serverMetaHandler) ReadMeta(ctx context.Context, msg remote.Message) (context.Context, error) {
return ctx, nil
}
func main() {
svr := testservice.NewServer(new(TestServiceImpl),
server.WithMetaHandler(&serverMetaHandler{}),
server.WithMiddleware(func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
if m, ok := contextmap.GetContextMap(stream.Context()); ok {
m.Store("hello", "world")
}
return next(ctx, req, resp)
}
}),
server.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
return func(stream streaming.Stream, message interface{}) (err error) {
if m, ok := contextmap.GetContextMap(stream.Context()); ok {
if value, ok := m.Load("hello"); ok {
klog.Infof("hello = %v", value)
}
}
return next(stream, message)
}
}),
)
if err := svr.Run(); err != nil {
klog.Infof(err.Error())
}
}
元数据透传 | Metainfo
请参考:CloudWeGo 官网 - Kitex - Metainfo 的「gRPC Metadata」一节。
注意:
- metainfo
- Key 里不能包含小写字母和横线,例如 “Abc”, “A-B” 都是无效 key,会被丢弃
- metadata
- 必须使用 Kitex fork 的这个 pkg:
github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata
- 必须使用 Kitex fork 的这个 pkg:
- header & trailer
- Key 不能包含大写字母,否则会导致客户端报错
可观测性 | Observability
基本埋点:RPCStart 和 RPCFinish
- 对于 Server/Bidirectional Streaming API,Kitex Client 将
Recv()
收到 non-nil error(io.EOF
或其他错误)作为流结束的标志,此时才会记录 RPCFinish 事件- 如果业务希望提前结束,应当调用
streaming.FinishStream(stream, err)
来产生 RPCFinish 事件
- 如果业务希望提前结束,应当调用
- 对于 Client Streaming API,Kitex Client 将在 CloseAndRecv() 方法返回前自动记录 RPCFinish 事件
Kitex 用户可以通过添加自己的 Tracer,在 Finish() 方法里处理该事件,详见 Kitex - 可观测性 - 链路追踪 - 自定义 Tracer
细粒度埋点:StreamSend 和 StreamRecv
如果自定义 Tracer 实现了 rpcinfo.StreamEventReporter 接口,Kitex 会注入 Recv、Send 中间件,在每次 Recv、Send 执行完后调用 tracer 的 ReportStreamEvent 方法; 在该方法里,可以获取到本次 Recv、Send 的消息大小:(注意不要另起 goroutine,否则可能读取到的不是本次调用)
- ri.Stats().LastSendSize()
- ri.Stats().LastRecvSize()
具体示例可参考:kitex-tests: testTracer
泛化调用 | Generic (暂未支持)
计划实现,但暂无明确时间表。
注意事项 | Attention
Send/Recv 操作的是「本地缓冲区」
gRPC/HTTP2 的实现基于「本地缓冲区」,Send 和 Recv 操作是直接在缓冲区上操作的。 因此需注意以下几点:
-
「Send 返回 nil」只表明消息放入了本地缓冲区,不等于「消息已发送到对端」
-
Send 和 Recv 操作的「耗时」和 PingPong API 的「Latency」含义有显著差别:
- 如果对端的 Recv 调用频率更高,那么本地的 Send 通常会立即返回(缓冲区总是有空闲)
- 如果对端的 Recv 调用频率更低,那么本地的 Send 可能会阻塞较长时间(等待对方消费腾出缓冲区)
- 如果缓冲区里有数据,Recv 调用会立刻返回,否则需要等待对端的 Send
Client/Server 中间件无法读到 Request/Response
设计如此。 对于 Client Streaming/Server Streaming,虽然形式上和 PingPong API 类似(有 Request/Response),但是底层实现完全不同,在中间件中是读不到的。 请使用 Recv/Send 中间件处理流上的消息。
采集 RPCFinish:client 请求 Server/Bidirectional API 需调用 Recv 收到 non-nil error
- 对于 Server/Bidirectional Streaming API,Kitex Client 将 Recv() 收到 non-nil error(
io.EOF
或其他错误)作为流结束的标志,此时才会记录 RPCFinish 事件- 如果业务希望提前结束,应当调用 streaming.FinishStream(stream, err) 来产生 RPCFinish 事件
- 对于 Client Streaming API,Kitex Client 将在 CloseAndRecv() 方法返回前自动记录 RPCFinish 事件
- StreamRecv/StreamSend 是在 Recv/Send 调用时实时触发的,不依赖 RPCFinish 事件
业务异常(BizStatusError):不会被 Client RecvMiddleware 直接感知
业务异常是自定义业务状态码,在框架的链路上不会认为是错误请求,会被封装到 rpcinfo 内部。 因此如果有业务异常,在 Client 的 RecvMiddleware 里 next 返回的 err 为 nil,可用如下代码读取业务异常:
bizErr := rpcinfo.GetRPCInfo(stream.Context()).Invocation().BizStatusErr()
Streaming over TTHeader(规划中)
预计相比 HTTP2 能显著提高性能。
FAQ
是否支持 Multi Service(单 Server 多 Service)?
支持,详见 CloudWeGo 官网文档:Kitex - 单 Server 多 Service。
Streaming over HTTP2 能否与 gRPC 的其他实现(尤其是其他语言)互通?
不能。 不过如果该 gRPC library 支持二进制泛化调用,可以结合 thrift 编解码器,发送thrift payload 实现互通。
注意: Kitex client 发送的 “content-type” header 值为 “application/grpc+thrift”