异步链路

同步链路
用户请求 → 应用 → Redis (改) → MySQL (同步改,阻塞) → 返回
异步链路
用户请求 → 应用 → Redis (改) → MQ (发消息) → 立即返回
              ↓
          消费服务 → MySQL (异步落库)
本质削峰填谷

需注意的风险

缓存与数据库短暂不一致:Redis 已更新,DB 还没消费完成,短时间查 DB 是旧数据(业务要容忍)
消息队列丢消息 → 缓存和 DB 永久不一致(需做消息重试、死信队列、对账补偿)
Redis 宕机 → 热点数据丢失(要开 RDB/AOF 持久化)

NIST FF1 格式保留加密(FPE) 对趋势递增 ID 进行可逆混淆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"encoding/hex"
"fmt"
"math/big"

"github.com/bwmarrin/snowflake"
"github.com/capitalone/fpe/ff1"
)

const (
// AES-256 key(32字节)
keyHex = "0123456789ABCDEF0123456789ABCDEF"
// 随便一个 tweak,固定即可
tweakHex = "SAFEGUARD-ID-TWEAK"
// 十进制,固定19位(刚好覆盖 uint64)
radix = 10
dlen = 19
)

var (
snow *snowflake.Node
ff ff1.Cipher
)

func init() {
// 1. snowflake
var err error
snow, err = snowflake.NewNode(1)
if err != nil {
panic(err)
}

// 2. FF1 cipher
key, _ := hex.DecodeString(keyHex)
tweak := []byte(tweakHex)
// 参数:radix, maxTweakLen, key, tweak
ff, err = ff1.NewCipher(radix, len(tweak), key, tweak)
if err != nil {
panic(err)
}
}

func encryptID(u uint64) (uint64, error) {
// 补零到19位
s := fmt.Sprintf("%019d", u)
out, err := ff.Encrypt(s)
if err != nil {
return 0, err
}
num, ok := new(big.Int).SetString(out, 10)
if !ok {
return 0, fmt.Errorf("bad num")
}
return num.Uint64(), nil
}

func decryptID(u uint64) (uint64, error) {
s := fmt.Sprintf("%019d", u)
out, err := ff.Decrypt(s)
if err != nil {
return 0, err
}
num, ok := new(big.Int).SetString(out, 10)
if !ok {
return 0, fmt.Errorf("bad num")
}
return num.Uint64(), nil
}

func main() {
raw := snow.Generate().Uint64()
enc, _ := encryptID(raw)
dec, _ := decryptID(enc)

fmt.Println("raw:", raw)
fmt.Println("enc:", enc)
fmt.Println("dec:", dec)
fmt.Println("ok:", dec == raw)
}

【uint64 最大 18446744073709551615 → 十进制 19 位,所以 FF1 必须固定 19 位数字】

针对用户点赞/取消点赞的并发竞态问题,基于 Redis SETNX 实现分布式锁,消除重复计数 bug

通过redis锁将用户是否点赞设置为是 点赞计数增加 两者绑定为一次事务

eino库vs go原生

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//原生
package main

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)

func main() {
// 1. 手动拼请求体
reqBody := map[string]any{
"model": "gpt-3.5-turbo",
"messages": []map[string]string{
{"role": "user", "content": "你好"},
},
}
jsonData, _ := json.Marshal(reqBody)

// 2. 手动创建请求
req, _ := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer YOUR_API_KEY")
req.Header.Set("Content-Type", "application/json")

// 3. 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()

// 4. 手动解析响应
var result map[string]any
json.NewDecoder(resp.Body).Decode(&result)
fmt.Println(result)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//eino
package main

import (
"context"
"fmt"
"github.com/Cloudtogo/eino/llms/openai"
)

func main() {
// 1. 一行创建客户端
llm, _ := openai.NewLLM(openai.WithAPIKey("YOUR_API_KEY"))

// 2. 一行调用,自动处理所有格式
msg := llm.Chat(context.Background(), "你好")

fmt.Println(msg.Content)
}

eino RAG

标准 RAG 流程:

1
2
文档 → 加载 → 切分 → 向量化 → 向量库
用户问题 → 向量化 → 检索(向量+标量)→ 重排 → 拼上下文 → LLM → 回答

1)初始化组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import (
"context"
"github.com/Cloudtogo/eino/schema"
"github.com/Cloudtogo/eino/llms/openai"
"github.com/Cloudtogo/eino/embedding"
"github.com/Cloudtogo/eino/indexer/milvus"
"github.com/Cloudtogo/eino/retriever"
"github.com/Cloudtogo/eino/splitter"
)

ctx := context.Background()

// 1. 嵌入模型(向量化)
embedder, _ := openai.NewEmbedder(openai.WithAPIKey("sk-xxx"))

// 2. 文档切分器(按 Markdown 标题切)
spl := splitter.NewMarkdownSplitter(500, 100) // 块大小、重叠

// 3. Milvus 索引(入库)
idx, _ := milvus.NewIndexer(
milvus.WithAddr("localhost:19530"),
milvus.WithCollection("rag_demo"),
milvus.WithEmbedder(embedder),
)

// 4. 检索器
ret, _ := retriever.NewRetriever(
retriever.WithIndexer(idx),
retriever.WithTopK(3),
)

// 5. LLM
llm, _ := openai.NewLLM(openai.WithAPIKey("sk-xxx"))

2)建库:加载→切→向量化→入库

1
2
3
4
5
6
7
8
9
// 加载本地文件
loader := schema.NewFileLoader("./docs/")
docs, _ := loader.Load(ctx)

// 切分
chunks := spl.SplitDocuments(docs)

// 入库
_ = idx.StoreDocuments(ctx, chunks)

3)问答:检索→拼prompt→生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
query := "Eino RAG 怎么实现?"

// 检索
results, _ := ret.Retrieve(ctx, query)

// 拼上下文
var context string
for i, d := range results {
context += fmt.Sprintf("【参考%d】%s\n", i+1, d.Content)
}

// 拼消息
messages := []*schema.Message{
schema.SystemMessage("基于参考资料回答,不要编造。"),
schema.UserMessage(fmt.Sprintf("参考:\n%s\n问题:%s", context, query)),
}

// 调用 LLM
resp, _ := llm.Generate(ctx, messages)
fmt.Println(resp.Content)

4)用 Chain 编排

1
2
3
4
5
6
7
8
9
10
11
12
import "github.com/Cloudtogo/eino/compose"

chain := compose.NewChain[string, *schema.Message]()
chain.AppendRetriever(ret) // 输入:string(query) → 输出:[]*schema.Document
chain.AppendLambda(func(ctx context.Context, docs []*schema.Document) ([]*schema.Message, error) {
// 拼上下文+消息
return messages, nil
})
chain.AppendChatModel(llm)

runnable, _ := chain.Compile(ctx)
answer, _ := runnable.Invoke(ctx, "Eino RAG 怎么实现?")

Eino 支持

  1. 混合检索(Hybrid Search)
    向量相似度 + 关键词检索 + 元数据过滤(如 source=xxx.md
  2. 重排(Rerank)
    BGE-Reranker/CrossEncoder,TopN 召回→TopK 精排,效果提升明显
  3. 语义切分
    不要硬切 500 字,用 Markdown/标题/语义切分,避免逻辑断裂
  4. 流式输出
    ChatStream 逐字返回,提升体验
  5. 可观测性
    全链路 callback:加载/切分/嵌入/检索/生成的耗时、错误、日志

ES替换milvus

Eino 原生支持 ES 做 Indexer/Retriever,替换 Milvus 很方便
可以,但要看数据规模与性能诉求;Eino 原生支持 ES 做 Indexer/Retriever,替换 Milvus 很方便。下面分:能不能换、差别在哪、Eino 怎么用 ES、什么时候不建议换。


一、能不能直接替换?

  • Elasticsearch 8.x 原生支持 dense_vector + HNSW,可存向量、做 ANN 相似性检索。
  • Eino 官方提供 ES7/ES8 的 Indexer 和 Retriever 组件,和 Milvus 一样是标准接口,上层 RAG 链路不用改。
  • 一句话:Milvus ↔ ES,在 Eino 里只是换个 Indexer/Retriever 实现

二、Milvus vs ES:核心差别(决定你能不能换)

1)定位不同

  • Milvus专用向量数据库,为高维向量相似搜索而生(C++ 内核)。
  • ES全文搜索引擎+向量扩展(Java/Lucene),主打文本检索,向量是附加能力。

2)性能差距(关键)

  • 小数据(<10w):ES 够用,延迟差不大。
  • 大数据(>10w,尤其百万/亿级)
    • Milvus:P99 延迟 50–150ms,QPS 高 4–5 倍。
    • ES:P99 200–500ms,数据越大慢得越明显(可慢 3–10 倍)。
    • 原因:ES 向量索引基于 Lucene,JVM 内存开销大、优化不如专用库。

3)资源开销

  • Milvus:内存效率高,IVF_PQ 压缩可省 70–80% 存储。
  • ES:吃内存/CPU 多,堆内存、GC 调优成本高。

4)混合检索能力(ES 强项)

  • ES:向量+关键词+过滤+聚合 一锅搞定,比如:
    (向量相似) AND (title:Go) AND (date:2026)
  • Milvus:也支持过滤,但文本关键词能力远不如 ES

5)Eino 支持度

  • 两者都支持:
    • Indexer:milvus / es7 / es8
    • Retriever:milvus / es7 / es8
  • 切换成本:改初始化代码,RAG 链路(Loader→Splitter→Embedder→LLM)完全不动

三、Eino 里用 ES 做 RAG(可直接跑)

1)ES 准备(建 index 与 mapping)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT /eino_rag
{
"mappings": {
"properties": {
"content": { "type": "text" },
"meta": { "type": "object" },
"content_vector": {
"type": "dense_vector",
"dims": 1536, // 适配 OpenAI 1536维
"index": true,
"similarity": "cosine"
}
}
}
}

2)Eino 初始化 ES Indexer/Retriever

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"context"
"github.com/Cloudtogo/eino-ext/components/indexer/es8"
"github.com/Cloudtogo/eino-ext/components/retriever/es8"
"github.com/Cloudtogo/eino-ext/components/retriever/es8/search_mode"
"github.com/elastic/go-elasticsearch/v8"
"github.com/Cloudtogo/eino/embedding"
)

func main() {
ctx := context.Background()
// 1. ES 客户端
esCfg := elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
Username: "elastic",
Password: "xxx",
}
esClient, _ := elasticsearch.NewClient(esCfg)

// 2. 嵌入模型(和 Milvus 时一样)
embedder, _ := openai.NewEmbedder(openai.WithAPIKey("sk-xxx"))

// 3. ES Indexer(入库用)
idx, _ := es8.NewIndexer(ctx, &es8.IndexerConfig{
Client: esClient,
Index: "eino_rag",
Embedding: embedder,
})

// 4. ES Retriever(检索用)
ret, _ := es8.NewRetriever(ctx, &es8.RetrieverConfig{
Client: esClient,
Index: "eino_rag",
TopK: 3,
Embedding: embedder,
SearchMode: search_mode.DenseVectorSimilarity(search_mode.Cosine, "content_vector"),
})

// 后续:建库、检索、拼 prompt、LLM 生成
// 和用 Milvus 时完全一样!
}

四、什么时候可以换 ES?什么时候别换?

✅ 推荐用 ES 替换 Milvus

  1. 数据量小(<10w 条),追求简单、不想部署 Milvus;
  2. 强混合检索:既要向量语义,又要关键词/过滤/聚合;
  3. 已有 ES 集群,不想额外维护 Milvus;
  4. RAG 以文本检索为主、向量为辅

❌ 不推荐换(坚持 Milvus)

  1. 数据量大(>10w,尤其百万/亿级),要低延迟、高 QPS;
  2. 纯向量检索,追求极致性能与低成本;
  3. 生产高并发 RAG,对延迟敏感(如在线客服)。