redis 是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决 redis 持久化的问题
我们在 redis.conf 文件中增加两个配置
appendonly yes appendfilename appendonly.aof
appenonly表示只追加appendfilename表示追加到那什么文件中
指令: *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n 落在 appendonly.aof 文件中
*3 $3 set $3 key $5 value
这里要实现的功能就是把用户发过来的指令,用 resp 的形式记录在 appendonly.aof 文件中
这个文件是在机器的硬盘上,当 redis 停了之后,内存中的数据都没了,但这个文件会保存下
redis 重启后,会读取这个文件,把之前内存中的数据再次加载回来
定义 aofhandler
在项目下新建文件 aof/aof.go
在里面定义一个 aofhandler 结构体,它的作用就是用来处理 appendonly.aof 文件
type aofhandler struct {
database databaseface.database // 持有 db,db 有业务核心
aoffile *os.file // 持有 aof 文件
aoffilename string // aof 文件名
currentdb int // 当前 db
aofchan chan *payload // 写文件的缓冲区
}
这里有注意的是 aofchan,它是写文件的缓冲区
因为从文件中读取指令,指令是非常密集的,但是将指令写入硬盘时非常慢的,我们又不可能每次都等待指令写完成后再去操作 redis
这时我们就把所有想写入 aof 文件的指令放到 aofchan 中,然后在另一个 goroutine 中去写入硬盘
所以这个 aofchan 的类型是 payload 结构体
type cmdline = [][]byte
type payload struct {
cmdline cmdline // 指令
dbindex int // db 索引
}
aofhandler 结构体定义好之后,我们需要定义一个 newaofhandler 函数来初始化 aofhandler 结构体
还需要定义一个 addaof 方法,用来往 aofchan 中添加指令
放到缓冲区之后,还需要一个方法 handleaof 将指令写入硬盘
最后还要实现一个从硬盘加载 aof 文件到内存的的函数 loadaof
实现 newaofhandler
newaofhandler 函数用来初始化 aofhandler 结构体
func newaofhandler(database databaseface.database) (*aofhandler, error) {
// 初始化 aofhandler 结构体
handler := &aofhandler{}
// 从配置文件中读取 aof 文件名
handler.aoffilename = config.properties.appendfilename
// 持有 db
handler.database = database
// 从硬盘加载 aof 文件
handler.loadaof()
// 打开 aof 文件, 如果不存在则创建
aoffile, err := os.openfile(handler.aoffilename, os.o_append|os.o_create|os.o_rdwr, 0600)
if err != nil {
return nil, err
}
// 持有 aof 文件
handler.aoffile = aoffile
// 初始化 aofchan
handler.aofchan = make(chan *payload, aofbuffersize)
// 启动一个 goroutine 处理 aofchan
go func() {
handler.handleaof()
}()
// 返回 aofhandler 结构体
return handler, nil
}
实现 addaof
addaof 方法用来往 aofchan 中添加指令,它不做落盘的操作
因为在执行指令的时候,等待它落盘的话,效率太低了,所以我们把指令放到 aofchan 中,然后在另一个 goroutine 中去处理
func (handler *aofhandler) addaof(dbindex int, cmdline cmdline) {
// 如果配置文件中的 appendonly 为 true 并且 aofchan 不为 nil
if config.properties.appendonly && handler.aofchan != nil {
// 往 aofchan 中添加指令
handler.aofchan <- &payload{
cmdline: cmdline,
dbindex: dbindex,
}
}
}
实现 handleaof
handleaof 方法用来处理 aofchan 中的指令,将指令写入硬盘
currentdb 记录的是当前工作的 db,如果切换了 db,会在 aof 文件中插入 select 0 这样切换 db 的语句
func (handler *aofhandler) handleaof() {
// 初始化 currentdb
handler.currentdb = 0
// 遍历 chan
for p := range handler.aofchan {
// 如果当前 db 不等于上一次工作的 db,就要插入一条 select 语句
if p.dbindex != handler.currentdb {
// 我们要把 select 0 编码成 resp 格式
// 也就是 *2\r\n$6\r\nselect\r\n$1\r\n0\r\n
data := reply.makemultibulkreply(utils.tocmdline("select", strconv.itoa(p.dbindex))).tobytes()
// 写入 aof 文件
_, err := handler.aoffile.write(data)
if err != nil {
logger.warn(err)
continue
}
// 更新 currentdb
handler.currentdb = p.dbindex
}
// 这里是插入正常的指令
data := reply.makemultibulkreply(p.cmdline).tobytes()
// 写入 aof 文件
_, err := handler.aoffile.write(data)
if err != nil {
logger.warn(err)
}
}
}
实现 aof 落盘功能
我们之前在实现指令的部分,都是直接执行指令,现在我们要把指令写入 aof 文件
我们在 standalonedatabase 结构体中增加一个 aofhandler 字段
type standalonedatabase struct {
dbset []*db
aofhandler *aof.aofhandler // 增加落盘功能
}
然后新建 database 时需要对 aofhandler 进行初始化
func newstandalonedatabase() *standalonedatabase {
// ...
// 先看下配置文件中的 appendonly 是否为 true
if config.properties.appendonly {
// 初始化 aofhandler
aofhandler, err := aof.newaofhandler(database)
if err != nil {
panic(err)
}
// 持有 aofhandler
database.aofhandler = aofhandler
// 遍历 dbset
for _, db := range database.dbset {
// 解决闭包问题
sdb := db
// 为每个 db 添加 addaof 方法
// 这个 addaof 方法是在执行指令的时候调用的
sdb.addaof = func(line cmdline) {
database.aofhandler.addaof(sdb.index, line)
}
}
}
return database
}
这里要注意的是 addaof 方法,它是在执行指令的时候调用的
因为我们需要在指令中调用 addaof 函数,实现指令写入 aof 文件
但是在指令中,���们只能拿到 db,db 上又没有操作 aof 相关的方法,所以我们需要在 db 中增加一个 addaof 方法
type db struct {
index int // 数据的编号
data dict.dict // 数据类型
addaof func(cmdline) // 每个 db 都有一个 addaof 方法
}
然后就在需要落盘的指令中调用 addaof 方法
del 方法需要记录下来,因为 del 方法是删除数据的,如果不记录下来,那么 aof 文件中的数据就会和内存中的数据不一致
// del k1 k2 k3
func del(db *db, args [][]byte) resp.reply {
deleted := db.removes(keys...)
// delete 大于 0 说明有数据被删除
if deleted > 0 {
db.addaof(utils.tocmdline2("del", args...))
}
}
flushdb 方法也需要记录下来,因为 flushdb 方法是删除当前 db 中的所有数据
// flushdb
func flushdb(db *db, args [][]byte) resp.reply {
db.addaof(utils.tocmdline2("flusehdb", args...))
}
rename 和 renamenx 方法也需要记录下来,因为这两个方法是修改 key 的名字
// rename k1 k2
func rename(db *db, args [][]byte) resp.reply {
db.addaof(utils.tocmdline2("rename", args...))
}
// renamenx k1 k2
func renamenx(db *db, args [][]byte) resp.reply {
db.addaof(utils.tocmdline2("renamenx", args...))
}
set 和 setnx 方法也需要记录下来,因为这两个方法是设置数据的
// set k1 v
func set(db *db, args [][]byte) resp.reply {
db.addaof(utils.tocmdline2("set", args...))
}
// setnx k1 v
func setnx(db *db, args [][]byte) resp.reply {
db.addaof(utils.tocmdline2("setnx", args...))
}
getset 方法也需要记录下来,因为这个方法是设置数据的同时返回旧数据
// getset k1 v1
func getset(db *db, args [][]byte) resp.reply {
db.addaof(utils.tocmdline2("getset", args...))
}
实现 loadaof
loadaof 方法用来从硬盘加载 aof 文件到内存
aof 中的指令是符合 resp 协议的,我们就可以把这些指令当成用户发过来的指令,执行就可以了
func (handler *aofhandler) loadaof() {
// 打开 aof 文件
file, err := os.open(handler.aoffilename)
if err != nil {
logger.error(err)
return
}
// 关闭文件
defer func() {
_ = file.close()
}()
// 创建一个 resp 解析器,将 file 传入,解析后的指令会放到 chan 中
ch := parser.parsestream(file)
fackconn := &connection.connection{}
// 遍历 chan,执行指令
for p := range ch {
if p.err != nil {
// 如果是 eof,说明文件读取完毕
if p.err == io.eof {
break
}
logger.error(err)
continue
}
if p.data == nil {
logger.error("empty payload")
continue
}
// 将指令转换成 multibulkreply 类型
r, ok := p.data.(*reply.multibulkreply)
if !ok {
logger.error("exec multi mulk")
continue
}
// 执行指令
rep := handler.database.exec(fackconn, r.args)
if reply.iserrreply(rep) {
logger.error(rep)
}
}
}
到此这篇关于使用go语言实现redis持久化的示例代码的文章就介绍到这了,更多相关go实现redis持久化内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论