当前位置:首页 > 数据库 > 正文内容

离线数据同步变迁

邻居的猫1个月前 (12-09)数据库1902

第一代-根据Hadoop体系的离线数据同步

一、布景

跟着事务的开展,体系进行了微服务的差分,导致数据越来越涣散,很难进行一个完好的生命周期的数据查询,关于某些事务的需求支撑变得越来越难,越来越杂乱,也越来越难以进行责任区分。对着事务的开展,数据量越来越大之后,为了杰出的事务支撑,进行了分库分表,分库分表规矩形形色色,一旦脱离了事务逻辑,很难确认某一条数据在哪个库哪个表。

根据这样的问题和状况,为了满意事务需求,很天然的就想到了运用大数据服务,将事务数据归集到一同,树立完好的数据仓库,便于数据的查询。

二、数据同步架构

为了寻求简略和通用,因为自身的知道现在,挑选了最规范的大数据架构,即根据Hadoop的大数据表现。整个集群选用三节点,经过CDH进行集群的布置和保护。

整个数据链路为:

经过Azkaban调用Spark运用,将数据从RDS同步到Hive,运营渠道和报表体系选用Presto加快拜访Hive的数据。

三、数据同步具体进程

数据同步选用Spark使命来进行,将使命打包之后,上传到Azkaban调度渠道,运用Azkaban进行守时调度,完结T+1级其他数据同步作业。

数据同步代码示例:

object MarketMysqlToHiveEtl extends SparkHivePartitionOverwriteApplication{


  /**
   * 删去已存在的分区
   *
   * @param spark SparkSessions实例
   * @param date 日期
   * @param properties 数据库装备
   */
  def delete_partition(spark: SparkSession, properties:Properties, date: String):Unit={
    val odsDatabaseName = properties.getProperty("hive.datasource.ods")
    DropPartitionTools
     .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_money_record","ds",date)
    DropPartitionTools
     .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_account","ds",date)
  }



  /**
   * 抽取数据
   * @param spark SparkSession实例
   * @param properties 数据库装备
   * @param date 日期
   */
  def loadData(spark: SparkSession, properties:Properties, date: String): Unit ={
    // 删去前史数据,处理重复同步问题
    delete_partition(spark,properties,date)

    // 获取数据源装备
    val odsDatabaseName = properties.get("hive.datasource.ods")
    val dataSource = DataSourceUtils.getDataSourceProperties(FinalCode.MARKET_MYSQL_FILENAME,properties)

    var sql = s"select id,account_id,type,original_id,original_code,money,reason,user_type,user_id,organization_id," +
    s"create_time,update_time,detail,deleted,parent_id,counts,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"

    // 同步数据
    MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_money_record"),
                                            s"${odsDatabaseName}.ods_t_money_record",SaveMode.Append,"ds")


    sql = s"select id,code,customer_code,name,mobile,type,organization_id,organization_name,create_time,update_time,deleted,status,customer_name," +
    s"customer_id,channel_type,nike_name,version,register_Time,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"
    MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_account"),
                                            s"${odsDatabaseName}.ods_t_account",SaveMode.Append,"ds")
  }



  /**
   * 数据etl
   * @param spark SparkSession实例
   * @param SparkSession 数据库装备
   */
  def etl(spark: SparkSession, properties:Properties): Unit = {
    val sparkConf = spark.sparkContext.getConf
    // 获取同步的日期
    var lastDate = sparkConf.get("spark.etl.last.day", DateUtils.getLastDayString)
    val dateList = new  ListBuffer[String]()
    if(lastDate.isEmpty){
      // 未装备,设置为前一天
      lastDate = DateUtils.getLastDayString
    }
    if(lastDate.contains("~")){
      // 如果是时刻段,获取时刻段中的每一天,解析为时刻list
      val dateArray = lastDate.split("~")
      DateUtils.findBetweenDates(dateArray(0), dateArray(1)).foreach(it => dateList.append(it))
    }else if(lastDate.contains(",")){
      // 如果是运用,分隔的多个日期,解析为时刻list
      lastDate.split(",").foreach(it => dateList.append(it))
    }else{
      // 添加进时刻列表
      dateList.append(lastDate)
    }
    // 循环同步每天的数据
    dateList.foreach(it =>  loadData(spark, properties, it))
  }


  def main(args: Array[String]): Unit = {
    job() {
      val sparkAndProperties = SparkUtils.get()
      val spark = sparkAndProperties.spark
      val properties = sparkAndProperties.properties
      // 调度使命
      etl(spark,properties)
    }
  }
}

删去Partition的代码示例:

object DropPartitionTools {


  /**
   * 删去指定的Partition
   * @param SparkSession实例
   * @param database数据库称号
   * @param table表称号
   * @param partitionKey 分区字段的称号
   * @param partitionValue 具体的分区值
   */
  def dropPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String, partitionValue:String): Unit ={

     val df = spark.sql(
       s"""
         | show tables in ${database} like '${table}'
         |""".stripMargin)

    if(df.count() > 0 ){
      // 表存在,删去分区
      spark.sql(
        s"""
           |ALTER TABLE  ${database}.${table} DROP  IF EXISTS  PARTITION (${partitionKey}='${partitionValue}')
           |""".stripMargin)
    }
  }


  /**
   * 删去Partition
   * @param SparkSession实例
   * @param database数据库称号
   * @param table表称号
   * @param partitionKey 分区字段的称号
   */
  def dropHistoryPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String): Unit ={

    val df = spark.sql(
      s"""
         | show tables in ${database} like '${table}'
         |""".stripMargin)

    if(df.count() > 0 ){
      // 表存在,删去前史分区,获取8天前的日期
      val sevenDay = DateUtils.getSomeLastDayString(8);
      spark.sql(
        s"""
           |ALTER TABLE  ${database}.${table} DROP  IF EXISTS  PARTITION (${partitionKey} ='${sevenDay}')
           |""".stripMargin)
    }
  }

}

从RDS同步数据到HIVE的代码示例:

object MysqlToHiveTools {


  /**
   * 从mysql抽取数据到hive -- 全量
   * @param spark spark实例
   * @param dataSource 数据库装备信息
   * @param tableName 抽取的数据库表名
   * @param destTableName 方针表名
   * @param mode 抽取的形式
   */
  def mysqlToHiveTotal(spark: SparkSession, dataSource: JSONObject,tableName: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
     val sql = "(select * from " + tableName + ") as t"
     mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
  }


  /**
   * 从mysql抽取数据到hive -- 增量量
   * @param spark spark实例
   * @param dataSource 数据库装备信息
   * @param sql 抽取数据的SQL
   * @param destTableName 方针表名
   * @param mode 抽取的形式
   */
  def readFromMysqlIncrement(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
    mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
  }


  /**
   * 真实的抽取数据
   * @param spark spark实例
   * @param properties 数据库装备信息
   * @param sql 抽取数据的SQL
   * @param destTableName 方针表名
   * @param mode 抽取的形式
   */
  def mysqlToHive(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String, mode: SaveMode, partition: String):Unit={
    val df = spark.read.format("jdbc")
      .option("url",dataSource.getString("url"))
      .option("driver",dataSource.getString("driver"))
      .option("fetchSize", 10000)
      .option("numPartitions",2)
      .option("dbtable",s"(${sql}) AS t")
      .option("user",dataSource.getString("user"))
      .option("password",dataSource.getString("password"))
      .load()
    if(partition == null || partition.isEmpty){
      df.write.format("parquet").mode(mode).saveAsTable(destTableName)
    }else{
      df.write.format("parquet").mode(mode).partitionBy("ds").saveAsTable(destTableName)
    }
  }
}

Spark Application代码示例

trait SparkHivePartitionOverwriteApplication extends Logging{


  def getProperties(): Properties ={
    val prop:Properties = new Properties()
    val inputStream = this.getClass.getClassLoader.getResourceAsStream("config.properties")
    prop.load(inputStream);
    prop
  }

  def job(appName: String = null,
          master: String = null)(biz: => Unit): Unit = {
    var spark: SparkSession = null
    System.setProperty("HADOOP_USER_NAME", "mapred")
    val prop:Properties = getProperties()
    if (null == appName) {
      spark = SparkSession.builder
        .config("spark.sql.parquet.writeLegacyFormat", true)
        .config("spark.sql.sources.partitionOverwriteMode","dynamic")
        .config("hive.exec.dynamic.partition.mode","nonstrict")
        .config("spark.sql.hive.convertMetastoreParquet",false)
        .enableHiveSupport
        .getOrCreate
      var sparkAndProperties = SparkAndProperties(spark, prop)
      SparkUtils.set(sparkAndProperties)
    } else {
      spark = SparkSession.builder.master(master).appName(appName)
        .config("spark.sql.parquet.writeLegacyFormat", true)
        .config("spark.sql.sources.partitionOverwriteMode","dynamic")
        .config("hive.exec.dynamic.partition.mode","nonstrict")
        .config("spark.sql.hive.convertMetastoreParquet",false)
        .config("spark.testing.memory","2147480000")
        .config("spark.driver.memory","2147480000")
        .enableHiveSupport.getOrCreate
      var sparkAndProperties = SparkAndProperties(spark, prop)
      SparkUtils.set(sparkAndProperties)
      SparkUtils.set(sparkAndProperties)
    }
    biz
    spark.stop()
    SparkUtils.remove()
  }

}

case class SparkAndProperties(spark: SparkSession,
                              properties: Properties)

四、配套生态

  1. 自界说UDF函数

在运用的进程中,需求将表中的IP地址,解析为地点地的称号,这需求调用第三方的一个服务接口来完结,为了完结这个使命,界说了一个自界说UDF函数,进行解析。

a. 自界说UDF函数

object ParseIp  {
    def evaluate(ip: String):String= {
      // 具体的IP解析服务
      SplitAddress.getPlaceFromIp(ip)
   }
}

b. 运用自界说UDF函数

object TraceTmpEtl extends SparkHivePartitionOverwriteApplication{

  /**
   * 数据同步使命
   * @param spark sparkSession实例
   * @param properties 数据库装备
   * @param date 日期
   */
  def tmp_t_trace_user_visit_real_time_statistic(spark: SparkSession,properties:Properties,date: String):Unit ={
    // 获取数据库装备的数据库称号
    val odsDatabaseName = properties.get("hive.datasource.ods")
    val tmpDatabaseName = properties.get("hive.datasource.tmp")

    // 注册自界说的UDF函数
    spark.udf.register("parseIP", (ip: String) => SplitAddress.getPlaceFromIp(ip))
    // 在Spark SQL中运用UDF函数
    spark.sql(
      s"""
         |INSERT OVERWRITE TABLE ${tmpDatabaseName}.tmp_t_statistic partition(ds='${date}')
         |select
         |	  `id` ,
         |	  `create_time` ,
         |	  `update_time` ,
         |	  `ip` ,
         |      replace( replace( replace(replace( case when parseIP(ip) rlike '^我国' then replace(parseIP(ip),'我国','')
         |          when parseIP(ip) rlike '^内蒙古' then replace(parseIP(ip),'内蒙古','内蒙古自治区')
         |          when parseIP(ip) rlike '^广西' then replace(parseIP(ip),'广西','广西壮族自治区')
         |          when parseIP(ip) rlike '^西藏' then replace(parseIP(ip),'西藏','西藏自治区')
         |          when parseIP(ip) rlike '^宁夏' then replace(parseIP(ip),'宁夏','宁夏回族自治区')
         |          when parseIP(ip) rlike '^新疆' then replace(parseIP(ip),'新疆','新疆维吾尔自治区')
         |          when parseIP(ip) rlike '^香港' then replace(parseIP(ip),'香港','香港特别行政区')
         |          when parseIP(ip) rlike '^澳门' then replace(parseIP(ip),'澳门','澳门特别行政区')
         |     else parseIP(ip) end, "省", "省."),"市", "市."),"县", "县."),"区", "区.") as ip_place,
         |	  `page_view` 
         |from ${odsDatabaseName}.ods_t_statistic where ds ='${date}'
         |""".stripMargin)
  }

  /**
   * 数据etl
   * @param spark SparkSession实例
   * @param properties 数据库装备
   */
  def etl(spark: SparkSession, properties:Properties): Unit = {
    val lastDate = DateUtils.getLastDayString
    tmp_t_trace_user_visit_real_time_statistic(spark,properties, lastDate)
  }


  
  def main(args: Array[String]): Unit = {
    job() {
      val sparkAndProperties = SparkUtils.get()
      val spark = sparkAndProperties.spark
      val properties = sparkAndProperties.properties
      etl(spark,properties)
    }
  }
}

  1. 数据库的装备安全性问题

刚开端数据库装备同步装备文件直接写死,可是后续发现这样存在一些安全性的问题,后来选用将数据库相关的装备组合为一个JSON字符串,将其加密之后保存到MongoDB中,在运用时进行查询解密。

public class DataSourceUtils {

    private  static Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);

    public static JSONObject getDataSourceProperties(String dataSourceKey,Properties properties){
        List<ServerAddress> adds = new ArrayList<>();
        try {
            String filePath = properties.getProperty("spark.mongo.properties.file.url");
            properties = new Properties();
            File file = new File(filePath);
            FileInputStream inputStream = null;
             inputStream = new FileInputStream(file);
            properties.load(inputStream);
        }catch (Exception e){
            logger.info("not load file, reason:" + e.getMessage());
            e.printStackTrace();
        }
        String mongoUrl = properties.getProperty("mongo_url");
        String mongoPort = properties.getProperty("mongo_port");
        String mongoDbName = properties.getProperty("mongo_dbName");
        String mongoCollect = properties.getProperty("mongo_collect");
        String mongoUser = properties.getProperty("mongo_user");
        String mongoPassword = properties.getProperty("mongo_password");
        String desKey = properties.getProperty("data_des_key");
        ServerAddress serverAddress = new ServerAddress(mongoUrl, Integer.parseInt(mongoPort));
        adds.add(serverAddress);
        List<MongoCredential> credentials = new ArrayList<>();
        MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(mongoUser, mongoDbName, mongoPassword.toCharArray());
        credentials.add(mongoCredential);
        MongoClient mongoClient = new MongoClient(adds, credentials);
        MongoDatabase mongoDatabase = mongoClient.getDatabase(mongoDbName);
        MongoCollection<Document> collection = mongoDatabase.getCollection(mongoCollect);
        //指定查询过滤器
        Bson filter = Filters.eq("key", dataSourceKey);
        //指定查询过滤器查询
        FindIterable findIterable = collection.find(filter);
        //取出查询到的第一个文档
        Document document = (Document) findIterable.first();
        //打印输出
        String content = DESUtil.decrypt(desKey, document.getString("content"));
        return JSON.parseObject(content);
    }


    public static  Properties json2Properties(JSONObject jsonObject){
        String tmpKey = "";
        String tmpKeyPre = "";
        Properties properties = new Properties();
        j2p(jsonObject, tmpKey, tmpKeyPre, properties);
        return properties;
    }



    private static void j2p(JSONObject jsonObject, String tmpKey, String tmpKeyPre, Properties properties){
        for (String key : jsonObject.keySet()) {
            // 取得key
            String value = jsonObject.getString(key);
            try {
                JSONObject jsonStr = JSONObject.parseObject(value);
                tmpKeyPre = tmpKey;
                tmpKey += key + ".";
                j2p(jsonStr, tmpKey, tmpKeyPre, properties);
                tmpKey = tmpKeyPre;
            } catch (Exception e) {
                properties.put(tmpKey + key, value);
                System.out.println(tmpKey + key + "=" + value);
            }
        }
    }
    public static void main(String[] args) {

    }
}

  1. Spark使命脚本示例
#!/bin/sh

##### env ###########
export JAVA_HOME=/usr/java/jdk1.8.0_151
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}
export SPARK_USER=hadoop
export HADOOP_USER_NAME=hadoop
LAST_DAY="$1"
echo LAST_DAY

spark-submit \
--class net.app315.bigdata.operatereport.ods.MarketMysqlToHiveEtl \
--conf spark.sql.hive.metastore.version=2.1.1 \
--conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/* \
--jars /opt/cloudera/parcels/CDH/lib/spark/jars/mysql-connector-java-5.1.48.jar,/opt/cloudera/parcels/CDH/lib/spark/jars/druid-1.1.10.jar \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--driver-memory 2G \
--num-executors 4 \
--executor-cores 2 \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=8 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=128 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=4 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.scheduler.mode=FIFO \
--conf spark.network.timeout=420000 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.executor.heartbeatInterval=360000 \
--conf spark.sql.crossJoin.enabled=true \
--conf spark.mongo.properties.file.url=/opt/conf/mongo.properties \
--conf spark.etl.last.day="${LAST_DAY}" \
./target/spark-operate-report-project-1.0.jar
  1. Job使命脚本实例
nodes:

  - name: bigdata_market_ods_etl
    type: command
    config:
      command: sh -x ./script/bigdata_market_ods_etl.sh "${spark.etl.last.day}"
      failure.emails: [email protected]

  - name: bigdata_market_dim_etl
    type: command
    config:
      command: sh -x ./script/bigdata_market_dim_etl.sh "${spark.etl.last.day}"
      failure.emails: [email protected]
    dependsOn:
          - bigdata_market_ods_etl
          
  - name: bigdata_market_dw_etl
    type: command
    config:
      command: sh -x ./script/bigdata_market_dw_etl.sh "${spark.etl.last.day}"
      failure.emails: [email protected]
    dependsOn:
          - bigdata_market_dim_etl
          - bigdata_user_dw_etl

五、补白

  1. Davinci报表 一个开源的报表渠道

第二代-根据DolphinScheduler的离线数据同步

一、布景

自从前次开端运用根据Hadoop的大数据表现计划之后,事务平稳开展,可是跟着时刻的推移,新的问题开端呈现,首要呈现的问题为两个:

  1. 数据的改变越来越频频,根据之前SparkSQL使命的方法,只需需求对表结构进行改变,就需求从头修正Scala代码,然后从头进行使命的打包,这关于一些不熟悉代码的人来说,不太友爱,并且本钱也很高。
  2. 尽管运用了Presto对HIVE的数据查询进行了加快,可是地点数据量越来越大,剖析要求越来越杂乱,即席查询越来越多,因为集群自身资源有限,查询才能呈现了明显瓶颈。

二、数据同步架构

跟着技能的开展现已对大数据的知道,触摸到了更多的大数据相关的常识与组件,根据此,经过仔细剖析与考虑之后,对数据的同步计划进行了如下的从头规划。

  1. 数据存储与查询抛弃了HDFS+HIVE+Presto的组合,转而选用现代化的MPP数据库StarRocks,StarRocks在数据查询的功率层面十分优异,在相同资源的状况下,能够处理现在遇到的数据查询瓶颈。
  2. 数据同步抛弃了SparkSQL,转而选用愈加轻量级的DATAX来进行,其只需求经过简略的装备,即可完结数据的同步,一起其也支撑StarRocks Writer,开发人员只需求具有简略的SQL常识,就能够完结整个数据同步使命的装备,难度大大下降,功率大大提高,友爱度大大提高。
  3. 守时使命调度抛弃Azkaban,选用现代化的使命调度作业Apache DolphinScheduler,经过可视化的页面进行调度使命作业流的装备,愈加友爱。

三、数据同步的具体流程

数据同步在这种方法下变化十分简略,只需求可视化的装备DataX使命,即可主动调度。下面的一个使命的装备示例

{
  "job": {
    "setting": {
      "speed": {
        "channel":1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "",
            "password": "",
            "connection": [
              {
                "querySql": [
                  "SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "xxx",
            "password": "xxx",
            "database": "ods_cjm_test",
            "table": "ods_base_customer",
            "column": ["id"],
            "preSql": [],
            "postSql": [], 
            "jdbcUrl": "jdbc:mysql://IP:9030/",
            "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
            "loadProps": {
              "format": "json",
              "strip_outer_array": true
            }
          }
        }
      }
        ]
  

扫描二维码推送至手机访问。

版权声明:本文由51Blog发布,如需转载请注明出处。

本文链接:https://www.51blog.vip/?id=526

分享给朋友:

“离线数据同步变迁” 的相关文章

【GreatSQL优化器-03】查询开支预算

【GreatSQL优化器-03】查询开支预算

【GreatSQL优化器-03】查询开支预算 一、cost和read_time介绍 GreatSQL的优化器在创立履行计划的时分是依据每张表的行数和数据散布以及读数据硬盘耗费等信息来判别先查询哪张表后查询哪张表,要不要运用索引,这些表资源信息就被称为cost,俗称为"开支"。在这之前现已履行了upd...

时序数据库排名,性能与功能的较量

时序数据库排名,性能与功能的较量

根据多个来源的信息,以下是时序数据库的排名和性能评测情况: 全球时序数据库排名根据DBEngines的最新排名,以下是2024年4月10日更新的全球时序数据库排名:1. InfluxDB 2013年发布,主要用于存储时间序列数据,适用于物联网、分析和监控软件。2. Prometheus...

mysql进入数据库,轻松掌握数据库访问技巧

在MySQL中,要进入一个数据库,首先需要登录到MySQL服务器。登录成功后,可以使用 `USE` 语句来选择特定的数据库。下面是具体的步骤:1. 登录MySQL服务器: 打开命令行工具,输入以下命令: ``` mysql u username p ``` 这里 `usernam...

备份oracle数据库,深入解析Oracle数据库备份策略与实施

备份oracle数据库,深入解析Oracle数据库备份策略与实施

备份Oracle数据库是一个重要的维护任务,它确保了数据的安全性和可恢复性。以下是备份Oracle数据库的一些基本步骤:1. 确定备份类型: 完全备份:备份整个数据库,包括所有数据文件、控制文件和归档日志。 增量备份:只备份自上次备份以来更改的数据。 差异备份:备份自上次完全备份以...

mysql账号,MySQL账号管理概述

mysql账号,MySQL账号管理概述

MySQL 是一种广泛使用的开源关系数据库管理系统。要创建 MySQL 账号,通常需要遵循以下步骤:1. 安装 MySQL:确保你的系统上已经安装了 MySQL。如果还没有安装,你可以从官方网站下载并安装它。2. 登录 MySQL:打开命令行界面,并输入以下命令来登录 MySQL: ```...

大数据单位,背景与意义

大数据单位,背景与意义

1. 字节(Byte):是计算机存储数据的基本单位,通常用 B 表示。1 字节等于 8 位(bit)。2. 千字节(Kilobyte):简写为 KB,等于 1024 字节。3. 兆字节(Megabyte):简写为 MB,等于 1024 千字节。4. 吉字节(Gigabyte):简写为 GB,等于 1...