
北京公司注册_代记账服务_财务SaaS服务_智能记账
近期评论
分类
-
商务合作
- 邮箱:aifinacn@qq.com
- 手机:13871128117
- 座机:(+86)
- Q Q:32556627点击交谈
- 地址:
之前设计篇讲了数据拆分的方式、场景、优缺点以及实施步骤,偏方法与理论。技术篇会介绍分布式数据服务平台设计与实现,讲述如何通过技术手段解决数据拆分带来的各种问题,以及各中间件的架构与原理。
平台主要包括分布式数据访问中间件(SDK、Proxy)、平滑扩容、数据集成、管控平台等四部分。
一、分布式数据访问中间件
数据拆分后,分散在多个库与表中,但应用开发时怎样才能准确访问数据库,换言之,如何才能拿到准确的数据库连接,拼接出正确的sql(主要是实际表名),然后执行返回结果集呢?
为了尽可能减少业务侵入性,应用少做改造,往往都会抽象出一个数据访问层负责上述功能。数据访问层按实现方式不同,可分为应用自定义、数据中间件、分布式数据库三种方式,在我们项目中采用的是中间件方式,其技术架构如下:
分布式数据访问层
按照接入方式不同,数据访问中间件可以分为 SDK、Proxy(云原生架构下可能还会有sidecar方式)。
一个典型的分库分表中间件由JDBC接口实现(SDK模式)、MySQL报文解析(Proxy、Sider模式)、SQL解析器,路由计算、SQL重写, SQL执行、聚合处理、结果集合并、数据源管理、配置管理等部分构成。
1、JDBC接口实现
JDBC接口实现起来并不太难,数据库连接池都是基于此实现,本质上就是一种装饰器模式,主要就是java.sql与javax.sql包下DataSource、Connection、Statement,PreparedStatement,ResultSet、DatabaseMetaData、ResultSetMetaData等接口。这些接口也并不是都需要实现,不常用的接口可在集成一些框架时根据需要再实现。
2、MySQL报文解析
MySQL报文解析比JDBC接口复杂些,它包含了很多MySQL的命令,需要对照MySQL报文规范分别进行解析,另外由于proxy还要支持常见DBA工具接入,比如MySQL CLI、Navicat、Dbvisualizer、MySQL workbench等,这些工具甚至不同版本使用的MySQL报文都不完全一样,这块的兼容性也是一个繁琐的工作,考验对Mysql报文的支持的完整度。这部分像Sharding-Proxy、Mycat等都有实现,如果要自行研发或者扩展优化,可参考其实现细节。
MySQL报文规范:https://dev.mysql.com/doc/internals/en/client-server-protocol.html
3、SQL解析
SQL解析是个繁琐复杂的活儿,对应就是词法Lexer与语法分析Parser,因为要最大程度兼容各数据库厂商SQL,这块是需要不断的迭代增强的。开源的手写解析器有阿里开源的druid,也可以使用javacc、antlr等进行实现,相比手写解析器速度要慢些,但扩展定制化能力更好。这类解析器在使用方式上,多采用vistor设计模式,如果需要可以编写自己的vistor从而获取所需AST(Abstract Syntax Tree)中的各类值。
4、路由计算
路由计算是根据SQL解析后AST,提取分库分表列值(提取规则是预先配置好的),然后根据应用指定的运算表达式或者函数进行计算,分别得到数据库与表对应的序号(一般就是一个整型数值,类似一个数组下标)或者是真正的物理表名。读写分离模式下,只涉及库路由,会根据一个负载均衡算法选取一个合适的物理库,如果写SQL则会选择主库,如果是读则会按照随机、轮询或者权重等算法选择一个从库。
5、SQL重写
SQL重写主要为表名添加后缀(应用写SQL时是逻辑表名,实际表名往往是逻辑表名+序号),根据路由计算环节得到的物理表名,替换原SQL中的逻辑表名。另外SQL中有聚合函数、多库表分页等操作时,也会涉及到对SQL的改写,这部分有的开源中间件里也叫做SQL优化。注意这里最好不要简单的用字符串匹配去替换表名,例如当存在列名与表名一样的情况下会出现问题。
6、SQL执行
SQL执行负责SQL的真正执行,对应的就是执行连接池或数据库驱动中Statement的execute、executeQuery、executeUpdate、executeBatch等方法。当然如果是涉及到多库多表的SQL,例如where条件不包含分库分表键,这时会涉及到库表扫描,则需要考虑是连接优先还是内存优先,即采用多少个并发数据库连接执行,连接数太大则会可能耗尽连接池,给内存以及数据库带来很大压力;但连接数太小则会拉长SQL执行时间,很有可能带来超时问题,所以一个强大的SQL执行器还会根据SQL类型、数据分布、连接数等因素生成一个到合适的执行计划。
7、数据源管理
数据源管理负责维护各数据库的连接,这块实现起来比较简单,一般维护一个数据库连接池DataSource对象的Map就可以,只要根据数据源下标或者名称可以拿到对应的数据库连接即可。
8、聚合处理
聚合处理负责对聚合类函数的处理,因为分库分表后,实际执行的SQL都是面向单库的,而对于max、min、sum、count、avg等聚合操作,需要将各单库返回的结果进行二次处理才能计算出准确的值,例如max、min、sum、count需要遍历各库结果,然后分别取最大、最小、累加,对于avg操作,还需要将原SQL修改为select sum,count,然后分别累加,最后用累积后的sum除以累加后count才能得到准确值。另外对于多库表的分页操作,例如limit 1,10,则将单库SQL的起始页都修改为第一页即limit 0,10,然后再整体排序取出前10个才是正确的数据。
9、结果集合并
结果集合并负责将多个SQL执行单元返回的数据集进行合并,然后返回给调用客户端。一般当进行库表遍历、或者涉及多个库SQL(例如使用in时)会需要进行合并。当然并不一定需要把数据全部读到内存再合并,有时基于数据库驱动实现的ResultSet.next函数,逐条从数据库获取数据即可满足要求。关于结果集合并,sharding-jdbc对此有一个更丰富的抽象与分类,支持流式归并、内存归并、分组归并等,具体可参见归并引擎。
归并引擎:https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/merge/
10、配置管理
配置管理负责分库分表的规则以及数据源的定义,这块是面向应用开发者的,在使用体验上应当简单、易用、灵活。其中会涉及到物理数据源(参数跟连接池类似)、逻辑表、路由规则(库路由、表路由,库表分布,支持指定java函数或者groovy表达式),逻辑表->路由规则的映射关系。另外我们在实践时还包括了一些元数据信息,包括shardID->库表序号,这样做有个好处,业务在配置路由规则时只需要关注业务对象->shardID即可。配置管理在具体形式方面,可以支持xml、yaml、也支持在管控平台上在线进行配置,后者会通过将配置同步到配置中心,进而支持数据访问层进行编排(orchestration),例如在线扩容时需要动态增加数据源、修改路由规则、元数据信息等。
一个完整的分布式数据访问中间件,在架构上和数据库的计算层很像,尤其如果涉及到DB协议报文与SQL的解析,还是一个复杂和工作量较大的工程,因此一般应用团队建议还是采用开源成熟的方案,基于此做定制优化即可,没必要重复造轮子。
SDK和Proxy方式各有优缺点,在我们项目中分别用在不同的场景,简单总结如下:
接下来介绍下我们在开源中间件方面的实践,分为三个阶段:
第一阶段
早些年这类开源中间件还挺多,但其实都没有一个稳定的社区支持。2015年时我们基于一个类似TDDL的组件,对其事务、数据连接池、SQL解析等方面进行了优化,修复了数十个开发遇到的bug,实现SDK版本的数据访问中间件,暂就叫做DAL。
第二阶段
2017年,系统上线后发现,开发测试以及运维还需要一个执行分库分表SQL的平台,于是我们调研了Mycat,但当时1.6版本只支持单维度拆分(单库内分表或者只分库),因此我们重写了其后端SQL路由模块,结合原SDK版本数据组件,利用Mycat的报文解析实现了Proxy的数据访问层。
Proxy模式的数据访问层上线后,可以很好的应对带分库分表键的SQL操作,但在涉及到库表遍历时,由于并发连接太多,经常会导致连接数不够,但如果串行执行则经常导致执行时间太长,最后超时报错。针对这个问题,我们做了个新的优化:
在将这类库表遍历的查询在生成执行计划时,通过union all进行了改写,类似map-reduce,同一库上的不同表的sql通过union all合并,然后发到数据库执行,这样连接数=物理数据库总数,同时尽可能的利用了数据库的计算能力,在损耗较少连接数的前提下,大大提升了这类SQL的执行效率。(注意order by 和limit需要加在union all的最后,为了不影响主库,可以将这类查询在从库执行)。
例如user表拆分成1024表,分布在4个库,SQL拆分与合并示意图如下:
通过union all实现库表遍历
第三阶段
这两个中间件在运行3年左右后,也暴露出来了很多问题,例如SQL限制太多,兼容性太差,开源社区不活跃,部分核心代码设计结构不够清晰等,这给后续更复杂场景的使用带来了很多桎梏。因此在19年,我们决定对数据访问层进行升级重构,将底层分库分表组件与上层配置、编排进行剥离,改成插拔式设计,增加更加多元的分库分表组件。在那时开源社区已经涌现了一些优秀的分库分表项目,目前来看做的最好的就是shardingshpere(后面简称ss)了,ss的设计与使用手册其官网都有详细介绍,这里主要简单介绍下我们集成ss的一些实践。
shardingsphere整体设计架构清晰,内核各个引擎设计职责明确,jdbc 与proxy版本共享内核,接入端支持的多种实现方式。治理、事务、SQL解析器分别单独抽象出来,都可以hook方式进行集成,通过SPI进行扩展。这种灵活的设计也为我们定制带来了很大的方便,代码实现上比较优雅。我们在集成时开始是3.0.0版本,后来升级到4.0.0-RC1版本,目前ss已发布4.0.0的release版本。
1)配置兼容
因为要在上层应用无感知的情况下更换底层分库分表引擎,所以改造的第一个问题就是兼容以前的配置。基于此,也就无法直接使用sharding-jdbc的spring或者yaml配置方式,而改用API方式,将原配置都转换为sharding-jdbc的配置对象。这块工作量时改造里最大的,但如果项目之前并没有分库分表配置,则直接在sharding-jdbc提供的方式中选择一种即可。由于我们项目中需要支持规则链、读权重等ss不支持功能,所以我们是基于ComplexKeysShardingAlgorithm接口进行的实现。
更简洁的yaml配置形式:
ds:
master_0:
blockingTimeoutMillis: 5000
borrowConnectionTimeout: 30
connectionProperties: {}
idleTimeoutMinutes: 30
jdbcUrl: jdbc:mysql://localhost:3306/shard_0
logAbandoned: false
maintenanceInterval: 60
maxConn: 10
maxIdleTime: 61
minConn: 1
userName: root
queryTimeout: 30
testOnBorrow: false
testOnReturn: false
testQuery: null
testWhileIdle: true
timeBetweenEvictionRunsMillis: 60000
master_1:
jdbcUrl: jdbc:mysql://localhost:3306/shard_1
parent: master_0
groupRule: null
shardRule:
bindingTables:
– user,name
rules:
userTableRule:
dbIndexs: master_0,master_1
dataNodes: master_0.user_${[\’00\’,\’01\’]},master_1.user_${[\’02\’,\’03\’]}
dbRules:
– return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndex(#user_id#)
– return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndexByName(#name#,#address#)
tbRules:
– return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndex(#user_id#)
– return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndexByName(#name#,#address#)
tableRuleMap: {name: nameTableRule, user: userTableRule}
2)事务级别
sharding-jdbc的默认事务是local,即最大努力一阶段提交,或者叫链式提交,这种方式的好处是对应用透明,性能也还不错,互联网中使用较多。但这种方式可能会由于网络等原因导致部分提交成功,部分失败。虽然这种概率可能并不高,但一旦出现则会产生事务不一致的问题,这在金融关键场景下风险是很高的。所以我们在联机交易场景下禁止使用这种方式,而是要求必须严格单库事务,我们在先前SDK版本的数据访问中间件增加了校验,一旦跨库就直接抛异常。因此切换到sharding-jdbc,这种事务级别也要继续支持。实现代码片段:
/**
* Single DB Transaction Manager
* SPI: org.apache.shardingsphere.transaction.spi.ShardingTransactionManager
*/
@NoArgsConstructor
public class SingleDBTransactionManager implements ShardingTransactionManager {
private Map<String, DataSource> dataSources = new HashMap<String, DataSource>;
private ThreadLocal<String> targetDataSourceName = new ThreadLocal<String> {
protected String initialValue {
return null;
}
};
private ThreadLocal<Connection> connection = new ThreadLocal<Connection> {
protected Connection initialValue {
return null;
}
};
private ThreadLocal<Boolean> autoCommitted = new ThreadLocal<Boolean> {
protected Boolean initialValue {
return true;
}
};
@Override
public void close throws Exception {
if (connection.get != null) {
connection.get.close;
}
}
@Override
public void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources) {
for (ResourceDataSource res : resourceDataSources) {
dataSources.put(res.getOriginalName, res.getDataSource);
}
}
@Override
public TransactionType getTransactionType {
return TransactionType.SINGLEDB;
}
@Override
public Connection getConnection(String dataSourceName) throws SQLException {
if (!ConditionChecker.getInstance.isMultiDbTxAllowed && targetDataSourceName.get != null
&& !targetDataSourceName.get.equals(dataSourceName)) {
throw new TransactionException(
\”Don\’t allow multi-db transaction currently.previous dataSource key=\”
+ targetDataSourceName.get + \”, new dataSource key=\” + dataSourceName);
}
targetDataSourceName.set(dataSourceName);
if (connection.get == null) {
connection.set(dataSources.get(dataSourceName).getConnection);
}
return connection.get;
}
…
}
3)读库权重
虽然多个从库(一个主一般都要挂两个或者三个从,从库的数量由RPO、多活甚至监管要求等因素决定)可以提供读功能,但细分的话,这些从库其实是有“差别”的,这种差异性有可能是由于机器硬件配置,也可能是由于所在机房、网络原因导致,这种时候就会需要支持读权限的权重配置,例如我们项目中有单元化的设计,需要根据当前所在单元及权重配置路由到当前机房的从库。另外也可以通过调整权重,支持在线对数据库进行维护或者升级等运维操作。实现代码片段:
/**
* Weight based slave database load-balance algorithm.
* SPI: org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm
*/
public final class WeightMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
public final static String TYPE = \”WEIGHT\”;
protected DataSource dataSource;
public WeightMasterSlaveLoadBalanceAlgorithm(DataSource ds) {
this.dataSource = ds;
}
public WeightMasterSlaveLoadBalanceAlgorithm{
}
@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
String selectReadDb = dataSource.getTableRuleContext.getGroupRule(name).selectReadDb;
return slaveDataSourceNames.contains(selectReadDb) ? selectReadDb : null;
}
@Override
public String getType {
return TYPE;
}
4)SQL开关
SDK模式的数据访问中间件,主要用在联机交易中,在这类场景下,是没有DDL操作需求的,也是不允许的,但shading-jdbc作为一个通用的数据分片中间件。对此并没有相应的开关配置,因此我们增加开关功能,应用在默认情况下,对DDL、DCL等语句进行了校验,不允许执行该类SQL,在技术层面杜绝了应用的误用。实现代码片段:
//SPI: org.apache.shardingsphere.core.parse.hook.ParsingHook
public class AccessPrevilegeCheckHook implements ParsingHook {
@Override
public void start(String sql) {
}
@Override
public void finishSuccess(SQLStatement sqlStatement, ShardingTableMetaData shardingTableMetaData) {
ConditionChecker.getInstance.checkDdlAndDcl(sqlStatement);
}
…
}
//SPI:org.apache.shardingsphere.core.rewrite.hook.RewriteHook
@NoArgsConstructor
public class TableScanCheckHook implements RewriteHook {
private List<TableUnit> tableUnits = new LinkedList<TableUnit>;
@Override
public void start(TableUnit tableUnit) {
if(tableUnits.size > 0 && !ConditionChecker.getInstance.isTableScanAllowed){
throw new RouteException(\”Don\’t allow table scan.\”);
}
tableUnits.add(tableUnit);
}
…
}
public class ConditionChecker {
private static ThreadLocal<SQLType> sqlTypeSnapshot = new ThreadLocal<SQLType>;
private boolean defalutTableScanAllowed = true;
private boolean defalutMultiDbTxAllowed = true;
private boolean defalutDdlAndDclAllowed = true;
private static ConditionChecker checker = new ConditionChecker;
public static ConditionChecker getInstance {
return checker;
}
private ConditionChecker {
}
private ThreadLocal<Boolean> tableScanAllowed = new ThreadLocal<Boolean> {
protected Boolean initialValue {
return defalutTableScanAllowed;
}
};
private ThreadLocal<Boolean> multiDbTxAllowed = new ThreadLocal<Boolean> {
protected Boolean initialValue {
return defalutMultiDbTxAllowed;
}
};
private ThreadLocal<Boolean> ddlAndDclAllowed = new ThreadLocal<Boolean> {
protected Boolean initialValue {
return defalutDdlAndDclAllowed;
}
};
public void setDefaultCondtion(boolean tableScanAllowed, boolean multiDbTxAllowed, boolean ddlAndDclAllowed) {
defalutTableScanAllowed = tableScanAllowed;
defalutMultiDbTxAllowed = multiDbTxAllowed;
defalutDdlAndDclAllowed = ddlAndDclAllowed;
}
public boolean isTableScanAllowed {
return tableScanAllowed.get;
}
public void setTableScanAllowed(boolean tableScanAllowed) {
this.tableScanAllowed.set(tableScanAllowed);
}
public boolean isMultiDbTxAllowed {
return multiDbTxAllowed.get;
}
public void setMultiDbTxAllowed(boolean multiDbTxAllowed) {
this.multiDbTxAllowed.set(multiDbTxAllowed);
}
public boolean isDdlAndDclAllowed {
return ddlAndDclAllowed.get;
}
public void setDdlAndDclAllowed(boolean ddlAllowed) {
this.ddlAndDclAllowed.set(ddlAllowed);
}
public SQLType getSqlTypeSnapshot {
return sqlTypeSnapshot.get;
}
public void checkTableScan(boolean isTableScan) {
if (!isTableScanAllowed)
throw new ConditionCheckException(\”Don\’t allow table scan.\”);
}
public void checkDdlAndDcl(SQLStatement sqlStatement) {
sqlTypeSnapshot.set(sqlStatement.getType);
if (!isDdlAndDclAllowed
&& (sqlStatement.getType.equals(SQLType.DDL) || sqlStatement.getType.equals(SQLType.DCL))) {
throw new ConditionCheckException(\”Don\’t allow DDL or DCL.\”);
}
}
public void checkMultiDbTx(Map<String, Connection> cachedConnections, String newDataSource) {
if (!isMultiDbTxAllowed && cachedConnections.size > 0 && !cachedConnections.containsKey(newDataSource)) {
throw new ConditionCheckException(\”Don\’t allow multi-db transaction currently.old connection key=\”
+ cachedConnections.keySet + \”new connection key=\” + newDataSource);
}
}
}
5)路由规则链
在我们项目中,对于一张表,在不同场景下可能会使用不同的分库分表列,例如有的是账号、有的是客户号(这两列都可路由到同一库表中),这时候就需要路由模块可以依次匹配搭配多个规则,例如SQL中有账号则用account-rule,有客户号则用customer-rule,因此我们支持了规则链配置功能,但sharding-jdbc只支持配置一个路由规则,因此在自定义路由算法函数中,我们增加了对规则链的支持。实现代码片段:
public abstract class ChainedRuleShardingAlgorithm implements ComplexKeysShardingAlgorithm {
protected final DataSource dataSource;
public ChainedRuleShardingAlgorithm(DataSource ds) {
this.dataSource = ds;
}
@Override
public Collection<String> doSharding(Collection availableTargetNames, ComplexKeysShardingValue shardingValue) {
List<String> targets = new ArrayList<String>;
Set<String> actualNames = HintManager.isDatabaseShardingOnly ? getHintActualName(shardingValue)
: calculateActualNames(shardingValue);
for (String each : actualNames) {
if (availableTargetNames.contains(each)) {
targets.add(each);
}
}
clear;
return targets;
}
@SuppressWarnings({ \”serial\”, \”unchecked\” })
protected Set<String> calculateActualNames(ComplexKeysShardingValue shardingValue) {
Set<String> target = new HashSet<String>;
Map<String/* table */, Map<String/* column */, Collection/* value */>> shardingMap = new HashMap<String, Map<String, Collection>>;
String logicalTableName = shardingValue.getLogicTableName;
Map<String, Collection> shardingValuesMap = shardingValue.getColumnNameAndShardingValuesMap;
for (final Entry<String, Collection> entry : shardingValuesMap.entrySet) {
if (shardingMap.containsKey(logicalTableName)) {
shardingMap.get(logicalTableName).put(entry.getKey, entry.getValue);
} else {
shardingMap.put(logicalTableName, new HashMap<String, Collection> {
{
put(entry.getKey, entry.getValue);
}
});
}
}
// 遍历规则链,查询匹配规则
for (String tableName : shardingMap.keySet) {
RuleChain ruleChain = dataSource.getTableRuleContext.getRuleChain(tableName);
for (GroovyListRuleEngine engine : getRuleEngine(ruleChain)) {
Set<String> parameters = engine.getParameters;
Map<String, Collection> columnValues = shardingMap.get(tableName);
Set<String> eval = eval(columnValues, parameters, engine, ruleChain);
if (eval.size > 0) {// 匹配即中止
target.addAll(eval);
return target;
}
}
}
return target;
}
@SuppressWarnings(\”unchecked\”)
protected Set<String> eval(final Map<String, Collection> columnValues, Set<String> parameters,
GroovyListRuleEngine engine, RuleChain ruleChain) {
Set<String> targetNames = new HashSet<String>;
if (columnValues.keySet.containsAll(parameters)) {// 匹配
List<Set<Object>> list = new LinkedList<Set<Object>>;// 参数集合
List<String> columns = new LinkedList<String>;// 列名集合
for (final String requireParam : parameters) {
list.add(convertToSet(columnValues.get(requireParam)));
columns.add(requireParam);
}
Set<List<Object>> cartesianProduct = Sets.cartesianProduct(list);
for (List<Object> values : cartesianProduct) {
Map<String, Object> arugmentMap = createArugmentMap(values, columns);
int index = engine.evaluate(arugmentMap);
targetNames.add(getActualName(ruleChain, index));
}
}
return targetNames;
}
private Set<Object> convertToSet(final Collection<Object> values) {
return Sets.newLinkedHashSet(values);
}
private Map<String, Object> createArugmentMap(List<Object> values, List<String> columns) {
HashMap<String, Object> map = new HashMap<String, Object>;
for (int i = 0; i < columns.size; i++) {
map.put(columns.get(i).toLowerCase, values.get(i));
}
return map;
}
protected abstract List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain);
protected abstract String getActualName(RuleChain ruleChain, int index);
}
/**
* 库路由算法
*/
public class ChainedRuleDbShardingAlgorithm extends ChainedRuleShardingAlgorithm {
public ChainedRuleDbShardingAlgorithm(DataSource ds) {
super(ds);
}
@Override
protected List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain) {
return ruleChain.getDbRuleList;
}
@Override
protected String getActualName(RuleChain ruleChain,int
北京公司注册_代记账服务_财务SaaS服务_智能记账