浅谈Go中的定时器以及时间轮算法实现

1、为什么要使用时间轮算法

1.1、应用场景

在实现需求的过程中, 经常会遇到如下的一类需求:

  • 在一个间隔时间之后做某事: 例如在最后一次消息发送的5分钟之后, 断开连接
  • 在一个间隔时间之后不停的做某事: 例如每隔5分钟之后刷新内存中的缓存

使用定时器可以方便的实现上述功能. 定时器是一种结构, 它的主要作用是在一个给定的时间间隔之后, 调用一个给定的回调函数或者发出一个信号, 应用可以在回调函数或信号处理函数中实现相应的业务逻辑.

在 Go 中, 标准库 time 包提供了一些基本的定时器相关操作, 常见的用法如下:

func delayOnce() {
    n := time.Now()
    fmt.Println("delayOnce start, ", n)

    // delay 1 second
    <-time.After(time.Second)
    fmt.Println("Cost ", time.Since(n))
}

func delayTicker() {
    n := time.Now()
    fmt.Println("delayTicker start, ", n)

    // ticker 1 second, 3 times
    t := time.NewTicker(time.Second)
    for i := 0; i < 3; i++ {
        <-t.C
        fmt.Println("Tick ", time.Since(n))
    }

    t.Stop()
    fmt.Println("Cost ", time.Since(n))
}

可以看到, 有几种方式来使用定时器:

  1. 通过 time.NewTimer 创建一个定时器, 这样可以在循环中对定时器进行复用, 降低 runtime 的压力
  2. 通过 time.After 得到一个 channel, 当该 channel 可读时即定时器到期触发
  3. 通过 time.AfterFunc 在指定时间间隔后运行一个回调函数

上述三种都是一次性的定时器, 还有一种持续性的定时器(ticker):

  1. 通过 time.NewTicker 创建一个定时器, 该定时器会周期性的向 channel 中发送信号(如果 channel 中还有未读取的信号则直接丢弃当次信号)

1.2、GO中原生定时器的实现

跟golang定时器相关的入口主要有以下几种方法:

<-time.Tick(time.Second)
<-time.After(time.Second)
<-time.NewTicker(time.Second).C
<-time.NewTimer(time.Second).C
time.AfterFunc(time.Second, func() { /*do*/ })
time.Sleep(time.Second)

这里我们以其中NewTicker为入口,NewTicker的源码如下:

func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			// when(d)返回一个runtimeNano() + int64(d)的未来时(到期时间)
			//runtimeNano运行时当前纳秒时间
			when:   when(d),
			period: int64(d), // 被唤醒的时间
			f:      sendTime, // 时间到期后的回调函数
			arg:    c,        // 时间到期后的断言参数
		},
	}
	// 将新的定时任务添加到时间堆中
	// 编译器会将这个函数翻译为runtime.startTimer(t *runtime.timer)
	// time.runtimeTimer翻译为runtime.timer
	startTimer(&t.r)
	return t
}

这里有个比较重要的是startTimer(&t.r)它的实现被翻译在runtime包内

func startTimer(t *timer) {
	if raceenabled {
		racerelease(unsafe.Pointer(t))
	}
	addtimer(t)
}

func addtimer(t *timer) {
	lock(&timers.lock)
	addtimerLocked(t)
	unlock(&timers.lock)
}

上面的代码为了看着方便,我将他们都放在一起

下面代码都写出部分注释


// 使用锁将计时器添加到堆中
// 如果是第一次运行此方法则启动timerproc
func addtimerLocked(t *timer) {
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
	// t.i i是定时任务数组中的索引
	// 将新的定时任务追加到定时任务数组队尾
	t.i = len(timers.t)
	timers.t = append(timers.t, t)
	// 使用数组实现的四叉树最小堆根据when(到期时间)进行排序
	siftupTimer(t.i)
	// 如果t.i 索引为0
	if t.i == 0 {
		if timers.sleeping {
			// 如果还在sleep就唤醒
			timers.sleeping = false
			// 这里基于OS的同步,并进行OS系统调用
			// 在timerproc()使goroutine从睡眠状态恢复
			notewakeup(&timers.waitnote)
		}
		if timers.rescheduling {
			timers.rescheduling = false
			// 如果没有定时器,timerproc()与goparkunlock共同sleep
			// goready这里特殊说明下,在线程创建的堆栈,它比goroutine堆栈大。
			// 函数不能增长堆栈,同时不能被调度器抢占
			goready(timers.gp, 0)
		}
	}
	if !timers.created {
		timers.created = true
		go timerproc() //这里只有初始化一次
	}
}

// Timerproc运行时间驱动的事件。
// 它sleep到计时器堆中的下一个。
// 如果addtimer插入一个新的事件,它会提前唤醒timerproc。
func timerproc() {
	timers.gp = getg()
	for {
		lock(&timers.lock)
		timers.sleeping = false
		now := nanotime()
		delta := int64(-1)
		for {
			if len(timers.t) == 0 {
				delta = -1
				break
			}
			t := timers.t[0]
			delta = t.when - now
			if delta > 0 {
				break // 时间未到
			}
			if t.period > 0 {
				// 计算下一次时间
				// period被唤醒的间隔
				t.when += t.period * (1 + -delta/t.period)
				siftdownTimer(0)
			} else {
				// remove from heap
				last := len(timers.t) - 1
				if last > 0 {
					timers.t[0] = timers.t[last]
					timers.t[0].i = 0
				}
				timers.t[last] = nil
				timers.t = timers.t[:last]
				if last > 0 {
					siftdownTimer(0)
				}
				t.i = -1 // 标记移除
			}
			f := t.f
			arg := t.arg
			seq := t.seq
			unlock(&timers.lock)
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			f(arg, seq)
			lock(&timers.lock)
		}
		if delta < 0 || faketime > 0 {
			// 没有定时器,把goroutine sleep。
			timers.rescheduling = true
			// 将当前的goroutine放入等待状态并解锁锁。
			// goroutine也可以通过呼叫goready(gp)来重新运行。
			goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
			continue
		}
		// At least one timer pending. Sleep until then.
		timers.sleeping = true
		timers.sleepUntil = now + delta
		// 重置
		noteclear(&timers.waitnote)
		unlock(&timers.lock)
		// 使goroutine进入睡眠状态,直到notewakeup被调用,
		// 通过notewakeup 唤醒
		notetsleepg(&timers.waitnote, delta)
	}
}

golang使用最小堆(最小堆是满足除了根节点以外的每个节点都不小于其父节点的堆)实现的定时器。golang []*****timer结构如下:

imggolang存储定时任务结构

addtimer在堆中插入一个值,然后保持最小堆的特性,其实这个结构本质就是最小优先队列的一个应用,然后将时间转换一个绝对时间处理,通过睡眠和唤醒找出定时任务,这里阅读起来源码很容易,所以只将代码和部分注释写出。

1.3、为何要引入时间轮

  1. 在定时器的数量增长到百万级之后, 基于最小堆实现的定时器的性能会显著降低, 需要一种更高效的实现
  2. 在有些场景下的使用不是很方便

例如, 服务器维护有对客户端的连接, 并且定时在连接中发送心跳来确保连接的可用性, 一个普遍的实现方式如下:

func onConnect(ctx context.Context, i int) {
    t := time.NewTicker(time.Second)
    n := time.Now()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Done")
            return
        case <-t.C:
            fmt.Printf("Tick[%v] %v\n", i, time.Since(n))
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    for i := 0; i < 10; i++ {
        go onConnect(ctx, i)
    }

    <-ctx.Done()
    time.Sleep(time.Second) // wait all sub goroutine exit, should use WaitGroup
}

但是这种方式中, 每一个连接就需要新增一个 goroutine, 并且对 goroutine 的清理也会比较复杂. 所以, 如果有一个独立的 goroutine 能够对这些定时任务进行触发, 操作上会方便很多.

2、时间轮是什么

一个时间轮就是一个定时器容器, 该容器可以高效的管理定时器. 思路如下:

  1. 轮盘上有多个插槽
  2. 每个定时器都放置到合适的插槽中
  3. 每次轮询时直接获取最早的插槽中的定时器并触发即可

2.1、单表盘时间轮:

整个定时器只有一个表盘,用循环数组代表表盘,每个槽的时间间隔(tick)代表了定时器能够达到的定时精度,循环数组的大小代表了整个定时器可以定时的时间范围。比如说要一个可以最大定时一分钟的时间轮,其表盘分为60个槽,每隔1秒钟(tick),指针转动一个槽.netty中就使用了这类结构实现时间轮。 单表盘时间轮的好处是实现简单,定时精确,但是也有个问题是所能代表的时间有限。设想一下如果要表示一个计时范围1小时,精度1秒的时间轮,那么循环数组的大小就需要3600,如果标识一天呢,就需要3600*24.

1b80c724801c46f0b5e89c14adf2fadf-image.png

如果任务不只限定在一天之内呢?比如我有个任务,需要每周一上午九点执行,我还有另一个任务,需要每周三的上午九点执行。一种很容易想到的解决办法是:

增大时间轮的刻度

一天24个小时,一周168个小时,为了解决上面的问题,我可以把时间轮的刻度(槽)从12个增加到168个,比如现在是星期二上午10点钟,那么下周一上午九点就是时间轮的第9个刻度,这周三上午九点就是时间轮的第57个刻度,示意图如下:

7499ddb7fa9144d3b5dd6ab9e253256f-image.png

仔细思考一下,会发现这中方式存在几个缺陷:

  1. 时间刻度太多会导致时间轮走到的多数刻度没有任务执行,比如一个月就2个任务,我得移动720次,其中718次是无用功。
  2. 时间刻度太多会导致存储空间变大,利用率变低,比如一个月就2个任务,我得需要大小是720的数组,如果我的执行时间的粒度精确到秒,那就更恐怖了。

2.2、Round类型时间轮

这次我不增加时间轮的刻度了,现在有三个任务需要执行,

  • 任务一每周二上午九点。
  • 任务二每周四上午九点。
  • 任务三每个月12号上午九点。

比如现在是9月11号星期二上午10点,时间轮转一圈是24小时, 任务一下次执行(下周二上午九点),需要时间轮转过6圈后,到第7圈的第9个刻度开始执行。 任务二下次执行第3圈的第9个刻度,任务三是第2圈的第9个刻度。 示意图如下: 在这里插入图片描述 时间轮每移动到一个刻度时,遍历任务列表,把round值-1,然后取出所有round=0的任务执行。 这样做能解决时间轮刻度范围过大造成的空间浪费,但是却带来了另一个问题:时间轮每次都需要遍历任务列表,耗时增加,当时间轮刻度粒度很小(秒级甚至毫秒级),任务列表又特别长时,这种遍历的办法是不可接受的。 当然,对于大多数场景,这种方法还是适用的。有没有既节省空间,又节省时间的办法呢?

2.3、分层时间轮

在层级时间轮中, 将插槽分为多个层次, 每一层的时间轮的插槽范围都会扩大, 例如:

  1. 第一层时间轮有20个插槽, 每个插槽为1秒, 那么第二层时间轮每个插槽为20秒, 第三层为400秒, 依次类推, 除第一层外都是按需创建

  2. 当一个10秒的定时器插入时放置到第一层时间轮中, 100秒的定时器则放置到第二层时间轮

  3. 随着时间的流逝, 高层时间轮中的定时任务会降级重新插入低层的时间轮, 直到触发为止

  4. 每个插槽共享一个触发时间, 这样可以显著降低需要触发的事件的个数

一个示意图如下:

img

3、时间轮的用途

  1. 定时任务
  2. kafka
  3. 故障恢复
  4. 流量控制
  5. 调度算法
  6. 控制网络中的数据包生命周期

好好学习,天天向上