大数据竞赛流程笔记

赛事介绍:

阶段一(15):
Hadoop分布式、HadoopHA、hive、spark、kafka、flume、zookeeper、Sqoop
(HBase、Storm)
阶段二(20):scrapy、正则表达式、xpath、数据存储(csv、json、mysql)
阶段三(25):mapreduce、spark、hive
阶段四(20):flask、jinja2、echarts、mysql语法
阶段五(15):写报告

团队素质(5):

环境搭建

前置操作

集群hostname,hosts配置

  1. 网络环境配置

    关闭防火墙
    systemctl stop firewalld
    永久关闭(禁用)防火墙
    systemctl disable firewalld
    启动防火墙
    systemctl start firewalld

  2. 关闭SELinux

    修改/etc/selinux/config 文件

    将SELINUX=enforcing改为SELINUX=disabled

    重启机器即可

  3. 配置SSH免密登录

    在master机器上输入:ssh-keygen–t rsa回车
    ssh-copy-id –iroot@主机名回车输入主机对应的用户密码

    ssh root@node01

  4. JDK

  5. Mysql

  6. 可选:

    • 配置同步脚本

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      #!/bin/bash
      #1 获取输入参数个数,如果没有参数,直接退出
      pcount=$#
      if [ $pcount -lt 1 ]
      then
      echo Not Enough Arguement!
      exit;
      fi

      #2. 遍历集群所有机器
      # 也可以采用:
      # for host in node{01..03};
      for host in master slave1 slave2
      do
      echo ==================== $host ====================
      #3. 遍历所有目录,挨个发送
      for file in $@
      do
      #4 判断文件是否存在
      if [ -e $file ]
      then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      echo pdir=$pdir

      #6. 获取当前文件的名称
      fname=$(basename $file)
      echo fname=$fname

      #7. 通过ssh执行命令:在$host主机上递归创建文件夹(如果存在该文件夹)
      ssh $host "mkdir -p $pdir"

      #8. 远程同步文件至$host主机的$USER用户的$pdir文件夹下
      rsync -av $pdir/$fname $USER@$host:$pdir
      else
      echo $file does not exists!
      fi
      done
      done

  • 显示进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #!/bin/bash

    # 执行jps命令查询每台服务器上的节点状态
    echo ======================集群节点状态====================

    for i in master slave1 slave2
    do
    echo ====================== $i ====================
    ssh root@$i '/opt/module/jdk1.8.0_141/bin/jps'
    done
    echo ======================执行完毕====================

Hadoop

集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# core-site
<configuration>
<!-- 配置hadoop中Namenode的地址,node01为hostname主机名 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://node01:9000</value>
</property>
<!-- 伪分布式该路径自己指定,一般设置为hadoop下的data/tmp,该目录会自动创建,标识hadoop运行时产生文件的存储目录
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/temp/data/tmp</value>
</property>
-->
</configuration>
1
2
3
4
5
6
7
8
# hdfs
<configuration>
<!-- 备份复本数量,伪分布式则修改为1 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>
1
2
# mapred-env.sh
添加环境变量 Java
1
2
3
4
5
6
7
8
# mapred
<configuration>
<!-- 指定MapReduce运行时框架,这里指定在Yarn上,默认是local -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
1
2
3
4
5
6
7
8
9
10
11
12
# Yarn
<configuration>
<!-- 指定yarn的resourceManager地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>node01</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
1
2
3
4
5
6
7
8
9
10
11
1.配置slaves文件,填入datanode的节点
2.将hadoop完全拷贝至其他机器上
3.环境变量文件也需要拷贝,并且source一下
4.格式化hadoop ./hdfs namenode -format
5.启动

HDFS:
http://master:50070/
yarn集群访问查看
http://master:8088/cluster

高可用

Zookepper集群搭建

zookeeper(集群之间进行协调管理,同步,故障检测等作用)
        1)解压zookeeper到/usr/local/src目录下,重命名
        2)配置环境变量,并且生效
        3)配置文件: zoo_sample.cfg拷贝一个
              cp zoo_sample.cfg zoo.cfg
              修改内容:
             dataDir=/usr/local/src/zookeeper/zkdata/data    (自己创建的数据存放目录)
            dataLogDir=/usr/local/src/zookeeper/zkdata/log    (自己创建的日志目录)
             server.1=主机名1:2888:3888         (集群配置)
            server.2=主机名2:2888:3888         (集群配置)
            server.3=主机名3:2888:3888         (集群配置)
        4)在刚才配置的dataDir目录下新建一个 名为myid的文件,vim myid
              在里面写上数字编号:1
       5)将zookeeper远程拷贝至其他机器上。
       6)修改拷贝后其他机器上zookeeper中myid的编号
       7)将环境变量也同步拷贝到其他机器上,并且在其他机器上source一下
       8)分别在集群机器上启动zookeeper:
           启动:         zkServer.sh start
           查看状态:  zkServer.sh status
           停止:         zkServer.sh stop

高可用配置

Hadoop-env/Yarn-env

a) Hadoop-env.sh 配置jdk路径

b) Yarn-env.sh配置jdk路径

core-site

配置core-site.xmla 配置zookeeper和hadoop通信地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

<!-- 指定hdfs的nameservice为,如myns1,统一对外提供服务的名字
不再单独指定某一个机器节点-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://myns1/</value>
</property>
<!-- 指定hadoop数据存储目录,自己指定 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/soft/hadoop/data</value>
</property>
<!-- 依赖zookeeper,所以指定zookeeper集群访问地址,名字为不同集群机器的hostname -->
<property>
<name>ha.zookeeper.quorum</name>
<value>mymaster:2181,myslave1:2181,myslave2:2181</value>
</property>
hdfs-site

a) 配置hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
<!-- 指定副本的数量 -->
<property>
<name>fs.replication</name>
<value>3</value>
</property>
<!-- 指定hdfs的nameservice为myns1(在core-site.xml配置的一致) -->
<property>
<name>dfs.nameservices</name>
<value>myns1</value>
</property>
<!-- hadoopHA下面有两个Namenode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.myns1</name>
<value>nn1,nn2</value>
</property>

<!-- nn1的RPC通信地址,myns1为前后对应的,mymaster是其中一个namenode地址 -->
<property>
<name>dfs.namenode.rpc-address.myns1.nn1</name>
<value>mymaster:9000</value>
</property>
<!-- nn1的http通信地址,myns1为前后对应的 -->
<property>
<name>dfs.namenode.http-address.myns1.nn1</name>
<value>mymaster:50070</value>
</property>

<!-- nn2的RPC通信地址,myns1为前后对应的,myslave1是其中一个namenode地址 -->
<property>
<name>dfs.namenode.rpc-address.myns1.nn2</name>
<value>myslave1:9000</value>
</property>
<!-- nn2的http通信地址,myns1为前后对应的 -->
<property>
<name>dfs.namenode.http-address.myns1.nn2</name>
<value>myslave1:50070</value>
</property>

<!-- 指定namenode的edits元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://mymaster:8485;myslave1:8485;myslave2:8485/myns1</value>
</property>
<!-- 指定namenode在本地磁盘存放数据的位置
journaldata目录自己创建并且指定
-->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/soft/hadoop/journaldata</value>
</property>

<!-- 开启NameNode失败自动切换,监控体系发现activi挂了,把备用的启用 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 配置失败自动切换实现方式 -->
<!-- 此处配置在安装的时候切记检查不要换行 ,注意myns1是上面配置的集群服务名 -->
<property>
<name>dfs.client.failover.proxy.provider.myns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置隔离机制方法,多个机制用换行符分隔,即每个机制暂用一行
主要是防止脑裂,确保死的彻底 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>

<!-- 使用sshfence隔离机制时需要ssh免登陆,注意这里是id_rsa文件地址 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
<!--
<value>/home/账户名/.ssh/id_rsa</value>
如果直接使用root,则路径为 /root/.ssh/id_rsa
-->
</property>

<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>

<!-- 在NN和DN上开启WebHDFS (REST API)功能,不是必须 -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

mapred-site
1
2
3
4
5
6
<!--mapreduce运行的平台 ,默认Local-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

yarn-site
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<!-- 开启YARN高可用 -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<!-- 指定RM的cluster id,该value可以随意 -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yrc</value>
</property>

<!-- 指定RM的名字 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<!-- 分别指定RM的地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>mymaster</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>myslave1</value>
</property>

<!-- 指定zookeeper集群地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>mymaster:2181,myslave1:2181,myslave2:2181</value>
</property>

<!-- 要运行MapReduce程序必须配置的附属服务 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- Yarn集群的聚合日志最长保留时间 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>86400</value>
</property>
<!-- 启用自动恢复 -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 指定resourcemanager的状态信息存储在zookeeper集群上 -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

slaves
1
slaves 集群节点
启动集群

配置文件修改完毕,同步发送到其他机器上
scp -r /opt/soft/hadoop/etc/hadoop hadoop2:/opt/soft/hadoop/etc/
scp -r /opt/soft/hadoop/etc/hadoop hadoop3:/opt/soft/hadoop/etc/

1、分别每个机器上启动zookeeper集群(jps看到QuorumPeerMain进程)
bin/zkServer.sh start
2、每个机器上启动 journalnode(最好是奇数台机器)
(hadoop-daemon.sh start journalnode)
启动完毕后jps能看到JournalNode进程
3、在一个namenode节点上进行格式化(配置了两个namenode机器,只需要一台电脑上格式化)
(格式化成功后在配置的data目录下,有个dfs,里面有name和data,因为没有启动集群,data是空的因为格式化后暂时没有内容)
(name中存储元数据,data存真数据)
(name下有current,下面有4个文件,就是元数据信息<fsimage…..,seen_txid,VERSION>)
4、注意,在格式化后namenode的机器上找到存放数据的data(core-site.xml里面配置的元数据存储目录)目录,然后拷贝到另外一个备份的data目录下,保持初始元数据一致。
下面是例子,具体目录自己决定(只需要拷贝到另外一个namenode节点机器上即可)。
scp -r data hadoop02:/opt/temp
5、在任何一个namenode上执行zkfc -formatZK操作(#格式化zookeeper),命令如下:
hdfs zkfc -formatZK
6、start-dfs.sh
7、start-yarn.sh(在自己配置的对应任何一台resourcemanager机器上执行)
8、测试
1)分别在每个机器上jps查看状态
2)分别访问两个namenode机器上的50070界面,查看状态是否一个是Active另外一个是Standby
3)也可以测试访问一下yarn集群
4)高可用测试,测试主备切换
(将active状态的namenode进程干掉,kill -9 xxxxid)
(可以再使用单节点启动方式启动namenode:hadoop-daemon.sh start namenode)

在另外一台resourceManager机器上单独启动resourceManager进程

yarn高可用测试和hadoop一样,访问的端口是8088

yarn-daemon.sh start resourcemanager

hadoop-daemon.sh start namenode

启动历史服务器
mr-jobhistory-daemon.sh start historyserver

yarn-daemon.sh start nodemanager
yarn-daemon.sh start resourcemanager

然后两台机器分别启动resourcemanager即可

命令方式查看hadoop状态:hdfs haadmin -getServiceState nn2
命令方式查看yarn状态:yarn rmadmin -getServiceState rm1

测试

在本地创建一个文本文件,随便放入一些单词,以空格分开
a》将本地文件上传至已经启动的hdfs中 hdfs dfs -put /本地目录/本地文件.txt /
b》运行mapreduce示例(/usr/local/src/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar)
hadoop jar hadoop-mapreduce-examples-2.6.0.jar wordcount /hello.txt /test1
注意:/hello.txt是hdfs中的文件路径和名称
/test1是mapreduce结果输出到hdfs的目录,该目录存在则直接报错,所以该目录不能存在

常用命令

随机返回指定行数的样本数据
hdsf dfs -cat /test/gonganbu/scene_analysis_suggestion/* | shuf -n 5

返回前几行的样本数据
hdsf dfs -cat /test/gonganbu/scene_analysis_suggestion/* | head -100

返回最后几行的样本数据
hdsf dfs -cat /test/gonganbu/scene_analysis_suggestion/* | tail -5

……

Hive

1、保证有一台机器上安装好了MySql数据库
2、解压hive 改名 配置环境变量
3、hive中conf下:
复制hive-env.sh 修改
复制hive-site.xml 修改(数据库连接,数据库驱动名,账号,密码,3个地址配置改)
4、将mysql的驱动放入hive中lib目录下,保证hive能够正常连接mysql数据库
5、将hive/lib中jline2.12.xxx的包拷贝到 hadoop/share/hadoop/yarn/lib
原来的jlineXXX老版本要删除掉
6、初始化元数据
schematool -dbType mysql -initSchema

7、直接输入hive启动进入hive命令行:
show databases;
create database test1;
use test1;
create table employee(eid int ,name string ,salary float)
row format delimited
fields terminated by ‘hdfs中文件行中字段的分隔符’
lines terminated by ‘\n’
;
8、load data inpath ‘/hdfs中的文件’ overwrite into table employee;

​ 本地上传:load data local inpath ‘/export/servers/hivedatas/student.csv’ into table student;

9、使用select * from 表做一些查询测试
如果涉及分组则会进行mapreduce操作,得到结果。

在HDFS上默认储存路径:/user/hive/warehouse

hive-env

1
2
3
4
export JAVA_HOME=/opt/soft/jdk          
export HADOOP_HOME=/opt/soft/Hadoop
export HIVE_HOME=/opt/soft/hive
export SPARK_HOME=/usr/lib/spark

hive-site

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<configuration>
<!--连接数据的用户名-->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<!--连接数据的用户名-->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<!--mysql数据库的访问路径,没有路径则自动创建-->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
</property>
<!--连接数据库的驱动-->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

<!--Hive 运行时结构化日志文件的位置-->
<property>
<name>hive.querylog.location</name>
<value>/opt/soft/hive/tmp</value>
</property>
<!--Hive 作业的本地暂存空间-->
<property>
<name>hive.exec.local.scratchdir</name>
<value>/opt/soft/hive/tmp</value>
</property>
<!--用于在远程文件系统中添加资源的临时本地目录-->
<property>
<name>hive.downloaded.resources.dir</name>
<value>/opt/soft/hive/tmp</value>
</property>
</configuration>

Sqoop

Sqoop详细介绍包括:sqoop命令,原理,流程

通过hadoop的mapreduce进行数据传输

1
2
3
4
5
6
7
export HADOOP_COMMON_HOME= /usr/app/hadoop-2.7.3
export HADOOP_MAPRED_HOME= /usr/app/hadoop-2.7.3
#下面的暂时可以不配置
#export HBASE_HOME=
#export HIVE_HOME=
#export ZOOCFGDIR=

将MySQL相关驱动jar拷贝到 sqoop下的lib目录下

测试:

sqoop-list-tables –connect jdbc:mysql://localhost:3306/bigdata_db – username root –password 123456

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
sqoop-list-tables --connect jdbc:mysql://localhost:3306/bigdata_db -- username root --password 12345

#Sqoop从Mysql数据导入HDFS
sqoop import \
--connect jdbc:mysql://localhost/IP:3306/bigdata_db \
--username root \
--password 123456 \
--target-dir /hdfs目录 #该目录不能存在
--table tb_goods
--m 1 #设置文件数量为1,否则一条记录会形成一个文件

#Sqoop从Hive导出MySql (有用)
sqoop export --connect jdbc:mysql://localhost:3306/bigdata_db?useUnicode=true\&chara cterEncoding=utf-8
--username root
--password 123456
--table tb_sp
--num-mappers 1
--export-dir /user/hive/warehouse/testdb1.db/goods/part-m-00000
--inputfields-terminated-by '\001';
注意:input-fields-terminated-by跟导入时保持一致,如果默认则是 \001


# 总结文档
1、mysql表导入到hdfs
sqoop import --connect jdbc:mysql://localhost/IP:3306/bigdata_db
--username root --password 123456 --target-dir /hdfs目录 --table tb_goods --m 1
--fields-terminated-by '\t'
说明:
#--target-dir 设置hdfs中路径,该路径存在会报错
#--delete-target-dir 加上该参数,目录存在会自动删除
#--m 设置文件数量为1,否则每条记录都会形成一个文件
#--where 'id<3' 加上该参数,会对mysql表加入条件后导入
#--query 'select 列名1,列名2 from 表' 加上该参数,则执行查询具体数据后导入
#--fields-terminated-by 加上该参数后hdfs中文件的分隔符将会按这个设定

2、hdfs导出到mysql表
sqoop export --connect jdbc:mysql://IP地址:3306/bigdata_db?useUnicode=true\&characterEncoding=utf-8
--username root --password 123456 --table tb_goods --export-dir /sp/mytable
--fields-terminated-by '\t' --m 1

3、mysql表导入到hive表(先在hive中创建表,并且加载数据后测试)
sqoop import --m 1 --hive-import --connect jdbc:mysql://IP地址:3306/bigdata_db?useUnicode=true\&characterEncoding=utf-8 --username root --password 123456 --table tb_goods --fields-terminated-by ‘\t' --hive-database testdb1 --hive-table goods;

4、hive表导出到mysql表
sqoop export --connect jdbc:mysql://localhost:3306/bigdata_db?useUnicode=true\&characterEncoding=utf-8 --username root --password 123456 --table tb_sp --export-dir /user/hive/warehouse/testdb1.db/goods/part-m-00000 --input-fields-terminated-by ‘\001’;
#input-fields-terminated-by是hive表对应hdfs中文件数据的分隔符

Spark

1
mv slaves.template slaves 

spark-env.sh 添加 JAVA_HOME 环境变量和集群对应的 master 节点

1
2
3
4
5
6
7
8
9
# 指定 Java Home
export JAVA_HOME=/export/servers/jdk1.8.0

# 指定 Spark Master 地址
export SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077

分发 , sbin/start-all.sh 启动

1
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 1G --total-executor-cores 2 ../examples/jars/spark-examples_2.11-2.0.0.jar 100

配置 HistoryServer

  1. 默认情况下, Spark 程序运行完毕后, 就无法再查看运行记录的 Web UI 了, 通过 HistoryServer 可以提供一个服务, 通过读取日志文件, 使得我们可以在程序运行结束后, 依然能够查看运行过程

  2. 复制 spark-defaults.conf, 以供修改

    1
    2
    3
    cd /export/servers/spark/conf
    cp spark-defaults.conf.template spark-defaults.conf
    vi spark-defaults.conf
  3. 将以下内容复制到spark-defaults.conf末尾处, 通过这段配置, 可以指定 Spark 将日志输入到 HDFS 中

    1
    2
    3
    spark.eventLog.enabled  true
    spark.eventLog.dir hdfs://node01:8020/spark_log
    spark.eventLog.compress true
  4. 将以下内容复制到spark-env.sh末尾, 配置 HistoryServer 启动参数, 使得 HistoryServer 在启动的时候读取 HDFS 中写入的 Spark 日志

    1
    2
    # 指定 Spark History 运行参数
    export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
  5. 为 Spark 创建 HDFS 中的日志目录

    1
    hdfs dfs -mkdir -p /spark_log

1、在linxu中搭建好spark环境(单机或集群都可以)并启动
如果是单机在jps后可以看到:Master和Worker节点进程

2、代码中修改:
val sparkConf = new SparkConf().setMaster(“spark://hadoop05:7077”).setAppName(“Task1”)
说明:hadoop05是linux主机名,7077是spark通信端口

 //指明读取的文件是hdfs文件系统中的文件
  val rdd = sc.textFile("hdfs://hadoop05:9000/myword.txt")
   说明:hadoop05是hdfs所在机器的主机名,9000是hadoop通信端口

//保存的时候也指明保存结果目录是hdfs文件系统
rdd4.coalesce(1).saveAsTextFile("hdfs://hadoop05:9000/out1")

3、在linux服务器中运行jar包:
spark-submit –class com.xyzy.spark3.MyTask1 –executor-memory 1g /opt/test/spark3_jar/spark3.jar
说明:com.xyzy.spark3.MyTask1是代码中的包名+类名

1
2
3
4
5
bin/spark-submit \
--class <main-class>
--master <master-url> \
... # other options
<application-jar> \ [application-arguments]

在提交应用中,一般会同时一些提交参数

参数 解释 可选值举例
–class Spark 程序中包含主函数的类
–master Spark 程序运行的模式(环境) 模式:local[*]、spark://linux1:7077、 Yarn
–executor-memory 1G 指定每个 executor 可用内存为 1G 符合集群内存配置即可,具体情况具体分析。
–total-executor-cores 2 指定所有executor 使用的cpu 核数 为 2 个 符合集群内存配置即可,具体情况具体分析。
–executor-cores 指定每个executor 使用的cpu 核数 符合集群内存配置即可,具体情况具体分析。
application-jar 打包好的应用 jar,包含依赖。这 个 URL 在集群中全局可见。 比如 hdfs:// 共享存储系统,如果是

Spark master及部署方式

我们在初始化SparkConf时,或者提交Spark任务时,都会有master参数需要设置,如下:

1
2
3
4
5
6
7
conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)
/bin/spark-submit \
--cluster cluster_name \
--master yarn-cluster \
...

但是这个master到底是何含义呢?文档说是设定master url,但是啥是master url呢?说到这就必须先要了解下Spark的部署方式了。

我们要部署Spark这套计算框架,有多种方式,可以部署到一台计算机,也可以是多台(cluster)。我们要去计算数据,就必须要有计算机帮我们计算,当然计算机越多(集群规模越大),我们的计算力就越强。但有时候我们只想在本机做个试验或者小型的计算,因此直接部署在单机上也是可以的。Spark部署方式可以用如下图形展示:

img

Spark部署方式

下面我们就来分别介绍下。

Local模式

Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下集中方式设置master。

  • local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
  • local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
  • local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。

使用示例:

1
2
3
4
/bin/spark-submit \
--cluster cluster_name \
--master local[*] \
...

总而言之这几种local模式都是运行在本地的单机版模式,通常用于练手和测试,而实际的大规模计算就需要下面要介绍的cluster模式。

cluster模式

cluster模式肯定就是运行很多机器上了,但是它又分为以下三种模式,区别在于谁去管理资源调度。(说白了,就好像后勤管家,哪里需要资源,后勤管家要负责调度这些资源)

standalone模式

这种模式下,Spark会自己负责资源的管理调度。它将cluster中的机器分为master机器和worker机器,master通常就一个,可以简单的理解为那个后勤管家,worker就是负责干计算任务活的苦劳力。具体怎么配置可以参考Spark Standalone Mode
使用standalone模式示例:

1
2
3
4
/bin/spark-submit \
--cluster cluster_name \
--master spark://host:port \
...

–master就是指定master那台机器的地址和端口,我想这也正是–master参数名称的由来吧。

mesos模式

这里就很好理解了,如果使用mesos来管理资源调度,自然就应该用mesos模式了,示例如下:

1
2
3
4
/bin/spark-submit \
--cluster cluster_name \
--master mesos://host:port \
...

yarn模式

同样,如果采用yarn来管理资源调度,就应该用yarn模式,由于很多时候我们需要和mapreduce使用同一个集群,所以都采用Yarn来管理资源调度,这也是生产环境大多采用yarn模式的原因。yarn模式又分为yarn cluster模式和yarn client模式:

  • yarn cluster: 这个就是生产环境常用的模式,所有的资源调度和计算都在集群环境上运行。
  • yarn client: 这个是说Spark Driver和ApplicationMaster进程均在本机运行,而计算任务在cluster上。

使用示例:

1
2
3
4
/bin/spark-submit \
--cluster cluster_name \
--master yarn-cluster \
...

爬虫阶段

练习

点播课程 (51moot.cn)

腾讯招聘 (tencent.com)

【武汉二手丰田】武汉丰田二手车报价_武汉二手丰田价格|多少钱-人人车 (renrenche.com)

Scrapy爬虫步骤

1.创建一个scrapy项目

1
scrapy startproject mySpider   #mySpider是项目名字

2.生成一个爬虫

1
scrapy genspider itcast itcast.cn  #itcast是爬虫名字,"itcast.cn"限制爬虫地址,防止爬到其他网站

3.提取数据

1
完善spiders,使用xpath等方法

3.保存数据

1
pipelines  中保存数据

启动爬虫

1
scrapy crawl 爬虫名字    #crawl(抓取的意思)

启动爬虫不打印日志

1
scrapy crawl 爬虫名字 --nolog

run.py启动爬虫

1
2
from scrapy import cmdline
cmdline.execute("scrapy crawl lagou".split())

启动爬虫并保存

1
scrapy crawl 爬虫名字 --o data.csv

重写第一次请求

1
2
3
4
5
6
7
8
9
10
name = 'maoyan3'
allowed_domains = ['maoyan.com']
#重写start_requests()方法,把所有URL地址都交给调度器
def start_requests(self):
# 把所有的URL地址统一扔给调度器入队列
for offset in range(0, 91, 10):
url = 'https://maoyan.com/board/4?offset={}'.format(offset)
# 交给调度器
yield scrapy.Request(url=url,callback=self.parse)

POST请求

1
2
3
4
5
6
7
8
9
10
11
post_data= dict(
login="812******[email protected]",
password="******"
)
#表单请求
yield scrapy.FormRequest(
url=“http://www.xxx.xxx”,#请求地址
formdata=post_data,#传递的参数
meta={‘key’:value},
callback=self.处理方法#处理方法
)

scrapy shell

Scrapy shell是一个交互终端,我们可以在未启动spider的情况下尝试及调试代码,也可以用来测试XPath表达式

使用方法:

1
2
命令行输入:
scrapy shell http://www.itcast.cn/channel/teacher.shtml

常用参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
response.url:当前响应的url地址
response.request.url:当前响应对应的请求的url地址
response.headers:响应头
response.body:响应体,也就是html代码,默认是byte类型
response.body.decode():变为字符串类型
response.request.headers:当前响应的请求头
response.xpath("//h3/text()").extract():调试xpath,查看xpath可不可以取出数据

[s] Available Scrapy objects:
[s] scrapy scrapy module (contains scrapy.Request, scrapy.Selector, etc)
[s] crawler <scrapy.crawler.Crawler object at 0x000001AB6A39EF10>
[s] item {}
[s] request <GET https://movie.douban.com/top250>
[s] response <403 https://movie.douban.com/top250>
[s] settings <scrapy.settings.Settings object at 0x000001AB6ADAE0D0>
[s] spider <DefaultSpider 'default' at 0x1ab6b239d30>
[s] Useful shortcuts:
[s] fetch(url[, redirect=True]) Fetch URL and update local objects (by default, redirects are followed)
[s] fetch(req) Fetch a scrapy.Request and update local objects
[s] shelp() Shell help (print this help)
[s] view(response) View response in a browser

Settings.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#常见的设置

#项目名
BOT_NAME = 'yangguang'
#爬虫位置
SPIDER_MODULES = ['yangguang.spiders']
NEWSPIDER_MODULE = 'yangguang.spiders'

#遵守robots协议,robots.txt文件
ROBOTSTXT_OBEY = True
#下载延迟,请求前睡3秒
DOWNLOAD_DELAY = 3
# 默认请求头,不能放浏览器标识
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en',
"Cookie": "rfnl=https://www.guazi.com/sjz/dazhong/; antipas=2192r893U97623019B485050817",
}
Cookie设置COOKIES_ENABLED默认为true,下次请求带上cookie
#项目管道,保存数据
ITEM_PIPELINES = {
'yangguang.pipelines.YangguangPipeline': 300, # 数值越小优先级越高
}

image-20210910151427212

存储数据

命令导出

您可以使用scrapy crawl myspider命令从命令行运行您的scraper 。如果要创建输出文件,则必须设置要使用的文件名和扩展名:

1
2
3
4
5
scrapy crawl myspider -o data.json 

scrapy crawl myspider -o data.csv

scrapy crawl myspider -o data.xml

Scrapy有自己的内置工具来生成json,csv,xml和其他序列化格式。如果要指定生成的文件的相对路径或绝对路径,或者从命令行设置其他属性,也可以执行此操作:

1
2
3
4
scrapy crawl reddit -s FEED_URI='/home/user/folder/mydata.csv' -s FEED_FORMAT=csv 

scrapy crawl reddit -s FEED_URI='mydata.json' -s FEED_FORMAT=json

使用Item Pipeline导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

1、csv格式
import csv

在__init__方法中:
self.file = open('../moot2.csv','a',encoding='utf-8',newline='')
self.writer = csv.writer(self.file,delimiter=';')
self.writer.writerow(['表头1','表头2',.....])
在process_item中:
self.writer.writerow([item['title'],item['price'],......])
在close_spider中:
self.file.close()

2、json格式
import json
在__init__方法中:
self.file = open('../moot2.json','a',encoding='utf-8')
在process_item中 :
jsonstr = json.dumps(dict(item),ensure_ascii=False)
self.file.write(jsonstr+'\n')
在close_spider中:
self.file.close()

3、mysql存储
import pymysql
在__init__或者open_spider方法中:
self.conn = pymysql.connect(host='IP/localhost',user='root',password='123456',port=3306,db='数据库名')
self.cursor = self.conn.cursor()
在process_item中:
sql = "insert into tb_moot values(null,%s,%s,)"
self.cursor.execute(sql,(item['title'],item['price'],.......))
self.conn.commit()
在close_spider中,关闭数据库资源:
self.cursor.close()
self.conn.close()

正则表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
\d  一个数字
\w 数字字母下划线
.代表任意字符
\s 代表空白字符
\d{3} 3个数字
\d{3,6} 3到6位数字
+ 代表至少一个
* 号代表可有可无
? 代表可以没有,有只能1个



[…] 用来表示一组字符,单独列出,例如,[amk]匹配a,m或k
[^…] 不在[]中的字符,例如,[^abc]匹配除了a,b,c之外的字符
* 匹配0个或多个的表达式
+ 匹配1个或者多个的表达式
? 匹配0个或1个由前面的正则表达式定义的片段,非贪婪方式
{n} 精确匹配n次前面的表示
{n,m} 匹配n到m次由前面的正则表达式定义的片段,贪婪模式
a|b 匹配a或者b
() 匹配括号内的表达式,也表示一个组
\D
匹配任意非数字
\A
匹配字符串开始
\Z
匹配字符串结束,如果存在换行,只匹配换行前的结束字符串
\G
匹配最后匹配完成的位置
\n
匹配一个换行符
\t
匹配一个制表符
^
匹配字符串的开头
$
匹配字符串的末尾
.
匹配任意字符,除了换行符,当re.DOTALL标记被指定时,则可以匹配包括换行符在内的任意字符

Python:re模块

image-20210910144732628

Xpath

image-20210910144830498

1
2
3
4
5
6
7
//title/text():选择title标签内的文字。
//a/@href:选择a标签的href属性。
//a[contains(@href, "image")]/@href:选择a标签中href属性值包含image的a标签,再获取它的href属性
获取所有超链接中href中带有images的超链接内容。
mystr1 = Selector(text=ahtml).xpath('//ul[@id="list2"]/li//a[contains(@href,"images")]/@href').extract()
获取所有超链接页面的href地址
mystr1 = Selector(text=ahtml).xpath('//ul[@id="list2"]/li/div[2]/a/@href').extract()

数据清洗分析

使用IDEA打包

1) 右键项目 Open Module Settings
2)找到左侧 Artifacts,点击中间的 +号,选择JAR,然后选择“”“From Module With dependicies”
3)选择要执行的Main方法
4)点击idea上面的Builde菜单,选择Builder Artifacts。然后builder即可(如果改了代码,直接点击Rebuild)

MapReduce总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
JobClass类继承  Configured 实现 Tool 接口   // 实现Tool接口免去一些麻烦
在Mian 方法中:
创建Configuration 对象
int run = ToolRunner.run(configuration, new mainclass(), args);
System.exit(run);
run方法中:
创建Job对象 Job job = Job.getInstance(super.getConf(), "jobname");
//如果打包运行出错,则需要加该配置
job.setJarByClass(HFDSTest.class);
设置输入类,和输入路径
设置mapper类,和k,v类型
reduce类同上
设置输出类
job.waitForCompletion(true); 执行返回结果
创建Job对象

//分区: 分区数量需要和reduce数量对应 默认分区为1
job.setPartitionerClass(SumCountPartition.class);
job.setNumReduceTasks(4);
Partitioner<Text,FlowBean>
重写:getPartition() 返回序号
//序列化和排序 --自定义Key类型:
Writable 是 Hadoop 的序列化格式, Hadoop 定义了这样一个 Writable 接口. 一个类要支持可序列化只需实现这个接口即可
另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较,我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能

自定义类实现Writable[Comparable]
write 序列化
readFields 反序列化
compareTo 排序
//规约:
在Map阶段做reduce阶段的任务
job.setCombinerClass(MyCombiner.class);
//分组:
1. 自定义类继承WritableComparator 类
2: 调用父类的有参构造
public OrderGroupComparator() {
super(OrderBean.class,true);
}
3:指定分组的规则(重写方法)
实现compare 方法:
public int compare(WritableComparable a, WritableComparable b) {
//3.1 对形参做强制类型转换
OrderBean first = (OrderBean)a; // K2的类型
OrderBean second = (OrderBean)b;
//3.2 指定分组规则
return first.getOrderId().compareTo(second.getOrderId());
}
4:设置job.setGroupingComparatorClass(CustomGroupingComparator.class) // 自定义类


//分区是根据key来决定哪些key-value被分到同一个reduce处理, 一个分区写出一个文件
//而分组是根据key来决定同一个reduce中的key-value在同一批次中进行处理,可以理解为reducebykey中 key分类规则

image-20210922210422059

可能会用到的方法:

1
2
3
4
setup(),此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高!


cleanup(),此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作。若是将释放资源工作放入方法map()中,也会导致Mapper任务在解析、处理每一行文本后释放资源,而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!

排序: 如果想要全局排序,吧reduce数设为1,可在map阶段进行一次排序,reduce进行归并二次排序

数据去重: –分组在reduce只取一个value输出

非法过滤+去重+计数: –map过滤 Counter计数 去重

统计平均值案例: – reduce累加处理

TopN案例: – 如果是每一类的TopN则只有在reduce处理

MapReduce的Join操作: – 正常处理 –缓存优化

MapReduce数据清洗+排序:

找共同好友案例:

1
2
3
4
5
6
compareTo 方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回 0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
例如: o1.compareTo(o2); 返回正数的话,当前对象(调用 compareTo 方法的对象 o1)要
排在比较对象(compareTo 传参对象 o2)后面,返回负数的话,放在前面

MapReduce中计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//第一种 组名+计数器名
context.getCounter(groupName, counterName).increment(1);//参数类型为String
//第二种 Enum
context.getCounter(counterName).increment(1);//参数类型为Enum

//使用,改变计数器的值
increment(long incr)

public enum LogProcessorCounter {
//组名_名称
Test_Count;
}


//本次Job所有的计数器(包括MapReduce自带的和自定义的计数器)
Counters counters=Job.getCounters();

//获取指定计数器
Counter counter1 = counters.findCounter(groupName, counterName);
Counter counter2 = counters.findCounter(LogProcessorCounter.Test_Count);

//获取计数器的值
long value = counter1.getValue();

Java字符串处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
SimpleDateFormat sdf_1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat sdf_2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
String dateStr = "2021-7-24 20:56:17";
Date date = sdf_1.parse(dateStr);
String newDateStr = sdf_2.format(date);

Calendar cal = Calendar.getInstance();
cal.setTime(date);
int year = cal.get(Calendar.YEAR);
int week = cal.get(Calendar.DAY_OF_WEEK);
System.out.println(year);
System.out.println(week);

String no = "110101197406313981";
System.out.println(no.substring(6,10));
System.out.println(no.substring(10,12));
System.out.println(no.substring(12,14));

String carno = "赣G 67EF2";
String newCarno = carno.replace(" ","-");
System.out.println("新的车牌号是:"+newCarno);

TopN

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
jack,88
rose,90
tom,45
cat,99
eclfsd,95
fsdf,79
e3r4,25
ppp,96
oip,65
iuu,37
adf,87

利用TreeMap进行key自然排序,
//TreeMap声明在map和reduce方法的外面。
TreeMap<Integer,String> treeMap = new TreeMap<Integer, String>();

map中存入数据:
treeMap.put(分数,名字);
if(treeMap.size() > 3){
treeMap.remove(treeMap.firstKey());
}

reduce中存入数据:
treeMap.put(key.get(),values.iterator().next().toString());

输出在cleanup方法中:
for(Integer score : treeMap.keySet()){
context.write(new IntWritable(score),new Text(treeMap.get(score)));
}

Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

1、创建一个工具类,实现Writable接口:
private String 标示字段
private String 数据字段
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(tag);
dataOutput.writeUTF(data);
}

public void readFields(DataInput dataInput) throws IOException {
this.tag = dataInput.readUTF();
this.data = dataInput.readUTF();
}

2、在mapper中判断当前读取的文件是哪一个:
// 核心不同文件 不同字段做Key
String filename=((FileSplit)context.getInputSplit()).getPath().getName();
if(filename.contains("customers")){
实体对象的标示设置
}
else{
实体对象的标示设置
}
输出(客户ID,自定义对象)

3、reduce里面:
循环values(里面装的是1条客户数据,和该客户对应的所有订单数据),分别取出存储。
客户使用字符串即可
订单使用List<String>

循环完毕,再次循环List组合数据输出到文件中即可


// 小表可缓存 提高效率 Map Join 适用于一张表十分小、一张表很大的场景

//给当前任务添加缓存文件
job.addCacheFile(new Path("D:\\mapreduce\\join\\customers.csv").toUri());

//客户ID为键,客户信息为值
HashMap<String,String> pdInfoMap = new HashMap<String,String>();
//在setup方法中提前讀取緩存文件,將客戶數據存儲到HashMap中
URI uri = context.getCacheFiles()[0];
//得到缓冲区阅读器
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(uri.getPath())));
String line=null;
while((line=br.readLine())!=null){
String[] fields = line.split(",");
pdInfoMap.put(fields[0],fields[1]+","+fields[2]);
}
br.close();

TreeMap按value排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void cleanup(Context context) throws IOException, InterruptedException {
//輸出即可
List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(treeMap.entrySet());
Collections.sort(list,new Comparator<Map.Entry<String, Integer>>() {
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return -o1.getValue().compareTo(o2.getValue());
}
});
int size = 0;
for (Map.Entry<String, Integer> e: list) {
context.write(new Text(e.getKey()),new IntWritable(e.getValue()));
size++;
if(size > 3){
break;
}
}
}

Hive

创建

1
2
3
4
5
6
7
8
9
10
11
12
create [external] table [if not exists] table_name ( 
col_name data_type [comment '字段描述信息']
col_name data_type [comment '字段描述信息'])
[comment '表的描述信息']
[partitioned by (col_name data_type,...)]
[clustered by (col_name,col_name,...)]
[sorted by (col_name [asc|desc],...
)
into num_buckets buckets]
[row format row_format]
[storted as ....]
[location '指定表的路径']
  1. create table
    创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT
    EXISTS 选项来忽略这个异常。

  2. external
    可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径
    (LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部
    表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的
    元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。

  3. comment
    表示注释,默认不能使用中文

  4. partitioned by
    表示使用表分区,一个表可以拥有一个或者多个分区,每一个分区单独存在一个目录下 .

  5. clustered by 对于每一个表分文件, Hive可以进一步组织成桶,也就是说桶是更为细粒
    度的数据范围划分。Hive也是 针对某一列进行桶的组织。

  6. sorted by
    指定排序字段和排序规则

  7. row format
    指定表文件字段分隔符

  8. storted as指定表文件的存储格式, 常用格式:SEQUENCEFILE, TEXTFILE, RCFILE,如果文件
    数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 storted as
    SEQUENCEFILE。

  9. location

    指定表文件的存储路径

查询

1
2
3
4
5
6
7
8
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list [HAVING condition]]
[CLUSTER BY col_list
| [DISTRIBUTE BY col_list] [SORT BY| ORDER BY col_list]
]
[LIMIT number]
  1. order by 会对输入做全局排序,因此只有一个reducer,会导致当输入规模较大时,需要
    较长的计算时间。
  2. sort by不是全局排序,其在数据进入reducer前完成排序。因此,如果用sort by进行排
    序,并且设置mapred.reduce.tasks>1,则sort by只保证每个reducer的输出有序,不保证全
    局有序。
  3. distribute by(字段)根据指定的字段将数据分到不同的reducer,且分发算法是hash散列。
  4. cluster by(字段) 除了具有distribute by的功能外,还会对该字段进行排序.
    因此,如果distribute 和sort字段是同一个时,此时, cluster by = distribute by +
    sort by

实用函数总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

日期比较函数: datediff() 第一个参数减第二个参数

日期格式: date_format date_format('2019-12-12','yyyy-MM') --日期字符串必须满足yyyy-MM-dd格式

If函数: if if(1=1,100,200) 可嵌套

条件判断函数: CASE CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END 如果a等于b,那么返回c;如果a等于d,那么返回e;否则返回f

字符串连接函数: concat

带分隔符字符串连接: concat_ws

字符串截取函数: substr,substring

去空格函数:trim

分割字符串函数: split 返回值: array

字段数据进行拆分: explode 将Map和Array拆分

LATERAL VIEW: LATERAL VIEW explode(split(goodid,','))goods as goods_id2 goods相当于一个虚拟表

Over窗口函数: Count(*) over(xxxx)

反射函数reflect: reflect reflect("java.lang.Math","max",col1,col2) 调用java中的自带函数,秒杀一切udf函数。

RANK、ROW_NUMBER、DENSE_RANK OVER的使用使用这几个函数,可以实现分组求topN

类型转换函数: cast select cast('1' as bigint) from tableName;


# SQL中字段保留两位小数:

1. 使用 Round() 函数,如 Round(number,2) ,其中参数2表示保留两位有效数字,但是只负责四舍五入到两位小数,但是不负责截断
例如 ROUND(3.141592653, 2) 结果为3.140000000;
2. 使用 Convert(decimal(10,2),number) 实现转换,其中参数2表示保留两位有效数字 例如Convert(decimal(10,2),3.1415) 结果为3.
3. 使用 cast(number as decimal(10,2)) 实现转换,其中参数2表示保留两位有效数字 例如cast(3.1415 as decimal(10,2)) 结果为3.14;
4. 备注:CAST与CONVERT都可以执行数据类型转换,且都默认实现了四舍五入
如果目标表的字段是decimal(10,4)型的,从源表查数据:select round(field2) from sourcetable;sql查询的结果字段的小数位可能是2位,但是该数据插入目标表,字段的小数位数就是4位后面2位以0补充;
select decimal(field,10,2) from sourcetable;该数据插入目标表,字段的小数位就是2位

#SQL中两个字段相除并加上%

select decimal(field1/field2*100,10,2) || '%' from tablename;
select round(field1/field2*100,2) || '%' from tablename;
select concat(round(field1/field2*100,2) ,'%') from tablename;

HiveUDF

1
2
3
4
5
6
7
8
9
hive-exec 添加依赖
1. 继承UDF
2. 实现evaluate方法(可重裁实现多个evaluate方法,以实现不同需求)
3. 导出类jar包,注意指定main方法

1. 将jar包添加到Hive: add jar linux_path # 0.14版才开始支持
2. 创建临时函数: create [temporary] function [if not exists] f_name classpath
删除临时函数: drop [temporary] function [if exists] f_name

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class RMQuotes extends UDF{

  public Text evaluate(Text str){
if(str != null){
  return new Text(str.toString().replaceAll("\"", ""));
}else return null;
  }
  public static void main(String[] args) {
System.out.println(new RMQuotes().evaluate(new Text("\"hbhb\" \"GET /SSS/DDD/FFF?id=8 HTTP/1.1\"")));
  }
}

Spark总结

关于RDD

RDD不存储数据,数据流动处理.

spark中job,stage,task的关系

1、一个应用程序对应多个job,一个job会有多个stage阶段,一个stage会有多个task

2、一个应用程序中有多少个行动算子就会创建多少个job作业;一个job作业中一个宽依赖会划分一个stage阶段;同一个stage阶段中最后一个算子有多少个分区这个stage就有多少个task,因为窄依赖每个分区任务是并行执行的,没有必要每个算子的一个分区启动一个task任务。如图所示阶段2最后一个map算子是对应5个分区,reducebykey是3个分区,总共是8个task任务。

3、当一个rdd的数据需要打乱重组然后分配到下一个rdd时就产生shuffle阶段,宽依赖就是以shuffle进行划分的。

spark中job,stage,task的关系 - 简书 (jianshu.com)

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1)	从集合(内存)中创建 RDD
// 从集合中创建RDD,Spark 主要提供了两个方法:parallelize 和 makeRD
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)

val rdd1 = sparkContext.parallelize( List(1,2,3,4))
val rdd2 = sparkContext.makeRDD( List(1,2,3,4))
rdd1.collect().foreach(println)
底层代码实现来讲,makeRDD 方法就是parallelize 方法

2) 从外部存储(文件)创建RDD
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()

3) 从其他 RDD 创建
...
4) 直接创建 RDDnew
使用 new 的方式直接构造RDD,一般由Spark 框架自身使用。

累加器


原理:day03笔记
1
2
3
4
5
6
7
8
9
 val source: RDD[String] = sc.textFile(
"hdfs://node02:8020/data/wordcount.txt")
val words: RDD[String] = source.flatMap { line => line.split(" ") }
val wordsTuple: RDD[(String, Int)] = words.map { word => (word, 1) }
val wordsCount: RDD[(String, Int)] = wordsTuple.reduceByKey { (x, y) => x + y }

// 3. 查看执行结果
wordsCount.collect().foreach(print)
}

工作原理

切片

使用textFile方法,有两个参数,第一个是文件路径,第二个是numpartitions。
如果我们不传第二个参数的话,minpartitions数就是采取默认的(用local指定的并行数和2取最小值)
如果我们传入第二个参数后,numpartitions会和参数保持一致。
,最终一步一步的传入到hadoop的切片处。

Spark分区原理和分区后数据的划分

RDD 的 Shuffle 和分区

Spark 的 Shuffle 发展大致有两个阶段: Hash base shuffleSort base shuffle

默认的分区数量是和 Cores 的数量有关的, 也可以通过如下几种方式修改或者重新指定分区数量

1
2
3
4
5
parallelize 设置
textFile 设置
通过 coalesce 算子指定
通过 repartition 算子指定 repartition 算子本质上就是 coalesce(numPartitions, shuffle = true)

RDD 的 Shuffle 是什么

让来自相同 Key 的所有数据都在 reduceByKey 的同一个 reduce 中处理, 需要执行一个 all-to-all 的操作, 需要在不同的节点(不同的分区)之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据, 这个过程叫做 Shuffle.

底层逻辑

image-20211004134616791

  • Master Daemon

    负责管理 Master 节点, 协调资源的获取, 以及连接 Worker 节点来运行 Executor, 是 Spark 集群中的协调节点

  • Worker Daemon

    Workers 也称之为叫 Slaves, 是 Spark 集群中的计算节点, 用于和 Master 交互并管理 Executor.

    当一个 Spark Job 提交后, 会创建 SparkContext, 后 Worker 会启动对应的 Executor.

  • Executor Backend

    上面有提到 Worker 用于控制 Executor 的启停, 其实 Worker 是通过 Executor Backend 来进行控制的, Executor Backend 是一个进程(是一个 JVM 实例), 持有一个 Executor 对象

另外在启动程序的时候, 有三种程序需要运行在集群上:

  • Driver

    Driver 是一个 JVM 实例, 是一个进程, 是 Spark Application 运行时候的领导者, 其中运行了 SparkContext.

    Driver 控制 JobTask, 并且提供 WebUI.

  • Executor

    Executor 对象中通过线程池来运行 Task, 一个 Executor 中只会运行一个 Spark ApplicationTask, 不同的 Spark ApplicationTask 会由不同的 Executor 来运行

Stage 的划分是由 Shuffle 操作来确定的, 有 Shuffle 的地方, Stage 断开

数据可视化

jinjia2语法

1
2
3
4
5
6
7
{% for k in names %}
{% if k == 'rose' %}
<li style="color:Red">{{loop.index}}-{{k}}</li>
{% else %}
<li>{{loop.index}}-{{k}}</li>
{% endif%}
{% endfor%}

image-20210911100426118

image-20210911100431562

Sqlarchemy查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sql = ‘select xx,xxfrom tb_xxwhere . group by . having… order by … ’
result = db.session.execute(sql).fetchall()

result = db.session.query(Product).all()
result = Product.query.all()

result = db.session.query(Product.p_name,Product.p_price).all()
result = Product.query.with_entities(Product.p_name,Product.p_price).all()

result = db.session.query(Product).filter(Product.p_price> 10,Product.p_addr=='武汉').order_by(Product.p_id.desc()).all()


result = db.session.query(func.count(Product.p_id)).scalar()

result = db.session.query(Product.p_addr,func.count(Product.p_addr)).group_by(Product.p_addr).all()


select s_addr,sum(case s_sexwhen‘男’ then 1 else 0 end) as nan,sum(case s_sexwhen '女' then 1 else 0 end) as nvfrom tb_studentgroup by s_addr

result = db.session.query(Student.s_addr,(func.sum(func.if_(Student.s_sex=='男',1,0))).label('nan'),(func.sum(func.if_(Student.s_sex=='女',1,0))).label('nv')).all()

sql = '''select cla.c_name,avg(sc_score) from tb_studentstu
inner join tb_classclaon stu.s_cid= cla.c_id
inner join tb_scorescon stu.s_id= sc.sc_sid
where sc_course= 'Java高级编程'
group by cla.c_name''‘
result = db.session.execute(sql).fetchall()

result = db.session.query(Student).join(ClassInfo,Student.s_cid==ClassInfo.c_id).join(Score,Student.s_id==Score.sc_sid).filter(Score.sc_course=='Java高级编程').with_entities(ClassInfo.c_name,func.avg(Score.sc_score)).group_by(ClassInfo.c_name).all()

Scala总结

一、基础知识:

1
2
3
4
5
6
7
8
9
10
11
12
1、常量和变量:尽量使用常量,可以避免很多问题
2、数据类型:
Any
AnyVal(等价于java的基本类型):Long、Int、StringOps、Unit
AnyRef(等价于java的引用类型):Java中的所有类到Scala都是AnyRef、
Scala中所有集合、定义的所有类、Null(只有一个null)
Nothing:没有对象也没有子类,所有类的子类
3、数据类型之间的转换:
AnyVal值类型转换:.toInt、.toLong
AnyRef类型的转换:.isInstanceof[]、.asInstanceof[]
4、流程控制
顺序、选择(有返回值)、循环(没有返回值)

二、函数式编程

img函数基本语法

高阶函数

  • 高阶函数可以接受函数作为参数,比如常见的有map,filter,reduce等函数,它们可以接受一个函数作为参数。
  • 高阶函数的返回值也可以是函数。
  • 高阶函数包含:作为值的函数、匿名函数、闭包、柯里化等等

1、作为值的函数

函数就像和数字、字符串一样,可以将函数传递给一个方法,如List的map方法,可以接收一个函数

1
2
val func:Int => String =  (num:Int) => "*" * num
println((1 to 10).map(func))

2、匿名函数

没有赋值给变量的函数就是匿名函数

1
2
3
(1 to 10).map(num => "*" * num)

(1 to 10).map("*" * _)

3、柯里化

将原先接受多个参数的方法转换为多个只有一个参数的参数列表的过程。

1
2
3
def add(x:Int)(y:Int) = {
x + y
}

相当于

1
2
3
4
  def add(x:Int) = {
(y:Int) => x + y
}
println(add(3)(4))

4、闭包

闭包其实就是一个函数,返回值依赖于声明在函数外部的一个或多个变量。

1
2
3
4
val y = 10
def add(x:Int): Int ={
x+y
}

三、面向对象

样例类和普通类的区别:

  • 样例类默认实现了序列化
  • 样例类使用的时候不需要new语法
  • 样例类可以进行模式匹配
  • 样例类实现了apply方法、toString方法、equals方法、hashCode方法、copy方法

四、集合

1、数组:定长数组(Array)、变长数据(ArrayBuffer)

2、列表:不可变列表(List)、可变列表(ListBuffer)

3、Set:

4、Map

  • 使用+=添加元素
  • 使用-=删除元素
  • 使用++=追加一个数组、列表

五、隐式转换

六、模式匹配

七、泛型

快速测试

语言实现

​ Java - StringTokenizer类

​ Scala - Spark基本方法都支持 方便快速测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 单词计数:将集合中出现的相同的单词,进行计数,取计数排名前三的结果 
val stringList = List("Hello Scala Hbase kafka", "Hello Scala Hbase", "Hello Scala", "Hello")
// 1) 将每一个字符串转换成一个一个单词
val wordList: List[String] = stringList.flatMap(str=>str.split(" "))
//println(wordList)
// 2) 将相同的单词放置在一起
val wordToWordsMap: Map[String, List[String]] = wordList.groupBy(word=>word) //println(wordToWordsMap)
// 3) 对相同的单词进行计数
// (word, list) => (word, count)
val wordToCountMap:Map[String,Int] = wordToWordsMap.map(tuple=>(tuple._1, tuple._2.size))
// 4) 对计数完成后的结果进行排序(降序)
val sortList: List[(String,Int)] = wordToCountMap.toList.sortWith {
(left, right) => {
left._2 > right._2
}
}
// 5) 对排序后的结果取前3名
val resultList: List[(String, Int)] = sortList.take(3)
println(resultList)

ScrapyShell

SparkShell