Preface

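Haha, my exams are finally over. Finals week, spent cramming garbage courses until I felt sick, is done at last, so I'm back to writing up notes. First up is a review of Lab 2 Part A (which I actually finished half a month ago).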

Main text

Lab 2 Part A only covers leader election and does not touch the log, so when adding fields we can ignore everything log-related.

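First, let's be clear that every Raft node has one of three roles: leader, follower, or candidate. Nodes move between these roles under certain conditions, so we can treat a node as a state machine. There are two timers: heartbeatTimer, which the leader uses to send heartbeats, and electionTimer, which is the election timeout. A single goroutine handles the events from both timers. Part A has only two actions: the leader broadcasting heartbeats, and a candidate starting an election. Besides these two actions we need a couple of helper functions: the state transition function convertTo() and randTimeDuration(), which generates a random duration. Before any of that, though, we have to fill in the Raft node struct and the two RPCs, RequestVote and AppendEntries.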
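
To make the state-machine view concrete, here is a minimal sketch of the role transitions described above. The `State`, `Event`, and `next` names are made up for this illustration and are not part of the lab code; the transitions follow the paper's description of elections.

```go
package main

import "fmt"

// State and Event are hypothetical names used only in this sketch.
type State int
type Event int

const (
	Follower State = iota
	Candidate
	Leader
)

const (
	ElectionTimeout       Event = iota // the election timer fired
	WonElection                        // votes from a majority arrived
	SawLeaderOrHigherTerm              // valid heartbeat, or a larger term was observed
)

// next returns the role a node moves to when ev happens while it is in s.
func next(s State, ev Event) State {
	switch ev {
	case ElectionTimeout:
		if s == Leader {
			return Leader // a leader does not run an election timer
		}
		return Candidate // a follower starts an election, a candidate restarts one
	case WonElection:
		if s == Candidate {
			return Leader
		}
		return s
	case SawLeaderOrHigherTerm:
		return Follower
	}
	return s
}

func main() {
	s := Follower
	s = next(s, ElectionTimeout)
	fmt.Println(s == Candidate)
	s = next(s, WonElection)
	fmt.Println(s == Leader)
	s = next(s, SawLeaderOrHigherTerm)
	fmt.Println(s == Follower)
}
```

The real convertTo() further down also stops and resets timers as a side effect; this sketch only captures which role follows which.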

First, some predefined constants, types, and helper functions.

type NodeState uint8

const (
    // time.Duration() use nanosecond
    HeartbeatInterval    = time.Duration(120) * time.Millisecond
    ElectionTimeoutLower = time.Duration(300) * time.Millisecond
    ElectionTimeoutUpper = time.Duration(400) * time.Millisecond
)

const (
    Follower  = NodeState(1)
    Candidate = NodeState(2)
    Leader    = NodeState(3)
)

// Prepare for NodeState to use printf
func (s NodeState) String() string {
    switch {
    case s == Follower:
        return "Follower"
    case s == Candidate:
        return "Candidate"
    case s == Leader:
        return "Leader"
    }
    return "Unknown"
}

// Lets a *Raft be printed with %v. A pointer receiver avoids copying the mutex inside Raft.
func (rf *Raft) String() string {
    return fmt.Sprintf("[node(%d), state(%v), term(%d), votedFor(%d)]",
        rf.me, rf.state, rf.currentTerm, rf.votedFor)
}

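In the lab, the timing values have to follow the lab's requirements rather than just the paper's (the lab limits how often heartbeats may be sent, so the election timeout ends up longer than the paper's 150 to 300 ms). The String() methods are there for formatted output, and the constants define the three states Follower, Candidate, and Leader.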
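
As a quick illustration of why the String() method is useful, the sketch below shows that fmt picks it up automatically for %v. The `describe` helper is hypothetical and exists only for this example; NodeState is redefined here so the block runs on its own.

```go
package main

import "fmt"

type NodeState uint8

const (
	Follower  = NodeState(1)
	Candidate = NodeState(2)
	Leader    = NodeState(3)
)

// String makes NodeState satisfy fmt.Stringer, so %v prints the name.
func (s NodeState) String() string {
	switch s {
	case Follower:
		return "Follower"
	case Candidate:
		return "Candidate"
	case Leader:
		return "Leader"
	}
	return "Unknown"
}

// describe is a hypothetical helper used only in this sketch.
func describe(s NodeState) string {
	return fmt.Sprintf("state(%v)", s)
}

func main() {
	fmt.Println(describe(Candidate))    // state(Candidate)
	fmt.Println(describe(NodeState(9))) // state(Unknown)
}
```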

// Get the random time between lower and upper
func randTimeDuration(lower, upper time.Duration) time.Duration {
    num := rand.Int63n(upper.Nanoseconds()-lower.Nanoseconds()) + lower.Nanoseconds()
    return time.Duration(num) * time.Nanosecond
}

This function returns a random duration within the given interval, lower inclusive and upper exclusive.
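
A small usage sketch, assuming the same constants as above: every sampled timeout should land in the half-open range from lower to upper. Note that rand.Int63n panics when its argument is not positive, so upper has to be strictly larger than lower.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	ElectionTimeoutLower = 300 * time.Millisecond
	ElectionTimeoutUpper = 400 * time.Millisecond
)

// randTimeDuration returns a random duration in [lower, upper).
// upper must be greater than lower, otherwise rand.Int63n panics.
func randTimeDuration(lower, upper time.Duration) time.Duration {
	num := rand.Int63n(upper.Nanoseconds()-lower.Nanoseconds()) + lower.Nanoseconds()
	return time.Duration(num) * time.Nanosecond
}

func main() {
	for i := 0; i < 3; i++ {
		d := randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper)
		inRange := d >= ElectionTimeoutLower && d < ElectionTimeoutUpper
		fmt.Println(d, inRange)
	}
}
```

Randomizing the timeout per node is what keeps several followers from timing out at the same instant and splitting the vote over and over.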

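Now let's fill in the code properly, paying attention to when the lock is needed and when the election timer has to be reset. Start with the basic Raft node struct.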

type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]

    // Your data here (2A, 2B, 2C).
    // Look at the paper's Figure 2 for a description of what
    // state a Raft server must maintain.

    // 2A
    currentTerm    int
    votedFor       int
    electionTimer  *time.Timer // election timeout
    heartbeatTimer *time.Timer // heartbeat timeout
    state          NodeState   // each node state
}

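Part 2A only needs the fields above. I use Go's time.Timer for the timers. Next comes GetState(), which returns the node's current term and whether it believes it is the leader.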

func (rf *Raft) GetState() (int, bool) {
    // Lock the current rf
    rf.mu.Lock()
    defer rf.mu.Unlock()

    var term int
    var isleader bool
    // Your code here (2A).
    term = rf.currentTerm
    isleader = rf.state == Leader
    return term, isleader
}

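Next, fill in the RequestVoteArgs and RequestVoteReply structs, both used by the RequestVote RPC. We also need the RequestVote() handler, which implements the voting logic, while sendRequestVote() is what actually sends the RPC to the other nodes in the Raft network (MIT's labrpc package simulates the network so the whole cluster can run on a single machine; hats off to MIT).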

//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
    // Your data here (2A, 2B).

    Term        int // 2A
    CandidateId int // 2A
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
    // Your data here (2A).

    Term        int  // 2A
    VoteGranted bool // 2A if the candidate get this vote
}
//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).

    // 2A
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // refuse when the candidate's term is smaller, or the terms are equal and this node has already voted
    if args.Term < rf.currentTerm || (args.Term == rf.currentTerm && rf.votedFor != -1) {
        //_, _ = DPrintf("%v didn't vote for node %d at term %d\n", rf, args.CandidateId, args.Term)
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
        return
    }

    rf.currentTerm = args.Term
    rf.convertTo(Follower) // clears votedFor, so call it before recording the vote
    rf.votedFor = args.CandidateId
    reply.Term = rf.currentTerm
    reply.VoteGranted = true
    rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
}

Let's walk through RequestVote():
Because it modifies data in rf, we need to take the lock first.

rf.mu.Lock()
defer rf.mu.Unlock()

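The vote is refused when the request's term is smaller than this node's term, or when the terms are equal but the node has already voted in this term (for example, another candidate asked first and got rf's vote).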

// refuse when the candidate's term is smaller, or the terms are equal and this node has already voted
if args.Term < rf.currentTerm || (args.Term == rf.currentTerm && rf.votedFor != -1) {
    //_, _ = DPrintf("%v didn't vote for node %d at term %d\n", rf, args.CandidateId, args.Term)
    reply.Term = rf.currentTerm
    reply.VoteGranted = false
    return
}

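Otherwise rf votes for the candidate: it adopts the candidate's term, becomes a follower (whether it was a candidate or already a follower), records the vote, sets VoteGranted to true, and resets its election timer. Note that convertTo(Follower) clears votedFor, so the vote has to be recorded after the conversion; otherwise a candidate or leader that steps down here would forget the vote it just granted.

rf.currentTerm = args.Term
rf.convertTo(Follower) // clears votedFor, so call it before recording the vote
rf.votedFor = args.CandidateId
reply.Term = rf.currentTerm
reply.VoteGranted = true
rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))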
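
The voting rule can also be written as a pure function, which makes the cases easy to check in isolation. This is only a sketch: `grantVote` and `noVote` are hypothetical names, the log comparison from Part B is left out, and unlike the handler above it follows the paper's Figure 2 in granting again to a candidate the node already voted for in the same term.

```go
package main

import "fmt"

// noVote is a hypothetical sentinel meaning "has not voted in this term".
const noVote = -1

// grantVote decides whether a node with (currentTerm, votedFor) grants its
// vote to candidateID asking in candidateTerm. It returns the decision and
// the node's term and votedFor afterwards.
func grantVote(currentTerm, votedFor, candidateTerm, candidateID int) (granted bool, newTerm, newVotedFor int) {
	if candidateTerm < currentTerm {
		return false, currentTerm, votedFor // stale candidate
	}
	if candidateTerm > currentTerm {
		// A newer term starts with a clean ballot.
		currentTerm, votedFor = candidateTerm, noVote
	}
	if votedFor == noVote || votedFor == candidateID {
		return true, currentTerm, candidateID
	}
	return false, currentTerm, votedFor // already voted for someone else
}

func main() {
	fmt.Println(grantVote(2, noVote, 1, 0)) // stale term: refused
	fmt.Println(grantVote(1, noVote, 1, 3)) // same term, no vote yet: granted
	fmt.Println(grantVote(1, 2, 1, 3))      // same term, voted for node 2: refused
	fmt.Println(grantVote(1, 2, 2, 3))      // newer term: granted
}
```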

Then, following the pattern of the RequestVote RPC, write the AppendEntries RPC (heartbeats use the same RPC, just with no entries).

// field names must start with capital letters!
type AppendEntriesArgs struct {
    Term     int // 2A
    LeaderId int // 2A
}

// field names must start with capital letters!
type AppendEntriesReply struct {
    Term    int  // 2A
    Success bool // 2A
}

Now the handler logic for AppendEntries:

// AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // if the heartbeat term is smaller
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }

    reply.Success = true
    rf.currentTerm = args.Term
    // when follower receive the heartbeat
    rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
    // it aims at the candidate, it proves that leader is still alive, stop the election and convert to follower
    rf.convertTo(Follower)
}

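Again we modify fields of rf, so the lock is needed. A request whose term is smaller than rf's current term must be rejected. Otherwise rf adopts the request's term, resets its election timer because it has heard from a live leader, and converts to follower.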

Now the Make() function, which initializes the Raft struct and starts the logic that maintains its state.

//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    // Your initialization code here (2A, 2B, 2C).
    rf.currentTerm = 0 // initialization is 0
    rf.votedFor = -1   // not to vote
    rf.heartbeatTimer = time.NewTimer(HeartbeatInterval)
    rf.electionTimer = time.NewTimer(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
    rf.state = Follower // init is follower

    // deal with the things about the timer
    go func(node *Raft) {
        for {
            select {
            // election timeout
            case <-rf.electionTimer.C:
                rf.mu.Lock() // add lock
                if rf.state == Follower {
                    rf.convertTo(Candidate)
                } else {
                    rf.startElection()
                }
                rf.mu.Unlock() // delete lock

            // deal with when will the leader send the AppendEntries RPC (within the heartbeatTimer)
            case <-rf.heartbeatTimer.C:
                rf.mu.Lock()
                if rf.state == Leader {
                    rf.broadcastHeartbeat()
                    rf.heartbeatTimer.Reset(HeartbeatInterval) // reset the timer to prepare for next heartbeat
                }
                rf.mu.Unlock()
            }
        }
    }(rf)

    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())

    return rf
}

The term starts at 0, no vote has been cast, the two timers are created, and the initial state is Follower.

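A goroutine then drives the state machine. Pay attention to where the lock is taken and released. When heartbeatTimer fires it delivers a value on its channel; if the node is the Leader at that point, it broadcasts a heartbeat and resets the heartbeat timer, all while holding the lock.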

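When the election timer fires and rf is a follower, it converts to candidate. Otherwise it is already a candidate whose election timed out without a winner, so it starts a new election with startElection().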
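
One thing worth keeping in mind with time.Timer: on Go versions before 1.23, calling Reset on a timer that has already fired but whose tick was never received leaves the stale tick in the channel, so the next receive returns immediately. The sketch below shows the usual stop-and-drain idiom. `resetTimer`, `firesWithin`, and `staleTickDemo` are hypothetical helpers and not part of the lab code above.

```go
package main

import (
	"fmt"
	"time"
)

// resetTimer stops t, discards a pending tick if there is one, then re-arms t.
func resetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C: // drop a tick that fired but was never received
		default:
		}
	}
	t.Reset(d)
}

// firesWithin reports whether t delivers a tick within d.
func firesWithin(t *time.Timer, d time.Duration) bool {
	select {
	case <-t.C:
		return true
	case <-time.After(d):
		return false
	}
}

// staleTickDemo lets a timer fire unobserved, resets it, and checks that the
// old tick does not show up early while the new one still arrives.
func staleTickDemo() (early, late bool) {
	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(30 * time.Millisecond) // the timer has fired; nobody received the tick
	resetTimer(t, 200*time.Millisecond)
	early = firesWithin(t, 50*time.Millisecond)
	late = firesWithin(t, 500*time.Millisecond)
	return early, late
}

func main() {
	early, late := staleTickDemo()
	fmt.Println(early, late)
}
```

With the drain in place the demo should report no early tick and one tick after the new 200 ms deadline. In the Raft code this would matter for the election timer, which is reset from RPC handlers while the timer goroutine may not have consumed a previous tick yet.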

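This relies on the state transition function convertTo(). If the target state equals the current one it returns immediately. Converting to Follower stops heartbeatTimer and clears votedFor; converting to Candidate starts an election; converting to Leader stops electionTimer, broadcasts a heartbeat right away, and resets the heartbeat timer.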

// convert raft peer's state and deal with the condition of each state
func (rf *Raft) convertTo(s NodeState) {
    // if it converts to itself ,return
    if s == rf.state {
        return
    }
    // log the state transition
    _, _ = DPrintf("Term %d: Server %d convert from %v to %v\n", rf.currentTerm, rf.me, rf.state, s)
    rf.state = s
    switch s {
    case Follower:
        rf.heartbeatTimer.Stop() // convert to the follower , so stop to send the heartbeat and stop the heartbeatTimer
        //rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper)) // reset the electionTimer
        rf.votedFor = -1
    case Candidate:
        rf.startElection()
    case Leader:
        rf.electionTimer.Stop() // because convert to leader ,so don't need to elect
        rf.broadcastHeartbeat()
        rf.heartbeatTimer.Reset(HeartbeatInterval) // Reset the heartbeatTimer after send the heartbeat rpc
    }
}

Now the two most important actions, startElection() and broadcastHeartbeat().

First, starting an election with startElection():

// startElection runs in two cases:
// a follower's election timer fires and it converts to candidate, or
// a candidate's election times out without a winner and it starts a new one.
func (rf *Raft) startElection() {
    rf.currentTerm += 1 // term + 1

    args := RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me} // prepare the rpc args
    var voteCount int32                                               // record the voted number

    rf.votedFor = rf.me            // candidate vote for itself
    atomic.AddInt32(&voteCount, 1) // atomic add 1

    // whenever a new election start , reset the raft peer's electionTimer
    rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))

    for i := range rf.peers {
        // candidate will vote for itself
        if i == rf.me {
            continue
        }
        // use goroutine to send the RequestVote RPC to each raft peer except it self
        // (because it will takes some time to receive the reply)
        go func(server int) {
            var reply RequestVoteReply // record each reply from other peer
            if rf.sendRequestVote(server, &args, &reply) {
                rf.mu.Lock() // need to lock rf when deal with the reply

                // if VoteGranted is true , it's prove candidate's term is larger than follower
                if reply.VoteGranted && rf.state == Candidate {
                    atomic.AddInt32(&voteCount, 1)
                    // if get the major vote , candidate convert to leader
                    if atomic.LoadInt32(&voteCount) > int32(len(rf.peers)/2) {
                        rf.convertTo(Leader)
                    }
                } else {
                    // VoteGranted is false, that has two conditions that
                    // one is it has voted to another candidate
                    // the other is follower's term is larger than the candidate's
                    // so the candidate will update it's term and convert to the follower
                    if reply.Term > rf.currentTerm {
                        rf.currentTerm = reply.Term
                        rf.convertTo(Follower)
                    }
                }
                rf.mu.Unlock()
            } else {
                // send RequestVote failed,record the error
                _, _ = DPrintf("%v send RequestVote RPC to %v failed", rf, server)
            }
        }(i)
    }
}

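Following the paper, a candidate starting an election first increments its term and votes for itself. The variable voteCount tracks how many votes have been received, updated with atomic operations, so we can tell when more than half have arrived. Every new election also resets the node's election timer. The vote requests are then sent to every node in peers, each in its own goroutine so that a slow peer does not hold up the others.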

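Looking at the anonymous goroutine in detail: reply receives the RPC result, and sendRequestVote sends the request to the other node. Handling the reply requires holding rf's lock. If VoteGranted is true and rf is still a candidate (another node may already have won, or this node may already have won and is no longer a candidate), the count is incremented atomically and we check whether the votes now exceed half of the peers; if so, the Candidate converts to Leader.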
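
The majority check itself is just integer arithmetic. A minimal sketch, with `hasMajority` as a hypothetical helper name:

```go
package main

import "fmt"

// hasMajority reports whether votes is a strict majority of clusterSize nodes.
func hasMajority(votes, clusterSize int) bool {
	return votes > clusterSize/2
}

func main() {
	fmt.Println(hasMajority(2, 3)) // true: 2 of 3
	fmt.Println(hasMajority(2, 4)) // false: 2 of 4 is only a tie
	fmt.Println(hasMajority(3, 5)) // true: 3 of 5
}
```

Since the reply handler above only touches voteCount while holding rf.mu, a plain int would probably work as well; the atomic operations are harmless but not strictly needed there.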

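Otherwise the vote was not counted. For a refusal there are two cases: the target already voted for someone else, or the candidate's term is too small. So when the reply's term is larger than the candidate's, the candidate updates its term and converts to Follower.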

The other action is broadcastHeartbeat(). Part A does not involve the log, so it is fairly simple.

// Leader send the heartbeat to each follower
func (rf *Raft) broadcastHeartbeat() {
    args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me}

    for i := range rf.peers {
        // skip itself
        if i == rf.me {
            continue
        }
        go func(server int) {
            var reply AppendEntriesReply
            if rf.sendAppendEntries(server, &args, &reply) {
                rf.mu.Lock() // need to lock rf when deal with the reply
                // if the reply's term is larger than the Leader's term
                // leader update its term and convert to the follower
                if reply.Term > rf.currentTerm {
                    rf.currentTerm = reply.Term
                    rf.convertTo(Follower)
                }
                rf.mu.Unlock()
            } else {
                // record the heartbeat error
                _, _ = DPrintf("%v send the heartbeat to %d failed", rf, server)
            }
        }(i)
    }
}

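As before, prepare the arguments, then send a heartbeat (an AppendEntries with no entries) to every other node. The logic is simple enough that I won't go through it again.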
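
The fan-out pattern used by both startElection() and broadcastHeartbeat() can be tried without the lab framework. In this sketch `sendFunc`, `fakeSend`, and `broadcast` are hypothetical stand-ins: `fakeSend` plays the role of sendAppendEntries, and the WaitGroup is there only so the example finishes deterministically. The real code does not wait for the replies, because it is called with rf.mu held.

```go
package main

import (
	"fmt"
	"sync"
)

// sendFunc stands in for an RPC call: it returns the peer's term and
// whether the call got through at all.
type sendFunc func(peer int) (term int, ok bool)

// fakeSend simulates five peers: peer 2 is unreachable, peer 3 is already
// in term 7, and everyone else is in term 5.
func fakeSend(peer int) (int, bool) {
	switch peer {
	case 2:
		return 0, false
	case 3:
		return 7, true
	}
	return 5, true
}

// broadcast calls send for every peer except me, each in its own goroutine,
// and returns the largest term seen among the replies that arrived.
func broadcast(me, n, currentTerm int, send sendFunc) int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		maxTerm = currentTerm
	)
	for i := 0; i < n; i++ {
		if i == me {
			continue // skip itself
		}
		wg.Add(1)
		go func(peer int) {
			defer wg.Done()
			term, ok := send(peer)
			if !ok {
				return // RPC failed; nothing to learn from this peer
			}
			mu.Lock()
			if term > maxTerm {
				maxTerm = term
			}
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return maxTerm
}

func main() {
	seen := broadcast(0, 5, 5, fakeSend)
	fmt.Println(seen, seen > 5) // a larger term means the leader must step down
}
```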

Test results:

Summary

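Lab 2 Part A needs a clear overall logic. Here I went with an event loop and treated each node as a state machine. The most important thing is still knowing exactly when the lock is needed. Tomorrow I'll write up Part B.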