day01 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 大数据的数据单位: 1T = 1024G 1P = 1024T 1E = 1024P 1:服务器 服务器也是一台计算机,是一台高性能的计算机 高处理能力(硬件: 可热插拔磁盘,冗余配置的CPU,强大的网络处理能力) 高扩展性 (硬盘和CPU) 高可靠性 (高可用(HA)) 2:硬盘 机械硬盘 固态硬盘: NOR Flash(韩国三星) Nand Flash(韩国三星) 高纯硅(日本) 光刻机(荷兰:AMSL) 3:交换机 IP地址:可以从逻辑上唯一的定位网络中的一台主机 MAC地址:从物理上可以唯一的定位一台主机 流控 :控制数据流速度 128Kbit/s 4:局域网 局域网: 城域网: 广域网:(全球互联网) 机架感知: 5:配置MAC和IP MAC地址的路径: vim /etc/udev/rules.d/70-persistent-net.rules IP地址修改路径: vim /etc/sysconfig/network-scripts/ifcfg-eth0 6:Zookeer的概述 1:Zookeeper是一个分布式的协调框架(协调人) 2:Zookeeper是一个集群(好多台主机都安装Zookeeper软件,形成一个整体) 3:Zookeeper集群对外提供一个文件系统: / app1 app2 4:Zookeeper中的每一个目录或者文件被称为znode 5:Zookeeper中的znode既具有文件的特性,又具有目录的特性 znode可以存数据 znode可以有子节点 7:Zookeeper的架构 1:Zookeeper是一个主从架构 老大: Leader 通过选举得到,如果leader挂掉,则重新选举 小弟: Follower Observer 2:Zookeeper的操作 事务性操作: 写操作 非事务性操作:读操作 3:架构组件的分工 Leader : 1:负责集群整理的管理工作 2:负责发起投票 3:负责处理事务性操作和非事务性操作 Follower: 1:负责非事务性操作 2:如果遇到事务性操作,则转发给Leader 3:参与投票 Observer: 1:负责非事务性操作 2:如果遇到事务性操作,则转发给Leader 3:不能参与投票 8:Zookeeper的应用 1:发布和订阅 2:命名服务 让集群对外提供一个唯一的访问路径,通过这个访问路径可以找到某个节点文件 3:分布式协调/通知 心跳机制 4:分布式锁 写锁: 独占锁 同一时刻只能有一个进程可以得到这个锁 读锁: 共享锁 同一时刻,可以有多个进程共享这个锁 5:分布式队列 如果在集群中有多个任务需要按照一定的顺序执行,则可以将这些任务放入分布式队列中 A -----> B -------->C 9:Zookeeper的选举机制 1:zxid 该数值越大表示数据越新 2:myid 每个Zookeeper集群中,主机的编号 3:选举时有个过半机制 10:Zookeeper的数据模型 1:Zookeeper中存储的数据大小一般不会超过1M 2:Zookeeper中访问数据只能使用绝对路径 3:每个 Znode 由 3 部分组成: stat:此为状态信息, 描述该 Znode 的版本, 权限等信息 data:与该 Znode 关联的数据 children:该 Znode 下的子节点 4:Znode的节点类型 PERSISTENT:永久节点 EPHEMERAL:临时节点 PERSISTENT_SEQUENTIAL:永久节点、序列化 EPHEMERAL_SEQUENTIAL:临时节点、序列化 5:Znode的节点操作 1:客户端登录znode bin/zkCli.sh -server node02:2181 #方式1 bin/zkCli.sh #方式2 2:命令操作 永久节点 create /hello world 永久序列化 create -s /hello world 临时节点 create -e /hello world 临时序列化 create -s -e /hello world 创建子节点: create /hello/aaa world #临时节点不能创建子节点 修改节点数据 set /hello zookeeper 删除节点, 如果要删除的节点有子Znode则无法删除 delete /hello 删除节点, 如果有子Znode则递归删除 rmr /abc 列出历史记录 histroy 获取节点属性: get /hello ephemeralOwner = 0x36d14c88edf0001 #临时节点 ephemeralOwner = 0x0 #永久节点
day02 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 1:Hadoop介绍 1:Hadoop基于Nutch项目,起源于谷歌的两篇论文 GFS(谷歌文件系统) ---->解决分布式存储问题 ---->HDFS(Hadoop 分布式文件系统) MapReduce------>解决分布式计算问题 2:Hadoop的定义 狭义: HDFS 分布式存储 MapReduce 分布式计算 Yarn 分布式资源调度系统 广义: 生态圈 Hadoop,MapReduce,Hive,flume,azkaban,impala,oozie,hue,spark,flink storm,Hbase,kafka 2:Hadoop版本 版本: 1.x 2.x 引入yarn 3.x 最新的,引入了多namenode 发行公司: apache版: 优势: 版本更新快 缺点: 版本之间兼容性差 cloudera版: 基于apache版的Hadoop 优势: 版本兼容性好 缺点: 商业版要收费 3:Hadoop的架构 3.1 1.x 架构 分布式存储(HDFS) 主节点: namenode 辅助节点: SecondaryNamenode 从节点: datanode 分布式计算: 主节点: JobTracker 从节点: TaskTracker 3.1 2.x 架构 分布式存储(HDFS) 主节点: namenode 辅助节点: SecondaryNamenode 从节点: datanode 分布式计算: 主节点: ResourceManger 从节点: NodeManager 4:HDFS Hadoop Distributed File System 分布式文件系统 HDFS特点: 1:高延时(慢): 2:高吞吐量(P级) : 3:hdfs中文件的数据不能随机访问(HBase) 4:hdfs不适合存储小文件(小于128M) 1个文件: 1条元数据信息 100个1K文件 ---->100条元数据 ---> 15000字节 100K ----->1条元数据 ---> 150字节 5:hdfs可以有很强的扩展能力
day03 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 1:Hadoop适合的应用场景 1:高吞吐量,高延时 2:一次写入,多次读取 3:具有高扩展性和容错性 2:Hadoop不适合的应用场景 1: 需要低延时 2: 不适合存储小文件 1个文件--->一条元数据--->150字节---->namenode的内存中 1T文件 每一个文件是一个字节: 1024*1024*1024*1024*1024个 每一个文件1T: 1个 3: 不支持数据的任意修改和读写 3:hdfs的组成 1:Client:就是客户端。 a:文件切分文件上传 HDFS 的时候,Client 将文件切分成一个个的Block,然后进行存储。 b:与 NameNode 交互,获取文件的位置信息。 c:与 DataNode 交互,读取或者写入数据。 d:Client 提供一些命令来管理 和访问HDFS(hdfs dfs -put a.txt /) 2:NameNode:就是 master,它是一个主管、管理者。 a:管理 HDFS 的名称空间,对外提供一个统一的访问路径 b:管理数据块(Block)映射信息,告诉客户端,一个文件的每一个切片(block)所在的位置 c:配置副本策略,每个副本存放在哪台主机 d:处理客户端读写请求 3:DataNode:就是Slave。NameNode 下达命令,DataNode 执行实际的操作。 a:存储实际的数据块。 b:执行数据块的读/写操作 c:不断的向namenode发送心跳包和汇报自己的block信息 4:Secondary NameNode a:对namenode做辅助性的操作,不能替代namenode b:SecondaryNameNode可以定期的将fsimage文件和edits文件进行合并,合并一个新的fsimage,并且 替换原来的fsimage,减轻namenode的压力 合并的触发条件 1: 时间 一个小时 2: 文件大小 edits文件达到64M 4:HDFS的block和副本机制 1:每一个block默认是128M,如果不够128M,也称为一个block,所以block是一个逻辑单位 2:每一个block默认是3个副本 3:每一个block的副本存放是根据机架感知来实现的 5:HDFS命令 hdfs dfs -ls / hdfs dfs -put a.txt / #将文件复制到hdfs hdfs dfs -put /root/a.txt /root/ hdfs dfs -mkdir /dir1 hdfs dfs -mkdir -p /dir1/dir11 #递归创建文件夹 hdfs dfs -moveFromLocal a.txt /dir1/dir11 #将文件移动到hdfs hdfs dfs -get /a.txt ./ #将文件下载到当前目录 hdfs dfs -mv /dir1/a.txt /dir2 #将hdfs的文件移动到hdfs的另外一个路径 hdfs dfs -rm /a.txt #删除一个文件(删除文件之后移动到hdfs的垃圾桶,七天之后自动删除) hdfs dfs -rm -r /dir1 #递归删除一个文件夹(删除文件之后移动到hdfs的垃圾桶) hdfs dfs -cp -p /dir1/a.txt /dir2/b.txt #将hdfs的某个文件拷贝到hdfs的另外一个路径(深度拷贝) hdfs dfs -cat /a.txt #查看hdfs文件的内容 hdfs dfs -chmod -R 777 / #修改hdfs文件或者文件夹(加-R参数)权限 hdfs dfs -appendToFile a.xml b.xml /big.xml #将linux本地的文件合并之后上传到hdfs hdfs dfs -appendToFile /export/servers/hadoop-2.7.5/etc/hadoop/*.xml /big.xml 6:HDFS限额配置 6.1 文件个数限额 hdfs dfs -count -q -h /user/root/dir1 #查看配额信息 hdfs dfsadmin -setQuota 2 dir #设置N个限额数量,只能存放N-1个文件 hdfs dfsadmin -clrQuota /user/root/dir #清除个数限额配置 6.2 文件的大小限额 在设置空间配额时,设置的空间至少是block_size * 3(384M)大小 hdfs dfs -count -q -h /user/root/dir1 #查看配额信息 hdfs dfsadmin -setSpaceQuota 384M /user/root/dir # 限制空间大小384M dd if=/dev/zero of=1.txt bs=1G count=1 #生成1G的文件 hdfs dfsadmin -clrSpaceQuota /user/root/dir #清除空间限额配置 7:HDFS的安全模式 1:启动HDFS时,会自动进入安全模式 加载hdfs的元数据,保证数据的完整性 2:在HDFS的工作中,发现BLOCK副本率(当前的副本数/设置的副本数)不达标,则自动进入安全模式 保证Block的副本率达标,保证数据的完整性 3:在安全模式状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求 安全模式的操作命令: hdfs dfsadmin -safemode get #查看安全模式状态 hdfs dfsadmin -safemode enter #进入安全模式 hdfs dfsadmin -safemode leave #离开安全模式 8:HDFS的基准测试 在搭建好HDFS集群之后,需要对HDFS的性能进行测试 cd /export/servers hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB 8.1 写入性能测试 #向HDFS文件系统中写入数据,10个文件,每个文件10MB,文件存放到hdfs的/benchmarks/TestDFSIO中 #写完之后,会在当前目录生成一个TestDFSIO_results.log文件,该文件就是测试报告 hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -write -nrFiles 10 - fileSize 10MB 8.2 读取性能测试 #从HDFS文件系统中读入10个文件,每个文件10M #写完之后,会在当前目录生成一个TestDFSIO_results.log文件,该文件就是测试报告 hadoop jar /export/servers/hadoop-2.7.5/share/hadoop/mapreduce/hadoopmapreduce-client-jobclient-2.7.5.jar TestDFSIO -read -nrFiles 10 - fileSize 10MB 100M * 3 = 300M
day04 HDFS-API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 1 :在Windows下配置Hadoop的运行环境 第一步:将hadoop2.7.5 文件夹拷贝到一个没有中文没有空格的路径下面 第二步:在windows上面配置hadoop的环境变量: HADOOP_HOME,并将%HADOOP_HOME%\bin添加到path中 第三步:把hadoop2.7.5 文件夹中bin目录下的hadoop.dll文件放到系统盘:C:\Windows\System32 目录 第四步:关闭windows重启 2 :获取FileSystem的方式 @Test public void getFileSystem2 () throws Exception { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ), new Configuration()); System.out.println("fileSystem:" +fileSystem); } 3 :HDFS的API操作 @Test public void mkdirs () throws Exception { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ),new Configuration()); boolean mkdirs = fileSystem.create(new Path("/hello/mydir/test" )); fileSystem.close(); } 4 :HDFS文件流的获取 FSDataInputStream inputStream = fileSystem.open(new Path("/a.txt" )); FSDataOutputStream outputStream = fileSystem.create(new Path("/a.txt" )); fileSystem.copyFromLocalFile(new Path("D://set.xml" ), new Path("/" )); fileSystem.copyToLocalFile(new Path("/a.txt" ), new Path("D://a4.txt" )); 5 :HDFS的权限问题 1 :如果要让hdfs的权限生效,则需要修改hdfs-site.xml文件,修改如下: <property> <name>dfs.permissions</name> <value>true</value> </property> 2 :伪造用户:以某一个用户的身份去访问 FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ), new Configuration(),"root" ); 6 :HDFS的高可用(HA) 1 :HDFS高可用是有2 个namenode,一个是Active状态,一个是Standby状态 2 :Zookeeper用来解决两个namenode的单点故障问题,Journal Node用来保证两个namenode元数据的同步 7 :MapReduce的思想 MpReduce运行在yarn之上 阶段划分: Map阶段: 负责将一个大的任务划分成小的任务,小任务之间不能有依赖关系 Reduce阶段: 负责将Map阶段的结果进行汇总 8 :MapReduce的八个步骤 Map阶段2 个步骤 1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步 2. 自定义 Map 逻辑(自己写代码), 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果 Shuffle 阶段 4 个步骤 3. 分区(Partition) 4. 排序(Sort) 5. 规约(Combiner) 6. 分组(Group By) Reduce阶段2 个步骤 7. 自定义Reduce逻辑(自己写代码)将新的K2和V2转为新的 Key-Value(K3和V3)输出 8. 设置 OutputFormat 处理并保存 Reduce 输出的K3和V3 数据
day06 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 1 :序列化和反序列化 序列化: 将一个内存的对象转为字节流保存到磁盘上,或者通过网络发送出去 反序列化: 从磁盘读取一个对象,或者从网络中接收一个对象 MapReduce并没有使用Java的序列化方式,认为Java的序列化方式态臃肿,自定提出了一套序列化方式 分区: 继承:Partitioner类 从写:getPartition方法,返回分区编号 在主类中设置: job.setPartitionerClass(FlowPartition.class); job.setNumReduceTasks(4 ); Java中序列化: 实现Serializable接口,不需要重写方法 MapReduce中序列化: 实现WritableComparable,实现了该接口,不仅可以实现序列化,还可以实现排序 public interface WritableComparable <T > extends Writable , Comparable <T > { void write (DataOutput out) ; void readFields (DataInput in) ; int compareTo (T var1) ; } 规约(Combiner): 1 :是在每一个MapTask之后,加上一个局部的聚合 2 :就相当于在每一个map之后进行了一次Reduce操作 3 :通过Combiner可以减少Map到Reduce之间数据的传输量 注意: 1 :不是所有的MR程序都可以使用聚合 2 :Combiner只是来优化MR的执行,不能去改变最终的结果 实现步骤: 1 :写一个MyCombiner类去继承Reducer类,重写reduce方法(也可以直接拷贝自定义Reducer类) 2 :在JobMain中设置自定义Combiner job.setCombinerClass(MyCombiner.class); 在MR中自定义的JavaBean必须实现序列化(实现Writable接口) 分组: 1. 继承WritableComparator 类2 : 调用父类的有参构造 public OrderGroupComparator () { super (OrderBean.class,true ); } 3 :指定分组的规则(重写方法) compare 方法 public int compare (WritableComparable a, WritableComparable b) { OrderBean first = (OrderBean)a; OrderBean second = (OrderBean)b; return first.getOrderId().compareTo(second.getOrderId()); }
简单总结 在主类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 继承 Configured 实现 Tool 接口 在Mian 方法中: 创建Configuration 对象 int run = ToolRunner.run(configuration, new mainclass(), args); System.exit(run); run方法中: 创建Job对象 Job job = Job.getInstance(super .getConf(), "jobname" ); job.setJarByClass(HFDSTest.class); 设置输入类,和输入路径 设置mapper类,和k,v类型 reduce类同上 设置输出类 job.waitForCompletion(true ); 执行返回结果 创建Job对象 分区: job.setPartitionerClass(SumCountPartition.class); job.setNumReduceTasks(4 ); Partitioner<Text,FlowBean> 重写:getPartition() 返回序号 排序/序列化: 实现Writable[Comparable] write 反序列化 readFields 序列化 compareTo 排序 规约: 在Map阶段做reduce阶段的任务 job.setCombinerClass(MyCombiner.class); 分组: 1. 继承WritableComparator 类2 : 调用父类的有参构造 public OrderGroupComparator () { super (OrderBean.class,true ); } 3 :指定分组的规则(重写方法) compare 方法 public int compare (WritableComparable a, WritableComparable b) { OrderBean first = (OrderBean)a; OrderBean second = (OrderBean)b; return first.getOrderId().compareTo(second.getOrderId()); } 4 :设置job.setGroupingComparatorClass(CustomGroupingComparator.class)
day07 day08 1 2 3 4 5 6 7 8 9 10 11 12 自定义输入输出: 输入继承 FileInputFormat RecordReader 返回 自定义RecordReader<>对象 :initialize()初始化方法 fileSplit= (FileSplit)inputSplit; configuration = taskAttemptContext.getConfiguration(); :nextKeyValue()该方法用于获取K1和V1 自定义输入: outputformat,改写其中的recordwriter<>,改写具体输出数据的方法write()
Hive 1 2 3 4 5 1:在hive中,一个数据库就是一个文件夹,一张表也是一个文件夹 2:向表中插入数据时,这些数据会生成表文件数据,表文件字段之间默认的分隔符是 '\001' 3:create table stu3 as select * from stu2; 拷贝表时,表的表文件数据要也一起拷贝 4:删除内部表,会将元数据和表数据一起删除 5:外部表表示共有,内部表示私有
1 2 3 4 5 6 7 8 9 10 6:给表添加数据: 方式1: 在linux vim student.txt hdfs dfs -put student.txt /user/hive/warehouse/mytest3.db/student 方式2: 在hive中 本地加载:表文件原来是在Linux本地,加载之后,做的复制操作 load data local inpath '/export/servers/hivedatas/student.csv' [overwrite] into table student; 方式3: hdfs加载:表文件原来是在hdfs上,加载之后,做的剪切操作 load data inpath '/hivedatas/student.csv' into table student;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 7:分区表 1:分区表就是分文件夹,就是对大数据进行分类管理 2:创建分区表 创建单个分区 create table score(s_id string ,c_id string , s_score int ) partitioned by (month string ) row format delimited fields terminated by '\t' ; 创建多个分区 create table score(s_id string ,c_id string , s_score int ) partitioned by (year string ,month string ,day string ) row format delimited fields terminated by '\t' ; 3:给分区添加数据 给单个分区添加数据: 创建单级文件夹 load data local inpath '/export/servers/hivedatas/score.csv' into table score partition (month ='201806' ); 给多个分区添加数据:创建多级文件夹 load data local inpath '/export/servers/hivedatas/score.csv' into table score partition (year ='1999' ,month ='12' ,day ='23' ); 4:分区表的查询 查询单分区表 select * from score where month = '201801' ; select * from score where month = '201801' or month = '201802' ; select * from score where month = '201801' union all select * from score where month = '201802' 查询多分区表: select * from score where year ='1999' and month = '12' and day = '01' ; 5:表模型 1:外部的分区表 create external table score(id int ,name string ,score int ) partitioned by (month string ) row format delimited fields terminated by '\t' location '/scoredatas' 2 :内部的分区表 create table score(id int ,name string ,score int ) partitioned by (month string ) row format delimited fields terminated by '\t' location '/scoredatas'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 8:分桶表 1:分桶表就是分文件,就是对大数据进行分类管理 加载表数据:stu; 方式1: hdfs dfs -put /a.txt /stu 方式2: load data local inpath '/a.txt' into table /stu; 方式3(从hdfs加载): load data inpath '/a.txt' into table /stu; 方式4: create external table stu() ...... location '/scoredatas' ; msck repair table stu; 方式5: create external table stu as select * from A; hive 删库报错问题 在 hive bin 目录下执行下面这句话 ./schematool -dbType mysql -initSchema + | col_name | data_type | comment | + | id | int | | | name | string | | + create table if not exists stu3(id int ,name string ) row format delimited fields terminated by '\t' location '/user/stu2' ;
day10 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 1:select查询 order by 对全局的结果进行整体排序,要求只能有一个Reduce sort by 对每一个Reduce的结果进行局部排序,不能保证整体有序 distributed by 可以将原文件以某个字段作为分区的条件,将文件分到不同的区中,类似于MR中分区 cluster by(字段) 既可以分区,又可以对每一个分区的数据排序 除了具有distribute by的功能外,还会对该字段进行排序 如果distribute 和sort字段是同一个时,此时, cluster by = distribute by + sort by 2:正则表达式 /* 第1位: 1 第2位: 3 4 5 7 8 剩下9位: 0-9 */ String regex = "1[34578][0-9]{9}"; //指定规则 String phoneNum = "13888888888"; //字符串 boolean bl = phoneNum.match(regex); if(bl){ //你输入的手机号合法 }else{ //你输入的手机号非法 } 3:连表查询 3.1 内连接(求两张表的交集) 隐式内连接 select * from A a ,B b where a.id = b.id; 显式内连接 select * from A a inner join B b on a.id = b.id; 3.2 外连接 左外连接 select * from A a left join B b on a.id = b.id; 右外连接 select * from A a right join B b on a.id = b.id; 四张表联合查询: select * from teacher t left join course c on t.t_id = c.t_id left join score s on s.c_id = c.c_id left join student stu on s.s_id = stu.s_id; 4:hive的排序 order By(全局排序 ) !!!!!!!!!!! //排序,先按照s_id升序排序,在按照avg降序排列 select s_id ,avg(s_score) avg from score group by s_id order by s_id ,avg desc; sort By(对每一个Reduce局部排序) 第一步:设置Reduce个数 set mapreduce.job.reduces=3; 第二步:查询成绩按照成绩降序排列 select * from score sort by s_score 第三步:将查询后的结果保存到本地文件中 insert overwrite local directory '/export/servers/hivedatas/sort' select * from score sort by s_score; distributed By 和 sort by !!!!!!!!!!!! 第一步:设置Reduce个数 set mapreduce.job.reduces=7; 第二步:通过distribute by 进行数据的分区 insert overwrite local directory '/export/servers/hivedatas/sort' select * from score distribute by s_id sort by s_score; cluster by 1:先分区,然后对分区的数据排序 2: cluster By id 等价于 distributed By id + sort by id 5:Hive的参数配置 如果在运行hive时,需要给hive设置参数,则有三种方式 方式1: 配置文件 在hive安装目录conf/hive-site.xml 文件中配置 方式2:命令行参数 bin/hive -hiveconf hive.root.logger=INFO,console 方式3:参数声明 bin/hive set mapreduce.job.reduces=7; 设置级别优先级: 参数声明 > 命令行参数 > 配置文件参数(hive) 6:Hive的内置函数 查看所有的内置函数: show functions; 查看某个函数的详细用法: desc function extended upper; 常用的内置函数: #字符串连接函数:concat select concat("hello","world","java"); //结果 helloworldjava # 带分隔符字符串连接函数: concat_ws select concat_ws(',','hello','world','java');//结果 hello,world,java # cast类型转换 select cast(1.5 as int); //结果 1 # get_json_object(json 解析函数,用来处理json,必须是json格式) select get_json_object('{"name":"jack","age":"20"}','$.name'); //结果 jack # URL解析函数 select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','HOST'); // facebook.com select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','PATH'); // path1/p.php select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','QUERY'); // k1=v1&k2=v2 select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1','QUERY',"k1"); // v1 7:Hive的自定义函数 UDF:一进一出 upper abs ceil floor UDAF:多进一出 max min avg concat UDTF:一进多出 "1,zhangsan,18" ---> 1 zhangsan 18 //将行转列 lateral view explore() 8:Hive的压缩 gzip bzip2 snappy 1:设置Map端压缩 意义:减少map和reduce之间数据传输量 1)开启hive中间传输数据压缩功能 set hive.exec.compress.intermediate=true; 2)开启mapreduce中map输出压缩功能 set mapreduce.map.output.compress=true; 3)设置压缩的方式 set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec; 4)测试 select count(1) from score; 2:设置Reduce端压缩 可以减少磁盘空间 1)开启hive最终输出数据压缩功能 set hive.exec.compress.output=true; 2)开启mapreduce最终输出数据压缩 set mapreduce.output.fileoutputformat.compress=true; 3)设置mapreduce最终数据输出压缩方式 set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec; 4)设置压缩类型为块压缩 set mapreduce.output.fileoutputformat.compress.type=BLOCK; 5)测试 set mapreduce.job.reduces = 7; insert overwrite local directory '/export/servers/snappy' select * from score distribute by s_id sort by s_id desc; 9:Hive的存储方式 9.1 行存储(TEXTFILE, SEQUENCEFILE) 特点: 1:select * from A 效率高 2:select a from A 效率低 3:如果进行数据压缩则效率低 9.2 列存储(ORC , PARQUET) 1:select * from A 效率低 2:select a from A 效率高 3:如果进行数据压缩则效率高 10:压缩方式和存储方式结合 1:创建一个指定存储格式的表 create table log_text ( track_time string, url string, )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE / SEQUENCEFILE /ORC /PARQUET; 2:三种存储格式的压缩比和查询速度 压缩比: ORC > Parquet > TextFile 查询速度: ORC > TextFile > Parquet 3:压缩方式和存储方式结合 create table log_text ( track_time string, url string, )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE / SEQUENCEFILE /ORC /PARQUET tblproperties ("orc.compress"="NONE/SNAPPY/GZIP/BZIP2");; 4:结论 存储方式: 选择 ORC和PARQUET 压缩方式: 选择 SNAPPY和BZIP2 11:Hive的调优 11.1 Fetch抓取 1)设置参数所有查询都走MapReduce set hive.fetch.task.conversion=none; 测试:select s_score from score limit 3; 2)设置参数所有查询都不走MapReduce set hive.fetch.task.conversion=more; 测试:select * from score; 11.2 本地模式 当计算任务不是特别大时,可以将所有的计算任务在本机完成,可以提高执行效率 1)开启本地模式,并执行查询语句 set hive.exec.mode.local.auto=true; select * from score cluster by s_id; 2)关闭本地模式,并执行查询语句 set hive.exec.mode.local.auto=false; select * from score cluster by s_id; 11.3 Map端join 当小表join大表时,则Hive默认会开启Map端join 11.4 Hive中的规约 在数据超过一定量时,则可以在map端完成聚合,就是规约 1)是否在Map端进行聚合,默认为True set hive.map.aggr = true; //无需设置 2)在Map端进行聚合操作的条目数目 set hive.groupby.mapaggr.checkinterval = 100000; //默认就是100000 3)有数据倾斜的时候进行负载均衡(默认是false) set hive.groupby.skewindata = true; //需要设置 11.5 去重求统计值(count(distinct)) 原始做法:select count(distinct s_id) from score; 改进做法:select count(s_id) from (select id from score group by s_id) a 12:动态分区
MapReduce总结 MapReduce思想: 核心: 分而治之,先分在和 应用场景: 复杂任务,没有依赖,以并行提供处理效率 脉络体现:
先map后reduce
map:把复杂的任务拆分成任务,局部进行计算,得出局部结果
reduce:把map的局部结果进行全局汇总,得到最终结果
MapReduce设计构思: 如何进行大数据处理? 先分在合,分而治之
抽象俩个函数模型: 输入输出都是以kv键值段
map:把复杂的任务拆分成任务,局部进行计算,得出局部结果
reduce:把map的局部结果进行全局汇总,得到最终结果
把这么做和做什么进行拆分: 程序负责复杂这么做(技术)
用户负责做什么(业务)
以上俩者合并起来才是完整的MR程序
MapReduce框架结构和编程规范: 代码层面: 类继承Mapper 重写map()—–负责map阶段的业务
类继承Reduce 重写reduce()—-负责reduce阶段的业务逻辑
客户端运行的主类(main)—–指定mr相关属性,提交程序
将以上三个打包为jar包
运行角度: MapTask:map阶段运行的task
ReduceTask:reduce阶段运行的task
MapReduceApplictionMaster(MrAppMaster):程序运行的主体,监督各task运行和mr程序的运行 ,负责跟yarn进行资源
案例WordCount: 环境开发版本问题:
Apache 2.7.4 优化了CDH2.6.0本地执行环境
数据类型和序列化机制:
Writable(接口) 认为java序列化机制臃肿 不利于大数据网络传递
重点:(MR执行流程):
序列化机制: 序列化机制概念: 进程网络间传递数据 数据变成字节流
Writable:
序列化方法:write(out)
反序列化:readField(in)
注意:先序列化,后反序列化
自定义排序: 本质(CompareTo): 0:相等
正数:大于
负数:小于
注意:谁大谁在后
倒序排序: 欺骗程序 :欺骗 大—>负数 小—>正数
对象实现接口: Compareable | WritableCompareable
自定义分区: 分区定义: 决定了map的输出key value在哪一个reduceTask上
默认分区规则: HashPartitioner(key.hashcode % NumReduceTasks)
实现自定义分区: 继承Partition类 重写getPartitions 该方法返回值就是分区的标号值
让自定义分区生效: job.setPartitionClass()
分区个数和reduceTask个数的关系: 应该保持相等
分区个数多 报错 非法分区
分区个数少 执行 空文件产生
Combiner(归约): 局部聚合组件 把每一个map的输出先进行局部聚合
优化了IO网络
本身就是reduce 只是范围小 不是全局
默认不是开启的
注意:慎重使用:因为顺序 个数在最终的结果 会发生变化。
并行度机制: 概念:所谓的并行度,指的是多个同时工作
maptask并行度(逻辑切片 归约):文件大小 个数 切片大小
reducetask并行度:代码设置 涉及全局计数 慎重使用
shuffle机制: 概念:是一个过程
从map输出数据开始到reduce接受数据作为输入之前
横跨了map reduce 阶段 中间横跨网络 是mr程序的核心 是执行效率最慢的原因。
数据压缩: 压缩目的:减少网络传输数据量,减少最终磁盘所占空间
压缩机制: map输出压缩:(影响网络传输的数量)
redcue的输出压缩:(磁盘所占的空间)
压缩算法: 推荐用:snappy
取决于Hadoop是否支持该压缩
检查是否支持本地库:hadoop chechnative
最好结合Hadoop编译 支持一部分压缩算法。
压缩的设置方式: 直接在map程序中 通过conf.set()—–只对本mr有效
修改xml配置文件 mapred-site.xml—–全局有效
优化参数: 包括:资源,容错,稳定性等——Hadoop官网api xxx.default.xml(查找弃用属性–Deprecated Properties)
大小文件之间的关联操作—(hive大小表之间的join(结合)) 把所有的数据以关联的字段作为key发送到同一个reduce处理
弊端:reduce join 压力大 可能发生数据倾斜
在map阶段完成数据之间的关联
map join 没有reduce阶段(numreducetask(0))part-m-00000
分布式缓存: 可以把指定的文件(压缩包 jar ) 发生给当下程序的每一个maptask
setup初始化方法: 把缓存的小文件加载到当前maptask运行的程序内存中
创建各种不同的数据集合类型 保存小文件数据
处理小文件场景: 默认切片机制:–>一个小文件一个切片—->一个切片一个maptask
CombineTextInputFormat:切片机制
小文件:
自定义分分组: 发生阶段: 调用reduce()方法之前
默认分组: 排好序的数据,根据前后俩个key是否相等(相等 或者 不相等)
自定义对象作为key: WritableComparator分组继承的类 注意:WritableComparable 排序实现接口
它是用来给Key分组的
它在ReduceTask中进行,默认的类型是GroupingComparator也可以自定义
WritableComparator为辅助排序手段提供基础(继承它),用来应对不同的业务需求
比如GroupingComparator(分组比较器)会在ReduceTask将文件写入磁盘并排序后按照Key进行分组,判断下一个key是否相同,将同组的Key传给reduce()执行
自定义分组生效: job.setGroupingComparatorClass(OrderGroupingComparator.class);