MapReduce: two approaches to sorting website page views (PV)
Published: 2019-06-21



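Requirements:

1. Count the total number of visits to each page in request.dat, and sort the output file by visit count in descending order.
2. Count the total number of visits to each page in request.dat, and output only the top 5 pages.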

Task 1: compute the result with two separate MapReduce jobs

[Figure: flowchart of the two-job approach]

Key technical point:

MapReduce has a built-in sorting mechanism:

both the map tasks and the reduce tasks sort their data by key,

so the output of each reduce task is always ordered by key.
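To make the sorting point concrete, here is a plain-Java simulation, with no Hadoop dependency, of roughly what happens between map and reduce. It is an illustration under simplifying assumptions: `ShuffleSortDemo` and its methods are hypothetical, and a `TreeMap` stands in for the framework's real sort and merge of map output. It shows grouped keys coming out in ascending key order, which is why the step-1 output below is alphabetical by page.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of map -> shuffle/sort -> reduce, not Hadoop code.
final class ShuffleSortDemo {
    private ShuffleSortDemo() {}

    // "Map": emit (page, 1) for each line of the form "date page".
    static List<Map.Entry<String, Integer>> map(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(" ");
            pairs.add(new AbstractMap.SimpleEntry<>(fields[1], 1));
        }
        return pairs;
    }

    // "Shuffle + reduce": the TreeMap groups equal keys and keeps them in ascending
    // key order, standing in for the sorted, grouped input a reduce task receives.
    static TreeMap<String, Integer> shuffleAndReduce(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, Integer> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2019/05/29 qq.com/news",
                "2019/05/29 163.com/ac",
                "2019/05/29 qq.com/news",
                "2019/05/29 baidu.com/music");
        // Printed in key order: 163.com/ac, baidu.com/music, qq.com/news
        shuffleAndReduce(map(lines)).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```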

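Approach:

This built-in sort is enough to meet the first requirement:

1. Write a first MapReduce job that counts the total visits for each page.

2. Write a second MapReduce job:

Map phase: read the result files produced by the first job, parse each line into a Java object URLCountBean (named PageCount in the code below) that wraps a URL and its total count, and emit that object as the key with NullWritable as the value.

Key point: this class must implement the WritableComparable interface so that the framework can call its compareTo method to sort the keys.

Reduce phase: because the framework has already sorted the incoming keys with URLCountBean's compareTo method, the reduce method only has to write each key out. The final result is therefore ordered by total count (and with the default single reduce task, that order is global).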
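A minimal sketch of the ordering URLCountBean needs, assuming plain Java without Hadoop (the Writable `write`/`readFields` half is left out). The class name follows the text; its fields and constructor are illustrative. `Integer.compare` is used instead of subtracting counts: for non-negative counts it yields the same order, but it cannot overflow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java URLCountBean: only the sort order is modelled here.
class URLCountBean implements Comparable<URLCountBean> {
    final String url;
    final int count;

    URLCountBean(String url, int count) {
        this.url = url;
        this.count = count;
    }

    // Larger counts first; ties broken by URL so distinct URLs never compare as equal.
    @Override
    public int compareTo(URLCountBean o) {
        int byCount = Integer.compare(o.count, this.count);
        return byCount != 0 ? byCount : this.url.compareTo(o.url);
    }

    @Override
    public String toString() {
        return url + "\t" + count;
    }

    public static void main(String[] args) {
        List<URLCountBean> beans = new ArrayList<>();
        beans.add(new URLCountBean("qq.com/by", 3));
        beans.add(new URLCountBean("163.com/sport", 8));
        beans.add(new URLCountBean("sina.com/news/socail", 7));
        beans.add(new URLCountBean("baidu.com/music", 7));
        Collections.sort(beans); // uses compareTo, as the framework's key sort does by default
        beans.forEach(System.out::println);
    }
}
```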

Test data (one record per line: visit date, then page):

2019/05/29 qq.com/a
2019/05/29 qq.com/bx
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 sina.com/news/socail
2019/05/29 163.com/ac
2019/05/29 sina.com/news/socail
2019/05/29 163.com/sport
2019/05/29 163.com/ac
2019/05/29 sina.com/play
2019/05/29 163.com/sport
2019/05/29 163.com/ac
2019/05/29 sina.com/movie
2019/05/29 sina.com/play
2019/05/29 sina.com/movie
2019/05/29 163.com/sport
2019/05/29 sina.com/movie
2019/05/29 163.com/ac
2019/05/29 163.com/ac
2019/05/29 163.com/acc
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 163.com/sport
2019/05/29 sina.com/movie
2019/05/29 sina.com/news/socail
2019/05/29 sina.com/movie
2019/05/29 qq.com/news
2019/05/29 163.com/bb
2019/05/29 163.com/cc
2019/05/29 sina.com/lady/
2019/05/29 163.com/cc
2019/05/29 qq.com/news
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 sina.com/lady/
2019/05/29 qq.com/by3
2019/05/29 sina.com/lady/
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 qq.com/by3
2019/05/29 163.com/sport
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 sina.com/lady/
2019/05/29 sina.com/play
2019/05/29 sina.com/movie
2019/05/29 sina.com/music
2019/05/29 sina.com/sport
2019/05/29 sina.com/sport
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 sohu.com/lady/
2019/05/29 sohu.com/play
2019/05/29 sohu.com/movie
2019/05/29 sohu.com/music
2019/05/29 sohu.com/sport
2019/05/29 sohu.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 baidu.com/lady/
2019/05/29 baidu.com/play
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/sport
2019/05/29 baidu.com/sport

Implementation

PageCount: the serializable key class

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PageCount implements WritableComparable<PageCount> {
    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() { return page; }
    public void setPage(String page) { this.page = page; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Sort by count in descending order; break ties by page name (ascending)
    @Override
    public int compareTo(PageCount o) {
        int byCount = Integer.compare(o.getCount(), this.count);
        return byCount != 0 ? byCount : this.page.compareTo(o.getPage());
    }

    // Serialization: the field order here must match readFields()
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.page);
        dataOutput.writeInt(this.count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.page = dataInput.readUTF();
        this.count = dataInput.readInt();
    }

    @Override
    public String toString() {
        return "PageCount{" + "page='" + page + '\'' + ", count=" + count + '}';
    }
}
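The `write`/`readFields` pair must read fields back in exactly the order they were written. The hypothetical class below mirrors PageCount's two methods using only `java.io` (`DataOutputStream` and `DataInputStream` implement `DataOutput` and `DataInput`), so the round trip can be checked without a cluster. One caveat: `writeUTF` rejects strings whose encoded form exceeds 65,535 bytes, which is not an issue for URLs like these.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for PageCount's Writable methods; runs without Hadoop.
final class PageCountRoundTrip {
    String page;
    int count;

    void write(DataOutput out) throws IOException {
        out.writeUTF(page);   // field order here...
        out.writeInt(count);
    }

    void readFields(DataInput in) throws IOException {
        page = in.readUTF();  // ...must match the order here
        count = in.readInt();
    }

    // Serialize to bytes, then read back into a fresh object created with the
    // no-arg constructor, similar to how Hadoop instantiates keys before readFields.
    static PageCountRoundTrip roundTrip(String page, int count) throws IOException {
        PageCountRoundTrip original = new PageCountRoundTrip();
        original.page = page;
        original.count = count;

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        PageCountRoundTrip copy = new PageCountRoundTrip();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        PageCountRoundTrip copy = roundTrip("163.com/sport", 8);
        System.out.println(copy.page + "\t" + copy.count);
    }
}
```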

Step 1: a MapReduce job that counts the total visits for each page

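import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageCountStep1 {

    public static class PageCountStep1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line looks like "2019/05/29 qq.com/news": emit (page, 1)
            String line = value.toString();
            String[] split = line.split(" ");
            context.write(new Text(split[1]), new IntWritable(1));
        }
    }

    public static class PageCountStep1Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s emitted for this page
            int count = 0;
            for (IntWritable v : values) {
                count += v.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        // Load the configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(PageCountStep1.class);
        job.setMapperClass(PageCountStep1Mapper.class);
        job.setReducerClass(PageCountStep1Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/outputout"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}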
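The mapper above assumes exactly one space between the date and the page; a blank line or a tab-separated line would make `split[1]` throw `ArrayIndexOutOfBoundsException`. A more defensive parse could look like this sketch. `LogLineParser.parsePage` is a hypothetical helper, not part of the original code or of Hadoop.

```java
// Hypothetical helper: extract the page from "date page", tolerating extra
// whitespace and returning null for malformed lines instead of throwing.
final class LogLineParser {
    private LogLineParser() {}

    /** Returns the second whitespace-separated field, or null if there is none. */
    static String parsePage(String line) {
        if (line == null) {
            return null;
        }
        String[] fields = line.trim().split("\\s+");
        return fields.length >= 2 ? fields[1] : null;
    }

    public static void main(String[] args) {
        System.out.println(parsePage("2019/05/29 qq.com/news"));
        System.out.println(parsePage("2019/05/29\tsina.com/movie"));
        System.out.println(parsePage(""));
    }
}
```

Inside the mapper it would be used as `String page = LogLineParser.parsePage(value.toString());`, calling `context.write` only when `page` is non-null.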

Step 2: a second MapReduce job that sorts the step-1 results

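import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageCountStep2 {

    public static class PageCountStep2Mapper extends Mapper<LongWritable, Text, PageCount, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Step-1 output lines look like "page<TAB>count"
            String[] split = value.toString().split("\t");
            PageCount pageCount = new PageCount();
            pageCount.set(split[0], Integer.parseInt(split[1]));
            context.write(pageCount, NullWritable.get());
        }
    }

    public static class PageCountStep2Reduce extends Reducer<PageCount, NullWritable, PageCount, NullWritable> {
        @Override
        protected void reduce(PageCount key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted by PageCount.compareTo; just write them out
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        // Load the configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(PageCountStep2.class);
        job.setMapperClass(PageCountStep2Mapper.class);
        job.setReducerClass(PageCountStep2Reduce.class);

        job.setMapOutputKeyClass(PageCount.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(PageCount.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/outputout"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/sortout"));

        // With the default single reduce task, the whole output file is in order
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}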
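To check the expected order without running Hadoop, this sketch simulates step 2 in plain Java: it parses step-1 lines of the form `page<TAB>count` and sorts them the way `PageCount.compareTo` does. `Step2Simulation` is hypothetical; in the real job the ordering comes from the shuffle, not from an explicit `sort` call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java simulation of PageCountStep2 (no Hadoop involved).
final class Step2Simulation {
    private Step2Simulation() {}

    static List<String> sortByCountDesc(List<String> step1Lines) {
        List<String[]> rows = new ArrayList<>();
        for (String line : step1Lines) {
            String[] fields = line.split("\t");
            if (fields.length == 2) {
                rows.add(fields); // skip malformed lines
            }
        }
        // Count descending, then page ascending: the order PageCount keys reach the reducer in.
        rows.sort(Comparator.comparingInt((String[] r) -> Integer.parseInt(r[1]))
                .reversed()
                .thenComparing((String[] r) -> r[0]));
        List<String> out = new ArrayList<>();
        for (String[] r : rows) {
            out.add(r[0] + "\t" + r[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> step1 = Arrays.asList(
                "163.com/ac\t5", "163.com/sport\t8", "baidu.com/music\t7", "qq.com/by3\t6");
        sortByCountDesc(step1).forEach(System.out::println);
    }
}
```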

Output

1. Total visits per page (output of step 1):

163.com/ac	5
163.com/acc	1
163.com/bb	1
163.com/cc	2
163.com/sport	8
baidu.com/lady/	1
baidu.com/movie	6
baidu.com/music	7
baidu.com/play	1
baidu.com/sport	2
qq.com/a	1
qq.com/bx	1
qq.com/by	3
qq.com/by3	6
qq.com/news	5
sina.com/lady/	4
sina.com/movie	6
sina.com/music	1
sina.com/news/socail	7
sina.com/play	3
sina.com/sport	2
sohu.com/lady/	1
sohu.com/movie	1
sohu.com/music	1
sohu.com/play	1
sohu.com/sport	2

2. Sorted output (output of step 2):

PageCount{page='163.com/sport', count=8}
PageCount{page='baidu.com/music', count=7}
PageCount{page='sina.com/news/socail', count=7}
PageCount{page='baidu.com/movie', count=6}
PageCount{page='qq.com/by3', count=6}
PageCount{page='sina.com/movie', count=6}
PageCount{page='163.com/ac', count=5}
PageCount{page='qq.com/news', count=5}
PageCount{page='sina.com/lady/', count=4}
PageCount{page='qq.com/by', count=3}
PageCount{page='sina.com/play', count=3}
PageCount{page='163.com/cc', count=2}
PageCount{page='baidu.com/sport', count=2}
PageCount{page='sina.com/sport', count=2}
PageCount{page='sohu.com/sport', count=2}
PageCount{page='163.com/acc', count=1}
PageCount{page='163.com/bb', count=1}
PageCount{page='baidu.com/lady/', count=1}
PageCount{page='baidu.com/play', count=1}
PageCount{page='qq.com/a', count=1}
PageCount{page='qq.com/bx', count=1}
PageCount{page='sina.com/music', count=1}
PageCount{page='sohu.com/lady/', count=1}
PageCount{page='sohu.com/movie', count=1}
PageCount{page='sohu.com/music', count=1}
PageCount{page='sohu.com/play', count=1}

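Task 2: buffer the per-page counts in an in-memory map (a TreeMap) inside the reducer and rank them there; this is only suitable when the data volume is small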

[Figure: analysis diagram for Task 2]

PageCount entity class

public class PageCount implements Comparable<PageCount> {
    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() { return page; }
    public void setPage(String page) { this.page = page; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Sort by count in descending order; break ties by page name (ascending)
    @Override
    public int compareTo(PageCount o) {
        int byCount = Integer.compare(o.getCount(), this.count);
        return byCount != 0 ? byCount : this.page.compareTo(o.getPage());
    }
}
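Why does `compareTo` fall back to comparing page names? A `TreeMap` treats two keys whose comparison returns 0 as the same key, so ordering by count alone would silently drop pages that share a count. The same holds in Task 1, where the shuffle groups keys that compare as equal into one `reduce` call. The hypothetical demo below applies two comparators to the same three items to show the difference; `TieBreakDemo` and its members are illustrative names.

```java
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical demo: a count-only comparator collapses pages with equal counts.
final class TieBreakDemo {
    private TieBreakDemo() {}

    static final class Item {
        final String page;
        final int count;

        Item(String page, int count) {
            this.page = page;
            this.count = count;
        }
    }

    // Count descending only.
    static final Comparator<Item> COUNT_ONLY = (a, b) -> Integer.compare(b.count, a.count);
    // Count descending, then page ascending (the PageCount ordering).
    static final Comparator<Item> COUNT_THEN_PAGE = COUNT_ONLY.thenComparing(it -> it.page);

    // Number of distinct keys a TreeMap ends up holding under the given ordering.
    static int sizeWith(Comparator<Item> order, Item... items) {
        TreeMap<Item, Object> map = new TreeMap<>(order);
        for (Item it : items) {
            map.put(it, null);
        }
        return map.size();
    }

    public static void main(String[] args) {
        Item[] items = {
            new Item("baidu.com/music", 7),
            new Item("sina.com/news/socail", 7),
            new Item("163.com/sport", 8)
        };
        System.out.println(sizeWith(COUNT_ONLY, items));      // 2: one count-7 page is lost
        System.out.println(sizeWith(COUNT_THEN_PAGE, items)); // 3
    }
}
```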

PageTopMapper: reads the log lines

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageTopMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // "2019/05/29 qq.com/news" -> ("qq.com/news", 1)
        String line = value.toString();
        String[] split = line.split(" ");
        context.write(new Text(split[1]), new IntWritable(1));
    }
}

PageTopReducer: totals the counts per page and emits the top N

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageTopReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Holds every page's total, ordered by PageCount.compareTo (count desc, page asc)
    TreeMap<PageCount, Object> treeMap = new TreeMap<PageCount, Object>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        PageCount pageCount = new PageCount();
        pageCount.set(key.toString(), count);
        treeMap.put(pageCount, null);
    }

    /**
     * Called once, after all keys have been processed
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        int top = conf.getInt("top.n", 5);
        Set<Map.Entry<PageCount, Object>> entrySet = treeMap.entrySet();
        int i = 0;
        for (Map.Entry<PageCount, Object> entry : entrySet) {
            context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount()));
            i++;
            // Stop after the top n entries
            if (i == top) return;
        }
    }
}
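PageTopReducer keeps every page in the `TreeMap` until `cleanup`, so memory grows with the number of distinct pages, which is why this approach is only recommended for small data. A common variation, shown here as a sketch rather than the author's code, caps the collection at N entries and evicts the lowest-ranked one after each insert, using a `TreeSet` and `pollLast`. `BoundedTopN` and its methods are hypothetical names, and the result is a global top N only when there is a single reduce task.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical memory-bounded top-N holder: never keeps more than n items.
final class BoundedTopN {
    static final class Item {
        final String page;
        final int count;

        Item(String page, int count) {
            this.page = page;
            this.count = count;
        }
    }

    // Same ordering as PageCount.compareTo: count descending, then page ascending.
    private static final Comparator<Item> ORDER =
            Comparator.comparingInt((Item it) -> it.count).reversed()
                    .thenComparing(it -> it.page);

    private final int n;
    private final TreeSet<Item> top = new TreeSet<>(ORDER);

    BoundedTopN(int n) {
        this.n = n;
    }

    // In a reducer this would be called at the end of reduce(), once per page.
    void offer(String page, int count) {
        top.add(new Item(page, count));
        if (top.size() > n) {
            top.pollLast(); // drop the entry that sorts last (lowest count)
        }
    }

    // In a reducer this would run in cleanup(), writing each line to the context.
    List<String> result() {
        List<String> lines = new ArrayList<>();
        for (Item it : top) {
            lines.add(it.page + "\t" + it.count);
        }
        return lines;
    }

    public static void main(String[] args) {
        BoundedTopN topN = new BoundedTopN(3);
        topN.offer("qq.com/by", 3);
        topN.offer("163.com/sport", 8);
        topN.offer("baidu.com/music", 7);
        topN.offer("sina.com/news/socail", 7);
        topN.offer("baidu.com/movie", 6);
        topN.result().forEach(System.out::println);
    }
}
```

Memory stays proportional to N rather than to the number of distinct pages, at the cost of a log(N) insert and eviction per page.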

JobSubmitter: launches the job

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {
    public static void main(String[] args) throws Exception {
        // Load the configuration
        Configuration conf = new Configuration();

        // Ways to set top.n:
        // 1. Set it directly
        //conf.setInt("top.n", 3);
        // 2. Pass it as an argument to main()
        //conf.setInt("top.n", Integer.parseInt(args[0]));
        // 3. Read it from topn.properties on the classpath (a line such as top.n=5)
        Properties props = new Properties();
        props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
        conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));
        // 4. Put it in an XML configuration file that is loaded by default (e.g. core-site.xml)

        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubmitter.class);
        job.setMapperClass(PageTopMapper.class);
        job.setReducerClass(PageTopReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/output"));

        // A global top N needs a single reduce task (the default);
        // with 3 reducers, each one would emit its own top N
        //job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
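Option 3 above fails with an exception if `topn.properties` is missing from the classpath or has no `top.n` entry. A minimal sketch of a fallback, assuming a default of 5 to match the reducer's `conf.getInt("top.n", 5)`; `TopNConfig` is a hypothetical helper class.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: read top.n with a default instead of failing.
final class TopNConfig {
    private TopNConfig() {}

    // Returns top.n from the properties, or defaultValue if the key is absent or blank.
    static int readTopN(Properties props, int defaultValue) {
        String raw = props.getProperty("top.n");
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        return Integer.parseInt(raw.trim());
    }

    // Loads the resource from the classpath if present; otherwise returns empty Properties.
    static Properties loadFromClasspath(String resource) throws IOException {
        Properties props = new Properties();
        try (InputStream in = TopNConfig.class.getClassLoader().getResourceAsStream(resource)) {
            if (in != null) {
                props.load(in);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = loadFromClasspath("topn.properties");
        System.out.println("top.n = " + readTopN(props, 5));
    }
}
```

In JobSubmitter this would replace the two option-3 lines with `conf.setInt("top.n", TopNConfig.readTopN(TopNConfig.loadFromClasspath("topn.properties"), 5));`.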

Output: top 5

163.com/sport	8
baidu.com/music	7
sina.com/news/socail	7
baidu.com/movie	6
qq.com/by3	6


Reposted from: https://my.oschina.net/u/3995125/blog/3055437
