澳门新浦京娱乐场网站-www.146.net-新浦京娱乐场官网
做最好的网站

一个复杂系统的拆分改造实践,支付宝双11的功臣

有些场景下,需要隔离不同的DB,彼此DB之间不能互相访问,但实际的业务场景又需要从A DB访问B DB的情形,这时怎么办?我认为有如下常规的三种方案:

我们都知道阿里双11,除了创造了世界史上的交易奇迹之外,也创造了世界技术史上的奇迹。支付宝的峰值达到了每秒12万笔,这在技术界简直是一个奇迹。为什么说他是一个奇迹呢?简单的来解释一下:其实在日常开发中,打交道最多的就是数据库,好多开发都戏称只会增、删、改。但是千万不要小看增、删、改,因为假设你只有一个用户访问的你的数据库,你怎么写都可以,但是如果有几十万,上百万,上千万,乃至上亿用户呢?如果操作不当,那么你的数据库一定会down掉。所以看上去简单的东西其实一点都不简单,就好像风清扬一样,简单的剑招却蕴含着上千变化。

1、触发器方式
触发器方式是普遍采取的一种增量抽取机制。该方式是根据抽取要求,在要被抽取的源表上建立插入、修改、删除3个触发器,每当源表中的数据发生变化,就被相应的触发器将变化的数据写入一个增量日志表,ETL的增量抽取则是从增量日志表中而不是直接在源表中抽取数据,同时增量日志表中抽取过的数据要及时被标记或删除。为了简单起见,增量日志表一般不存储增量数据的所有字段信息,而只是存储源表名称、更新的关键字值和更新操作类型(KNSEN、UPDATE或DELETE),ETL增量抽取进程首先根据源表名称和更新的关键字值,从源表中提取对应的完整记录,再根据更新操作类型,对目标表进行相应的处理。

1 为什么要拆分?

先看一段对话。

图片 1

从上面对话可以看出拆分的理由:

1)  应用间耦合严重。系统内各个应用之间不通,同样一个功能在各个应用中都有实现,后果就是改一处功能,需要同时改系统中的所有应用。这种情况多存在于历史较长的系统,因各种原因,系统内的各个应用都形成了自己的业务小闭环;

2)  业务扩展性差。数据模型从设计之初就只支持某一类的业务,来了新类型的业务后又得重新写代码实现,结果就是项目延期,大大影响业务的接入速度;

3)  代码老旧,难以维护。各种随意的if else、写死逻辑散落在应用的各个角落,处处是坑,开发维护起来战战兢兢;

4)  系统扩展性差。系统支撑现有业务已是颤颤巍巍,不论是应用还是DB都已经无法承受业务快速发展带来的压力;

图片 2

5)  新坑越挖越多,恶性循环。不改变的话,最终的结果就是把系统做死了。

什么是数据同步服务?顾名思义,就是在不同的系统之间同步数据。根据具体业务目的和应用场景的不同,各种数据同步服务框架的功能侧重点往往不尽相同,因而大家也会用各种大同小异的名称来称呼这类服务,比如数据传输服务,数据采集服务,数据交换服务等等

1.双方提供RESET API,需要访问不同DB数据时,可以通过API来获取指定数据;

这里,我主要想揭秘下oceanbase,因为整个支付宝的交易的库都是依赖于它。oceanbase究竟是什么?用官方的话是这样的:OceanBase是一个支持海量数据的高性能分布式数据库系统,实现了数千亿条记录、数百TB数据上的跨行跨表事务,由淘宝核心系统研发部、运维、DBA、广告、应用研发等部门共同完成。那么以前在没有使用ob之前,支付宝都用的什么呢?mysql或者oracle。这两个一个是开源的数据库,一个是甲骨文公司的商业付费数据库。简单的来说都是人家老外搞得!其实这两个数据库已经很强大了,支付宝以前的框架都是基于这两种数据库的。但是随着业务的发展,这两种数据库也带来了弊端。

例如,对于源表为ORACLE类型的数据库,采用触发器方式进行增量数据捕获的过程如下:

2 拆前准备什么?

至于大数据开发平台的数据同步服务,加上了限定词,那当然是进一步把业务的范围限定在了和数据平台业务相关的一些组件和应用场景之下了。

这种方案优点是隔离性、定制性强,统一出入口,只能通过指定的API访问指定的数据;缺点与优点是对立的,也就是定制性太强,导致每次业务发生变更,需要访问不同数据的时候,需要双方更改API的入参或返参,降低了开发效率;而且无法使用表JOIN,这样在某些情况下也会导致查询数据效率变低。目前主流的方案都是建议使用API方案

-------------------------------------------------------------华丽的分割线-------------------------------------------------------------

这样,对表T的所有DML操作就记录在增量日志表DML_LOG中,注意增量日志表中并没有完全记录增量数据本身,只是记录了增量数据的来源。进行增量ETL时,只需要根据增量日志表中的记录情况,反查源表得到真正的增量数据。
SQL代码
(1)创建增量日志表DML_LOG:
CREATE TABLE DML_LOG(
ID NUMBER PRIMARY KEY, //自增主键
TABLE NAME VARCHAR2(200). //源表名称
RECORD ID NUMBER, //源表增量记录的主键值
DML TYPE CH根(1)。∥增量类型,I表示新增:U表示更新;D表示删除
EXECUTE DATE DATE //发生时间
);

2.1 多维度把握业务复杂度

一个老生常谈的问题,系统与业务的关系?

图片 3

我们最期望的理想情况是第一种关系(车辆与人),业务觉得不合适,可以马上换一辆新的。但现实的情况是更像心脏起搏器与人之间的关系,不是说换就能换。一个系统接的业务越多,耦合越紧密。如果在没有真正把握住业务复杂度之前贸然行动,最终的结局就是把心脏带飞。

如何把握住业务复杂度?需要多维度的思考、实践。

一个是技术层面,通过与pd以及开发的讨论,熟悉现有各个应用的领域模型,以及优缺点,这种讨论只能让人有个大概,更多的细节如代码、架构等需要通过做需求、改造、优化这些实践来掌握。

各个应用熟悉之后,需要从系统层面来构思,我们想打造平台型的产品,那么最重要也是最难的一点就是功能集中管控,打破各个应用的业务小闭环,统一收拢,这个决心更多的是开发、产品、业务方、各个团队之间达成的共识,可以参考《微服务(Microservice)那点事》一文,“按照业务或者客户需求组织资源”。

此外也要与业务方保持功能沟通、计划沟通,确保应用拆分出来后符合使用需求、扩展需求,获取他们的支持。

大数据平台数据同步服务业务场景

讨论场景之前,先来看一下数据同步的目的,为什么我们需要在不同的系统之间进行数据的同步?

从大数据开发平台的角度来说,很显然,是因为我们通常不能直接对线上业务系统所存储或生成的数据进行各种运算或检索处理,组件技术架构是一方面原因,业务安全性隔离是另一方面原因。

所以,我们就需要把这些数据采集到开发平台的各种存储计算组件中来进行加工处理,这个过程也就是所谓的ETL过程。

然后,在开发平台中处理完毕的数据,有时候也并不能或着不适合在大数据开发平台的相关服务中直接使用,需要反馈回线上的业务系统中,这个过程我们称为数据的回写或导出。

最后,即使在大数据开发平台自身的各种存储/计算/查询服务组件之间,因为架构方案,读写方式,业务需求的不同,也可能存在数据的传输同步需求。

从上述三类应用场景来看,我们可以看到,通常来说我们所说的大数据开发平台环境下的数据同步服务,主要处理的是不同系统组件之间的数据导入导出工作。 比如将DB的数据采集到Hive中来,将Hive中的数据导出给HBase之类。也就是输入和输出的数据源是异构的,数据同步的目的是让数据可以适合业务需求的形式,在不同的系统中用各自擅长的方式运转起来。

除此之外,还有另外一种出于数据备份,或者负载均衡的目的而存在的数据同步场景。比如DB的主从同步,HBase集群的Replicator备份等等,他们的输入输出数据源往往是同构的。这类场景下,具体的同步方案和流程通常和系统自身的健康,功能逻辑,服务诉求等有着较强的关联性,所以往往对应的系统会自带同步方案实现,属于系统自身功能实现的一部分,比如MySQL的binlog主从同步复制机制。这类特定系统自带的数据同步架构方案实现,不在本文讨论的范围之类。

2.利用DB的同步技术(如:SQL SERVER的订阅复制、MYSQL的主从复制脚本等)来实现不同DB的数据同步共享

假设我们要撑起上千万的并发量,上百PB,乃至TB的数据量。如何设计?

(2)为DML_LOG创建一个序列SEQ_DML_LOG上,以便触发器写增量日志表时生成ID值。
(3)针对要监听的每一张表,创建一个触发器,例如对表TEST创建触发器如下:
CREATE OR REPLACE TRIGGER T BEFORE INSERT OR UPDATE
OR DELETE ON T FOR EACH ROW
DECLARE 1 DML TYPE VARCHAR2(1);
BEGIN
IF INSERTING THEN L_DML TYPE:= I’;
ELSIF UPDATING THEN I_DML_TYPE:=。TY;
ELSIF DELETING THEN L_DML_TYPE:= D’;
ENDIF;

2.2 定义边界,原则:高内聚,低耦合,单一职责!

业务复杂度把握后,需要开始定义各个应用的服务边界。怎么才算是好的边界?像葫芦娃兄弟一样的应用就是好的!

举个例子,葫芦娃兄弟(应用)间的技能是相互独立的,遵循单一职责原则,比如水娃只能喷水,火娃只会喷火,隐形娃不会喷水喷火但能隐身。更为关键的是,葫芦娃兄弟最终可以合体为金刚葫芦娃,即这些应用虽然功能彼此独立,但又相互打通,最后合体在一起就成了我们的平台。

图片 4

这里很多人会有疑惑,拆分粒度怎么控制?很难有一个明确的结论,只能说是结合业务场景、目标、进度的一个折中。但总体的原则是先从一个大的服务边界开始,不要太细,因为随着架构、业务的演进,应用自然而然会再次拆分,让正确的事情自然发生才最合理。

数据源

业务范围明确了,那么让我们来看看在这种业务场景下,需要处理的数据源可能都有哪些,简单分以一下类,常见的数据源大致可以分为;

  • 关系型数据库类 : 比如 MySql, Oracle, SqlServer, PostgreSQL 等等
  • 文件类:比如日志log,csv,excel等各种传统单机文件
  • 消息队列类:比如kafka和各种MQ
  • 各种大数据相关组件:比如HDFS/Hive/HBase/ES /Cansandra
  • 其它网络接口或服务类:比如FTP/HTTP/Socket 等

这种方案优点是可以在同一个DB访问到另一个DB中所需表的数据,可以直接JOIN,把原来的跨DB访问变成了同一个DB的事情;缺点是依赖DB的同步技术,而且两台DB服务器的网络必需互通,没有完全的隔离,且往往同步过来的表不允许直接修改,或需修改仍然需要跨DB修改或使用方案1的API来进行修改。

方案一、单库(热备)

IF DELETING THEN
INSERT INTO DML_LOG(ID,TABLE_NAME,RECORD—
ID,EXECUTE_DATE,DMLJYPE)
VALUES(SEQ_DML_LOG.NEXTVAL,’TEST ,:OLD.ID,SYSDATE,
L_DML_TYPE);
ELSE
INSERT INTO DML_LOG(ID,TABLE_NAME,RECORD_
ID,EXECUTE_DATE,DMLJYPE)
VALUES(SEQ_DML_LOG.NEXTVAL,。TEST ,:NEW.ID,SYSDATE,L
TIROL_TYPE);
ENDIF;
END;

2.3 确定拆分后的应用目标

一旦系统的宏观应用拆分图出来后,就要落实到某一具体的应用拆分上了。

首先要确定的就是某一应用拆分后的目标。拆分优化是没有底的,可能越做越深,越做越没结果,继而又影响自己和团队的士气。比如说可以定这期的目标就是将db、应用分拆出去,数据模型的重新设计可以在第二期。

现有的解决方案介绍

如上所述,数据同步服务可能涉及到的外部系统多种多样,实际上,但凡能存储或产生数据的系统,都可能成为需要接入数据同步服务的数据源。因此,也不难想象,市面上一定存在众多的解决方案。

为什么呢?很显然,这些各式各样的数据同步服务方案,在不同的业务场景中,无论整体功能定位还是业务覆盖范围都可能千差万别。即使某些方案的业务定位类似,在具体的功能实现方面,大家关注的重点也可能有所区别。此外部分系统在设计的时候,为了保证易用性或者提供一站式的解决方案,其架构和具体的功能逻辑与上下游系统可能还有一定的业务关联性,再加上程序员又喜欢用各种开发语言来折腾一遍, Python/Java/Ruby/Go 。所以,这类服务系统的解决方案想不多也很难啊。

那么常见的解决方案都有哪些呢?

3.通过程序代码实现两个DB的数据同步(增、删、改、查),如:可以定时轮询源DB的A表,然后获取变更的记录(一般是:增、删、改的记录),再通过程序代码把源DB的A表的变更记录批量更新(若是新增、则是插入,若是修改,则是更新,若是删除,则是删除)到目的DB的A表中。

这个方案完全不行,原因不多说了。

2、时间戳方式

2.4 确定当前要拆分应用的架构状态、代码情况、依赖状况,并推演可能的各种异常。

动手前的思考成本远远低于动手后遇到问题的解决成本。应用拆分最怕的是中途说“他*的,这块不能动,原来当时这样设计是有原因的,得想别的路子!”这时的压力可想而知,整个节奏不符合预期后,很可能会接二连三遇到同样的问题,这时不仅同事们士气下降,自己也会丧失信心,继而可能导致拆分失败。

以关系型数据库为主要处理对象的系统:

这种方案的优点是:可以根据实际情况灵活定制同步的表数据,不局限于某一张表或某一个DB,可以保证不同DB间同步表的数据一致性,让本来跨DB操作表变成了同一个DB的事情,而且可以增、删、改、查,功能不受限;缺点是灵活性太强,程序代码实现可靠的跨DB的实时同步逻辑的实现复杂度较高,对于开发人员的要求较高,如果写的同步逻辑无法保证实时、可靠、高可用,那对于业务来讲是灾难性的。

方案二、数据拆分(分库分表)

时间戳方式是指增量抽取时,抽取进程通过比较系统时间与抽取源表的时间戳字段的值来决定抽取哪些数据。这种方式需要在源表上增加一个时间戳字段,系统中更新修改表数据的时候,同时修改时间戳字段的值。有的数据库(例如SQL SERVER)的时间戳支持自动更新,即表的其它字段的数据发生改变时,时间戳字段的值会被自动更新为记录改变的时刻。在这种情下,进行ETL实施时就只需要在源表加上时间戳字段就可以了。对于不支持时间戳自动更新的数据库,这就要求业务系统在更新业务数据时,通过编程的方式手工更新时间戳字段。使用时间戳方式可以正常捕获源表的插入和更新操作,但对于删除操作则无能为力,需要结合其它机制才能完成。

2.5 给自己留个锦囊,“有备无患”。

锦囊就四个字“有备无患”,可以贴在桌面或者手机上。在以后具体实施过程中,多思考下“方案是否有多种可以选择?复杂问题能否拆解?实际操作时是否有预案?”,应用拆分在具体实践过程中比拼得就是细致二字,多一份方案,多一份预案,不仅能提升成功概率,更给自己信心。

Tungsten-replicator

是Continuent公司开发的数据库运维管理的两个相关联的工具产品套件中的一个,负责Oracle 和 MySQL两个数据库之间异构的数据复制同步工作,以及对外导出到比如Redshift,Vertica等数据库,也包括导出到Hadoop环境。

Continuent的另一个产品是tungsten-clustering,为MySQL等数据库提供拓扑逻辑管理,灾备,数据恢复,高可用等功能,很显然这些功能很大程度上是和Replicator相结合的。

我们没有真正使用过Tungsten的产品,只是在架构和代码方面有过一些调研了解,总体感觉,作为商业解决方案,架构完善,但是相对复杂,一些业务流程方案是定制化的,对关系型数据库自身的数据同步和管理以及稳定性应该是它的强项,不过作为一个开放的系统来用的话,接入成本可能有点高。

上述三种方案,第1、2方案基本都是定制化的常规方案,我(梦在旅途,)今天要分享的是第3种方案:跨DB增量(增、改)同步两张表的数据,注意是增量同步,其中删除这个我没有说明,原因是如果DB表中记录是物理删除(即:真实的DELETE),那就无法简单的通过程序代码获取到删除的记录,除非在DB中加入DELETE触发器记录删除记录的主键到临时表或开启更改追踪(CHANGE_TRACKING)或DB日志分析,故本文讲的是不给表、DB增加额外负担的情况实时增量同步,至于删的同步这个我认为最好是逻辑标记删除(过期最后清理【真实删除】),而不要物理删除。

按照业务特点将数据拆分:

更新时间戳:

2.6 放松心情,缓解压力

收拾下心情,开干!

Canal 和 Otter

Canal是阿里的MySql增量数据同步工具,Otter则是构建在Canal之上的数据库远程同步管理工具。两者结合起来,产品的目标范畴大致和Tungsten-replicator差不多。

和Tungsten类似,Canal也是基于MySql的Binlog机制来获取增量数据的,通过伪装MySql的Slave,来获取Binlog并解析增量数据。

这两种方案通常是大家用来接入Mysql binlog的最常见的选择,毕竟MySql Binlog的格式解析模块也是一个相对专业化的格式逆向工程工作,即使不直接使用这两个方案,大家也会借用这两个方案中的binlog Parser模块来做二次开发。

Canal的主要优点是结构流程比较简单,部署起来并不太难,额外做一些配置管理方面的改造就可以更加自动化的使用起来。不过,主要问题是Server和Client之间是一对一的消费。不太适用于需要多消费和数据分发的场景。

我司之前既有对Canal简单的封装应用,也有在借用Canal的Binlog Parser的基础上,开发的DB增量数据分发系统

关于程序代码实现跨DB同步表数据方案,之前已有总结过,详见:https://www.cnblogs.com/zuowj/p/6264711.html  ---》4.利用BCP(sqlbulkcopy)来实现两个不同数据库之间进行数据差异传输(即:数据同步)

垂直拆分以及水平拆分------比如说利用用户的user_id通过hash取模,然后路由到不同的分区。

3、全表删除插入方式

3 实践

阿里的DRC/精卫等

DRC按阿里官方的说法定位于支持异构数据库实时同步,数据记录变更订阅服务。为跨域实时同步、实时增量分发、异地双活、分布式数据库等场景提供产品级的解决方案。

其实精卫也是类似的产品,不过是由不同的团队开发的,除了这两者,阿里内部可能也还有过其它大大小小类似的产品,最后大概都整合合并了,DRC胜出 ;)

从定位说明就可以看到,很明显,除了点对点同步,DRC还需要支持一对多的消费和灵活的消费链路串联,对性能,顺序一致性等方面的要求也可能会因此而变得更加复杂(未必更难实现或要求更苛刻,但是可能有更多不同角度的功能需求),比如,可能需要支持有限时间段内的回溯,和精确定位消费的能力等。

DRC相关系统,阿里并没有开源,不过我司之前和阿里的同学有过一些简单交流,我们也从中了解和学习了一些产品设计和架构方面的思想。

我司的DB增量数据分发系统Pigeon的第一版,就是在Canal的Binlog解析代码模块基础上,参照DRC的部分思想进行开发的。大致的方案是将前端的Parser对接到消息队列上,让消息队列来承担消息持久化和分发的工作,此外在Server层面再辅助以服务节点和消费链路的动态管理,负载均衡,加上数据的过滤,转换和分发模块及策略的管理,消费端SDK的封装等工作。

 之前的文章同步主要是基于TranFlag标记字段 或触发器来实现同步,这种方式必需对表数据的增、删、改逻辑都有要求与规范,也就是增、改必需更改TranFlag=0,删必需记录表删除临进表中,这样才能实现同步逻辑,而今天是在这个同步基础上(BCP),不给表、DB增加额外负担的情况实时增量同步,对数据源的插入、改动没有要求。

这么做带来的问题有两个:1、当数据/负载增加时,需要人工介入,代价非常大。

全表删除插入方式是指每次抽取前先删除目标表数据,抽取时全新加载数据。该方式实际上将增量抽取等同于全量抽取。对于数据量不大,全量抽取的时间代价小于执行增量抽取的算法和条件代价时,可以采用该方式。

3.1 db拆分实践

DB拆分在整个应用拆分环节里最复杂,分为垂直拆分和水平拆分两种场景,我们都遇到了。垂直拆分是将库里的各个表拆分到合适的数据库中。比如一个库中既有消息表,又有人员组织结构表,那么将这两个表拆分到独立的数据库中更合适。

水平拆分:以消息表为例好了,单表突破了千万行记录,查询效率较低,这时候就要将其分库分表。

图片 5

以日志或消息队列为主要业务对象的系统

这类系统一开始可能是以日志查询为主要业务场景,其中的数据同步服务相关组件,有些是独立的组件,有些则是一整套采集,计算,展示等完整的业务方案中的一部分。不过随着架构的不断发展和成熟,有些系统渐渐的也不仅仅是只定位于处理日志类业务场景了,开始向通用数据采集传输服务的角色靠近。

代码如下:(以下同步适用于SQL SERVER 不同DB的表增量同步)

2、select查询有时候需要便利所有的分区,速度非常慢。

4、全表比对方式

3.1.1 主键id接入全局id发生器

DB拆分的第一件事情就是使用全局id发生器来生成各个表的主键id。为什么?

举个例子,假如我们有一张表,两个字段id和token,id是自增主键生成,要以token维度来分库分表,这时继续使用自增主键会出现问题。

图片 6

正向迁移扩容中,通过自增的主键,到了新的分库分表里一定是唯一的,但是,我们要考虑迁移失败的场景,如下图所示,新的表里假设已经插入了一条新的记录,主键id也是2,这个时候假设开始回滚,需要将两张表的数据合并成一张表(逆向回流),就会产生主键冲突!

图片 7

因此在迁移之前,先要用全局唯一id发生器生成的id来替代主键自增id。这里有几种全局唯一id生成方法可以选择。

1)snowflake:;(非全局递增)

2) mysql新建一张表用来专门生成全局唯一id(利用auto_increment功能)(全局递增);

3)有人说只有一张表怎么保证高可用?那两张表好了(在两个不同db),一张表产生奇数,一张表产生偶数。或者是n张表,每张表的负责的步长区间不同(非全局递增)

4)……

我们使用的是阿里巴巴内部的tddl-sequence(mysql 内存),保证全局唯一但非递增,在使用上遇到一些坑:

1)对按主键id排序的sql要提前改造。因为id已经不保证递增,可能会出现乱序场景,这时候可以改造为按gmt_create排序;

2)报主键冲突问题。这里往往是代码改造不彻底或者改错造成的,比如忘记给某一insert sql的id添加#{},导致继续使用自增,从而造成冲突;

图片 8

Flume

Flume现在大家用的多的,是Flume-NG这个redesign过的版本,它的定位是离线的日志采集,聚合和传输。Flume的特点是在聚合传输这块花了比较多的力气,特别是早期的版本,需要配置各种节点角色,而在NG版本的设计中,其拓扑逻辑已经简化成只有Agent一个单一角色了

图片 9

通过Agent的串联可以构建出复杂的数据传输链路,此外还有事务的机制设计来确保链路传输的可靠性。 不过,个人觉得,由于Kafka等通用消息队列的广泛使用,Flume在聚合,传输这方面的作用,在一些场景下其实是可以通过其它方式来实现和弱化的(比如没有网络带宽或远程二次传输问题的场景中)。

我司的日志链路中,也没有使用Flume,而是采用了自主研发的Agent采集器直接对接Kafka(当然,一些场景下,也未必绝对合理)

            try
            {
                SqlConnection obConnSrc = new SqlConnection(connLMSStr);
                SqlConnection obConnDest = new SqlConnection(mconnCCSStr);

                string lastTamp = ClsDatabase.gGetFieldValue(obConnSrc, "update TS_SyncUptime set UPTime=GETDATE() OUTPUT (deleted.LastUPstamp) as oldtamp FROM TS_CCSUptime WHERE TableName=N'tableNameA'", "oldtamp");


                string selectSql = @"SELECT id,aaa,bbb,ccc,ddd,eee,fff  
                                  FROM tableNameA WHERE 其它同步过滤查询条件 AND CONVERT(bigint,sys_tamp)>{0}";

                selectSql = string.Format(selectSql, lastTamp);

                master.TransferBulkCopy(selectSql, obConnSrc,
                                "tableNameA", obConnDest,
                                 (stable) =>
                                 {
                                     var colMaps = new Dictionary<string, string>();
                                     foreach (DataColumn col in stable.Columns)
                                     {
                                         colMaps.Add(col.ColumnName, col.ColumnName);
                                     }
                                     return colMaps;
                                 },
                                 (tempTableName, stable, destConn, srcConn) =>
                                 {
                                     StringBuilder saveSqlBuilder = new StringBuilder("begin tran"   Environment.NewLine);

                                     string IUSql = master.BuildInsertOrUpdateToDestTableSql("tableNameA", tempTableName, new[] { "id" }, stable.ExtendedProperties[master.MapDestColNames_String], 2);
                                     saveSqlBuilder.Append(IUSql);

                                     saveSqlBuilder.AppendLine("commit");

                                     ClsDatabase.gExecCommand(destConn, saveSqlBuilder.ToString());


                                     ClsDatabase.gExecCommand(srcConn, "update TS_SyncUptime set UPTime=GETDATE(),LastUPstamp=CONVERT(bigint,sys_tamp) FROM TS_SyncUptime WHERE TableName=N'tableNameA'");

                                     return false;
                                 });


            }
            catch (Exception ex)
            {
                writeLog(ex);//记错误日志
            }

3、每一台机器都要主从同步,管理起来太麻烦。

全表比对即在增量抽取时,ETL进程逐条比较源表和目标表的记录,将新增和修改的记录读取出来。优化之后的全部比对方式是采用MD5校验码,需要事先为要抽取的表建立一个结构类似的MD5临时表,该临时表记录源表的主键值以及根据源表所有字段的数据计算出来的(BI)

3.1.2 建新表&迁移数据&binlog同步

1)  新表字符集建议是utf8mb4,支持表情符。新表建好后索引不要漏掉,否则可能会导致慢sql!从经验来看索引被漏掉时有发生,建议事先列计划的时候将这些要点记下,后面逐条检查;

2)  使用全量同步工具或者自己写job来进行全量迁移;全量数据迁移务必要在业务低峰期时操作,并根据系统情况调整并发数;

3)  增量同步。全量迁移完成后可使用binlog增量同步工具来追数据,比如阿里内部使用精卫,其它企业可能有自己的增量系统,或者使用阿里开源的cannal/otter:

增量同步起始获取的binlog位点必须在全量迁移之前,否则会丢数据,比如我中午12点整开始全量同步,13点整全量迁移完毕,那么增量同步的binlog的位点一定要选在12点之前。

位点在前会不会导致重复记录?不会!线上的MySQL binlog是row 模式,如一个delete语句删除了100条记录,binlog记录的不是一条delete的逻辑sql,而是会有100条binlog记录。insert语句插入一条记录,如果主键冲突,插入不进去。

LogStash

LogStash是著名的ELK套件中的一个组件,负责日志采集和转换,另外ES负责存储和检索,Kibana负责查询结果的展示

LogStash从设计上来看,在数据传输和链路串联方面的考量就简单了很多,它的重点放在了数据的转换处理上。所以它在过滤器,编解码器等环节下了很多的功夫,比如支持grok脚本做过滤器逻辑开发,在内部链路上还有各种的buffer设计,用来支持数据的合并,转换,条件触发输出等功能

应该说从数据转换处理的角度来看,个人觉得LogStash的设计已经足够灵活和完备了。不过,它的主体实现语言是Ruby...

 上述同步代码逻辑很简单,可以参照之前的文章,这里主要是说明几个重要点:

方案三、参考google的bigtable

MD5校验码,每次进行数据抽取时,对源表和MD5临时表进行MD5校验码的比对,如有不同,进行UPDATE操作:如目标表没有存在该主键值,表示该记录还没有,则进行INSERT操作。

3.1.3 联表查询sql改造

现在主键已经接入全局唯一id,新的库表、索引已经建立,且数据也在实时追平,现在可以开始切库了吗?no!

考虑以下非常简单的联表查询sql,如果将B表拆分到另一个库里的话,这个sql怎么办?毕竟跨库联表查询是不支持的!

图片 10

因此,在切库之前,需要将系统中上百个联表查询的sql改造完毕。

如何改造呢?

1) 业务避免

业务上松耦合后技术才能松耦合,继而避免联表sql。但短期内不现实,需要时间沉淀;

2) 全局表

每个应用的库里都冗余一份表,缺点:等于没有拆分,而且很多场景不现实,表结构变更麻烦;

3) 冗余字段

就像订单表一样,冗余商品id字段,但是我们需要冗余的字段太多,而且要考虑字段变更后数据更新问题;

4) 内存拼接

4.1)通过RPC调用来获取另一张表的数据,然后再内存拼接。1)适合job类的sql,或改造后RPC查询量较少的sql;2)不适合大数据量的实时查询sql。假设10000个ID,分页RPC查询,每次查100个,需要5ms,共需要500ms,rt太高。

图片 11

4.2)本地缓存另一张表的数据

适合数据变化不大、数据量查询大、接口性能稳定性要求高的sql。

图片 12

Camus

Camus严格的说算不上是一个框架,它是Linkedin开发的基于Kafka消费日志,批量写入Hdfs的一个工具,不过用的人也不少,所以提一下,我司之前也有大量的日志是通过Camus来采集的。(话说Linkedin是把自家的kafka用到极致了,各种链路但凡能依托kafka实现的,大概都不会考虑其它的实现方式)

Camus的架构方案,基本上就是写了一个MR任务,实现批量从Kafka读取日志并写入Hdfs,此外自身维护了kafka中各个topic的消费进度。用它来做kafka topic的简单映射采集,稍微做一点管理方面的适配开发,基本还是可行的。不过,它的缺点主要是对Topic进行定制化的处理比较困难,需然也提供了一些Hook接口,但是毕竟架构过于简单,对数据进行一些Per topic的过滤转换工作,就有点力不从心了。

1.TS_SyncUptime表用于记录与管理同步任务的信息,主要包含如下几个字段:

主要是将一个bigtable拆分成几百万个子表(主键有序)。

然后,还需要对在源表中已不存在而目标表仍保留的主键值,执行DELETE操作。

3.1.4切库方案设计与实现(两种方案)

以上步骤准备完成后,就开始进入真正的切库环节,这里提供两种方案,我们在不同的场景下都有使用。

a)DB停写方案

图片 13

优点:快,成本低;

缺点:

1)如果要回滚得联系DBA执行线上停写操作,风险高,因为有可能在业务高峰期回滚;

2)只有一处地方校验,出问题的概率高,回滚的概率高

举个例子,如果面对的是比较复杂的业务迁移,那么很可能发生如下情况导致回滚:

sql联表查询改造不完全;

sql联表查询改错&性能问题;

索引漏加导致性能问题;

字符集问题

此外,binlog逆向回流很可能发生字符集问题(utf8mb4到gbk),导致回流失败。这些binlog同步工具为了保证强最终一致性,一旦某条记录回流失败,就卡住不同步,继而导致新老表的数据不同步,继而无法回滚!

b)双写方案

图片 14

第2步“打开双写开关,先写老表A再写新表B”,这时候确保写B表时try catch住,异常要用很明确的标识打出来,方便排查问题。第2步双写持续短暂时间后(比如半分钟后),可以关闭binlog同步任务。

优点:

1)将复杂任务分解为一系列可测小任务,步步为赢;

2)线上不停服,回滚容易;

3)字符集问题影响小

缺点:

1)流程步骤多,周期长;

2)双写造成RT增加

另外一些想不出怎么强行分类的数据同步解决方案 ;)

 图片 15

好处:1、数据不会丢失(hdfs),故障迁移,可扩展。2、子表有序,查询快。

5、日志表方式

3.1.5 开关要写好

不管什么切库方案,开关少不了,这里开关的初始值一定要设置为null!

如果随便设置一个默认值,比如”读老表A“,假设我们已经进行到读新表B的环节了。这时重启了应用,在应用启动的一瞬间,最新的“读新表B”的开关推送等可能没有推送过来,这个时候就可能使用默认值,继而造成脏数据!

Sqoop :

Sqoop大家应该不陌生了,即使没用过总应该也听过,也有不少公司使用Sqoop来构建自己的大数据平台数据采集同步方案。Sqoop从一开始,就几乎是完全定位于大数据平台的数据采集业务的,整体框架以Hadoop为核心,包括任务的分布执行这些,多半都是依托MR任务来实现的。数据同步的工作,也是以任务的方式提交给Server来执行,以服务的形式对外提供业务支持。

Sqoop的处理流程,定制化程度比较高,主要通过参数配置的方式来调整组件行为,在用户自定义逻辑和业务链路流程方面能力比较弱,另外,依托于MR的任务处理方式,在功能拓展方面也有一些约束和局限性。此外各种数据源的输入输出实现部分,稳定性和工程实现细节方面,也只是可用,但算不上完善和成熟。

我司也没有使用Sqoop来构建大数据平台的数据采集,导入导出服务。上述原因虽然是一方面原因,但绝对不是主要的原因。最主要原因还是因为数据的采集和导入导出服务体系,具体的输入输出模块的构建只是一部分内容。更重要的是要构建任务的配置,管理,监控,调度等服务,以及对整个数据同步业务流程和生命周期的封装,和对用户交互体验及产品形态的完善。理想中,需要和开发平台整体开发环境深度集成。

TableName:要同步的表名,UPTime每一次同步的触发时间点(可更改),sys_tamp行变更时间戳(不可更改),LastUPstamp行最后有效变量时间戳(可以更新)

这样的话,方案就生成了,参考bigtable,在hbase的开源基础上自己开发一套。后来经过验证发现不行,因为,首先hbase的开源不彻底,每台单机支持的数据有限,然后是必须引入分布式事务2PC,一般时间在2~5s左右,因为对于hbase这种nosql只保证单行事务,如果要跨行跨表操作是支持不了的。并且分布式事务太耗时,所以这个方案只能抛弃!

对于建立了业务系统的生产数据库,可以在数据库中创建业务日志表,当特定需要监控的业务数据发生变化时,由相应的业务系统程序模块来更新维护日志表内容。增量抽取时,

3.2 拆分后一致性怎么保证?

以前很多表都在一个数据库内,使用事务非常方便,现在拆分出去了,如何保证一致性?

1)分布式事务

性能较差,几乎不考虑。

2)消息机制补偿(如何用消息系统避免分布式事务?)

3)定时任务补偿

用得较多,实现最终一致,分为加数据补偿,删数据补偿两种。

DataX

DataX是阿里开源的一款插件式的,以通用的异构数据交换为目标的产品。其核心思想,简单的说,就是之前阿里的同学写各种数据源之间的同步工具,都是点对点的实现,写多了以后,发现这种两两之间网状链路的开发代价比较高。而DataX呢,是通过标准化的输入输出模块,将点对点的实现变成了星形的拓扑结构,增加一个数据源只要单独写这个数据源的输入输出实现模块就好了。

其实,这个思路也没什么大不了的,和前面的Flume/LogStash等的思想并没有本质的差别,人家一开始就没有走网状结构的路 :)

不过,DataX的特点是内部的结构更加简单一些,没有channel啊之类的概念,不具备持久化能力,也没打算构建复杂的数据流动链路。你可以认为它本质上就是将两个数据源之间点对点的传输工作模块化标准化了,最终构建出来的,还是一个简单的进程内直连的数据读写链路。

此外从一开始,DataX的目标就是在简化新链路开发代价的基础上,追求数据的传输效率。比如,使用了Ringbuffer类的技术来做input和output模块之间的数据转发工作。

因为DataX简单和标准化的特点,所以也有不少公司基于DataX来构建自己的异构数据交换服务系统。

当然,DataX也在持续改进中,目前的3.0版本在作业的分片处理,业务容错,数据转换,流量控制等方面也做了不少的功能拓展。

2.具体关键同步逻辑如下:

在设计oceanbase的时候,目标是支持10w tps,100w qps,100TB 数据,难道没有方案了么?-------------------------------------------------------------华丽的分割线-------------------------------------------------------------

通过读日志表数据决定加载哪些数据及如何加载。日志表的维护需要由业务系统程序用代码来完成。

3.3 应用拆分后稳定性怎么保证?

一句话:怀疑第三方防备使用方做好自己!

图片 16**

1)怀疑第三方

a)防御式编程,制定好各种降级策略;

  • 比如缓存主备、推拉结合、本地缓存……

b)遵循快速失败原则,一定要设置超时时间,并异常捕获;

c)强依赖转弱依赖,旁支逻辑异步化

  • 我们对某一个核心应用的旁支逻辑异步化后,响应时间几乎缩短了1/3,且后面中间件、其它应用等都出现过抖动情况,而核心链路一切正常;

d)适当保护第三方,慎重选择重试机制

2)防备使用方

a)设计一个好的接口,避免误用

  • 遵循接口最少暴露原则;很多同学搭建完新应用后会随手暴露很多接口,而这些接口由于没人使用而缺乏维护,很容易给以后挖坑。听到过不只一次对话,”你怎么用我这个接口啊,当时随便写的,性能很差的“;
  • 不要让使用方做接口可以做的事情;比如你只暴露一个getMsgById接口,别人如果想批量调用的话,可能就直接for循环rpc调用,如果提供getMsgListByIdList接口就不会出现这种情况了。
  • 避免长时间执行的接口;特别是一些老系统,一个接口背后对应的可能是for循环select DB的场景。

b)容量限制

  • 按应用优先级进行流控;不仅有总流量限流,还要区分应用,比如核心应用的配额肯定比非核心应用配额高;
  • 业务容量控制。有些时候不仅仅是系统层面的限制,业务层面也需要限制。举个例子,对saas化的一些系统来说,”你这个租户最多1w人使用“。

3)做好自己

a)单一职责

b)及时清理历史坑

  • 例:例如我们改造时候发现一年前留下的坑,去掉后整个集群cpu使用率下降1/3

c) 运维SOP化

  • 说实话,线上出现问题,如果没有预案,再怎么处理都会超时。曾经遇到过一次DB故障导致脏数据问题,最终只能硬着头皮写代码来清理脏数据,但是时间很长,只能眼睁睁看着故障不断升级。经历过这个事情后,我们马上设想出现脏数据的各种场景,然后上线了三个清理脏数据的job,以防其它不可预知的产生脏数据的故障场景,以后只要遇到出现脏数据的故障,直接触发这三个清理job,先恢复再排查。

d)资源使用可预测

  • 应用的cpu、内存、网络、磁盘心中有数
    • 正则匹配耗cpu
    • 耗性能的job优化、降级、下线(循环调用rpc或sql)
    • 慢sql优化、降级、限流
    • tair/redis、db调用量要可预测
    • 例:tair、db

举个例子: 某一个接口类似于秒杀功能,qps非常高(如下图所示),请求先到tair,如果找不到会回源到DB,当请求突增时候,甚至会触发tair/redis这层缓存的限流,此外由于缓存在一开始是没数据的,请求会穿透到db,从而击垮db。

图片 17

这里的核心问题就是tair/redis这层资源的使用不可预测,因为依赖于接口的qps,怎么让请求变得可预测呢?

如果我们再增加一层本地缓存(guava,比如超时时间设置为1秒),保证单机对一个key只有一个请求回源,那样对tair/redis这层资源的使用就可以预知了。假设有500台client,对一个key来说,一瞬间最多500个请求穿透到Tair/redis,以此类推到db。

图片 18

再举个例子:

比如client有500台,对某key一瞬间最多有500个请求穿透到db,如果key有10个,那么请求最多可能有5000个到db,恰好这些sql的RT有些高,怎么保护DB的资源?

可以通过一个定时程序不断将数据从db刷到缓存。这里就将不可控的5000个qps的db访问变为可控的个位数qps的db访问。

图片 19

Heka

Heka是Mozilla开源的一套流式数据采集和分析工具,最主要的架构实现,其实也就是数据采集同步这部分框架。整体的结构设计和LogStash等系统看起来大同小异。这个系统,我并没有做过实际的实践应用,只是简单了解了一下产品设计,提它呢,是因为架构看起来也相对比较完善,另外,它是用Go写的,偏底层后端服务开发的同学可能会喜欢。

2.1先更新TS_SyncUptime表,以便触发sys_tamp行变更时间戳发生改变(相当于记录同步触发时间点),在更改的同时取出LastUPstamp行最后有效变更时间戳(相当于上次同步的触发时间点)

既要有非关系数据库的海量数据存储,还要有关系型数据库的事务,到底如何该解决呢?

6、系统日志分析方式

4  总结

1)做好准备面对压力!

2)复杂问题要拆解为多步骤,每一步可测试可回滚!

这是应用拆分过程中的最有价值的实践经验!

3)墨菲定律:你所担心的事情一定会发生,而且会很快发生,所以准备好你的SOP(标准化解决方案)! 

某个周五和组里同事吃饭时讨论到某一个功能存在风险,约定在下周解决,结果周一刚上班该功能就出现故障了。以前讲小概率不可能发生,但是概率再小也是有值的,比如p=0.00001%,互联网环境下,请求量足够大,小概率事件就真发生了。

4)借假修真

这个词看上去有点玄乎,顾名思义,就是在借者一些事情,来提升另外一种能力,前者称为假,后者称为真。在任何一个单位,对核心系统进行大规模拆分改造的机会很少,因此一旦你承担起责任,就毫不犹豫地全力以赴吧!不要被过程的曲折所吓倒,心智的磨砺,才是本真。

数据交换服务产品设计和需求分析

从前面的业务场景讨论和市面上常见的系统的介绍中,你应该不难看出,数据同步是一个业务覆盖范围很广的术语,具体的产品形态设计和功能需求,其实在很大程度上取决于你所定位的业务的职能范围。

我司的大数据开发平台中,数据交换服务系统的定位,和DataX比较类似,系统的功能和产品形态定位,是异构数据源之间的点对点数据读写链路的构建。至于比如业务端的数据采集,数据分级传输链路的构建,增量数据的分发,数据库同步拓扑逻辑管理等环节,并不在我们的数据交换服务系统的功能定义范围之内,这些环节并不是不重要,只是在我司的实践中,是由其它的系统来独立提供服务的。

而点对点的数据读写链路服务产品的组成,又可以分为两部分,一是底层具体承载了单个数据交换任务的插件式的数据交换组件,二是上层的数据交换任务管控平台,其职能范围不仅包括系统和任务自身的配置运行管理,有时候还需要考虑针对上下游系统和具体业务的一些特性进行流程上的适配和定制。

下面的讨论基于上述产品定位展开:

2.2使用LastUPstamp作为过滤条件,查询>源DB的源表中时间戳字段,这样就可以查询出自上一次同步触发点到当前时间待同步的记录(增、改)

经过数据分析,发现了隐藏在数据中的一个秘密:虽然业务线的数据量庞大,但是修改量实际很少。这个怎么理解。

该方式通过分析数据库自身的日志来判断变化的数据。关系犁数据库系统都会将所有的DML操作存储在日志文件中,以实现数据库的备份和还原功能。ETL增晕抽取进程通过对数据库的日志进行分析,提取对相关源表在特定时间后发生的DML操作信息,就可以得知自上次抽取时刻以来该表的数据变化情况,从而指导增量抽取动作。有些数据库系统提供了访问日志的专用的程序包(例如ORACLE的LOGMINDER),使数据库日志的分析工作得到大大简化。

数据交换底层组件

底层组件设计需要关注的地方,在前面的各种开源系统的介绍中,其实大多都已经涉及到了。

2.3利作BCP执行同步(详见之前文章说明)

我打个比喻:

、特定数据库方式(ORACLE)
以下介绍常见的针对特有数据库系统的增景抽取方式。
7.1 ORACLE改变数据捕获(CHANGEDDATACAPTURE,CDC)方式:ORACLECDC特性是在ORAELE9I数据库中引入的。CDC能够帮助识别从上次抽取之后发生变化的数据。
利用CDC,在对源表进行INSERT、UPCLATE或DELETE等操作的同时就可以提取数据,并且变化的数据被保存在数据库的变化表中。这样就可以捕获发生变化的数据,然后利用数据库视图以一种可控的方式提供给ETL抽取进程,作为增量抽取的依据。CDC方式对源表数据变化情况的捕获有两种方式:同步CDC和异步CDC。同步CDC使用源数据库触发器来捕获变更的数据。这种方式是实时的,没有任何延迟。当DML操作提交后,变更表中就产生了变更数据。异步CDC使用数据库重做日志(REDOLOG)文件,在源数据库发生变更以后,才进行数据捕获。
7.2 ORACLE闪回查询方式:ORACLE9I以上版本的数据库系统提供了闪回查询机制,允许用户查询过去某个时刻的数据库状态。这样,抽取进程可以将源数据库的(BI)
当前状态和上次抽取时刻的状态进行对比,快速得出源表数据记录的变化情况。

首先,从框架结构的角度来说

整个数据的读写转换流程,理想中当然是每个环节都能以Plugin的方式进行灵活拓展。链路环节拆得越细,定制能力当然就越好,但是要同时保持系统整体的易用性也就相对更加困难一点。

那么,数据交换读写链路的分解,大致可以分为几个模块呢,往大了拆分,差不多就是:输入,过滤转换,输出这三个模块。

再细化一些,为了提高模块的复用能力,那么还可以从输入模块中拆解出Decoder模块,从输出模块中拆解出Encoder模块

最后,为了达到数据链路复用的目的,还可以在输出模块之前增加一个路由模块,将一份数据拆分或复制输出到多个目标源中。不过,如果在框架中引入了这样的设计,实际上是将业务流程方面的复杂度下沉到底层组件中来,是否值得,如何取舍,就要看整体系统的设计思路了。

2.4确保同步成功后,再次更新TS_SyncUptime表,并把sys_tamp行变更时间戳(当前触发时间点)更新到LastUPstamp行最后有效变量时间戳(记住本次触发时间点)

1、人口基数实际上非常大,但是考虑到出生/死亡/失踪,这部分人口实际上在总人口中占比很小。

8、比较和分析

其次,从性能的角度考虑

为了提升性能,除了要求执行节点具备水平拓展能力,还需要考虑支持单个作业的分布式执行能力。

前者如何实现取决于数据同步服务系统的架构设计,如果是采用server模式的服务,客户端提交任务请求到服务端执行的,那么需要Server端能够水平拓展任务的worker执行节点,这个通常不会太难,就是需要自己管理工作节点,或者依托其它集群服务,比如提交MR任务到Hadoop集群上执行。而如果采用的是本地进程模式,客户端在哪里发起调用就在哪里执行,那么资源调度和负载均衡的工作,通常就会上移到工作流调度系统上来管理,数据同步服务自身不负责工作节点的管控。

后者,单个作业的分布式执行能力,实现起来就复杂一些了。因为这涉及到单个作业内部数据的分片处理。当数据源是Hadoop类的系统时,由于这类系统从架构设计的角度,天生就支持数据分片的能力,所以实现起来通常都不会太困难,但是对于DB,消息队列类的数据源,如何实现分片,往往就要复杂一些了。

以DB扫表任务为例,你要分片执行,那就需要数据表具备分段检索的能力,最好是可以基于主键索引进行分段检索,否则只是单纯的条件过滤,会大大加大对DB的压力。但是,现实应用中,很可能并不是所有的表都具备确定范围的主键,有些主键也可能是非连续离散的,这些都会导致很难均衡的对数据进行分片,进而影响分片执行的效率。

另外,基于DataX这种输入输出端独立插件思想构建的数据交换链路,如何和Hadoop体系的数据源的数据分片处理流程更好的结合,充分利用好原生的分布式处理能力,也是需要仔细构思的。

如上步骤即可实现可靠的同步,有人可能有疑问,这样就能实现可靠同步吗?我这里解释一下:

2、金融账务系统每天要记录很多的流水,但是考虑到一半在线上保存一年的流水,那么每天新增的几乎占比很小。

可见,ETL在进行增量抽取操作时,有以上各种机制可以选择。现从兼容性、完备性、性能和侵入性3个方面对这些机制的优劣进行比较分析。数据抽取需要面对的源系统,并不一定都是关系型数据库系统。某个ETL过程需要从若干年前的遗留系统中抽取EXCEL或者CSV文本数据的情形是经常发牛的。这时,所有基于关系型数据库产品的增量机制都无法工作,时间戳方式和全表比对方式可能有一定的利用价值,在最坏的情况下,只有放弃增量抽取的思路,转而采用全表删除插入方式。完备性方面,时间戳方式不能捕获DELETE操作,需要结合其它方式一起使用。增量抽取的性能因素表现在两个方面,一是抽取进程本身的性能,二是对源系统性能的负面影响。触发器方式、日志表方式以及系统日志分析方式由于不需要在抽取过程中执行比对步骤,所以增量抽取的性能较佳。全表比对方式需要经过复杂的比对过程才能识别出更改的记录,抽取性能最差。在对源系统的性能影响方面,触发器方式由于是直接在源系统业务表上建立触发器,同时写临时表,对于频繁操作的业务系统可能会有一定的性能损失,尤其是当业务表上执行批量操作时,行级触发器将会对性能产生严重的影响;同步CDC方式内部采用触发器的方式实现,也同样存在性能影响的问题;全表比对方式和日志表方式对数据源系统数据库的性能没有任何影响,只是它们需要业务系统进行额外的运算和数据库操作,会有少许的时间损耗;时间戳方式、系统日志分析方式以及基于系统日志分析的方式(异步CDC和闪回查询)对数据库性能的影响也是非常小的。对数据源系统的侵入性是指业务系统是否要为实现增抽取机制做功能修改和额外操作,在这一点上,时间戳方式值得特别关注该方式除了要修改数据源系统表结构外,对于不支持时间戳字段自动更新的关系型数据库产品,还必须要修改业务系统的功能,让它在源表T执行每次操作时都要显式的更新表的时间戳字段,这在ETL实施过程中必须得到数据源系统高度的配合才能达到,并且在多数情况下这种要求在数据源系统看来是比较“过分”的,这也是时间戳方式无法得到广泛运用的主要原因。另外,触发器方式需要在源表上建立触发器,这种在某些场合中也遭到拒绝。还有一些需要建立临时表的方式,例如全表比对和日志表方式。可能因为开放给ETL进程的数据库权限的限制而无法实施。同样的情况也可能发生在基于系统日志分析的方式上,因为大多数的数据库产品只允许特定组的用户甚至只有DBA才能执行日志分析。闪回查询在侵入性方面的影响是最小的.

最后,从业务稳定性的角度考虑

要保证业务的稳定性,从底层组件的角度来说,整体系统的流控和失败重试这两个环节往往也是需要重点考虑的。因为数据交换服务所对接的外部存储系统,通常还承载了其它的业务。所以其负载能力往往都有一定的约束要求,其业务环境也不是完全可控的。因此数据交换服务组件,需要能够约束自己的行为,同时应对可能发生的错误。其目的,都是为了提升整体链路的稳定性,降低维护代价。

3.1同步触发时记录当前触发时间点,并取得上一次的触发时间点(这里的上一次触发时间点是指上一次开始准备同步的记录时间点,确保从上一次查询到同步完成之间的时间点都包括其中,防止漏数据)

3、金融交易系统每天虽然要记录很多交易,但是考虑到一半都保存一年以上的交易记录,那么新增的占比很小的。

图片 20

数据交换服务管控平台

作为服务,不提供可视化的管控平台,只提供命令行交互方式,那就是耍流氓。

管控平台管什么?首先,当然是管理数据交换作业的任务配置信息了

标准的做法,基本都是让用户通过UI界面,以参数的形式配置任务信息,比如输入输出数据源,表格,字段信息,分区信息,过滤条件,异常数据处理方式,调度时间,并发度控制,流量控制,增量或全量配置,生命周期等等。总之,就是尽量让用户能够通过配置信息来表达自己的业务诉求。

当然,任务可供配置的参数越多,使用起来可能就越繁琐,此外,一些复杂的过滤,聚合或转换逻辑,很可能也没办法简单的用配置的方式进行表达,这时候就需要考虑提供自定义组件的管理方式了。

除去数据交换任务自身配置信息的管理,数据交换服务管控平台需要提供的其它服务,其实和大数据开发平台上其它类型的作业任务的管理十分类似,比如:

  • 提供数据交换任务的执行流水信息,便于用户查询任务执行情况和进行业务健康分析
  • 提供权限管控和业务分组管理,更好的支持多租户环境应用场景
  • 提供系统流量负载监控,任务错误跟踪报警等,更好的支持日常的系统及业务运行维护工作。

这些服务可以由数据交换服务平台独立提供,但最理想的,还是和开发平台的其它作业任务融合到同一个平台上进行管理,即使底层支撑对应服务的后台可能是独立的,在用户交互后台上,也要尽可能集成到一起。一方面减少重复开发的代价,另一方面,降低用户的学习使用成本。

3.2如果同步的任一环节失败(只要最终没有同步成功),那么再次同步触发时均取到的是同 一个时间点(LastUPstamp),而且即使重复执行同步逻辑,也不会出现重复(因为存在则更新不存在则插入原则),保证幂等,这样就确保了同步的可靠性

这就是隐藏在数据中的秘密!

 

上下游系统和业务流程适配

你无法左右别人,但是你可以改变自己。很多时候,数据同步服务,需要配合上下游系统,进行必要的流程定制,来满足业务的需求。

3.3当然如果某个时间点的数据或某个DB有问题,导致一直同不不成功,可能会出现一直同步不过去的情况,这种情况可以加上预警 人工干预,这个是概率的事情。

其实大部分的数据,都是基数大,新增,删除,修改量占比不大。

在我从事的ETL工作中,大部分都是采用时间戳方式进行增量抽取,如银行业务,VT新开户,使用时间戳方式,可以在固定时间内,组织人员进行数据抽取,进行整合后,加载到目标系统。而触发器方式,虽然可以自动进行抽取,但是执行频率过多,影响效率!第三种方式对于大数据量来说是非常不可取的,尤其是对于一些银行、电信行业,因为数据全量比较大,所以进行增量校对是比较耗时的,总起来说,个人趋向使用时间戳方式进行增量抽取,当然具体情况要看工作的使用环境!

数据结构变更

数据同步业务,最经常遇到的问题,就是业务DB的数据结构发生变更,导致任务运行失败。

数据结构的变更,通常很难自动解决。比如用户自定义了数据扫描的语法,当数据结构变更以后,已经非法了;比如源表的字段信息发生了增删改,目标表如何映射适配?历史数据能否转换处理,是否需要转换处理?另外,不同的数据源,增删改的处理方式也可能不同,业务方希望采取的应对方式可能也和具体业务逻辑相关。所以,很多情况下,数据结构的变更,都是需要人工干预的。

那么系统能做些什么呢?自然是通过工具尽可能的降低这个变更过程的代价,比如

  • 监控源表元数据的变更,提早发现问题,提早解决,避免在半夜真正执行任务时才出错报警
  • 规范业务流程,比如约定字段的变更方式,变更的通知机制等,通过最佳实践降低问题风险概率
  • 对一些已知场景提供标准化的自动处理方式,减少人工干预的需要,加快数据转换,重建处理流程等等

好了,如果大家有什么好的意见或建议欢迎下方留言评论,谢谢!

那么可以这么解决。采用单台服务器记录最近一段时间的修改增量(内存中记录),而以前的数据不变(基线数据)。写事务只在单台服务器写,避免了2PC,高效的实现了跨行跨表事务。然后定期合并修改增量到基线数据服务器。

数据时间问题

在离线业务中,大量的数据导入任务都是在凌晨附近导入前一天的数据进行批处理分析。这种场景下经常可能会遇到以下问题:

  • 数据可能由于各种原因晚到,在数据导入任务开始执行的时候,前一天的数据还没有完全到位。

数据晚到的可能原因很多,比如DB主从延迟太大,客户端上报不及时,业务端采集链路因为流量或负载或故障等原因未能及时采集数据等等。

这时候,通常的做法,一是将日常数据采集时间适当往后推迟一小段时间(比如15分钟到半个小时)降低问题出现的概率。二是往往需要对各种链路已知可能延迟的环节进行监控,比如采集DB主从延迟时间,队列消费进度等等,及时报警或阻断下游任务的执行。三是对晚到的数据,需要根据业务需求制定适当处理策略,是丢弃还是补充回写到前一天的数据中,还是直接划入第二天的数据里等等。

  • 数据本身没有手段区分业务更新时间,具体执行结果依赖于任务执行时间

比如DB扫表的任务,如果表格中没有用于区分业务时间的字段,但是统计业务中却需要按日期划分统计,就只能靠凌晨精确的时间点采集来实现了,这就很尴尬了,因为你很可能无法保证任务开始执行的时间。你可能会说这种情况是DB表结构设计得有问题,的确如此,这时候就需要推动业务方进行改造了。

还有一种情况更常见一些,就是DB表格中的确存在业务更新字段,但是,同一主键的数据可能有多个状态变迁,会被更新多次。而时间戳只有一个。举个例子,比如你有一个订单信息表格,里面记录了下单,付款,发货,收货,确认等等不同的状态,但是,只有一个update字段。那么根据某一个时间点扫描的数据,你可能无法判断出这些状态发生变化的准确时间,那么就有可能发生统计归属错误或者遗漏的情况。

这两种情况,通常都是因为业务方的业务流程本身并不依赖于这些时间信息的记录,但是做数据统计的时候需要这些信息,而业务开发方和数据统计方负责的同学是两拨人,开发方没有充分考虑统计的需求。

有时候这种情况问题也不大,比如半夜业务变更不频繁,数据采集过程迟一些早一些,数据偏差都不大,或者这类数据统计到前一天还是后一天都没有太大的关系。但是,当出现大范围时间偏移,或者你需要重跑历史数据的时候,比如今天重跑上周的数据,那么从当前DB快照无法复原业务字段变更的具体时间点,就会成为一个无法忽视的问题了。

总体来说,这类问题的解决,首先数据同步服务自身得提供根据业务时间过滤数据的手段,其次要推动业务方改造数据结构,避免出现无法还原的场景,最后,有些业务还可以通过采集binlog等实时增量的形式,通过分析每次数据的具体变化时序来解决(当然,由于log保存时间有限,对于长时间跨度重跑的场景,是无法通过这种方式来解决的)

-------------------------------------------------------------华丽的分割线-------------------------------------------------------------

分库分表处理

分库分表,大概是业务上了规模以后,大家都喜欢做的事。但是DB中分表可以,导入到比如Hive中以后,你得想办法合并啊,便于后续各种运算逻辑的开发和统计查询脚本的撰写,那么问题来了:

比如你是通过扫表的方式获取数据,如果没有类似阿里的TDDL这样的分库分表中间件来屏蔽DB分库分表细节,你会需要自己处理相关逻辑,管理和连接所有数据实例。如果走binlog获取数据,在分库的场景下也需要自己想办法合并数据采集流程和结果。

更麻烦的是,如果你的业务方分表设计的时候,不够规范,不同的分表之间没有唯一的主键可以加以区分(可以区分的字段,也可能不是主键),那么在合并数据的时候,你可能就需要允许用户自定义合并用字段,或者自动捏造出一个主键出来,避免数据的冲突

这个问题同样,最理想的解决方案也是通过推动DB分库分表中间件的建设和业务规范的建立来解决,但这对很多公司来说往往不是一件简单的事,所以,在此之前,就需要自己想办法定制解决了。

基于上面描述,OB整个系统架构:

数据合并去重等

通过binlog增量方式来获取DB变更数据,优势是时效性好,有时也是某些场景下唯一的解决方案。但是因为走Binlog来给离线批处理任务同步数据,实际上,数据是经过了表-流-表这种模式的切换,而这种切换也会带来附加的问题

从表的变化解析成数据流,这个过程问题不大,但从数据流重新构建回表格,就会有几个问题需要关注了:

  • 取决于数据流传输的方式,数据流可能发生乱序,重复的问题,对重构表格带来困难。

比如用消息队列传输数据,各个分区的数据可能无法保持全局有序性,消息队列本身可能也无法保证Exactly once的投递。如果业务流程不能允许这类问题的发生,那就需要针对性的加以防范了,比如结合业务知识,使用合适的分区字段,使得局部有序的数据对业务结果不会造成影响。

  • 目标端数据源,比如像HDFS或Hive文件,可能只允许添加记录,或全局重写,而无法单条删除或者更新记录。

这种情况下如果源端数据源类似DB中,一条记录发生多次变更,就会生成多条变更记录,而下游任务比如一天的批处理任务,只需要最后凌晨时间点上的状态信息,这时候就需要对变更记录进行合并了。

合并数据的可能方法很多,取决于具体的业务场景和代价,未必有统一的最佳方案。首先你需要解决数据乱序问题,然后:

你可以在数据流式采集方案的后端,将数据先写入一个支持单条记录删改操作的中间数据源,然后到点再从这个中间数据源导出最终数据到目标数据源。

如果数据量不大,你也可以在采集程序中汇总所有数据,去重完再写出到目标数据源。

你也可以不去重直接将所有变更流水写入目标数据源,事后再运行一个清理程序进行去重,前提是除去采集时间,原始数据中还具备可以用作去重判断的依据。

图片 21

我司相关服务的现状和未来改进计划

目前我司的数据交换服务,日志相关链路,采用Camus和自定义的Hive Kafka Handler两种方式采集,后者在采集的基础上添加了Per topic的过滤转换逻辑,可以通过自定义Hiveql一步完成数据的采集和转换工作。

其它大数据组件之间以及与DB间的数据交换服务,由自研的与DataX类似架构的系统承担,插件式开发,能够处理增量/全量,并发流控,分库分表等前面所描述的常见需求。另外,管控平台基本实现了用户可视化的配置,管理,执行流水查询,变更记录查询,系统负载和业务进度监控报警等功能。此外,在数据交换任务的数据质量监控方面,也做了部分采集和统计分析工作。

整体来说,主要的服务框架流程没有很大的问题,但是在与开发平台的整体集成和用户自助服务的易用性方面与理想的状态还有很大的差距,其次在性能,稳定性,拓展性等方面也有很多工作等待开展,在数据质量监控方面做的工作也相对粗糙,所以未来的改进方向,包括:

  • 底层数据交换组件的进一步模块化,标准化,重点加强用户自定义数据过滤和转换模块的建设
  • 单个作业分布式分片处理方案的改进,提升大表同步作业的处理效率
  • 数据合并/去重方案的改进,提升性能规避容量瓶颈(目前的变更合并工作还是通过二次写入专属DB来实现)
  • 任务流量,负载,进度,异常等Metrics信息的全面采集和汇总分析,便于及时发现问题,持续改进业务
  • 全链路的分级容错和自动重试恢复机制的完善改进(目前的容错重试机制是作业级别的,粒度太粗)
  • 更加自动,更加平滑的流控和负载隔离机
  • 数据交换服务管控后台与大数据平台整体开发环境的进一步融合,提升用户自主服务能力,降低业务开发维护成本
  • 完善异常,错误反馈机制: 比如对常见问题,汇总,解析后再明确的反馈给用户,可能的话,提供解决意见和方案,而不是直接抛出异常代码,降低用户支持的代价。
  • 前述业务数据时间问题的全面推动改进,降低数据同步任务结果的不确定性

整个ob集群包括:rootserver,updateserver,chunkserver,mergeserver这几个类服务器。

小结

总体来说,大数据开发平台的数据同步服务的构建,可以参考的方案很多,具体的读写组件的开发也并不困难,能够找到很多现成的解决方案。对于多数公司的大多数业务来说,底层不论采取什么方案,通常都是可行的。所以数据同步服务建设的成熟度水平,往往体现在管控平台的服务能力水平和业务接入及运维代价的高低。


常按扫描下面的二维码,关注“大数据务虚杂谈”,务虚,我是认真的 ;)

图片 22

client:与mysql兼容,协议相同。

rootserver:管理集群,子表,数据分布,副本。分为主,副(主备数据同步)

updateserver:存储ob中的增量数据(内存)主备

chunkserver:存储基线数据

mergeserver:接受sql,解析,优化,转发给chunkserver或者updateserver,合并结果给客户端。

接下来我们来深入探讨一下:

首先ob部署在多个机房,每个机房一个ob集群。

客户端的请求过程:

1、请求rootserver获取ob集群中的mergeserver列表

2、按照一定策略选择mergeserver

3、请求失败后,重新选择一台mergeserver,如果某一台被请求失败超过一定次数,拉黑。

oceanbase集群会根据路由规则控制流量比,所以不用担心负载的问题。

ob中的基线数据按照主键排序(查询非常快)并划分为子表(每一个256M),并且都有副本。而在rootserver中记录了每个子表在chunkserver的位置。

图片 23

mergeserver会缓存子表的分部信息,根据请求转发给该子表所在的chunkserver,如果写操作还会转发给updateserver。

在chunkserver中,一般存储子表,而一个子表由多个sstable过程,每个sstable的容量4k~64(主键有序)。

合并操作:

oceanbase定期触发合并/数据分发操作,chunkserver会从updateserver中获取一段时间更新的操作。(业务低谷时操作)

updateserver:

更新操作写入内存,当内存数据量超过一定值时,生成快照存储在SSD中。

定期合并/数据分发:把updateserver增量更新分发到chunkserver中。

1、updateserver冻结当前的活跃的内存表(Active Memory),生成冻结内存表,开启新的活跃内存表后,缓存更新操作写入新的活跃内存表。

2、updateserver通知rootserver数据版本变化,rootserver心跳通知chunkserver。

3、每台chunkserver启动定期合并或数据分发,从updateserver获取每个子表对应的增量更新数据。

为什么分为定期合并和数据分发?

定期合并:chunkserver讲本地sstable中的基线数据与冻结内存表中的增量更新数据归并,生成新的sstable。(因为合并操作对服务器性能影响非常大,需要在业务低估时进行)

数据分发:chunkserver将updateserver中的冻结内存表中的增量缓存到本地。(不受业务高峰限制)

一个复杂系统的拆分改造实践,支付宝双11的功臣。以上就是我对ob的原理的总结,其中也看出一些问题,首先updateserver需要非常大的内存,第二为了避免单点,应该是主备切换,这里面用了zookeeper中的paxos算法,选举主机。整个ob还是非常复杂的,如果想深入探究还需要花费很大的功夫啊!

本文由澳门新浦京娱乐场网站发布于www.146.net,转载请注明出处:一个复杂系统的拆分改造实践,支付宝双11的功臣