Hadoop-Zookeeper-Hive集群搭建

Hadoop3 :

(2条消息) hadoop 3.2.1集群高可用(HA)搭建_半斤米粉闯天下的博客-CSDN博客

(2条消息) Hadoop全面讲解(三)Hadoop+yarn高可用_束安的博客-CSDN博客

Zookeeper安装

服务器IP 主机名 myid的值
192.168.174.100 node01 1
192.168.174.110 node02 2
192.168.174.120 node03 3

第一步:下载zookeeeper的压缩包,下载网址如下

http://archive.apache.org/dist/zookeeper/

我们在这个网址下载我们使用的zk版本为3.4.9

下载完成之后,上传到我们的linux的/export/softwares路径下准备进行安装

第二步:解压

解压zookeeper的压缩包到/export/servers路径下去,然后准备进行安装

1
2
3
cd /export/software

tar -zxvf zookeeper-3.4.9.tar.gz -C ../servers/

第三步:修改配置文件

第一台机器修改配置文件

1
2
3
4
5
cd /export/servers/zookeeper-3.4.9/conf/

cp zoo_sample.cfg zoo.cfg

mkdir -p /export/servers/zookeeper-3.4.9/zkdatas/

vim zoo.cfg

1
2
3
4
5
6
7
8
9
dataDir=/export/servers/zookeeper-3.4.9/zkdatas
# 保留多少个快照
autopurge.snapRetainCount=3
# 日志多少小时清理一次
autopurge.purgeInterval=1
# 集群中服务器地址
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

第四步:添加myid配置

在第一台机器的

/export/servers/zookeeper-3.4.9/zkdatas /这个路径下创建一个文件,文件名为myid ,文件内容为1

echo 1 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第五步:安装包分发并修改myid的值

安装包分发到其他机器

第一台机器上面执行以下两个命令

scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/

scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/

第二台机器上修改myid的值为2

echo 2 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第三台机器上修改myid的值为3

echo 3 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第六步:三台机器启动zookeeper服务

三台机器启动zookeeper服务

这个命令三台机器都要执行

/export/servers/zookeeper-3.4.9/bin/zkServer.sh start

查看启动状态

/export/servers/zookeeper-3.4.9/bin/zkServer.sh status

eNode
dataNode
ResourceManager
NodeManager

Hadoop安装

第一步:上传apache hadoop包并解压

解压命令

1
2
cd /export/softwares
tar -zxvf hadoop-2.7.5.tar.gz -C ../servers/

img

第二步:修改配置文件

修改core-site.xml

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<configuration>

<property>
<name>fs.default.name</name>
<value>hdfs://node01:8020</value>
</property>

<property>
<name>hadoop.tmp.dir</name>
<value>/export/servers/hadoop-2.7.5/hadoopDatas/tempDatas</value>
</property>

<!-- 缓冲区大小,实际工作中根据服务器性能动态调整 -->
<property>
<name>io.file.buffer.size</name>
<value>4096</value>
</property>

<!-- 开启hdfs的垃圾桶机制,删除掉的数据可以从垃圾桶中回收,单位分钟 -->
<property>
<name>fs.trash.interval</name>
<value>10080</value>
</property>
</configuration>
修改hdfs-site.xml

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>node01:50090</value>
</property>

<property>
<name>dfs.namenode.http-address</name>
<value>node01:50070</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-2.7.5/hadoopDatas/namenodeDatas,file:///export/servers/hadoop-2.7.5/hadoopDatas/namenodeDatas2</value>
</property>
<!-- 定义dataNode数据存储的节点位置,实际工作中,一般先确定磁盘的挂载目录,然后多个目录用,进行分割 -->

<property>
<name>dfs.datanode.data.dir</name>
<value>file:///export/servers/hadoop-2.7.5/hadoopDatas/datanodeDatas,file:///export/servers/hadoop-2.7.5/hadoopDatas/datanodeDatas2</value>
</property>

<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///export/servers/hadoop-2.7.5/hadoopDatas/nn/edits</value>
</property>

<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>file:///export/servers/hadoop-2.7.5/hadoopDatas/snn/name</value>
</property>

<property>
<name>dfs.namenode.checkpoint.edits.dir</name>
<value>file:///export/servers/hadoop-2.7.5/hadoopDatas/dfs/snn/edits</value>
</property>

<property>
<name>dfs.replication</name>
<value>3</value>
</property>


<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>

</configuration>
修改hadoop-env.sh

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim hadoop-env.sh
1
export JAVA_HOME=/export/servers/jdk1.8.0_141
修改mapred-site.xml

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim mapred-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<configuration>

<property>
<name>mapreduce.job.ubertask.enable</name>
<value>true</value>
</property>


<property>
<name>mapreduce.jobhistory.address</name>
<value>node01:10020</value>
</property>

<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>node01:19888</value>
</property>

</configuration>

修改yarn-site.xml

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>node01</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>20480</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>

</configuration>
修改mapred-env.sh

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim mapred-env.sh
1
export JAVA_HOME=/export/servers/jdk1.8.0_141
修改slaves

修改slaves文件,然后将安装包发送到其他机器,重新启动集群即可

第一台机器执行以下命令

1
2
cd  /export/servers/hadoop-2.7.5/etc/hadoop
vim slaves
1
2
3
node01
node02
node03

第一台机器执行以下命令

1
2
3
4
5
6
7
8
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/tempDatas
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/namenodeDatas
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/namenodeDatas2
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/datanodeDatas
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/datanodeDatas2
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/nn/edits
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/snn/name
mkdir -p /export/servers/hadoop-2.7.5/hadoopDatas/dfs/snn/edits

安装包的分发

第一台机器执行以下命令

1
2
3
cd  /export/servers/
scp -r hadoop-2.7.5 node02:$PWD
scp -r hadoop-2.7.5 node03:$PWD

第三步:配置hadoop的环境变量

三台机器都要进行配置hadoop的环境变量

三台机器执行以下命令

1
vim  /etc/profile
1
2
export HADOOP_HOME=/export/servers/hadoop-2.7.5
export PATH=:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

配置完成之后生效

1
source /etc/profile

分发Hadoop

scp -r xxx xxx

第四步:启动集群

要启动 Hadoop 集群,需要启动 HDFS 和 YARN 两个模块。
注意: 首次启动 HDFS 时,必须对其进行格式化操作。 本质上是一些清理和
准备工作,因为此时的 HDFS 在物理上还是不存在的。

hdfs namenode -format 或者 hadoop namenode –format

准备启动

第一台机器执行以下命令

1
2
3
4
5
cd  /export/servers/hadoop-2.7.5/
bin/hdfs namenode -format
sbin/start-dfs.sh
sbin/start-yarn.sh
sbin/mr-jobhistory-daemon.sh start historyserver

三个端口查看界面

http://node01:50070/explorer.html#/ 查看hdfs

http://node01:8088/cluster 查看yarn集群

http://node01:19888/jobhistory 查看历史完成的任务

Hive 的安装

这里我们选用hive的版本是2.1.1
下载地址为:
http://archive.apache.org/dist/hive/hive-2.1.1/apache-hive-2.1.1-bin.tar.gz

下载之后,将我们的安装包上传到第三台机器的/export/softwares目录下面去

第一步:上传并解压安装包

将我们的hive的安装包上传到第三台服务器的/export/softwares路径下,然后进行解压

1
2
cd /export/softwares/
tar -zxvf apache-hive-2.1.1-bin.tar.gz -C ../servers/
第二步:安装mysql

已安装跳过

第一步:在线安装mysql相关的软件包

yum install mysql mysql-server mysql-devel

第二步:启动mysql的服务

/etc/init.d/mysqld start

第三步:通过mysql安装自带脚本进行设置

/usr/bin/mysql_secure_installation

第四步:进入mysql的客户端然后进行授权

grant all privileges on *.* to 'root'@'%' identified by '123456' with grant option;

flush privileges;

第三步:修改hive的配置文件

修改hive-env.sh

1
2
cd /export/servers/apache-hive-2.1.1-bin/conf
cp hive-env.sh.template hive-env.sh
1
2
HADOOP_HOME=/export/servers/hadoop-2.7.5
export HIVE_CONF_DIR=/export/servers/apache-hive-2.1.1-bin/conf

修改hive-site.xml

1
2
cd /export/servers/apache-hive-2.1.1-bin/conf
vim hive-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>node03</value>
</property>
</configuration>
第四步:添加mysql的连接驱动包到hive的lib目录下

hive使用mysql作为元数据存储,必然需要连接mysql数据库,所以我们添加一个mysql的连接驱动包到hive的安装目录下,然后就可以准备启动hive了

将我们准备好的mysql-connector-java-5.1.38.jar 这个jar包直接上传到
/export/servers/apache-hive-2.1.1-bin/lib 这个目录下即可

至此,hive的安装部署已经完成,接下来我们来看下hive的三种交互方式

第五步:配置hive的环境变量

node03服务器执行以下命令配置hive的环境变量

1
sudo vim /etc/profile
1
2
export HIVE_HOME=/export/servers/apache-hive-2.1.1-bin
export PATH=:$HIVE_HOME/bin:$PATH

Sqoop安装

总结

一、Hadoop环境搭建
1、伪分布式
1)解压hadoop包 tar -zxvf hadoop….2.6.0.gz -C /usr/local/src
2) cd /usr/local/src mv hadoop.2.6.0… hadoop
3) 配置环境变量:
vim /root/.bash_profile
export HADOOP_HOME=/usr/local/src/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
source /root/.bash_profile
4) 输入 hadoop version查看是否成功
5)开始修改hadoop相关配置文件(/usr/local/src/hadoop/etc/hadoop/)
a> hadoop-env.sh 改jdk环境变量位置
b> core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<configuration>
<!-- 配置hadoop中Namenode的地址,mymaster为hostname主机名 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://mymaster:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<!-- 该路径自己指定,一般设置为hadoop下的data/tmp,该目录会自
动创建,标识hadoop运行时产生文件的存储目录 -->
<value>/opt/temp/data/tmp</value>
</property>
</configuration>
            c>  hdfs-site.xml
            d>  mapred-site.xml(需要先复制一个出来)
            e>  yarn-site.xml
    6) 配置完毕之后,格式化namenode   
            hdfs namenode -format
   7)  启动hadoop
          a》简单粗暴型:start-all.sh     停止:stop-all.sh
          b》单独启动namenode和datanode、resourcemanager
                   start-dfs.sh
                   start-yarn.sh
          c》单一节点启动:
                    namenode   ----  hadoop-daemon.sh start namenode
                    datanode   ---      hadoop-daemon.sh start datanode
   8)启动完毕之后,使用浏览器访问虚拟机中的IP对应点 hadoop文件系统界面:
        http://192.168.xx.xxx:50070
2、分布式
          删掉刚才存储元数据的文件夹
         1) hdfs-site.xml修改副本数量,如3
         2)  slaves文件,修改datanode的节点
         3)将hadoop完全拷贝至其他机器上
         4)环境变量文件也需要拷贝,并且source一下
         5)格式化hadoop
         6)启动
 测试:在本地创建一个文本文件,随便放入一些单词,以空格分开
      a》将本地文件上传至已经启动的hdfs中   hdfs dfs -put /本地目录/本地文件.txt  /
      b》运行mapreduce示例(/usr/local/src/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar)    
      hadoop jar hadoop-mapreduce-examples-2.6.0.jar wordcount /hello.txt /test1
      注意:/hello.txt是hdfs中的文件路径和名称
                /test1是mapreduce结果输出到hdfs的目录,该目录存在则直接报错,所以该目录不能存在

3、zookeeper(集群之间进行协调管理,同步,故障检测等作用)
1)解压zookeeper到/usr/local/src目录下,重命名
2)配置环境变量,并且生效
3)配置文件: zoo_sample.cfg拷贝一个
cp zoo_sample.cfg zoo.cfg
修改内容:
dataDir=/usr/local/src/zookeeper/zkdata/data (自己创建的数据存放目录)
dataLogDir=/usr/local/src/zookeeper/zkdata/log (自己创建的日志目录)
server.1=主机名1:2888:3888 (集群配置)
server.2=主机名2:2888:3888 (集群配置)
server.3=主机名3:2888:3888 (集群配置)
4)在刚才配置的dataDir目录下新建一个 名为myid的文件,vim myid
在里面写上数字编号:1
5)将zookeeper远程拷贝至其他机器上。
6)修改拷贝后其他机器上zookeeper中myid的编号
7)将环境变量也同步拷贝到其他机器上,并且在其他机器上source一下
8)分别在集群机器上启动zookeeper:
启动: zkServer.sh start
查看状态: zkServer.sh status
停止: zkServer.sh stop

4、高可用

Hadoop总结

day01

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

大数据的数据单位:
1T = 1024G
1P = 1024T
1E = 1024P


1:服务器
服务器也是一台计算机,是一台高性能的计算机
高处理能力(硬件: 可热插拔磁盘,冗余配置的CPU,强大的网络处理能力)
高扩展性 (硬盘和CPU)
高可靠性 (高可用(HA))

2:硬盘
机械硬盘
固态硬盘:
NOR Flash(韩国三星)
Nand Flash(韩国三星)
高纯硅(日本)
光刻机(荷兰:AMSL)

3:交换机
IP地址:可以从逻辑上唯一的定位网络中的一台主机
MAC地址:从物理上可以唯一的定位一台主机
流控 :控制数据流速度 128Kbit/s

4:局域网
局域网:
城域网:
广域网:(全球互联网)


机架感知:

5:配置MAC和IP
MAC地址的路径: vim /etc/udev/rules.d/70-persistent-net.rules
IP地址修改路径: vim /etc/sysconfig/network-scripts/ifcfg-eth0

6:Zookeer的概述
1:Zookeeper是一个分布式的协调框架(协调人)
2:Zookeeper是一个集群(好多台主机都安装Zookeeper软件,形成一个整体)
3:Zookeeper集群对外提供一个文件系统:
/
app1 app2
4:Zookeeper中的每一个目录或者文件被称为znode
5:Zookeeper中的znode既具有文件的特性,又具有目录的特性
znode可以存数据
znode可以有子节点

7:Zookeeper的架构
1:Zookeeper是一个主从架构
老大: Leader 通过选举得到,如果leader挂掉,则重新选举
小弟: Follower
Observer
2:Zookeeper的操作
事务性操作: 写操作
非事务性操作:读操作
3:架构组件的分工
Leader :
1:负责集群整理的管理工作
2:负责发起投票
3:负责处理事务性操作和非事务性操作
Follower:
1:负责非事务性操作
2:如果遇到事务性操作,则转发给Leader
3:参与投票
Observer:
1:负责非事务性操作
2:如果遇到事务性操作,则转发给Leader
3:不能参与投票

8:Zookeeper的应用
1:发布和订阅
2:命名服务
让集群对外提供一个唯一的访问路径,通过这个访问路径可以找到某个节点文件
3:分布式协调/通知
心跳机制
4:分布式锁
写锁: 独占锁 同一时刻只能有一个进程可以得到这个锁
读锁: 共享锁 同一时刻,可以有多个进程共享这个锁
5:分布式队列
如果在集群中有多个任务需要按照一定的顺序执行,则可以将这些任务放入分布式队列中
A -----> B -------->C

9:Zookeeper的选举机制
1:zxid 该数值越大表示数据越新
2:myid 每个Zookeeper集群中,主机的编号
3:选举时有个过半机制

10:Zookeeper的数据模型
1:Zookeeper中存储的数据大小一般不会超过1M
2:Zookeeper中访问数据只能使用绝对路径
3:每个 Znode 由 3 部分组成:
stat:此为状态信息, 描述该 Znode 的版本, 权限等信息
data:与该 Znode 关联的数据
children:该 Znode 下的子节点

4:Znode的节点类型
PERSISTENT:永久节点
EPHEMERAL:临时节点
PERSISTENT_SEQUENTIAL:永久节点、序列化
EPHEMERAL_SEQUENTIAL:临时节点、序列化
5:Znode的节点操作
1:客户端登录znode
bin/zkCli.sh -server node02:2181 #方式1
bin/zkCli.sh #方式2
2:命令操作
永久节点
create /hello world
永久序列化
create -s /hello world
临时节点
create -e /hello world
临时序列化
create -s -e /hello world

创建子节点:
create /hello/aaa world #临时节点不能创建子节点
修改节点数据
set /hello zookeeper
删除节点, 如果要删除的节点有子Znode则无法删除
delete /hello
删除节点, 如果有子Znode则递归删除
rmr /abc
列出历史记录
histroy

获取节点属性:
get /hello
ephemeralOwner = 0x36d14c88edf0001 #临时节点
ephemeralOwner = 0x0 #永久节点

day02

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
1:Hadoop介绍
1:Hadoop基于Nutch项目,起源于谷歌的两篇论文
GFS(谷歌文件系统) ---->解决分布式存储问题 ---->HDFS(Hadoop 分布式文件系统)
MapReduce------>解决分布式计算问题
2:Hadoop的定义
狭义:
HDFS 分布式存储
MapReduce 分布式计算
Yarn 分布式资源调度系统
广义:
生态圈
Hadoop,MapReduce,Hive,flume,azkaban,impala,oozie,hue,spark,flink
storm,Hbase,kafka
2:Hadoop版本
版本:
1.x
2.x 引入yarn
3.x 最新的,引入了多namenode
发行公司:
apache版:
优势: 版本更新快
缺点: 版本之间兼容性差
cloudera版:
基于apache版的Hadoop
优势: 版本兼容性好
缺点: 商业版要收费
3:Hadoop的架构
3.1 1.x 架构
分布式存储(HDFS)
主节点:
namenode
辅助节点:
SecondaryNamenode
从节点:
datanode

分布式计算:
主节点:
JobTracker
从节点:
TaskTracker

3.1 2.x 架构
分布式存储(HDFS)
主节点:
namenode
辅助节点:
SecondaryNamenode
从节点:
datanode

分布式计算:
主节点:
ResourceManger
从节点:
NodeManager

4:HDFS
Hadoop Distributed File System 分布式文件系统
HDFS特点:
1:高延时(慢):
2:高吞吐量(P级) :
3:hdfs中文件的数据不能随机访问(HBase)
4:hdfs不适合存储小文件(小于128M)
1个文件: 1条元数据信息
100个1K文件 ---->100条元数据 ---> 15000字节
100K ----->1条元数据 ---> 150字节

5:hdfs可以有很强的扩展能力

day03

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
1:Hadoop适合的应用场景
1:高吞吐量,高延时
2:一次写入,多次读取
3:具有高扩展性和容错性
2:Hadoop不适合的应用场景
1: 需要低延时
2: 不适合存储小文件
1个文件--->一条元数据--->150字节---->namenode的内存中
1T文件
每一个文件是一个字节: 1024*1024*1024*1024*1024个
每一个文件1T: 1个
3: 不支持数据的任意修改和读写


3:hdfs的组成
1:Client:就是客户端。
a:文件切分文件上传 HDFS 的时候,Client 将文件切分成一个个的Block,然后进行存储。
b:与 NameNode 交互,获取文件的位置信息。
c:与 DataNode 交互,读取或者写入数据。
d:Client 提供一些命令来管理 和访问HDFS(hdfs dfs -put a.txt /)
2:NameNode:就是 master,它是一个主管、管理者。
a:管理 HDFS 的名称空间,对外提供一个统一的访问路径
b:管理数据块(Block)映射信息,告诉客户端,一个文件的每一个切片(block)所在的位置
c:配置副本策略,每个副本存放在哪台主机
d:处理客户端读写请求
3:DataNode:就是Slave。NameNode 下达命令,DataNode 执行实际的操作。
a:存储实际的数据块。
b:执行数据块的读/写操作
c:不断的向namenode发送心跳包和汇报自己的block信息
4:Secondary NameNode
a:对namenode做辅助性的操作,不能替代namenode
b:SecondaryNameNode可以定期的将fsimage文件和edits文件进行合并,合并一个新的fsimage,并且
替换原来的fsimage,减轻namenode的压力
合并的触发条件
1: 时间 一个小时
2: 文件大小 edits文件达到64M

4:HDFS的block和副本机制
1:每一个block默认是128M,如果不够128M,也称为一个block,所以block是一个逻辑单位
2:每一个block默认是3个副本
3:每一个block的副本存放是根据机架感知来实现的

5:HDFS命令
hdfs dfs -ls /
hdfs dfs -put a.txt / #将文件复制到hdfs
hdfs dfs -put /root/a.txt /root/

hdfs dfs -mkdir /dir1
hdfs dfs -mkdir -p /dir1/dir11 #递归创建文件夹
hdfs dfs -moveFromLocal a.txt /dir1/dir11 #将文件移动到hdfs

hdfs dfs -get /a.txt ./ #将文件下载到当前目录

hdfs dfs -mv /dir1/a.txt /dir2 #将hdfs的文件移动到hdfs的另外一个路径
hdfs dfs -rm /a.txt #删除一个文件(删除文件之后移动到hdfs的垃圾桶,七天之后自动删除)
hdfs dfs -rm -r /dir1 #递归删除一个文件夹(删除文件之后移动到hdfs的垃圾桶)

hdfs dfs -cp -p /dir1/a.txt /dir2/b.txt #将hdfs的某个文件拷贝到hdfs的另外一个路径(深度拷贝)

hdfs dfs -cat /a.txt #查看hdfs文件的内容
hdfs dfs -chmod -R 777 / #修改hdfs文件或者文件夹(加-R参数)权限


hdfs dfs -appendToFile a.xml b.xml /big.xml #将linux本地的文件合并之后上传到hdfs

hdfs dfs -appendToFile /export/servers/hadoop-2.7.5/etc/hadoop/*.xml /big.xml


6:HDFS限额配置
6.1 文件个数限额
hdfs dfs -count -q -h /user/root/dir1 #查看配额信息
hdfs dfsadmin -setQuota 2 dir #设置N个限额数量,只能存放N-1个文件
hdfs dfsadmin -clrQuota /user/root/dir #清除个数限额配置

6.2 文件的大小限额
在设置空间配额时,设置的空间至少是block_size * 3(384M)大小
hdfs dfs -count -q -h /user/root/dir1 #查看配额信息
hdfs dfsadmin -setSpaceQuota 384M /user/root/dir # 限制空间大小384M

dd if=/dev/zero of=1.txt bs=1G count=1 #生成1G的文件
hdfs dfsadmin -clrSpaceQuota /user/root/dir #清除空间限额配置

7:HDFS的安全模式
1:启动HDFS时,会自动进入安全模式
加载hdfs的元数据,保证数据的完整性
2:在HDFS的工作中,发现BLOCK副本率(当前的副本数/设置的副本数)不达标,则自动进入安全模式
保证Block的副本率达标,保证数据的完整性
3:在安全模式状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求

安全模式的操作命令:
hdfs dfsadmin -safemode get #查看安全模式状态
hdfs dfsadmin -safemode enter #进入安全模式
hdfs dfsadmin -safemode leave #离开安全模式

8:HDFS的基准测试
在搭建好HDFS集群之后,需要对HDFS的性能进行测试

cd /export/servers
hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB
8.1 写入性能测试
#向HDFS文件系统中写入数据,10个文件,每个文件10MB,文件存放到hdfs的/benchmarks/TestDFSIO中
#写完之后,会在当前目录生成一个TestDFSIO_results.log文件,该文件就是测试报告
hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -write -nrFiles 10 -
fileSize 10MB
8.2 读取性能测试
#从HDFS文件系统中读入10个文件,每个文件10M
#写完之后,会在当前目录生成一个TestDFSIO_results.log文件,该文件就是测试报告
hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -read -nrFiles 10 -
fileSize 10MB


100M * 3 = 300M

day04 HDFS-API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
1:在Windows下配置Hadoop的运行环境
第一步:将hadoop2.7.5文件夹拷贝到一个没有中文没有空格的路径下面
第二步:在windows上面配置hadoop的环境变量: HADOOP_HOME,并将%HADOOP_HOME%\bin添加到path中
第三步:把hadoop2.7.5文件夹中bin目录下的hadoop.dll文件放到系统盘:C:\Windows\System32 目录
第四步:关闭windows重启

2:获取FileSystem的方式
@Test
public void getFileSystem2() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"),
new Configuration()); //new Configuration()该对象会自动读取haoop的xml配置文件
System.out.println("fileSystem:"+fileSystem);
}
3:HDFS的API操作
/*
创建文件夹和文件
*/
@Test
public void mkdirs() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"),new Configuration());
//boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test")); //递归创建文件夹
/*
1:如果父目录不存在,则会自动创建
2:创建文件时,文件所属用户是你windows下的用户
*/
boolean mkdirs = fileSystem.create(new Path("/hello/mydir/test"));
fileSystem.close();
}

4:HDFS文件流的获取
//获取hdfs文件的输入流--->读取hdfs文件--->下载
FSDataInputStream inputStream = fileSystem.open(new Path("/a.txt"));
//获取hdfs文件的输出流--->向hdfs文件写数据
FSDataOutputStream outputStream = fileSystem.create(new Path("/a.txt"));


//文件上传:
fileSystem.copyFromLocalFile(new Path("D://set.xml"), new Path("/"));
//文件下载:
fileSystem.copyToLocalFile(new Path("/a.txt"), new Path("D://a4.txt"));

5:HDFS的权限问题
1:如果要让hdfs的权限生效,则需要修改hdfs-site.xml文件,修改如下:
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
2:伪造用户:以某一个用户的身份去访问 //过滤器 Filter
//最后一个参数root,就是以root身份去访问
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(),"root");

6:HDFS的高可用(HA)
1:HDFS高可用是有2个namenode,一个是Active状态,一个是Standby状态
2:Zookeeper用来解决两个namenode的单点故障问题,Journal Node用来保证两个namenode元数据的同步

7:MapReduce的思想
MpReduce运行在yarn之上
阶段划分:
Map阶段: 负责将一个大的任务划分成小的任务,小任务之间不能有依赖关系
Reduce阶段: 负责将Map阶段的结果进行汇总


8:MapReduce的八个步骤
Map阶段2个步骤
1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
2. 自定义 Map 逻辑(自己写代码), 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果
Shuffle 阶段 4 个步骤
3. 分区(Partition)
4. 排序(Sort)
5. 规约(Combiner)
6. 分组(Group By)
Reduce阶段2个步骤
7. 自定义Reduce逻辑(自己写代码)将新的K2和V2转为新的 Key-Value(K3和V3)输出
8. 设置 OutputFormat 处理并保存 Reduce 输出的K3和V3 数据
1
2
运行模式
分区 集群和本地

day06

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
1:序列化和反序列化
序列化:
将一个内存的对象转为字节流保存到磁盘上,或者通过网络发送出去
反序列化:
从磁盘读取一个对象,或者从网络中接收一个对象

MapReduce并没有使用Java的序列化方式,认为Java的序列化方式态臃肿,自定提出了一套序列化方式

分区:
继承:Partitioner类
从写:getPartition方法,返回分区编号
在主类中设置:
job.setPartitionerClass(FlowPartition.class);
job.setNumReduceTasks(4); // 分区个数 对应Reduce


Java中序列化:
实现Serializable接口,不需要重写方法
MapReduce中序列化:
实现WritableComparable,实现了该接口,不仅可以实现序列化,还可以实现排序
public interface WritableComparable<T> extends Writable, Comparable<T> {
void write(DataOutput out); //实现序列化
void readFields(DataInput in); //实现反序列化

int compareTo(T var1); //实现排序
}


//如果你在Mapreduce中写一个JavaBean,则该JavaBean必须实现Writable接口

//如果你的JavaBean要实现排序,则必须WritableComparable

规约(Combiner):
1:是在每一个MapTask之后,加上一个局部的聚合
2:就相当于在每一个map之后进行了一次Reduce操作
3:通过Combiner可以减少Map到Reduce之间数据的传输量
注意:
1:不是所有的MR程序都可以使用聚合
2:Combiner只是来优化MR的执行,不能去改变最终的结果
实现步骤:
1:写一个MyCombiner类去继承Reducer类,重写reduce方法(也可以直接拷贝自定义Reducer类)
2:在JobMain中设置自定义Combiner
job.setCombinerClass(MyCombiner.class);

在MR中自定义的JavaBean必须实现序列化(实现Writable接口)


分组:
1. 继承WritableComparator 类
2: 调用父类的有参构造
public OrderGroupComparator() {
super(OrderBean.class,true);
}
3:指定分组的规则(重写方法)
compare 方法
public int compare(WritableComparable a, WritableComparable b) {
//3.1 对形参做强制类型转换
OrderBean first = (OrderBean)a; // K2的类型
OrderBean second = (OrderBean)b;
//3.2 指定分组规则
return first.getOrderId().compareTo(second.getOrderId());
}

简单总结

在主类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
继承  Configured 实现 Tool 接口
在Mian 方法中:
创建Configuration 对象
int run = ToolRunner.run(configuration, new mainclass(), args);
System.exit(run);
run方法中:
创建Job对象 Job job = Job.getInstance(super.getConf(), "jobname");
//如果打包运行出错,则需要加该配置
job.setJarByClass(HFDSTest.class);
设置输入类,和输入路径
设置mapper类,和k,v类型
reduce类同上
设置输出类
job.waitForCompletion(true); 执行返回结果
创建Job对象

分区:
job.setPartitionerClass(SumCountPartition.class);
job.setNumReduceTasks(4);
Partitioner<Text,FlowBean>
重写:getPartition() 返回序号
排序/序列化:
实现Writable[Comparable]
write 反序列化
readFields 序列化
compareTo 排序

规约:
在Map阶段做reduce阶段的任务
job.setCombinerClass(MyCombiner.class);
分组:
1. 继承WritableComparator 类
2: 调用父类的有参构造
public OrderGroupComparator() {
super(OrderBean.class,true);
}
3:指定分组的规则(重写方法)
compare 方法
public int compare(WritableComparable a, WritableComparable b) {
//3.1 对形参做强制类型转换
OrderBean first = (OrderBean)a; // K2的类型
OrderBean second = (OrderBean)b;
//3.2 指定分组规则
return first.getOrderId().compareTo(second.getOrderId());
}
4:设置job.setGroupingComparatorClass(CustomGroupingComparator.class)

day07

day08

1
2
3
4
5
6
7
8
9
10
11
12
自定义输入输出:
输入继承 FileInputFormat
RecordReader 返回 自定义RecordReader<>对象

:initialize()初始化方法
//获取文件的切片
fileSplit= (FileSplit)inputSplit;
//获取Configuration对象
configuration = taskAttemptContext.getConfiguration();
:nextKeyValue()该方法用于获取K1和V1
自定义输入:
outputformat,改写其中的recordwriter<>,改写具体输出数据的方法write()

Hive

1
2
3
4
5
1:在hive中,一个数据库就是一个文件夹,一张表也是一个文件夹
2:向表中插入数据时,这些数据会生成表文件数据,表文件字段之间默认的分隔符是 '\001'
3:create table stu3 as select * from stu2; 拷贝表时,表的表文件数据要也一起拷贝
4:删除内部表,会将元数据和表数据一起删除
5:外部表表示共有,内部表示私有
1
2
3
4
5
6
7
8
9
10
6:给表添加数据:
方式1: 在linux
vim student.txt
hdfs dfs -put student.txt /user/hive/warehouse/mytest3.db/student
方式2: 在hive中
本地加载:表文件原来是在Linux本地,加载之后,做的复制操作
load data local inpath '/export/servers/hivedatas/student.csv' [overwrite] into table student;
方式3:
hdfs加载:表文件原来是在hdfs上,加载之后,做的剪切操作
load data inpath '/hivedatas/student.csv' into table student;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
7:分区表
1:分区表就是分文件夹,就是对大数据进行分类管理
2:创建分区表
创建单个分区
create table score(s_id string,c_id string, s_score int) partitioned by(month string)
row format delimited fields terminated by '\t';
创建多个分区
create table score(s_id string,c_id string, s_score int) partitioned by(year string,month string,day string)
row format delimited fields terminated by '\t';
3:给分区添加数据
给单个分区添加数据: 创建单级文件夹
load data local inpath '/export/servers/hivedatas/score.csv' into table
score partition (month='201806');
给多个分区添加数据:创建多级文件夹
load data local inpath '/export/servers/hivedatas/score.csv' into table
score partition (year='1999',month='12',day='23');
4:分区表的查询
查询单分区表
select * from score where month = '201801';
select * from score where month = '201801' or month = '201802';
select * from score where month = '201801' union all select * from score where month = '201802'
查询多分区表:
select * from score where year='1999' and month = '12' and day = '01';


5:表模型
1:外部的分区表
create external table score(id int ,name string ,score int) partitioned by(month string)
row format delimited fields terminated by '\t' location '/scoredatas'
2:内部的分区表
create table score(id int ,name string ,score int) partitioned by(month string)
row format delimited fields terminated by '\t' location '/scoredatas'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
8:分桶表
1:分桶表就是分文件,就是对大数据进行分类管理


加载表数据:stu;
方式1:
hdfs dfs -put /a.txt /stu
方式2:
load data local inpath '/a.txt' into table /stu;
方式3(从hdfs加载):
load data inpath '/a.txt' into table /stu;
方式4:
create external table stu() ...... location '/scoredatas';
msck repair table stu;
方式5:
create external table stu as select * from A;

hive 删库报错问题
在 hive bin 目录下执行下面这句话
./schematool -dbType mysql -initSchema


+-----------+------------+----------+--+
| col_name | data_type | comment |
+-----------+------------+----------+--+
| id | int | |
| name | string | |
+-----------+------------+----------+--+

create table if not exists stu3(id int ,name string) row format delimited fields terminated by '\t' location '/user/stu2';

day10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
1:select查询
order by
对全局的结果进行整体排序,要求只能有一个Reduce
sort by
对每一个Reduce的结果进行局部排序,不能保证整体有序
distributed by
可以将原文件以某个字段作为分区的条件,将文件分到不同的区中,类似于MR中分区
cluster by(字段)
既可以分区,又可以对每一个分区的数据排序
除了具有distribute by的功能外,还会对该字段进行排序
如果distribute 和sort字段是同一个时,此时,
cluster by = distribute by + sort by

2:正则表达式
/*
第1位: 1
第2位: 3 4 5 7 8
剩下9位: 0-9

*/
String regex = "1[34578][0-9]{9}"; //指定规则
String phoneNum = "13888888888"; //字符串
boolean bl = phoneNum.match(regex);
if(bl){
//你输入的手机号合法
}else{
//你输入的手机号非法
}

3:连表查询
3.1 内连接(求两张表的交集)
隐式内连接
select * from A a ,B b where a.id = b.id;
显式内连接
select * from A a inner join B b on a.id = b.id;
3.2 外连接
左外连接
select * from A a left join B b on a.id = b.id;
右外连接
select * from A a right join B b on a.id = b.id;


四张表联合查询:
select * from teacher t
left join course c on t.t_id = c.t_id
left join score s on s.c_id = c.c_id
left join student stu on s.s_id = stu.s_id;
4:hive的排序
order By(全局排序 ) !!!!!!!!!!!
//排序,先按照s_id升序排序,在按照avg降序排列
select s_id ,avg(s_score) avg from score group by s_id order by s_id ,avg desc;
sort By(对每一个Reduce局部排序)
第一步:设置Reduce个数
set mapreduce.job.reduces=3;
第二步:查询成绩按照成绩降序排列
select * from score sort by s_score
第三步:将查询后的结果保存到本地文件中
insert overwrite local directory '/export/servers/hivedatas/sort' select *
from score sort by s_score;
distributed By 和 sort by !!!!!!!!!!!!
第一步:设置Reduce个数
set mapreduce.job.reduces=7;
第二步:通过distribute by 进行数据的分区
insert overwrite local directory '/export/servers/hivedatas/sort' select *
from score distribute by s_id sort by s_score;
cluster by
1:先分区,然后对分区的数据排序
2: cluster By id 等价于 distributed By id + sort by id
5:Hive的参数配置
如果在运行hive时,需要给hive设置参数,则有三种方式
方式1: 配置文件
在hive安装目录conf/hive-site.xml 文件中配置
方式2:命令行参数
bin/hive -hiveconf hive.root.logger=INFO,console
方式3:参数声明
bin/hive
set mapreduce.job.reduces=7;

设置级别优先级:
参数声明 > 命令行参数 > 配置文件参数(hive)
6:Hive的内置函数
查看所有的内置函数:
show functions;
查看某个函数的详细用法:
desc function extended upper;
常用的内置函数:
#字符串连接函数:concat
select concat("hello","world","java"); //结果 helloworldjava
#带分隔符字符串连接函数: concat_ws
select concat_ws(',','hello','world','java');//结果 hello,world,java
#cast类型转换
select cast(1.5 as int); //结果 1
#get_json_object(json 解析函数,用来处理json,必须是json格式)
select get_json_object('{"name":"jack","age":"20"}','$.name'); //结果 jack
#URL解析函数
select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','HOST'); // facebook.com
select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','PATH'); // path1/p.php
select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','QUERY'); // k1=v1&k2=v2
select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','QUERY',"k1"); // v1

7:Hive的自定义函数
UDF:一进一出
upper abs ceil floor
UDAF:多进一出
max min avg concat
UDTF:一进多出
"1,zhangsan,18" ---> 1 zhangsan 18 //将行转列
lateral view explore()
8:Hive的压缩
gzip bzip2 snappy
1:设置Map端压缩
意义:减少map和reduce之间数据传输量
1)开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
2)开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;
3)设置压缩的方式
set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
4)测试
select count(1) from score;

2:设置Reduce端压缩
可以减少磁盘空间
1)开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
2)开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
3)设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec;
4)设置压缩类型为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
5)测试
set mapreduce.job.reduces = 7;
insert overwrite local directory '/export/servers/snappy' select * from
score distribute by s_id sort by s_id desc;

9:Hive的存储方式
9.1 行存储(TEXTFILE, SEQUENCEFILE)
特点:
1:select * from A 效率高
2:select a from A 效率低
3:如果进行数据压缩则效率低
9.2 列存储(ORC , PARQUET)
1:select * from A 效率低
2:select a from A 效率高
3:如果进行数据压缩则效率高
10:压缩方式和存储方式结合
1:创建一个指定存储格式的表
create table log_text (
track_time string,
url string,
)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE / SEQUENCEFILE /ORC /PARQUET;
2:三种存储格式的压缩比和查询速度
压缩比:
ORC > Parquet > TextFile
查询速度:
ORC > TextFile > Parquet
3:压缩方式和存储方式结合
create table log_text (
track_time string,
url string,
)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE / SEQUENCEFILE /ORC /PARQUET tblproperties ("orc.compress"="NONE/SNAPPY/GZIP/BZIP2");;
4:结论
存储方式: 选择 ORC和PARQUET
压缩方式: 选择 SNAPPY和BZIP2
11:Hive的调优
11.1 Fetch抓取
1)设置参数所有查询都走MapReduce
set hive.fetch.task.conversion=none;
测试:select s_score from score limit 3;
2)设置参数所有查询都不走MapReduce
set hive.fetch.task.conversion=more;
测试:select * from score;

11.2 本地模式
当计算任务不是特别大时,可以将所有的计算任务在本机完成,可以提高执行效率
1)开启本地模式,并执行查询语句
set hive.exec.mode.local.auto=true;
select * from score cluster by s_id;

2)关闭本地模式,并执行查询语句
set hive.exec.mode.local.auto=false;
select * from score cluster by s_id;
11.3 Map端join
当小表join大表时,则Hive默认会开启Map端join
11.4 Hive中的规约
在数据超过一定量时,则可以在map端完成聚合,就是规约
1)是否在Map端进行聚合,默认为True
set hive.map.aggr = true; //无需设置
2)在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000; //默认就是100000
3)有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true; //需要设置
11.5 去重求统计值(count(distinct))
原始做法:select count(distinct s_id) from score;
改进做法:select count(s_id) from (select id from score group by s_id) a
12:动态分区

MapReduce总结

MapReduce思想:

核心:

分而治之,先分在和

应用场景:

复杂任务,没有依赖,以并行提供处理效率

脉络体现:

​ 先map后reduce

​ map:把复杂的任务拆分成任务,局部进行计算,得出局部结果

​ reduce:把map的局部结果进行全局汇总,得到最终结果

MapReduce设计构思:

如何进行大数据处理?

​ 先分在合,分而治之

抽象俩个函数模型:

输入输出都是以kv键值段

​ map:把复杂的任务拆分成任务,局部进行计算,得出局部结果

​ reduce:把map的局部结果进行全局汇总,得到最终结果

把这么做和做什么进行拆分:

​ 程序负责复杂这么做(技术)

​ 用户负责做什么(业务)

以上俩者合并起来才是完整的MR程序

MapReduce框架结构和编程规范:

代码层面:

类继承Mapper 重写map()—–负责map阶段的业务

类继承Reduce 重写reduce()—-负责reduce阶段的业务逻辑

客户端运行的主类(main)—–指定mr相关属性,提交程序

将以上三个打包为jar包

运行角度:

MapTask:map阶段运行的task

ReduceTask:reduce阶段运行的task

MapReduceApplictionMaster(MrAppMaster):程序运行的主体,监督各task运行和mr程序的运行 ,负责跟yarn进行资源

案例WordCount:

环境开发版本问题:

​ Apache 2.7.4 优化了CDH2.6.0本地执行环境

数据类型和序列化机制:

Writable(接口) 认为java序列化机制臃肿 不利于大数据网络传递

重点:(MR执行流程):

img

序列化机制:

序列化机制概念:

​ 进程网络间传递数据 数据变成字节流

Writable:

​ 序列化方法:write(out)

​ 反序列化:readField(in)

注意:先序列化,后反序列化

自定义排序:

本质(CompareTo):

​ 0:相等

​ 正数:大于

​ 负数:小于

注意:谁大谁在后

倒序排序:

​ 欺骗程序 :欺骗 大—>负数 小—>正数

对象实现接口:

​ Compareable | WritableCompareable

自定义分区:

分区定义:

​ 决定了map的输出key value在哪一个reduceTask上

默认分区规则:

​ HashPartitioner(key.hashcode % NumReduceTasks)

实现自定义分区:

​ 继承Partition类 重写getPartitions 该方法返回值就是分区的标号值

让自定义分区生效:

​ job.setPartitionClass()

分区个数和reduceTask个数的关系:

​ 应该保持相等

​ 分区个数多 报错 非法分区

​ 分区个数少 执行 空文件产生

Combiner(归约):

​ 局部聚合组件 把每一个map的输出先进行局部聚合

​ 优化了IO网络

​ 本身就是reduce 只是范围小 不是全局

​ 默认不是开启的

注意:慎重使用:因为顺序 个数在最终的结果 会发生变化。

并行度机制:

概念:所谓的并行度,指的是多个同时工作

maptask并行度(逻辑切片 归约):文件大小 个数 切片大小

reducetask并行度:代码设置 涉及全局计数 慎重使用

shuffle机制:

概念:是一个过程

从map输出数据开始到reduce接受数据作为输入之前

横跨了map reduce 阶段 中间横跨网络 是mr程序的核心 是执行效率最慢的原因。

数据压缩:

压缩目的:减少网络传输数据量,减少最终磁盘所占空间

压缩机制:

​ map输出压缩:(影响网络传输的数量)

​ redcue的输出压缩:(磁盘所占的空间)

压缩算法:

​ 推荐用:snappy

​ 取决于Hadoop是否支持该压缩

​ 检查是否支持本地库:hadoop chechnative

​ 最好结合Hadoop编译 支持一部分压缩算法。

压缩的设置方式:

​ 直接在map程序中 通过conf.set()—–只对本mr有效

​ 修改xml配置文件 mapred-site.xml—–全局有效

优化参数:

​ 包括:资源,容错,稳定性等——Hadoop官网api xxx.default.xml(查找弃用属性–Deprecated Properties)

大小文件之间的关联操作—(hive大小表之间的join(结合))

​ 把所有的数据以关联的字段作为key发送到同一个reduce处理

​ 弊端:reduce join 压力大 可能发生数据倾斜

​ 在map阶段完成数据之间的关联

​ map join 没有reduce阶段(numreducetask(0))part-m-00000

分布式缓存:

​ 可以把指定的文件(压缩包 jar ) 发生给当下程序的每一个maptask

setup初始化方法:

​ 把缓存的小文件加载到当前maptask运行的程序内存中

​ 创建各种不同的数据集合类型 保存小文件数据

处理小文件场景:

​ 默认切片机制:–>一个小文件一个切片—->一个切片一个maptask

​ CombineTextInputFormat:切片机制

​ 小文件:

img

自定义分分组:

发生阶段:

​ 调用reduce()方法之前

默认分组:

​ 排好序的数据,根据前后俩个key是否相等(相等 或者 不相等)

自定义对象作为key:

​ WritableComparator分组继承的类 注意:WritableComparable 排序实现接口

​ 它是用来给Key分组的

​ 它在ReduceTask中进行,默认的类型是GroupingComparator也可以自定义

​ WritableComparator为辅助排序手段提供基础(继承它),用来应对不同的业务需求

​ 比如GroupingComparator(分组比较器)会在ReduceTask将文件写入磁盘并排序后按照Key进行分组,判断下一个key是否相同,将同组的Key传给reduce()执行

自定义分组生效:

​ job.setGroupingComparatorClass(OrderGroupingComparator.class);

IDEA配置Maven报错无法解析问题解决

在settings.xml文件加入镜像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!-- 阿里云镜像 -->
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
<mirrorOf>central</mirrorOf>
</mirror>

<!-- junit镜像地址 -->
<mirror>
<id>junit</id>
<name>junit Address/</name>
<url>http://jcenter.bintray.com/</url>
<mirrorOf>central</mirrorOf>
</mirror>

确保如下设置正确

img

img

如果还出现同样的错误:

在File → setting下的 Maven → Importing ,如下图,加一句话

1
2
-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
该配置用于忽略ssl证书的验证

img

1
-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true -Dmaven.wagon.http.ssl.ignore.validity.dates=true

img

记得先把仓库里面之前缓存的文件全部清除,然后右键项目,Maven→Reimport 重新下载

img

问题完美解决

Linux一些遇到的坑和笔记

关闭SElinux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
查看SELinux状态:
1、/usr/sbin/sestatus -v ##如果SELinux status参数为enabled即为开启状态
SELinux status: enabled
2、getenforce ##也可以用这个命令检查

关闭SELinux:
1、临时关闭(不用重启机器):
setenforce 0 ##设置SELinux 成为permissive模式
##setenforce 1 设置SELinux 成为enforcing模式

2、修改配置文件需要重启机器:
修改/etc/selinux/config 文件
将SELINUX=enforcing改为SELINUX=disabled
重启机器即可

这里遇到这个问题实在是不应该,别修改错了,注意不是SELINUXTPYE

CentOS7关闭防火墙

1
2
3
4
5
查看:systemctl status firewalld.service

关闭:systemctl stop firewalld.service

永久关闭:systemctl disable firewalld.service

SSH

1
2
3
4
5
6
生成 ssh-keygen -t rsa
node1 操作:
ssh-copy-id node01
# 复制到其他主机
scp /root/.ssh/authorized_keys node02:/root/.ssh
scp /root/.ssh/authorized_keys node03:/root/.ssh

Mysql的安装

Centos7 环境下

1
2
3
4
5
6
# 下载并安装MySQL官方的 Yum Repository
wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
# 使用上面的命令就直接下载了安装用的Yum Repository,大概25KB的样子,然后就可以直接yum安装了。
yum -y install mysql57-community-release-el7-10.noarch.rpm
# 之后就开始安装MySQL服务器。
yum -y install mysql-community-server
1
2
3
4
5
MySQL数据库设置
# 启动MySQL:
systemctl start mysqld.service
# 查看MySQL运行状态:
systemctl status mysqld.service
1
2
# 查看默认密码
grep "password" /var/log/mysqld.log

详细查看:https://www.cnblogs.com/luohanguo/p/9045391.html

重启Hadoop集群

1
2
3
4
5
sbin/stop-dfs.sh
sbin/stop-yarn.sh

sbin/start-dfs.sh
sbin/start-yarn.sh

Flask项目随手笔记

项目结构

image-20201027173842515

image-20201027174011656

蓝图

image-20201027174114900

ORM数据库操作

image-20201027175536009

1**、针对flask框架的web项目**

html文件写在templates文件夹里面

css文件还有静态资源(例如图片)放在static文件夹里面,直接访问 localhost:80/static/a.jpg

应用程序根目录是根据初始化app=Flask(name)的时候的代码在哪就决定了哪里是根目录

更改flask的默认设置静态资源位置:

static_folder

img

\1. app = Flask(name, static_folder=’hehe’)

\2. # http://127.0.0.1:5000/hehe/haha/a.png

\3. app = Flask(name, static_folder=”hehe/haha”)

\4. # http://127.0.0.1:5000/haha/a.png

static_url_path:

前端访问资源文件的前缀目录。默认是/static,就是前端必须这样访问:
我们改成 ‘’,就可以这样访问了:。就达到前端从根目录访问的目的了。

\1. app = Flask(name, static_folder=”hehe”,static_url_path=””)

\2. # http://127.0.0.1:5000/haha/a.png

2**、**url_for()

这个函数有很多的功能,是根据函数名反向生成路由url

\1. @app.route(‘/test’)

\2. def tt():

\3. return url_for(“tt”) # /test

3****redirect

功能就是跳转到指定的url,大部分情况下,我们都是和url_for一起使用的,例如:

\1. @app.route(‘/‘)

\2. def hello_world():

\3. return ‘Hello World’

\4.

\5.

\6. @app.route(‘//‘)

\7. def hello(name):

\8. if name == ‘Harp’:

\9. return ‘Hello %s’ % name

\10. else:

\11. return redirect(url_for(‘hello_world’))

在hello这个视图函数中,如果url传入的参数是Harp(即请求的网址是http://127.0.0.1:5000/Harp/),则返回'Hello Harp’,其他情况则重定向到hello_world这个视图函数对应的网址’/‘。

4**、获取input标签的用户名和密码**

方式一:**request.form[‘username’]**

img

下面指定的username和password是和上面的form中的name是一致的

img

方式二:**request.args[‘username’]**

如果要通过url来传送数据 , https://localhost:5000/login?username=jenrey

就要用下面这种方式获取参数值

img

5**、写cookie:**

img

img

6****@app.errorhandler(400)

我们不需要指定路由了, 只需要指定状态码,这样出现错误的时候,他就会自己响应到所对应的页面了。

img

img

errorhandler捕捉当前app或蓝图的状态码,并进行自定制处理

app_errorhandler捕捉全局状态码,并进行自定制异常处理

7**、产生依赖引用文档:**

C:\Users\Desktop\testflask>pip freeze > requirements.txt

指令执行完毕会产生一个文档,他会把我们用到的包列入到文档内

img

requirements这个文件名其实是python约定俗成的用法,当时我们使用这个文件名,pycharm会自动帮我们识别我们装了哪些包没装哪些包

8**、安装依赖引用文档的包:**

img

9**、**flask_script

先安装模块

pip install flask_script

flask_script 作用:可以通过命令行的形式来操作Flask,例如通过命令跑一个开发版本的服务器、设置数据库、定时任务等

代码中导入包:

img

img
直接把flask对象app传入进去

整体代码如下

img

python manage.py runserver #**启动服务**

img

img

flask-script官网:

img

具体的@manager.command用法看下面

10**livereload:**

先安装模块:

pip install livereload

img

img

img

用**@manager.command**来,上面就传入了dev的参数

11****@app.template_filter() 把filter注册到模板

我们只要在函数上声明这个装饰器,就可以把这个filter注册到模板

自定义Markdown过滤器:

img

img

img

12**@app.context_processor上下文处理器**

直接把方法注册到模板中。上下文处理器本身会返回一个字典的函数。其装饰的函数返回的内容对所有html模板都起作用。

img

img

13****@app.template_test 自定义测试函数

img

img

14****flash 消息闪现

导入flash

img

img

img

需要在配置文件中设置 SECRET_KEY 数值可以随意,其实就是秘钥,这样能保证数据的安全,因为我们的session是在客户端的,所以设置完SECRET_KEY后就可以通过系统自己的算法来进行验证数据安全的问题了。

img

img

这里看起来是个列表的形式,所以我们可以多次使用flash()

img

img

img

消息闪现的消息分类:

img

img

with和endwith是限制变量作用域的,errors只会在其之间有效果。原来我们不加with,变量的作用域是在block中的,加了之后就变成with中了。

15**、开启DEBUG调试模式**

有多种方法来开启debug模式:

1. 在app.run()中添加参数,变为app.run(debug=True);

2. 在run之前增加app.debug = True;

3. 新建config.py文件,在config文件中添加DEBUG = True,然后在程序中引入app.config.from_object(config);

4. 在run之前增加app.config[‘DEBUG’] = True;

建议使用第3种方式,其中还可以写入以下信息

\1. SECRET_KEY

\2. SQLALCHEMY_DB

\3. APP_ROOT

16****render_template 渲染静态的html文件,也能传递参数给html

作用:render_template不仅能渲染静态的html文件,也能传递参数给html

在templates文件夹建立一个html文件,内容随便写一点如下:

\1.

\2.

\3.

\4.

\5. Index

\6.

\7.

\8.

This is index page

\9.

\10.

send_static_file**:**

我们可以使用Flask对象app的send_static_file方法,使视图函数返回一个静态的**html**文件

但现在我们不使用这种方法,

而是使用flask的render_template函数,它功能更强大。
从flask中导入render_template,整体代码如下:

\1. from flask import Flask, render_template

\2. import config

\3.

\4. app = Flask(name)

\5. app.config.from_object(config)

\6.

\7.

\8. @app.route(‘/‘)

\9. def index():

\10. return render_template(‘index.html’)

\11.

\12. if name == ‘main‘:

\13. app.run()

render_template函数会自动templates文件夹中找到对应的html,因此我们不用写完整的html文件路径。用浏览器访问’/‘这个地址,显示结果如下:

img

使一个html模板根据参数的不同显示不同的内容,这是因为flask使用了jinja2这个模板引擎。要使用模板,在**render_template参数中以key=value形式传入变量,在html中使用**来显示传入的变量,例如:

\1. # 视图函数

\2. @app.route(‘/‘)

\3. def index():

\4. return render_template(‘index.html’, contents=’This is index page’)

\5.

\6. # html

\7.

\8.

\9.

\10.

\11. Index

\12.

\13.

\14.

\15.

\16.

浏览器显示的结果与上文是一样的。我们还可以直接把一个类的实例传递过去,并在模板中访问类的属性,例如假设一个类对象obj有a和b属性,关键部分的代码如下:

\1. # 视图函数中

\2. return render_template(‘index.html’, object=obj)

\3.

\4. …

\5. # html**中

\6.

a:

\7.

b:

传入一个字典也可以,并且在模板中既可以用dict[key],也可以用dict.key。

17**、ORM与**SQLAlchemy

以MySQL数据库为例,平时我们会用mysqldb(python 2)或者pymysql(python 3)去操作MySQL数据库,但这种方法也是需要自己编写SQL语句的。现在我们有了ORM模型,简单来说,ORM是把数据库中的表抽象成模型,表的列名对应模型的属性,这样我们可以调用类的属性或方法去获得数据库中的数据。例如假设MySQL数据库中有一张表名为table1,使用SELECT * FROM table1 WHERE id=1获取id为1的数据,如果将表table1映射成ORM模型Table,那么可以直接使用Table.query.filter(id=1),这样操作简单了很多,也很利于理解。

SQLAlchemy就是一个这样的ORM,我们可以直接安装flask_sqlalchemy来使用

在配置文件config.py中填写好数据库的连接信息:

\1. HOST = “127.0.0.1”

\2. PORT = “3306”

\3. DB = “harp”

\4. USER = “root”

\5. PASS = “Your Password”

\6. CHARSET = “utf8”

\7. DB_URI = “mysql+pymysql://{}:{}@{}:{}/{}?charset={}”.format(USER, PASS, HOST, PORT, DB, CHARSET)

\8. SQLALCHEMY_DATABASE_URI = DB_URI

SQLAlchemy**依赖mysqldb或者pymysql去连接数据库和执行SQL语句,**因为我们用的是python 3,所以需要在配置信息中指明使用pymysql,如果是python 2可以省略,默认是使用mysqldb。

建表

\1. from flask_sqlalchemy import SQLAlchemy

\2. from datetime import datetime

\3.

\4. import config

\5.

\6. app = Flask(name)

\7. app.config.from_object(config)

\8.

\9. db = SQLAlchemy(app)

\10.

\11.

\12. class Users(db.Model):

\13. tablename = ‘users_info’

\14. id = db.Column(db.Integer, primary_key=True, autoincrement=True)

\15. username = db.Column(db.String(32), nullable=False)

\16. password = db.Column(db.String(100), nullable=False)

\17. register_time = db.Column(db.DateTime, nullable=False, default=datetime.now())

\18.

\19.

\20. db.create_all()

tablename这个属性就是建表后,数据库生成的表名。primary_key=True说明该字段为主键,autoincrement=True代表自增长,nullable决定是否可为空,default代表默认值。最后用db.create_all()来实现创建。

进入数据库,输入desc user_info;,我们发现表已经建立好了,其结构图如下:

img

插入数据

\1. @app.route(‘/‘)

\2. def index():

\3. user = Users(username=’Harp’, password=’123456’)

\4. db.session.add(user)

\5. db.session.commit()

\6. return render_template(‘home.html’)

代码实例化一个Users的对象user,传入username和password,使用db.session.add(user)将其加入到数据库的session(可以理解为事务)中,然后使用db.session.commit()提交。我们运行程序,然后用浏览器访问,浏览器正常显示了结果,这时再看一眼数据库,发现这条数据已经写入到了数据库:

img

查询、修改数据

\1. @app.route(‘/‘)

\2. def index():

\3. user = Users.query.filter(Users.id == 1).first() #**查找

\4. print(user.username)

\5. user.username = ‘Harp1207’ #**修改

\6. db.session.commit() #**修改后需提交

\7. print(user.username)

\8. return render_template(‘home.html’)

外键关联

users_info表(Users模型)代码如下:

\1. class Users(db.Model):

\2. tablename = ‘users_info’

\3. id = db.Column(db.Integer, primary_key=True, autoincrement=True)

\4. username = db.Column(db.String(32), nullable=False)

\5. password = db.Column(db.String(100), nullable=False)

\6. register_time = db.Column(db.DateTime, nullable=False, default=datetime.now())

\7. # 我们新增了一个avatar_path字段来存用户头像图片文件的路径

\8. avatar_path = db.Column(db.String(256), nullable=False, default=’images/doraemon.jpg’)

questions_info表(Questions模型)代码如下:

\1. class Questions(db.Model):

\2. tablename = ‘questions_info’

\3. id = db.Column(db.Integer, primary_key=True, autoincrement=True)

\4. title = db.Column(db.String(100), nullable=False)

\5. content = db.Column(db.TEXT, nullable=False)

\6. author_id = db.Column(db.Integer, db.ForeignKey(‘users_info.id’))

\7. create_time = db.Column(db.DateTime, nullable=False, default=datetime.now())

\8.

\9. author = db.relationship(‘Users’, backref=db.backref(‘questions’, order_by=create_time.desc()))

这个表存储所有问题的标题、内容、创建时间、作者ID,作者ID通过外键与用户表的ID关联,方式也很简单,在db.Column中用db.ForeignKey(‘users_info.id’)作为参数即可。
再看最后一条语句:

author = db.relationship(‘Users’, backref=db.backref(‘questions’, order_by=create_time.desc()))

db.relationship会自动找到两个表的外键,建立Questions和Users的关系,此时对于任意一个Questions对象question,通过question.author就可获得这个question的作者对应的Users对象,例如获取id为1的问题的作者姓名:

\1. question = Questions.query.filter(Questions.id == 1).first()

\2. author_name = question.author.username

db.relationship的第二个参数backref=db.backref(‘questions’, order_by=create_time.desc())则建立了一个反向引用,这样我们不仅可以使用question.author,还可以使用author.questions获得一个作者所有的问题,并通过order_by=create_time.desc()按创建时间倒序排列(网页的内容按时间倒序排列),返回的是一个Questions对象的列表,可以遍历它获取每个对象,如获取作者Harp的所有问题的title:

\1. author = Users.query.filter(Users.username == ‘Harp’).first()

\2. for question in author.questions:

\3. print(question.title)

flask-migrate 数据库迁移(用途:更新表结构)

我们增加了两个模型Questions和Comments,并为Users增加了avatar_path这个字段,然后通过这段代码更新到数据库:

\1. with app.test_request_context():

\2. db.drop_all()

\3. db.create_all()

因为当使用过db.create_all()之后,再次直接使用db.create_all(),对模型的修改并不会更新到数据库,我们要使用db.drop_all()先把数据库中所有的表先删除掉,然后再db.create_all()一次。听上去是不是很麻烦?更糟糕的是,原先数据库的的数据也就没有了。所以我们不用这种简单粗暴的方式去更新数据库结构,而是借助flask-migrate这个专门用于迁移数据库的工具,它可以在保留数据库原始数据的情况下,完成模型的更新。此外,我们还将结合flask-script一起使用,简单来说flask-script让我们可以使用命令行去完成数据库迁移的操作。

在项目主文件夹下新建一个manage.py,代码如下:

\1. from flask_script import Manager

\2. from flask_migrate import Migrate, MigrateCommand

\3. from HarpQA import app, db

\4. from models import Users, Questions, Comments

\5.

\6. manager = Manager(app)

\7.

\8. migrate = Migrate(app, db)

\9.

\10. manager.add_command(‘db’, MigrateCommand)

\11.

\12.

\13. if name == ‘main‘:

\14. manager.run()

首先导入相关的类,注意模型要全部导入过来,即使代码中并没有显式地使用它们。然后传入app或db来构建Manager和Migrate两个类的实例,最后将MigrateCommand的命令加入到manager中。

此时我们假设要更新模型的结构,在models.py的User模型结尾添加一行代码test = db.Column(db.Integer),然后点击PyCharm下方的Terminal,自动进入到了虚拟环境的命令行中,输入python manage.py db init来初始化,这一步主要是建立数据库迁移相关的文件和文件夹,只是在第一次需要使用。接着依次使用python manage.py db migrate和python manage.py db upgrade,待运行完成,查看users_infor表的结构,结果如下:

img

可以看到test字段已经添加到表中了。

18****werkzeug.security 哈希与字符串的加密与解密

werkzeug.security中的

generate_password_hash**这个函数,将字符串变成hash值。**

password=generate_password_hash(password1)

werkzeug.security的

check_password_hash**方法,它能验证哈希值是否与原始的密码是匹配的**

check_password_hash(加密后的哈希, 需要对比的非哈希密码)

19****request.method == ‘GET’

\1. @app.route(‘/question/‘, methods=[‘GET’, ‘POST’])

\2. def question():

\3. if request.method == ‘GET’:

\4. return render_template(‘question.html’)

\5. else:

\6. question_title = request.form.get(‘question_title’)

\7. question_desc = request.form.get(‘question_desc’)

\8. author_id = Users.query.filter(Users.username == session.get(‘username’)).first().id

\9. new_question = Questions(title=question_title, content=question_desc, author_id=author_id)

\10. db.session.add(new_question)

\11. db.session.commit()

\12. return redirect(url_for(‘home’))

20****@app.before_request 钩子函数

@app.before_request这个钩子函数,看其名字就很好理解,是在**request**之前会自动运行的,我们在每次请求之前(或者说每次运行视图函数之前)。

例如通过钩子函数来得到当期登录用户的User对象(而不是仅仅是session中的username),然后在需要的地方使用它,代码如下:

\1. @app.before_request

\2. def my_before_request():

\3. username = session.get(‘username’)

\4. if username:

\5. g.user = Users.query.filter(Users.username == username).first()

这个钩子函数,从session中获取当前登陆的username,如果获取到了,再去检索Users模型,把返回的user对象存入到g对象中,在视图函数中我们就可以直接使用这个user对象的id/register_time等字段了。此时前面的视图函数中的

author_id = Users.query.filter(Users.username == session.get(‘username’)).first().id

可以修改成

author_id = g.user.id

g对象不能跨请求使用,因此在上下文管理器中用的是session,为什么这里又用了g对象呢?原因是现在有了钩子函数,每次请求都会执行钩子函数,向g对象中写入user,所以上下文管理器一直都能从g对象中取到user,不管这个g对象是属于哪次请求的。

21****request.files[name] 获取上传的文件

与获取POST数据类似(上传文件其实也是使用POST方法),在flask中使用request.files[name]获取上传的文件,其中name为对应input控件的name值(name=”avatar_upload”),然后使用文件的save方法即可保存。例如:

\1. @app.route(‘/user/avatar/‘, methods=[‘GET’, ‘POST’])

\2. def avatar():

\3. if request.method == ‘GET’:

\4. return render_template(‘avatar.html’)

\5. else:

\6. file = request.files[‘avatar_upload’]

\7. path = “D:\Flask\HarpQA\static\“

\8. file.save(path + file.filename)

\9. return ‘Saved’

注意save方法要加上具体的路径,默认不会保存到py文件所在的路径,而是系统的根目录,此时会提示Permission denied。

22****set_cookie 设置cookie

make_response方法生成一个response对象(这个对象有set_cookie方法,这也是Flask设置cookie的常规方法),并为其设置cookie,set_cookie第一个参数’sid’是key,第二个参数是value(session id),之后返回response对象。当请求是GET时候,首先就会使用request.cookies.get(‘sid’)去获取cookie中的session id

23**、**make_response

make_response(),相当于DJango中的HttpResponse。

· 返回内容

\1. from flask import make_response

\2.

\3. @app.route(‘/makeresponse/‘)

\4. def make_response_function():

\5. response = make_response(‘

羞羞哒

‘)

\6. return response, 404

· 返回页面

\1. from flask import make_response

\2.

\3. @app.route(‘/makeresponse/‘)

\4. def make_response_function():

\5. temp = render_template(‘hello.html’)

\6. response = make_response(temp)

\7. return response

>>>注意:make_response 想要返回页面,不能直接写做:make_response(‘hello.html’),必须用render_template(‘hello.html’)形式。

· 返回状态码

>>>**方式一:在make_response()中传入状态码**

\1. from flask import make_response

\2.

\3. @app.route(‘/makeresponse/‘)

\4. def make_response_function():

\5. temp = render_template(‘hello.html’)

\6. response = make_response(temp, 200)

\7. return response

>>>**方式二:直接return状态码**

\1. from flask import make_response

\2.

\3. @app.route(‘/makeresponse/‘)

\4. def make_response_function():

\5. temp = render_template(‘hello.html’)

\6. response = make_response(temp)

\7. return response, 200

24**redirect **跳转

flask中的 redirect 相当于 DJango中的 HttpResponseRedirect。

1.**参数是url形式**

\1. from flask import redirect

\2.

\3. @app.route(‘/redirect/‘)

\4. def make_redirect():

\5. return redirect(‘/hello/index/‘)

2.**参数是 name.name 形式**

url_for 相当于reverse,name.name 相当于django中的namespace:name,第一个name是初始化蓝图时的参数名,第二个name是函数名

img

\1. from flask import redirect

\2.

\3. @blue.route(‘/redirect/‘)

\4. def make_redirect():

\5. return redirect(url_for(‘first.index’))

25**request的属性**

\1. #代码示例,仅仅是为了测试request**的属性值

\2. @app.route(‘/login’, methods = [‘GET’,’POST’])

\3. def login():

\4. if request.method == ‘POST’:

\5. if request.form[‘username’] == request.form[‘password’]:

\6. return ‘TRUE’

\7. else:

\8. #form**中的两个字段内容不一致时,返回我们所需要的测试信息

\9. return str(request.headers) #**需要替换的部分

\10. else:

\11. return render_template(‘login.html’)

1**、method:请求的方法**

return request.method #POST

2**、form:返回form的内容**

return json.dumps(request.form) #{“username”: “123”, “password”: “1234”}

3**、args和values:args返回请求中的参数,values返回请求中的参数和form**

\1. return json.dumps(request.args) #urlhttp://192.168.1.183:5000/login?a=1&b=2**、返回值:**{"a": “1”, “b”: “2”}

\2. return str(request.values) #CombinedMultiDict([ImmutableMultiDict([(‘a’, ‘1’), (‘b’, ‘2’)]), ImmutableMultiDict([(‘username’, ‘123’), (‘password’, ‘1234’)])])

4**、cookies:cookies信息**

return json.dumps(request.cookies) #cookies**信息

5**、headers:请求headers信息,返回的结果是个list**

\1. return str(request.headers) #headers**信息

\2. request.headers.get(‘User-Agent’) #获取User-Agent**信息

6**、url、path、script_root、base_url、url_root:看结果比较直观**

\1. return ‘url: %s , script_root: %s , path: %s , base_url: %s , url_root : %s’ % (request.url,request.script_root, request.path,request.base_url,request.url_root)

\2. #url: http://192.168.1.183:5000/testrequest?a&b , script_root: , path: /testrequest , base_url: http://192.168.1.183:5000/testrequest , url_root : http://192.168.1.183:5000/

7**、date、files:date是请求的数据,files随请求上传的文件**

\1. @app.route(‘/upload’,methods=[‘GET’,’POST’])

\2. def upload():

\3. if request.method == ‘POST’:

\4. f = request.files[‘file’]

\5. filename = secure_filename(f.filename)

\6. #f.save(os.path.join(‘app/static’,filename))

\7. f.save(‘app/static/‘+str(filename))

\8. return ‘ok’

\9. else:

\10. return render_template(‘upload.html’)

\11.

\12. #html

\13.

\14.

\15.

\16.

\17.

\18.

\19.

\20.

\21.

26**jsonify flask提供的json格式数据处理方法**

python的flask框架为用户提供了直接返回包含json格式数据响应的方法,即jsonify

在flask中使用jsonify和json.dumps的区别

27**g对象**

1.在flask中,有一个专门用来存储用户信息的g对象,g的全称的为global。
2.g对象在一次请求中的所有的代码的地方,都是可以使用的。

flask之g对象

28**Flask-HTTPAuth **扩展 HTTP认证

安装:

pip install flask-httpauth

Flask-HTTPAuth提供了几种不同的Auth方法,比如HTTPBasicAuth**HTTPTokenAuthMultiAuth**HTTPDigestAuth

· HTTPBasicAuth****基础认证

创建扩展对象实例

\1. from flask import Flask

\2. from flask_httpauth import HTTPBasicAuth

\3.

\4. app = Flask(name)

\5. auth = HTTPBasicAuth()

注意,初始化实例时不需要传入app对象,也不需要调用”auth.init_app(app)”注入应用对象。

案例:用户名及密码验证

我们所要做的,就是实现一个根据用户名获取密码的回调函数:

@auth.get_password 的使用(明文密码有效)

\1. users = [

\2. {‘username’: ‘Tom’, ‘password’: ‘111111’},

\3. {‘username’: ‘Michael’, ‘password’: ‘123456’}

\4. ]

\5.

\6. @auth.get_password

\7. def get_password(username):

\8. for user in users:

\9. if user[‘username’] == username:

\10. return user[‘password’]

\11. return None

回调函数”get_password()”由装饰器”@auth.get_password”修饰。在函数里,我们根据传入的用户名,返回其密码;如果用户不存在,则返回空。

@auth.login_required 的使用

接下来,我们就可以在任一视图函数上,加上”@auth.login_required”装饰器,来表示该视图需要认证:

\1. @app.route(‘/‘)

\2. @auth.login_required

\3. def index():

\4. return “Hello, %s!” % auth.username()

启动该应用,当你在浏览器里打开”http://localhost:5000/”,你会发现浏览器跳出了下面的登录框,输入正确的用户名密码(比如上例中的Tom:111111)后,”Hello Tom!”的字样才会显示出来。

img

进入浏览器调试,发现认证并没有启用Cookie,而是在请求头中加上了加密后的认证字段:

Authorization: Basic TWljaGFlbDoxMjM0NTY=

这就是”HTTPBasicAuth”认证的功能,你也可以用Curl命令来测试:

curl -u Tom:111111 -i -X GET http://localhost:5000/

@auth.verify_password 的使用(非明文密码有效)

上例中”@auth.get_password”**回调只对明文的密码有效,但是大部分情况,我们的密码都是经过加密后才保存的,这时候,我们要使用另一个回调函数”@auth.verify_password”。在演示代码之前,先要介绍Werkzeug**库里提供的两个方法:

· generate_password_hash: 对于给定的字符串,生成其加盐的哈希值

· check_password_hash: 验证传入的哈希值及明文字符串是否相符

这两个方法都在”werkzeug.security”包下。现在,我们要利用这两个方法,来实现加密后的用户名密码验证:

\1. from werkzeug.security import generate_password_hash, check_password_hash

\2.

\3. users = [

\4. {‘username’: ‘Tom’, ‘password’: generate_password_hash(‘111111’)},

\5. {‘username’: ‘Michael’, ‘password’: generate_password_hash(‘123456’)}

\6. ]

\7.

\8. @auth.verify_password

\9. def verify_password(username, password):

\10. for user in users:

\11. if user[‘username’] == username:

\12. if check_password_hash(user[‘password’], password):

\13. return True

\14. return False

在”@auth.verify_password”所修饰的回调函数里,我们验证传入的用户名密码,如果正确的话返回True,否则就返回False。

错误处理

在之前的例子中,如果未认证成功,服务端会返回401状态码及”Unauthorized Access”文本信息。你可以重写错误处理方法,并用”@auth.error_handler”装饰器来修饰它:

\1. from flask import make_response, jsonify

\2.

\3. @auth.error_handler

\4. def unauthorized():

\5. return make_response(jsonify({‘error’: ‘Unauthorized access’}), 401)

有了上面的”unauthorized()”方法后,如果认证未成功,服务端返回401状态码,并返回JSON信息”{‘error’: ‘Unauthorized access’}”。

· HTTPTokenAuth**Token认证**

在对HTTP形式的API发请求时,大部分情况我们不是通过用户名密码做验证,而是通过一个令牌,也就是Token来做验证。此时,我们就要请出Flask-HTTPAuth扩展中的HTTPTokenAuth对象。

同HTTPBasicAuth类似,它也提供”login_required”装饰器来认证视图函数,”error_handler”装饰器来处理错误。

@auth.verify_token 的使用

区别是,它没有”verify_password”装饰器,相应的,它提供了”verify_token”装饰器来验证令牌。我们来看下代码,为了简化,我们将Token与用户的关系保存在一个字典中:

\1. from flask import Flask, g

\2. from flask_httpauth import HTTPTokenAuth

\3.

\4. app = Flask(name)

\5. auth = HTTPTokenAuth(scheme=’Bearer’)

\6.

\7. tokens = {

\8. “secret-token-1”: “John”,

\9. “secret-token-2”: “Susan”

\10. }

\11.

\12. @auth.verify_token

\13. def verify_token(token):

\14. g.user = None

\15. if token in tokens:

\16. g.user = tokens[token]

\17. return True

\18. return False

\19.

\20. @app.route(‘/‘)

\21. @auth.login_required

\22. def index():

\23. return “Hello, %s!” % g.user

可以看到,在”verify_token()”方法里,我们验证传入的Token是否合法,是的话返回True,否则返回False。另外,我们通过Token获取了用户信息,并保存在全局变量g中,这样视图中可以获取它。注意,在第一节的例子中,我们使用了”auth.username()”来获取用户名,但这里不支持。

初始化HTTPTokenAuth对象时,我们传入了”scheme=’Bearer’”。这个scheme,就是我们在发送请求时,在HTTP头”Authorization”中要用的scheme字段。

启动上面的代码,并用Curl命令来测试它:

curl -X GET -H “Authorization: Bearer secret-token-1” http://localhost:5000/

HTTP头信息”Authorization: Bearer secret-token-1″,”Bearer”就是指定的scheme,”secret-token-1″就是待验证的Token。在上例中,”secret-token-1″对应着用户名”John”,所以Token验证成功,Curl命令会返回响应内容”Hello, John!”。

使用**itsdangerous**库来管理令牌

itsdangerous库提供了对信息加签名(Signature)的功能,我们可以通过它来生成并验证令牌。使用前,先记得安装”pip install itsdangerous”。现在,让我们先来产生令牌,并打印出来看看:

\1. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

\2.

\3. app = Flask(name)

\4. app.config[‘SECRET_KEY’] = ‘secret key here’

\5. serializer = Serializer(app.config[‘SECRET_KEY’], expires_in=1800)

\6.

\7. users = [‘John’, ‘Susan’]

\8. for user in users:

\9. token = serializer.dumps({‘username’: user})

\10. print(‘Token for {}: {}\n’.format(user, token))

这里实例化了一个针对JSON的签名序列化对象serializer,它是有时效性的,30分钟后序列化后的签名即会失效。让我们运行下程序,在控制台上,会看到类似下面的内容:

\1. Token for John: eyJhbGciOiJIUzI1NiIsImV4cCI6MTQ2MzUzMzY4MCwiaWF0IjoxNDYzNTMxODgwfQ.eyJ1c2VybmFtZSI6IkpvaG4ifQ.ox-64Jbd2ngjQMV198nHYUsJ639KIZS6RJl48tC7-DU

\2.

\3. Token for Susan: eyJhbGciOiJIUzI1NiIsImV4cCI6MTQ2MzUzMzY4MCwiaWF0IjoxNDYzNTMxODgwfQ.eyJ1c2VybmFtZSI6IlN1c2FuIn0.lRx6Z4YZMmjCmga7gs84KB44UIadHYRnhOr7b4AAKwo

接下来,改写”verify_token()”方法:

\1. @auth.verify_token

\2. def verify_token(token):

\3. g.user = None

\4. try:

\5. data = serializer.loads(token)

\6. except:

\7. return False

\8. if ‘username’ in data:

\9. g.user = data[‘username’]

\10. return True

\11. return False

我们通过序列化对象的”load()”方法,将签名反序列化为JSON对象,也就是Python里的字典。然后获取字典中的用户名,如果成功则返回True,否则返回False。这样,就实现了加密后的令牌认证了,让我们用Curl测试一下,还记得刚才控制台上打印出的令牌吗?

curl -X GET -H “Authorization: Bearer eyJhbGciOiJIUzI1NiIsImV4cCI6MTQ2MzUzMzY4MCwiaWF0IjoxNDYzNTMxODgwfQ.eyJ1c2VybmFtZSI6IkpvaG4ifQ.ox-64Jbd2ngjQMV198nHYUsJ639KIZS6RJl48tC7-DU” http://localhost:5000/

· MultiAuth**:** 多重认证

Flask-HTTPAuth扩展还支持几种不同认证的组合,比如上面我们介绍了HTTPBasicAuth和HTTPTokenAuth,我们可以将两者组合在一起,其中任意一个认证通过,即可以访问应用视图。实现起来也很简单,只需将不同的认证实例化为不同的对象,并将其传入MultiAuth对象即可。大体代码如下:

\1. from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth, MultiAuth

\2.

\3. …

\4.

\5. basic_auth = HTTPBasicAuth()

\6. token_auth = HTTPTokenAuth(scheme=’Bearer’)

\7. multi_auth = MultiAuth(basic_auth, token_auth)

\8.

\9. …

\10.

\11. @basic_auth.verify_password

\12. …

\13.

\14. @token_auth.verify_token

\15. …

\16.

\17. @basic_auth.error_handler

\18. …

\19.

\20. @token_auth.error_handler

\21. …

\22.

\23. @app.route(‘/‘)

\24. @multi_auth.login_required

\25. def index():

\26. return ‘Hello, %s!’ % g.user

这里,每个认证都有自己的验证和错误处理函数,不过在视图上,我们使用”@multi_auth.login_required”来实现多重认证。大家可以使用Curl命令试验下。

· HTTPDigestAuth**:**

暂无举例

详情请点击Flask 扩展 HTTP认证–flask-httpAuth

29**return **返回问题

例如下面的代码返回的是空的,因为浏览器会解析html标签,但是我们标签里面没有内容所以无任何显示

img

如果想当做普通字符串打印出来就需要返回的content-type=text/plain,如下图所示,需要导入flask的make_response

img

现在我们在做一个有意思的测试(转发)

img

img

可以看到直接转发到了bing的网站

上面还有一种简单的写法就是如下图所示:

img

上面我们用逗号分隔的形式其实是我们的flask的元组,当你返回为一个元组的时候flask内部还是会把它自动变成一个response对象的。在返回回去。

改成下面这样,就是返回json格式的数据了,其实这就是web返回的本质,返回的本质都是都是字符串,只不过控制的因素在这个context-type,他控制了我们的客户端在接收到我们的返回的时候要怎么样的去解释我们的返回内容。

img

30**flask-login **的使用

当用户登录成功之后我们要产生一个票据,并且把这个票据写入cookie中,我们不仅负责写入票据还要负责读取票据,并且要管理这个票据,整个的登陆机制是非常繁琐的,所以我们自己去用cookie实现这一整套的管理机制是非常不明智的,很幸运的是flask给我们提供了插件flask-login,可以完全用这个插件来管理登陆信息。

导入插件

img

Python-Scrapy详细总结

Scrapy框架

什么是scrapy

Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架,我们只需要实现少量的代码,就能够快速的抓取。

Scrapy使用了Twisted异步网络框架,可以加快我们的下载速度。(Twisted(中文翻译)扭曲)

入门文档:https://scrapy-chs.readthedocs.io/zh_CN/0.24/intro/tutorial.html

image-20201018170247042

异步和非阻塞的区别.png

补充:
阻塞和非阻塞指的是拿到结果之前的状态
异步和同步指的是整个过程

功能 解释 scrapy
Scrapy Engine(引擎) 总指挥:负责数据和信号在不同模块的传递 scrapy已经实现
Scheduler(调度器) 一个队列,存放引擎发过来的request请求 scrapy已经实现
Downloader(下载器) 下载把引擎发过来的requests请求,并返回引擎 scrapy已经实现
Spider(爬虫) 处理引擎发来的response,提取数据,提取url,并交给引擎 需要手写
Item Pipeline(管道) 处理引擎传过来的数据,比如存储 需要手写
Downloader Middlewares(下载中间件) 可以自定义的下载扩展,比如设置代理 一般不用手写
Spider MiddlewaresSpider(中间件) 可以自定义requests请求和进行response过滤 一般不用手写

文件目录

1
2
3
4
5
6
7
scrapy.cfg    #项目的配置文件
items.py #提取要爬取的字段,字典保存爬取到的数据的容器
middlewares #自定义中间件的地方
pipelines.py #管道,保存数据
settings.py #项目的设置文件 设置文件,UA,启动管道
spiders #存储爬虫代码目录
itcast.py #写爬虫的文件

爬虫步骤:

1.创建一个scrapy项目

1
scrapy startproject mySpider   #mySpider是项目名字

2.生成一个爬虫

1
scrapy genspider itcast itcast.cn  #itcast是爬虫名字,"itcast.cn"限制爬虫地址,防止爬到其他网站

3.提取数据

1
完善spiders,使用xpath等方法

3.保存数据

1
pipelines中保存数据

启动爬虫

1
scrapy crawl 爬虫名字    #crawl(抓取的意思)

启动爬虫不打印日志

1
scrapy crawl 爬虫名字 --nolog

run.py启动爬虫

1
2
from scrapy import cmdline
cmdline.execute("scrapy crawl lagou".split())

image-20201018170258403

Scrapy运行流程

spider内容

spider翻页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding: utf-8 -*-
import scrapy
#导入items
from tencent.items import TencentItem

#自定义spider类,继承自scrapy.Spider
class ItcastSpider(scrapy.Spider):
name = 'itcast' #爬虫名字<爬虫启动时候使用:scrapy crawl itcast>
#允许爬取的范围,防止爬虫爬到了别的网站
allowed_domains = ['tencent.com']
#开始爬取的地址,下载中间件提取网页数据
start_urls = ['https://hr.tencent.com/position.php']
#数据提取方法,接收下载中间件传过来的response(响应)
def parse(self, response):
#处理start_url地址对应的响应
#提取数据
# reti = response.xpath("//div[@class='tea_con']//h3/text()").extract()
# print(reti)

#分组,[1:-1]切片,不要第一条数据
li_list = response.xpath("//table[@class="tablelist"]/tr")[1:-1]
for li in li_list:
#在item中定义要爬取的字段,以字典形式传入
item = TencentItem()
item["name"] = li.xpath(".//h3/text()").extract_first()
item["title"] = li.xpath(".//h4/text()").extract_first()
#yield可以返回request对象,BaseItem(items.py中的类),dict,None
yield item #yield传到pipeline
#找到下一页url地址
next_url = response.xpath('//a[@id="next"]/@href').extract_first()
#如果url地址的href="地址"不等于javascript:;
if next_url != "javascript:;":
next_url = "https://hr.tencent.com/"+ next_url
#把next_url的地址通过回调函数callback交给parse方法处理
yield scrapy.Request(next_url,callback=self.parse)

提取数据
response.xpath(‘//a[@id=”next”]/@href’)

body = response.text.replace(‘\n’, ‘’).replace(‘\r’, ‘’).replace(‘\t’, ‘’)
re.findall(‘<a title=”.?” href=”(.?)”‘, body)

从选择器中提取字符串:

  • extract() 返回一个包含有字符串数据的列表
  • extract_first()返回列表中的第一个字符串

注意:

  • spider中的parse方法名不能修改
  • 需要爬取的url地址必须要属于allow_domain(允许_域)下的连接
  • respone.xpath()返回的是一个含有selector对象的列表

为什么要使用yield?
让整个函数变成一个生成器,变成generator(生成器)有什么好处?
每次遍历的时候挨个读到内存中,不会导致内存的占用量瞬间变>高python3中range和python2中的xrange同理

scrapy.Request常用参数为
callback = xxx:指定传入的url交给那个解析函数去处理
meta={“xxx”:”xxx”}:实现在不同的解析函数中传递数据,配合callback用
dont_filter=False:让scrapy的去重不会过滤当前url,默认开启url去重
headers:请求头
cookies:cookies,不能放在headers中,独立写出来
method = “GET”:请求方式,(GET和POST)

爬取详细页和翻页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# -*- coding: utf-8 -*-
import scrapy
from yangguang.items import YangguangItem

class YgSpider(scrapy.Spider):
name = 'yg'
allowed_domains = ['sun0769.com']
start_urls = ['http://wz.sun0769.com/index.php/question/questionType?type=4&page=0']

def parse(self, response):
tr_list = response.xpath("//div[@class='greyframe']/table[2]/tr/td/table/tr")
for tr in tr_list:
item = YangguangItem()
item["title"] = tr.xpath("./td[2]/a[@class='news14']/@title").extract_first()
item["href"] = tr.xpath("./td[2]/a[@class='news14']/@href").extract_first()
item["publish_date"] = tr.xpath("./td[last()]/text()").extract_first()
#执行进入url地址,再把item传到下面parse_detail,提取详细页的内容
yield scrapy.Request(item["href"],callback=self.parse_detail,meta={"item":item})
#翻页
#获取url地址
next_url = response.xpath("//a[text()='>']/@href").extract_first()
#如果下一页url地址不为空,进入下一页连接
if next_url is not None:
yield scrapy.Request(next_url,callback=self.parse)

#处理详情页
def parse_detail(self,response):
#item接收meta传过来的item,在item字典里继续为item添加内容
item = response.meta["item"]
#拿到详细页的内容
item["content"] = response.xpath("//div[@class='c1 text14_2']//text()").extract()
#拿到详细页的图片地址
item["content_img"] = response.xpath("//div[@class='c1 text14_2']//img/@src").extract()
#给图片前面加上http://wz.sun0769.com
item["content_img"] = ["http://wz.sun0769.com" + i for i in item["content_img"]]
#把item传给pipeline
yield item

items(存储爬取字段)

1
2
3
4
5
6
import scrapy
#scrapy.Item是一个字典
class TencentItem(scrapy.Item):
#scrapy.Field()是一个字典
url = scrapy.Field()
name = scrapy.Field()

使用pipeline(管道)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from demo1 import settings
import pymongo

class Demo1Pipeline(object):
def __init__(self):
#连接mongodb数据库(数据库地址,端口号,数据库)
client = pymongo.MongoClient(host=settings.MONGODB_HOST, port=settings.MONGODB_PORT)
#选择数据库和集合
self.db = client[settings.MONGODB_DBNAME][settings.MONGODB_DOCNAME]
def process_item(self, item, spider):
data = dict(item)
self.db.insert(data)

#完成pipeline代码后,需要在setting中设置开启
ITEM_PIPELINES = {
#开启管道,可以设置多个管道,'管道地址数值':越小越先执行
'mySpider.pipelines.MyspiderPipeline': 300,
}
# MONGODB 主机环回地址127.0.0.1
MONGODB_HOST = '127.0.0.1'
# 端口号,默认是27017
MONGODB_PORT = 27017
# 设置数据库名称
MONGODB_DBNAME = 'DouBan'
# 存放本次数据的表名称
MONGODB_DOCNAME = 'DouBanMovies'

第二种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MyspiderPipeline(object):
def __init__(self):
#连接mongodb数据库(数据库地址,端口号,数据库)
client = pymongo.MongoClient(host=settings.MONGODB_HOST, port=settings.MONGODB_PORT)
#选择数据库和集合
self.db = client[settings.MONGODB_DBNAME]

#实现存储方法,item是spider传过来的,spider就是自己写的爬虫
def process_item(self, item, spider):
table = ''
#通过spider参数,可以针对不同的Spider进行处理
#spider.name爬虫的名字
if spider.name == "itcast":
#如果爬虫的名字为itcast执行这里面的东西
table = self.db.表名
#如果爬虫的名字为itcast2执行这里面的东西
elif spider.name == "itcast2":
table = self.db.表名
table.insert(dict(item))

#也可以通过item参数,可以针对不同的Item进行处理
table = ''
if isinstance(item, 爬虫名字):
table = self.db.表名
table.insert(dict(item))

mysql存储

1
2
3
4
5
6
7
8
9
10
11
12
13
from pymysql import connect
import pymysql
class TxPipeline(object):
def __init__(self):
self.conn=connect(host='localhost',port=3306,db='txzp',user='root',passwd='root',charset='utf8')
self.cc = self.conn.cursor()
def process_item(self, item, spider):
print(item["title"],item["href"],item["number"],item["time"],item["duty"])
aa = (item["title"],item["href"],item["number"],item["time"],item["duty"],item["requirement"])
sql = '''insert into tx values (0,"%s","%s","%s","%s","%s","%s")'''
self.cc.execute(sql%aa)
self.conn.commit()#提交
# self.cc.close() #关闭游标会报错

注意

  • pipeline中process_item方法名不能修改,修改会报错
  • pipeline(管道)可以有多个
  • 设置了pipelines必须开启管道,权重越小优先级越高

为什么需要多个pipeline:

  • 可能会有多个spider,不同的pipeline处理不同的item的内容
  • 一个spider的内容可能要做不同的操作,比如存入不同的数据库中

简单设置LOG(日志)

为了让我们自己希望输出到终端的内容能容易看一些:
我们可以在setting中设置log级别
在setting中添加一行(全部大写):

1
LOG LEVEL="WARNING"

默认终端显示的是debug级别的log信息

logging模块的使用

scrapy中使用logging

1
2
3
#settings中设置
LOG_LEVEL=“WARNING”
LOG_FILE="./a.log" #设置日志保存的位置,设置会后终端不会显示日志内容
1
2
3
4
5
6
7
8
9
#打印logging日志
import logging
#实例化logging,显示运行文件的名字,不写不会显示运行文件的目录
logging = logging.getLogger(__name__)
#日志输出打印
logging.warning(item)

#打印内容(日志创建时间,运行文件的目录,日志级别,打印的内容)
2018-10-31 15:25:57 [mySpider.pipelines] WARNING: {'name': '胡老师', 'title': '高级讲师'}

普通项目中使用logging
具体参数信息:https://www.cnblogs.com/bjdxy/archive/2013/04/12/3016820.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#a.py文件

import logging
#level: 设置日志级别,默认为logging.WARNING
logging.basicConfig(level=logging.INFO,
format=
#日志的时间
'%(asctime)s'
#日志级别名称 : 当前行号
' %(levelname)s [%(filename)s : %(lineno)d ]'
#日志信息
' : %(message)s'
#指定时间格式
, datefmt='[%Y/%m/%d %H:%M:%S]')
#实例化logging,显示当前运行文件的名字,不写不会显示运行文件的目录
logging=logging.getLogger(__name__)

if __name__ == '__main__':
#日志级别打印信息
logging.info("this is a info log")

b.py文件使用a.py文件的logging(日志)

1
2
3
4
5
6
7
#b.py文件

from a import logging #导入a.py中的实例logging

if __name__ == '__main__':
#warning级别大于info也可以打印,debug级别小于info,不可以打印
logger.warning("this is log b")

日志级别:

  • debug   #调试
  • info     #正常信息
  • warning  #警告
  • error   #错误

如果设置日志级别为info,warning级别比info大,warning也可以打印,debug比info小不可以打印
如果设置日志级别为warning,info和debug都比warning小,不可以打印

把数据保存到mongodb中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#导入mongodb的包
from pymongo import MongoClient
#实例化client,建立连接
client = MongoClient(host='127.0.0.1',port = 27017)
#选择数据库和集合
collection = client["tencent"]["hr"]

class TencentPipeline(object):
def process_item(self, item, spider):
#传过来的数据是对象,把item转化为字典
item = dict(item)
#把数据存入mongodb数据库
collection.insert(item)
print(item)
return item

scrapy shell

Scrapy shell是一个交互终端,我们可以在未启动spider的情况下尝试及调试代码,也可以用来测试XPath表达式

使用方法:

1
2
命令行输入:
scrapy shell http://www.itcast.cn/channel/teacher.shtml

常用参数:

1
2
3
4
5
6
7
response.url:当前响应的url地址
response.request.url:当前响应对应的请求的url地址
response.headers:响应头
response.body:响应体,也就是html代码,默认是byte类型
response.body.decode():变为字符串类型
response.request.headers:当前响应的请求头
response.xpath("//h3/text()").extract():调试xpath,查看xpath可不可以取出数据

setting设置文件

为什么需要配置文件:

  • 配置文件存放一些公共的变量(比如数据库的地址,账号密码等)
  • 方便自己和别人修改
  • 一般用全大写字母命名变量名SQL_HOST=’192.168.0.1’

参考地址:https://blog.csdn.net/u011781521/article/details/70188171

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#常见的设置

#项目名
BOT_NAME = 'yangguang'
#爬虫位置
SPIDER_MODULES = ['yangguang.spiders']
NEWSPIDER_MODULE = 'yangguang.spiders'
#遵守robots协议,robots.txt文件
ROBOTSTXT_OBEY = True
#下载延迟,请求前睡3秒
DOWNLOAD_DELAY = 3
# 默认请求头,不能放浏览器标识
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en',
"Cookie": "rfnl=https://www.guazi.com/sjz/dazhong/; antipas=2192r893U97623019B485050817",
}
#项目管道,保存数据
ITEM_PIPELINES = {
'yangguang.pipelines.YangguangPipeline': 300,
}

spiders文件使用settings的配置属性

1
2
3
4
#第一种
self.settings["MONGO_HOST"]
#第二种
self.settings.get("MONGO_HOST")

pipelines文件使用settings的配置属性

1
spider.settings.get("MONGO_HOST")

Scrapy中CrawlSpider类

深度爬虫

1
2
#创建CrawlSpider爬虫,就多加了-t crawl
scrapy genspider -t crawl cf gov.cn

第一种用法:提取内容页和翻页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from tengxun.items import TengxunItem


class TxSpider(CrawlSpider):
name = 'tx'
allowed_domains = ['hr.tencent.com']
#第一次请求的url
start_urls = ['https://hr.tencent.com/position.php']
#rules自动提取url地址
rules = (
# 内容页,交给parse_item处理数据
Rule(LinkExtractor(allow=r'position_detail\.php\?id=\d+&keywords=&tid=0&lid=0'), callback='parse_item'),
# 翻页
Rule(LinkExtractor(allow=r'position\.php\?&start=\d+#a'), follow=True),
)
#处理内容页的数据
def parse_item(self, response):
item = TengxunItem()
#爬取标题
item["bt"] = response.xpath('//td[@id="sharetitle"]/text()').extract_first()
#爬取工作要求
item["gzyq"] = response.xpath('//div[text()="工作要求:"]/../ul/li/text()').extract()
yield item

第二种用法:提取标题页,内容,翻页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from tengxun.items import TengxunItem

class Tx2Spider(CrawlSpider):
name = 'tx2'
allowed_domains = ['hr.tencent.com']
start_urls = ['https://hr.tencent.com/position.php']

rules = (
#翻页
Rule(LinkExtractor(allow=r'position\.php\?&start=\d+#a'), callback='parse_item', follow=True),
)
#标题页内容
def parse_item(self, response):
tr_list = response.xpath('//table[@class="tablelist"]/tr')[1:-1]
for tr in tr_list:
item = TengxunItem()
#爬取标题
item['bt'] = tr.xpath('./td/a/text()').extract_first()
#爬取url
item['url'] = tr.xpath('./td/a/@href').extract_first()
item['url'] = "https://hr.tencent.com/" + item['url']
yield scrapy.Request(
item['url'],
callback=self.parse_detail,
meta={"item":item}
)
#爬取内容
def parse_detail(self,response):
item = response.meta['item']
item['gzyq'] = response.xpath('//div[text()="工作要求:"]/../ul/li/text()').extract()
yield item
1
2
3
4
5
#LinkExtractor 连接提取器,提取url地址
#callback 提取出来的url地址的response会交给callback处理
#follow 当前url地址的响应是否重新进过rules来提取url地址
#提取详细页的url
#CrawlSpider会自动把url补充完整

Rule的匹配细节

1
2
3
Rule(LinkExtractor(allow=r'position_detail\.php\?id=\d+&keywords=&tid=0&lid=0'), callback='parse_item'),
#这里把该匹配的东西写成正则
#?和.别忘了转义\? \.
1
2
3
4
5
#LinkExtractor 连接提取器,提取url地址
#callback 提取出来的url地址的response会交给callback处理
#follow 当前url地址的响应是否重新进过rules来提取url地址
#提取详细页的url
#CrawlSpider会自动把url补充完整

注意点:

  • 用命令创建一个crawlspider的模板:scrapy genspider-t crawl 爬虫名字 域名,也可以手动创建
  • CrawiSpider中不能再有以parse为名字的数据提取方法,这个方法被CrawlSpider用来实现基础url提取等功能)
  • 一个Rule对象接收很多参数,首先第一个是包含url规则的LinkExtractor对象,常用的还有calback(制定满足规则的url的解析函数的字符串)和follow(response中提取的链接是否需要跟进)
  • 不指定callback函数的请求下,如果follow为True,满足该rule的url还会继续被请求
  • 如果多个Rule都满足某一个url,会从rules中选择第一个满足的进行操作

CrawlSpider补充(了解)

1
2
3
4
5
6
7
8
9
LinkExtractor更多常用链接

LinkExtractor(allow=r'/web/site0/tab5240/info\d+\.htm')

allow:满足括号中"正则表达式"的URL会被提取,如果为空,则全部匹配.
deny:满足括号中"正则表达式"的URL一定不提取(优先级高于allow).
allow_domains:会被提取的链接的domains.
deny_domains:一定不会被提取链接的domains.
restrict_xpaths:使用xpath表达式,和allow共同作用过滤链接,级xpath满足范围内的uri地址会被提取
1
2
3
4
5
6
7
8
9
rule常见参数:

Rule(LinkExtractor(allow=r'/web/site0/tab5240/info\d+\.htm'), callback='parse_item', follow=False),

LinkExtractor:是一个Link Extractor对象,用于定义需要提取的链接.
callback:从link_extractor中每获取到链接时,参数所指定的值作为回调函数
follow:是一个布尔(boolean)值,指定了根据该规则从response提取的链接是否需要跟进.如果callback为None,follow 默认设置为True,否则默认为False.
process_links:指定该spider中哪个的函数将会被调用,从link_extractor中获取到链接列表时将会调用该函数,该方法主要用来过滤url.
process_request:指定该spider中哪个的函数将会被调用,该规则提取到每个request时都会调用该函数,用来过滤request

Scrapy分布式爬虫

img

Scrapy分布式爬虫流程

Scrapy_redis之domz

domz相比于之前的spider多了持久化和request去重的功能
domz就是Crawlspider去重和持久化版本
不能分布式
可以分布式的是RedisSpider和RedisCrawlspider

Scrapy redis在scrapy的基础上实现了更多,更强大的功能,具体体现在:reqeust去重,爬虫持久化,和轻松实现分布式
官方站点:https://github.com/rmax/scrapy-redis

1
2
3
#spiders文件夹

爬虫内容和自己写的CrawlSpider没有任何区别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
settings.py
#写上下面东西Crawlspider就可以去重了
#还可以持久化爬虫,关闭后,在开启继续爬取

DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" #去重
SCHEDULER = "scrapy_redis.scheduler.Scheduler" #重写调度器(scheduler)
SCHEDULER_PERSIST = True #不清楚缓存,队列中的内容是否持久保存(开启后爬虫关闭后,下次开启从关闭的位置继续爬取)

ITEM_PIPELINES = {
#将数据保存到redis中,屏蔽这条命令
#'scrapy_redis.pipelines.RedisPipeline': 400,
}

#指定redis地址
REDIS_URL = 'redis://127.0.0.1:6379'
#也可以写成下面形式
#REDIS_HOST = "127.0.0.1"
#REDIS_PORT = 6379

我们执行domz的爬虫,会发现redis中多了一下三个键:

  • dmoz:requests  (zset类型)(待爬取)
    Scheduler队列,存放的待请求的request对象,获取的过程是pop操作,即获取一个会去除一个
  • dmoz:dupefilter  (set)(已爬取)
    指纹集合,存放的是已经进入scheduler队列的request对象的指纹,指纹默认由请求方法,url和请求体组成
  • dmoz:items  (list类型)(item信息)
    存放的获取到的item信息,在pipeline中开启RedisPipeline才会存入

Scrapy_redis之RedisSpider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from scrapy_redis.spiders import RedisSpider

#继承RedisSpider
class MySpider(RedisSpider):
#指定爬虫名
name='myspider_redis'

#指定redis中start_urls的键,
#启动的时候只需要往对应的键总存入url地址,不同位置的爬虫就会来获取该url
#所以启动爬虫的命令分类两个步骤:
#(1)scrapy crawl myspider_redis(或者scrapy runspider myspider_redis)让爬虫就绪
#(2)在redis中输入lpush myspider:start_urls"http://dmoztools.net/"让爬虫从这个ur开始爬取
redis_key ='myspider:start_urls'

#手动指定allow_domain,执行爬虫范围
#可以不写
allow_doamin=["dmoztools.net"]

def parse(self, response):
#普通scrapy框架写法
...

启动

1
2
3
4
5
#爬虫名字
scrapy runspider myspider
或(2选1)
#蜘蛛文件名字
scrapy runspider myspider.py

redis运行

1
2
#redis 添加 键:值 "爬取的网址"
redis-c1i lpush guazi:start_urls "http://ww.guazi.com/sjz/dazhong/"

Scrapy_redis之RedisCrawlSpider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from scrapy.spiders import Rule
from scrapy.linkextractors import LinkExtractor
from scrapy_redis.spiders import RedisCrawlSpider

#继承RedisCrawlSpider
class MyCrawler(RedisCrawlSpider):
#爬虫名字
name='mycrawler_redis'
#start_url的redis的键
redis_key='mycrawler:start_urls'
#手动制定all_domains,可以不写
allow_domains=["dmoztools.net"]
#和crawl一样,指定url的过滤规则
rules=(
Rule(LinkExtractor(),callback='parse_page',follow=True)

启动

1
2
3
4
5
#爬虫名字
scrapy runspider myspider
或(2选1)
#蜘蛛文件名字
scrapy runspider myspider.py

redis运行

1
2
#redis 添加 键:值 "爬取的网址"
redis-c1i lpush guazi:start_urls "http://ww.guazi.com/sjz/dazhong/"

快速启动爬虫

1
2
3
4
5
6
7
8
from scrapy import cmdline

cmdline.execute("scrapy crawl guazicrawl".split())

# import redis
#
# r=redis.StrictRedis()
# r.lpush("myspider:start_urls",[])

其他参数

  • 如果抓取的url不完整,没前面的url,可以使用下面方法
1
2
3
4
5
6
7
8
import urllib
a = http://www.baidu.com?123
b = ?456
#在程序中a可以使用response.url(响应地址)
#在pycharm中parse颜色会加深,不过没事
b = urllib.parse.urljoin(a,b)
print("b=%s"%b)
#b=http://www.baidu.com?456

存多个url或其他东西,可以用列表存储

1
2
3
4
#比如存图片连接,一个网页中有很多图片连接
item["img_list"] =[]
#extend追加的方式,注意后面用.extract()
item["img_list"].extend(response.xpath('//img[@class="BDE_Image"]/@src').extract())

数据保存方式(包含:图片、文件的下载)