赛事介绍:
阶段一(15): Hadoop分布式、HadoopHA、hive、spark、kafka、flume、zookeeper、Sqoop (HBase、Storm) 阶段二(20):scrapy、正则表达式、xpath、数据存储(csv、json、mysql) 阶段三(25):mapreduce、spark、hive 阶段四(20):flask、jinja2、echarts、mysql语法 阶段五(15):写报告
团队素质(5):
环境搭建 前置操作 集群hostname,hosts配置
网络环境配置
关闭防火墙 systemctl stop firewalld 永久关闭(禁用)防火墙 systemctl disable firewalld 启动防火墙 systemctl start firewalld
关闭SELinux
修改/etc/selinux/config 文件
将SELINUX=enforcing改为SELINUX=disabled
重启机器即可
配置SSH免密登录
在master机器上输入:ssh-keygen–t rsa回车 ssh-copy-id –iroot@主机名回车输入主机对应的用户密码
ssh root@node01
JDK
Mysql
可选:
配置同步脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 #!/bin/bash pcount=$# if [ $pcount -lt 1 ]then echo Not Enough Arguement! exit ; fi for host in master slave1 slave2do echo ==================== $host ==================== for file in $@ do if [ -e $file ] then pdir=$(cd -P $(dirname $file ); pwd ) echo pdir=$pdir fname=$(basename $file ) echo fname=$fname ssh $host "mkdir -p $pdir " rsync -av $pdir /$fname $USER @$host :$pdir else echo $file does not exists! fi done done
显示进程
1 2 3 4 5 6 7 8 9 10 11 12 #!/bin/bash echo ======================集群节点状态====================for i in master slave1 slave2do echo ====================== $i ==================== ssh root@$i '/opt/module/jdk1.8.0_141/bin/jps' done echo ======================执行完毕====================
Hadoop 集群 1 2 3 4 5 6 7 8 9 10 11 12 13 14 # core-site <configuration > <property > <name > fs.defaultFS</name > <value > hdfs://node01:9000</value > </property > </configuration >
1 2 3 4 5 6 7 8 # hdfs <configuration > <property > <name > dfs.replication</name > <value > 3</value > </property > </configuration >
1 2 # mapred-env.sh 添加环境变量 Java
1 2 3 4 5 6 7 8 # mapred <configuration > <property > <name > mapreduce.framework.name</name > <value > yarn</value > </property > </configuration >
1 2 3 4 5 6 7 8 9 10 11 12 # Yarn <configuration > <property > <name > yarn.resourcemanager.hostname</name > <value > node01</value > </property > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > </configuration >
1 2 3 4 5 6 7 8 9 10 11 1.配置slaves文件,填入datanode的节点 2.将hadoop完全拷贝至其他机器上 3.环境变量文件也需要拷贝,并且source一下 4.格式化hadoop ./hdfs namenode -format 5.启动 HDFS: http://master:50070/ yarn集群访问查看 http://master:8088/cluster
高可用 Zookepper集群搭建 zookeeper(集群之间进行协调管理,同步,故障检测等作用)
1)解压zookeeper到/usr/local/src目录下,重命名
2)配置环境变量,并且生效
3)配置文件: zoo_sample.cfg拷贝一个
cp zoo_sample.cfg zoo.cfg
修改内容:
dataDir=/usr/local/src/zookeeper/zkdata/data (自己创建的数据存放目录)
dataLogDir=/usr/local/src/zookeeper/zkdata/log (自己创建的日志目录)
server.1=主机名1:2888:3888 (集群配置)
server.2=主机名2:2888:3888 (集群配置)
server.3=主机名3:2888:3888 (集群配置)
4)在刚才配置的dataDir目录下新建一个 名为myid的文件,vim myid
在里面写上数字编号:1
5)将zookeeper远程拷贝至其他机器上。
6)修改拷贝后其他机器上zookeeper中myid的编号
7)将环境变量也同步拷贝到其他机器上,并且在其他机器上source一下
8)分别在集群机器上启动zookeeper:
启动: zkServer.sh start
查看状态: zkServer.sh status
停止: zkServer.sh stop
高可用配置 Hadoop-env/Yarn-env a) Hadoop-env.sh 配置jdk路径
b) Yarn-env.sh配置jdk路径
core-site 配置core-site.xmla 配置zookeeper和hadoop通信地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <property > <name > fs.defaultFS</name > <value > hdfs://myns1/</value > </property > <property > <name > hadoop.tmp.dir</name > <value > /opt/soft/hadoop/data</value > </property > <property > <name > ha.zookeeper.quorum</name > <value > mymaster:2181,myslave1:2181,myslave2:2181</value > </property >
hdfs-site a) 配置hdfs-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 <property > <name > fs.replication</name > <value > 3</value > </property > <property > <name > dfs.nameservices</name > <value > myns1</value > </property > <property > <name > dfs.ha.namenodes.myns1</name > <value > nn1,nn2</value > </property > <property > <name > dfs.namenode.rpc-address.myns1.nn1</name > <value > mymaster:9000</value > </property > <property > <name > dfs.namenode.http-address.myns1.nn1</name > <value > mymaster:50070</value > </property > <property > <name > dfs.namenode.rpc-address.myns1.nn2</name > <value > myslave1:9000</value > </property > <property > <name > dfs.namenode.http-address.myns1.nn2</name > <value > myslave1:50070</value > </property > <property > <name > dfs.namenode.shared.edits.dir</name > <value > qjournal://mymaster:8485;myslave1:8485;myslave2:8485/myns1</value > </property > <property > <name > dfs.journalnode.edits.dir</name > <value > /opt/soft/hadoop/journaldata</value > </property > <property > <name > dfs.ha.automatic-failover.enabled</name > <value > true</value > </property > <property > <name > dfs.client.failover.proxy.provider.myns1</name > <value > org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value > </property > <property > <name > dfs.ha.fencing.methods</name > <value > sshfence shell(/bin/true) </value > </property > <property > <name > dfs.ha.fencing.ssh.private-key-files</name > <value > /root/.ssh/id_rsa</value > </property > <property > <name > dfs.ha.fencing.ssh.connect-timeout</name > <value > 30000</value > </property > <property > <name > dfs.webhdfs.enabled</name > <value > true</value > </property >
mapred-site 1 2 3 4 5 6 <property > <name > mapreduce.framework.name</name > <value > yarn</value > </property >
yarn-site 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 <property > <name > yarn.resourcemanager.ha.enabled</name > <value > true</value > </property > <property > <name > yarn.resourcemanager.cluster-id</name > <value > yrc</value > </property > <property > <name > yarn.resourcemanager.ha.rm-ids</name > <value > rm1,rm2</value > </property > <property > <name > yarn.resourcemanager.hostname.rm1</name > <value > mymaster</value > </property > <property > <name > yarn.resourcemanager.hostname.rm2</name > <value > myslave1</value > </property > <property > <name > yarn.resourcemanager.zk-address</name > <value > mymaster:2181,myslave1:2181,myslave2:2181</value > </property > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > <property > <name > yarn.log-aggregation.retain-seconds</name > <value > 86400</value > </property > <property > <name > yarn.resourcemanager.recovery.enabled</name > <value > true</value > </property > <property > <name > yarn.resourcemanager.store.class</name > <value > org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value > </property >
slaves
启动集群 配置文件修改完毕,同步发送到其他机器上 scp -r /opt/soft/hadoop/etc/hadoop hadoop2:/opt/soft/hadoop/etc/ scp -r /opt/soft/hadoop/etc/hadoop hadoop3:/opt/soft/hadoop/etc/
1、分别每个机器上启动zookeeper集群 (jps看到QuorumPeerMain进程) bin/zkServer.sh start 2、每个机器上启动 journalnode (最好是奇数台机器) (hadoop-daemon.sh start journalnode) 启动完毕后jps能看到JournalNode进程 3、在一个namenode节点上进行格式化 (配置了两个namenode机器,只需要一台电脑上格式化) (格式化成功后在配置的data目录下,有个dfs,里面有name和data,因为没有启动集群,data是空的因为格式化后暂时没有内容) (name中存储元数据,data存真数据) (name下有current,下面有4个文件,就是元数据信息<fsimage…..,seen_txid,VERSION>) 4、注意,在格式化后namenode的机器上找到存放数据的data (core-site.xml里面配置的元数据存储目录)目录,然后拷贝到另外一个备份的data目录下 ,保持初始元数据一致。 下面是例子,具体目录自己决定(只需要拷贝到另外一个namenode节点机器上即可)。 scp -r data hadoop02:/opt/temp 5、在任何一个namenode上执行zkfc -formatZK操作 (#格式化zookeeper),命令如下: hdfs zkfc -formatZK 6、start-dfs.sh 7、start-yarn.sh (在自己配置的对应任何一台resourcemanager机器上执行) 8、测试 1)分别在每个机器上jps查看状态 2)分别访问两个namenode机器上的50070界面 ,查看状态是否一个是Active ,另外一个是Standby 3)也可以测试访问一下yarn集群 4)高可用测试,测试主备切换 (将active状态的namenode进程干掉,kill -9 xxxxid) (可以再使用单节点启动方式启动namenode:hadoop-daemon.sh start namenode)
在另外一台resourceManager机器上单独启动 resourceManager进程
yarn高可用测试和hadoop一样,访问的端口是8088
yarn-daemon.sh start resourcemanager
hadoop-daemon.sh start namenode
启动历史服务器 mr-jobhistory-daemon.sh start historyserver
yarn-daemon.sh start nodemanager yarn-daemon.sh start resourcemanager
然后两台机器分别启动resourcemanager即可
命令方式查看hadoop状态:hdfs haadmin -getServiceState nn2 命令方式查看yarn状态:yarn rmadmin -getServiceState rm1
测试 在本地创建一个文本文件,随便放入一些单词,以空格分开 a》将本地文件上传至已经启动的hdfs中 hdfs dfs -put /本地目录/本地文件.txt / b》运行mapreduce示例(/usr/local/src/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar) hadoop jar hadoop-mapreduce-examples-2.6.0.jar wordcount /hello.txt /test1 注意:/hello.txt是hdfs中的文件路径和名称 /test1是mapreduce结果输出到hdfs的目录,该目录存在则直接报错,所以该目录不能存在
常用命令 随机返回指定行数的样本数据 hdsf dfs -cat /test/gonganbu/scene_analysis_suggestion/* | shuf -n 5
返回前几行的样本数据 hdsf dfs -cat /test/gonganbu/scene_analysis_suggestion/* | head -100
返回最后几行的样本数据 hdsf dfs -cat /test/gonganbu/scene_analysis_suggestion/* | tail -5
……
Hive 1、保证有一台机器上安装好了MySql数据库 2、解压hive 改名 配置环境变量 3、hive中conf下: 复制hive-env.sh 修改 复制hive-site.xml 修改(数据库连接,数据库驱动名,账号,密码,3个地址配置改) 4、将mysql的驱动放入hive中lib目录下 ,保证hive能够正常连接mysql数据库 5、将hive/lib中jline2.12.xxx的包拷贝到 hadoop/share/hadoop/yarn/lib 原来的jlineXXX老版本要删除掉 6、初始化元数据 schematool -dbType mysql -initSchema
7、直接输入hive启动进入hive命令行: show databases; create database test1; use test1; create table employee(eid int ,name string ,salary float) row format delimited fields terminated by ‘hdfs中文件行中字段的分隔符’ lines terminated by ‘\n’ ; 8、load data inpath ‘/hdfs中的文件’ overwrite into table employee;
本地上传:load data local inpath ‘/export/servers/hivedatas/student.csv’ into table student;
9、使用select * from 表做一些查询测试 如果涉及分组则会进行mapreduce操作,得到结果。
在HDFS上默认储存路径:/user/hive/warehouse
hive-env 1 2 3 4 export JAVA_HOME=/opt/soft/jdk export HADOOP_HOME=/opt/soft/Hadoop export HIVE_HOME=/opt/soft/hive export SPARK_HOME=/usr/lib/spark
hive-site 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 <configuration > <property > <name > javax.jdo.option.ConnectionUserName</name > <value > root</value > </property > <property > <name > javax.jdo.option.ConnectionPassword</name > <value > 123456</value > </property > <property > <name > javax.jdo.option.ConnectionURL</name > <value > jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true& useSSL=false</value > </property > <property > <name > javax.jdo.option.ConnectionDriverName</name > <value > com.mysql.jdbc.Driver</value > </property > <property > <name > hive.querylog.location</name > <value > /opt/soft/hive/tmp</value > </property > <property > <name > hive.exec.local.scratchdir</name > <value > /opt/soft/hive/tmp</value > </property > <property > <name > hive.downloaded.resources.dir</name > <value > /opt/soft/hive/tmp</value > </property > </configuration >
Sqoop Sqoop详细介绍包括:sqoop命令,原理,流程
通过hadoop的mapreduce进行数据传输
1 2 3 4 5 6 7 export HADOOP_COMMON_HOME= /usr/app/hadoop-2.7.3export HADOOP_MAPRED_HOME= /usr/app/hadoop-2.7.3
将MySQL相关驱动jar拷贝到 sqoop下的lib目录下
测试:
sqoop-list-tables –connect jdbc:mysql://localhost:3306/bigdata_db – username root –password 123456
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 sqoop-list-tables --connect jdbc:mysql://localhost:3306/bigdata_db -- username root --password 12345 # Sqoop从Mysql数据导入HDFS sqoop import \ --connect jdbc:mysql://localhost/IP:3306/bigdata_db \ --username root \ --password 123456 \ --target-dir /hdfs目录 #该目录不能存在 --table tb_goods --m 1 #设置文件数量为1,否则一条记录会形成一个文件 # Sqoop从Hive导出MySql (有用) sqoop export --connect jdbc:mysql://localhost:3306/bigdata_db?useUnicode=true\&chara cterEncoding=utf-8 --username root --password 123456 --table tb_sp --num-mappers 1 --export-dir /user/hive/warehouse/testdb1.db/goods/part-m-00000 --inputfields-terminated-by '\001'; 注意:input-fields-terminated-by跟导入时保持一致,如果默认则是 \001 # 总结文档 1、mysql表导入到hdfs sqoop import --connect jdbc:mysql://localhost/IP:3306/bigdata_db --username root --password 123456 --target-dir /hdfs目录 --table tb_goods --m 1 --fields-terminated-by '\t' 说明: # --target-dir 设置hdfs中路径,该路径存在会报错 # --delete-target-dir 加上该参数,目录存在会自动删除 # --m 设置文件数量为1,否则每条记录都会形成一个文件 # --where 'id<3' 加上该参数,会对mysql表加入条件后导入 # --query 'select 列名1,列名2 from 表' 加上该参数,则执行查询具体数据后导入 # --fields-terminated-by 加上该参数后hdfs中文件的分隔符将会按这个设定 2、hdfs导出到mysql表 sqoop export --connect jdbc:mysql://IP地址:3306/bigdata_db?useUnicode=true\&characterEncoding=utf-8 --username root --password 123456 --table tb_goods --export-dir /sp/mytable --fields-terminated-by '\t' --m 1 3、mysql表导入到hive表(先在hive中创建表,并且加载数据后测试) sqoop import --m 1 --hive-import --connect jdbc:mysql://IP地址:3306/bigdata_db?useUnicode=true\&characterEncoding=utf-8 --username root --password 123456 --table tb_goods --fields-terminated-by ‘\t' --hive-database testdb1 --hive-table goods; 4、hive表导出到mysql表 sqoop export --connect jdbc:mysql://localhost:3306/bigdata_db?useUnicode=true\&characterEncoding=utf-8 --username root --password 123456 --table tb_sp --export-dir /user/hive/warehouse/testdb1.db/goods/part-m-00000 --input-fields-terminated-by ‘\001’; # input-fields-terminated-by是hive表对应hdfs中文件数据的分隔符
Spark 1 mv slaves.template slaves
spark-env .sh 添加 JAVA_HOME 环境变量和集群对应的 master 节点
1 2 3 4 5 6 7 8 9 # 指定 Java Home export JAVA_HOME=/export/servers/jdk1.8.0 # 指定 Spark Master 地址 export SPARK_MASTER_HOST=node01 export SPARK_MASTER_PORT=7077 分发 , sbin/start-all.sh 启动
1 spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 1G --total-executor-cores 2 ../examples/jars/spark-examples_2.11-2.0.0.jar 100
配置 HistoryServer
默认情况下, Spark 程序运行完毕后, 就无法再查看运行记录的 Web UI 了, 通过 HistoryServer 可以提供一个服务, 通过读取日志文件, 使得我们可以在程序运行结束后, 依然能够查看运行过程
复制 spark-defaults.conf
, 以供修改
1 2 3 cd /export/servers/spark/conf cp spark-defaults.conf.template spark-defaults.conf vi spark-defaults.conf
将以下内容复制到spark-defaults.conf
末尾处, 通过这段配置, 可以指定 Spark 将日志输入到 HDFS 中
1 2 3 spark.eventLog.enabled true spark.eventLog.dir hdfs://node01:8020/spark_log spark.eventLog.compress true
将以下内容复制到spark-env.sh
的末尾 , 配置 HistoryServer 启动参数, 使得 HistoryServer 在启动的时候读取 HDFS 中写入的 Spark 日志
1 2 # 指定 Spark History 运行参数 export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
为 Spark 创建 HDFS 中的日志目录
1 hdfs dfs -mkdir -p /spark_log
1、在linxu中搭建好spark环境(单机或集群都可以)并启动 如果是单机在jps后可以看到:Master和Worker节点进程
2、代码中修改: val sparkConf = new SparkConf().setMaster(“spark://hadoop05:7077”).setAppName(“Task1”) 说明:hadoop05是linux主机名,7077是spark通信端口
//指明读取的文件是hdfs文件系统中的文件
val rdd = sc.textFile("hdfs://hadoop05:9000/myword.txt")
说明:hadoop05是hdfs所在机器的主机名,9000是hadoop通信端口
//保存的时候也指明保存结果目录是hdfs文件系统
rdd4.coalesce(1).saveAsTextFile("hdfs://hadoop05:9000/out1")
3、在linux服务器中运行jar包: spark-submit –class com.xyzy.spark3.MyTask1 –executor-memory 1g /opt/test/spark3_jar/spark3.jar 说明:com.xyzy.spark3.MyTask1是代码中的包名+类名
1 2 3 4 5 bin/spark-submit \ --class <main-class> --master <master-url> \ ... # other options <application-jar> \ [application-arguments]
在提交应用中,一般会同时一些提交参数
参数
解释
可选值举例
–class
Spark 程序中包含主函数的类
–master
Spark 程序运行的模式(环境)
模式:local[*]、spark://linux1:7077、 Yarn
–executor-memory 1G
指定每个 executor 可用内存为 1G
符合集群内存配置即可,具体情况具体分析。
–total-executor-cores 2
指定所有executor 使用的cpu 核数 为 2 个
符合集群内存配置即可,具体情况具体分析。
–executor-cores
指定每个executor 使用的cpu 核数
符合集群内存配置即可,具体情况具体分析。
application-jar
打包好的应用 jar,包含依赖。这 个 URL 在集群中全局可见。 比如 hdfs:// 共享存储系统,如果是
Spark master及部署方式 我们在初始化SparkConf时,或者提交Spark任务时,都会有master参数需要设置,如下:
1 2 3 4 5 6 7 conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf) /bin/spark-submit \ --cluster cluster_name \ --master yarn-cluster \ ...
但是这个master到底是何含义呢?文档说是设定master url,但是啥是master url呢?说到这就必须先要了解下Spark的部署方式了。
我们要部署Spark这套计算框架,有多种方式,可以部署到一台计算机,也可以是多台(cluster)。我们要去计算数据,就必须要有计算机帮我们计算,当然计算机越多(集群规模越大),我们的计算力就越强。但有时候我们只想在本机做个试验或者小型的计算,因此直接部署在单机上也是可以的。Spark部署方式可以用如下图形展示:
Spark部署方式
下面我们就来分别介绍下。
Local模式 Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下集中方式设置master。
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
使用示例:
1 2 3 4 /bin/spark-submit \ --cluster cluster_name \ --master local[*] \ ...
总而言之这几种local模式都是运行在本地的单机版模式,通常用于练手和测试,而实际的大规模计算就需要下面要介绍的cluster模式。
cluster模式 cluster模式肯定就是运行很多机器上了,但是它又分为以下三种模式,区别在于谁去管理资源调度。(说白了,就好像后勤管家,哪里需要资源,后勤管家要负责调度这些资源)
standalone模式 这种模式下,Spark会自己负责资源的管理调度。它将cluster中的机器分为master机器和worker机器,master通常就一个,可以简单的理解为那个后勤管家,worker就是负责干计算任务活的苦劳力。具体怎么配置可以参考Spark Standalone Mode 使用standalone模式示例:
1 2 3 4 /bin/spark-submit \ --cluster cluster_name \ --master spark: ...
–master就是指定master那台机器的地址和端口,我想这也正是–master参数名称的由来吧。
mesos模式 这里就很好理解了,如果使用mesos来管理资源调度,自然就应该用mesos模式了,示例如下:
1 2 3 4 /bin/spark-submit \ --cluster cluster_name \ --master mesos: ...
yarn模式 同样,如果采用yarn来管理资源调度,就应该用yarn模式,由于很多时候我们需要和mapreduce使用同一个集群,所以都采用Yarn来管理资源调度,这也是生产环境大多采用yarn模式的原因。yarn模式又分为yarn cluster模式和yarn client模式:
yarn cluster: 这个就是生产环境常用的模式,所有的资源调度和计算都在集群环境上运行。
yarn client: 这个是说Spark Driver和ApplicationMaster进程均在本机运行,而计算任务在cluster上。
使用示例:
1 2 3 4 /bin/spark-submit \ --cluster cluster_name \ --master yarn-cluster \ ...
爬虫阶段 练习 点播课程 (51moot.cn)
腾讯招聘 (tencent.com)
【武汉二手丰田】武汉丰田二手车报价_武汉二手丰田价格|多少钱-人人车 (renrenche.com)
Scrapy爬虫步骤
1.创建一个scrapy项目
1 scrapy startproject mySpider
2.生成一个爬虫
1 scrapy genspider itcast itcast .cn #itcast 是爬虫名字,"itcast .cn "限制爬虫地址,防止爬到其他网站
3.提取数据
3.保存数据
启动爬虫
启动爬虫不打印日志
1 scrapy crawl 爬虫名字 --nolog
run.py启动爬虫
1 2 from scrapy import cmdline cmdline.execute("scrapy crawl lagou" .split ())
启动爬虫并保存
1 scrapy crawl 爬虫名字 --o data.csv
重写第一次请求 1 2 3 4 5 6 7 8 9 10 name = 'maoyan3' allowed_domains = ['maoyan.com' ] def start_requests (self ): for offset in range(0 , 91 , 10 ): url = 'https://maoyan.com/board/4?offset={}' .format(offset) yield scrapy.Request(url=url,callback=self.parse)
POST请求 1 2 3 4 5 6 7 8 9 10 11 post_data= dict( login="812******[email protected] " , password="******" ) yield scrapy.FormRequest( url=“http://www.xxx.xxx”, formdata=post_data, meta={‘key’:value}, callback=self.处理方法 )
scrapy shell
Scrapy shell是一个交互终端,我们可以在未启动spider的情况下尝试及调试代码,也可以用来测试XPath表达式
使用方法:
1 2 命令行输入: scrapy shell http:
常用参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 response.url:当前响应的url地址 response.request.url:当前响应对应的请求的url地址 response.headers:响应头 response.body:响应体,也就是html代码,默认是byte类型 response.body.decode():变为字符串类型 response.request.headers:当前响应的请求头 response.xpath("//h3/text()" ).extract():调试xpath,查看xpath可不可以取出数据 [s] Available Scrapy objects: [s] scrapy scrapy module (contains scrapy.Request, scrapy.Selector, etc) [s] crawler <scrapy.crawler.Crawler object at 0x000001AB6A39EF10 > [s] item {} [s] request <GET https://movie.douban.com/top250> [s] response <403 https://movie.douban.com/top250> [s] settings <scrapy.settings.Settings object at 0x000001AB6ADAE0D0 > [s] spider <DefaultSpider 'default' at 0x1ab6b239d30 > [s] Useful shortcuts: [s] fetch(url[, redirect=True ]) Fetch URL and update local objects (by default, redirects are followed) [s] fetch(req) Fetch a scrapy.Request and update local objects [s] shelp() Shell help (print this help) [s] view(response) View response in a browser
Settings.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 BOT_NAME = 'yangguang' SPIDER_MODULES = ['yangguang.spiders' ] NEWSPIDER_MODULE = 'yangguang.spiders' ROBOTSTXT_OBEY = True DOWNLOAD_DELAY = 3 DEFAULT_REQUEST_HEADERS = { 'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' , 'Accept-Language' : 'en' , "Cookie" : "rfnl=https://www.guazi.com/sjz/dazhong/; antipas=2192r893U97623019B485050817" , } Cookie设置COOKIES_ENABLED默认为true,下次请求带上cookie ITEM_PIPELINES = { 'yangguang.pipelines.YangguangPipeline' : 300 , }
存储数据 命令导出 您可以使用scrapy crawl myspider 命令从命令行运行您的scraper 。如果要创建输出文件,则必须设置要使用的文件名和扩展名:
1 2 3 4 5 scrapy crawl myspider -o data.json scrapy crawl myspider -o data.csv scrapy crawl myspider -o data.xml
Scrapy有自己的内置工具来生成json,csv,xml和其他序列化格式。如果要指定生成的文件的相对路径或绝对路径,或者从命令行设置其他属性,也可以执行此操作:
1 2 3 4 scrapy crawl reddit -s FEED_URI='/home/user/folder/mydata.csv' -s FEED_FORMAT=csv scrapy crawl reddit -s FEED_URI='mydata.json' -s FEED_FORMAT=json
使用Item Pipeline导出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 1 、csv格式 import csv 在__init__方法中: self.file = open('../moot2.csv' ,'a' ,encoding='utf-8' ,newline='' ) self.writer = csv.writer(self.file,delimiter=';' ) self.writer.writerow(['表头1' ,'表头2' ,.....]) 在process_item中: self.writer.writerow([item['title' ],item['price' ],......]) 在close_spider中: self.file.close() 2 、json格式 import json 在__init__方法中: self.file = open('../moot2.json' ,'a' ,encoding='utf-8' ) 在process_item中 : jsonstr = json.dumps(dict(item),ensure_ascii=False ) self.file.write(jsonstr+'\n' ) 在close_spider中: self.file.close() 3 、mysql存储 import pymysql 在__init__或者open_spider方法中: self.conn = pymysql.connect(host='IP/localhost' ,user='root' ,password='123456' ,port=3306 ,db='数据库名' ) self.cursor = self.conn.cursor() 在process_item中: sql = "insert into tb_moot values(null,%s,%s,)" self.cursor.execute(sql,(item['title' ],item['price' ],.......)) self.conn.commit() 在close_spider中,关闭数据库资源: self.cursor.close() self.conn.close()
正则表达式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 \d 一个数字 \w 数字字母下划线 .代表任意字符 \s 代表空白字符 \d{3} 3个数字 \d{3,6} 3到6位数字 + 代表至少一个 * 号代表可有可无 ? 代表可以没有,有只能1个 […] 用来表示一组字符,单独列出,例如,[amk]匹配a,m或k [^…] 不在[]中的字符,例如,[^abc]匹配除了a,b,c之外的字符 * 匹配0个或多个的表达式 + 匹配1个或者多个的表达式 ? 匹配0个或1个由前面的正则表达式定义的片段,非贪婪方式 {n} 精确匹配n次前面的表示 {n,m} 匹配n到m次由前面的正则表达式定义的片段,贪婪模式 a|b 匹配a或者b () 匹配括号内的表达式,也表示一个组 \D 匹配任意非数字 \A 匹配字符串开始 \Z 匹配字符串结束,如果存在换行,只匹配换行前的结束字符串 \G 匹配最后匹配完成的位置 \n 匹配一个换行符 \t 匹配一个制表符 ^ 匹配字符串的开头 $ 匹配字符串的末尾 . 匹配任意字符,除了换行符,当re.DOTALL标记被指定时,则可以匹配包括换行符在内的任意字符
Python:re模块
Xpath
1 2 3 4 5 6 7 //title/text():选择title标签内的文字。 //a/@href:选择a标签的href属性。 //a[contains(@href, "image")]/@href:选择a标签中href属性值包含image的a标签,再获取它的href属性 获取所有超链接中href中带有images的超链接内容。 mystr1 = Selector(text=ahtml).xpath('//ul[@id="list2"]/li//a[contains(@href,"images")]/@href').extract() 获取所有超链接页面的href地址 mystr1 = Selector(text=ahtml).xpath('//ul[@id="list2"]/li/div[2]/a/@href').extract()
数据清洗分析 使用IDEA打包 1) 右键项目 Open Module Settings
2)找到左侧 Artifacts,点击中间的 +号,选择JAR,然后选择“”“From Module With dependicies”
3)选择要执行的Main方法
4)点击idea上面的Builde菜单,选择Builder Artifacts。然后builder即可(如果改了代码,直接点击Rebuild)
MapReduce总结 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 JobClass类继承 Configured 实现 Tool 接口 在Mian 方法中: 创建Configuration 对象 int run = ToolRunner.run(configuration, new mainclass(), args); System.exit(run); run方法中: 创建Job对象 Job job = Job.getInstance(super .getConf(), "jobname" ); job.setJarByClass(HFDSTest.class); 设置输入类,和输入路径 设置mapper类,和k,v类型 reduce类同上 设置输出类 job.waitForCompletion(true ); 执行返回结果 创建Job对象 job.setPartitionerClass(SumCountPartition.class); job.setNumReduceTasks(4 ); Partitioner<Text,FlowBean> 重写:getPartition() 返回序号 Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可序列化只需实现这个接口即可 另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较,我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能 自定义类实现Writable[Comparable] write 序列化 readFields 反序列化 compareTo 排序 在Map阶段做reduce阶段的任务 job.setCombinerClass(MyCombiner.class); 1. 自定义类继承WritableComparator 类2 : 调用父类的有参构造 public OrderGroupComparator () { super (OrderBean.class,true ); } 3 :指定分组的规则(重写方法) 实现compare 方法: public int compare (WritableComparable a, WritableComparable b) { OrderBean first = (OrderBean)a; OrderBean second = (OrderBean)b; return first.getOrderId().compareTo(second.getOrderId()); } 4 :设置job.setGroupingComparatorClass(CustomGroupingComparator.class)
可能会用到的方法:
1 2 3 4 setup(),此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高! cleanup(),此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!
排序: 如果想要全局排序,吧reduce数设为1,可在map阶段进行一次排序,reduce进行归并二次排序
数据去重: –分组在reduce只取一个value输出
非法过滤+去重+计数: –map过滤 Counter计数 去重
统计平均值案例: – reduce累加处理
TopN案例: – 如果是每一类的TopN则只有在reduce处理
MapReduce的Join操作: – 正常处理 –缓存优化
MapReduce数据清洗+排序:
找共同好友案例:
1 2 3 4 5 6 compareTo 方法用于将当前对象与方法的参数进行比较。 如果指定的数与参数相等返回 0。 如果指定的数小于参数返回 -1。 如果指定的数大于参数返回 1。 例如: o1.compareTo(o2); 返回正数的话,当前对象(调用 compareTo 方法的对象 o1)要 排在比较对象(compareTo 传参对象 o2)后面,返回负数的话,放在前面
MapReduce中计数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 context.getCounter(groupName, counterName).increment(1 ); context.getCounter(counterName).increment(1 ); increment(long incr) public enum LogProcessorCounter { Test_Count; } Counters counters=Job.getCounters(); Counter counter1 = counters.findCounter(groupName, counterName); Counter counter2 = counters.findCounter(LogProcessorCounter.Test_Count); long value = counter1.getValue();
Java字符串处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 SimpleDateFormat sdf_1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ); SimpleDateFormat sdf_2 = new SimpleDateFormat("yyyy-MM-dd HH:mm" ); String dateStr = "2021-7-24 20:56:17" ; Date date = sdf_1.parse(dateStr); String newDateStr = sdf_2.format(date); Calendar cal = Calendar.getInstance(); cal.setTime(date); int year = cal.get(Calendar.YEAR);int week = cal.get(Calendar.DAY_OF_WEEK);System.out.println(year); System.out.println(week); String no = "110101197406313981" ; System.out.println(no.substring(6 ,10 )); System.out.println(no.substring(10 ,12 )); System.out.println(no.substring(12 ,14 )); String carno = "赣G 67EF2" ; String newCarno = carno.replace(" " ,"-" ); System.out.println("新的车牌号是:" +newCarno);
TopN 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 jack,88 rose,90 tom,45 cat,99 eclfsd,95 fsdf,79 e3r4,25 ppp,96 oip,65 iuu,37 adf,87 利用TreeMap进行key自然排序, TreeMap<Integer,String> treeMap = new TreeMap<Integer, String>(); map中存入数据: treeMap.put(分数,名字); if (treeMap.size() > 3 ){ treeMap.remove(treeMap.firstKey()); } reduce中存入数据: treeMap.put(key.get(),values.iterator().next().toString()); 输出在cleanup方法中: for (Integer score : treeMap.keySet()){ context.write(new IntWritable(score),new Text(treeMap.get(score))); }
Join 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 1 、创建一个工具类,实现Writable接口: private String 标示字段 private String 数据字段 public void write (DataOutput dataOutput) throws IOException { dataOutput.writeUTF(tag); dataOutput.writeUTF(data); } public void readFields (DataInput dataInput) throws IOException { this .tag = dataInput.readUTF(); this .data = dataInput.readUTF(); } 2 、在mapper中判断当前读取的文件是哪一个: String filename=((FileSplit)context.getInputSplit()).getPath().getName(); if (filename.contains("customers" )){ 实体对象的标示设置 } else { 实体对象的标示设置 } 输出(客户ID,自定义对象) 3 、reduce里面: 循环values(里面装的是1 条客户数据,和该客户对应的所有订单数据),分别取出存储。 客户使用字符串即可 订单使用List<String> 循环完毕,再次循环List组合数据输出到文件中即可 job.addCacheFile(new Path("D:\\mapreduce\\join\\customers.csv" ).toUri()); HashMap<String,String> pdInfoMap = new HashMap<String,String>(); URI uri = context.getCacheFiles()[0 ]; BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(uri.getPath()))); String line=null ; while ((line=br.readLine())!=null ){ String[] fields = line.split("," ); pdInfoMap.put(fields[0 ],fields[1 ]+"," +fields[2 ]); } br.close();
TreeMap按value排序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 protected void cleanup (Context context) throws IOException, InterruptedException { List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(treeMap.entrySet()); Collections.sort(list,new Comparator<Map.Entry<String, Integer>>() { public int compare (Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return -o1.getValue().compareTo(o2.getValue()); } }); int size = 0 ; for (Map.Entry<String, Integer> e: list) { context.write(new Text(e.getKey()),new IntWritable(e.getValue())); size++; if (size > 3 ){ break ; } } }
Hive 创建 1 2 3 4 5 6 7 8 9 10 11 12 create [external ] table [if not exists ] table_name ( col_name data_type [comment '字段描述信息' ] col_name data_type [comment '字段描述信息' ]) [comment '表的描述信息' ] [partitioned by (col_name data_type,...)] [clustered by (col_name,col_name,...)] [sorted by (col_name [asc |desc ],... ) into num_buckets buckets] [row format row_format] [storted as ....] [location '指定表的路径' ]
create table 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXISTS 选项来忽略这个异常。
external 可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径 (LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部 表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的 元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。
comment 表示注释,默认不能使用中文
partitioned by 表示使用表分区,一个表可以拥有一个或者多个分区,每一个分区单独存在一个目录下 .
clustered by 对于每一个表分文件, Hive可以进一步组织成桶,也就是说桶是更为细粒 度的数据范围划分。Hive也是 针对某一列进行桶的组织。
sorted by 指定排序字段和排序规则
row format 指定表文件字段分隔符
storted as指定表文件的存储格式, 常用格式:SEQUENCEFILE, TEXTFILE, RCFILE,如果文件 数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 storted as SEQUENCEFILE。
location
指定表文件的存储路径
查询 1 2 3 4 5 6 7 8 SELECT [ALL | DISTINCT ] select_expr, select_expr, ...FROM table_reference[WHERE where_condition] [GROUP BY col_list [HAVING condition]] [CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY | ORDER BY col_list] ] [LIMIT number ]
order by 会对输入做全局排序,因此只有一个reducer,会导致当输入规模较大时,需要 较长的计算时间。
sort by不是全局排序,其在数据进入reducer前完成排序。因此,如果用sort by进行排 序,并且设置mapred.reduce.tasks>1,则sort by只保证每个reducer的输出有序,不保证全 局有序。
distribute by(字段)根据指定的字段将数据分到不同的reducer,且分发算法是hash散列。
cluster by(字段) 除了具有distribute by的功能外,还会对该字段进行排序. 因此,如果distribute 和sort字段是同一个时,此时, cluster by = distribute by + sort by
实用函数总结 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 日期比较函数: datediff() 第一个参数减第二个参数 日期格式: date_format date_format('2019-12-12','yyyy-MM') If函数: if if(1=1,100,200) 可嵌套 条件判断函数: CASE CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END 如果a等于b,那么返回c;如果a等于d,那么返回e;否则返回f 字符串连接函数: concat 带分隔符字符串连接: concat_ws 字符串截取函数: substr,substring 去空格函数:trim 分割字符串函数: split 返回值: array 字段数据进行拆分: explode 将Map和Array拆分 LATERAL VIEW: LATERAL VIEW explode(split(goodid,','))goods as goods_id2 goods相当于一个虚拟表 Over窗口函数: Count(*) over(xxxx) 反射函数reflect: reflect reflect("java.lang.Math","max",col1,col2) 调用java中的自带函数,秒杀一切udf函数。 RANK、ROW_NUMBER、DENSE_RANK OVER的使用使用这几个函数,可以实现分组求topN 类型转换函数: cast select cast ('1' as bigint ) from tableName; 1. 使用 Round() 函数,如 Round(number,2) ,其中参数2表示保留两位有效数字,但是只负责四舍五入到两位小数,但是不负责截断 例如 ROUND(3.141592653, 2) 结果为3.140000000; 2. 使用 Convert(decimal(10,2),number) 实现转换,其中参数2表示保留两位有效数字 例如Convert(decimal(10,2),3.1415) 结果为3. 3. 使用 cast(number as decimal(10,2)) 实现转换,其中参数2表示保留两位有效数字 例如cast(3.1415 as decimal(10,2)) 结果为3.14; 4. 备注:CAST与CONVERT都可以执行数据类型转换,且都默认实现了四舍五入 如果目标表的字段是decimal(10,4)型的,从源表查数据:select round (field ,2 ) from sourcetable;sql查询的结果字段的小数位可能是2位,但是该数据插入目标表,字段的小数位数就是4位后面2位以0补充; select decimal (field ,10 ,2 ) from sourcetable;该数据插入目标表,字段的小数位就是2位 select decimal (field1/field2*100 ,10 ,2 ) || '%' from tablename;select round (field1/field2*100 ,2 ) || '%' from tablename;select concat (round (field1/field2*100 ,2 ) ,'%' ) from tablename;
HiveUDF 1 2 3 4 5 6 7 8 9 hive-exec 添加依赖 1. 继承UDF 2. 实现evaluate方法(可重裁实现多个evaluate方法,以实现不同需求) 3. 导出类jar包,注意指定main方法 1. 将jar包添加到Hive: add jar linux_path # 0.14版才开始支持 2. 创建临时函数: create [temporary] function [if not exists] f_name classpath 删除临时函数: drop [temporary] function [if exists] f_name
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.hadoop.io.Text; public class RMQuotes extends UDF { public Text evaluate (Text str) { if (str != null ){ return new Text(str.toString().replaceAll("\"" , "" )); }else return null ; } public static void main (String[] args) { System.out.println(new RMQuotes().evaluate(new Text("\"hbhb\" \"GET /SSS/DDD/FFF?id=8 HTTP/1.1\"" ))); } }
Spark总结 关于RDD RDD不存储数据,数据流动处理.
spark中job,stage,task的关系
1、一个应用程序对应多个job,一个job会有多个stage阶段,一个stage会有多个task
2、一个应用程序中有多少个行动算子就会创建多少个job作业;一个job作业中一个宽依赖会划分一个stage阶段;同一个stage阶段中最后一个算子有多少个分区这个stage就有多少个task,因为窄依赖每个分区任务是并行执行的,没有必要每个算子的一个分区启动一个task任务。如图所示阶段2最后一个map算子是对应5个分区,reducebykey是3个分区,总共是8个task任务。
3、当一个rdd的数据需要打乱重组然后分配到下一个rdd时就产生shuffle阶段,宽依赖就是以shuffle进行划分的。
spark中job,stage,task的关系 - 简书 (jianshu.com)
基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 1 ) 从集合(内存)中创建 RDD val sparkConf =new SparkConf ().setMaster("local[*]" ).setAppName("spark" ) val sparkContext = new SparkContext (sparkConf)val rdd1 = sparkContext.parallelize( List (1 ,2 ,3 ,4 ))val rdd2 = sparkContext.makeRDD( List (1 ,2 ,3 ,4 ))rdd1.collect().foreach(println) 底层代码实现来讲,makeRDD 方法就是parallelize 方法 2 ) 从外部存储(文件)创建RDD val sparkConf =new SparkConf ().setMaster("local[*]" ).setAppName("spark" ) val sparkContext = new SparkContext (sparkConf)val fileRDD: RDD [String ] = sparkContext.textFile("input" ) fileRDD.collect().foreach(println) sparkContext.stop() 3 ) 从其他 RDD 创建... 4 ) 直接创建 RDD (new )使用 new 的方式直接构造RDD ,一般由Spark 框架自身使用。 累加器 原理:day03笔记
1 2 3 4 5 6 7 8 9 val source: RDD [String ] = sc.textFile( "hdfs://node02:8020/data/wordcount.txt" ) val words: RDD [String ] = source.flatMap { line => line.split(" " ) } val wordsTuple: RDD [(String , Int )] = words.map { word => (word, 1 ) } val wordsCount: RDD [(String , Int )] = wordsTuple.reduceByKey { (x, y) => x + y } wordsCount.collect().foreach(print) }
工作原理 切片 使用textFile方法,有两个参数,第一个是文件路径,第二个是numpartitions。 如果我们不传第二个参数的话,minpartitions数就是采取默认的(用local指定的并行数和2取最小值) 如果我们传入第二个参数后,numpartitions会和参数保持一致。 ,最终一步一步的传入到hadoop的切片处。
Spark分区原理和分区后数据的划分
RDD 的 Shuffle 和分区 Spark 的 Shuffle 发展大致有两个阶段: Hash base shuffle
和 Sort base shuffle
默认的分区数量是和 Cores 的数量有关的, 也可以通过如下几种方式修改或者重新指定分区数量
1 2 3 4 5 parallelize 设置 textFile 设置 通过 coalesce 算子指定 通过 repartition 算子指定 repartition 算子本质上就是 coalesce(numPartitions, shuffle = true )
RDD 的 Shuffle 是什么 让来自相同 Key 的所有数据都在 reduceByKey
的同一个 reduce
中处理, 需要执行一个 all-to-all
的操作, 需要在不同的节点(不同的分区)之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据, 这个过程叫做 Shuffle
.
底层逻辑
Master Daemon
负责管理 Master
节点, 协调资源的获取, 以及连接 Worker
节点来运行 Executor
, 是 Spark 集群中的协调节点
Worker Daemon
Workers
也称之为叫 Slaves
, 是 Spark 集群中的计算节点, 用于和 Master 交互并管理 Executor
.
当一个 Spark Job
提交后, 会创建 SparkContext
, 后 Worker
会启动对应的 Executor
.
Executor Backend
上面有提到 Worker
用于控制 Executor
的启停, 其实 Worker
是通过 Executor Backend
来进行控制的, Executor Backend
是一个进程(是一个 JVM
实例), 持有一个 Executor
对象
另外在启动程序的时候, 有三种程序需要运行在集群上:
Driver
Driver
是一个 JVM
实例, 是一个进程, 是 Spark Application
运行时候的领导者, 其中运行了 SparkContext
.
Driver
控制 Job
和 Task
, 并且提供 WebUI
.
Executor
Executor
对象中通过线程池来运行 Task
, 一个 Executor
中只会运行一个 Spark Application
的 Task
, 不同的 Spark Application
的 Task
会由不同的 Executor
来运行
Stage 的划分是由 Shuffle 操作来确定的, 有 Shuffle 的地方, Stage 断开
数据可视化 jinjia2语法 1 2 3 4 5 6 7 {% for k in names %} {% if k == 'rose' %} <li style="color:Red">{{loop.index}}-{{k}}</li> {% else %} <li>{{loop.index}}-{{k}}</li> {% endif%} {% endfor%}
Sqlarchemy查询 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 sql = ‘select xx,xxfrom tb_xxwhere . group by . having… order by … ’ result = db.session.execute(sql).fetchall() result = db.session.query(Product).all() result = Product.query.all() result = db.session.query(Product.p_name,Product.p_price).all() result = Product.query.with_entities(Product.p_name,Product.p_price).all() result = db.session.query(Product).filter(Product.p_price> 10 ,Product.p_addr=='武汉' ).order_by(Product.p_id.desc()).all() result = db.session.query(func.count(Product.p_id)).scalar() result = db.session.query(Product.p_addr,func.count(Product.p_addr)).group_by(Product.p_addr).all() select s_addr,sum(case s_sexwhen‘男’ then 1 else 0 end) as nan,sum(case s_sexwhen '女' then 1 else 0 end) as nvfrom tb_studentgroup by s_addr result = db.session.query(Student.s_addr,(func.sum(func.if_(Student.s_sex=='男' ,1 ,0 ))).label('nan' ),(func.sum(func.if_(Student.s_sex=='女' ,1 ,0 ))).label('nv' )).all() sql = '''select cla.c_name,avg(sc_score) from tb_studentstu inner join tb_classclaon stu.s_cid= cla.c_id inner join tb_scorescon stu.s_id= sc.sc_sid where sc_course= 'Java高级编程' group by cla.c_name''‘ result = db.session.execute(sql).fetchall() result = db.session.query(Student).join(ClassInfo,Student.s_cid==ClassInfo.c_id).join(Score,Student.s_id==Score.sc_sid).filter(Score.sc_course=='Java高级编程').with_entities(ClassInfo.c_name,func.avg(Score.sc_score)).group_by(ClassInfo.c_name).all()
Scala总结 一、基础知识: 1 2 3 4 5 6 7 8 9 10 11 12 1、常量和变量:尽量使用常量,可以避免很多问题 2、数据类型: Any AnyVal(等价于java的基本类型):Long、Int、StringOps、Unit AnyRef(等价于java的引用类型):Java中的所有类到Scala都是AnyRef、 Scala中所有集合、定义的所有类、Null(只有一个null) Nothing:没有对象也没有子类,所有类的子类 3、数据类型之间的转换: AnyVal值类型转换:.toInt、.toLong AnyRef类型的转换:.isInstanceof[]、.asInstanceof[] 4、流程控制 顺序、选择(有返回值)、循环(没有返回值)
二、函数式编程 函数基本语法
高阶函数
高阶函数可以接受函数作为参数,比如常见的有map,filter,reduce等函数,它们可以接受一个函数作为参数。
高阶函数的返回值也可以是函数。
高阶函数包含:作为值的函数、匿名函数、闭包、柯里化等等
1、作为值的函数
函数就像和数字、字符串一样,可以将函数传递给一个方法,如List的map方法,可以接收一个函数
1 2 val func:Int => String = (num:Int ) => "*" * numprintln((1 to 10 ).map(func))
2、匿名函数
没有赋值给变量的函数就是匿名函数
1 2 3 (1 to 10 ).map(num => "*" * num) (1 to 10 ).map("*" * _)
3、柯里化
将原先接受多个参数的方法转换为多个只有一个参数的参数列表的过程。
1 2 3 def add (x:Int )(y:Int ) = { x + y }
相当于
1 2 3 4 def add (x:Int ) = { (y:Int ) => x + y } println(add(3 )(4 ))
4、闭包
闭包其实就是一个函数,返回值依赖于声明在函数外部的一个或多个变量。
1 2 3 4 val y = 10 def add (x:Int ): Int ={ x+y }
三、面向对象 样例类和普通类的区别:
样例类默认实现了序列化
样例类使用的时候不需要new语法
样例类可以进行模式匹配
样例类实现了apply方法、toString方法、equals方法、hashCode方法、copy方法
四、集合 1、数组:定长数组(Array)、变长数据(ArrayBuffer)
2、列表:不可变列表(List)、可变列表(ListBuffer)
3、Set:
4、Map
使用+=添加元素
使用-=删除元素
使用++=追加一个数组、列表
五、隐式转换 六、模式匹配 七、泛型 快速测试 语言实现 Java - StringTokenizer类
Scala - Spark基本方法都支持 方便快速测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 val stringList = List ("Hello Scala Hbase kafka" , "Hello Scala Hbase" , "Hello Scala" , "Hello" ) val wordList: List [String ] = stringList.flatMap(str=>str.split(" " )) val wordToWordsMap: Map [String , List [String ]] = wordList.groupBy(word=>word) val wordToCountMap:Map [String ,Int ] = wordToWordsMap.map(tuple=>(tuple._1, tuple._2.size)) val sortList: List [(String ,Int )] = wordToCountMap.toList.sortWith { (left, right) => { left._2 > right._2 } } val resultList: List [(String , Int )] = sortList.take(3 ) println(resultList)
ScrapyShell SparkShell