当前位置: 首页 > news >正文

fasthttp 的 server.Shutdown() 究竟能不能实现 graceful shutdown

文将通过源码阅读的方式,推导 fasthttp 实现 graceful shutdown 的细节。

1. 业务代码中的 graceful shutdown 实现方法

func main(){

// ...

// 容器退出前会先发送 SIGTERM 信号

sigs := make(chan os.Signal, 1)

signal.Notify(sigs, syscall.SIGHUP,

syscall.SIGINT,

syscall.SIGTERM,

syscall.SIGQUIT)

//

server := &fasthttp.Server{

Handler: httphandle.FastHTTPHandler,

}

go func() {

log.Info("Server start on", cfg.HTTP.Addr)

if err := server.ListenAndServe(cfg.HTTP.Addr); err != nil {

log.Fatal(err.Error())

}

log.Info("Http Server Ended")

}()

// server.CloseOnShutdown = true

<-sigs // 一旦收到信号,就开始进入退出流程

log.Info("Ready to shutdown...")

// 避免缓存中的日志没有刷到磁盘

log.Close()

_ = server.Shutdown() // 这一行是实现 graceful shutdown 的关键

}

2. Shutdown() 函数的实现细节

func (s *Server) ShutdownWithContext(ctx context.Context) (err error) {

s.mu.Lock()

defer s.mu.Unlock()

// stop 标志置 1

// stop 标志主要影响 tcp 长连接中,是否在一个连接上处理多次请求和响应后 —— 如果遇到服务关停的消息,如何决定长连接上的后续行为

atomic.StoreInt32(&s.stop, 1)

defer atomic.StoreInt32(&s.stop, 0)

if s.ln == nil {

return nil

}

// 所有监听的端口进行 close

// 关闭监听的端口后,意味着新的 tcp 连接将不再 accept

for _, ln := range s.ln {

if err = ln.Close(); err != nil {

return err

}

}

// done 是一个用于信号通知的 channel

// done 用于影响 RequestCtx 端的服务关停消息

// 例如:业务代码如果做很长时间的处理,可以通过 reqCtx.Done() 来判断服务是否开始关停

if s.done != nil {

close(s.done) // 调用后,业务代码中就可以通过 reqCtx.Done() 知道服务开始关停了

}

// Closing the listener will make Serve() call Stop on the worker pool.

// Setting .stop to 1 will make serveConn() break out of its loop.

// Now we just have to wait until all workers are done or timeout.

ticker := time.NewTicker(time.Millisecond * 100)

defer ticker.Stop()

END:

for {

// 主动关闭空闲连接

// 只有当前没有请求和响应的空闲长连接,才会被归类到 IdleConns

// 放入 idle 中的 tcp 连接只会等着被关闭,再也不会被拿出来统计

s.closeIdleConns()

// open 是一个计数器,代表了活跃的 tcp 连接数

if open := atomic.LoadInt32(&s.open); open == 0 {

// 如果所有的活跃的 tcp 连接都没有了,那么就退出这个等待循环。

// 如果没有任何活跃的 tcp 连接,也就意味着没有任何正在处理中的 request

// 这个是 graceful shutdown 的关键判断条件

// 只有当所有请求都处理完了,才会退出服务器。满足这个条件时,必然不会存在丢失用户请求的情况。

break

}

// This is not an optimal solution but using a sync.WaitGroup

// here causes data races as it's hard to prevent Add() to be called

// while Wait() is waiting.

select {

case <-ctx.Done(): // 外部是否触发了信号,导致 context 变成 done (通常都是 context.Background(),不会触发这个分支)

err = ctx.Err()

break END

case <-ticker.C: // 每 100 毫秒检查一次

continue

}

}

s.done = nil

s.ln = nil

return err

}

3. fasthttp 对活跃连接的处理细节

tcp 连接数的累加过程

func (s *Server) Serve(ln net.Listener) error {

//...

// Count our waiting to accept a connection as an open connection.

// This way we can't get into any weird state where just after accepting

// a connection Shutdown is called which reads open as 0 because it isn't

// incremented yet.

atomic.AddInt32(&s.open, 1)

defer atomic.AddInt32(&s.open, -1) // 端口不再监听,才会退出下面的循环

for {

// 处理新连接进入

if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {

// 当 监听器 close 后,这里必然退出

wp.Stop()

if err == io.EOF {

return nil

}

return err

}

s.setState(c, StateNew)

atomic.AddInt32(&s.open, 1) // 每当来了一个新的 tcp 连接,活跃连接数 + 1

// ...

}

// ...

}

tcp 连接数的减少过程

func (s *Server) serveConnCleanup() {

atomic.AddInt32(&s.open, -1)

atomic.AddUint32(&s.concurrency, ^uint32(0))

}

// 这个时候已经建立链接了

// 这个函数负责从连接上收/发数据

func (s *Server) serveConn(c net.Conn) (err error) {

defer s.serveConnCleanup() // 一旦退出这个函数,活跃连接数 - 1

atomic.AddUint32(&s.concurrency, 1)

// ...

}

4. 阶段性总结

从以上的源码可以得到以下结论:

当发起服务的关停信号后,首先就是关闭监听端口。也就是说:关停期间,新的 tcp 将无法再连接进来。上游的代理服务器如果配置好,会因为 tcp 无法建立连接而转发给别的可用的后端。

Shutdown() 在一个循环中不断检查活跃的 tcp 连接数,直到连接数为 0 才退出循环

这样的逻辑必然导致:所有正在处理的请求一定全部处理完成后才会退出服务。

但是也存在另一种可能:

上游的代理服务器与当前的服务建立了长连接,然后不断在这个长连接上发送请求,导致服务永远不会 shutdown,直到 k8s 环境中触发 sigkill 强制杀掉容器。

可以推断出:如果一个 fasthttp 的服务在容器关闭时是因为 sigkill 而关闭的,那么它肯定没有做到 graceful shutdown.

5. fasthttp 中对于长连接的处理细节

func (s *Server) serveConn(c net.Conn) (err error) {

for {

// 1. 收数据

// 2. 解析 http 头

// 3. 调用用户配置的 callback handler

// 4. 检查是否有 connection close 标志

connectionClose = connectionClose ||

(s.MaxRequestsPerConn > 0 && connRequestNum >= uint64(s.MaxRequestsPerConn)) || // 长连接上允许的最多的往返请求数

ctx.Response.Header.ConnectionClose() || // http 协议里要求使用短连接

(s.CloseOnShutdown && atomic.LoadInt32(&s.stop) == 1) // 服务处于关停状态,且配置了 `一旦进入关停就不要再处理新连接了` 的标志

if connectionClose{

// 如果判断出这是一个需要关闭的连接,那么退出循环后就会把连接放到 Idel 中

break

}

// ...

// 5. 检查服务器关停的标志

if atomic.LoadInt32(&s.stop) == 1 {

err = nil

break // 如果调用 Shutdown(), 则循环就不会再处理新的请求了

}

}

}

这里会存在另一种极端情况:上游在长连接中再次发来请求,因为循环中未调用 Read() 来接收数据,因此请求留在了内核的 socket buffer 里。

这种情况只有依赖上游的负载均衡来解决。

总结

从流程来看,fasthttp 的 Shutdown() 能够保障所有正在处理的请求都处理完后再关闭服务。

http://www.cnnetsun.cn/news/111675.html

相关文章:

  • 2025零代码AE动画:Lottie-Web让Web动效轻松实现
  • 架构:不仅仅是建模,而是一种思维
  • Book118文档下载神器:Java工具帮你免费获取学习资料
  • PLabel图像标注工具极速上手指南
  • 10分钟掌握FunASR:流式语音识别从入门到部署的完整实战指南
  • 教师考评新方式:线上系统让评分变得更简单
  • Biotin-PEG-NH2/NHS/N3:结构、反应特性与应用场景的全面对比
  • DTLN噪声抑制实战指南:从原理到部署的全流程解析
  • 深入libgit2:从零开始构建跨平台Git库的完整指南
  • AI选岗工具提升求职效率200%
  • ReadCat跨平台阅读器:打造专属数字书房的全新体验
  • Zotero阅读进度管理终极指南:告别文献混乱的完整解决方案
  • WAN2.2AllInOne V5:重新定义AI视频生成的“极速创作时代“
  • 开源项目商业化实战:Continue如何构建技术价值与商业回报的完美闭环
  • Yuzu模拟器终极配置指南:从零到60帧的完整优化方案
  • 终极SonarQube代码质量报告自动化解决方案:企业级数据驱动决策指南
  • 开展性能测试步骤
  • Coze工作流实战:从踩坑到精通
  • JSON性能革命:RapidJSON如何用SIMD技术改写C++数据处理格局
  • ImageOptim跨版本兼容性全面解析:从macOS 10.13到最新系统的实战指南
  • Qwen3-30B-A3B-Instruct-2507:小参数激活大智慧的AI新范式
  • 打造极速构建体验:BuildKit配置文件深度调优实战
  • 从线上事故看 Java 系统的真实韧性:为什么它总能撑到最后一刻
  • AI Agent框架终极部署指南:从零到生产环境的完整路径
  • 前端性能优化终极指南:让文件转换体验如丝般顺滑
  • 3步彻底解决Dokploy中.traefik.me证书失效问题
  • MCP AI-102模型评估指标全曝光:为什么你的F1-score总是偏低?
  • 量子模拟器环境搭建陷阱与解决方案(90%新手都会犯的3个错误)
  • 【仅限专业人士】量子机器学习调试内幕(VSCode高级功能首次公开)
  • Monet色彩系统如何让Seal视频下载器实现完美的主题一致性