`
hongs_yang
  • 浏览: 59550 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论
文章列表
mapPartitions/mapPartitionsWithIndex 这 两个transform中:mapPartitions与map的区别是map中是对每个partition中的iterator执行map操作,对 map过程中的每一条record进行传入的function的处理,而mapPartitions是把partition中整个iterator传给 function进行处理.如果是map操作,你并不能知道这个iterator什么时候结束,但mapPartitions时给你的是一个 iterator,所以你的函数中知道这个iterator什么时候会结束.而mapPartitions ...
Sample是对rdd中的数据集进行采样,并生成一个新的RDD,这个新的RDD只有原来RDD的部分数据,这个保留的数据集大小由fraction来进行控制,这个分析中,不分析sample的两个算法的具体实现,如果后期有必要时,可以分析这两个算法的具体的实现. 首先,先看看sample的实现代码: def sample(    withReplacement: Boolean,    fraction: Double,    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
reduceByKey 通过PairRDDFunctions进行的实现,reduceByKey的操作是把两个V类型的值进行处理,并最终返回的还是一个V类型的结果(V类型 就是value的类型).针对一个reduceByKey的操作,需要执行shuffle的操作,也就是说如果包含有reduceByKey时,会生成两 个执行的stage,第一个stage会根据shuffle的partition与分区的算子,对数据重新进行分区操作,第二个stage去读取重新分区 的数据. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.with ...
该 函数主要功能:通过指定的排序规则与进行排序操作的分区个数,对当前的RDD中的数据集按KEY进行排序,并生成一个SHUFFLEdrdd的实例,这个 过程会执行shuffle操作,在执行排序操作前,sortBy操作会执行一次到两次的数据取样的操作,取出RDD中每个PARTITION的部分数据, 并根据进行分区的partition的个数,按key的compare大小把某个范围内的key放到一个指定的partition中进行排序. 该函数的操作示例: import org.apache.spark.SparkContext._**   val rdd: RDD[(String, Int)]  ...
这个操作的作用根据相同的key的所有的value存储到一个集合中的一个玩意. def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {  groupByKey(defaultPartitioner(self))} 在 做groupByKey的操作时,由于需要根据key对数据进行重新的分区操作,因此这个操作需要有一个partitioner的实例.默认是hash算 子.这个操作根据当前操作的RDD中是否有partitioner,同时这个partitioner与当前的传入的partitioner的实例是否相同 来判断是否需要执行shuf ...
关于Hbase的cache配置 在hbase中的hfilecache中,0.96版本中新增加了bucket cache, bucket cache通过把hbase.offheapcache.percentage配置为0来启用, 如果hbase.offheapcache.percentage的配置值大于0时,直接使用堆外内存来管理hbase的cache, 通过把hfi ...
HADOOP HA配置 hadoop2.x的ha配置,此文档中描述有hdfs与yarn的ha配置。 此文档的假定条件是zk已经安装并配置完成,其实也没什么安装的。 hdfs ha配置 首先,先配置core-site.xml配置文件: <property> 在老版本中使用mr1时,还可能使用fs.default.name来进行配置
ReduceTask的运行   Reduce处理程序中需要执行三个类型的处理,   1.copy,从各map中copy数据过来   2.sort,对数据进行排序操作。   3.reduce,执行业务逻辑的处理。   ReduceTask的运行也是通过run方法开始,   通过mapreduce.job.reduce.shuffle.consumer.plugin.class配置shuffle的
MapTask运行通过执行.run方法:   1.生成TaskAttemptContextImpl实例,此实例中的Configuration就是job本身。   2.得到用户定义的Mapper实现类,也就是map函数的类。
HFileV2文件   HFileV2文件写入通过StoreFile.Writer-->HFileWriterV2进行写入。   文件格式通过hfile.format.version配置。默认为2,也只有2这个值在0.96可用。   可通过cf中配置
关于MemStore的补充   在通过HStore.add向store中添加一个kv时,首先把数据写入到memstore中。这一点没有什么说明;   publiclong add(final KeyValue kv) {   lock.readLock().lock();   try {   return
spark shuffle流程分析   回到ShuffleMapTask.runTask函数   现在回到ShuffleMapTask.runTask函数中:   override def runTask(context: TaskContext): MapStatus = {   首先得到要reduce的task的个数。   valnumOutputSplits = dep.partitioner
Task的执行过程分析   Task的执行通过Worker启动时生成的Executor实例进行,   case RegisteredExecutor(sparkProperties) =>   logInfo("Successfully registered with driver")   // Make this host instead of hostPort ?   executor = new Executor(executorId, Utils.parseHostPort(hostPort).
Spark中的Scheduler   scheduler分成两个类型,一个是TaskScheduler与其实现,一个是DAGScheduler。   TaskScheduler:主要负责各stage中传入的task的执行与调度。   DAGScheduler:主要负责对JOB中的各种依赖进行解析,根据RDD的依赖生成stage并通知TaskScheduler执行。   实例生成   TaskScheduler实例生成:   scheduler实例生成,我目前主要是针对on yarn的spark进行的相关分析,   在appmaster启动后,通过调用startU ...
RDD的依赖关系   Rdd之间的依赖关系通过rdd中的getDependencies来进行表示,   在提交job后,会通过在 DAGShuduler.submitStage-->getMissingParentStages  
Global site tag (gtag.js) - Google Analytics