kylin的总体概述及总结汇总

Apache Kylin 是什么

Apache Kylin是一个开源的、基于Hadoop生态系统的OLAP引擎(OLAP查询引擎、OLAP多维分析引擎),能够通过SQL接口对十亿、甚至百亿行的超大数据集实现秒级的多维分析查询。

Apache Kylin 核心:Kylin OLAP引擎基础框架,包括元数据引擎,查询引擎,Job(Build)引擎及存储引擎等,同时包括REST服务器以响应客户端请求。

kylin的总体概述及总结汇总

OLAP 是什么

即联机分析处理:以复杂的分析型查询为主,需要扫描,聚合大量数据。

Kylin如何实现超大数据集的秒级多维分析查询

预计算

对于超大数据集的复杂查询,既然现场计算需要花费较长时间,那么根据空间换时间的原理,我们就可以提前将所有可能的计算结果计算并存储下来,从而实现超大数据集的秒级多维分析查询。

Kylin的预计算是如何实现的

将数据源Hive表中的数据按照指定的维度和指标 由计算引擎MapReduce离线计算出所有可能的查询结果(即Cube)存储到HBase中。

Cube 和 Cuboid是什么

简单地说,一个cube就是一个Hive表的数据按照指定维度与指标计算出的所有组合结果。

其中每一种维度组合称为cuboid,一个cuboid包含一种具体维度组合下所有指标的值。

如下图,整个立方体称为1个cube,立方体中每个网格点称为1个cuboid,图中(A,B,C,D)和(A,D)都是cuboid,特别的,(A,B,C,D)称为Base cuboid。cube的计算过程是逐层计算的,首先计算Base cuboid,然后计算维度数依次减少,逐层向下计算每层的cuboid。

kylin的总体概述及总结汇总
kylin的总体概述及总结汇总

Build引擎Cube构建流程

kylin的总体概述及总结汇总

BatchCubingJobBuilder2.build方法逻辑如下:



  public CubingJob build() {
        logger.info("MR_V2 new job to BUILD segment " + seg);       

        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);

        final String jobId = result.getId();

        final String cuboidRootPath = getCuboidRootPath(jobId);

        // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables

        // 根据事实表和维表抽取需要的维度和度量,创建一张宽表或平表,并且进行文件再分配(执行Hive命令行来完成操作)

        inputSide.addStepPhase1_CreateFlatTable(result);       

        // Phase 2: Build Dictionary

        // 创建字典由三个子任务完成,由MR引擎完成,分别是抽取维度值(包含抽样统计)、创建维度字典和保存统计信息

        result.addTask(createFactDistinctColumnsStep(jobId));

        result.addTask(createBuildDictionaryStep(jobId));

        result.addTask(createSaveStatisticsStep(jobId));

        // add materialize lookup tables if needed

        LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);

        // 创建HTable

        outputSide.addStepPhase2_BuildDictionary(result);

        // Phase 3: Build Cube

        // 构建Cube,包含两种Cube构建算法,分别是逐层算法和快速算法,在执行时会根据源数据的统计信息自动选择一种算法(各个Mapper的小Cube的行数之和 / reduce后的Cube行数 > 7,重复度高就选逐层算法,重复度低就选快速算法)

        addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute

        addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute

        // 构建HFile文件及把HFile文件BulkLoad到HBase

        outputSide.addStepPhase3_BuildCube(result);

        // Phase 4: Update Metadata & Cleanup

        // 更新Cube元数据,其中需要更新的包括cube是否可用、以及本次构建的数据统计,包括构建完成的时间,输入的record数目,输入数据的大小,保存到Hbase中数据的大小等,并将这些信息持久到元数据库中

        // 以及清理临时数据,是在整个执行过程中产生了很多的垃圾文件,其中包括:1、临时的hive表,2、因为hive表是一个外部表,存储该表的文件也需要额外删除,3、fact distinct 这一步将数据写入到HDFS上为建立词典做准备,这时候也可以删除了,4、rowKey统计的时候会生成一个文件,此时可以删除。

        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));

        inputSide.addStepPhase4_Cleanup(result);

        outputSide.addStepPhase4_Cleanup(result);        

        return result;

    }

一、 根据事实表和维表抽取需要的维度和度量,创建一张宽表或平表,并且进行文件再分配

1.1 生成Hive宽表或平表(Create Intermediate Flat Hive Table)(执行Hive命令行)

这一步的操作是根据cube的定义生成原始数据,这里会新创建一个hive外部表,然后再根据cube中定义的星状模型,查询出维度(对于DERIVED类型的维度使用的是外键列)和度量的值插入到新创建的表中,这个表是一个外部表,表的数据文件(存储在HDFS)作为下一个子任务的输入,它首先根据维度中的列和度量中作为参数的列得到需要出现在该表中的列,然后执行三步hive操作,这三步hive操作是通过hive -e的方式执行的shell命令。

1. drop TABLE IF EXISTS xxx

2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\177’ STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根据当前的cube名和segment的uuid生成的,location是当前job的临时文件,只有当insert插入数据的时候才会创建,注意这里每一行的分隔符指定的是’\177’(目前是写死的,十进制为127)。

3. 插入数据,在执行之前需要首先设置一些配置项,这些配置项通过hive的SET命令设置,是根据这个cube的job的配置文件(一般是在kylin的conf目录下)设置的,最后执行的是INSERT OVERWRITE TABLE xxx SELECT xxxx语句,SELECT子句中选出cube星状模型中事实表与维度表按照设置的方式join之后的出现在维度或者度量参数中的列(特殊处理derived列),然后再加上用户设置的where条件和partition的时间条件(根据输入build的参数)。

需要注意的是这里无论用户设置了多少维度和度量,每次join都会使用事实表和所有的维度表进行join,这可能造成不必要的性能损失(多一个join会影响hive性能,毕竟要多读一些文件)。这一步执行完成之后location指定的目录下就有了原始数据的文件,为接下来的任务提供了输入。

JoinedFlatTable.generateDropTableStatement(flatDesc);

JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);

JoinedFlatTable.generateInsertDataStatement(flatDesc);

二、 提取纬度值、创建维度字典和保存统计信息

2.1 提取事实表维度去重值(Extract Fact Table Distinct Columns)(执行一个MapReduce任务,包含抽取纬度值及统计各Mapper间的重复度两种任务)

    在这一步是根据上一步生成的hive表计算出还表中的每一个出现在事实表中的维度的distinct值,并写入到文件中,它是启动一个MR任务完成的,MR任务的输入是HCatInputFormat,它关联的表就是上一步创建的临时表,这个MR任务的map阶段首先在setup函数中得到所有维度中出现在事实表的维度列在临时表的index,根据每一个index得到该列在临时表中在每一行的值value,然后将<index+value,EMPTY_TEXT>作为mapper的输出,通过index决定由哪个Reduce处理(而Reduce启动的时候根据ReduceTaskID如0000,0001来初始化决定处理哪个index对应的维度列),该任务还启动了一个combiner,它所做的只是对同一个key(维度值)进行去重(同一个mapper的结果),reducer所做的事情也是进行key(维度值)去重(所有mapper的结果),然后在Reduce中将该维度列去重后的维度值一行行的写入到以列名命名的文件中(注意kylin实现的方式,聚合的key是纬度值,而不是index)。

提取事实表维度列的唯一值是通过FactDistinctColumnsJob这个MapReduce来完成,核心思想是每个Reduce处理一个维度列,然后每个维度列Reduce单独输出该维度列对应的去重后的数据文件(output written to baseDir/colName/-r-00000,baseDir/colName2/-r-00001 or 直接输出字典 output written to baseDir/colName/colName.rldict-r-00000)。另外会输出各Mapper间重复度统计文件(output written to baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001)

FactDistinctColumnsJob

FactDistinctColumnsMapper

FactDistinctColumnPartitioner

FactDistinctColumnsCombiner

FactDistinctColumnsReducer

org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper

org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer

在FactDistinctColumnsMapper中输出维度值或通过HHL近似算法统计每个Mapper中各个CuboID的去重行数

    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
        Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);

        for (String[] row : rowCollection) {
            context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));

            for (int i = 0; i < allCols.size(); i++) {
                String fieldValue = row[columnIndex[i]];

                if (fieldValue == null)

                    continue;

                final DataType type = allCols.get(i).getType();

                if (dictColDeduper.isDictCol(i)) {
                    if (dictColDeduper.add(i, fieldValue)) {
                        // 输出维度值,KEY=COLUMN_INDEX+COLUME_VALUE,VALUE=EMPTY_TEXT

                        writeFieldValue(context, type, i, fieldValue);

                    }

                } else {
                    DimensionRangeInfo old = dimensionRangeInfoMap.get(i);

                    if (old == null) {
                        old = new DimensionRangeInfo(fieldValue, fieldValue);

                        dimensionRangeInfoMap.put(i, old);

                    } else {
                        old.setMax(type.getOrder().max(old.getMax(), fieldValue));

                        old.setMin(type.getOrder().min(old.getMin(), fieldValue));

                    }

                }

            }

            // 抽样统计,KEY=CUBOID,VALUE=HLLCount

            if (rowCount % 100 < samplingPercentage) {
                putRowKeyToHLL(row);

            }

           

            if (rowCount % 100 == 0) {
                dictColDeduper.resetIfShortOfMem();

            }

            rowCount++;

        }

    }

    protected void doCleanup(Context context) throws IOException, InterruptedException {
        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);

        // output each cuboid's hll to reducer, key is 0 - cuboidId

        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
            cuboidStatCalculator.waitForCompletion();

        }

        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();

            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();

            HLLCounter hll;

            // 输出各个CuboID的去重行数HLLCount

            for (int i = 0; i < cuboidIds.length; i++) {
                hll = cuboidsHLL[i];

                tmpbuf.clear();

                tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte

                tmpbuf.putLong(cuboidIds[i]);

                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());

                hllBuf.clear();

                hll.writeRegisters(hllBuf);

                outputValue.set(hllBuf.array(), 0, hllBuf.position());

                sortableKey.init(outputKey, (byte) 0);

                context.write(sortableKey, outputValue);

            }

        }

        for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
            DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);

            DataType dataType = allCols.get(colIndex).getType();

            writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());

            writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());

        }

    }

 

在FactDistinctColumnPartitioner中根据SelfDefineSortableKey(COLUMN_INDEX)选择分区

    public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
        Text key = skey.getText();

        // 统计任务

        if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
            Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);

            return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);

        } else {
            // 抽取纬度值任务,直接根据COLUMN_INDEX指定分区

            return BytesUtil.readUnsigned(key.getBytes(), 0, 1);

        }

    }

 

在FactDistinctColumnsReducer中输出去重后的维度值或输出通过HLL近似算法统计CuboID去重后的行数

    public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Text key = skey.getText();

       

        // 统计逻辑

        if (isStatistics) {
            // for hll

            long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);

            for (Text value : values) {
                HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());

                ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());

                hll.readRegisters(bf);

                // 累计Mapper输出的各个CuboID未去重的行数(每个Reduce处理部分CuboIDs)

                totalRowsBeforeMerge += hll.getCountEstimate();

                if (cuboidId == baseCuboidId) {
                    baseCuboidRowCountInMappers.add(hll.getCountEstimate());

                }

                // 合并CuboID

                if (cuboidHLLMap.get(cuboidId) != null) {
                    cuboidHLLMap.get(cuboidId).merge(hll);

                } else {
                    cuboidHLLMap.put(cuboidId, hll);

                }

            }

        } else {
            String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);

            logAFewRows(value);

            // if dimension col, compute max/min value

            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
                if (minValue == null || col.getType().compare(minValue, value) > 0) {
                    minValue = value;

                }

                if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
                    maxValue = value;

                }

            }

            //if dict column

            if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
                if (buildDictInReducer) {
                    // 如果需要在Reduce阶段构建词典,则在doCleanup后构建完输出词典文件

                    // output written to baseDir/colName/colName.rldict-r-00000 (etc)

                    builder.addValue(value);

                } else {
                    // 直接输出去重后的维度值

                    byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);

                    // output written to baseDir/colName/-r-00000 (etc)

                    String fileName = col.getIdentity() + "/";

                    mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);

                }

            }

        }

        rowCount++;

    }

    protected void doCleanup(Context context) throws IOException, InterruptedException {
        if (isStatistics) {
            //output the hll info;

            List<Long> allCuboids = Lists.newArrayList();

            allCuboids.addAll(cuboidHLLMap.keySet());

            Collections.sort(allCuboids);

            logMapperAndCuboidStatistics(allCuboids); // for human check

            输出通过HLL近似算法统计CuboID去重后的行数

            outputStatistics(allCuboids);

        } else {
            //dimension col

            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
                outputDimRangeInfo();

            }

            // dic col

            if (buildDictInReducer) {
                Dictionary<String> dict = builder.build();

                outputDict(col, dict);

            }

        }

        mos.close();

    }

    private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
        // output written to baseDir/statistics/statistics-r-00000 (etc)

        String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;

        ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);

        // 获取进入这个Reduce各个CuboID去重后的最终统计行数

        // mapper overlap ratio at key -1

        long grandTotal = 0;

        for (HLLCounter hll : cuboidHLLMap.values()) {
            // 累计各个CuboID去重后的最终统计行数

            grandTotal += hll.getCountEstimate();

        }

       

        // 输出进入这个Reduce中的各Mapper间的重复度,totalRowsBeforeMerge / grandTotal

        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;

        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);

        //  Mapper数量

        // mapper number at key -2

        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);

        // 抽样百分比

        // sampling percentage at key 0

        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);

        // 输出进入这个Reduce的各个cuboId的最终统计结果

        for (long i : allCuboids) {
            valueBuf.clear();

            cuboidHLLMap.get(i).writeRegisters(valueBuf);

            valueBuf.flip();

            mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);

        }

    }

2.2 基于维度去重值构建维度字典(Build Dimension Dictionary)(在kylin进程内的一个线程中去创建所有维度的dictionary)

  这一步是根据上一步生成的distinct column文件和维度表计算出所有维度的词典信息,词典是为了节约存储而设计的,用于将一个成员值编码成一个整数类型并且可以通过整数值获取到原始成员值,每一个cuboid的成员是一个key-value形式存储在hbase中,key是维度成员的组合,但是一般情况下维度是一些字符串之类的值(例如商品名),所以可以通过将每一个维度值转换成唯一整数而减少内存占用,在从hbase查找出对应的key之后再根据词典获取真正的成员值。使用字典的好处是有很好的数据压缩率,可降低存储空间,同时也提升存储读取的速度。缺点是构建字典需要较多的内存资源,创建维度基数超过千万的容易造成内存溢出。

   这一步是在kylin进程内的一个线程中执行的,它会创建所有维度的dictionary,如果是事实表上的维度则可以从上一步生成的文件中读取该列的distinct成员值(FileTable),否则则需要从原始的hive表中读取每一列的信息(HiveTable),根据不同的源(文件或者hive表)获取所有的列去重之后的成员列表,然后根据这个列表生成dictionary,kylin中针对不同类型的列使用不同的实现方式,对于time之类的(date、time、dtaetime和timestamp)使用DateStrDictionary,这里目前还存在着一定的问题,因为这种编码方式会首先将时间转换成‘yyyy-MM-dd’的格式,会导致timestamp之类的精确时间失去天以后的精度。针对数值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典树)。这些dictionary会作为cube的元数据存储的kylin元数据库里面,执行query的时候进行转换。

   针对这一步需要注意的问题:首先,这一步的两个步骤都是在kylin进程的一个线程中执行的,第一步会加载某一个维度的所有distinct成员到内存,如果某一个维度的基数比较大 ,可能会导致内存出现OOM,然后在创建snapshotTable的时候会限制原始表的大小不能超过配置的一个上限值,如果超过则会执行失败。但是应该强调的是这里加载全部的原始维度表更可能出现OOM。

CreateDictionaryJob

2.3 保存统计信息(合并保存统计信息及基于上一个HyperLogLog模拟去重统计信息选择Cube构建算法等)

   针对上一个MR的HyperLogLog模拟去重统计结果文件baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001,合并相关统计信息,根据最终重复度选择Cube构建算法

在FactDistinctColumnsReducer中输出进入这个Reduce的各个CuboID的统计信息   

private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
        // output written to baseDir/statistics/statistics-r-00000 (etc)

        String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;

        ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);

        // 获取进入这个Reduce各个CuboID去重后的最终统计行数

        // mapper overlap ratio at key -1

        long grandTotal = 0;

        for (HLLCounter hll : cuboidHLLMap.values()) {
            // 累计各个CuboID去重后的最终统计行数

            grandTotal += hll.getCountEstimate();

        }

        // 输出进入这个Reduce中的各Mapper间的重复度,totalRowsBeforeMerge / grandTotal

        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;

        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);

        //  Mapper数量

        // mapper number at key -2

        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);

        // 抽样百分比

        // sampling percentage at key 0

        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);

        // 输出进入这个Reduce的各个cuboId的最终统计结果

        for (long i : allCuboids) {
            valueBuf.clear();

            cuboidHLLMap.get(i).writeRegisters(valueBuf);

            valueBuf.flip();

            mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);

        }

}

在SaveStatisticsStep保存统计信息任务阶段会去读取上一步任务产出的cuboID统计结果文件,产出最终统计信息保存到元数据引擎中并且根据各个Mapper重复度选择Cube构建算法。

Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();

 long totalRowsBeforeMerge = 0;

 long grantTotal = 0;

 int samplingPercentage = -1;

 int mapperNumber = -1;

 for (Path item : statisticsFiles) {
 // 读取解析统计文件

CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item,

                        kylinConf.getCubeStatsHLLPrecision());           

                // 获取各个CuboID的计数器

                cuboidHLLMap.putAll(cubeStatsResult.getCounterMap());

                long pGrantTotal = 0L;

                for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) {
                    pGrantTotal += hll.getCountEstimate();

                }                

                // 累计所有Mapper输出的cuboID行数

                totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio();

                // 累计去重后的cuboID统计行数

                grantTotal += pGrantTotal;

            double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;

            CubingJob cubingJob = (CubingJob) getManager()

                    .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));

            // fact源数据行数

            long sourceRecordCount = cubingJob.findSourceRecordCount();

            // 保存CuboID最终统计信息到最终统计文件cuboid_statistics.seq中

            // cuboidHLLMap CuboID的统计信息

            // samplingPercentage 抽样百分比

            // mapperNumber Mapper数

            // mapperOverlapRatio 各个Mapper间的重复度

            // sourceRecordCount fact源数据行数

            CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,mapperNumber, mapperOverlapRatio, sourceRecordCount);

            Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);

            logger.info(newSegment + " stats saved to hdfs " + statisticsFile);

            FSDataInputStream is = fs.open(statisticsFile);

            try {
                // put the statistics to metadata store

                // 把统计信息存储到kylin的元数据引擎中

                String resPath = newSegment.getStatisticsResourcePath();

                rs.putResource(resPath, is, System.currentTimeMillis());

                logger.info(newSegment + " stats saved to resource " + resPath);

                // 根据抽样数据计算重复度,选择Cube构建算法,如mapperOverlapRatio > 7 选逐层算法,否则选快速算法

                StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);

                StatisticsDecisionUtil.optimizeCubingPlan(newSegment);

            } finally {
                IOUtils.closeStream(is);

}

用户该如何选择算法呢?无需担心,Kylin会自动选择合适的算法。Kylin在计算Cube之前对数据进行采样,在“fact distinct”步,利用HyperLogLog模拟去重,估算每种组合有多少不同的key,从而计算出每个Mapper输出的数据大小,以及所有Mapper之间数据的重合度,据此来决定采用哪种算法更优。在对上百个Cube任务的时间做统计分析后,Kylin选择了7做为默认的算法选择阀值(参数kylin.cube.algorithm.layer-or-inmem-threshold):如果各个Mapper的小Cube的行数之和,大于reduce后的Cube行数的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用户在使用过程中,更倾向于使用Fast Cubing,可以适当调大此参数值,反之调小。

rg.apache.kylin.engine.mr.steps.SaveStatisticsStep               

 int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();

                double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // 默认7

                logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);

                logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is "+ overlapThreshold);

                // in-mem cubing is good when

                // 1) the cluster has enough mapper slots to run in parallel

                // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage

                alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//

                        ? CubingJob.AlgorithmEnum.INMEM     // 快速算法

                        : CubingJob.AlgorithmEnum.LAYER;    // 逐层算法

三、 构建Cube

3.1 计算BaseCuboid文件(Build Base Cuboid Data)(执行一个MapReduce任务)

   何谓Base cuboid呢?假设一个cube包含了四个维度:A/B/C/D,那么这四个维度成员间的所有可能的组合就是base cuboid,这就类似在查询的时候指定了select count(1) from xxx group by A,B,C,D;这个查询结果的个数就是base cuboid集合的成员数。这一步也是通过一个MR任务完成的,输入是临时表的路径和分隔符,map对于每一行首先进行split,然后获取每一个维度列的值组合作为rowKey,但是rowKey并不是简单的这些维度成员的内容组合,而是首先将这些内容从dictionary中查找出对应的id,然后组合这些id得到rowKey,这样可以大大缩短hbase的存储空间,提升查找性能。然后在查找该行中的度量列。这个MR任务还会执行combiner过程,执行逻辑和reducer相同,在reducer中的key是一个rowKey,value是相同的rowKey的measure组合的数组,reducer会分解出每一个measure的值,然后再根据定义该度量使用的聚合函数计算得到这个rowKey的结果,其实这已经类似于hbase存储的格式了。

org.apache.kylin.engine.mr.steps.BaseCuboidJob

org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMappe

org.apache.kylin.engine.mr.steps.CuboidReducer

3.2 计算第N层cuboid文件(Build N-Dimension Cuboid Data)(执行N个MapReduce任务)

  这一个流程是由多个步骤的,它是根据维度组合的cuboid的总数决定的,上一层cuboid执行MR任务的输入是下一层cuboid计算的输出,由于最底层的cuboid(base)已经计算完成,所以这几步不需要依赖于任何的hive信息,它的reducer和base cuboid的reducer过程基本一样的(相同rowkey的measure执行聚合运算),mapper的过程只需要根据这一行输入的key(例如A、B、C、D中某四个成员的组合)获取可能的下一层的的组合(例如只有A、B、C和B、C、D),那么只需要将这些可能的组合提取出来作为新的key,value不变进行输出就可以了。

举个例子,假设一共四个维度A/B/C/D,他们的成员分别是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一个measure(对于这列V,计算sum(V)),这里忽略dictionary编码。原始表如下:

kylin的总体概述及总结汇总

那么base cuboid最终的输出如下

(<A1、B1、C1、D1>、2)

(<A1、B2、C1、D1>, 3)

(<A2、B1、C1、D1>, 5)

(<A3、B1、C1、D1>, 6)

(<A3、B2、C1、D1>, 8)

那么它作为下面一个cuboid的输入,对于第一行输入

(<A1、B1、C1、D1>, 2),mapper执行完成之后会输出

(<A1、B1、C1>, 2)、

(<A1、B1、D1>, 2)、

(<A1、C1、D1>, 2)、

(<B1、C1、D1>, 2)这四项,

同样对于其他的内一行也会输出四行,最终他们经过reducer的聚合运算,得到如下的结果:

(<A1、B1、C1>, 2)

(<A1、B1、D1>, 2)

(<A1、C1、D1>, 2 + 3)

(<B1、C1、D1>,2 + 5 +6)

这样一次将下一层的结果作为输入计算上一层的cuboid成员,直到最顶层的cuboid,这一个层cuboid只包含一个成员,不按照任何维度进行group by。

     上面的这些步骤用于生成cuboid,假设有N个维度(对于特殊类型的),那么就需要有N +1层cuboid,每一层cuboid可能是由多个维度的组合,但是它包含的维度个数相同。

org.apache.kylin.engine.mr.steps.NDCuboidJob

org.apache.kylin.engine.mr.steps.NDCuboidMapper

org.apache.kylin.engine.mr.steps.CuboidReducer

3.3 创建HTable

  在上面几步中,我们已经将每一层的cuboid计算完成,每一层的cuboid文件都是一些cuboid的集合,每一层的cuboid的key包含相同的维度个数,下面一步就是将这些cuboid文件导入到hbase中,根据上一步计算出的rowKey分布情况(split数组)创建HTable,创建一个HTable的时候还需要考虑一下几个事情:1、列组的设置,2、每一个列组的压缩方式,3、部署coprocessor,4、HTable中每一个region的大小。在这一步中,列组的设置是根据用户创建cube时候设置的,在hbase中存储的数据key是维度成员的组合,value是对应聚合函数的结果,列组针对的是value的,一般情况下在创建cube的时候只会设置一个列组,该列包含所有的聚合函数的结果;在创建HTable时默认使用LZO压缩,如果不支持LZO则不进行压缩,在后面kylin的版本中支持更多的压缩方式;kylin强依赖于hbase的coprocessor,所以需要在创建HTable为该表部署coprocessor,这个文件会首先上传到HBase所在的HDFS上,然后在表的元信息中关联,这一步很容易出现错误,例如coprocessor找不到了就会导致整个regionServer无法启动,所以需要特别小心;region的划分已经在上一步确定了,所以这里不存在动态扩展的情况,所以kylin创建HTable使用的接口如下:

public void createTable( final HTableDescriptor desc , byte [][] splitKeys)。

CreateHTableJob

3.4 转换HFile文件

  创建完了HTable之后一般会通过插入接口将数据插入到表中,但是由于cuboid中的数据量巨大,频繁的插入会对Hbase的性能有非常大的影响,所以kylin采取了首先将cuboid文件转换成HTable格式的HFile文件,然后在通过bulkLoad的方式将文件和HTable进行关联,这样可以大大降低Hbase的负载,这个过程通过一个MR任务完成。

  这个任务的输入是所有的cuboid文件,在mapper阶段根据每一个cuboid成员的key-value输出,如果cube定义时指定了多个列组,那么同一个key要按照不同列组中的值分别输出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的数据,而cube中将这两个度量划分到两个列组中,这时候对于这一行数据,mapper的输出为<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它会按照行排序输出,如果一行中包含多个值,那么会将这些值进行排序再输出。输出的格式则是根据HTable的文件格式定义的。

CubeHFileJob

3.5 BulkLoad文件

  这一步将HFile文件load到HTable中,因为load操作会将原始的文件删除(相当于remove),在操作之前首先将所有列组的Hfile的权限都设置为777,然后再启动LoadIncrementalHFiles任务执行load操作,它的输入为文件的路径和HTable名,这一步完全依赖于HBase的工具。这一步完成之后,数据已经存储到HBase中了,key的格式由cuboid编号+每一个成员在字典树的id组成,value可能保存在多个列组里,包含在原始数据中按照这几个成员进行GROUP BY计算出的度量的值。

BulkLoadJob

四、 收尾工作

  执行完上一步就已经完成了从输入到输出的计算过程,接下来要做的就是一些kylin内部的工作,分别是更新Cube元数据,更新cube状态,临时数据清理。

4.1 更新Cube元数据信息

  这一步主要是更新cube的状态,其中需要更新的包括cube是否可用、以及本次构建的数据统计,包括构建完成的时间,输入的record数目,输入数据的大小,保存到Hbase中数据的大小等,并将这些信息持久到元数据库中。

UpdateCubeInfoAfterBuildStep

4.2 清理临时数据

  这一步是否成功对正确性不会有任何影响,因为经过上一步之后这个segment就可以在这个cube中被查找到了,但是在整个执行过程中产生了很多的垃圾文件,其中包括:1、临时的hive表,2、因为hive表是一个外部表,存储该表的文件也需要额外删除,3、fact distinct 这一步将数据写入到HDFS上为建立词典做准备,这时候也可以删除了,4、rowKey统计的时候会生成一个文件,此时可以删除。5、生成HFile时文件存储的路径和hbase真正存储的路径不同,虽然load是一个remove操作,但是上层的目录还是存在的,也需要删除。这一步kylin做的比较简单,并没有完全删除所有的临时文件,其实在整个计算过程中,真正还需要保留的数据只有多个cuboid文件(需要增量build的cube),这个因为在不同segment进行merge的时候是基于cuboid文件的,而不是根据HTable的。

GarbageCollectionStep

Cuboid 的维度和指标如何转换为HBase的KV结构

简单的说Cuboid的维度会映射为HBase的Rowkey,Cuboid的指标会映射为HBase的Value。如下图所示: 图2

kylin的总体概述及总结汇总

如上图原始表所示:Hive表有两个维度列year和city,有一个指标列price。

如上图预聚合表所示:我们具体要计算的是year和city这两个维度所有维度组合(即4个cuboid)下的sum(priece)指标,这个指标的具体计算过程就是由MapReduce完成的。

如上图字典编码所示:为了节省存储资源,Kylin对维度值进行了字典编码。图中将beijing和shanghai依次编码为0和1。

如上图HBase KV存储所示:在计算cuboid过程中,会将Hive表的数据转化为HBase的KV形式。Rowkey的具体格式是cuboid id + 具体的维度值(最新的Rowkey中为了并发查询还加入了ShardKey),以预聚合表内容的第2行为例,其维度组合是(year,city),所以cuboid id就是00000011,cuboid是8位,具体维度值是1994和shanghai,所以编码后的维度值对应上图的字典编码也是11,所以HBase的Rowkey就是0000001111,对应的HBase Value就是sum(priece)的具体值。

所有的cuboid计算完成后,会将cuboid转化为HBase的KeyValue格式生成HBase的HFile,最后将HFile load进cube对应的HBase表中。

Cube 构建过程重要源码分析

1 从Hive表生成Base Cuboid

在实际的cube构建过程中,会首先根据cube的Hive事实表和维表生成一张大宽表,然后计算大宽表列的基数,建立维度字典,估算cuboid的大小,建立cube对应的HBase表,再计算base cuboid。

计算base cuboid就是一个MapReduce作业,其输入是上面提到的Hive大宽表,输出的是key是各种维度组合,value是Hive大宽表中指标的值。

org.apache.kylin.engine.mr.steps.BaseCuboidJob

org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper

org.apache.kylin.engine.mr.steps.CuboidReducer

map阶段生成key-value的代码如下:   

public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
        Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value);

        for (String[] row: rowCollection) {
            try {
                outputKV(row, context);

            } catch (Exception ex) {
                handleErrorRecord(row, ex);

            }

        }

 

    }

2 从Base Cuboid 逐层计算 Cuboid(Cube构建算法-逐层算法)

从base cuboid 逐层计算每层的cuboid,也是MapReduce作业,map阶段每层维度数依次减少。

org.apache.kylin.engine.mr.steps.NDCuboidJob

org.apache.kylin.engine.mr.steps.NDCuboidMapper

org.apache.kylin.engine.mr.steps.CuboidReducer

        public void doMap(Text key, Text value, Context context) throws Exception {
            long cuboidId = rowKeySplitter.split(key.getBytes());

            Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);

            /**

             * Build N-Dimension Cuboid

              ## 构建N维cuboid

              这些步骤是“逐层”构建cube的过程,每一步以前一步的输出作为输入,然后去掉一个维度以聚合得到一个子cuboid。举个例子,cuboid ABCD去掉A得到BCD,去掉B得到ACD。

              有些cuboid可以从一个以上的父cuboid聚合得到,这种情况下,Kylin会选择最小的一个父cuboid。举例,AB可以从ABC(id:1110)和ABD(id:1101)生成,则ABD会被选中,因为它的比ABC要小。

              在这基础上,如果D的基数较小,聚合运算的成本就会比较低。所以,当设计rowkey序列的时候,请记得将基数较小的维度放在末尾。这样不仅有利于cube构建,而且有助于cube查询,因为预聚合也遵循相同的规则。

              通常来说,从N维到(N/2)维的构建比较慢,因为这是cuboid数量爆炸性增长的阶段:N维有1个cuboid,(N-1)维有N个cuboid,(N-2)维有(N-2)*(N-1)个cuboid,以此类推。经过(N/2)维构建的步骤,整个构建任务会逐渐变快。

             */

            Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);

            // if still empty or null

            if (myChildren == null || myChildren.size() == 0) {
                context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);

                if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                    logger.info("Skipping record with ordinal: " + skipCounter);

                }

                return;

            }           

            context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);

            Pair<Integer, ByteArray> result;

            for (Long child : myChildren) {
                Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);

                result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());

                outputKey.set(result.getSecond().array(), 0, result.getFirst());

                context.write(outputKey, value);

            }         

        }

从base cuboid 逐层计算每层的cuboid,也是MapReduce作业,map阶段每层维度数依次减少,reduce阶段对指标进行聚合。

org.apache.kylin.engine.mr.steps.CuboidReducer

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        aggs.reset();  //MeasureAggregators 根据每种指标的不同类型对指标进行聚合

        for (Text value : values) {
            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);

            if (cuboidLevel > 0) { // Base Cuboid 的 cuboidLevel 是0

                aggs.aggregate(input, needAggr); //指标进行进一步聚合

            } else {
                aggs.aggregate(input);

            }

        }

        aggs.collectStates(result);

        ByteBuffer valueBuf = codec.encode(result);

        outputValue.set(valueBuf.array(), 0, valueBuf.position());

        context.write(key, outputValue);

} 

3 读取Hive宽表直接在Mapper端预聚合构建完整Cube(Cube构建算法-快速算法)

快速算法的核心思想是清晰简单的,就是最大化利用Mapper端的CPU和内存,对分配的数据块,将需要的组合全都做计算后再输出给Reducer;由Reducer再做一次合并(merge),从而计算出完整数据的所有组合。如此,经过一轮Map-Reduce就完成了以前需要N轮的Cube计算。本质就是在Mapper端基于内存提前做预聚合。

org.apache.kylin.engine.mr.steps.InMemCuboidJob

org.apache.kylin.engine.mr.steps.InMemCuboidMapper

org.apache.kylin.engine.mr.steps.InMemCuboidReducer

map阶段生成key-value的代码如下:

    public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {

        // put each row to the queue

        T row = getRecordFromKeyValue(key, value);

        if (offer(context, row, 1, TimeUnit.MINUTES, 60)) {

            counter++;

            countOfLastSplit++;

            if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {

                logger.info(“Handled ” + counter + ” records, internal queue size = ” + queue.size());

            }

        } else {

            throw new IOException(“Failed to offer row to internal queue due to queue full!”);

        }

        if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {

            if (offer(context, inputConverterUnit.getCutRow(), 1, TimeUnit.MINUTES, 60)) {

                countOfLastSplit = 0;

            } else {

                throw new IOException(“Failed to offer row to internal queue due to queue full!”);

            }

            nSplit++;

        }

}

reduce阶段整体合并的代码如下:

public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
        aggs.reset();

        for (ByteArrayWritable value : values) {
            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);

            }

            codec.decode(value.asBuffer(), input);

            aggs.aggregate(input);

        }

        aggs.collectStates(result);

        // output key

        outputKey.set(key.array(), key.offset(), key.length());

        // output value

        ByteBuffer valueBuf = codec.encode(result);

        outputValue.set(valueBuf.array(), 0, valueBuf.position());

        context.write(outputKey, outputValue);

    }

4 Cuboid 转化为HBase的HFile。

主要就是数据格式的转化。详情请参考: Hive 数据 bulkload 导入 HBase

不同类型的指标是如何进行聚合的

每种不同的指标都会有对应的聚合算法,所有指标聚合的基类是org.apache.kylin.measure.MeasureAggregator。其核心方法如下:   

    abstract public void reset();

    //不同类型的指标算法会实现该方法

    abstract public void aggregate(V value);

    abstract public V getState();

以最简单的long类型的sum指标为例:   

public class LongSumAggregator extends MeasureAggregator<LongMutable> {
        LongMutable sum = new LongMutable();

        @Override

        public void reset() {
            sum.set(0);

        }

        @Override

        public void aggregate(LongMutable value) {
            sum.set(sum.get() + value.get());

        }

        @Override

        public LongMutable getState() {
            return sum;

        }

}

SQL查询是如何转化为HBase的Scan操作的

还是以图2举例,假设查询SQL如下:   

select year, sum(price)

from table

where city = “beijing”

group by year

这个SQL涉及维度year和city,所以其对应的cuboid是00000011,又因为city的值是确定的beijing,所以在Scan HBase时就会Scan Rowkey以00000011开头且city的值是beijing的行,取到对应指标sum(price)的值,返回给用户。

kylin构建cube优化

创建hive中间表

kylin会在cube构建的第一步先构建一张hive的中间表,该表关联了所有的事实表和维度表,也就是一张宽表。

优化点:

1. hive表分区优化,在构建宽表的时候,kylin需要遍历hive表,事实表和维度表如果是分区表,那么会减少遍历时间

2. hive相关配置调整,join相关配置,mapreduce相关配置等

创建完成后,为了防止文件大小不一致的情况,kylin又基于hive做了一次重均衡操作,

`kylin.engine.mr.mapper-input-rows=1000000`,默认每个文件包含100w的数据量

代码 `CreateFlatHiveTableStep`

找出所有维度的基数

通过HyperLogLog 算法找出去重后的维度列,如果某个维度的基数很大,那么这种维度为被称为ultra high cardinality column(UHC),也就是超高基数维度。那么如何处理这类维度呢?

业务层处理UHC

比如时间戳维度基数可能是亿级的,可以转成为日期,基数降到几十万.

技术层处理UHC

kylin通过mapreduce进行此步骤,在reduce端,一个维度用一个reduce去重,因此当某个维度的基数很大时,会导致该维度所在的reduce运行很慢,甚至内存溢出,为了应对这种场景,kylin提供了两种解决方案

1. 全局唯一维度,也就是在count_dintinct中选择0错误率的统计分析。

2. 需要被shard by的维度,在rowkey构建时配置的维度。

接着可以通过配置`kylin.engine.mr.uhc-reducer-count=1`来声明这些列需要被分割成多少个reducer执行

当然,kylin也支持基于cuboid个数来进行reducer个数的分配,`kylin.engine.mr.hll-max-reducer-number=1`,默认情况下kylin不开启此功能,可以修改配置来提高最小个数;然后通过配置`kylin.engine.mr.per-reducer-hll-cuboid-number`来调整具体的reduce数量

最终的reducer数量由UHC和cuboids两个部分相加得到,具体代码参考

`FactDistinctColumnsReducerMapping`构造函数

# 配置UHC增加另外步骤,需要配置zk的地址(作为全局分布式锁使用)

# 因为在跑mapreduce的过程中,kylin没有将hbase-site.xml等配置上传到yarn,所以只能在kylin.properties中额外配置一遍

kylin.engine.mr.build-uhc-dict-in-additional-step=true

kylin.env.zookeeper-connect-string=host:port,host:port

代码 `FactDistinctColumnsJob`, `UHCDictionaryJob`

构建维度字典

找出所有维度的基数后,kyin为每个维度构建一个数据字典,字典的metadata存储在hdfs上,实际数据存储在hbase

字典在hdfs的路径规则为

kylin/kylin_meta_data/kylin-$jobid/%cubeid/metadata/dict/$catalog.$table/$dimension/$uuid.dict

字典数据在hbase的rowkey规则为

/dict/$catalog.$table/$dimension/$uuid.dict

rowkey长度

过长的rowkey会占用很大的存储空间,所以需要对rowkey长度进行控制。

当前kylin直接在当前进程内做了字典编码,也就是把string映射成int,如果维度列的基数很大,那么可能会出现内存溢出的情况(当列的基础大于1kw),这时候就需要考虑更改维度列的编码方式,改用`fixed_length`等。如果一个维度的长度超过`fixed_length`,那么超过的部分会被截断。 

rowkey构建

对rowkey的构建也有一定的要求,一般而言,需要把基数大的字段放在前面,这样可以在scan的过程中尽可能的跳过更多的rowkey。

另一方面将基数小的列放在rowkey的后面,可以减少构建的重复计算,有些cuboid可以通过一个以上的父cuboid聚合而成,在这种情况下,Kylin将会选择最小的父cuboid。例如,AB能够通过ABC(id:1110)和ABD(id:1101)聚合生成,因此ABD会被作为父cuboid使用,因为它的id比ABC要小。基于以上处理,如果D的基数很小,那么此次聚合操作就会花费很小的代价。因此,当设计cube的rowkey顺序的时候,请记住,将低基数的维度列放在尾部。这不仅对cube的构建过程有好处,而且对cube查询也有好处,因为后聚合(应该是指在HBase查找对应cuboid的过程)也遵循这个规则。

维度分片

在构建rowkey过程中,有一个选项,可以声明哪个维度用于shard。
这个shard的作用是,将该shard维度和总shard数hash,得到的hash结果插入到encoding后的rowkey中,这样就可以让该维度下相同的数据尽可能的分配到一个shard中,而在hbase存储里,一个shard对应的是一个region,这样处理另一个好处是,在聚合的时候可以很好的把相同数据合并一起,减少网络传输io。参考类`RowKeyEncoder`。一个encoding的rowkey的结构是这样的

head+shard+dim1+dim2+…+dimn

一个segment的总shard数计算方式如下,参考类`CreateHTableJob`,其中,estimatedSize参数类`CubeStatsReader.estimateCuboidStorageSize`

int shardNum = (int) (estimatedSize * magic / mbPerRegion + 1);

因此,声明的shard维度最好是被频繁group by的维度或者是基数很大的维度,这样在coprocess处理的时候可以加速

构建cube

构建引擎

可以选择spark或者mapreduce来构建cube,通常来说,构建引擎的选择方式是这样的

  1. 内存消耗型的cube选择mapreduce,例如Count Distinct, Top-N
  2. 简单的cube选择spark,例如SUM/MIN/MAX/COUNT

spark引擎

spark构建引擎采用` by-layer`算法,也就是分层计算

比如有3个维度ABC,cube会构建A,B,C,AB,AC,ABC6种组合,这里就有3层,

第1层:A,B,C

第2层:AB,AC

第3层:ABC

每一层在计算对于spark而言都是一个action,并且该层计算的rdd会依赖其上一层的结果继续计算,这样避免了很大重复性计算工作。

代码` SparkCubingByLayer` 

数据转换为HFile

kylin将生成的cube通过生成HFile的方式导入到hbase,这个优化点可以配置hbase的相关参数。

  1. region数量默认是1,如果数据量大的话可以提高region数量
  2. region大小默认是5GB,也就是hbae官方建议的大小;如果cube大小比这个值小太多,可以减小单region的大小
  3. hfile文件大小,默认是1GB,由于是通过mapreduce写入的,小文件意味着写入快,但是读取慢,大文件意味着写入慢,读取快

代码`CubeHFileJob`

cleanup

  1. 清理hive中的中间表,
  2. 清理hbase表
  3. 清理hdfs数据

理命令

查看需要清理的数据

./bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob –delete false

清理

./bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob –delete true

 总结

基于kylin的ui,可以看到kylin在构建cube时各个流程的耗时,可以依据这些耗时做相应的优化,常见的,可以从耗时最长的步骤开始优化,比如:

  1. 遇到创建hive中间表时间很长,考虑对hive表进行分区处理,对表中的文件格式更改,使用orc,parquet等高性能的文件格式
  2. 遇到cube构建时间过长,查看cube设计是否合理,维度的组合关系是否可以再减少,构建引擎是否可以优化

优化的思路还是以cube为中心,优化cube的整个生命周期,其中涉及到的所有组件都是优化点,具体情况还是要和实际的数据维度和业务结合起来。

kylin的总体概述及总结汇总

关注 易学在线 公众号

每日更新大量优质技术文档
第一时间获知最新上架课程
与众多大数据猿分享与交流

DOIT.EDU 资料来自网络收集整理,如有侵权请告知

wx