Presto 数据如何进行shuffle

概述

在众多分布式系统中,节点间的交互总是必不可少的,数据的shuffle又是重中之重。那么在Presto中如何进行数据shuffle呢。

Presto中的数据shuffle方式

在Presto中数据shuffle大致分为以下三种。

Presto 数据如何进行shuffle
图1

如图1所示,在Presto对于第一个执行计划,将Stage1的数据汇在Stage0进行汇总,这个可以看做是N对1的数据shuffle。对于第二个执行计划,将Stage2的数据shuffle到Stage1的多个节点上进行aggregation操作,这个可以看做是N对N或者是M对N的数据shuffle。对于第三个执行计划,将Stage2的数据根据Stage1中的TableScan的表的分区方式进行repartition式的数据shuffle,这个可以看做是M对M或者N对M的数据shuffle方式。

Presto中的shuffle实现

Presto 数据如何进行shuffle
图2

如图2所示,假设上游节点与下游节点分别是4个以及节点中并发数也是4(节点个数与并发数的问题请看文章https://zhuanlan.zhihu.com/p/55785284)。那么图中有几个重点:

  1. 上游节点怎么感知需要分多少个buffer
  2. 怎么保证相同的数据分布到相同的节点
  3. 下游节点用多少个client来拉取数据

partitionCount是多少

上游节点需要分多少个buffer其实这个真是值是下游节点的数量,但是在代码实现中,这个值是通过bucketToPartition计算而来的。

 public class BucketPartitionFunction
        implements PartitionFunction
{
    private final BucketFunction bucketFunction;
    private final int[] bucketToPartition;
    private final int partitionCount;

    public BucketPartitionFunction(BucketFunction bucketFunction, int[] bucketToPartition)
    {
        this.bucketFunction = requireNonNull(bucketFunction, "bucketFunction is null");
        this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null").clone();
        partitionCount = IntStream.of(bucketToPartition).max().getAsInt() + 1;
    }

    public int getPartitionCount()
    {
        return partitionCount;
    }

    /**
     * @param functionArguments the arguments to bucketing function in order (no extra columns)
     */
    public int getPartition(Page functionArguments, int position)
    {
        int bucket = bucketFunction.getBucket(functionArguments, position);
        return bucketToPartition[bucket];
    }
}

在进行数据partition时这个partitionCount就是通过bucketToPartition的最大值加1算出来的,这个值等于下游节点的数量,为什么这个值永远等于下游节点的数量,这个需要看bucketToPartition的由来。这个数据结构在创建Stage时决定的,

   public NodePartitionMap(List<Node> partitionToNode, ToIntFunction<Split> splitToBucket)
    {
        this.partitionToNode = ImmutableList.copyOf(requireNonNull(partitionToNode, "partitionToNode is null"));
        this.bucketToPartition = IntStream.range(0, partitionToNode.size()).toArray();
        this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null");
    }

    public NodePartitionMap(List<Node> partitionToNode, int[] bucketToPartition, ToIntFunction<Split> splitToBucket)
    {
        this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null");
        this.partitionToNode = ImmutableList.copyOf(requireNonNull(partitionToNode, "partitionToNode is null"));
        this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null");
    }

我们看到bucketToPartition是放在NodePartitionMap这个类中的,这个类有连个构造器,其中对于第一个构造器bucketToPartition是自己生成的,并且他是一个0到partitionToNode.size()的数组。其实这个构造器是对于非源头节点使用的,既对于N对N的那种shuffle方式使用的,这个bucketToPartition是0到下游节点size的一个数组,那么如果下游有4个节点,那么这个数据就应该是[0,1,2,3]。那么根据上文提到的partitionCount的计算方式,计算结果就是4,正好是节点个数。

对于第二个构造器,他的bucketToPartition是传入NodePartitionMap的,这个构造器是源头节点使用的,既对于N对M或者M对M的shuffle方式。这个bucketToPartition需要看如下代码

 /*
  * 假设是8个分区,4个节点有,那么bucketToNode结果如下
  * 1      ->      n1
  * 2      ->      n1
  * 3      ->      n2
  * 4      ->      n2 
  * 5      ->      n3 
  * 6      ->      n3
  * 7      ->      n4
  * 8      ->      n4
  */
 int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
 BiMap<Node, Integer> nodeToPartition = HashBiMap.create();
 int nextPartitionId = 0;
 for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
      Node node = bucketToNode.get(bucket);
      Integer partitionId = nodeToPartition.get(node);
      if (partitionId == null) {
           partitionId = nextPartitionId++;
           nodeToPartition.put(node, partitionId);
      }
      bucketToPartition[bucket] = partitionId;
 }

这个里bucketToPartition的size是源头节点的分区数,假设一张表有8个分区,4个节点,从上面的算法中可以看出,bucketToPartition的最后得到一个数组[0,0,1,1,2,2,3,3],那么根据上文提到的partitionCount的计算方式,bucketToPartition的最大值加1,正好也是节点数4个。

如何进行Hash的

一行数据选择哪一个分区,取决于hash算法,其实就是刚才的BucketPartitionFunction,可以看到有一个getPartition方法

 public int getPartition(Page functionArguments, int position)
 {
      int bucket = bucketFunction.getBucket(functionArguments, position);
      return bucketToPartition[bucket];
 }   

其中bucketFunction是一个接口,它有比较多的实现,分别来实现不同的情况下的Hash计算方式。

public interface BucketFunction
{
    /**
     * Gets the bucket for the tuple at the specified position.
     * Note the tuple values may be null.
     */
    int getBucket(Page page, int position);
}

比如对于N对M的数据shuffle,这个接口就要调用ConnectorNodePartitioningProvider的getBucketFunction来实现,才可以感知相应到Connector的数据分区方式。

但是最终,数据放在哪一个Buffer分区中是由bucketToPartition数组决定的,这个数据的内容取决于是源头还是非源头节点,比较方便的兼容了两种方式。

下游拉取数据

对于下游拉取数据,有两个问题,一个是拉数据的节点的地址,两一个是拉这些节点的那些分区。

在上游节点调度完成之后,会调用下游节点addExchangeLocations,在addExchangeLocations时,将需要拉取数据的节点注册给下游节点。这样下游节点就感知到了需要拉取节点的数据。

    public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> sourceTasks, boolean noMoreExchangeLocations)
    {
        requireNonNull(fragmentId, "fragmentId is null");
        requireNonNull(sourceTasks, "sourceTasks is null");

        RemoteSourceNode remoteSource = exchangeSources.get(fragmentId);
        checkArgument(remoteSource != null, "Unknown remote source %s. Known sources are %s", fragmentId, exchangeSources.keySet());

        this.sourceTasks.putAll(remoteSource.getId(), sourceTasks);

        for (RemoteTask task : getAllTasks()) {
            ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
            for (RemoteTask sourceTask : sourceTasks) {
                URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
                newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), exchangeLocation));
            }
            task.addSplits(newSplits.build());
        }

        if (noMoreExchangeLocations) {
            completeSourceFragments.add(fragmentId);

            // is the source now complete?
            if (completeSourceFragments.containsAll(remoteSource.getSourceFragmentIds())) {
                completeSources.add(remoteSource.getId());
                for (RemoteTask task : getAllTasks()) {
                    task.noMoreSplits(remoteSource.getId());
                }
            }
        }
    }

一个下游的task只会拉上游task的固定分区,而且只需要保证相同的数据在相同的节点上,所以这里直接用TaskId就可以保证这点。

private static Split createRemoteSplitFor(TaskId taskId, URI taskLocation)
{
     // Fetch the results from the buffer assigned to the task based on id
     URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(String.valueOf(taskId.getId())).build();
     return new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit(splitLocation));
}

这里URI的拼的方式是results/Id,id指定了buffer的Id,就是TaskId。

结论

Presto的数据shuffle是带有Presto特色的数据shuffle,使用一套工程实现代码,实现了三种数据shuffle方式,既考虑了非源头节点shuffle,又考虑了Connetor的数据分布方式进行性能优化,还是比较值得学习的。

Presto 数据如何进行shuffle

关注 易学在线 公众号

每日更新大量优质技术文档
第一时间获知最新上架课程
与众多大数据猿分享与交流

DOIT.EDU 资料来自网络收集整理,如有侵权请告知

(2)
打赏 微信扫一扫 微信扫一扫
上一篇 12月 30, 2020 2:54 上午
下一篇 1月 1, 2021 3:10 下午

相关推荐

wx