关于MongDB数据迁移方案的研究

如果说mongodb在设计上有什么缺陷,那数据迁移应该算是不小的一个,在集群内部,不同分片之间的auto-balance问题频出,无法用于实际生产环境,而集群之间的数据迁移也没有给出一个可行的方案.

对于集群内部的负载均衡,我们使用了pre-split,关闭了auto-balance,定期move chunk,并将move chunk分成了copy到目标分片,更新config路由与remove源数据三个步骤,原因在于mongodb原生的move chunk进行数据删除的时候速度并不可控,在数据库压力本来就比较大的时候,高速的删除往往使得数据库的可用性变差,而我们自己进行数据删除的时候可以控制速度.

之所以没有直接修改mongod的代码,直接限制删除速度,是因为在一个mongod实例中中,move chunk是单任务的,数据删除完成之前,无法进行下一个move chunk,而实际上对于应用来说,数据的删除并不是必要的,我们完全可以在copy完数据之后再用较长的时间慢慢删除,一切都是为了数据库的稳定可靠.

当然,解决方案并不能停留在此,更彻底一些的两种方案也被使用.
1. 开发可以并行的move chunk,我们既可以直接在move chunk的代码中限制删除数据的速度,又不影响其他move chunk的进度.
2. 对于数据删除较慢的情况,开发了移走本分片所有数据,直接删除数据文件,停掉数据库,之后重启数据库的方案,效果也被证实是不错的.

实际使用数据库过程中,我们除了在分片之间进行数据迁移之外,还有在集群间进行数据迁移的需求,比如说,我们以性能作为区分搭建了super,middle与low三个数据库,之前有一张业务表tieba建立在low集群中,后来tieba发展迅速,各种请求数量飞速增长,我们需要将tieba这张表迁移到super集群中,这时候问题就来了.

基本的思路是不复杂的,首先,我们拷贝所有的数据文件,使用mongo提供的mongodump或者mongoexport都可以,之后使用mongorestore或者mongoimport将数据导入,结束之后利用mongod存储的oplog进行由dump开始到restore阶段之间数据的更新.

在数据量与请求量比较小的时候,这个方案是没有问题的,一切都很顺利,我们也使用这种方案进行过表在不同集群的迁移,证实了方案的可行性,但是随着数据量与请求量的变大,事情开始变得复杂.

出现的第一个问题是,在mongod中,oplog采用固定大小的磁盘空间进行历史操作的记录,使用的空间被占满后,最旧的操作历史记录将被清掉,这就意味着,我们的dump与restore总的时间不能超过oplog可记录的最大时间,否则之前的迁移策略将不再可用.

出现的第二个问题是,如果数据量本身不是很大,我们可以在oplog记录的时间限制内完成基本数据的拷贝,进行oplog同步的时候,有可能我们的同步速度无法跟上源集群的操作速度,使得集群之间的数据永远无法得到同步.

在第一个问题发生之前,我们意识到了这个问题,并决定采用按照shardkey的范围进行数据迁移的方案,首先将数据进行全量导出(事实证明,数据的导出速度要远远大于数据的导入速度,在目前的数据量下,数据进行全量导出而不是部分导出,并没有成为整个数据迁移过程中的瓶颈,同时未完成整张表的迁移,数据全部导出是必然要进行的操作,当然,如果在你进行操作时发现数据全部导出也花费了很客观的时间,那也可以在导出阶段便对数据进行过滤),按照chunk的范围对导出的多个数据文件命名,之后选择只导入选定范围的数据,以缩短数据导入的时间.

然后,第二个问题出现了,源集群数据的操作太快,我们通过之前的oplog同步方案跟不上.

一开始,我们的同步方案是这样,通过源集群的mongos查到所有的分片,并对每个分片起一个线程,逐条查询位于某个时间点(这个时间点在进行数据导出时进行了记录)之后的所有操作,同时向目标mongos进行同步,这样同步有可能导致实际被执行的oplog顺序与最初的oplog记录顺序不一致,但是可以保证对每个源分片,所有的oplog记录都是有序的,而我们的操作本身并没有多个分片数据之间的依赖性,因此这种乱序是可行的.

在发生oplog的同步速度无法跟上源集群的操作速度时,我们对同步所有的步骤进行了时间测试,发现有约95%的时间花费在了向目标集群进行写的操作上,而这个写的速度远远没有达到目标集群可以承受的最大写入速度,我们试图将safe=True的写入换成了safe=False,发现向目标集群的写入速度提高了5倍左右,由于之前发现safe=False的写入会造成约万分之一的数据错误,不安全写入是不能被采用的,但是我们发现了速度慢的问题之一所在,写入的线程在等待目标集群的返回,而只有有限几个线程在进行数据的写入(等于源集群的分片数目),对这种情况,我们想出了几个解决方案.

第一个解决方案基于使用批量操作减少耗时的前提,经过测试,在使用safe=True的批量操作时,在总数据条数为2w条时,使用1000条每次的批量操作速度是每次一条的五倍左右,在这个基础之上,我们开发了使用buffer进行批量操作的方案.

一开始,我们分析oplog,认为insert操作可以乱序,update操作不可以乱序,并且只能在inser之后进行,remove操作可以乱序,但是只能在insert与update之后进行,基于这个假设,我们在同步oplog时破坏了读源集群数据与写目标集群数据的时间一致性与顺序性,之前是读一条源集群数据,操作到目标集群,循环往复,现在对每个线程,建立了三个buffer,为insert_buffer,update_buffer与remove_buffer,当读到一条源集群操作记录时,按照操作的种类,将记录放到相应的buffer中,在buffer满或者超过指定时间(防止有一部分数据永远无法被操作到目标集群)时,将buffer中的记录操作到目标集群,操作使用以下的过程:如果insert_buffer满了,通过批量写将操作一次性应用到目标集群;如果update_buffer满了,首先将insert_buffer一次性操作至目标集群,之后将update_buffer中的数据一条条操作至目标集群;如果remove_buffer满了,首先将insert_buffer一次性操作至目标集群,之后将update_buffer中的数据一条条操作至目标集群,最后将remove_buffer一次性操作至目标集群.

使用这个方案后,在写入与删除较多的情况下,数据的同步速度可以提高至原来的5倍以上,update较多的情况下速度没有改善.

不过这个方案并不是完备的,方案有一个假设,即insert之间的操作是可以完全乱序的,实际上,由于唯一索引冲突的情况存在,各操作之间的顺序并不是可以完全乱序的,这导致这种解决方案在insert有可能出现唯一索引冲突的情况有可能导致最终数据的不一致.举个简单的例子,有insert_1:{a:1,b:2},remove_1:{a:1,b:2},insert_2:{a:1,b:2}而{a:1,b:1}为唯一索引的情况下,最终的数据{a:1,b:2}是存在的,而被乱序加速之后,可能导致remove_1操作在insert_1与insert_2之后进行,而此时insert_2是失败的,remove_1是成功的,最终{a:1,b:2}不存在.

修正策略在原理上也存在,即对唯一索引冲突增加兼容处理,在将操作分配至buffer之前,为每个操作添一个递增的序列号,当发生操作失败时,将失败的操作记录至redo_buffer,redo_buffer满执行或者定期执行,且按照序列号从小到大依次执行即可.不过由于mongod在进行批量操作时并不能告知你哪一条操作成功或者失败,所以先避过不谈.(希望有一天数据库不再支持唯一索引这种东西,由业务层保证即可,数据库就应该是存数据,取数据的地方,没有理由一条数据进来了,给你返回不能存.).

第二个解决方案是试图通过某个隔离,使得同步操作可以多线程,其实之前对不同的分片已经实现了多线程,前提便是不同的分片之间操作的独立性,而这种独立性的根源便是不同shardkey之间操作的独立性,基于这个前提,我们可以实现在同一个分片使用多线程同步oplog,设线程数为n,将shardkey一一映射为整数,之后对n取模得到i,由第i个线程处理这条记录,为避免操作堵塞,我们设立n个队列,之前从源集群的分片取到oplog操作之后应用到目标集群的线程现在做的事情是:一开始启动n个线程,每个线程有一个负责监控的队列,之后循环,从源集群取到oplog操作记录,由刚刚提到的取模策略将操作记录放到第i个队列,由相应的线程取队列的记录进行实际的操作.

在进行全部为insert操作的测试中,这种取模多线程的方案最大可以将写入速度提高到之前的4倍左右,提高的程度取决于线程数,并不是线程数越高速度越好,具体的线程数目需要实际去实践决定,建议5-10个线程即可.

这个方案的问题在于,对于insert操作与不使用set方式的update操作,oplog的记录总是存在shardkey字段的,但是对于remove操作与使用set方式的update操作,oplog是没有shardkey字段的,我们不知道这条操作属于哪一个shardkey,因此有六种解决方案.
1. 遇到没有shardkey的oplog记录,我们首先等待之前的所有队列被取空之后,应用这条记录,之后再启动写队列-读队列的循环.
2. 虽然oplog的记录没有shardkey,但是我们可以查出来,对于remove操作,由于源集群已经没有这条数据了,我们可以首先在目标集群进行查询,如果查不到,这条记录肯定就在我们存储在本地还没有应用到目标集群的队列中,因此需要再向队列查询(需要一个可以遍历的队列…),查出shardkey之后再放到相应的队列中,对于使用set方式进行更新的update操作,同样的道理,可以在目标集群+本地队列查找,肯定找得到,并且最终找到的肯定是insert记录(_id是不会更新的),insert是带有shardkey的,问题解决.
3. 第三种解决方案稍微不常规一些,即在使用set方式更新记录时,api层面人工加上某个新的字段(shardkey是不能被更新的,哪怕与之前的一样,数据库直接拒绝操作而不进行判断),比如_mk(_mk必须没有被使用过),字段的值与shardkey的值一样,以此当我们发现set方式的update时,可以查找_mk获取shardkey,发现remove时,采取方案一.
4. 第四种解决方案可以说是上面三种方案的终极解决方案,原理很简单,我们修改mongod的代码,使其在进行oplog的所有操作的[‘o’]字段加上shardkey即可.
5. 第五种方案简单粗暴,首先复制源集群的config配置,之后按照分片对分片的方式进行更新,去掉了mongos的分配作用,也不用让oplog再勉为其难做自己难以做到的路由分配的功能.
6. 第六种更为通用一些,可以实现任意操作的乱序,即将oplog取出来应用之前,先检查已有的尚未被应用的队列里是否有产生冲突的记录,具体来说,对insert,查看是否有唯一索引冲突,对update,查看是否有对同一_id的之前的update,对remove,查看是否有同_id的insert与update,如果所有检测都通过,放入队列,如果有任意一项不通过,放入等待队列,并由新的线程定时做重新检测.

几个方案都有可行性,按照最小的代价我们决定首先在方案一的基础上使用方案三,以完成目前的数据迁移,之后为长远考虑,会去实现其余的几个方案.

通过之前的分析,我们可以得出一个很明显的结论,mongod在进行数据库设计时并没有考虑集群间数据迁移这种情况,oplog只是为主从同步而存在,没有丝毫路由分配的设计理念在里面,直接导致了在目前的oplog下使用多线程操作的复杂性.

希望新的版本发布时可以在集群间数据迁移做一些设计上的改进,比如说oplog的改进,等等.