概述
在众多分布式系统中,节点间的交互总是必不可少的,数据的shuffle又是重中之重。那么在Presto中如何进行数据shuffle呢。
Presto中的数据shuffle方式
在Presto中数据shuffle大致分为以下三种。

如图1所示,在Presto对于第一个执行计划,将Stage1的数据汇在Stage0进行汇总,这个可以看做是N对1的数据shuffle。对于第二个执行计划,将Stage2的数据shuffle到Stage1的多个节点上进行aggregation操作,这个可以看做是N对N或者是M对N的数据shuffle。对于第三个执行计划,将Stage2的数据根据Stage1中的TableScan的表的分区方式进行repartition式的数据shuffle,这个可以看做是M对M或者N对M的数据shuffle方式。
Presto中的shuffle实现

如图2所示,假设上游节点与下游节点分别是4个以及节点中并发数也是4(节点个数与并发数的问题请看文章https://zhuanlan.zhihu.com/p/55785284)。那么图中有几个重点:
- 上游节点怎么感知需要分多少个buffer
- 怎么保证相同的数据分布到相同的节点
- 下游节点用多少个client来拉取数据
partitionCount是多少
上游节点需要分多少个buffer其实这个真是值是下游节点的数量,但是在代码实现中,这个值是通过bucketToPartition计算而来的。
public class BucketPartitionFunction
implements PartitionFunction
{
private final BucketFunction bucketFunction;
private final int[] bucketToPartition;
private final int partitionCount;
public BucketPartitionFunction(BucketFunction bucketFunction, int[] bucketToPartition)
{
this.bucketFunction = requireNonNull(bucketFunction, "bucketFunction is null");
this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null").clone();
partitionCount = IntStream.of(bucketToPartition).max().getAsInt() + 1;
}
public int getPartitionCount()
{
return partitionCount;
}
/**
* @param functionArguments the arguments to bucketing function in order (no extra columns)
*/
public int getPartition(Page functionArguments, int position)
{
int bucket = bucketFunction.getBucket(functionArguments, position);
return bucketToPartition[bucket];
}
}
在进行数据partition时这个partitionCount就是通过bucketToPartition的最大值加1算出来的,这个值等于下游节点的数量,为什么这个值永远等于下游节点的数量,这个需要看bucketToPartition的由来。这个数据结构在创建Stage时决定的,
public NodePartitionMap(List<Node> partitionToNode, ToIntFunction<Split> splitToBucket)
{
this.partitionToNode = ImmutableList.copyOf(requireNonNull(partitionToNode, "partitionToNode is null"));
this.bucketToPartition = IntStream.range(0, partitionToNode.size()).toArray();
this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null");
}
public NodePartitionMap(List<Node> partitionToNode, int[] bucketToPartition, ToIntFunction<Split> splitToBucket)
{
this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null");
this.partitionToNode = ImmutableList.copyOf(requireNonNull(partitionToNode, "partitionToNode is null"));
this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null");
}
我们看到bucketToPartition是放在NodePartitionMap这个类中的,这个类有连个构造器,其中对于第一个构造器bucketToPartition是自己生成的,并且他是一个0到partitionToNode.size()的数组。其实这个构造器是对于非源头节点使用的,既对于N对N的那种shuffle方式使用的,这个bucketToPartition是0到下游节点size的一个数组,那么如果下游有4个节点,那么这个数据就应该是[0,1,2,3]。那么根据上文提到的partitionCount的计算方式,计算结果就是4,正好是节点个数。
对于第二个构造器,他的bucketToPartition是传入NodePartitionMap的,这个构造器是源头节点使用的,既对于N对M或者M对M的shuffle方式。这个bucketToPartition需要看如下代码
/*
* 假设是8个分区,4个节点有,那么bucketToNode结果如下
* 1 -> n1
* 2 -> n1
* 3 -> n2
* 4 -> n2
* 5 -> n3
* 6 -> n3
* 7 -> n4
* 8 -> n4
*/
int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
BiMap<Node, Integer> nodeToPartition = HashBiMap.create();
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
Node node = bucketToNode.get(bucket);
Integer partitionId = nodeToPartition.get(node);
if (partitionId == null) {
partitionId = nextPartitionId++;
nodeToPartition.put(node, partitionId);
}
bucketToPartition[bucket] = partitionId;
}
这个里bucketToPartition的size是源头节点的分区数,假设一张表有8个分区,4个节点,从上面的算法中可以看出,bucketToPartition的最后得到一个数组[0,0,1,1,2,2,3,3],那么根据上文提到的partitionCount的计算方式,bucketToPartition的最大值加1,正好也是节点数4个。
如何进行Hash的
一行数据选择哪一个分区,取决于hash算法,其实就是刚才的BucketPartitionFunction,可以看到有一个getPartition方法
public int getPartition(Page functionArguments, int position)
{
int bucket = bucketFunction.getBucket(functionArguments, position);
return bucketToPartition[bucket];
}
其中bucketFunction是一个接口,它有比较多的实现,分别来实现不同的情况下的Hash计算方式。
public interface BucketFunction
{
/**
* Gets the bucket for the tuple at the specified position.
* Note the tuple values may be null.
*/
int getBucket(Page page, int position);
}
比如对于N对M的数据shuffle,这个接口就要调用ConnectorNodePartitioningProvider的getBucketFunction来实现,才可以感知相应到Connector的数据分区方式。
但是最终,数据放在哪一个Buffer分区中是由bucketToPartition数组决定的,这个数据的内容取决于是源头还是非源头节点,比较方便的兼容了两种方式。
下游拉取数据
对于下游拉取数据,有两个问题,一个是拉数据的节点的地址,两一个是拉这些节点的那些分区。
在上游节点调度完成之后,会调用下游节点addExchangeLocations,在addExchangeLocations时,将需要拉取数据的节点注册给下游节点。这样下游节点就感知到了需要拉取节点的数据。
public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> sourceTasks, boolean noMoreExchangeLocations)
{
requireNonNull(fragmentId, "fragmentId is null");
requireNonNull(sourceTasks, "sourceTasks is null");
RemoteSourceNode remoteSource = exchangeSources.get(fragmentId);
checkArgument(remoteSource != null, "Unknown remote source %s. Known sources are %s", fragmentId, exchangeSources.keySet());
this.sourceTasks.putAll(remoteSource.getId(), sourceTasks);
for (RemoteTask task : getAllTasks()) {
ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
for (RemoteTask sourceTask : sourceTasks) {
URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), exchangeLocation));
}
task.addSplits(newSplits.build());
}
if (noMoreExchangeLocations) {
completeSourceFragments.add(fragmentId);
// is the source now complete?
if (completeSourceFragments.containsAll(remoteSource.getSourceFragmentIds())) {
completeSources.add(remoteSource.getId());
for (RemoteTask task : getAllTasks()) {
task.noMoreSplits(remoteSource.getId());
}
}
}
}
一个下游的task只会拉上游task的固定分区,而且只需要保证相同的数据在相同的节点上,所以这里直接用TaskId就可以保证这点。
private static Split createRemoteSplitFor(TaskId taskId, URI taskLocation)
{
// Fetch the results from the buffer assigned to the task based on id
URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(String.valueOf(taskId.getId())).build();
return new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit(splitLocation));
}
这里URI的拼的方式是results/Id,id指定了buffer的Id,就是TaskId。
结论
Presto的数据shuffle是带有Presto特色的数据shuffle,使用一套工程实现代码,实现了三种数据shuffle方式,既考虑了非源头节点shuffle,又考虑了Connetor的数据分布方式进行性能优化,还是比较值得学习的。

关注 易学在线 公众号
每日更新大量优质技术文档
第一时间获知最新上架课程
与众多大数据猿分享与交流
DOIT.EDU 资料来自网络收集整理,如有侵权请告知