96SEO 2026-02-19 16:02 0
。

它是Spark中用于表示不可变、可分区、里面的元素可并行计算的集合。
RDD提供了一种高度受限的共享内存模型#xff0c;即RD…一
Spark中的一个核心概念。
它是Spark中用于表示不可变、可分区、里面的元素可并行计算的集合。
RDD提供了一种高度受限的共享内存模型即RDD是只读的记录集的分区集合只能通过在其他RDD执行确定的转换操作如map、filter、join等来创建新的RDD。
函数实际上是作用在RDD中的分区上的一个分区是由一个task处理有多少个分区总共就有多少个task
RDD之间存在一些依赖关系后一个RDD中的数据是依赖与前一个RDD的计算结果数据像水流一样在RDD之间流动
spark为task计算提供了精确的计算位置移动计算而不移动数据
RDD由很多分区partition构成有多少partition就对应有多少任务task
Spark默认是hash分区ByKey类的算子只能作用在kv格式的rdd上
Spark为task的计算提供了最佳的计算位置移动计算而不是移动数据
所有byKey算子如partitionBy、groupByKey、reduceByKey、aggregateByKey、foldByKey、combineByKey、sortByKey等。
repartition、cartesian算子。
部分join算子特别是非hash-partitioned的join算子。
map、filter、flatMap、mapPartitions、mapPartitionsWithIndex、sample、union、distinct、coalesce、repartitionAndUnion等。
7.窄依赖的分区数是不可以改变取决于第一个RDD分区数宽依赖可以在产生shuffle的算子上设置分区数
SparkConf()//明确运行模式conf.setMaster(local)//给任务取名字conf.setAppName(Map算子的演示)//创建对象val
context.textFile(spark/data/ws/students.csv)//Map算子:将rdd中的数据一条一条的取出来传入到map函数中map会返回一个新的rddmap不会改变总数据条数val
{stu.split(,).toList})//使用foreach行动算子mapRDD.foreach(println)
SparkConf()//确定运行模式conf.setMaster(local)//给任务取名字conf.setAppName(filter算子演示)//创建对象val
context.textFile(spark/data/ws/students.csv)/***
filter:过滤将RDD中的数据一条一条取出传递给filter后面的函数*
如果函数的结果是true该条数据就保留否则丢弃,取出来的结果组成一个新的RDD*///取出所有男生val
stu.split(,)男.equals(stuList(3))})filterRDD.foreach(println)//全都是男生的全部信息
SparkConf()//确定运行模式conf.setMaster(local)//给任务取名字conf.setAppName(flatMap算子演示)//创建对象val
context.textFile(spark/data/ws/students.csv)/***
flatMap算子将RDD中的数据一条一条的取出传递给后面的函数*
函数的返回值必须是一个集合。
最后会将集合展开构成一个新的RDD*///扁平化val
{stu.split(,)})value.foreach(println)//结果是所有信息都是换行的
SparkConf()//确定运行模式conf.setMaster(local)//给任务取名字conf.setAppName(sample算子演示)//创建对象val
context.textFile(spark/data/ws/students.csv)/***
抽取的结果在100条左右*///withReplacement:
1.按照指定的字段进行分组返回的是一个键是分组字段值是一个存放原本数据的迭代器的键值对
SparkConf()//确定运行模式conf.setMaster(local)//给任务取名字conf.setAppName(groupBy算子演示)//创建对象val
context.textFile(spark/data/ws/students.csv)val
{stu.split(,)})//求出每个班级的评价年龄//使用匹配模式//1、先取出班级和年龄val
groupBy按照指定的字段进行分组返回的是一个键是分组字段*
迭代器中的数据不是完全被加载到内存中计算迭代器只能迭代一次**
clazzWithAgeRDD.groupBy((_._1))//(理科二班,CompactBuffer((理科二班,21),
allAge.size)})clazzAvgAgeRDD.foreach(println)
3.只有kv类型键值对RDD才可以调用groupByKey算子
数据格式groupBy不用考虑数据格式而groupByKey必须是kv键值对数据格式。
分组规则groupBy需要指定分组规则即根据某个或某些字段进行分组而groupByKey则是根据key对value进行分组。
返回值类型groupBy是将整条数据放在集合中即它会将数据集按照指定的规则划分成若干个小区域并将这些小区域包含整个数据行作为集合返回而groupByKey只是将具有相同key的value放在集合中即它会把RDD的类型由RDD[(Key,
性能groupByKey的性能更好执行速度更快因为groupByKey相比较与groupBy算子来说shuffle所需要的数据量较少
SparkConf()//确定运行模式conf.setMaster(local)//给任务取名字conf.setAppName(groupByKey算子演示)//创建对象val
context.textFile(spark/data/ws/students.csv)val
{stu.split(,)})//需求求出每个班级平均年龄//使用模式匹配的方式取出班级和年龄val
只有kv类型键值对RDD才可以调用groupByKey算子**/val
clazzWithAgeRDD.groupByKey()//(理科六班,CompactBuffer(22,
{(kv._1,kv._2.sum.toDouble/kv._2.size)})clazzAvgAgeRDD.foreach(println)
1.利用reduceByKey实现按照键key对value值直接进行聚合需要传入聚合的方式
2.reduceByKey算子也是只有kv类型的RDD才能调用
reduceByKey该函数用于对具有相同键的值进行聚合操作。
它会将具有相同键的值按照指定的合并函数进行迭代和聚合最终生成一个新的RDD其中每个键都是唯一的与每个键相关联的值是经过合并操作后的结果。
groupByKey该函数仅根据键对RDD中的元素进行分组不执行任何聚合操作。
它只是将具有相同键的元素放在一个组中形成一个包含键和其对应值的迭代器。
因此groupByKey的结果是一个新的RDD其中每个键都与一个迭代器相关联迭代器包含了与该键关联的所有值。
reduceByKey返回一个新的RDD其中每个键都是唯一的与每个键相关联的值是经过合并操作后的结果。
groupByKey返回一个新的RDD其中每个键都与一个迭代器相关联迭代器包含了与该键关联的所有值。
reduceByKey在某些情况下可能更高效因为它可以在分布式计算中在map阶段进行一些本地聚合从而减少数据传输。
groupByKey可能导致数据移动较多因为它只是对键进行分组而不进行本地聚合。
因此在处理大数据集时groupByKey可能会导致更高的网络传输成本和更长的处理时间。
4.以后遇见key相同value相加直接用reduceByKey
xxx.reduceByKey((x:Int,y:Int)xy)
SparkConf()//确定运行模式conf.setMaster(local)//给任务取名字conf.setAppName(reduceByKey算子演示)//创建对象val
context.textFile(spark/data/ws/students.csv)val
{stu.split(,)})//求每个班级的人数//1、将每个元素变成(clazz,1)val
利用reduceByKey实现按照键key对value值直接进行聚合需要传入聚合的方式*
reduceByKey算子也是只有kv类型的RDD才能调用*///聚合val
y)clazzSumPersonRDD.foreach(println)
1.上下合并两个RDD,前提是两个RDD中的数据类型要一致合并后不会对结果进行去重
SparkConf()conf.setMaster(local)conf.setAppName(Union算子演示)val
context.textFile(spark/data/ws/w1.txt)
context.textFile(spark/data/ws/w2.txt)
union:上下合并两个RDD,前提是两个RDD中的数据类型要一致合并后不会对结果进行去重**
注这里的合并只是逻辑层面上的合并物理层面其实是没有合并*/val
w1RDD.union(w2RDD)unionRDD.foreach(println)
SparkConf()conf.setMaster(local)conf.setAppName(Join算子演示)val
SparkContext(conf)////两个kv类型的RDD之间的关联//通过scala中的集合构建RDD,通过context中的parallelize方法将集合变成RDDval
context.parallelize(List((1001,
joinRDD.foreach(println)//(1005,(hkx,学习))//leftJoin
leftRDD1.foreach(println)//rightJoin
rightRDD1.foreach(println)//fullJoinval
没有爱好)}fullJoin1.foreach(println)
SparkConf()//明确运行模式conf.setMaster(local)//给任务取名字conf.setAppName(统计总分年级排名前10的学生的各科分数)//创建对象val
SparkContext(conf)//需求统计总分年级排名前10的学生的各科分数//1、读取文件val
context.textFile(spark/data/ws/score.txt).map((s:
score.toInt)}//筛选数据//2、计算每个学生的总分val
{//RDD中的模式匹配case后面不需要加类型直接是RDD小括号中的数据类型匹配case
score)}//以后遇见key相同value相加直接用reduceByKey。
reduceByKey((x:
{-kv._2}).take(10)//4、求各科成绩//拿出学号val
ids.contains(sid)}clazzScoreTop10RDD.foreach(println)
//需求统计总分年级排名前10的学生的各科分数//1、读取文件val
context.textFile(spark/data/ws/score.txt).map((s:
score.toInt)}//筛选数据//2、计算每个学生的总分val
{//RDD中的模式匹配case后面不需要加类型直接是RDD小括号中的数据类型匹配case
score)}//以后遇见key相同value相加直接用reduceByKey。
reduceByKey((x:
1.mapPartitions与mapPartitionsWithIndex的用法
mapPartitions不用指定分区里面传入的是迭代器迭代器存储的是每个分区的数据
SparkConf()//明确运行模式conf.setMaster(local)//给任务取名字conf.setAppName(mapPartition算子的演示)//创建对象val
context.textFile(spark/data/ws/*)
读取数据文件//打印分区println(scoreRDD.getNumPartitions)//
scoreRDD.mapPartitionsWithIndex{
(index:Int,itr:Iterator[String])
scoreRDD.mapPartitionsWithIndex((i:
{println(s分区是${i})itr.flatMap(_.split(\\|))})mapPartitionRDD.foreach(println)
SparkConf()//明确运行模式conf.setMaster(local)//给任务取名字conf.setAppName(action算子)//创建对象val
context.textFile(spark/data/ws/students.csv)println(hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh)//val
println(jjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj)(id,
clazz)}println(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx)studentsRDD.foreach(println)
1.默认将数据缓存在内存中catch的实际就是用的persist程序结束缓存数据没了
xxxRDD.persist(StorageLevel.级别)
1.可以将RDD运行时的数据永久持久化在HDFS上这个方案叫做checkpoint,需要在spark环境中设置checkpoint的路径
SparkConf()//明确运行模式conf.setMaster(local)//给任务取名字conf.setAppName(persist)//创建对象val
SparkContext(conf)//设置缓冲路径context.setCheckpointDir(spark/data/checkpoint)//读取文件val
context.textFile(spark/data/ws/students.csv)//切分筛选元素val
clazz)}//统计每个班的人数studentsRDD.checkpoint()val
//可以简写为(__)clazzSumRDD.saveAsTextFile(spark/data/clazz_num)//统计男生女有多少人val
//可以简写为(__)clazzSumRDD.saveAsTextFile(spark/data/gender_num)//def
persist(StorageLevel.MEMORY_ONLY)//
clazzSumRDD.persist(StorageLevel.MEMORY_ONLY)
JAVA_HOME/usr/local/soft/jdk1.8.0_171
org.apache.spark.examples.SparkPi
spark-examples_2.12-3.1.3.jar是jars下的jar名
org.apache.spark.examples.SparkPi类名
org.apache.spark.examples.SparkPi
SparkConf()//提交Linux运行不需要明确运行模式
conf.setMaster(local)//给任务取个名字conf.setAppName(Standalone运行模式)//创建对象val
SparkContext(conf)//使用对象中的parallelize方法将Scala中的集合变成RDDval
sparkContext.parallelize((List(java,hello,world,hello,scala,spark,java,hello,spark)))val
arrayRDD.flatMap(_.split(,))val
_)reduceRDD.foreach(println)/***
将项目打包放到spark集群中使用standalone模式运行*
com.shujia.core.Demo17SparkStandaloneSubmit
com.shujia.core.Demo17SparkStandaloneSubmit
com.shujia.core.Demo17SparkStandaloneSubmit
com.shujia.core.Demo17SparkStandaloneSubmit
HADOOP_CONF_DIR/usr/local/soft/hadoop-3.1.1/etc/hadoop
nameyarn.nodemanager.pmem-check-enabled/name
nameyarn.nodemanager.vmem-check-enabled/name
nameyarn.application.classpath/name
org.apache.spark.examples.SparkPi
org.apache.spark.examples.SparkPi类名
org.apache.spark.examples.SparkPi
Application过后会分配一个子节点启动ApplicationMaster进程
4.ApplicationMaster向RM申请节点并启动Executor
2.Driver根据RDD之间的依赖关系将Application形成一个DAG有向无环图。
Scheduler会根据产生的shuffle划分窄宽依赖通过宽依赖划分Stage
Scheduler将Stage包装成taskset发送给Task
Scheduler(stage里面有很多并行的tasktaskset是每个stage里面的并行task封装的)
Scheduler拿到了task后发送到Executor中的线程池执行
1.如果task执行失败taskscheduler会重试3次如果还失败DAGscheduler会重试4次
如果是因为shuffle过程中文件找不到的异常taskscheduler不负责重试task,而是由DAGscheduler重试上一个stage
1.如果有的task执行很慢taskscheduler会在发生一个一摸一样的task到其它节点中执行让多个task竟争谁先执行完成以谁的结果为准
SparkConf()conf.setMaster(local)conf.setAppName(累加器演示)val
context.textFile(spark/data/ws/students.csv)var
1println(-------------------------)println(count)println(-------------------------)})println(scount的值为${count})//0
上述这个程序RDD里面的count输出是1000而RDD外面的count还是0
SparkConf()conf.setMaster(local)conf.setAppName(累加器演示)val
context.textFile(spark/data/ws/students.csv)//
println(-------------------------)
println(-------------------------)
println(scount的值为${count})//0/***
1、因为累加器的执行实在RDD中执行的而RDD是在Executor中执行的而要想在Executor中执行就得有一个action算子触发任务调度*
3、SparkContext无法在RDD内部使用因为SparkContext对象无法进行序列化不能够通过网络发送到Executor中*/val
context.longAccumulatorstudentRDD.foreach((line:String){longAccumulator.add(1)})println(longAccumulator.value)
1.避免了每次Task任务拉取都要附带一个副本拉取的速度变快了执行速度也就变快了
SparkConf()//明确模式conf.setMaster(local)//给任务取个名字conf.setAppName(广播变量)//创建对象val
SparkContext(conf)//以scala的方式读取students.csv文件并进行相关操作val
Source.fromFile(spark/data/ws/students.csv).getLines().toList.map((line:
lines.mkString(,))}).toMap//使用spark的形式读取文件val
context.textFile(spark/data/ws/score.txt)/***
将Spark读取的分数RDD与外部变量学生Map集合进行关联*
循环遍历scoresRDD将学号一样的学生信息关联起来*/val
infos)})idWithInfosRDD.foreach(println)
SparkConf()//明确模式conf.setMaster(local)//给任务取个名字conf.setAppName(广播变量)//创建对象val
SparkContext(conf)//以scala的方式读取students.csv文件并进行相关操作val
Source.fromFile(spark/data/ws/students.csv).getLines().toList.map((line:
lines.mkString(,))}).toMap//使用spark的形式读取文件val
context.textFile(spark/data/ws/score.txt)/***
将Spark读取的分数RDD与外部变量学生Map集合进行关联*
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback