The Evolution of Offline Data Synchronization
Generation 1: Offline Data Sync Based on the Hadoop Ecosystem
1. Background
As the business grew, the system was split into microservices, which left the data increasingly scattered. Querying the full lifecycle of a record became difficult, supporting certain business requirements grew harder and more complex, and it became ever more difficult to assign clear ownership. As data volumes kept climbing, we sharded databases and tables to keep the business well supported; the sharding rules varied widely, so once you stepped outside the business logic it was hard to tell which database and table a given row lived in.
Given these problems, the natural way to meet the business requirements was to adopt big data services: consolidate the business data in one place and build a complete data warehouse that makes querying easy.
2. Data Sync Architecture
In pursuit of simplicity and generality, and given our level of understanding at the time, we chose the most standard big data architecture: the Hadoop-based stack. The cluster runs on three nodes and is deployed and maintained through CDH.
The end-to-end data pipeline is as follows:
Azkaban triggers Spark applications that sync data from RDS to Hive; the operations platform and the reporting system use Presto to accelerate access to the data in Hive.
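For reference, here is a minimal sketch of how the reporting side can query the synced Hive tables through Presto's JDBC driver. The coordinator address, catalog/schema, user, and table name below are placeholders rather than actual cluster values, and the presto-jdbc driver is assumed to be on the classpath:
import java.sql.DriverManager

// Hypothetical example: querying the synced Hive data through Presto over JDBC.
object PrestoQuerySketch {
  def main(args: Array[String]): Unit = {
    // The "hive" catalog and "ods" schema are assumptions about how the metastore is exposed in Presto
    val url = "jdbc:presto://presto-coordinator:8080/hive/ods"
    val conn = DriverManager.getConnection(url, "report_user", null)
    try {
      val rs = conn.createStatement()
        .executeQuery("SELECT ds, count(*) AS cnt FROM ods_t_account WHERE ds = '2023-01-01' GROUP BY ds")
      while (rs.next()) {
        println(s"${rs.getString("ds")} -> ${rs.getLong("cnt")}")
      }
    } finally {
      conn.close()
    }
  }
}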
3. Data Sync Process in Detail
The sync itself is done with Spark jobs. The jobs are packaged, uploaded to the Azkaban scheduling platform, and scheduled by Azkaban to run periodically, completing T+1 data synchronization.
Data sync code example:
object MarketMysqlToHiveEtl extends SparkHivePartitionOverwriteApplication{
/**
* Drop the partitions that already exist for the given date
*
* @param spark      SparkSession instance
* @param properties database configuration
* @param date       partition date
*/
def delete_partition(spark: SparkSession, properties:Properties, date: String):Unit={
val odsDatabaseName = properties.getProperty("hive.datasource.ods")
DropPartitionTools
.dropPartitionIfExists(spark,odsDatabaseName,"ods_t_money_record","ds",date)
DropPartitionTools
.dropPartitionIfExists(spark,odsDatabaseName,"ods_t_account","ds",date)
}
/**
* Extract one day of data
* @param spark      SparkSession instance
* @param properties database configuration
* @param date       partition date
*/
def loadData(spark: SparkSession, properties:Properties, date: String): Unit ={
// Drop any existing partitions for this date so that re-runs do not duplicate data
delete_partition(spark,properties,date)
// Read the data source configuration
val odsDatabaseName = properties.getProperty("hive.datasource.ods")
val dataSource = DataSourceUtils.getDataSourceProperties(FinalCode.MARKET_MYSQL_FILENAME,properties)
var sql = s"select id,account_id,type,original_id,original_code,money,reason,user_type,user_id,organization_id," +
s"create_time,update_time,detail,deleted,parent_id,counts,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"
// Sync the data
MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_money_record"),
s"${odsDatabaseName}.ods_t_money_record",SaveMode.Append,"ds")
sql = s"select id,code,customer_code,name,mobile,type,organization_id,organization_name,create_time,update_time,deleted,status,customer_name," +
s"customer_id,channel_type,nike_name,version,register_Time,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"
MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_account"),
s"${odsDatabaseName}.ods_t_account",SaveMode.Append,"ds")
}
/**
* Data ETL entry point
* @param spark      SparkSession instance
* @param properties database configuration
*/
def etl(spark: SparkSession, properties:Properties): Unit = {
val sparkConf = spark.sparkContext.getConf
// Determine which date(s) to sync
var lastDate = sparkConf.get("spark.etl.last.day", DateUtils.getLastDayString)
val dateList = new ListBuffer[String]()
if(lastDate.isEmpty){
// Not configured: default to the previous day
lastDate = DateUtils.getLastDayString
}
if(lastDate.contains("~")){
// A date range: expand it into every day between the two endpoints
val dateArray = lastDate.split("~")
DateUtils.findBetweenDates(dateArray(0), dateArray(1)).foreach(it => dateList.append(it))
}else if(lastDate.contains(",")){
// Multiple dates separated by commas: split them into the list
lastDate.split(",").foreach(it => dateList.append(it))
}else{
// A single date: just add it to the list
dateList.append(lastDate)
}
// Sync each day's data in turn
dateList.foreach(it => loadData(spark, properties, it))
}
def main(args: Array[String]): Unit = {
job() {
val sparkAndProperties = SparkUtils.get()
val spark = sparkAndProperties.spark
val properties = sparkAndProperties.properties
// Run the ETL
etl(spark,properties)
}
}
}
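The DateUtils helper used above (getLastDayString, getSomeLastDayString, findBetweenDates) is not included in this article. A minimal sketch of what it might look like, assuming dates are formatted as yyyy-MM-dd to match the "ds" partition values, is:
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of the DateUtils helper referenced by the ETL jobs; the real implementation is not shown.
object DateUtilsSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Yesterday, formatted as yyyy-MM-dd
  def getLastDayString: String = LocalDate.now().minusDays(1).format(fmt)

  // The date n days before today
  def getSomeLastDayString(n: Int): String = LocalDate.now().minusDays(n).format(fmt)

  // Every date between start and end (inclusive), e.g. for a "2023-01-01~2023-01-03" range
  def findBetweenDates(start: String, end: String): Seq[String] = {
    val result = ListBuffer[String]()
    var current = LocalDate.parse(start, fmt)
    val last = LocalDate.parse(end, fmt)
    while (!current.isAfter(last)) {
      result.append(current.format(fmt))
      current = current.plusDays(1)
    }
    result.toList
  }
}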
Code example for dropping partitions:
object DropPartitionTools {
/**
* Drop the specified partition if the table exists
* @param spark          SparkSession instance
* @param database       database name
* @param table          table name
* @param partitionKey   name of the partition column
* @param partitionValue partition value to drop
*/
def dropPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String, partitionValue:String): Unit ={
val df = spark.sql(
s"""
| show tables in ${database} like '${table}'
|""".stripMargin)
if(df.count() > 0 ){
// The table exists: drop the partition
spark.sql(
s"""
|ALTER TABLE ${database}.${table} DROP IF EXISTS PARTITION (${partitionKey}='${partitionValue}')
|""".stripMargin)
}
}
/**
* Drop the historical partition (8 days old) if the table exists
* @param spark        SparkSession instance
* @param database     database name
* @param table        table name
* @param partitionKey name of the partition column
*/
def dropHistoryPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String): Unit ={
val df = spark.sql(
s"""
| show tables in ${database} like '${table}'
|""".stripMargin)
if(df.count() > 0 ){
// The table exists: drop the historical partition from 8 days ago
val expiredDate = DateUtils.getSomeLastDayString(8)
spark.sql(
s"""
|ALTER TABLE ${database}.${table} DROP IF EXISTS PARTITION (${partitionKey} ='${expiredDate}')
|""".stripMargin)
}
}
}
Code example for syncing data from RDS to Hive:
object MysqlToHiveTools {
/**
* Extract data from MySQL into Hive -- full load
* @param spark         Spark instance
* @param dataSource    database connection configuration
* @param tableName     source table to extract from
* @param destTableName destination Hive table
* @param mode          save mode
* @param partition     partition column name
*/
def mysqlToHiveTotal(spark: SparkSession, dataSource: JSONObject,tableName: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
val sql = "(select * from " + tableName + ") as t"
mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
}
/**
* Extract data from MySQL into Hive -- incremental load
* @param spark         Spark instance
* @param dataSource    database connection configuration
* @param sql           SQL used to extract the data
* @param destTableName destination Hive table
* @param mode          save mode
* @param partition     partition column name
*/
def readFromMysqlIncrement(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
}
/**
* The actual extraction from MySQL into Hive
* @param spark         Spark instance
* @param dataSource    database connection configuration
* @param sql           SQL used to extract the data
* @param destTableName destination Hive table
* @param mode          save mode
* @param partition     partition column name
*/
def mysqlToHive(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String, mode: SaveMode, partition: String):Unit={
val df = spark.read.format("jdbc")
.option("url",dataSource.getString("url"))
.option("driver",dataSource.getString("driver"))
.option("fetchSize", 10000)
.option("numPartitions",2)
.option("dbtable",s"(${sql}) AS t")
.option("user",dataSource.getString("user"))
.option("password",dataSource.getString("password"))
.load()
if(partition == null || partition.isEmpty){
df.write.format("parquet").mode(mode).saveAsTable(destTableName)
}else{
// Partition by the configured partition column instead of a hard-coded "ds"
df.write.format("parquet").mode(mode).partitionBy(partition).saveAsTable(destTableName)
}
}
}
Spark application code example:
trait SparkHivePartitionOverwriteApplication extends Logging{
def getProperties(): Properties ={
val prop:Properties = new Properties()
val inputStream = this.getClass.getClassLoader.getResourceAsStream("config.properties")
prop.load(inputStream);
prop
}
def job(appName: String = null,
master: String = null)(biz: => Unit): Unit = {
var spark: SparkSession = null
System.setProperty("HADOOP_USER_NAME", "mapred")
val prop:Properties = getProperties()
if (null == appName) {
spark = SparkSession.builder
.config("spark.sql.parquet.writeLegacyFormat", true)
.config("spark.sql.sources.partitionOverwriteMode","dynamic")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.config("spark.sql.hive.convertMetastoreParquet",false)
.enableHiveSupport
.getOrCreate
val sparkAndProperties = SparkAndProperties(spark, prop)
SparkUtils.set(sparkAndProperties)
} else {
spark = SparkSession.builder.master(master).appName(appName)
.config("spark.sql.parquet.writeLegacyFormat", true)
.config("spark.sql.sources.partitionOverwriteMode","dynamic")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.config("spark.sql.hive.convertMetastoreParquet",false)
.config("spark.testing.memory","2147480000")
.config("spark.driver.memory","2147480000")
.enableHiveSupport.getOrCreate
val sparkAndProperties = SparkAndProperties(spark, prop)
SparkUtils.set(sparkAndProperties)
}
try {
biz
} finally {
spark.stop()
SparkUtils.remove()
}
}
}
case class SparkAndProperties(spark: SparkSession,
properties: Properties)
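SparkUtils, which the trait and the main methods use to hand the SparkAndProperties instance around, is also not shown in the source. A plausible minimal sketch is a simple ThreadLocal holder (names and behavior here are assumptions):
// Hypothetical sketch of the SparkUtils holder used above; the real implementation is not part of the article.
object SparkUtilsSketch {
  private val holder = new ThreadLocal[SparkAndProperties]()

  def set(sparkAndProperties: SparkAndProperties): Unit = holder.set(sparkAndProperties)

  def get(): SparkAndProperties = holder.get()

  def remove(): Unit = holder.remove()
}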
4. Supporting Ecosystem
- Custom UDF functions
In day-to-day use we need to resolve the IP addresses stored in a table into the names of the places they belong to, which requires calling a third-party service. To do this we defined a custom UDF that performs the lookup.
a. Defining the custom UDF
object ParseIp {
def evaluate(ip: String):String= {
// Delegate to the concrete IP-resolution service
SplitAddress.getPlaceFromIp(ip)
}
}
b. Using the custom UDF
object TraceTmpEtl extends SparkHivePartitionOverwriteApplication{
/**
* Data sync task
* @param spark      SparkSession instance
* @param properties database configuration
* @param date       partition date
*/
def tmp_t_trace_user_visit_real_time_statistic(spark: SparkSession,properties:Properties,date: String):Unit ={
// Read the Hive database names from the configuration
val odsDatabaseName = properties.getProperty("hive.datasource.ods")
val tmpDatabaseName = properties.getProperty("hive.datasource.tmp")
// Register the custom UDF defined above
spark.udf.register("parseIP", (ip: String) => ParseIp.evaluate(ip))
// Use the UDF inside Spark SQL
spark.sql(
s"""
|INSERT OVERWRITE TABLE ${tmpDatabaseName}.tmp_t_statistic partition(ds='${date}')
|select
| `id` ,
| `create_time` ,
| `update_time` ,
| `ip` ,
| replace( replace( replace(replace( case when parseIP(ip) rlike '^中国' then replace(parseIP(ip),'中国','')
| when parseIP(ip) rlike '^内蒙古' then replace(parseIP(ip),'内蒙古','内蒙古自治区')
| when parseIP(ip) rlike '^广西' then replace(parseIP(ip),'广西','广西壮族自治区')
| when parseIP(ip) rlike '^西藏' then replace(parseIP(ip),'西藏','西藏自治区')
| when parseIP(ip) rlike '^宁夏' then replace(parseIP(ip),'宁夏','宁夏回族自治区')
| when parseIP(ip) rlike '^新疆' then replace(parseIP(ip),'新疆','新疆维吾尔自治区')
| when parseIP(ip) rlike '^香港' then replace(parseIP(ip),'香港','香港特别行政区')
| when parseIP(ip) rlike '^澳门' then replace(parseIP(ip),'澳门','澳门特别行政区')
| else parseIP(ip) end, "省", "省."),"市", "市."),"县", "县."),"区", "区.") as ip_place,
| `page_view`
|from ${odsDatabaseName}.ods_t_statistic where ds ='${date}'
|""".stripMargin)
}
/**
* Data ETL entry point
* @param spark      SparkSession instance
* @param properties database configuration
*/
def etl(spark: SparkSession, properties:Properties): Unit = {
val lastDate = DateUtils.getLastDayString
tmp_t_trace_user_visit_real_time_statistic(spark,properties, lastDate)
}
def main(args: Array[String]): Unit = {
job() {
val sparkAndProperties = SparkUtils.get()
val spark = sparkAndProperties.spark
val properties = sparkAndProperties.properties
etl(spark,properties)
}
}
}
- Securing the database configuration
At first the database configuration was hard-coded in the sync configuration files, but we later realized this posed security problems. We therefore combined the database-related settings into a single JSON string, encrypted it, and stored it in MongoDB; at run time the job looks it up and decrypts it.
public class DataSourceUtils {
private static Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);
public static JSONObject getDataSourceProperties(String dataSourceKey,Properties properties){
List<ServerAddress> adds = new ArrayList<>();
try {
String filePath = properties.getProperty("spark.mongo.properties.file.url");
properties = new Properties();
File file = new File(filePath);
FileInputStream inputStream = null;
inputStream = new FileInputStream(file);
properties.load(inputStream);
}catch (Exception e){
logger.info("not load file, reason:" + e.getMessage());
e.printStackTrace();
}
String mongoUrl = properties.getProperty("mongo_url");
String mongoPort = properties.getProperty("mongo_port");
String mongoDbName = properties.getProperty("mongo_dbName");
String mongoCollect = properties.getProperty("mongo_collect");
String mongoUser = properties.getProperty("mongo_user");
String mongoPassword = properties.getProperty("mongo_password");
String desKey = properties.getProperty("data_des_key");
ServerAddress serverAddress = new ServerAddress(mongoUrl, Integer.parseInt(mongoPort));
adds.add(serverAddress);
List<MongoCredential> credentials = new ArrayList<>();
MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(mongoUser, mongoDbName, mongoPassword.toCharArray());
credentials.add(mongoCredential);
MongoClient mongoClient = new MongoClient(adds, credentials);
MongoDatabase mongoDatabase = mongoClient.getDatabase(mongoDbName);
MongoCollection<Document> collection = mongoDatabase.getCollection(mongoCollect);
// Build a query filter on the data source key
Bson filter = Filters.eq("key", dataSourceKey);
// Query the collection with the filter
FindIterable<Document> findIterable = collection.find(filter);
// Take the first matching document
Document document = findIterable.first();
mongoClient.close();
// Decrypt the stored JSON configuration
String content = DESUtil.decrypt(desKey, document.getString("content"));
return JSON.parseObject(content);
}
public static Properties json2Properties(JSONObject jsonObject){
String tmpKey = "";
String tmpKeyPre = "";
Properties properties = new Properties();
j2p(jsonObject, tmpKey, tmpKeyPre, properties);
return properties;
}
private static void j2p(JSONObject jsonObject, String tmpKey, String tmpKeyPre, Properties properties){
for (String key : jsonObject.keySet()) {
// Get the value belonging to this key
String value = jsonObject.getString(key);
try {
JSONObject jsonStr = JSONObject.parseObject(value);
tmpKeyPre = tmpKey;
tmpKey += key + ".";
j2p(jsonStr, tmpKey, tmpKeyPre, properties);
tmpKey = tmpKeyPre;
} catch (Exception e) {
properties.put(tmpKey + key, value);
System.out.println(tmpKey + key + "=" + value);
}
}
}
public static void main(String[] args) {
}
}
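DESUtil.decrypt is referenced above but its implementation is not part of the article. The Scala sketch below shows one way such a helper could look, assuming the ciphertext is Base64-encoded and encrypted with DES/ECB/PKCS5Padding; the actual key handling, cipher mode, and encoding used by DESUtil are unknown:
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec

// Hypothetical stand-in for the DESUtil referenced above.
object DesUtilSketch {
  // Decrypts a Base64-encoded DES/ECB/PKCS5Padding ciphertext; the key is assumed to be at least 8 bytes long.
  def decrypt(key: String, base64CipherText: String): String = {
    val keySpec = new SecretKeySpec(key.getBytes("UTF-8").take(8), "DES")
    val cipher = Cipher.getInstance("DES/ECB/PKCS5Padding")
    cipher.init(Cipher.DECRYPT_MODE, keySpec)
    new String(cipher.doFinal(Base64.getDecoder.decode(base64CipherText)), "UTF-8")
  }
}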
- Spark job submission script example
#!/bin/sh
##### env ###########
export JAVA_HOME=/usr/java/jdk1.8.0_151
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}
export SPARK_USER=hadoop
export HADOOP_USER_NAME=hadoop
LAST_DAY="$1"
echo "${LAST_DAY}"
spark-submit \
--class net.app315.bigdata.operatereport.ods.MarketMysqlToHiveEtl \
--conf spark.sql.hive.metastore.version=2.1.1 \
--conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/* \
--jars /opt/cloudera/parcels/CDH/lib/spark/jars/mysql-connector-java-5.1.48.jar,/opt/cloudera/parcels/CDH/lib/spark/jars/druid-1.1.10.jar \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--driver-memory 2G \
--num-executors 4 \
--executor-cores 2 \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=8 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=128 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=4 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.scheduler.mode=FIFO \
--conf spark.network.timeout=420000 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.executor.heartbeatInterval=360000 \
--conf spark.sql.crossJoin.enabled=true \
--conf spark.mongo.properties.file.url=/opt/conf/mongo.properties \
--conf spark.etl.last.day="${LAST_DAY}" \
./target/spark-operate-report-project-1.0.jar
- Azkaban job flow definition example
nodes:
- name: bigdata_market_ods_etl
type: command
config:
command: sh -x ./script/bigdata_market_ods_etl.sh "${spark.etl.last.day}"
failure.emails: [email protected]
- name: bigdata_market_dim_etl
type: command
config:
command: sh -x ./script/bigdata_market_dim_etl.sh "${spark.etl.last.day}"
failure.emails: [email protected]
dependsOn:
- bigdata_market_ods_etl
- name: bigdata_market_dw_etl
type: command
config:
command: sh -x ./script/bigdata_market_dw_etl.sh "${spark.etl.last.day}"
failure.emails: [email protected]
dependsOn:
- bigdata_market_dim_etl
- bigdata_user_dw_etl
5. Notes
- Davinci reports: an open-source reporting platform
Generation 2: Offline Data Sync Based on DolphinScheduler
1. Background
After we adopted the Hadoop-based big data solution, the business developed steadily, but over time new problems emerged. There were two main ones:
- Schema changes became more and more frequent. With the previous Spark SQL jobs, any change to a table structure meant modifying the Scala code and repackaging the job, which is unfriendly to people unfamiliar with the code and also quite costly.
- Although Presto accelerated queries against Hive, data volumes kept growing, analyses became more complex, and ad-hoc queries multiplied; with the cluster's limited resources, query capacity hit a clear bottleneck.
2. Data Sync Architecture
As the technology landscape evolved and our understanding of big data deepened, we came into contact with more big-data knowledge and components. After careful analysis and deliberation, we redesigned the data synchronization solution as follows.
- For storage and querying we dropped the HDFS + Hive + Presto combination in favor of StarRocks, a modern MPP database. StarRocks is excellent in terms of query efficiency and, with the same resources, resolves the query bottleneck we had been running into (see the query sketch after this list).
- For data synchronization we dropped Spark SQL in favor of the far more lightweight DataX, which only needs a simple configuration to sync data and also ships a StarRocks writer. A developer with basic SQL knowledge can configure an entire sync task, so the difficulty drops sharply while efficiency and ease of use improve greatly.
- For scheduled task orchestration we dropped Azkaban in favor of Apache DolphinScheduler, a modern scheduler whose visual UI makes configuring workflow DAGs far friendlier.
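Because the StarRocks frontend speaks the MySQL protocol, the reporting layer can query it with an ordinary MySQL JDBC connection. The sketch below is illustrative only: host, credentials, and table names are placeholders, the MySQL connector is assumed to be on the classpath, and port 9030 matches the jdbcUrl used in the DataX example further down:
import java.sql.DriverManager

// Illustrative sketch: querying StarRocks through its MySQL-compatible protocol.
object StarRocksQuerySketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://starrocks-fe:9030/ods_cjm_test", "xxx", "xxx")
    try {
      val rs = conn.createStatement().executeQuery("SELECT count(*) FROM ods_base_customer")
      while (rs.next()) {
        println(s"rows = ${rs.getLong(1)}")
      }
    } finally {
      conn.close()
    }
  }
}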
3. Data Sync Process in Detail
With this approach, data synchronization becomes very simple: a DataX task is configured visually and is then scheduled automatically. Below is an example configuration for one task:
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "",
"password": "",
"connection": [
{
"querySql": [
"SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"
],
"jdbcUrl": [
"jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxx",
"password": "xxx",
"database": "ods_cjm_test",
"table": "ods_base_customer",
"column": ["id"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://IP:9030/",
"loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
}
}
]
}
}