HBase表数据的读、写操作与综合操作

news2024/11/24 15:45:43

文章目录

  • HBase表数据的读、写操作与综合操作
    • 一、实验目标
    • 二、实验要求及注意事项
    • 三、实验内容及步骤
  • 附:系列文章

HBase表数据的读、写操作与综合操作

一、实验目标

  1. 熟练掌握通过HBase shell命令来设计HBase表结构实例
  2. 掌握使用HBase编程创建HBase表、删除HBase表、修改HBase表和查看HBase表和表结构。
  3. 掌握通过HBase 编程实现HBase表数据的读、写操作

二、实验要求及注意事项

  1. 给出每个实验的主要实验步骤、实现代码和测试效果截图。
  2. 对本次实验工作进行全面的总结分析。
  3. 建议工程名,类名、包名或表名显示个人学号或者姓名

三、实验内容及步骤

实验任务1:使用MapReduce批量将HBase表中数据导入到HDFS上。表名和表中数据自拟。

主要实现步骤和运行效果图:

完整程序

WjwReadMapper:

package hbase;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.io.*;
public class WjwReadMapper extends TableMapper<Writable, Writable> {
	private Text k=new Text();
	private Text v=new Text();
	public static final String F1="\u0001";
	protected void setup(Context c){
	}
	public void map(ImmutableBytesWritable row,Result r,Context c){
		String value=null;
		String rk=new String(row.get());
		byte[] family=null;
		byte[] column=null;
		long ts=0L;
		try{
			for(KeyValue kv:r.list()){
			value=Bytes.toStringBinary(kv.getValue());
			family=kv.getFamily();
			column=kv.getQualifier();
			ts=kv.getTimestamp();
			k.set(rk);
			v.set(Bytes.toString(family)+F1+Bytes.toString(column)+F1+value+F1+ts);
			c.write(k, v);
			}
		}catch(Exception e){
			e.printStackTrace();
			System.err.println();
		}
	}
}

WjwReadMain:

package hbase;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class WjwReadMain {
	public static final Log LOG = LogFactory.getLog(WjwMain.class);
	public static final String NAME = "Member Test1";
	public static final String TEMP_INDEX_PATH = "hdfs://master:9000/tmp/tb_wjw";
	public static String inputTable = "tb_wjw";
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration conf = HBaseConfiguration.create();
		Scan scan = new Scan();
		scan.setBatch(0);
		scan.setCaching(10000);
		scan.setMaxVersions();
		scan.setTimeRange(System.currentTimeMillis() - 3*24*3600*1000L, System.currentTimeMillis());
		scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keyword"));
		conf.setBoolean("mapred.map.tasks.speculative.execution", false);
		conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
		Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(tmpIndexPath)){
			fs.delete(tmpIndexPath, true);
		}
		
		Job job = new Job(conf, NAME);
		job.setJarByClass(WjwMain.class);
		TableMapReduceUtil.initTableMapperJob(inputTable, scan, WjwMapper.class, Text.class, Text.class, job);
	   job.setNumReduceTasks(0);
	   job.setOutputFormatClass(TextOutputFormat.class);
	   FileOutputFormat.setOutputPath(job, tmpIndexPath);
	   boolean success = job.waitForCompletion(true);
	   System.exit(success?0:1);
	}
}

运行结果

创建表,用于等会将数据传入hadoop里

1-1

运行map程序将表数据导入hadoop,并查看是否导入成功

1-2

实验任务2:使用MapReduce批量将HDFS上的数据导入到HBase表中。表名和数据自拟,建议体现个人学号或姓名。使用Java编程创建表和删除表,表名和列族自拟。

主要实现步骤和运行效果图:

完整程序

WjwWriteMapper:

package hbase;
import java.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.*;
public class WjwWriteMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	private byte[] family=null;
	private byte[] qualifier=null;
	private byte[] val=null;
	private String rk=null;
	private long ts=System.currentTimeMillis();
	protected void map(LongWritable key,Text value,Context context) throws InterruptedException, IOException{
		try{
			String line=value.toString();
			String[] arr=line.split("\t",-1);
			if(arr.length==2){
				 rk=arr[0];
			    String[] vals=arr[1].split("\u0001",-1);
			    if(vals.length==4){
			        family=vals[0].getBytes();
					  qualifier=vals[1].getBytes();
					  val=vals[2].getBytes();
					  ts=Long.parseLong(vals[3]);
					  Put put=new Put(rk.getBytes(),ts);
					  put.add(family,qualifier,val);
			        context.write(new ImmutableBytesWritable(rk.getBytes()), put);
			    }
			}
		}catch(Exception e){
		    e.printStackTrace();
		}
	}
}

WjwWriteMain:

package hbase;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;
import java.io.IOException;
import org.apache.commons.logging.*;
public class WjwWriteMain extends Configured implements Tool{
	static final Log LOG=LogFactory.getLog(WjwWriteMain.class);
	public int run(String[] args)throws Exception{
		if(args.length!=2){
		    LOG.info("2 parameters needed!");
		}
		String input="hdfs://master:9000/tmp/tb_wjw/part-m-00000";
		String table="tb_wjw01";
		Configuration conf=HBaseConfiguration.create();
		Job job=new Job(conf,"Input from file "+input+" into table "+table);
		job.setJarByClass(WjwWriteMain.class);
		job.setMapperClass(WjwWriteMapper.class);
		job.setJarByClass(WjwWriteMain.class);
		job.setMapperClass(WjwWriteMapper.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,table);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Waitable.class);
		job.setNumReduceTasks(0);
		FileInputFormat.addInputPath(job, new Path(input));
		return job.waitForCompletion(true)?0:1;
	}
	public static void main(String[] args) throws IOException {
	    Configuration conf=new Configuration();
		 String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
		try {
		    System.out.println(ToolRunner.run(conf, new WjwWriteMain(),otherArgs));
		}catch(Exception e) {
		    e.printStackTrace();
		}
	}
}

运行结果

创建一个空表tb_wjw01,用于等会将tb_wjw的数据导入tb_wjw01

2-1

配置yarn,并运行map程序

2-2
2-3

查看hadoop里的表tb_wjw

2-4

将hadoop里tb_wjw的数据导入hbase里的tb_wjw01里面

2-5

实验任务3:在实验任务1和实验任务2的基础上,通过HBase编程,实现创建HBase表,修改HBase表(包括增加列族和删除列族),向HBase表中写入数据,读取HBase表中数据,查看HBase数据库中所有表和表结构功能,建议在一个类中定义多个方法实现上述功能,并进行验证。表名和数据自拟。

主要实现步骤和运行效果图:

完整程序

package hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class WjwHbase{
    private static Configuration conf = HBaseConfiguration.create();

    public static void createTable(String tableName, String[] families)
            throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("Table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            for (String family : families) {
                tableDesc.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(tableDesc);
            System.out.println("Table created successfully!");
        }
        admin.close();
        conn.close();
    }

    public static void addRecord(String tableName, String rowKey,
                                  String family, String qualifier, String value) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        table.put(put);
        System.out.println("Record added successfully!");
        table.close();
        conn.close();
    }

    public static void deleteRecord(String tableName, String rowKey,
                                     String family, String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table.delete(delete);
        System.out.println("Record deleted successfully!");
        table.close();
        conn.close();
    }

    public static void deleteTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println("Table deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    public static void addColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            HColumnDescriptor columnDesc = new HColumnDescriptor(columnFamily);
            admin.addColumn(TableName.valueOf(tableName), columnDesc);
            System.out.println("Column family added successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    public static void deleteColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
            System.out.println("Column family deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    public static void getRecord(String tableName, String rowKey,
                                  String family, String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        System.out.println("Result: " + Bytes.toString(value));
        table.close();
        conn.close();
    }

    public static void scanTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            System.out.println("Result: " + result);
        }
        table.close();
        conn.close();
    }

    public static void listTables() throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor[] tableDescs = admin.listTables();
        List<String> tableNames = new ArrayList<String>();
        for (HTableDescriptor tableDesc : tableDescs) {
            tableNames.add(tableDesc.getNameAsString());
        }
        System.out.println("Tables: " + tableNames);
        admin.close();
        conn.close();
    }

    public static void describeTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
        System.out.println("Table structure: " + tableDesc);
        admin.close();
        conn.close();
    }

    public static void main(String[] args) throws IOException {
        String tableName = "wjwtest";
        String rowKey = "row1";
        String family = "cf1";
        String qualifier = "q1";
        String value = "this is wjw!";
        String columnFamily = "cf2";
        String[] families = {family};
        createTable(tableName, families);
        addRecord(tableName, rowKey, family, qualifier, value);
        getRecord(tableName, rowKey, family, qualifier);
        scanTable(tableName);
        addColumnFamily(tableName, columnFamily);
        describeTable(tableName);
        deleteColumnFamily(tableName, columnFamily);
        deleteRecord(tableName, rowKey, family, qualifier);
        deleteTable(tableName);
        listTables();
    }
}

运行结果

3-1
3-2
3-3
3-4
3-5
3-6

附:系列文章

实验文章目录直达链接
实验01Hadoop安装部署https://want595.blog.csdn.net/article/details/132767284
实验02HDFS常用shell命令https://want595.blog.csdn.net/article/details/132863345
实验03Hadoop读取文件https://want595.blog.csdn.net/article/details/132912077
实验04HDFS文件创建与写入https://want595.blog.csdn.net/article/details/133168180
实验05HDFS目录与文件的创建删除与查询操作https://want595.blog.csdn.net/article/details/133168734
实验06SequenceFile、元数据操作与MapReduce单词计数https://want595.blog.csdn.net/article/details/133926246
实验07MapReduce编程:数据过滤保存、UID 去重https://want595.blog.csdn.net/article/details/133947981
实验08MapReduce 编程:检索特定群体搜索记录和定义分片操作https://want595.blog.csdn.net/article/details/133948849
实验09MapReduce 编程:join操作和聚合操作https://want595.blog.csdn.net/article/details/133949148
实验10MapReduce编程:自定义分区和自定义计数器https://want595.blog.csdn.net/article/details/133949522

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1184064.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国发表第一篇计算机顶会的人都怎么样了?中国ACL、AAAI、CVPR第一人是谁

夕小瑶科技说 整理 | 王二狗中国大陆发第一篇计算机顶会的人都怎么样了&#xff1f;相信AI从业者对这个话题都会非常感兴趣&#xff0c;本文对知乎上各位大佬的信息做一个整理&#xff0c;分享给大家。 注&#xff1a;信息由知乎网友整理&#xff0c;不保证100%准确&#xff0c…

主流新闻媒体有哪些,怎么邀约记者

主流新闻媒体是社会信息传播的重要渠道&#xff0c;它们拥有广泛的受众群体和影响力。邀请主流新闻媒体参与活动或报道&#xff0c;可以迅速扩大活动影响力&#xff0c;提升组织的知名度和公信力。下面介绍一些主流新闻媒体及邀约媒体的方法。 主流新闻媒体 1、电视台&#x…

算法通过村第十八关-回溯|白银笔记|经典问题

文章目录 前言组合总和问题分割回文串子集问题排序问题字母大小写全排列单词搜索总结 前言 提示&#xff1a;我不愿再给你写信了。因为我终于感到&#xff0c;我们的全部通信知识一个大大的幻影&#xff0c;我们每个人知识再给自己写信。 --安德烈纪德 回溯主要解决一些暴力枚举…

Find My拐杖|苹果Find My技术与拐杖结合,智能防丢,全球定位

拐杖是一种重要的医疗康复辅助用具&#xff0c;辅助腿脚不灵活的人群平稳的行走&#xff0c;避免出现摔倒等情况。目前&#xff0c;全球均已步入老龄化社会&#xff0c;那么老年人的生活质量和安全成为各国学者的关注点。由于年龄原因容易突发意外情况&#xff0c;如摔倒&#…

SSM大学生众筹平台-计算机毕设 附源码22506

SSM大学生众筹平台 摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 大学生众筹平台&#xff0c;主要的模块包括管理员和用户及筹资用户&#xff0c;实现功能包括&#xff1a;首页、个人资料&a…

多媒体融合应急通信解决方案

近年来&#xff0c;随着经济社会快速发展和现代化进程加快&#xff0c;我国公共安全面临诸多新的挑战。面对大型安全事故发生后&#xff0c;救援队伍必须在恶劣的条件下迅速建立指挥调度中心&#xff0c;方能协调前后方救援力量&#xff0c;这对应急通信网络建设的可靠性、时效…

基于springboot实现智慧外贸平台系统【项目源码+论文说明】

基于springboot实现智慧外贸平台管理系统演示 摘要 网络的广泛应用给生活带来了十分的便利。所以把智慧外贸管理与现在网络相结合&#xff0c;利用java技术建设智慧外贸平台&#xff0c;实现智慧外贸的信息化。则对于进一步提高智慧外贸管理发展&#xff0c;丰富智慧外贸管理经…

ICC2与PT端口时序上的差别

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 有星球成员遇到如下问题: 你好,我想问一下就是之前一直遇到一个情况:INtoReg的path_group的时序报告,ICC2里launch的clock network delay(propagated)会有一个值,skew就很小。 但是到PT里launc…

窗函数法设计FIR中,如何选择窗函数和滤波器阶数N

窗函数法设计FIR中&#xff0c;如何选择窗函数和滤波器阶数N 1、概述 在用窗函数法设计FIR滤波器时&#xff0c;给出了滤波器要求的具体指标&#xff0c;包括通带频率fp、阻带频率fs、通带波纹Rp 和阻带衰减As 等&#xff0c;有了这些指标后&#xff0c;是否什么窗函数都可以选…

辐射骚扰整改思路及方法:实地验证?|深圳比创达电子EMC

某产品首次EMC测试时&#xff0c;辐射、静电、浪涌均失败。本篇文章就“实地验证”问题进行详细讨论。 方案拟定好后&#xff0c;就该准备前往EMC检测机构进行测试了。出发前&#xff0c;先分析4个解决方案的易操作程度&#xff1a;方案2最容易验证&#xff0c;只要将磁环往电…

【机器学习】六、概率图模型

今天我们对概率图模型&#xff08;Probabilistic Graphical Model&#xff0c;PGM&#xff09;做一个总结。 模型表示 概率图模型&#xff0c;是指一种用图结构来描述多元随机变量之间条件独立关系的概率模型。 它提出的背景是为了更好研究复杂联合概率分布的数据特征&#x…

亚马逊合规,亚马逊涉及12个站点合规政策更新,需警惕合规要求!

最近&#xff0c;许多亚马逊站点的卖家陆续收到了合规政策更新的通知邮件&#xff0c;涵盖了美国站、加拿大站、英国站、法国站、意大利站、德国站以及西班牙站。 这些更新影响了不同品类的卖家&#xff0c;包括以下品类&#xff1a; 美国站&#xff08;US&#xff09;对于“发…

农林牧数据可视化监控平台 | 智慧农垦

数字农业是一种现代农业方式&#xff0c;它将信息作为农业生产的重要元素&#xff0c;并利用现代信息技术进行农业生产过程的实时可视化、数字化设计和信息化管理。能将信息技术与农业生产的各个环节有机融合&#xff0c;对于改造传统农业和改变农业生产方式具有重要意义。 图扑…

3 任务3 使用趋动云部署自己的stable-diffusion

使用趋动云部署自己的stable-diffusion 1 创建项目&#xff1a;2 初始化开发环境实例3 部署模型4 模型测试 1 创建项目&#xff1a; 1.进入趋动云用户工作台&#xff0c;选择&#xff1a;当前空间&#xff0c;请确保当前所在空间是注册时系统自动生成的空间。 a.非系统自动生成…

19.6 Boost Asio 文本压缩传输

Base64是一种二进制到文本的编码方案&#xff0c;用于将二进制数据转换为ASCII字符串格式。它通过将二进制数据流转换为一系列64个字符来工作&#xff0c;这些字符都可以安全地传输到设计用于处理文本数据的系统中。 如下代码中我们使用Boost中提供的base64_from_binary头文件…

网站小程序分类目录网源码系统+会员登录注册功能 带完整搭建教程

大家好啊&#xff0c;源码小编今天来给大家分享一款网站小程序分类目录网源码系统会员登录注册功能 。 以下是核心代码图模块&#xff1a; 系统特色功能一览&#xff1a; 分类目录&#xff1a;系统按照不同的类别对网站进行分类&#xff0c;方便用户查找自己需要的网站。用户可…

【算法与数据结构】39、LeetCode组合总和

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;这道题当中数字可以多次使用&#xff0c;那么我们在递归语句当中不能直接找下一个candidate的元素&…

IP地址与MAC地址(硬件地址)的区别

IP地址和硬件地址都是用于标识网络设备的地址&#xff0c;但它们的作用和使用方式不同。IP地址是用于在网络中唯一标识一个设备的逻辑地址它是由网络协议栈分配的&#xff0c;可以动态地分配和改变。而硬件地址是设备的物理地址&#xff0c;也称为MAC地址&#xff0c;是由设备制…

《009.Springboot+vue之进销存管理系统》

《009.Springbootvue之进销存管理系统》 项目简介 [1]本系统涉及到的技术主要如下&#xff1a; 推荐环境配置&#xff1a;DEA jdk1.8 Maven MySQL 前后端分离; 后台&#xff1a;SpringBootMybatisredis; 前台&#xff1a;vueElementUI; [2]功能模块展示&#xff1a; 1.用户管…