异步链路
同步链路
用户请求 → 应用 → Redis (改) → MySQL (同步改,阻塞) → 返回
异步链路
用户请求 → 应用 → Redis (改) → MQ (发消息) → 立即返回
↓
消费服务 → MySQL (异步落库)
本质削峰填谷
需注意的风险
缓存与数据库短暂不一致:Redis 已更新,DB 还没消费完成,短时间查 DB 是旧数据(业务要容忍)
消息队列丢消息 → 缓存和 DB 永久不一致(需做消息重试、死信队列、对账补偿)
Redis 宕机 → 热点数据丢失(要开 RDB/AOF 持久化)
NIST FF1 格式保留加密(FPE) 对趋势递增 ID 进行可逆混淆
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| package main
import ( "encoding/hex" "fmt" "math/big"
"github.com/bwmarrin/snowflake" "github.com/capitalone/fpe/ff1" )
const ( keyHex = "0123456789ABCDEF0123456789ABCDEF" tweakHex = "SAFEGUARD-ID-TWEAK" radix = 10 dlen = 19 )
var ( snow *snowflake.Node ff ff1.Cipher )
func init() { var err error snow, err = snowflake.NewNode(1) if err != nil { panic(err) }
key, _ := hex.DecodeString(keyHex) tweak := []byte(tweakHex) ff, err = ff1.NewCipher(radix, len(tweak), key, tweak) if err != nil { panic(err) } }
func encryptID(u uint64) (uint64, error) { s := fmt.Sprintf("%019d", u) out, err := ff.Encrypt(s) if err != nil { return 0, err } num, ok := new(big.Int).SetString(out, 10) if !ok { return 0, fmt.Errorf("bad num") } return num.Uint64(), nil }
func decryptID(u uint64) (uint64, error) { s := fmt.Sprintf("%019d", u) out, err := ff.Decrypt(s) if err != nil { return 0, err } num, ok := new(big.Int).SetString(out, 10) if !ok { return 0, fmt.Errorf("bad num") } return num.Uint64(), nil }
func main() { raw := snow.Generate().Uint64() enc, _ := encryptID(raw) dec, _ := decryptID(enc)
fmt.Println("raw:", raw) fmt.Println("enc:", enc) fmt.Println("dec:", dec) fmt.Println("ok:", dec == raw) }
|
【uint64 最大 18446744073709551615 → 十进制 19 位,所以 FF1 必须固定 19 位数字】
针对用户点赞/取消点赞的并发竞态问题,基于 Redis SETNX 实现分布式锁,消除重复计数 bug
通过redis锁将用户是否点赞设置为是 点赞计数增加 两者绑定为一次事务
eino库vs go原生
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package main
import ( "bytes" "encoding/json" "fmt" "net/http" )
func main() { reqBody := map[string]any{ "model": "gpt-3.5-turbo", "messages": []map[string]string{ {"role": "user", "content": "你好"}, }, } jsonData, _ := json.Marshal(reqBody)
req, _ := http.NewRequest("POST", "https://api.openai.com/v1/chat/completions", bytes.NewBuffer(jsonData)) req.Header.Set("Authorization", "Bearer YOUR_API_KEY") req.Header.Set("Content-Type", "application/json")
client := &http.Client{} resp, err := client.Do(req) if err != nil { panic(err) } defer resp.Body.Close()
var result map[string]any json.NewDecoder(resp.Body).Decode(&result) fmt.Println(result) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package main
import ( "context" "fmt" "github.com/Cloudtogo/eino/llms/openai" )
func main() { llm, _ := openai.NewLLM(openai.WithAPIKey("YOUR_API_KEY"))
msg := llm.Chat(context.Background(), "你好")
fmt.Println(msg.Content) }
|
eino RAG
标准 RAG 流程:
1 2
| 文档 → 加载 → 切分 → 向量化 → 向量库 用户问题 → 向量化 → 检索(向量+标量)→ 重排 → 拼上下文 → LLM → 回答
|
1)初始化组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import ( "context" "github.com/Cloudtogo/eino/schema" "github.com/Cloudtogo/eino/llms/openai" "github.com/Cloudtogo/eino/embedding" "github.com/Cloudtogo/eino/indexer/milvus" "github.com/Cloudtogo/eino/retriever" "github.com/Cloudtogo/eino/splitter" )
ctx := context.Background()
embedder, _ := openai.NewEmbedder(openai.WithAPIKey("sk-xxx"))
spl := splitter.NewMarkdownSplitter(500, 100)
idx, _ := milvus.NewIndexer( milvus.WithAddr("localhost:19530"), milvus.WithCollection("rag_demo"), milvus.WithEmbedder(embedder), )
ret, _ := retriever.NewRetriever( retriever.WithIndexer(idx), retriever.WithTopK(3), )
llm, _ := openai.NewLLM(openai.WithAPIKey("sk-xxx"))
|
2)建库:加载→切→向量化→入库
1 2 3 4 5 6 7 8 9
| loader := schema.NewFileLoader("./docs/") docs, _ := loader.Load(ctx)
chunks := spl.SplitDocuments(docs)
_ = idx.StoreDocuments(ctx, chunks)
|
3)问答:检索→拼prompt→生成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| query := "Eino RAG 怎么实现?"
results, _ := ret.Retrieve(ctx, query)
var context string for i, d := range results { context += fmt.Sprintf("【参考%d】%s\n", i+1, d.Content) }
messages := []*schema.Message{ schema.SystemMessage("基于参考资料回答,不要编造。"), schema.UserMessage(fmt.Sprintf("参考:\n%s\n问题:%s", context, query)), }
resp, _ := llm.Generate(ctx, messages) fmt.Println(resp.Content)
|
4)用 Chain 编排
1 2 3 4 5 6 7 8 9 10 11 12
| import "github.com/Cloudtogo/eino/compose"
chain := compose.NewChain[string, *schema.Message]() chain.AppendRetriever(ret) chain.AppendLambda(func(ctx context.Context, docs []*schema.Document) ([]*schema.Message, error) { return messages, nil }) chain.AppendChatModel(llm)
runnable, _ := chain.Compile(ctx) answer, _ := runnable.Invoke(ctx, "Eino RAG 怎么实现?")
|
Eino 支持
- 混合检索(Hybrid Search)
向量相似度 + 关键词检索 + 元数据过滤(如 source=xxx.md)
- 重排(Rerank)
BGE-Reranker/CrossEncoder,TopN 召回→TopK 精排,效果提升明显
- 语义切分
不要硬切 500 字,用 Markdown/标题/语义切分,避免逻辑断裂
- 流式输出
ChatStream 逐字返回,提升体验
- 可观测性
全链路 callback:加载/切分/嵌入/检索/生成的耗时、错误、日志
ES替换milvus
Eino 原生支持 ES 做 Indexer/Retriever,替换 Milvus 很方便
可以,但要看数据规模与性能诉求;Eino 原生支持 ES 做 Indexer/Retriever,替换 Milvus 很方便。下面分:能不能换、差别在哪、Eino 怎么用 ES、什么时候不建议换。
一、能不能直接替换?
能:
- Elasticsearch 8.x 原生支持
dense_vector + HNSW,可存向量、做 ANN 相似性检索。
- Eino 官方提供 ES7/ES8 的 Indexer 和 Retriever 组件,和 Milvus 一样是标准接口,上层 RAG 链路不用改。
- 一句话:Milvus ↔ ES,在 Eino 里只是换个 Indexer/Retriever 实现。
二、Milvus vs ES:核心差别(决定你能不能换)
1)定位不同
- Milvus:专用向量数据库,为高维向量相似搜索而生(C++ 内核)。
- ES:全文搜索引擎+向量扩展(Java/Lucene),主打文本检索,向量是附加能力。
2)性能差距(关键)
- 小数据(<10w):ES 够用,延迟差不大。
- 大数据(>10w,尤其百万/亿级):
- Milvus:P99 延迟 50–150ms,QPS 高 4–5 倍。
- ES:P99 200–500ms,数据越大慢得越明显(可慢 3–10 倍)。
- 原因:ES 向量索引基于 Lucene,JVM 内存开销大、优化不如专用库。
3)资源开销
- Milvus:内存效率高,IVF_PQ 压缩可省 70–80% 存储。
- ES:吃内存/CPU 多,堆内存、GC 调优成本高。
4)混合检索能力(ES 强项)
- ES:向量+关键词+过滤+聚合 一锅搞定,比如:
(向量相似) AND (title:Go) AND (date:2026)。
- Milvus:也支持过滤,但文本关键词能力远不如 ES。
5)Eino 支持度
- 两者都支持:
- Indexer:
milvus / es7 / es8。
- Retriever:
milvus / es7 / es8。
- 切换成本:改初始化代码,RAG 链路(Loader→Splitter→Embedder→LLM)完全不动。
三、Eino 里用 ES 做 RAG(可直接跑)
1)ES 准备(建 index 与 mapping)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| PUT /eino_rag { "mappings": { "properties": { "content": { "type": "text" }, "meta": { "type": "object" }, "content_vector": { "type": "dense_vector", "dims": 1536, // 适配 OpenAI 1536维 "index": true, "similarity": "cosine" } } } }
|
2)Eino 初始化 ES Indexer/Retriever
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package main
import ( "context" "github.com/Cloudtogo/eino-ext/components/indexer/es8" "github.com/Cloudtogo/eino-ext/components/retriever/es8" "github.com/Cloudtogo/eino-ext/components/retriever/es8/search_mode" "github.com/elastic/go-elasticsearch/v8" "github.com/Cloudtogo/eino/embedding" )
func main() { ctx := context.Background() esCfg := elasticsearch.Config{ Addresses: []string{"http://localhost:9200"}, Username: "elastic", Password: "xxx", } esClient, _ := elasticsearch.NewClient(esCfg)
embedder, _ := openai.NewEmbedder(openai.WithAPIKey("sk-xxx"))
idx, _ := es8.NewIndexer(ctx, &es8.IndexerConfig{ Client: esClient, Index: "eino_rag", Embedding: embedder, })
ret, _ := es8.NewRetriever(ctx, &es8.RetrieverConfig{ Client: esClient, Index: "eino_rag", TopK: 3, Embedding: embedder, SearchMode: search_mode.DenseVectorSimilarity(search_mode.Cosine, "content_vector"), })
}
|
四、什么时候可以换 ES?什么时候别换?
✅ 推荐用 ES 替换 Milvus
- 数据量小(<10w 条),追求简单、不想部署 Milvus;
- 强混合检索:既要向量语义,又要关键词/过滤/聚合;
- 已有 ES 集群,不想额外维护 Milvus;
- RAG 以文本检索为主、向量为辅。
❌ 不推荐换(坚持 Milvus)
- 数据量大(>10w,尤其百万/亿级),要低延迟、高 QPS;
- 纯向量检索,追求极致性能与低成本;
- 生产高并发 RAG,对延迟敏感(如在线客服)。