
使用Gin构建一个Websocket服务
代码 main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
// WebSocket 升级器配置
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // 允许跨域
},
}
// 统一消息结构(发送方使用)
type SendMessagePayload struct {
RequestType string `json:"request_type"` // 消息类型
TargetClientID string `json:"target_client_id"` // 目标客户端ID
SenderUserID string `json:"sender_user_id"` // 发送者用户ID
ReceiverUserID string `json:"receiver_user_id"` // 接收者用户ID
MessageType int `json:"message_type"` // 消息类型(0-6)
MessageContent string `json:"message_content"` // 消息内容
}
// 客户端接收消息结构(最终格式)
type ClientReceivedMessage struct {
SenderID string `json:"sender_id"` // 发送者连接ID
ReceiverID string `json:"receiver_id"` // 接收者连接ID
SenderUserID string `json:"sender_user_id"` // 发送者用户ID
ReceiverUserID string `json:"receiver_user_id"` // 接收者用户ID
MessageType int `json:"message_type"` // 消息类型(0-6)
Content string `json:"content"` // 消息内容
SendTime string `json:"send_time"` // 发送时间
}
// Redis 消息结构
type RedisMessage struct {
SenderNodeID string `json:"sender_node_id"` // 发送节点ID
ClientID string `json:"client_id"` // 目标客户端ID
Message string `json:"message"` // 消息内容
}
// 全局变量
var (
clients = make(map[string]*websocket.Conn) // 客户端连接映射
clientsMux sync.RWMutex // 连接映射的读写锁
redisCtx = context.Background() // Redis 上下文
nodeID string // 当前节点ID
redisCli *redis.Client // Redis 客户端
port string // 服务端口
)
func main() {
// 配置系统
configureSystem()
// 初始化Redis客户端
initRedisClient()
// 创建Gin路由器
router := gin.Default()
router.Use(gin.Recovery()) // 添加恢复中间件
// 注册路由
router.GET("/ws", handleWebSocket) // WebSocket连接端点
router.POST("/send", sendMessageHandler) // 消息发送端点
router.GET("/health", healthHandler) // 健康检查端点
// 输出启动信息
printStartupInfo()
// 启动Redis消息订阅
go subscribeToRedis()
// 启动HTTP服务
runServer(router)
}
// 配置系统环境
func configureSystem() {
nodeID = getEnv("NODE_ID", "local") // 获取节点ID,默认"local"
port = getEnv("PORT", "12080") // 获取端口,默认12080
log.SetPrefix(fmt.Sprintf("[Node:%s] ", nodeID)) // 设置日志前缀
log.SetFlags(log.LstdFlags | log.Lmicroseconds) // 设置日志格式
}
// 初始化Redis客户端
func initRedisClient() {
redisAddr := getEnv("REDIS_ADDR", "localhost:6379")
redisCli = redis.NewClient(&redis.Options{
Addr: redisAddr, // Redis地址
Password: "", // 无密码
DB: 0, // 默认DB
})
// 测试Redis连接
if err := checkRedisConnection(); err != nil {
log.Fatalf("❌ Redis连接失败: %v", err)
}
}
// 检查Redis连接
func checkRedisConnection() error {
_, err := redisCli.Ping(redisCtx).Result()
return err
}
// 输出启动信息
func printStartupInfo() {
hostname, _ := os.Hostname()
log.Printf("🚀 WebSocket服务启动: NodeID=%s", nodeID)
log.Printf("🌐 监听端口: %s", port)
log.Printf("📡 Redis地址: %s", getEnv("REDIS_ADDR", "localhost:6379"))
log.Printf("💻 主机: %s", hostname)
log.Printf("🕒 启动时间: %s", time.Now().Format("2006-01-02 15:04:05"))
log.Printf("🔔 支持消息类型: 0:文本 1:图片 2:音频 3:视频 4:处方 5:病例 6:视频通话")
log.Println("🔗 等待客户端连接...")
}
// 启动服务
func runServer(router *gin.Engine) {
addr := "0.0.0.0:" + port
log.Printf("🔌 服务地址: http://%s", addr)
log.Printf("🔌 WebSocket连接地址: ws://%s/ws", addr)
if err := router.Run(addr); err != nil {
log.Fatalf("❌ 服务启动失败: %v", err)
}
}
// 健康检查处理
func healthHandler(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"node": nodeID,
"time": time.Now().Format(time.RFC3339),
})
}
// WebSocket处理函数
func handleWebSocket(c *gin.Context) {
start := time.Now()
clientIP := c.ClientIP()
log.Printf("👤 客户端连接中: IP=%s", clientIP)
// 升级为WebSocket连接
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("⚠️ WebSocket升级失败: %v | ClientIP=%s", err, clientIP)
c.JSON(http.StatusBadRequest, gin.H{"error": "无法升级为WebSocket连接"})
return
}
defer conn.Close()
// 生成客户端ID
clientID := generateClientID()
log.Printf("✅ 客户端已连接: ClientID=%s | IP=%s | Duration=%s",
clientID, clientIP, time.Since(start))
// 将连接添加到客户端映射
addClient(clientID, conn)
defer removeClient(clientID)
// 发送客户端ID
if err := conn.WriteJSON(gin.H{"clientId": clientID}); err != nil {
log.Printf("⚠️ 发送客户端ID失败: %v | ClientID=%s", err, clientID)
return
}
// 设置心跳
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
// 保持连接活跃
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.Printf("❌ 连接意外断开: %v | ClientID=%s", err, clientID)
} else {
log.Printf("⚠️ 连接正常关闭: ClientID=%s", clientID)
}
break
}
// 只处理文本消息
if messageType == websocket.TextMessage {
go handleClientMessage(clientID, message)
}
}
}
// 处理客户端消息
func handleClientMessage(senderID string, message []byte) {
log.Printf("📥 收到客户端消息: SenderID=%s | Size=%d bytes", senderID, len(message))
// 尝试解析为发送消息请求
var payload SendMessagePayload
if err := json.Unmarshal(message, &payload); err == nil && payload.RequestType != "" {
log.Printf("📦 解析JSON消息成功: Type=%s", payload.RequestType)
// 只处理发送消息请求
if payload.RequestType == "send_message" && payload.TargetClientID != "" {
log.Printf("📨 处理客户端发起的发送请求: TargetID=%s | MsgType=%d",
payload.TargetClientID, payload.MessageType)
// 构建客户端消息结构
clientMsg := ClientReceivedMessage{
SenderID: senderID,
ReceiverID: payload.TargetClientID,
SenderUserID: payload.SenderUserID,
ReceiverUserID: payload.ReceiverUserID,
MessageType: payload.MessageType,
Content: payload.MessageContent,
SendTime: time.Now().Format(time.RFC3339),
}
// 发送消息
sendMessageToClient(payload.TargetClientID, clientMsg)
return
}
}
// 尝试解析为已封装的消息结构
var clientMsg ClientReceivedMessage
if err := json.Unmarshal(message, &clientMsg); err == nil && clientMsg.ReceiverID != "" {
log.Printf("📨 处理客户端封装消息: TargetID=%s | MsgType=%d",
clientMsg.ReceiverID, clientMsg.MessageType)
// 设置发送者ID(避免客户端冒充)
clientMsg.SenderID = senderID
// 发送消息
sendMessageToClient(clientMsg.ReceiverID, clientMsg)
return
}
// 无法识别的消息格式
log.Printf("⚠️ 无法识别的消息格式: Size=%d bytes | Message=%s", len(message), string(message))
}
// 消息发送处理(HTTP API)
func sendMessageHandler(c *gin.Context) {
start := time.Now()
var payload SendMessagePayload
if err := c.ShouldBindJSON(&payload); err != nil {
log.Printf("⚠️ 无效的JSON请求格式: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的JSON格式"})
return
}
if payload.RequestType == "" || payload.TargetClientID == "" ||
payload.SenderUserID == "" || payload.ReceiverUserID == "" {
log.Printf("⚠️ 缺少必要参数: request_type=%s target_client_id=%s sender_user_id=%s receiver_user_id=%s",
payload.RequestType, payload.TargetClientID, payload.SenderUserID, payload.ReceiverUserID)
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少必要参数"})
return
}
// 只处理发送消息请求
if payload.RequestType == "send_message" {
log.Printf("📤 处理API发送请求: TargetID=%s | SenderUser=%s | MsgType=%d",
payload.TargetClientID, payload.SenderUserID, payload.MessageType)
// 构建客户端消息结构
clientMsg := ClientReceivedMessage{
SenderID: "system", // API消息标记为系统发送
ReceiverID: payload.TargetClientID,
SenderUserID: payload.SenderUserID,
ReceiverUserID: payload.ReceiverUserID,
MessageType: payload.MessageType,
Content: payload.MessageContent,
SendTime: time.Now().Format(time.RFC3339),
}
if err := sendMessageToClient(payload.TargetClientID, clientMsg); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "消息已发送"})
log.Printf("✅ API请求完成: TargetID=%s | Duration=%s",
payload.TargetClientID, time.Since(start))
return
}
c.JSON(http.StatusBadRequest, gin.H{"error": "不支持的request_type"})
}
// 发送消息给客户端(核心函数)
func sendMessageToClient(clientID string, msg ClientReceivedMessage) error {
start := time.Now()
// 序列化客户端消息
msgJSON, err := json.Marshal(msg)
if err != nil {
log.Printf("❌ 消息序列化失败: %v", err)
return fmt.Errorf("内部错误")
}
// 检查目标客户端是否在当前节点
if conn := getClient(clientID); conn != nil {
log.Printf("📤 向本地客户端发送消息: ClientID=%s | MsgType=%d | Size=%d bytes",
clientID, msg.MessageType, len(msgJSON))
if err := conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil {
log.Printf("❌ 发送消息失败: %v | ClientID=%s", err, clientID)
removeClient(clientID) // 移除失效连接
return fmt.Errorf("发送消息失败")
}
log.Printf("✅ 消息成功发送到本地客户端: ClientID=%s | Duration=%s",
clientID, time.Since(start))
return nil
}
// 如果目标客户端不在当前节点,则通过Redis转发
log.Printf("🌐 向远程节点转发消息: ClientID=%s", clientID)
redisMsg := RedisMessage{
SenderNodeID: nodeID,
ClientID: clientID,
Message: string(msgJSON), // 原始JSON字符串
}
redisMsgJSON, err := json.Marshal(redisMsg)
if err != nil {
log.Printf("❌ Redis消息序列化失败: %v", err)
return fmt.Errorf("内部错误")
}
// 发布到Redis的ws_messages频道
if err := redisCli.Publish(redisCtx, "ws_messages", redisMsgJSON).Err(); err != nil {
log.Printf("❌ 发布消息到Redis失败: %v", err)
return fmt.Errorf("无法转发消息")
}
log.Printf("📡 消息已转发到Redis: ClientID=%s | Size=%d bytes | Duration=%s",
clientID, len(redisMsgJSON), time.Since(start))
return nil
}
// 订阅Redis消息
func subscribeToRedis() {
pubsub := redisCli.Subscribe(redisCtx, "ws_messages")
defer pubsub.Close()
ch := pubsub.Channel()
log.Println("🔔 开始监听Redis消息...")
for msg := range ch {
start := time.Now()
var redisMsg RedisMessage
// 解析Redis消息
if err := json.Unmarshal([]byte(msg.Payload), &redisMsg); err != nil {
log.Printf("❌ 解析Redis消息失败: %v", err)
continue
}
log.Printf("📥 收到Redis消息: Sender=%s | ClientID=%s | Size=%d bytes",
redisMsg.SenderNodeID, redisMsg.ClientID, len(msg.Payload))
// 如果是本地节点发送的消息,忽略
if redisMsg.SenderNodeID == nodeID {
log.Println("ℹ️ 忽略本节点转发的消息")
continue
}
// 发送给本地客户端
if conn := getClient(redisMsg.ClientID); conn != nil {
// 直接使用Redis中的原始消息字符串
if err := conn.WriteMessage(websocket.TextMessage, []byte(redisMsg.Message)); err != nil {
log.Printf("❌ 处理Redis消息失败: %v | ClientID=%s", err, redisMsg.ClientID)
removeClient(redisMsg.ClientID)
continue
}
log.Printf("✅ Redis消息已处理: ClientID=%s | Duration=%s",
redisMsg.ClientID, time.Since(start))
} else {
log.Printf("⚠️ 目标客户端不在本节点: ClientID=%s", redisMsg.ClientID)
}
}
}
// 生成客户端ID (格式: <节点ID>-<UUID>)
func generateClientID() string {
return fmt.Sprintf("%s-%s", nodeID, uuid.New().String())
}
// 添加客户端到映射
func addClient(clientID string, conn *websocket.Conn) {
clientsMux.Lock()
defer clientsMux.Unlock()
clients[clientID] = conn
log.Printf("📌 添加客户端到连接池: ClientID=%s | 当前连接数=%d",
clientID, len(clients))
}
// 从映射中移除客户端
func removeClient(clientID string) {
clientsMux.Lock()
defer clientsMux.Unlock()
delete(clients, clientID)
log.Printf("🗑️ 从连接池移除客户端: ClientID=%s | 当前连接数=%d",
clientID, len(clients))
}
// 获取客户端连接
func getClient(clientID string) *websocket.Conn {
clientsMux.RLock()
defer clientsMux.RUnlock()
return clients[clientID]
}
// 获取环境变量值,如果不存在则使用默认值
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
启用后使用工具连接
ws://127.0.0.1:12080/ws
消息json结构
{
"request_type": "send_message",
"sender_id": "local-7b9a2e51-ebb0-4d18-ad07-322fe0f0aace",
"receiver_id": "local-d050091d-1543-43e4-b41c-ed5c17f59e5d",
"sender_user_id": "李二狗",
"receiver_user_id": "user67890",
"message_type": 1,
"content": "嗯嗯你好",
"send_time": "2025-06-27T14:30:25.123Z"
}
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 年糕崽崽
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果