当前位置: 首页 > news >正文

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

在开发实时聊天、AI 助手或者协作应用时,我们经常需要SSE(Server-Sent Events)实现服务端向前端持续推送数据。本文将分享一个Go SSE 打字机式输出实现,并附上上游模拟示例、curl 测试和前端实时渲染示例。


功能特点

  1. 使用SSE推送消息流,前端无需轮询。
  2. 对消息进行逐字符打字机式输出,模拟 AI 打字效果。
  3. 支持上游 SSE 模拟,方便本地测试。
  4. 可轻松扩展为真实 AI 聊天接口的代理服务。

技术栈

  • Go 1.21+
  • CloudWeGo Hertz 作为 HTTP 框架
  • resty 用于上游 SSE 请求
  • SSE 流式推送(使用hertz-contrib/sse

完整示例代码

packagemainimport("bufio""context""encoding/json""fmt""math/rand""strings""time""github.com/cloudwego/hertz/pkg/app""github.com/cloudwego/hertz/pkg/app/server""github.com/cloudwego/hertz/pkg/common/hlog""github.com/hertz-contrib/sse""resty.dev/v3")funcmain(){h:=server.Default(server.WithHostPorts(":8380"))h.POST("/v3/chat",CozeParseThenTypeWriter)h.GET("/upstream",MockUpstreamSSE)hlog.Info("🚀 Coze SSE parse + typewriter proxy running at :8380")h.Spin()}// 核心逻辑:解析 conversation.message.delta → 打字机式输出funcCozeParseThenTypeWriter(ctx context.Context,c*app.RequestContext){// SSE Headerc.SetStatusCode(200)h:=c.Response.Header h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Pragma","no-cache")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")stream:=sse.NewStream(c)// Resty 上游请求client:=resty.New().SetTimeout(0)resp,err:=client.R().SetContext(ctx).SetDoNotParseResponse(true).SetHeader("Accept","text/event-stream").Get("http://localhost:8380/upstream")iferr!=nil||resp.RawResponse==nil||resp.RawResponse.Body==nil{hlog.Error("upstream connect failed")return}deferresp.RawResponse.Body.Close()// 打字机准备r:=rand.New(rand.NewSource(time.Now().UnixNano()))scanner:=bufio.NewScanner(resp.RawResponse.Body)varcurrentEventstring// SSE 解析循环forscanner.Scan(){select{case<-ctx.Done():hlog.Warn("client disconnected")returndefault:}line:=scanner.Text()ifline==""{currentEvent=""continue}ifstrings.HasPrefix(line,"event:"){currentEvent=strings.TrimSpace(strings.TrimPrefix(line,"event:"))continue}ifstrings.HasPrefix(line,"data:"){payload:=strings.TrimSpace(strings.TrimPrefix(line,"data:"))ifpayload=="[DONE]"{stream.Publish(&sse.Event{Data:[]byte("[DONE]")})return}ifcurrentEvent=="conversation.message.delta"{vardstruct{Contentstring`json:"content"`}iferr:=json.Unmarshal([]byte(payload),&d);err==nil{typeWriter(stream,r,d.Content)}}ifcurrentEvent=="conversation.message.completed"{stream.Publish(&sse.Event{Event:"conversation.message.completed",Data:[]byte(`{"status":"completed"}`),})}}}}// 打字机逐字符输出functypeWriter(stream*sse.Stream,r*rand.Rand,textstring){fori,ch:=rangetext{time.Sleep(getSleepDuration(r,ch))data:=map[string]any{"id":fmt.Sprintf("char_%d_%d",i,time.Now().UnixNano()%100000),"role":"assistant","type":"answer","content":string(ch),"created_at":time.Now().UnixMilli(),}b,_:=json.Marshal(data)_=stream.Publish(&sse.Event{Event:"conversation.message.delta",ID:fmt.Sprintf("char_%d",i),Data:b,})}}// 延迟策略funcgetSleepDuration(r*rand.Rand,chrune)time.Duration{switch{casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecondcasech=='、'||ch==' '||ch=='-'||ch==':'||ch==',':returntime.Duration(150+r.Intn(100))*time.Millisecondcasech=='#'||ch=='*'||ch=='>':returntime.Duration(200+r.Intn(150))*time.Milliseconddefault:returntime.Duration(60+r.Intn(60))*time.Millisecond}}// 上游 Mock(模拟 Coze SSE)funcMockUpstreamSSE(ctx context.Context,c*app.RequestContext){c.SetStatusCode(200)c.Header("Content-Type","text/event-stream")c.Header("Cache-Control","no-cache")c.Header("Connection","keep-alive")c.Header("X-Accel-Buffering","no")c.Flush()send:=func(event,datastring){c.Write([]byte("event: "+event+"\n"+"data: "+data+"\n\n",))c.Flush()}deltas:=[]string{"你","好",",","这","是"," Coze"," SSE"}for_,ch:=rangedeltas{send("conversation.message.delta",`{"content":"`+ch+`"}`)time.Sleep(80*time.Millisecond)}send("conversation.message.completed",`{}`)c.Write([]byte("data: [DONE]\n\n"))c.Flush()}

使用方法

  1. 启动服务
go run main.go
  1. 访问接口
  • 上游 SSE 测试(浏览器可直接访问):
    http://localhost:8380/upstream

  • 打字机代理接口(POST):
    http://localhost:8380/v3/chat


使用curl测试 SSE

# 测试上游 SSEcurl-N http://localhost:8380/upstream# 测试打字机代理 SSEcurl-N -X POST http://localhost:8380/v3/chat

参数说明:

  • -N/--no-buffer:禁用输出缓存,实时显示流式数据。
  • -X POST:因为代理接口是 POST。

运行后,你会在终端看到类似打字机逐字符输出:

event: conversation.message.delta data: {"id":"char_0_12345","role":"assistant","type":"answer","content":"你","created_at":1700000000000} event: conversation.message.delta data: {"id":"char_1_67890","role":"assistant","type":"answer","content":"好","created_at":1700000000050} ... event: conversation.message.completed data: {"status":"completed"} data: [DONE]

前端实时渲染打字机效果示例

在前端,可以使用EventSource监听 SSE 并动态显示内容:

<divid="chat"></div><script>constchatDiv=document.getElementById("chat");constevtSource=newEventSource("http://localhost:8380/v3/chat");evtSource.addEventListener("conversation.message.delta",e=>{constdata=JSON.parse(e.data);chatDiv.innerHTML+=data.content;});evtSource.addEventListener("conversation.message.completed",e=>{console.log("消息完成");});evtSource.onopen=()=>console.log("连接已打开");evtSource.onerror=()=>console.log("连接错误或关闭");</script>

效果:消息逐字符显示,模拟 AI 打字机输出。


核心解析

  1. SSE Header 设置
h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")

保证浏览器或代理实时接收流式数据。

  1. 打字机效果

根据字符类型不同设置不同延迟:

casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecond
  1. 上游 SSE 模拟

方便本地测试,无需真实 AI 接口即可验证前端打字机效果。


总结

通过本文示例,你可以快速实现:

  • Go SSE 服务端代理
  • AI 聊天消息打字机式输出
  • 上游 SSE 模拟
  • curl 测试和前端实时渲染
http://www.cnnetsun.cn/news/131882.html

相关文章:

  • 元宇宙中的智能体交互难题:90%开发者忽略的延迟一致性陷阱
  • Unity内置着色器完整使用指南:从入门到精通
  • MCP与Azure量子协同容错设计,揭开企业级量子应用稳定性的秘密
  • 当JS拷贝玩起了“俄罗斯套娃”:深拷贝与浅拷贝的趣味对决
  • 北京创业省钱攻略!0 元注册公司不是梦
  • 行业标杆 | 越秀地产入选「2025年度数据湖仓应用创新先锋企业」
  • 云边 Agent 延迟优化全攻略(99%工程师忽略的底层机制曝光)
  • 【物流运输Agent路线优化】:揭秘智能路径调整背后的算法黑科技
  • 终于搞懂了!React Agent的实现思路原来这么简单,核心就这3步!
  • FLORIS风电场仿真终极指南:10个核心技巧快速掌握工程尾流模型
  • 工业HMI常见故障及排查方法:黑屏、通信中断、操作无响应,新手别慌
  • 55页PDF信息系统项目管理师考神笔记
  • 工业HMI的发展趋势:智能化、一体化、移动化,新手该怎么跟上?
  • 【高频卫星信号处理突破】:3步实现Agent端低延迟响应
  • 联想拯救者BIOS隐藏功能一键解锁:3分钟搞定高级设置终极教程
  • 从功能测试到自动化转型:测试工程师的进阶之路
  • 测试与开发的协同工作模式:从对立到共生的范式演进
  • VCAM安卓虚拟相机:打造你的专属视频魔法师
  • 抢占本地生活服务市场 开源可定制的跑腿平台源码推荐
  • 【物流量子 Agent 成本优化】:揭秘降低运营开支的5大核心技术策略
  • 企业微信Webhook Java SDK:从零构建智能消息推送系统
  • 跨境独立站+海淘代购运营指南
  • OPC-UA客户端终极实战指南:从零掌握工业数据交互
  • 从对话到共情,元宇宙Agent如何实现自然交互?
  • Windows与Linux环境兼容难题,MCP PL-600 Agent如何无缝部署?
  • (AZ-500云Agent深度研究):从注册流程到策略执行的完整技术链剖析
  • CanOpenSTM32深度解析:STM32微控制器的CANopen协议栈架构揭秘
  • 深入NVIDIA Nemotron-3:高效准确的技术、工具与数据深度解析
  • 如何用LD2410雷达传感器实现3种智能人体检测方案
  • HTML极致压缩完整指南:如何快速提升网站加载速度