Skip to content

Commit 41cd998

Browse files
committed
kafka
1 parent 21b2f0e commit 41cd998

27 files changed

+278
-2
lines changed

Microservices/readme.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
## 微服务架构
2+
### 服务注册和服务发现
3+
使用 K8S 的 Service 和 DNS:
4+
5+
每个微服务 都在 K8S 中创建一个 Service ,名起名比如: user.xingren.host ,
6+
然后,其他微服务只需要 配置好这个 K8s 中的 Service name 即可,
7+
最后,只要这些微服务服务都在一个 k8S 集群中运行,便可省去注册中心与服务发现的这些微服务组件
8+
9+
! [K8S 在微服务架构下做服务注册中心的一种思路](https://blog.csdn.net/itguangit/article/details/109731971)
10+

golang/array/array_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,18 @@ func TestSliceCopy(t *testing.T) {
106106
fmt.Println(c, d)
107107
fmt.Printf("c addres is %p , d address is %p \n", &c[0], &d[0])
108108
}
109+
110+
func TestSliceAdd(t *testing.T) {
111+
a := []int{0}
112+
b := a
113+
114+
fmt.Printf("a address : %p,b address : %p\n", a, b)
115+
116+
a = append(a, 1)
117+
a = append(a, 2)
118+
a = append(a, 3)
119+
a = append(a, 4)
120+
121+
fmt.Printf("a address : %p,b address : %p\n", a, b)
122+
123+
}

golang/concurrent_programming/channel_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,14 @@ func TestNilChannel(t *testing.T) {
2424

2525
time.Sleep(1 * time.Second)
2626
}
27+
28+
func TestChannelBuffer(t *testing.T) {
29+
ch := make(chan int, 5)
30+
31+
for i := 1; i <= 10; i++ {
32+
fmt.Println(i)
33+
ch <- i
34+
}
35+
36+
fmt.Println(<-ch)
37+
}

golang/concurrent_programming/sync.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,4 +479,36 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
479479
```
480480
原理很简单,先加锁,然后判断key是否在map中有值,如果有就waitGroup等待执行,如果没有就add(1)阻塞执行,最后解锁,然后调用call()
481481

482+
# sync map
483+
结构模型
484+
```
485+
type Map struct {
486+
mu Mutex
487+
read atomic.Value // readOnly
488+
dirty map[interface{}]*entry
489+
misses int
490+
}
491+
```
492+
```
493+
type readOnly struct {
494+
m map[interface{}]*entry
495+
amended bool // true if the dirty map contains some key not in m.
496+
}
497+
```
498+
499+
两个map,优先存dirty中,触发一定条件后刷进read中
500+
501+
每次操作dirty都有mutex锁
502+
503+
![sync_map](./sync_map.png)
504+
505+
流程概述:
506+
1. 读的时候先去readonly里面找,找不到就判断amended,如果为true,说明dirty里面可能有,就去dirty里面找,然后....
507+
2. 写的时候先判断readonly是否有,有直接更新,没有就去dirty里面新建一个
508+
3. 相信流程看下面文章链接
509+
510+
! [年度最佳【golang】sync.Map详解](https://segmentfault.com/a/1190000023879083#item-4-2)
511+
512+
# sync pool
513+
! [年度最佳【golang】sync.Pool详解](https://segmentfault.com/a/1190000023878185)
482514

78.9 KB
Loading

golang/concurrent_programming/sync_test.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,36 @@ func TestErrGroup(t *testing.T) {
104104
}
105105
}
106106

107+
type Person struct {
108+
Name string
109+
Age int
110+
}
111+
112+
func initPool() *sync.Pool {
113+
return &sync.Pool{
114+
New: func() interface{} {
115+
fmt.Println("创建一个 person.")
116+
return &Person{}
117+
},
118+
}
119+
}
120+
107121
func TestSyncPool(t *testing.T) {
108-
//pool := sync.Pool{}
122+
pool := initPool()
123+
person := pool.Get().(*Person)
124+
fmt.Println("首次从sync.Pool中获取person:", person)
125+
person.Name = "Jack"
126+
person.Age = 23
127+
pool.Put(person)
128+
pool.Put(person)
129+
pool.Put(person)
130+
pool.Put(person)
131+
pool.Put(person)
132+
pool.Put(person)
133+
pool.Put(person)
134+
pool.Put(person)
135+
fmt.Println("设置的对象Name: ", person.Name)
136+
fmt.Println("设置的对象Age: ", person.Age)
137+
fmt.Println("Pool 中有一个对象,调用Get方法获取:", pool.Get().(*Person))
138+
fmt.Println("Pool 中没有对象了,再次调用Get方法:", pool.Get().(*Person))
109139
}

golang/map/map_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,23 @@ func TestMapValue(t *testing.T) {
3939
func TestSyncMap(t *testing.T) {
4040
m := sync.Map{}
4141
m.Store(1, 1)
42+
m.Store(2, 2)
4243
m.Load(1)
44+
m.Load(2)
45+
m.Store(3, 3)
46+
m.Store(4, 4)
47+
m.Load(3)
48+
}
49+
50+
func TestMapAdress(t *testing.T) {
51+
m1 := map[int]int{1: 1}
52+
m2 := m1
53+
54+
fmt.Printf("m1 address : %p,m2 address : %p\n", m1, m2)
55+
56+
for i := 10; i <= 10000; i++ {
57+
m1[i] = i
58+
}
59+
60+
fmt.Printf("m1 address : %p,m2 address : %p\n", m1, m2)
4361
}

golang/memory/gc.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ STW停止整个程序,从根节点开始遍历标记. STW-->遍历-->标记-->
3434

3535

3636
## 垃圾收集的多个阶段
37-
1. 清理终止阶段;
37+
1. 清理终止阶段;z
3838
+ 暂停程序,所有的处理器在这时会进入安全点(Safe point);
3939
+ 如果当前垃圾收集循环是强制触发的,我们还需要处理还未被清理的内存管理单元;
4040
2. 标记阶段;

queue/kafka/image/GC_init.png

409 KB
Loading

queue/kafka/image/GC_process.png

336 KB
Loading
241 KB
Loading

queue/kafka/image/consumer_data.png

328 KB
Loading
372 KB
Loading

queue/kafka/image/consumer_range.png

344 KB
Loading
300 KB
Loading
345 KB
Loading

queue/kafka/image/kafka_index.png

377 KB
Loading
Loading

queue/kafka/image/page_cache.png

274 KB
Loading
401 KB
Loading
297 KB
Loading
107 KB
Loading

queue/kafka/kafka.md

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#kafka
2+
整体分为三个部分分解,producer,broker,consumer
3+
4+
## producer
5+
### 发送流程
6+
7+
![producer_process](./image/producer_process.png)
8+
9+
### 分区的好处
10+
1. 不同服务器broker的硬盘容量不同
11+
2. 提高并行度(发送,消费)
12+
13+
### produce默认的分区策略
14+
1. 有指定按照指定发送
15+
2. 按照 key 的hash 取模
16+
3. sticky partition模式,先随机一个,然后一段时间内一直使用这个
17+
18+
### producer提高吞吐量
19+
1. 配置batch.size 合适的批次大小
20+
2. liner.ms 等待时间 (默认是0ms)
21+
3. compression 压缩
22+
4. 缓冲区大小
23+
24+
### producer的数据可靠性
25+
broker的ack应答模式有三种 0(落入缓冲区就应答) 1(leader partition写入磁盘就应答) all(所有partition写入磁盘才应答,一般情况下都用all)
26+
27+
引出一个问题: 如果配置是all,然后同步过程中一个follower挂了,是不是leader会一直等待
28+
A: leader节点会维护ISR (in-sync replica set) 如果一个follower长时间没有和leader通信,会被踢到,默认30s
29+
30+
#### 确保发送的数据完全可靠条件
31+
数据完全可靠条件 = AKC->ALL + follower partition >= 2 + ISR应答副本 >= 2
32+
33+
#### producer数据重复发送问题
34+
![producer_repetition](./image/producer_repetition.png)
35+
leader节点在同步完所有follower之后正准备给producer返回ack,但是突然挂了,然后另一个follower成了leader,然后producer一直没收到这条消息的ACK,
36+
于是又发送了一次msg,造成数据重复发送
37+
38+
解决方案:kafka内部支持幂等性,上面的情况再次发送kafka集群是不会存入broker的
39+
40+
原理: 内部有一个实现幂等性的标准 集群的pid+partition+seq_number 不重复,但是这也只能保证不重启之后幂等,因为重启kafka之后,pid会变
41+
42+
解决上述问题的方案: 用事务替代,事务可以自定义transaction_id替代pid实现幂等
43+
44+
### producer发送数据的有序性
45+
目前kafka只能实现单个partition的有序性
46+
47+
## broker
48+
### broker原理 (ZK版本), leader的选举
49+
1. kafka开启之后去zk注册对应的broker_id
50+
2. zk的controller中,那个kafka先注册那个说了算,controller决定了leader的选举
51+
3. 选举规则 在iSR存货为前提,按照AR中的顺序,来轮询作为leader节点
52+
4. controller将选举结果上传到ZK决定leader节点
53+
54+
### broker上下线
55+
ZK中 broker_ids管理节点
56+
#### broker 服役新节点
57+
问题: 之前有[0,1,2]三个broker,一个topic存储在[0,1,2]上,当新上线一个名为3的broker节点之后,怎么把之前的topic分配在3号broker上面的
58+
解答: 可以在配置文件中添加新节点
59+
60+
#### broker 退役旧节点
61+
和服役新节点类似
62+
63+
### broker的副本
64+
副本统称AR , AR = ISR + OSR (和leader通信超时的副本)
65+
66+
leader的颗粒度,每一个partition都有一个leader
67+
68+
### 节点故障处理
69+
![broker_follower_failed](./image/broker_follower_failed.png)
70+
71+
LEO : 每个副本最后一个offset + 1
72+
73+
HW : high watermark , 所有副本的最小LEO
74+
75+
follower节点挂掉之后重新连接,需要追上HW之后,才能重新加入ISR
76+
77+
leader节点故障挂掉之后,选出了一个新的leader,当前旧的leader节点需要把高于当前的HW丢掉,从新从leader同步
78+
79+
### leader partition 自动平衡
80+
问题: 如果某些broker宕机,导致leader partition集中在少数broker上,会导致这几台服务器压力过高,造成集群不平衡
81+
82+
解决:kafka有自动平衡机制,不平衡的leader比率默认10%,默认检查时间300s
83+
84+
不平衡算法:
85+
![leader_auto_balance](./image/leader_partition_auto_balance.png)
86+
87+
生成环境一般不开启自动平衡算法,频繁的选举对性能消耗大,或者增大比率
88+
89+
### kafka的文件存储
90+
91+
#### topic和partition概念
92+
topic只是一个逻辑概念,partition才是一个物理概念
93+
94+
![kafka_file_store](./image/kafka_file_store.png)
95+
96+
#### 文件存储和index索引
97+
![index](./image/kafka_index.png)
98+
99+
#### 文件保存
100+
默认保存7天,7天之后默认删除,也可以配置成压缩
101+
102+
### kafka如何搞笑读写数据
103+
1. 本身是分布式集群,采用了分区技术,数据分散,并行度高
104+
2. 读数据采用稀疏索引,快速定位消费数据
105+
3. 采用顺序写磁盘
106+
4. 页缓存+零拷贝技术
107+
![page_cache](./image/page_cache.png)
108+
109+
## kafka consumer
110+
### process
111+
kafka采用pull的模式,好处是consumer可以自己控制速度,不足是如果kafka没有数据,consumer会陷入循环
112+
113+
![producer_process](./image/consumer_process.png)
114+
115+
### 消费组
116+
每个组内负责消费不同分区数据,一个分区只能由一个消费组内消费
117+
118+
#### 消费组初始化流程
119+
![CG_init](./image/GC_init.png)
120+
121+
#### 消费组消费流程
122+
![CG_process](./image/GC_process.png)
123+
124+
### 消费者分区以及再平衡
125+
问题: 一个CG(consumer group)里面有多个consumer,到底那个consumer消费那个partition呢
126+
![producer_balance](./image/producer_balance.png)
127+
128+
1. range方案
129+
![range](./image/consumer_range.png)
130+
2. roundRobin : hash code 排序分配
131+
3. sticky: 和range类型,不过是随机乱序分配
132+
133+
### offset的维护
134+
以前的存储在zk中,现在存储在一个topic中
135+
136+
#### offset的自动提交
137+
默认是开启自动提交,默认一个MSG到达kafka的partition之后,每隔5S自动提交offset
138+
139+
#### offset的手动提交
140+
分为同步提交和异步提交
141+
142+
#### 指定offset消费
143+
分为指定offset消费和指定时间消费,注意的是客户端需要等到分区初始化完成之后再设置需要指定的offset才生效
144+
145+
### consumer的重读消费问题
146+
![consumer_repetition](./image/consumer_repetition.png)
147+
使用消费事务解决,在数据操作落库之后再提交事务
148+
149+
### consumer数据积压问题
150+
![consumer_data](./image/consumer_data.png)
151+
152+
## kafka的监控
153+
可以使用kafka-eagle监控kafka状态,又名EFAK
154+
155+
## kafka新模式 KRAFT
156+
去掉了依赖的ZK,原理就是用一个broker节点成为controller,让这个替代ZK所做的工作,这个成为的controller的broker也可以成为一个partition节点
157+
158+
159+
160+
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)