您现在的位置:首页 >> 前端 >> 内容

MapReduce深入操作

时间:2016/12/6 8:14:00 点击:

  核心提示:一、自定义数据类型在设计的系统开发过程之中,有可能要参与分析的文件会有很多,并且文件的组成结构也可能会非常的复杂,所以来讲在整个的Hadoop里面可以针对于用户的需求实现自定义类型。 现在假如说有以下...

一、自定义数据类型

在设计的系统开发过程之中,有可能要参与分析的文件会有很多,并且文件的组成结构也可能会非常的复杂,所以来讲在整个的Hadoop里面可以针对于用户的需求实现自定义类型。 现在假如说有以下一个信息文件,文件的组成格式(购物统计):

用户名[0]、省份[1]、城市[2]、购买日期时间[3]、商品名称[4]、商品分类[5]、商品子分类[6]、商品价格[7]、商品购买价格[8]

希望可以通过一种数据类型能够描述出以下几种关系:

1、保留有省份的花销(商品原始价格、成交价格、折扣的价格);

2、保留有用户的花销(商品原始价格、成交价格、折扣的价格);

3、保留有商品分类的花销(商品原始价格、成交价格、折扣的价格);

在整个的MapReduce之中并不会提供有任何的一种类型来描述这种花销的结构,那么这样的话就需要定义一个自己的数据类型,而所有的数据类型一定要实现一个接口:org.apache.hadoop.io.Writable;

/**
* 实现了自定义的记录数据价格的结构
* @author mldn
*/
public class RecordWritable implements Writable {
    private double price ; // 商品的原始价格
    private double deal ; // 商品的成交价格
    private double discount ; // 商品的折扣价格
    public RecordWritable() {
        // RecordWritable类需要在读取的时候执行数据反序列化操作;
    }
    // RecordWritable类要在数据创建的时候接收内容
    public RecordWritable(double price, double deal, double discount) {
        super();
        this.price = price;
        this.deal = deal;
        this.discount = discount;
    }
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeDouble(this.price);
        output.writeDouble(this.deal);
        output.writeDouble(this.discount);
    }
    @Override
    public void readFields(DataInput input) throws IOException {
        this.price = input.readDouble() ;
        this.deal = input.readDouble() ;
        this.discount = input.readDouble() ;
    }
    @Override
    public String toString() {
        return "RecordWritable [price=" + price + ", deal=" + deal + ", discount=" + discount + "]";
    }
    public double getPrice() {
        return price;
    }
    public double getDeal() {
        return deal;
    }
    public double getDiscount() {
        return discount;
    }
}

最终的结果一定要对数据进行MapReduce 操作统计,所以在整个的统计里面,现在希望可以实现这样的操作统计:

· 可以实现根据省份数据得到的统计结果;

定义Map 数据处理

/**
* 针对于每一行发送的数据进行拆分处理,将每一行的数据拆分为key与value的形式<br>
* 输入类型:关注的是value的内容;<br>
* 输出类型:单词名称=数量(1)<br>
* @author mldn
*/
private static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable> {
    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, RecordWritable>.Context context) throws       IOException, InterruptedException {
        // 读取每一行的数据,Map是根据换行作为读取的分割符;
        String str = value.toString(); // 取的每一行的内容
        // 所有的单词要求按照空格进行拆分
        String result[] = str.split(","); // 按照“,”拆分
        // 以后如果要根据不同的方式来进行统计,则此处可以取得不同的内容
        String province = result[1] ; // 取得省份的信息内容
        double price = Double.parseDouble(result[7]) ; // 取得原始价格
        double deal = Double.parseDouble(result[8]) ; // 成交价格
        RecordWritable rw = new RecordWritable(price, deal, price-deal) ;
        context.write(new Text(province), rw); // 将数据取出
    }
}

定义Reduce 处理类

private static class RecordReducer extends Reducer<Text, RecordWritable, Text, RecordWritable> {
    @Override
    protected void reduce(Text key, Iterable<RecordWritable> values,Reducer<Text, RecordWritable, Text, RecordWritable>.Context context)
        // 计算出消费的总价格数据
        double priceSum = 0.0 ;
        double dealSum = 0.0 ;
        double discountSum = 0.0 ;
        for (RecordWritable rw : values) {
            priceSum += rw.getPrice() ;
            dealSum += rw.getDeal() ;
            discountSum += rw.getDiscount() ;
        }
        RecordWritable rw = new RecordWritable(priceSum, dealSum, discountSum) ;
        context.write(key, rw);
    }
}

定义相关的作业进行处理

public class Record {
    private static final String OUTPUT_PATH = "hdfs://192.168.122.132:9000/output-" ;
    // 具体的输入的路径由用户自己来输入,而我们来定义一个属于自己的输出路径,格式“output-20201010”
    public static void main(String[] args) throws Exception {
        if (args.length != 1) { // 现在输入的参数个数不足,那么则应该退出程序
        System.out.println("本程序的执行需要两个参数:HDFS输入路径");
        System.exit(1); // 程序退出
        }
        Configuration conf = new Configuration() ; // 此时需要进行HDFS操作
        String paths [] = new GenericOptionsParser(conf,args).getRemainingArgs() ; //将输入的两个路径解析为HDFS的路径
        // 需要定义一个描述作业的操作类
        Job job = Job.getInstance(conf, "hadoop") ;
        job.setJarByClass(Record.class); // 定义本次作业执行的类的名称
        job.setMapperClass(RecordMapper.class); // 定义本次作业完成所需要的Mapper程序类
        job.setMapOutputKeyClass(Text.class); // 定义Map输出数据的Key的类型
        job.setMapOutputValueClass(RecordWritable.class); // 定义Map输出数据的Value的类型
        job.setReducerClass(RecordReducer.class); // 定义要使用的Reducer程序处理类
        job.setOutputKeyClass(Text.class); // 最终输出统计结果的key的类型
        job.setOutputValueClass(RecordWritable.class); // 最终输出统计结果的value的类型
        // 所有的数据都在HDFS上进行操作,那么就必须使用HDFS提供的程序进行数据的输入配置
        FileInputFormat.addInputPath(job, new Path(paths[0])); // 定义输入的HDFS路径
        // 输出路径可以自己进行定义
        FileOutputFormat.setOutputPath(job,
        new Path(OUTPUT_PATH + new SimpleDateFormat("yyyyMMdd").format(new
        Date())));// 定义统计结果的输出路径
        System.exit(job.waitForCompletion(true) ? 0 : 1); // 程序执行完毕后要进行退出
    }
}

Tags:MA AP PR RE 
作者:网络 来源:u012737182