代码 main.go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis/v8"
	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// WebSocket 升级器配置
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true // 允许跨域
	},
}

// 统一消息结构(发送方使用)
type SendMessagePayload struct {
	RequestType    string `json:"request_type"`     // 消息类型
	TargetClientID string `json:"target_client_id"` // 目标客户端ID
	SenderUserID   string `json:"sender_user_id"`   // 发送者用户ID
	ReceiverUserID string `json:"receiver_user_id"` // 接收者用户ID
	MessageType    int    `json:"message_type"`     // 消息类型(0-6)
	MessageContent string `json:"message_content"`  // 消息内容
}

// 客户端接收消息结构(最终格式)
type ClientReceivedMessage struct {
	SenderID       string `json:"sender_id"`        // 发送者连接ID
	ReceiverID     string `json:"receiver_id"`      // 接收者连接ID
	SenderUserID   string `json:"sender_user_id"`   // 发送者用户ID
	ReceiverUserID string `json:"receiver_user_id"` // 接收者用户ID
	MessageType    int    `json:"message_type"`     // 消息类型(0-6)
	Content        string `json:"content"`          // 消息内容
	SendTime       string `json:"send_time"`        // 发送时间
}

// Redis 消息结构
type RedisMessage struct {
	SenderNodeID string `json:"sender_node_id"` // 发送节点ID
	ClientID     string `json:"client_id"`      // 目标客户端ID
	Message      string `json:"message"`        // 消息内容
}

// 全局变量
var (
	clients    = make(map[string]*websocket.Conn) // 客户端连接映射
	clientsMux sync.RWMutex                       // 连接映射的读写锁
	redisCtx   = context.Background()             // Redis 上下文
	nodeID     string                             // 当前节点ID
	redisCli   *redis.Client                      // Redis 客户端
	port       string                             // 服务端口
)

func main() {
	// 配置系统
	configureSystem()

	// 初始化Redis客户端
	initRedisClient()

	// 创建Gin路由器
	router := gin.Default()
	router.Use(gin.Recovery()) // 添加恢复中间件

	// 注册路由
	router.GET("/ws", handleWebSocket)       // WebSocket连接端点
	router.POST("/send", sendMessageHandler) // 消息发送端点
	router.GET("/health", healthHandler)     // 健康检查端点

	// 输出启动信息
	printStartupInfo()

	// 启动Redis消息订阅
	go subscribeToRedis()

	// 启动HTTP服务
	runServer(router)
}

// 配置系统环境
func configureSystem() {
	nodeID = getEnv("NODE_ID", "local")              // 获取节点ID,默认"local"
	port = getEnv("PORT", "12080")                   // 获取端口,默认12080
	log.SetPrefix(fmt.Sprintf("[Node:%s] ", nodeID)) // 设置日志前缀
	log.SetFlags(log.LstdFlags | log.Lmicroseconds)  // 设置日志格式
}

// 初始化Redis客户端
func initRedisClient() {
	redisAddr := getEnv("REDIS_ADDR", "localhost:6379")
	redisCli = redis.NewClient(&redis.Options{
		Addr:     redisAddr, // Redis地址
		Password: "",        // 无密码
		DB:       0,         // 默认DB
	})

	// 测试Redis连接
	if err := checkRedisConnection(); err != nil {
		log.Fatalf("❌ Redis连接失败: %v", err)
	}
}

// 检查Redis连接
func checkRedisConnection() error {
	_, err := redisCli.Ping(redisCtx).Result()
	return err
}

// 输出启动信息
func printStartupInfo() {
	hostname, _ := os.Hostname()
	log.Printf("🚀 WebSocket服务启动: NodeID=%s", nodeID)
	log.Printf("🌐 监听端口: %s", port)
	log.Printf("📡 Redis地址: %s", getEnv("REDIS_ADDR", "localhost:6379"))
	log.Printf("💻 主机: %s", hostname)
	log.Printf("🕒 启动时间: %s", time.Now().Format("2006-01-02 15:04:05"))
	log.Printf("🔔 支持消息类型: 0:文本 1:图片 2:音频 3:视频 4:处方 5:病例 6:视频通话")
	log.Println("🔗 等待客户端连接...")
}

// 启动服务
func runServer(router *gin.Engine) {
	addr := "0.0.0.0:" + port
	log.Printf("🔌 服务地址: http://%s", addr)
	log.Printf("🔌 WebSocket连接地址: ws://%s/ws", addr)

	if err := router.Run(addr); err != nil {
		log.Fatalf("❌ 服务启动失败: %v", err)
	}
}

// 健康检查处理
func healthHandler(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{
		"status": "ok",
		"node":   nodeID,
		"time":   time.Now().Format(time.RFC3339),
	})
}

// WebSocket处理函数
func handleWebSocket(c *gin.Context) {
	start := time.Now()
	clientIP := c.ClientIP()

	log.Printf("👤 客户端连接中: IP=%s", clientIP)

	// 升级为WebSocket连接
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Printf("⚠️ WebSocket升级失败: %v | ClientIP=%s", err, clientIP)
		c.JSON(http.StatusBadRequest, gin.H{"error": "无法升级为WebSocket连接"})
		return
	}

	defer conn.Close()

	// 生成客户端ID
	clientID := generateClientID()
	log.Printf("✅ 客户端已连接: ClientID=%s | IP=%s | Duration=%s",
		clientID, clientIP, time.Since(start))

	// 将连接添加到客户端映射
	addClient(clientID, conn)
	defer removeClient(clientID)

	// 发送客户端ID
	if err := conn.WriteJSON(gin.H{"clientId": clientID}); err != nil {
		log.Printf("⚠️ 发送客户端ID失败: %v | ClientID=%s", err, clientID)
		return
	}

	// 设置心跳
	conn.SetPongHandler(func(string) error {
		conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	// 保持连接活跃
	for {
		messageType, message, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Printf("❌ 连接意外断开: %v | ClientID=%s", err, clientID)
			} else {
				log.Printf("⚠️ 连接正常关闭: ClientID=%s", clientID)
			}
			break
		}

		// 只处理文本消息
		if messageType == websocket.TextMessage {
			go handleClientMessage(clientID, message)
		}
	}
}

// 处理客户端消息
func handleClientMessage(senderID string, message []byte) {
	log.Printf("📥 收到客户端消息: SenderID=%s | Size=%d bytes", senderID, len(message))

	// 尝试解析为发送消息请求
	var payload SendMessagePayload
	if err := json.Unmarshal(message, &payload); err == nil && payload.RequestType != "" {
		log.Printf("📦 解析JSON消息成功: Type=%s", payload.RequestType)

		// 只处理发送消息请求
		if payload.RequestType == "send_message" && payload.TargetClientID != "" {
			log.Printf("📨 处理客户端发起的发送请求: TargetID=%s | MsgType=%d",
				payload.TargetClientID, payload.MessageType)

			// 构建客户端消息结构
			clientMsg := ClientReceivedMessage{
				SenderID:       senderID,
				ReceiverID:     payload.TargetClientID,
				SenderUserID:   payload.SenderUserID,
				ReceiverUserID: payload.ReceiverUserID,
				MessageType:    payload.MessageType,
				Content:        payload.MessageContent,
				SendTime:       time.Now().Format(time.RFC3339),
			}

			// 发送消息
			sendMessageToClient(payload.TargetClientID, clientMsg)
			return
		}
	}

	// 尝试解析为已封装的消息结构
	var clientMsg ClientReceivedMessage
	if err := json.Unmarshal(message, &clientMsg); err == nil && clientMsg.ReceiverID != "" {
		log.Printf("📨 处理客户端封装消息: TargetID=%s | MsgType=%d",
			clientMsg.ReceiverID, clientMsg.MessageType)

		// 设置发送者ID(避免客户端冒充)
		clientMsg.SenderID = senderID

		// 发送消息
		sendMessageToClient(clientMsg.ReceiverID, clientMsg)
		return
	}

	// 无法识别的消息格式
	log.Printf("⚠️ 无法识别的消息格式: Size=%d bytes | Message=%s", len(message), string(message))
}

// 消息发送处理(HTTP API)
func sendMessageHandler(c *gin.Context) {
	start := time.Now()

	var payload SendMessagePayload
	if err := c.ShouldBindJSON(&payload); err != nil {
		log.Printf("⚠️ 无效的JSON请求格式: %v", err)
		c.JSON(http.StatusBadRequest, gin.H{"error": "无效的JSON格式"})
		return
	}

	if payload.RequestType == "" || payload.TargetClientID == "" ||
		payload.SenderUserID == "" || payload.ReceiverUserID == "" {
		log.Printf("⚠️ 缺少必要参数: request_type=%s target_client_id=%s sender_user_id=%s receiver_user_id=%s",
			payload.RequestType, payload.TargetClientID, payload.SenderUserID, payload.ReceiverUserID)
		c.JSON(http.StatusBadRequest, gin.H{"error": "缺少必要参数"})
		return
	}

	// 只处理发送消息请求
	if payload.RequestType == "send_message" {
		log.Printf("📤 处理API发送请求: TargetID=%s | SenderUser=%s | MsgType=%d",
			payload.TargetClientID, payload.SenderUserID, payload.MessageType)

		// 构建客户端消息结构
		clientMsg := ClientReceivedMessage{
			SenderID:       "system", // API消息标记为系统发送
			ReceiverID:     payload.TargetClientID,
			SenderUserID:   payload.SenderUserID,
			ReceiverUserID: payload.ReceiverUserID,
			MessageType:    payload.MessageType,
			Content:        payload.MessageContent,
			SendTime:       time.Now().Format(time.RFC3339),
		}

		if err := sendMessageToClient(payload.TargetClientID, clientMsg); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}

		c.JSON(http.StatusOK, gin.H{"status": "消息已发送"})
		log.Printf("✅ API请求完成: TargetID=%s | Duration=%s",
			payload.TargetClientID, time.Since(start))
		return
	}

	c.JSON(http.StatusBadRequest, gin.H{"error": "不支持的request_type"})
}

// 发送消息给客户端(核心函数)
func sendMessageToClient(clientID string, msg ClientReceivedMessage) error {
	start := time.Now()

	// 序列化客户端消息
	msgJSON, err := json.Marshal(msg)
	if err != nil {
		log.Printf("❌ 消息序列化失败: %v", err)
		return fmt.Errorf("内部错误")
	}

	// 检查目标客户端是否在当前节点
	if conn := getClient(clientID); conn != nil {
		log.Printf("📤 向本地客户端发送消息: ClientID=%s | MsgType=%d | Size=%d bytes",
			clientID, msg.MessageType, len(msgJSON))

		if err := conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil {
			log.Printf("❌ 发送消息失败: %v | ClientID=%s", err, clientID)
			removeClient(clientID) // 移除失效连接
			return fmt.Errorf("发送消息失败")
		}

		log.Printf("✅ 消息成功发送到本地客户端: ClientID=%s | Duration=%s",
			clientID, time.Since(start))
		return nil
	}

	// 如果目标客户端不在当前节点,则通过Redis转发
	log.Printf("🌐 向远程节点转发消息: ClientID=%s", clientID)

	redisMsg := RedisMessage{
		SenderNodeID: nodeID,
		ClientID:     clientID,
		Message:      string(msgJSON), // 原始JSON字符串
	}

	redisMsgJSON, err := json.Marshal(redisMsg)
	if err != nil {
		log.Printf("❌ Redis消息序列化失败: %v", err)
		return fmt.Errorf("内部错误")
	}

	// 发布到Redis的ws_messages频道
	if err := redisCli.Publish(redisCtx, "ws_messages", redisMsgJSON).Err(); err != nil {
		log.Printf("❌ 发布消息到Redis失败: %v", err)
		return fmt.Errorf("无法转发消息")
	}

	log.Printf("📡 消息已转发到Redis: ClientID=%s | Size=%d bytes | Duration=%s",
		clientID, len(redisMsgJSON), time.Since(start))
	return nil
}

// 订阅Redis消息
func subscribeToRedis() {
	pubsub := redisCli.Subscribe(redisCtx, "ws_messages")
	defer pubsub.Close()

	ch := pubsub.Channel()

	log.Println("🔔 开始监听Redis消息...")

	for msg := range ch {
		start := time.Now()
		var redisMsg RedisMessage

		// 解析Redis消息
		if err := json.Unmarshal([]byte(msg.Payload), &redisMsg); err != nil {
			log.Printf("❌ 解析Redis消息失败: %v", err)
			continue
		}

		log.Printf("📥 收到Redis消息: Sender=%s | ClientID=%s | Size=%d bytes",
			redisMsg.SenderNodeID, redisMsg.ClientID, len(msg.Payload))

		// 如果是本地节点发送的消息,忽略
		if redisMsg.SenderNodeID == nodeID {
			log.Println("ℹ️ 忽略本节点转发的消息")
			continue
		}

		// 发送给本地客户端
		if conn := getClient(redisMsg.ClientID); conn != nil {
			// 直接使用Redis中的原始消息字符串
			if err := conn.WriteMessage(websocket.TextMessage, []byte(redisMsg.Message)); err != nil {
				log.Printf("❌ 处理Redis消息失败: %v | ClientID=%s", err, redisMsg.ClientID)
				removeClient(redisMsg.ClientID)
				continue
			}

			log.Printf("✅ Redis消息已处理: ClientID=%s | Duration=%s",
				redisMsg.ClientID, time.Since(start))
		} else {
			log.Printf("⚠️ 目标客户端不在本节点: ClientID=%s", redisMsg.ClientID)
		}
	}
}

// 生成客户端ID (格式: <节点ID>-<UUID>)
func generateClientID() string {
	return fmt.Sprintf("%s-%s", nodeID, uuid.New().String())
}

// 添加客户端到映射
func addClient(clientID string, conn *websocket.Conn) {
	clientsMux.Lock()
	defer clientsMux.Unlock()
	clients[clientID] = conn
	log.Printf("📌 添加客户端到连接池: ClientID=%s | 当前连接数=%d",
		clientID, len(clients))
}

// 从映射中移除客户端
func removeClient(clientID string) {
	clientsMux.Lock()
	defer clientsMux.Unlock()
	delete(clients, clientID)
	log.Printf("🗑️ 从连接池移除客户端: ClientID=%s | 当前连接数=%d",
		clientID, len(clients))
}

// 获取客户端连接
func getClient(clientID string) *websocket.Conn {
	clientsMux.RLock()
	defer clientsMux.RUnlock()
	return clients[clientID]
}

// 获取环境变量值,如果不存在则使用默认值
func getEnv(key, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return defaultValue
}

启用后使用工具连接


ws://127.0.0.1:12080/ws

消息json结构

{
  "request_type": "send_message",
  "sender_id": "local-7b9a2e51-ebb0-4d18-ad07-322fe0f0aace",
  "receiver_id": "local-d050091d-1543-43e4-b41c-ed5c17f59e5d",
  "sender_user_id": "李二狗",
  "receiver_user_id": "user67890",
  "message_type": 1,
  "content": "嗯嗯你好",
  "send_time": "2025-06-27T14:30:25.123Z"
}