Hadoop MapReduce Principles and YARN Scheduling Management
MapReduce Principles: What Is MapReduce?

Preface: if you want to know how many hearts there are in a pile of cards, the direct way is to go through the cards one by one and count the hearts.
The MapReduce way is to deal the pile out across all the nodes, let each node count how many hearts it holds, and then add those counts together to get the result.
Shuffle is the process that spans map and reduce: it moves the data produced by the map tasks over the network to the reduce tasks.

WordCount Example with Map and Reduce, and Log Viewing

Adding the dependency:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hx</groupId>
    <artifactId>hadoopDemo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hadoopDemo1</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
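The hadoop-client dependency uses the provided scope because the cluster supplies the Hadoop classes at runtime, so only our own classes need to go into the job jar. A minimal packaging sketch, assuming a plain Maven build; the copy step and target path are assumptions chosen to match the wc.jar used in the run command below:

mvn clean package -DskipTests
cp target/hadoopDemo1-0.0.1-SNAPSHOT.jar /home/hadoop-3.2.4/wc.jar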
The code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-21 21:17
 * @description Assembles the job
 */
public class WordCountJob {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath => " + args[0]);
        System.out.println("outputPath => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // Configuration parameters needed by the job
        Configuration configuration = new Configuration();
        // Create the job
        Job job = Job.getInstance(configuration, "wordCountJob");
        // Note: this line is mandatory, otherwise the Job class cannot be found on the cluster
        job.setJarByClass(WordCountJob.class);
        // Specify the input and output files
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Submit the job
        job.waitForCompletion(true);
    }

    /**
     * @author Huathy
     * @date 2023-10-21 21:39
     * @description Custom mapper class; defines the input and output types
     */
    public static class WordMap extends Mapper<LongWritable, Text, Text, LongWritable> {
        /**
         * The map function accepts <k1, v1> and produces <k2, v2>.
         * k1 is the byte offset of the line within the file; v1 is the content of the line.
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Split each incoming line into words
            String[] words = v1.toString().split("\\W");
            // Iterate over the split-out words
            for (String word : words) {
                // Wrap each word as a <k2, v2> pair
                Text k2 = new Text(word);
                System.out.println("k2: " + k2.toString());
                LongWritable v2 = new LongWritable(1);
                // Emit <k2, v2>
                context.write(k2, v2);
            }
        }
    }

    /**
     * @author Huathy
     * @date 2023-10-21 22:08
     * @description Custom reducer class
     */
    public static class WordReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * Sums the values in v2s and emits the final result as <k3, v3>.
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            // Assemble <k3, v3>
            LongWritable v3 = new LongWritable(sum);
            System.out.println("k3: " + k2.toString() + " -- v3: " + v3.toString());
            context.write(k2, v3);
        }
    }
}
Run Command and Output Log

[root@cent7-1 hadoop-3.2.4]# hadoop jar wc.jar WordCountJob hdfs://cent7-1:9000/hello.txt hdfs://cent7-1:9000/out
inputPath => hdfs://cent7-1:9000/hello.txt
outputPath => hdfs://cent7-1:9000/out
set jar => /home/hadoop-3.2.4/wc.jar
2023-10-22 15:30:34,183 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2023-10-22 15:30:35,183 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2023-10-22 15:30:35,342 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1697944187818_0010
2023-10-22 15:30:36,196 INFO input.FileInputFormat: Total input files to process : 1
2023-10-22 15:30:37,320 INFO mapreduce.JobSubmitter: number of splits:1
2023-10-22 15:30:37,694 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1697944187818_0010
2023-10-22 15:30:37,696 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-10-22 15:30:38,033 INFO conf.Configuration: resource-types.xml not found
2023-10-22 15:30:38,034 INFO resource.ResourceUtils: Unable to find resource-types.xml.
2023-10-22 15:30:38,188 INFO impl.YarnClientImpl: Submitted application application_1697944187818_0010
2023-10-22 15:30:38,248 INFO mapreduce.Job: The url to track the job:
2023-10-22 15:30:38,249 INFO mapreduce.Job: Running job: job_1697944187818_0010
2023-10-22 15:30:51,749 INFO mapreduce.Job: Job job_1697944187818_0010 running in uber mode : false
2023-10-22 15:30:51,751 INFO mapreduce.Job:  map 0% reduce 0%
2023-10-22 15:30:59,254 INFO mapreduce.Job:  map 100% reduce 0%
2023-10-22 15:31:08,410 INFO mapreduce.Job:  map 100% reduce 100%
2023-10-22 15:31:09,447 INFO mapreduce.Job: Job job_1697944187818_0010 completed successfully
2023-10-22 15:31:09,578 INFO mapreduce.Job: Counters: 54
    File System Counters
        FILE: Number of bytes read=129
        FILE: Number of bytes written=479187
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=139
        HDFS: Number of bytes written=35
        HDFS: Number of read operations=8
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
        HDFS: Number of bytes read erasure-coded=0
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4916
        Total time spent by all reduces in occupied slots (ms)=5821
        Total time spent by all map tasks (ms)=4916
        Total time spent by all reduce tasks (ms)=5821
        Total vcore-milliseconds taken by all map tasks=4916
        Total vcore-milliseconds taken by all reduce tasks=5821
        Total megabyte-milliseconds taken by all map tasks=5033984
        Total megabyte-milliseconds taken by all reduce tasks=5960704
    Map-Reduce Framework
        Map input records=4
        Map output records=8
        Map output bytes=107
        Map output materialized bytes=129
        Input split bytes=94
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=129
        Reduce input records=8
        Reduce output records=5
        Spilled Records=16
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=259
        CPU time spent (ms)=2990
        Physical memory (bytes) snapshot=528863232
        Virtual memory (bytes) snapshot=5158191104
        Total committed heap usage (bytes)=378011648
        Peak Map Physical memory (bytes)=325742592
        Peak Map Virtual memory (bytes)=2575839232
        Peak Reduce Physical memory (bytes)=203120640
        Peak Reduce Virtual memory (bytes)=2582351872
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=45
    File Output Format Counters
        Bytes Written=35
[root@cent7-1 hadoop-3.2.4]#
Viewing MapReduce Task Logs

Enable YARN log aggregation so that the logs scattered across the NodeManager nodes are collected and managed centrally, which makes them easy to inspect. Set yarn.log-aggregation-enable to true and point yarn.log.server.url at the JobHistory Server in yarn-site.xml:

<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<property>
    <name>yarn.log.server.url</name>
    <!-- the JobHistory Server address goes here -->
    <value></value>
</property>

Start the history server:

sbin/mr-jobhistory-daemon.sh start historyserver
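With aggregation enabled and the history server running, a job's collected logs can also be pulled from the command line with the yarn logs command; for example, using the application id from the submission log above:

yarn logs -applicationId application_1697944187818_0010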
Viewing in the Web UI

Open the YARN ResourceManager web UI and click History.
Click into the Successful list.
You will see the successful job record; click logs to view its logs.
Stopping a Task in the Hadoop Cluster

Exiting the terminal with Ctrl+C does not stop the task, because the job has already been submitted to Hadoop.
List the running tasks: yarn application -list
Kill a task: yarn application -kill [application_Id]
# List the tasks currently in progress
[root@cent7-1 hadoop-3.2.4]# yarn application -list
2023-10-22 16:18:38,756 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
                Application-Id      Application-Name      Application-Type      User     Queue        State    Final-State    Progress    Tracking-URL
application_1697961350721_0002          wordCountJob             MAPREDUCE      root   default     ACCEPTED      UNDEFINED          0%             N/A
# Kill the task
[root@cent7-1 hadoop-3.2.4]# yarn application -kill application_1697961350721_0002
2023-10-22 16:18:55,669 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Killing application application_1697961350721_0002
2023-10-22 16:18:56,795 INFO impl.YarnClientImpl: Killed application application_1697961350721_0002
Hadoop's Serialization Mechanism

What serialization is for: as the output above shows, much of Hadoop's runtime work is I/O. When writing Hadoop map and reduce code we use the data types Hadoop itself provides; Hadoop optimizes their serialization so that only the core content is serialized, which cuts down the I/O overhead.
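To make this concrete, here is a minimal sketch of a custom Hadoop type built on the Writable interface; the OrderWritable name and its two fields are illustrative assumptions, not part of the WordCount job above:

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Illustrative custom type, serialized via Writable rather than java.io.Serializable
public class OrderWritable implements Writable {
    private long orderId;
    private long amount;

    // Hadoop instantiates Writables reflectively, so a no-arg constructor is required
    public OrderWritable() {
    }

    public OrderWritable(long orderId, long amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Only the two field values are written -- no class descriptors or headers,
        // which is where the space savings over Java serialization come from
        out.writeLong(orderId);
        out.writeLong(amount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read back in exactly the order they were written
        orderId = in.readLong();
        amount = in.readLong();
    }
}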
Characteristics of Hadoop's serialization mechanism:
- Compact: uses storage space efficiently.
- Fast: the extra overhead of reading and writing data is small.
- Extensible: older data formats can be read transparently.
- Interoperable: multiple languages are supported.

Shortcomings of Java's built-in serialization:
- Not compact: it carries a lot of extra metadata and is poorly suited to random access.
- Large storage footprint: it recursively writes out the descriptions of a class's superclasses until there are no more superclasses.
- Poor extensibility: by contrast, Hadoop's Writable makes it easy for users to define their own types.

The YARN Resource Manager in Detail

YARN currently supports three schedulers (these schedule submitted tasks); a configuration sketch follows this list.
- FIFO Scheduler: first-in, first-out scheduling. Real workloads usually mix real-time and offline jobs, so plain first-in-first-out tends to be a poor fit.
- CapacityScheduler: can be seen as a multi-queue version of FIFO. Resources are divided into multiple queues, and each queue is FIFO internally.
- FairScheduler: multiple queues with multiple users sharing resources fairly. Recommended for fair task scheduling.
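Which scheduler YARN uses is controlled by yarn.resourcemanager.scheduler.class in yarn-site.xml, and CapacityScheduler queues are declared in capacity-scheduler.xml. A minimal sketch; the online/offline queue names and the 30/70 split are illustrative assumptions:

<!-- yarn-site.xml: choose the scheduler implementation -->
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

<!-- capacity-scheduler.xml: two illustrative queues under root; capacities must sum to 100 -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>online,offline</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.online.capacity</name>
    <value>30</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.offline.capacity</name>
    <value>70</value>
</property>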