聚簇索引

聚簇索引(Clustered Index)和非聚簇索引(Non-Clustered Index)是数据库中两种主要的索引类型,它们在数据存储方式和查询性能上有显著区别。

特性 聚簇索引 非聚簇索引
数据存储 数据行物理上按索引键顺序存储 索引与数据分开存储,索引包含指向数据行的指针
数量限制 一张表只能有1个 一张表可以有多个(通常最多249-999个)
叶子节点 存储的是实际数据行 存储的是索引键值 + 行定位符(如RID或聚簇键)
查询速度 范围查询极快(数据物理连续) 可能需要二次查找(回表)
插入/更新 较慢(需维护物理顺序) 相对较快
空间占用 不需要额外存储空间(数据即索引) 需要额外存储空间
1
2
3
4
5
-- 创建聚簇索引(主键默认创建)
CREATE CLUSTERED INDEX IX_EmpID ON Employees(EmpID);

-- 创建非聚簇索引
CREATE NONCLUSTERED INDEX IX_Name ON Employees(LastName);

乐观锁如何实现

乐观锁(Optimistic Locking)是一种数据库并发控制机制,用于在多用户并发访问同一数据时,确保数据的一致性。

版本号实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 表结构
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(100),
price DECIMAL(10,2),
stock INT,
version INT DEFAULT 0 -- 版本号字段
);

-- 查询时获取版本号
SELECT id, name, price, stock, version FROM products WHERE id = 1;
-- 结果: id=1, stock=100, version=5

-- 更新时检查版本号
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 5;

-- 如果返回 affected_rows = 0,说明版本已变,更新失败(需重试或报错)

最后更新时间戳实现

1
2
3
4
5
-- 使用最后更新时间戳
UPDATE products
SET stock = stock - 1, updated_at = NOW()
WHERE id = 1 AND updated_at = '2024-01-15 10:30:00';
-- 如果返回 affected_rows = 0,说明时间戳已变,更新失败(需重试或报错)

状态机

– 订单状态:待支付 -> 已支付 -> 已发货
UPDATE orders
SET status = ‘已支付’, pay_time = NOW()
WHERE order_id = ‘ORD001’ AND status = ‘待支付’; – 必须当前是待支付状态

Redis 缓存的三大经典问题:缓存击穿、缓存穿透、缓存雪崩。

缓存击穿

热点 key 在失效的瞬间,大量请求同时打到数据库,造成数据库压力剧增。

方案 原理 优缺点
互斥锁 缓存失效时,只允许一个线程重建缓存 简单有效,但可能短暂阻塞
逻辑过期 设置逻辑过期时间,永不过期,异步重建 不阻塞,但可能读到旧数据
热点key预加载 提前识别热点,缓存不过期 需要预测热点

二、缓存穿透 (Cache Penetration)

查询一个不存在的数据,缓存中没有,数据库中也没有,导致每次请求都打到数据库。

方案 原理 适用场景
布隆过滤器 前置过滤,快速判断 key 是否可能存在 数据量大,误判可接受
缓存空值 数据库不存在也缓存空值(短过期时间) 简单场景,注意内存消耗
参数校验 接口层过滤非法参数 必须做,第一道防线
IP限流 对异常 IP 进行限流 防恶意攻击

三、缓存雪崩 (Cache Avalanche)

大量缓存同时失效 或 Redis 宕机,导致所有请求直接打到数据库,数据库瞬间崩溃。

方案 作用 实现
过期时间加随机值 避免同时失效 expire = base + random(0, 300)
多级缓存 本地缓存兜底 Caffeine + Redis
Redis 高可用 防止单点故障 主从 + 哨兵 / Cluster
服务熔断降级 保护数据库 Hystrix / Sentinel
提前预热 避免冷启动 定时任务预热热点数据

Mysql Redis一致性

Redis和数据库的数据一致性是分布式系统中的经典问题,核心在于缓存与持久化存储之间的同步。以下是主流解决方案及其适用场景:


一、核心策略:Cache-Aside(旁路缓存)模式

这是业界最常用方案,应用程序直接管理缓存与数据库的交互。

1. 读操作流程

1
2
3
4
5
6
7
8
def get_data(key):
data = redis.get(key) # 1. 先查缓存
if data:
return data # 缓存命中(Cache Hit)

data = db.query(key) # 2. 缓存未命中,查数据库
redis.set(key, data) # 3. 写入缓存(设置过期时间)
return data

2. 写操作流程(关键决策点)

方案 流程 优点 缺点 适用场景
Cache-Aside(先更新DB,后删缓存) 1. 更新DB → 2. 删除缓存 简单、最终一致 短暂不一致窗口 通用首选
Write-Through(同步写) 1. 更新DB → 2. 同步更新缓存 强一致性 性能差、写放大 读多写极少
Write-Behind(异步写) 1. 更新缓存 → 2. 异步批量写DB 极高写性能 可能丢数据 日志、计数

推荐:先更新数据库,再删除缓存(而非更新缓存)

  • 原因:删除缓存比更新缓存更安全。若两个并发写操作都更新缓存,后完成的操作可能覆盖先完成的数据,导致缓存与DB长期不一致。
  • 风险点:在”更新DB”和”删除缓存”之间存在短暂窗口(通常毫秒级),可能读到旧数据。

二、解决短暂不一致的高级方案

方案1:延迟双删(Delayed Double Delete)

1
2
3
4
5
6
7
def write_data(key, value):
redis.delete(key) # 1. 先删缓存(可选,降低脏读概率)
db.update(key, value) # 2. 更新数据库
redis.delete(key) # 3. 再删缓存(主操作)

# 4. 延迟再次删除(异步队列或定时任务)
async_task.schedule(500ms, lambda: redis.delete(key))
  • 原理:覆盖”读操作在写操作间隙回填旧数据”的竞态条件
  • 延迟时间:通常设置为数据库主从同步延迟 + 100ms

方案2:基于消息队列的异步删除

1
2
3
4
5
6
7
8
def write_data(key, value):
db.update(key, value) # 1. 更新数据库
mq.publish("cache_invalidate", key) # 2. 发送删除消息

# 消费者服务
def on_invalidate_message(key):
redis.delete(key)
# 可选:重试机制确保删除成功
  • 优势:解耦业务逻辑,支持重试保证最终一致性
  • 框架:Canal(MySQL Binlog监听)、Debezium

方案3:Binlog订阅(终极方案)

Canal架构

  • 流程:MySQL Binlog → Canal/Maxwell → MQ → 缓存删除服务
  • 优势:业务代码零侵入,数据库变更自动同步
  • 适用:大规模分布式系统,如阿里电商体系

三、极端一致性场景:分布式锁

当业务要求强一致性(如库存扣减)时,使用分布式锁串行化操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def deduct_stock(product_id, quantity):
lock_key = f"lock:stock:{product_id}"

# 1. 获取Redis分布式锁(Redisson实现)
if not redisson.try_lock(lock_key, wait_time=3s, lease_time=10s):
raise "系统繁忙"

try:
# 2. 查缓存/DB获取当前库存
stock = get_stock(product_id)

# 3. 业务校验
if stock < quantity:
return "库存不足"

# 4. 更新DB
db.update_stock(product_id, stock - quantity)

# 5. 更新/删除缓存
redis.delete(f"stock:{product_id}")
finally:
redisson.unlock(lock_key)

代价:性能下降,仅用于关键数据。


四、方案选型决策树

1
2
3
4
5
6
7
是否需要强一致性?
├── 是 → 使用分布式锁 + 立即删除缓存
└── 否 → 数据更新频率高?
├── 是 → 延迟双删 或 Binlog订阅
└── 否 → 简单Cache-Aside(先DB后删缓存)
└── 是否有大量并发写?
└── 是 → 消息队列异步删除

五、关键配置建议

配置项 推荐值 说明
缓存过期时间 业务容忍度 + 随机偏移 防止缓存雪崩
空值缓存 短暂过期(如60秒) 防止缓存穿透
布隆过滤器 大数据量时启用 防止缓存穿透
热Key探测 自动本地缓存 防止缓存击穿

六、总结

  • 默认选择:Cache-Aside + 先更新DB后删除缓存
  • 高并发优化:增加延迟双删或消息队列
  • 大规模系统:采用Canal订阅Binlog实现无侵入同步
  • 强一致场景:分布式锁保证,但需评估性能代价

最终一致性是分布式系统的常态,根据业务对不一致的容忍度选择合适方案,而非追求绝对一致。

快速排序

快速排序(Quick Sort)是一种非常高效的**分治(Divide and Conquer)**排序算法,由Tony Hoare于1959年发明。它的核心思想可以用三个步骤概括:


核心思想

1. 选择基准(Pivot)

从数组中选择一个元素作为”基准”(pivot)。选择方式有多种:

  • 第一个元素
  • 最后一个元素
  • 中间元素
  • 随机选择(更优)

2. 分区(Partition)

重新排列数组,使得:

  • 左边:所有元素 ≤ 基准
  • 右边:所有元素 > 基准
  • 基准元素放在最终正确的位置

这个过程称为分区操作

3. 递归排序

对基准左右两个子数组递归地执行上述过程,直到子数组长度为1或0。


直观例子

假设数组:[3, 6, 2, 7, 1, 5, 4]

1
2
3
4
5
6
7
步骤1: 选最后一个元素 4 作为基准
步骤2: 分区 → [3, 2, 1] [4] [6, 7, 5]
↑左边都比4小 ↑基准位置 ↑右边都比4大

步骤3: 递归处理左右两边
左边 [3, 2, 1] → 选1为基准 → [ ] [1] [3, 2] → 继续...
右边 [6, 7, 5] → 选5为基准 → [ ] [5] [6, 7] → 继续...

最终得到有序数组:[1, 2, 3, 4, 5, 6, 7]


时间复杂度

情况 复杂度 说明
最优 O(n log n) 每次基准都能将数组均匀分成两半
平均 O(n log n) 随机数据的表现
最坏 O(n²) 数组已有序,且选第一个/最后一个作为基准

为什么快?

  1. 原地排序:不需要额外数组空间(空间复杂度 O(log n),用于递归栈)
  2. 缓存友好:数据局部性好,CPU缓存命中率高
  3. 常数因子小:实际运行比归并排序、堆排序更快

代码框架(伪代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function quickSort(arr, left, right):
if left < right:
pivotIndex = partition(arr, left, right) // 分区,返回基准最终位置
quickSort(arr, left, pivotIndex - 1) // 递归左半
quickSort(arr, pivotIndex + 1, right) // 递归右半

function partition(arr, left, right):
pivot = arr[right] // 选最右为基准
i = left - 1
for j from left to right-1:
if arr[j] <= pivot:
i++
swap(arr[i], arr[j])
swap(arr[i+1], arr[right])
return i + 1

关键优化

优化策略 作用
随机选基准 避免最坏情况
三数取中法 选左、中、右的中位数作为基准
小数组用插入排序 递归到小规模时,插入排序更快
尾递归优化 减少栈深度

快速排序是实际应用中最常用的排序算法之一,C++ STL的sort()、Java的Arrays.sort()(对于基本类型)底层都采用了快速排序或其变种。

冒泡排序 快速排序
比较方式 相邻元素两两比较 基准与多个元素一次比较
元素移动 每次只能移动一位(像气泡慢慢上浮) 一次分区能让元素跳跃到最终位置附近
信息利用 只比较相邻,信息利用率低 分区后知道”左边都小、右边都大”
平均复杂度 O(n²) O(n log n)

消息队列可靠性问题

消息丢失和重复消费是消息队列(MQ)系统中的核心可靠性问题。让我为您详细解析这两个问题的解决方案:

一、消息丢失的解决

消息丢失可能发生在三个环节:生产者 → MQ → 消费者

1. 生产者端防丢失

方案 原理 适用场景
同步发送 + 确认机制 等待Broker确认后才认为发送成功 对可靠性要求高的场景
异步发送 + 回调 发送失败时自动重试,通过回调感知结果 高吞吐场景
本地消息表 消息先落库,再异步发送,发送成功才更新状态 金融级可靠性要求
1
2
3
4
5
// RocketMQ 同步发送示例
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 记录日志,人工介入或自动重试
}

2. Broker端防丢失

核心策略:持久化 + 复制

  • 同步刷盘:消息写入磁盘后才返回确认(性能换可靠)
  • 同步复制:Master写入同时同步到Slave,主从都成功才返回
  • RAFT/Paxos协议:多副本共识机制(如RocketMQ 5.0的DLedger模式)
1
2
3
// RocketMQ 配置示例:同步刷盘 + 同步复制
brokerRole=SYNC_MASTER // 同步复制
flushDiskType=SYNC_FLUSH // 同步刷盘

3. 消费者端防丢失

关键:先处理业务,再确认消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 错误做法(自动提交)
consumer.registerMessageListener((msgs, context) -> {
// 处理业务... 如果这里崩溃,消息已确认但业务未执行
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 正确做法(手动提交)
consumer.registerMessageListener((msgs, context) -> {
try {
// 1. 先执行业务逻辑(如数据库操作)
processBusiness(msgs);

// 2. 业务成功后,再返回成功(MQ才会删除消息)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 3. 失败时返回RECONSUME_LATER,消息会重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});

二、重复消费的解决

核心原则:MQ保证至少一次投递(At-least-once),去重由消费者实现

1. 幂等性设计(推荐)

让多次执行结果与一次执行相同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 方案1:数据库唯一索引(简单场景)
CREATE TABLE order_consume_log (
msg_id VARCHAR(64) PRIMARY KEY, -- MQ消息ID
biz_id VARCHAR(64), -- 业务单号
create_time TIMESTAMP
);

// 插入时利用唯一键防重
INSERT IGNORE INTO order_consume_log (msg_id, biz_id) VALUES (?, ?);

// 方案2:Redis原子操作(高性能场景)
Boolean success = redisTemplate.opsForValue()
.setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS);
if (!success) {
return; // 已处理过,直接返回
}

2. 业务层幂等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 订单支付场景:状态机幂等
public void payOrder(String orderId, BigDecimal amount) {
Order order = orderDao.select(orderId);

// 已支付直接返回(幂等)
if (order.getStatus() == OrderStatus.PAID) {
log.info("订单已支付,重复消息忽略: {}", orderId);
return;
}

// 只有待支付状态才执行
if (order.getStatus() == OrderStatus.PENDING) {
// 执行支付逻辑...
orderDao.updateStatus(orderId, OrderStatus.PAID);
}
}

3. 消息去重表(通用方案)

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public void consumeMessage(Message msg) {
// 1. 先插入去重表(利用唯一键)
try {
dedupeDao.insert(new MessageRecord(msg.getMsgId()));
} catch (DuplicateKeyException e) {
log.warn("重复消息: {}", msg.getMsgId());
return; // 已消费过,直接返回
}

// 2. 执行业务逻辑
doBusiness(msg);
}

三、主流MQ的可靠性对比

特性 Kafka RocketMQ RabbitMQ
生产确认 acks=all 同步发送 Publisher Confirm
持久化 顺序写磁盘 同步/异步刷盘 队列持久化
复制 ISR机制 主从同步/异步 镜像队列
消费确认 手动提交offset 手动返回ACK 手动ACK
消息去重 需业务实现 支持消息去重 需业务实现
事务消息 不支持 支持 支持

四、最佳实践总结

1
2
3
4
5
6
7
8
9
10
1. 消息丢失防护链:
生产者本地表 → 同步刷盘复制 → 消费者先业务后ACK

2. 重复消费解决:
业务天然幂等 > 去重表 > Redis幂等 > 数据库唯一键

3. 监控告警:
- 消息堆积监控
- 消费延迟监控
- 死信队列监控

redis相关

RDB(Redis Database)和 AOF(Append Only File)

特性 RDB AOF
存储内容 某一时刻的内存数据快照(二进制压缩) 所有写操作命令的日志(文本格式)
文件体积 紧凑,经过压缩,体积小 较大,会记录每条命令
恢复速度 ,直接加载二进制数据到内存 ,需要逐条重放命令
数据安全性 可能丢失最后一次快照后的数据 更可靠,可配置每秒或每次写入都同步
性能影响 fork 子进程时会有短暂阻塞 持续写入,fsync 策略决定性能开销
可读性 二进制,不可读 文本文件,人类可读
场景 推荐方案
需要快速恢复、容灾备份 RDB
数据安全性要求高,不能容忍数据丢失 AOF
生产环境最佳实践 RDB + AOF 混合使用(Redis 4.0+ 支持)

我来详细讲解**堆(Heap)**这种数据结构,从原理到实现,再到各种应用场景。


一、堆的核心概念

1. 什么是堆?

堆是一种特殊的完全二叉树,满足堆序性质(Heap Property)

类型 定义
大根堆(Max Heap) 父节点的值 ≥ 子节点的值,堆顶是最大值
小根堆(Min Heap) 父节点的值 ≤ 子节点的值,堆顶是最小值

堆的结构示意图

2. 完全二叉树的数组表示

堆通常用数组存储,不需要指针,非常高效:

1
2
3
4
5
6
7
8
9
10
数组索引:  0   1   2   3   4   5   6
[100, 19, 36, 17, 3, 25, 1] ← 大根堆示例

100
/ \
19 36
/ \ /
17 3 25
/
1

父子节点索引关系(假设根节点索引为 0):

关系 公式
父节点索引 parent(i) = (i - 1) // 2
左子节点索引 left(i) = 2 * i + 1
右子节点索引 right(i) = 2 * i + 2

二、堆的核心操作

1. 上浮(Sift Up / Heapify Up)

场景:新元素插入到堆尾,需要调整到正确位置

1
2
过程:与子节点比较,如果不满足堆序,则与较大(大根堆)/较小(小根堆)的子节点交换
时间复杂度:O(log n)

2. 下沉(Sift Down / Heapify Down)

场景:删除堆顶后,将堆尾元素放到堆顶,然后调整

1
2
过程:与父节点比较,如果不满足堆序,则与父节点交换,直到根节点
时间复杂度:O(log n)

3. 建堆(Heapify)

两种方法

方法 过程 时间复杂度
逐个插入 每个元素执行 Sift Up O(n log n)
堆化算法 从最后一个非叶子节点开始,自底向上执行 Sift Down O(n)

三、完整代码实现(Python)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class MinHeap:
"""小根堆实现"""

def __init__(self):
self.heap = []

def parent(self, i):
return (i - 1) // 2

def left(self, i):
return 2 * i + 1

def right(self, i):
return 2 * i + 2

def insert(self, val):
"""插入元素:放到末尾,然后上浮"""
self.heap.append(val)
self._sift_up(len(self.heap) - 1)

def _sift_up(self, i):
"""上浮操作"""
parent = self.parent(i)
if i > 0 and self.heap[i] < self.heap[parent]:
self.heap[i], self.heap[parent] = self.heap[parent], self.heap[i]
self._sift_up(parent)

def extract_min(self):
"""删除并返回堆顶(最小值)"""
if not self.heap:
return None

min_val = self.heap[0]
# 将最后一个元素放到堆顶
self.heap[0] = self.heap[-1]
self.heap.pop()

if self.heap:
self._sift_down(0)
return min_val

def _sift_down(self, i):
"""下沉操作"""
min_idx = i
left = self.left(i)
right = self.right(i)

# 找到三者中的最小值
if left < len(self.heap) and self.heap[left] < self.heap[min_idx]:
min_idx = left
if right < len(self.heap) and self.heap[right] < self.heap[min_idx]:
min_idx = right

# 如果需要交换
if min_idx != i:
self.heap[i], self.heap[min_idx] = self.heap[min_idx], self.heap[i]
self._sift_down(min_idx)

def peek(self):
"""查看堆顶,不删除"""
return self.heap[0] if self.heap else None

def build_heap(self, arr):
"""堆化:O(n) 建堆"""
self.heap = arr[:]
# 从最后一个非叶子节点开始下沉
for i in range(len(arr) // 2 - 1, -1, -1):
self._sift_down(i)


# 使用示例
heap = MinHeap()
heap.insert(5)
heap.insert(3)
heap.insert(8)
heap.insert(1)

print(heap.extract_min()) # 1
print(heap.extract_min()) # 3

四、Python 内置堆模块 heapq

Python 标准库提供了高效的堆实现,默认是小根堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import heapq

# 1. 基本操作
heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 3)
heapq.heappush(heap, 8)

print(heapq.heappop(heap)) # 3 (最小值)

# 2. 从列表建堆(原地堆化,O(n))
data = [5, 3, 8, 1, 2]
heapq.heapify(data) # 变成 [1, 2, 8, 3, 5]

# 3. 查看堆顶不弹出
print(data[0]) # 1

# 4. 大根堆技巧:存储负数
max_heap = []
heapq.heappush(max_heap, -5) # 实际存 -5,表示 5
heapq.heappush(max_heap, -3)
print(-heapq.heappop(max_heap)) # 5

# 5. 获取最大的/最小的 n 个元素
nums = [1, 8, 2, 23, 7, -4, 18, 23, 42, 37, 2]
print(heapq.nlargest(3, nums)) # [42, 37, 23]
print(heapq.nsmallest(3, nums)) # [-4, 1, 2]

# 6. 合并多个有序迭代器
sorted1 = [1, 3, 5]
sorted2 = [2, 4, 6]
merged = heapq.merge(sorted1, sorted2) # 生成器,产生 1,2,3,4,5,6

五、经典应用场景详解

场景 1:Top K 问题 ⭐⭐⭐

问题:从 N 个元素中找到最大的 K 个(或最小的 K 个)

解法:维护一个大小为 K 的小根堆

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import heapq

def top_k(nums, k):
"""找最大的 K 个数"""
# 维护大小为 K 的小根堆
heap = nums[:k]
heapq.heapify(heap)

for num in nums[k:]:
if num > heap[0]: # 比堆顶大
heapq.heapreplace(heap, num) # 替换堆顶

return heap # 最大的 K 个数(无序)

# 时间复杂度:O(n log k),空间复杂度:O(k)

应用:热搜榜、销量排行榜、实时统计


场景 2:数据流的中位数 ⭐⭐⭐

问题:动态数据流中实时获取中位数

解法对顶堆 - 一个大根堆(存较小的一半)+ 一个小根堆(存较大的一半)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import heapq

class MedianFinder:
def __init__(self):
# 大根堆(存较小的一半,用负数实现)
self.small = []
# 小根堆(存较大的一半)
self.large = []

def addNum(self, num: int) -> None:
# 先放入大根堆
heapq.heappush(self.small, -num)

# 平衡:确保 small 的最大值 <= large 的最小值
if (self.small and self.large and
(-self.small[0]) > self.large[0]):
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)

# 平衡大小:两个堆的大小差不超过 1
if len(self.small) > len(self.large) + 1:
val = -heapq.heappop(self.small)
heapq.heappush(self.large, val)
elif len(self.large) > len(self.small) + 1:
val = heapq.heappop(self.large)
heapq.heappush(self.small, -val)

def findMedian(self) -> float:
if len(self.small) > len(self.large):
return -self.small[0]
elif len(self.large) > len(self.small):
return self.large[0]
else:
return (-self.small[0] + self.large[0]) / 2

场景 3:任务调度与优先级队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import heapq
import time

class TaskScheduler:
def __init__(self):
# (优先级, 插入时间, 任务) - 时间戳避免比较错误
self._queue = []
self._counter = 0

def add_task(self, priority, task):
"""priority 越小优先级越高"""
heapq.heappush(self._queue, (priority, self._counter, task))
self._counter += 1

def run_next(self):
if self._queue:
priority, _, task = heapq.heappop(self._queue)
print(f"执行: {task} (优先级: {priority})")
return task
return None

# 使用
scheduler = TaskScheduler()
scheduler.add_task(2, "普通邮件发送")
scheduler.add_task(1, "紧急报警处理") # 先执行
scheduler.add_task(3, "日志备份")

scheduler.run_next() # 紧急报警处理
scheduler.run_next() # 普通邮件发送

场景 4:Dijkstra 最短路径算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import heapq
from collections import defaultdict

def dijkstra(graph, start):
"""
graph: {u: [(v, weight), ...]} 邻接表
"""
dist = {node: float('inf') for node in graph}
dist[start] = 0
pq = [(0, start)] # (距离, 节点)

while pq:
d, u = heapq.heappop(pq)

if d > dist[u]: # 已经处理过更短的路径
continue

for v, weight in graph[u]:
if dist[u] + weight < dist[v]:
dist[v] = dist[u] + weight
heapq.heappush(pq, (dist[v], v))

return dist

# 时间复杂度:O((V+E) log V)

场景 5:堆排序(Heap Sort)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def heap_sort(arr):
"""原地堆排序,空间 O(1)"""
# 建堆
heapq.heapify(arr)

result = []
while arr:
result.append(heapq.heappop(arr))

return result

# 或者原地排序(从大到小)
def heap_sort_desc(arr):
n = len(arr)
heapq.heapify(arr)

for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0] # 堆顶与末尾交换
# 对前 i 个元素重新堆化
heapq._siftup(arr, 0, i) # 内部方法,谨慎使用

return arr # 此时是从大到小

六、堆 vs 其他数据结构的对比

操作 数组/链表 二叉搜索树
查找最大值 O(n) O(log n) O(1)
删除最大值 O(n) O(log n) O(log n)
插入 O(1)/O(n) O(log n) O(log n)
查找任意元素 O(n) O(log n) O(n) ❌
适用场景 随机访问 有序遍历 极值操作

关键洞察

  • 如果你只需要快速获取极值(最大/最小),堆是最佳选择
  • 如果你需要有序遍历所有元素,用二叉搜索树或排序数组
  • 堆牺牲了”查找任意元素”的能力,换取了极值操作的极致效率

七、面试高频考点

考点 要点
建堆的时间复杂度 O(n),不是 O(n log n)
为什么堆适合 Top K 空间 O(k),时间 O(n log k)
第 K 大 vs 第 K 小 大根堆存小的,小根堆存大的
对顶堆 两个堆维护中位数、数据流问题
堆的数组表示 父子索引公式,不需要指针

堆是算法面试中性价比极高的数据结构,掌握它能在很多问题上快速找到最优解!

堆插入的logn是怎么来的

要理解堆插入的时间复杂度是 $O(\log n)$,我们先从堆的结构和插入操作的核心逻辑讲起,再用通俗的方式推导时间复杂度。

1. 先明确堆的基本结构

堆本质是一棵完全二叉树,特点是:

  • 树的每一层都被完全填满(除了最后一层,且最后一层的节点都靠左排列);
  • 对于大小为 $n$ 的堆,树的高度 $h$ 满足 $h = \lfloor \log_2 n \rfloor$(比如 $n=8$ 时高度是3,$n=15$ 时高度是4)。

以最常见的大顶堆为例(父节点值 ≥ 子节点值),插入操作的核心目标是:新元素插入后,依然保持堆的结构和性质。

2. 堆插入的完整过程(以大顶堆为例)

插入操作分两步,核心耗时在第二步:

步骤1:尾插新元素

把新元素直接放到堆的最后一个位置(完全二叉树的最后一个节点),这一步的时间复杂度是 $O(1)$(只需找到最后位置,直接赋值)。

步骤2:向上调整(堆化/Percolate Up)

新元素可能破坏堆的性质(比如大顶堆中,新元素比父节点大),因此需要从下往上逐层比较并交换,直到新元素找到合适的位置:

  1. 比较新元素与其父节点的值;
  2. 如果不满足堆性质(比如大顶堆中子 > 父),交换两者位置;
  3. 重复步骤1-2,直到新元素 ≤ 父节点(大顶堆),或到达堆顶(根节点)。

3. 为什么时间复杂度是 $O(\log n)$?

时间复杂度的核心是「向上调整的最大次数」,而调整次数由堆的高度决定:

  • 堆是完全二叉树,高度 $h = \lfloor \log_2 n \rfloor$($n$ 是堆的元素总数);
  • 新元素最多需要从最后一层,逐层交换到根节点,最多交换 $h$ 次;
  • 每次交换/比较的操作是 $O(1)$,因此总时间复杂度 = 调整次数 × 单次操作复杂度 = $O(\log n)$。

举个直观例子

堆的元素数 $n$ 堆的高度 $h$ 最大调整次数
1 1 0
2-3 2 1
4-7 3 2
8-15 4 3
$2^k - 1$ $k$ $k-1$

可以看到,调整次数始终是 $\log_2 n$ 级别(比如 $n=1000$ 时,$\log_2 1000 ≈ 10$,最多只需要调整10次)。

4. 代码示例(Python 大顶堆插入)

用代码直观感受插入的核心逻辑(Python 标准库的 heapq 是小顶堆,这里手动实现大顶堆):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class MaxHeap:
def __init__(self):
self.heap = [] # 用列表存储堆,索引i的父节点是(i-1)//2,左子节点是2i+1,右子节点是2i+2

def insert(self, val):
# 步骤1:尾插新元素
self.heap.append(val)
# 步骤2:向上调整
self._percolate_up(len(self.heap) - 1)

def _percolate_up(self, idx):
# 递归/循环向上调整,直到满足堆性质或到达根节点
if idx == 0: # 根节点,无需调整
return
parent_idx = (idx - 1) // 2 # 计算父节点索引
# 大顶堆:如果当前节点 > 父节点,交换并继续调整
if self.heap[idx] > self.heap[parent_idx]:
self.heap[idx], self.heap[parent_idx] = self.heap[parent_idx], self.heap[idx]
self._percolate_up(parent_idx) # 递归调整父节点位置

# 测试
heap = MaxHeap()
heap.insert(5)
heap.insert(3)
heap.insert(8) # 插入8后,会和父节点5交换,最终堆为 [8,3,5]
print(heap.heap) # 输出: [8, 3, 5]

代码中 _percolate_up 方法的递归次数,就是堆的高度($\log n$ 次),因此插入的时间复杂度是 $O(\log n)$。

总结

  1. 堆是完全二叉树,大小为 $n$ 的堆高度为 $\lfloor \log_2 n \rfloor$;
  2. 堆插入的核心耗时在「向上调整」,调整次数最多等于堆的高度;
  3. 单次调整是 $O(1)$,因此总时间复杂度为 $O(\log n)$(忽略常数和低阶项)。

简单来说:堆插入的调整次数只和“树的高度”有关,而完全二叉树的高度是对数级别的,因此时间复杂度是 $O(\log n)$。

父子节点关系

我来详细讲解堆中父节点和子节点的关系,这是理解堆结构的核心。


一、索引关系公式(数组实现)

堆用数组存储完全二叉树,父子节点通过索引计算相互定位:

关系 公式 说明
父节点索引 parent(i) = (i - 1) // 2 整数除法,向下取整
左子节点索引 left(i) = 2 * i + 1 奇数位置
右子节点索引 right(i) = 2 * i + 2 偶数位置(左子节点+1)

可视化示例

1
2
3
4
5
6
7
8
9
10
11
12
13
数组索引:  0    1    2    3    4    5    6    7    8
[30, 20, 15, 5, 10, 8, 12, 3, 6]
↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑
根 左 右 左左 左右 右左 右右 左左左 左左右

树形结构:
30 (0) ← 根节点,无父节点
/ \
20 (1) 15 (2) ← 父节点是 0: (1-1)//2=0, (2-1)//2=0
/ \ / \
5 (3) 10 (4) 8 (5) 12 (6) ← 父节点分别是 1,1,2,2
/ \
3 (7) 6 (8) ← 父节点是 3: (7-1)//2=3, (8-1)//2=3

验证计算

节点 索引 父节点计算 左子节点 右子节点
30 0 无(根) 2×0+1=1 (20) 2×0+2=2 (15)
20 1 (1-1)//2=0 (30) 2×1+1=3 (5) 2×1+2=4 (10)
15 2 (2-1)//2=0 (30) 2×2+1=5 (8) 2×2+2=6 (12)
5 3 (3-1)//2=1 (20) 2×3+1=7 (3) 2×3+2=8 (6)
10 4 (4-1)//2=1 (20) 9(超出范围,无) 10(超出范围,无)

二、值的关系(堆序性质)

除了索引关系,父子节点还有值的大小关系,这是堆的核心特性:

小根堆(Min Heap)

1
2
父节点的值 ≤ 子节点的值
即:heap[parent] ≤ heap[left] 且 heap[parent] ≤ heap[right]
1
2
3
4
5
6
示例:
3 ← 根最小
/ \
5 8 ← 5≥3, 8≥3 ✓
/ \ /
10 7 9 ← 10≥5, 7≥5, 9≥8 ✓

大根堆(Max Heap)

1
2
父节点的值 ≥ 子节点的值
即:heap[parent] ≥ heap[left] 且 heap[parent] ≥ heap[right]
1
2
3
4
5
6
7
8
示例:
100 ← 根最大
/ \
19 36 ← 19≤100, 36≤100 ✓
/ \ /
17 3 25 ← 17≤19, 3≤19, 25≤36 ✓
/
1 ← 1≤17 ✓

⚠️ 重要:父子关系 vs 兄弟关系

关系 约束 说明
父子 严格满足堆序 小根堆:父 ≤ 子;大根堆:父 ≥ 子
兄弟 无约束! 左子节点可以大于或小于右子节点
1
2
3
4
5
6
7
8
9
10
合法的小根堆:
5
/ \
8 10 ← 8 < 10(左小于右)

5
/ \
10 8 ← 10 > 8(左大于右,也合法!)

只要 5 ≤ 8 且 5 ≤ 10,就是合法堆

三、代码实现验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Heap:
def __init__(self, is_min_heap=True):
self.heap = []
self.is_min_heap = is_min_heap # True:小根堆, False:大根堆

def parent(self, i):
"""获取父节点索引"""
if i == 0:
return None # 根节点无父节点
return (i - 1) // 2

def left(self, i):
"""获取左子节点索引"""
left_idx = 2 * i + 1
return left_idx if left_idx < len(self.heap) else None

def right(self, i):
"""获取右子节点索引"""
right_idx = 2 * i + 2
return right_idx if right_idx < len(self.heap) else None

def compare(self, parent_val, child_val):
"""根据堆类型比较父子大小"""
if self.is_min_heap:
return parent_val <= child_val # 小根堆:父应≤子
else:
return parent_val >= child_val # 大根堆:父应≥子

def check_heap_property(self):
"""验证整棵树是否满足堆序性质"""
for i in range(len(self.heap)):
p_idx = self.parent(i)
if p_idx is not None:
if not self.compare(self.heap[p_idx], self.heap[i]):
return False, f"节点{i}({self.heap[i]})违反堆序,父节点{p_idx}({self.heap[p_idx]})"
return True, "堆序性质满足"

def get_relationships(self, i):
"""获取某个节点的完整关系信息"""
info = {
"index": i,
"value": self.heap[i],
"parent": None,
"left_child": None,
"right_child": None
}

p_idx = self.parent(i)
if p_idx is not None:
info["parent"] = {
"index": p_idx,
"value": self.heap[p_idx],
"relation": "父节点"
}

l_idx = self.left(i)
if l_idx is not None:
info["left_child"] = {
"index": l_idx,
"value": self.heap[l_idx],
"relation": "左子节点"
}

r_idx = self.right(i)
if r_idx is not None:
info["right_child"] = {
"index": r_idx,
"value": self.heap[r_idx],
"relation": "右子节点"
}

return info


# 测试
heap = Heap(is_min_heap=True)
heap.heap = [1, 3, 6, 5, 9, 8]

print("堆数组:", heap.heap)
print("\n各节点关系:")
for i in range(len(heap.heap)):
rel = heap.get_relationships(i)
print(f"节点 {i} (值 {rel['value']}):")
if rel['parent']:
p = rel['parent']
print(f" 父节点: 索引{p['index']}, 值{p['value']}")
if rel['left_child']:
l = rel['left_child']
print(f" 左子节点: 索引{l['index']}, 值{l['value']}")
if rel['right_child']:
r = rel['right_child']
print(f" 右子节点: 索引{r['index']}, 值{r['value']}")

print(f"\n堆序验证: {heap.check_heap_property()}")

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
堆数组: [1, 3, 6, 5, 9, 8]

各节点关系:
节点 0 (值 1):
左子节点: 索引1, 值3
右子节点: 索引2, 值6
节点 1 (值 3):
父节点: 索引0, 值1
左子节点: 索引3, 值5
右子节点: 索引4, 值9
节点 2 (值 6):
父节点: 索引0, 值1
左子节点: 索引5, 值8
节点 3 (值 5):
父节点: 索引1, 值3
节点 4 (值 9):
父节点: 索引1, 值3
节点 5 (值 8):
父节点: 索引2, 值6

堆序验证: (True, '堆序性质满足')

四、层级关系图

1
2
3
4
5
6
7
8
9
10
11
12
13
层级 0: 索引 0 (2^0 = 1 个节点)

层级 1: 索引 1-2 (2^1 = 2 个节点)
/ \
层级 2: 索引 3-6 (2^2 = 4 个节点)
/ \ / \
层级 3: 索引 7-14 (2^3 = 8 个节点)
...以此类推

关键规律:
- 第 k 层的起始索引 = 2^k - 1
- 第 k 层最多有 2^k 个节点
- 节点 i 所在的层级 = floor(log₂(i+1))

五、父子关系的应用

1. 上浮操作利用父节点关系

1
2
3
4
5
6
7
8
9
def sift_up(heap, i):
"""通过父节点索引向上调整"""
while i > 0:
parent = (i - 1) // 2
if heap[parent] > heap[i]: # 小根堆:父>子,需要交换
heap[parent], heap[i] = heap[i], heap[parent]
i = parent # 继续向上检查新的父节点
else:
break

2. 下沉操作利用子节点关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def sift_down(heap, i, n):
"""通过子节点索引向下调整"""
while True:
smallest = i
left = 2 * i + 1
right = 2 * i + 2

# 找三者中的最小值(小根堆)
if left < n and heap[left] < heap[smallest]:
smallest = left
if right < n and heap[right] < heap[smallest]:
smallest = right

if smallest != i:
heap[i], heap[smallest] = heap[smallest], heap[i]
i = smallest # 继续向下检查新的子节点
else:
break

六、总结

关系类型 具体含义 公式/约束
物理关系(索引) 数组中的位置计算 父=(i-1)//2, 左子=2i+1, 右子=2i+2
逻辑关系(值) 堆序性质 小根堆:父≤子;大根堆:父≥子
层级关系 树的深度 节点i在层级⌊log₂(i+1)⌋

核心洞察

  • 索引关系让堆可以用数组高效存储,无需指针
  • 值的关系保证了堆顶永远是全局最值(最小或最大)
  • 这两种关系配合,使得堆能在 $O(1)$ 获取极值,$O(\log n)$ 插入删除

汉诺塔问题

有一个栈A存了多个无序整数,还有一个空栈B,要求借助栈B对无序整数排序,空间复杂度O(1)

问题分析与解题思路

首先要明确核心要求:仅借助空栈B,将栈A中的无序整数排序,且空间复杂度为O(1)(即除了两个栈本身,不额外申请大规模存储空间)。这个问题的核心逻辑类似栈排序 + 汉诺塔的”借助辅助栈转移元素”思想,核心思路是:

  1. 每次从栈A弹出栈顶元素(记为temp);
  2. 将栈B中比temp大(或小,取决于升序/降序)的元素逐个移回栈A;
  3. temp压入栈B的正确位置;
  4. 重复上述步骤,直到栈A为空,此时栈B中就是有序的元素(若要恢复到栈A,可将B全部移回A)。

完整实现代码(Python)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class Stack:
"""栈的基础实现(模拟实际栈的操作)"""
def __init__(self):
self.items = []

def is_empty(self):
return len(self.items) == 0

def push(self, item):
self.items.append(item)

def pop(self):
if not self.is_empty():
return self.items.pop()
raise IndexError("栈为空,无法弹出元素")

def peek(self):
if not self.is_empty():
return self.items[-1]
raise IndexError("栈为空,无法查看栈顶")

def __str__(self):
return str(self.items)

def sort_stack_with_helper(A, B, ascending=True):
"""
借助空栈B对栈A排序,空间复杂度O(1)
:param A: 待排序的栈(非空)
:param B: 辅助栈(初始为空)
:param ascending: True=升序(栈B顶为最大值),False=降序(栈B顶为最小值)
"""
while not A.is_empty():
# 步骤1:弹出A的栈顶元素作为待插入元素
temp = A.pop()

# 步骤2:将B中不符合顺序的元素移回A
while not B.is_empty():
# 升序:B中比temp大的元素移回A;降序:B中比temp小的元素移回A
if (ascending and B.peek() > temp) or (not ascending and B.peek() < temp):
A.push(B.pop())
else:
break

# 步骤3:将temp压入B的正确位置
B.push(temp)

# 可选:若需要将排序结果放回栈A,执行以下代码(此时A为空,B有序)
# while not B.is_empty():
# A.push(B.pop())

# 测试示例
if __name__ == "__main__":
# 初始化栈A,存入无序整数
stack_A = Stack()
for num in [5, 2, 8, 1, 3]:
stack_A.push(num)
stack_B = Stack()

print("排序前栈A:", stack_A) # 输出:排序前栈A: [5, 2, 8, 1, 3]
sort_stack_with_helper(stack_A, stack_B, ascending=True)
print("排序后栈B:", stack_B) # 输出:排序后栈B: [1, 2, 3, 5, 8](栈底到栈顶升序,栈顶是最大值)

代码关键解释

  1. 栈的基础实现:用列表模拟栈(append=压栈,pop=弹栈),封装成Stack类,保证操作符合栈的”后进先出”特性;
  2. 核心排序逻辑
    • 外层循环:只要栈A不为空,就持续处理栈A的栈顶元素;
    • 内层循环:调整栈B的元素——把B中不符合排序顺序的元素移回A,为temp腾出正确位置;
    • 压入temp:此时B的栈顶元素已满足”≤temp(升序)”或”≥temp(降序)”,压入后B保持局部有序;
  3. 空间复杂度:仅使用两个栈本身的空间,没有额外申请数组、字典等大规模存储,满足O(1)要求(注:这里的O(1)是”除输入输出外的额外空间”,符合汉诺塔类问题的空间复杂度定义)。

执行过程示例(升序)

以栈A初始元素[5,2,8,1,3](栈底→栈顶)为例:

  • 第一步:A弹出3 → B为空,直接压入B → B=[3];
  • 第二步:A弹出1 → B顶3>1,将3移回A → B为空,压入1 → B=[1];
  • 第三步:A弹出8 → B顶1<8,直接压入B → B=[1,8];
  • 第四步:A弹出2 → B顶8>2,将8移回A → B顶1<2,压入2 → B=[1,2];
  • 第五步:A弹出5 → B顶2<5,直接压入B → B=[1,2,5];
  • 第六步:A弹出8(之前移回的)→ B顶5<8,压入B → B=[1,2,5,8];
  • 第七步:A弹出3(之前移回的)→ B顶8>3,移回8;B顶5>3,移回5 → B顶2<3,压入3 → B=[1,2,3];
  • 最后将移回A的5、8依次压入B → B最终为[1,2,3,5,8]

总结

  1. 核心逻辑:逐个取出栈A元素,借助栈B调整位置,将元素插入B的有序区间,最终B存储有序结果;
  2. 空间复杂度:仅使用两个栈的原生空间,无额外大规模存储,满足O(1)要求;
  3. 灵活性:通过ascending参数可切换升序/降序,若需将结果放回栈A,只需将B的元素全部弹出并压入A。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
初始状态:
栈 A(乱序盘子):最下→最上 5 → 2 → 8 → 1 → 3(3 在最上面)
栈 B(空盘子堆):空
步骤 1:处理 A 的最上面盘子「3」
从 A 拿走 3(A 现在:5→2→8→1);
B 是空的,直接把 3 放到 B(B 现在:3);
✅ 状态:A=[5,2,8,1],B=[3]
步骤 2:处理 A 的最上面盘子「1」
从 A 拿走 1(A 现在:5→2→8);
看 B 的最上面是 3,3 比 1 大(1 要放在 3 下面才对),所以先把 3 从 B 挪回 A(A 现在:5→2→8→3);
B 现在空了,把 1 放到 B(B 现在:1);
✅ 状态:A=[5,2,8,3],B=[1]
步骤 3:处理 A 的最上面盘子「3」
从 A 拿走 3(A 现在:5→2→8);
看 B 的最上面是 1,1 比 3 小(3 放在 1 上面刚好),直接把 3 放到 B(B 现在:1→3);
✅ 状态:A=[5,2,8],B=[1,3]
步骤 4:处理 A 的最上面盘子「8」
从 A 拿走 8(A 现在:5→2);
看 B 的最上面是 3,3 比 8 小,直接把 8 放到 B(B 现在:1→3→8);
✅ 状态:A=[5,2],B=[1,3,8]
步骤 5:处理 A 的最上面盘子「2」
从 A 拿走 2(A 现在:5);
看 B 的最上面是 8,8 比 2 大,先把 8 挪回 A(A 现在:5→8);
再看 B 的最上面是 3,3 比 2 大,把 3 挪回 A(A 现在:5→8→3);
现在 B 的最上面是 1,1 比 2 小,把 2 放到 B(B 现在:1→2);
✅ 状态:A=[5,8,3],B=[1,2]
步骤 6:处理 A 的最上面盘子「3」
从 A 拿走 3(A 现在:5→8);
看 B 的最上面是 2,2 比 3 小,直接把 3 放到 B(B 现在:1→2→3);
✅ 状态:A=[5,8],B=[1,2,3]
步骤 7:处理 A 的最上面盘子「8」
从 A 拿走 8(A 现在:5);
看 B 的最上面是 3,3 比 8 小,直接把 8 放到 B(B 现在:1→2→3→8);
✅ 状态:A=[5],B=[1,2,3,8]
步骤 8:处理 A 的最上面盘子「5」
从 A 拿走 5(A 现在:空);
看 B 的最上面是 8,8 比 5 大,把 8 挪回 A(A 现在:8);
现在 B 的最上面是 3,3 比 5 小,把 5 放到 B(B 现在:1→2→3→5);
✅ 状态:A=[8],B=[1,2,3,5]
步骤 9:处理 A 的最后一个盘子「8」
从 A 拿走 8(A 现在:空);
看 B 的最上面是 5,5 比 8 小,直接把 8 放到 B(B 现在:1→2→3→5→8);
✅ 最终状态:A = 空,B=[1→2→3→5→8](完全有序)

好难我靠,看别人的面经每篇都有一半的内容能给我问死,我现在就搁这天天拷打AI,真能找到实习嘛,别到暑假真在皇帝电竞接一暑假单了我靠
这集拉了

tcp&udp

这歌好听13.0℃ / ユシナ feat.GUMI

一、网络分层模型

现代网络通信采用分层架构,最经典的是 OSI 七层模型 和实际广泛使用的 TCP/IP 四层模型

OSI 七层 TCP/IP 四层 核心功能 典型协议/设备
7. 应用层 为用户程序提供网络服务 HTTP、FTP、SMTP、DNS
6. 表示层 应用层 数据格式转换、加密压缩 SSL/TLS、JPEG、ASCII
5. 会话层 建立、管理、终止会话 NetBIOS、RPC
4. 传输层 传输层 端到端的数据传输控制 TCP、UDP
3. 网络层 网络层 寻址和路由选择 IP、ICMP、路由器
2. 数据链路层 网络接口层 物理寻址、帧传输 MAC、以太网、交换机
1. 物理层 比特流传输 网线、光纤、集线器

核心思想:每一层只与相邻层交互,上层无需关心下层实现细节。就像寄快递,你只需要把包裹交给快递员(应用层→传输层),而不需要关心货车走哪条路(网络层)或轮胎怎么转(物理层)。


二、TCP vs UDP:两种传输哲学

传输层是网络通信的”交通枢纽”,TCP 和 UDP 代表了两种截然不同的设计哲学:

TCP(Transmission Control Protocol)—— 可靠传输

设计理念:像打电话,确保每一句话对方都听清且按顺序收到。

特性 说明
面向连接 通信前需三次握手建立连接,结束后四次挥手断开
可靠传输 通过确认应答(ACK)、超时重传、序号机制确保数据不丢、不错、不乱序
流量控制 滑动窗口机制,防止发送方淹没接收方
拥塞控制 慢启动、拥塞避免、快重传、快恢复,避免网络瘫痪
全双工通信 双方可同时发送和接收数据

三次握手过程

1
2
3
客户端 → SYN(seq=x) → 服务端     (我能连你吗?我的序号是x)
客户端 ← SYN(seq=y, ack=x+1) ← 服务端 (能,我的序号是y,期待x+1)
客户端 → ACK(ack=y+1) → 服务端 (好的,期待y+1)

适用场景:网页浏览(HTTP/HTTPS)、文件传输(FTP)、邮件(SMTP)、数据库事务等要求数据完整性的场景。


UDP(User Datagram Protocol)—— 快速传输

设计理念:像发短信,发出去就不管了,快就完事了。

特性 说明
无连接 不需要建立连接,直接发送数据
不可靠 不保证送达、不保证顺序、不保证不重复
低延迟 头部仅8字节(TCP是20字节),无需握手挥手
无拥塞控制 以恒定速率发送,网络拥塞时可能丢包
支持多播/广播 一个包可发给多个接收者

UDP 头部极简

1
2
3
4
5
6
7
 0      7 8     15 16    23 24    31
+--------+--------+--------+--------+
| 源端口 | 目的端口 |
+--------+--------+--------+--------+
| 长度 | 校验和 |
+--------+--------+--------+--------+
| 数据(可选)... |

适用场景:视频直播、在线游戏、DNS查询、VoIP语音通话、物联网传感器上报等追求实时性、容忍少量丢包的场景。


三、TCP vs UDP 对比总结

维度 TCP UDP
可靠性 ✅ 高(不丢包、按序) ❌ 低(可能丢包、乱序)
速度 较慢(需握手、确认) 极快(即发即走)
连接开销 高(维护连接状态)
数据边界 字节流(无消息边界) 报文(保留消息边界)
头部开销 20字节 8字节
传输模式 1对1 1对1、1对多、多对多
典型应用 网页、文件、邮件 直播、游戏、DNS、视频会议

四、实际应用中的选择

  • 选 TCP:当数据完整性比速度更重要(银行转账、文件下载)
  • 选 UDP:当实时性比完整性更重要(Zoom会议卡一下 vs 延迟3秒)
  • 混合方案:QUIC协议(HTTP/3基础)在UDP上实现可靠性,兼具两者优势

网络协议的选择本质是在可靠性和效率之间做权衡,理解业务需求才能做出正确决策。

channel相关

要理解 nil channel已关闭 channel 的区别,我们先从最基础的定义入手,再结合代码示例和核心行为对比。

一、核心定义

  • nil channel:未初始化的 channel(值为 nil),既不是可读的,也不是可写的,更不能被关闭。
  • 已关闭 channel:通过 close() 函数显式关闭的、初始化过的 channel,关闭后仍有特定的读写行为。

二、关键行为对比(附代码示例)

我们从读操作写操作关闭操作三个维度对比两者的核心差异:

1. nil channel 的行为

nil channel 是“完全阻塞”的,所有操作都会陷入永久阻塞(goroutine 不会 panic,只是卡住)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"
)

func main() {
// 声明但未初始化,是 nil channel
var nilCh chan int

// 1. 写 nil channel:永久阻塞(不会执行后续代码)
go func() {
nilCh <- 10 // 写操作:永久阻塞
fmt.Println("写 nil channel 完成") // 永远不会执行
}()

// 2. 读 nil channel:永久阻塞
go func() {
<-nilCh // 读操作:永久阻塞
fmt.Println("读 nil channel 完成") // 永远不会执行
}()

// 3. 关闭 nil channel:直接 panic
// close(nilCh) // 执行这行会 panic: close of nil channel

// 等待一段时间,观察 goroutine 状态
time.Sleep(2 * time.Second)
fmt.Println("主 goroutine 结束")
}

2. 已关闭 channel 的行为

已关闭的 channel 不会阻塞,但读写行为有明确规则:

  • 写操作:直接 panic;
  • 读操作:先读完 channel 中剩余数据,之后持续读取对应类型的“零值”,且不会阻塞;
  • 重复关闭:直接 panic。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"

func main() {
// 初始化并写入数据
closedCh := make(chan int, 2)
closedCh <- 1
closedCh <- 2
close(closedCh) // 关闭 channel

// 1. 读已关闭 channel:先读剩余数据,再读零值
fmt.Println(<-closedCh) // 输出:1(剩余数据)
fmt.Println(<-closedCh) // 输出:2(剩余数据)
fmt.Println(<-closedCh) // 输出:0(int 类型零值,无阻塞)
fmt.Println(<-closedCh) // 输出:0(持续返回零值)

// 2. 写已关闭 channel:panic
// closedCh <- 3 // 执行这行会 panic: send on closed channel

// 3. 重复关闭已关闭 channel:panic
// close(closedCh) // 执行这行会 panic: close of closed channel
}

3. 补充:带“ok”的读操作(区分零值和关闭)

读已关闭 channel 时,推荐用 val, ok := <-ch 形式,okfalse 表示 channel 已关闭且无数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import "fmt"

func main() {
ch := make(chan int, 1)
ch <- 5
close(ch)

// 第一次读:有数据,ok=true
val1, ok1 := <-ch
fmt.Printf("val1=%d, ok1=%t\n", val1, ok1) // val1=5, ok1=true

// 第二次读:无数据,channel 已关闭,ok=false
val2, ok2 := <-ch
fmt.Printf("val2=%d, ok2=%t\n", val2, ok2) // val2=0, ok2=false
}

三、典型使用场景

  • nil channel:常用于“暂时禁用 channel 操作”,比如在 select 中让某个 case 暂时失效:
    1
    2
    3
    4
    5
    6
    7
    8
    // select 中 nil channel 的 case 永远不会被选中
    var ch chan int
    select {
    case <-ch: // 永久阻塞,不会触发
    fmt.Println("触发 ch 读")
    case <-time.After(1 * time.Second):
    fmt.Println("触发超时") // 最终执行这行
    }
  • 已关闭 channel:常用于“通知多个 goroutine 任务结束”(广播通知),比如批量关闭工作协程:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func worker(id int, quit chan struct{}) {
    for {
    select {
    default:
    fmt.Printf("worker %d 工作中...\n", id)
    time.Sleep(500 * time.Millisecond)
    case <-quit:
    fmt.Printf("worker %d 收到退出信号\n", id)
    return
    }
    }
    }

    func main() {
    quit := make(chan struct{})
    // 启动 3 个工作协程
    for i := 1; i <= 3; i++ {
    go worker(i, quit)
    }
    // 3 秒后关闭 quit channel,通知所有 worker 退出
    time.Sleep(3 * time.Second)
    close(quit)
    time.Sleep(1 * time.Second) // 等待 worker 退出
    fmt.Println("所有 worker 已退出")
    }

总结

  1. nil channel:未初始化,读写永久阻塞,关闭会 panic;常用于 select 中禁用某个 case。
  2. 已关闭 channel:初始化后被显式关闭,写操作/重复关闭会 panic,读操作先读剩余数据再返回零值;常用于广播通知 goroutine 退出。
  3. 核心区分:nil channel 是“未就绪”状态,所有操作阻塞;已关闭 channel 是“已终止”状态,读不阻塞(返回零值)、写 panic。