E-commerce Recommendation System



This post documents the main implementation steps of a product recommendation system, built as a chain of Hadoop MapReduce jobs.

1. Computing User Preference Values for Products


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep1 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // 将行为转化为偏好值
            double like = 0.0;
            if (split.length >= 3) {
                String str = split[2].toLowerCase();
                if (str.equals("paysuccess")) { // 支付成功
                    like = 0.3;
                } else if (str.equals("addreview")) { //评论
                    like = 0.3;
                } else if (str.equals("createorder")) { // 创建订单
                    like = 0.2;
                } else if (str.equals("addcar")){ // 加入购物车
                    like = 0.15;
                } else { // 浏览
                    like = 0.05;
                }
            }
            // key=用户:商品 value=[偏好,偏好]
            Text outkey = new Text(split[0] + ":" + split[1]);
            DoubleWritable outvalue = new DoubleWritable(like);
            context.write(outkey, outvalue);
        }
    }

    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // Use BigDecimal when summing to avoid floating-point precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (DoubleWritable value : values) {
                BigDecimal v = new BigDecimal(value.get());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path (raw behavior log)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
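
To make the data flow concrete, assume each line of data/userLog.log has the form userId,goodsId,action (the layout the mapper above expects); the ids below are purely illustrative:

11,375,PaySuccess          (hypothetical input line)
11:375	0.3                (step1 output: user:goods <tab> accumulated preference)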

2. Organizing Preference Data into a Preference Matrix


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep2 extends Configured implements Tool {

    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Input key is "userId:goodsId", value is the accumulated preference
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            // Use the goods id as the output key
            Text outkey = new Text(split[1]);
            // Combine the user id and preference value as "userId:preference"
            Text outvalue = new Text(split[0] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep2.GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep2.GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (step1 output, key/value separated by tab)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
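
Each output line of this job is one row of the preference matrix, keyed by goods id. The sample row quoted in the Step 6 javadoc further below shows the expected shape:

375	11:0.25,5:0.25,4:0.55        (goods <tab> user:preference,user:preference,...)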

3. Counting Product Co-occurrences


Code implementation

Cartesian product
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;


public class GoodsStep3 extends Configured implements Tool {
    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Input key is "userId:goodsId"; emit (userId, goodsId) so that
            // all goods a user interacted with are grouped together
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1]);
            context.write(outkey, outvalue);
        }
    }

    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Collect every goods id this user interacted with
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                list.add(value.toString());
            }

            // Emit every ordered pair of distinct goods (cartesian product)
            for (String g1 : list) {
                for (String g2 : list) {
                    if (!g1.equals(g2)) {
                        Text outkey = new Text(g1);
                        Text outvalue = new Text(g2);
                        context.write(outkey, outvalue);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (step1 output)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
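
This job groups all goods a user interacted with and emits every ordered pair of distinct goods. For example, a user who interacted with goods 203 and 961 (ids borrowed from the sample data later in this post, purely for illustration) would produce the two lines 203	961 and 961	203; a user with n goods produces n×(n-1) pairs.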
Co-occurrence counting
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep4 extends Configured implements Tool {

    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "goodsA<tab>goodsB"; count the pair once
            String[] split = value.toString().split("\t");
            String outkey = split[0] + ":" + split[1];
            context.write(new Text(outkey), new IntWritable(1));
        }
    }

    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Every value is 1, so summing them yields the pair's co-occurrence count
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the reducer
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (step3 output)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
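
If, say, the ordered pair 203	961 was emitted twice across all users in step3, this job outputs 203:961	2 (the count here is illustrative; real counts depend on the log).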

4. Building the Product Co-occurrence Matrix


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep5 extends Configured implements Tool {
    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String goods = key.toString();
            String[] split = goods.split(":");
            // key = the first goods id, value = "second goods id:co-occurrence count"
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path (step4 output)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
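
The result is one co-occurrence matrix row per goods id, in the shape shown in the Step 6 javadoc below:

375	203:1,961:1,91:1,90:2,89:1        (goods <tab> goods:co-occurrence count, ...)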

5. Computing Recommendation Values


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class GoodsStep6 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Step 2 output (preference matrix):
     * 375	11:0.25,5:0.25,4:0.55
     * goods	user:preference
     * Step 5 output (co-occurrence matrix):
     * 375	203:1,961:1,91:1,90:2,89:1
     * goods	goods:co-occurrence count
     * Output of this step:
     * user:goods	recommendation value
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            for (String str : split) {
                // key = goods id, value = either "user:preference" (from step2)
                // or "goods:co-occurrence count" (from step5)
                context.write(key, new Text(str));
            }
        }
    }

    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Preference entries [user -> preference value]
            HashMap<String, String> like = new HashMap<>();
            // Co-occurrence entries [goods -> co-occurrence count]
            HashMap<String, String> same = new HashMap<>();
            for (Text value : values) {
                String data = value.toString();
                String[] split = data.split(":");
                // Preference values are decimals (contain "."), co-occurrence counts are integers
                if (split[1].contains(".")) {
                    like.put(split[0], split[1]);
                } else {
                    same.put(split[0], split[1]);
                }
            }
            for (Map.Entry<String, String> l : like.entrySet()) {
                for (Map.Entry<String, String> s : same.entrySet()) {
                    // User preference value
                    BigDecimal lvalue = new BigDecimal(l.getValue());
                    // Goods co-occurrence count
                    BigDecimal svalue = new BigDecimal(s.getValue());
                    // key = user:co-occurring goods
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = lvalue.multiply(svalue).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep6.GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep6.GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and paths (step2 and step5 outputs are read together)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job, new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
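
A small worked example using the two sample rows from the javadoc above: for key 375, the reducer sees the preference entry 11:0.25 (user 11 likes goods 375 with weight 0.25) and the co-occurrence entry 203:1 (goods 375 co-occurred with goods 203 once), so it emits 11:203	0.25 (= 0.25 × 1). Every other (user, co-occurring goods) combination for key 375 is written the same way; step7 then sums these partial values per user:goods key.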

6. Accumulating Recommendation Values and Cleaning the Data


Code implementation

Accumulating recommendation values
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep7 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum all partial recommendation values for this user:goods key with BigDecimal
            BigDecimal sum = new BigDecimal(0.0);
            for (Text value : values) {
                BigDecimal v = new BigDecimal(value.toString());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep7.GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep7.GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path (step6 output)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
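
Continuing the worked example: if user 11 received partial values 0.25 and 0.55 for goods 203 from two different step6 reducer keys, this job outputs a single line 11:203	0.8 (the partial values here are illustrative).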
Data cleaning
First, collect the user:goods pairs that have exactly one successful payment.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep8 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Raw log line: userId,goodsId,action; keep only successful payments
            String[] split = value.toString().split(",");
            boolean paySuccess = split[2].toLowerCase().equals("paysuccess");
            if (paySuccess) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Count how many successful payments this user:goods pair has
            int num = 0;
            for (IntWritable i : values) {
                num++;
            }
            // Keep only pairs that were paid for exactly once
            if (num == 1) context.write(key, new IntWritable(num));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep8.GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep8.GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (raw behavior log)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
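
The output of this job is one line per purchased user:goods pair, e.g. 11:512	1 (ids illustrative), keyed the same way as the step7 output so the two can be joined in the next job.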
Then remove those already-purchased pairs from the aggregated recommendation data.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsStep8_2 extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Keys that exist in both step7 (recommendation values) and step8 (already purchased)
            // receive two or more values here; keep only keys with a single value, i.e. the
            // user:goods pairs the user has not purchased yet.
            Iterator<Text> iter = values.iterator();
            Text outvalue = iter.next();
            if (!iter.hasNext()) {
                context.write(key, outvalue);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        // Configure the mapper: class and map output key/value types
        job.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer
        job.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and paths (step7 and step8 outputs are read together)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job, new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
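
Because step7 and step8 share the user:goods key format, a key such as 11:512 that has both a recommendation value (from step7) and a purchase record (from step8) receives two values in the reducer and is dropped, while a key that appears only in step7 keeps its single recommendation value. The final step8_2 output therefore contains only recommendations for products the user has not yet bought.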

7. Writing the Results to the Database

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GoodsStep9 {
    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
        // Reuse a single Statement instead of creating one per line
        Statement statement = conn.createStatement();
        FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
        BufferedReader br = new BufferedReader(new InputStreamReader(open));
        String line;
        while ((line = br.readLine()) != null) {
            // Each line looks like: 11:512	0.25
            // i.e. user:goods <tab> recommendation value
            String[] str = line.split("\t");
            String[] uid_gid = str[0].split(":");
            // Replace any existing recommendation for this user/goods pair, then insert the new one
            String sql = "delete from recommend where customerId = '" + uid_gid[0] + "' and bookId = '" + uid_gid[1] + "'";
            String sql2 = "insert into recommend(customerId, bookId, recommendNum) values ('"
                    + uid_gid[0] + "','" + uid_gid[1] + "'," + str[1] + ")";
            statement.addBatch(sql);
            statement.addBatch(sql2);
            statement.executeBatch();
        }
        br.close();
        statement.close();
        conn.close();
    }
}
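
Building the SQL strings by concatenation works for this offline batch job, but it breaks if ids ever contain quotes. A minimal alternative sketch using PreparedStatement, assuming the same recommend(customerId, bookId, recommendNum) table and connection settings as above (the class name GoodsStep9Prepared is hypothetical):

package zb.grms;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GoodsStep9Prepared {
    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
             PreparedStatement del = conn.prepareStatement(
                     "delete from recommend where customerId = ? and bookId = ?");
             PreparedStatement ins = conn.prepareStatement(
                     "insert into recommend(customerId, bookId, recommendNum) values (?, ?, ?)");
             BufferedReader br = new BufferedReader(new InputStreamReader(
                     fs.open(new Path("src/main/resources/step8_2/part-r-00000"))))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Each line: "user:goods<tab>recommendation value"
                String[] str = line.split("\t");
                String[] uidGid = str[0].split(":");
                // Remove any previous recommendation for this pair, then insert the new value
                del.setString(1, uidGid[0]);
                del.setString(2, uidGid[1]);
                del.executeUpdate();
                ins.setString(1, uidGid[0]);
                ins.setString(2, uidGid[1]);
                ins.setDouble(3, Double.parseDouble(str[1]));
                ins.executeUpdate();
            }
        }
    }
}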

8. The Job Workflow

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GoodsMain extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsMain(), args);
            GoodsStep9.toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
//        String inpath = new String("inpath");
//        Path path = new Path(conf.get(inpath));
        Path path = new Path("data/userLog.log");
        Path outpath =  new Path("src/main/resources/step1");
        Path outpath2 = new Path("src/main/resources/step2");
        Path outpath3 = new Path("src/main/resources/step3");
        Path outpath4 = new Path("src/main/resources/step4");
        Path outpath5 = new Path("src/main/resources/step5");
        Path outpath6 = new Path("src/main/resources/step6");
        Path outpath7 = new Path("src/main/resources/step7");
        Path outpath8 = new Path("src/main/resources/step8");
        Path outpath9 = new Path("src/main/resources/step8_2");
        // Configure Job instances for all MapReduce steps
        //step1
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,path);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,outpath);
        //step8
        Job job8 = Job.getInstance(conf);
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(GoodsStep8.GS8Mapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(IntWritable.class);
        job8.setReducerClass(GoodsStep8.GS8Reducer.class);
        job8.setOutputKeyClass(Text.class);
        job8.setOutputValueClass(IntWritable.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8,path);
        job8.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job8,outpath8);
        //step2
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsStep2.GS2Mapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsStep2.GS2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job2,outpath);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2,outpath2);
        //step3
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsStep3.GS3Mapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(GoodsStep3.GS3Reducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job3,outpath);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3,outpath3);
        //step4
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(GoodsStep4.GS4Mapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setReducerClass(GoodsStep4.GS4Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4,outpath3);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4,outpath4);
        //step5
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setMapperClass(GoodsStep5.GS5Mapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(GoodsStep5.GS5Reducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job5,outpath4);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5,outpath5);
        //step6
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(GoodsStep6.GS6Mapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(Text.class);
        job6.setReducerClass(GoodsStep6.GS6Reducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(DoubleWritable.class);
        job6.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job6,outpath2,outpath5);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6,outpath6);
        //step7
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setMapperClass(GoodsStep7.GS7Mapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(GoodsStep7.GS7Reducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(DoubleWritable.class);
        job7.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job7,outpath6);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7,outpath7);
        //step9
        Job job9 = Job.getInstance(conf);
        job9.setJarByClass(this.getClass());
        job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job9.setMapOutputKeyClass(Text.class);
        job9.setMapOutputValueClass(Text.class);
        job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job9.setOutputKeyClass(Text.class);
        job9.setOutputValueClass(Text.class);
        job9.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job9,outpath7,outpath8);
        job9.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job9,outpath9);

        // Wrap each job in a ControlledJob
        ControlledJob cj = new ControlledJob(conf);
        cj.setJob(job);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(conf);
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(conf);
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(conf);
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(conf);
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(conf);
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(conf);
        cj8.setJob(job8);
        ControlledJob cj9 = new ControlledJob(conf);
        cj9.setJob(job9);
        // Define the dependencies between jobs
        cj2.addDependingJob(cj);
        cj3.addDependingJob(cj);
        cj4.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj2);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj6);
        cj9.addDependingJob(cj7);
        cj9.addDependingJob(cj8);
        // Create the workflow controller and register all jobs
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cj);
        jobs.addJob(cj2);
        jobs.addJob(cj3);
        jobs.addJob(cj4);
        jobs.addJob(cj5);
        jobs.addJob(cj6);
        jobs.addJob(cj7);
        jobs.addJob(cj8);
        jobs.addJob(cj9);
        // Start the controller thread to run all MapReduce jobs in dependency order
        Thread t = new Thread(jobs);
        t.start();
        while (true) {
            if (jobs.allFinished()) {
                System.out.println("All jobs finished");
                System.out.println(jobs.getSuccessfulJobList());
                jobs.stop();
                return 0;
            } else if (jobs.getFailedJobList().size() > 0) {
                System.out.println("Some jobs failed");
                System.out.println(jobs.getFailedJobList());
                jobs.stop();
                return -1;
            }
            // Avoid busy-waiting while the workflow is still running
            Thread.sleep(500);
        }
    }
}

Summary

The pipeline chains nine MapReduce jobs: compute user preference values from the behavior log, reshape them into a preference matrix, build the product co-occurrence matrix, multiply the two to get partial recommendation values, accumulate them per user and product, remove products the user has already purchased, and finally write the recommendation list into MySQL.

