使用 MapReduce 将数据清洗后保存到 HBase
数据格式（每行一条 JSON 记录，字段值均为字符串）
{ "年份" : "1990" , "国家补贴(亿元)" : "5.4" , "地方补贴(亿元)" : "3.2" , "企业补贴(亿元)" : "0.8" , "其他补贴(亿元)" : "0.5" }
{ "年份" : "1991" , "国家补贴(亿元)" : "5.8" , "地方补贴(亿元)" : "3.4" , "企业补贴(亿元)" : "0.9" , "其他补贴(亿元)" : "0.6" }
{ "年份" : "1992" , "国家补贴(亿元)" : "6.2" , "地方补贴(亿元)" : "3.7" , "企业补贴(亿元)" : "1" , "其他补贴(亿元)" : "0.7" }
{ "年份" : "1993" , "国家补贴(亿元)" : "7" , "地方补贴(亿元)" : "4.1" , "企业补贴(亿元)" : "1.2" , "其他补贴(亿元)" : "0.8" }
{ "年份" : "1994" , "国家补贴(亿元)" : "7.8" , "地方补贴(亿元)" : "4.5" , "企业补贴(亿元)" : "1.4" , "其他补贴(亿元)" : "0.9" }
{ "年份" : "1995" , "国家补贴(亿元)" : "8.5" , "地方补贴(亿元)" : "4.9" , "企业补贴(亿元)" : "1.6" , "其他补贴(亿元)" : "1" }
{ "年份" : "1996" , "国家补贴(亿元)" : "9.2" , "地方补贴(亿元)" : "5.3" , "企业补贴(亿元)" : "1.8" , "其他补贴(亿元)" : "1.1" }
{ "年份" : "1997" , "国家补贴(亿元)" : "10" , "地方补贴(亿元)" : "5.7" , "企业补贴(亿元)" : "2" , "其他补贴(亿元)" : "1.2" }
{ "年份" : "1998" , "国家补贴(亿元)" : "10.8" , "地方补贴(亿元)" : "6.1" , "企业补贴(亿元)" : "2.2" , "其他补贴(亿元)" : "1.3" }
{ "年份" : "1999" , "国家补贴(亿元)" : "11.6" , "地方补贴(亿元)" : "6.6" , "企业补贴(亿元)" : "2.5" , "其他补贴(亿元)" : "1.4" }
{ "年份" : "2000" , "国家补贴(亿元)" : "12.5" , "地方补贴(亿元)" : "7.2" , "企业补贴(亿元)" : "2.8" , "其他补贴(亿元)" : "1.6" }
{ "年份" : "2001" , "国家补贴(亿元)" : "13.5" , "地方补贴(亿元)" : "7.9" , "企业补贴(亿元)" : "3.2" , "其他补贴(亿元)" : "1.8" }
{ "年份" : "2002" , "国家补贴(亿元)" : "14.5" , "地方补贴(亿元)" : "8.7" , "企业补贴(亿元)" : "3.7" , "其他补贴(亿元)" : "2" }
{ "年份" : "2003" , "国家补贴(亿元)" : "15.6" , "地方补贴(亿元)" : "9.6" , "企业补贴(亿元)" : "4.3" , "其他补贴(亿元)" : "2.2" }
{ "年份" : "2004" , "国家补贴(亿元)" : "16.8" , "地方补贴(亿元)" : "10.6" , "企业补贴(亿元)" : "5" , "其他补贴(亿元)" : "2.5" }
{ "年份" : "2005" , "国家补贴(亿元)" : "18.2" , "地方补贴(亿元)" : "11.7" , "企业补贴(亿元)" : "5.8" , "其他补贴(亿元)" : "2.8" }
{ "年份" : "2006" , "国家补贴(亿元)" : "19.8" , "地方补贴(亿元)" : "12.9" , "企业补贴(亿元)" : "6.7" , "其他补贴(亿元)" : "3.2" }
{ "年份" : "2007" , "国家补贴(亿元)" : "21.5" , "地方补贴(亿元)" : "14.3" , "企业补贴(亿元)" : "7.7" , "其他补贴(亿元)" : "3.7" }
{ "年份" : "2008" , "国家补贴(亿元)" : "23.3" , "地方补贴(亿元)" : "15.9" , "企业补贴(亿元)" : "8.8" , "其他补贴(亿元)" : "4.3" }
{ "年份" : "2009" , "国家补贴(亿元)" : "25.2" , "地方补贴(亿元)" : "17.6" , "企业补贴(亿元)" : "10.1" , "其他补贴(亿元)" : "5" }
{ "年份" : "2010" , "国家补贴(亿元)" : "27.2" , "地方补贴(亿元)" : "19.4" , "企业补贴(亿元)" : "11.6" , "其他补贴(亿元)" : "5.8" }
{ "年份" : "2011" , "国家补贴(亿元)" : "29.2" , "地方补贴(亿元)" : "21.3" , "企业补贴(亿元)" : "13.3" , "其他补贴(亿元)" : "6.7" }
{ "年份" : "2012" , "国家补贴(亿元)" : "31.3" , "地方补贴(亿元)" : "23.4" , "企业补贴(亿元)" : "15.2" , "其他补贴(亿元)" : "7.7" }
{ "年份" : "2013" , "国家补贴(亿元)" : "33.5" , "地方补贴(亿元)" : "25.6" , "企业补贴(亿元)" : "17.3" , "其他补贴(亿元)" : "8.8" }
{ "年份" : "2014" , "国家补贴(亿元)" : "35.8" , "地方补贴(亿元)" : "27.9" , "企业补贴(亿元)" : "19.6" , "其他补贴(亿元)" : "10" }
{ "年份" : "2015" , "国家补贴(亿元)" : "38.2" , "地方补贴(亿元)" : "30.3" , "企业补贴(亿元)" : "22.1" , "其他补贴(亿元)" : "11.4" }
{ "年份" : "2016" , "国家补贴(亿元)" : "40.7" , "地方补贴(亿元)" : "32.8" , "企业补贴(亿元)" : "24.9" , "其他补贴(亿元)" : "13.1" }
{ "年份" : "2017" , "国家补贴(亿元)" : "43.3" , "地方补贴(亿元)" : "35.5" , "企业补贴(亿元)" : "27.9" , "其他补贴(亿元)" : "15.2" }
{ "年份" : "2018" , "国家补贴(亿元)" : "46.2" , "地方补贴(亿元)" : "38.3" , "企业补贴(亿元)" : "31.2" , "其他补贴(亿元)" : "17.6" }
{ "年份" : "2019" , "国家补贴(亿元)" : "49.3" , "地方补贴(亿元)" : "41.3" , "企业补贴(亿元)" : "34.8" , "其他补贴(亿元)" : "20.3" }
{ "年份" : "2020" , "国家补贴(亿元)" : "52.5" , "地方补贴(亿元)" : "44.6" , "企业补贴(亿元)" : "38.7" , "其他补贴(亿元)" : "23.5" }
{ "年份" : "2021" , "国家补贴(亿元)" : "55.9" , "地方补贴(亿元)" : "48.2" , "企业补贴(亿元)" : "42.8" , "其他补贴(亿元)" : "27.1" }
{ "年份" : "2022" , "国家补贴(亿元)" : "59.4" , "地方补贴(亿元)" : "52.1" , "企业补贴(亿元)" : "47.3" , "其他补贴(亿元)" : "31.4" }
{ "年份" : "2023" , "国家补贴(亿元)" : "63.1" , "地方补贴(亿元)" : "56.5" , "企业补贴(亿元)" : "52.4" , "其他补贴(亿元)" : "36.2" }
JavaBean
package cn. lhz. bean ;
import cn. lhz. util. annotation. RowKeyAnnotation ;
import lombok. AllArgsConstructor ;
import lombok. Getter ;
import lombok. NoArgsConstructor ;
import lombok. Setter ;
/**
 * One year of education subsidy figures, as stored in HBase.
 *
 * <p>Field declaration order matters: Lombok's {@code @AllArgsConstructor} takes the fields in
 * this order, and the CSV-to-bean helper may also rely on it (NOTE(review): confirm against
 * {@code StringUtil.csv2Bean}).
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class SubsidyYear {
    /** Calendar year; flagged with {@code @RowKeyAnnotation}, presumably used as the HBase row key. */
    @RowKeyAnnotation
    private Integer year;
    /** National subsidy ("国家补贴"), presumably in units of 100 million yuan (亿元). */
    private double country;
    /** Local-government subsidy ("地方补贴"), presumably in 亿元. */
    private double local;
    /** Enterprise subsidy ("企业补贴"), presumably in 亿元. */
    private double enterprise;
    /** Other subsidy ("其他补贴"), presumably in 亿元. */
    private double other;

    /**
     * Renders the record as {@code year<TAB>country,local,enterprise,other}.
     * A null year is rendered as the literal text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder line = new StringBuilder();
        line.append(year).append('\t');
        line.append(country).append(',');
        line.append(local).append(',');
        line.append(enterprise).append(',');
        line.append(other);
        return line.toString();
    }
}
MapReduce 程序
package cn. lhz. etl ;
import cn. lhz. bean. SubsidyYear ;
import cn. lhz. util. hbase. HbaseUtil ;
import cn. lhz. util. string. StringUtil ;
import org. apache. hadoop. conf. Configuration ;
import org. apache. hadoop. fs. FileSystem ;
import org. apache. hadoop. fs. Path ;
import org. apache. hadoop. hbase. client. Connection ;
import org. apache. hadoop. hbase. client. Table ;
import org. apache. hadoop. io. LongWritable ;
import org. apache. hadoop. io. Text ;
import org. apache. hadoop. mapreduce. Job ;
import org. apache. hadoop. mapreduce. Mapper ;
import org. apache. hadoop. mapreduce. Reducer ;
import org. apache. hadoop. mapreduce. lib. input. FileInputFormat ;
import org. apache. hadoop. mapreduce. lib. output. FileOutputFormat ;
import java. io. IOException ;
import java. lang. reflect. InvocationTargetException ;
/**
 * MapReduce job that reads one JSON subsidy record per input line, flattens it to CSV with
 * {@code StringUtil.extractValuesToString}, groups the records by year, and upserts one
 * {@link SubsidyYear} per year into the HBase table {@code SUBSIDY_YEAR}.
 *
 * <p>The reducer writes nothing to the job's file output; the output directory only exists
 * because {@link FileOutputFormat} requires one.
 */
public class SubsidyYear2Hbase {

    /** Counter group under which the mapper reports input lines it skipped. */
    private static final String COUNTER_GROUP = "SubsidyYear2Hbase";

    /**
     * Splits each JSON line into (year, remaining comma-separated values).
     *
     * <p>NOTE(review): assumes {@code StringUtil.extractValuesToString} returns the JSON values
     * joined by commas in field order, with the year first — confirm against StringUtil.
     */
    public static class SubsidyYearMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String json = value.toString();
            // A blank line carries no record; skip it instead of failing the whole task.
            if (json.trim().isEmpty()) {
                context.getCounter(COUNTER_GROUP, "BLANK_LINES").increment(1);
                return;
            }
            String csv = StringUtil.extractValuesToString(json);
            // Without a comma there is no "year,values" split. The previous substring(0, -1)
            // threw StringIndexOutOfBoundsException here, so count and skip the record instead.
            int comma = (csv == null) ? -1 : csv.indexOf(',');
            if (comma < 0) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }
            String year = csv.substring(0, comma).trim();
            if (year.isEmpty()) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }
            context.write(new Text(year), new Text(csv.substring(comma + 1)));
        }
    }

    /**
     * Converts each year's CSV values into a {@link SubsidyYear} and upserts it into HBase.
     * It emits nothing through {@code context.write}.
     */
    public static class SubsidyYearReducer extends Reducer<Text, Text, Text, Text> {
        private Connection connection;
        public Table table;

        /** Opens the HBase connection and the target table once per reduce task. */
        @Override
        protected void setup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            connection = HbaseUtil.getConnection();
            String tableName = "SUBSIDY_YEAR";
            table = HbaseUtil.getTable(connection, tableName);
        }

        /**
         * Upserts one row for the given year.
         *
         * @throws RuntimeException if the CSV cannot be reflected into a {@link SubsidyYear};
         *     the year and CSV are included in the message and the cause is preserved
         * @throws NumberFormatException if the key is not an integer year
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // If a year occurs more than once, only one value survives. Hadoop does not order
            // values within a key, so which duplicate wins is not deterministic.
            String csv = "";
            for (Text value : values) {
                csv = value.toString();
            }
            String year = key.toString();
            try {
                SubsidyYear subsidyYear = StringUtil.csv2Bean(csv, false, SubsidyYear.class);
                subsidyYear.setYear(Integer.parseInt(year));
                // NOTE(review): "OVER_THE_YEARS" is presumably the column family — confirm in HbaseUtil.
                HbaseUtil.upsert(table, "OVER_THE_YEARS", subsidyYear);
            } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException
                    | InstantiationException e) {
                throw new RuntimeException(
                        "Failed to map CSV '" + csv + "' to SubsidyYear for year " + year, e);
            }
        }

        /** Closes the table, then the connection, even if closing the table fails. */
        @Override
        protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            try {
                if (table != null) {
                    table.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Configures and runs the job with the local job runner.
     *
     * @param args optional {@code [inputPath [outputPath]]}; defaults are
     *     {@code /edu-ods/教育补贴.log} and {@code /edu-dwd}
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        String inputPath = (args != null && args.length > 0) ? args[0] : "/edu-ods/教育补贴.log";
        String outputPath = (args != null && args.length > 1) ? args[1] : "/edu-dwd";

        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.framework.name", "local");
        conf.set("mapreduce.cluster.local.dir", "file:///home/lhz/hadoop");

        Job job = Job.getInstance(conf, "教育历年补贴");
        job.setJarByClass(SubsidyYear2Hbase.class);
        job.setMapperClass(SubsidyYearMapper.class);
        job.setReducerClass(SubsidyYearReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));

        // FileOutputFormat refuses an existing output directory, so it is removed first.
        // WARNING: this is a recursive delete of everything under the output path.
        Path path = new Path(outputPath);
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}