Table of Contents
1. Add the dependencies
2. Create the namespace and tables by entering the following statements in the HBase shell
3. UserFriendToHB
4. UsersToHB
5. TrainToHB
6. EventsToHB
7. EventAttendeToHb
1. Add the dependencies
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<!-- HBase dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.3.5</version>
</dependency>
</dependencies>
2. Create the namespace and tables by entering the following statements in the HBase shell
create_namespace 'events_db'
create 'events_db:users','profile','region','registration'
create 'events_db:user_friend','uf'
create 'events_db:events','schedule','location','creator','remark'
create 'events_db:event_attendee','euat'
create 'events_db:train','eu'
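If you prefer to create the namespace and tables from code instead of the shell, the HBase 2.x Admin API can do the same thing. The following is an optional, minimal sketch; it assumes the same ZooKeeper address used by the consumers later in this article (192.168.180.147), and the table and column-family names mirror the shell statements above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateEventsTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // equivalent of create_namespace 'events_db'
            admin.createNamespace(NamespaceDescriptor.create("events_db").build());
            createTable(admin, "events_db:users", "profile", "region", "registration");
            createTable(admin, "events_db:user_friend", "uf");
            createTable(admin, "events_db:events", "schedule", "location", "creator", "remark");
            createTable(admin, "events_db:event_attendee", "euat");
            createTable(admin, "events_db:train", "eu");
        }
    }

    // Build a table descriptor with the given column families and create the table
    private static void createTable(Admin admin, String name, String... families) throws Exception {
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(name));
        for (String family : families) {
            builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
        }
        admin.createTable(builder.build());
    }
}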
3. UserFriendToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
/**
* Consume the user_friends data from Kafka and write it into HBase
*/
public class UserFriendToHB {
static int num = 0;// counter of records written
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// disable auto commit; offsets are committed manually after each successful write
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// auto-commit interval; has no effect while auto commit is disabled
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriend_group");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("user_friends"));
// configure the HBase connection settings and connect to HBase
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
conf.set(HConstants.CLIENT_PORT_STR, "2181");
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
ArrayList<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.value());// userid,friendid
String[] split = record.value().split(",");
Put put = new Put(Bytes.toBytes(((split[0]) + split[1]).hashCode()));
put.addColumn("uf".getBytes(), "user_id".getBytes(), split[0].getBytes());
put.addColumn("uf".getBytes(), "friend_id".getBytes(), split[1].getBytes());
datas.add(put);
}
num = num + datas.size();
System.out.println("---------------------------------num:" + num);
if (datas.size() != 0) {
userFriendTable.put(datas);
consumer.commitAsync();// manual commit: acknowledge the offsets only after the batch has been written to HBase
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
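To spot-check what UserFriendToHB has written, you can scan the table from the HBase shell (scan 'events_db:user_friend', {LIMIT => 10}) or from Java. The following is a small optional sketch that assumes the same connection settings as above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class UserFriendScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("events_db:user_friend"));
             ResultScanner scanner = table.getScanner(new Scan().setLimit(10))) {
            // print the first few user_id -> friend_id pairs that were written
            for (Result result : scanner) {
                String userId = Bytes.toString(result.getValue("uf".getBytes(), "user_id".getBytes()));
                String friendId = Bytes.toString(result.getValue("uf".getBytes(), "friend_id".getBytes()));
                System.out.println(userId + " -> " + friendId);
            }
        }
    }
}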
4. UsersToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
public class UsersToHB {
static int num = 0;// counter of records written
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// disable auto commit; offsets are committed manually after each successful write
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// auto-commit interval; has no effect while auto commit is disabled
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "users_group");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("users"));
// configure the HBase connection settings and connect to HBase
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
conf.set(HConstants.CLIENT_PORT_STR, "2181");
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
Table usersTable = connection.getTable(TableName.valueOf("events_db:users"));
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
ArrayList<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.value());
String[] user = record.value().split(",");
Put put = new Put(Bytes.toBytes(user[0]));
put.addColumn("profile".getBytes(), "birthyear".getBytes(), user[2].getBytes());
put.addColumn("profile".getBytes(), "gender".getBytes(), user[3].getBytes());
put.addColumn("region".getBytes(), "locale".getBytes(), user[1].getBytes());
if (user.length > 5)
put.addColumn("region".getBytes(), "location".getBytes(), user[5].getBytes());
if (user.length > 6)
put.addColumn("region".getBytes(), "timezone".getBytes(), user[6].getBytes());
if (user.length > 4)
put.addColumn("registration".getBytes(), "joinedAt".getBytes(), user[4].getBytes());
datas.add(put);
}
num = num + datas.size();
System.out.println("---------------------------------num:" + num);
if (datas.size() != 0) {
usersTable.put(datas);
consumer.commitAsync();// manual commit after the batch has been written to HBase
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
5. TrainToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
public class TrainToHB {
static int num = 0;// counter of records written
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// disable auto commit; offsets are committed manually after each successful write
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// auto-commit interval; has no effect while auto commit is disabled
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "train_group");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("train"));
// configure the HBase connection settings and connect to HBase
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
conf.set(HConstants.CLIENT_PORT_STR, "2181");
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
Table trainTable = connection.getTable(TableName.valueOf("events_db:train"));
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
ArrayList<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.value());
String[] train = record.value().split(",");
// a random suffix keeps repeated (user, event) records from overwriting each other,
// but it also means re-consuming the topic writes duplicate rows rather than idempotent updates
double random = Math.random();
Put put = new Put(Bytes.toBytes(train[0] + train[1] + random));
put.addColumn("eu".getBytes(),"user".getBytes(),train[0].getBytes());
put.addColumn("eu".getBytes(),"event".getBytes(),train[1].getBytes());
put.addColumn("eu".getBytes(),"invited".getBytes(),train[2].getBytes());
put.addColumn("eu".getBytes(),"timestamp".getBytes(),train[3].getBytes());
put.addColumn("eu".getBytes(),"interested".getBytes(),train[4].getBytes());
put.addColumn("eu".getBytes(),"not_interested".getBytes(),train[5].getBytes());
datas.add(put);
}
num = num + datas.size();
System.out.println("---------------------------------num:" + num);
if (datas.size() != 0) {
trainTable.put(datas);
consumer.commitAsync();// manual commit after the batch has been written to HBase
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
6. EventsToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
public class EventsToHB {
static int num = 0;// counter of records written
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// disable auto commit; offsets are committed manually after each successful write
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// auto-commit interval; has no effect while auto commit is disabled
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "events_group");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("events"));
// configure the HBase connection settings and connect to HBase
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
conf.set(HConstants.CLIENT_PORT_STR, "2181");
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
Table eventsTable = connection.getTable(TableName.valueOf("events_db:events"));
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
ArrayList<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.value());
String[] events = record.value().split(",");
Put put = new Put(Bytes.toBytes((events[0])));
put.addColumn("creator".getBytes(), "userid".getBytes(), Bytes.toBytes(events[1]));
put.addColumn("schedule".getBytes(), "starttime".getBytes(), Bytes.toBytes(events[2]));
put.addColumn("location".getBytes(), "city".getBytes(), Bytes.toBytes(events[3]));
put.addColumn("location".getBytes(), "state".getBytes(), Bytes.toBytes(events[4]));
put.addColumn("location".getBytes(), "zip".getBytes(), Bytes.toBytes(events[5]));
put.addColumn("location".getBytes(), "country".getBytes(), Bytes.toBytes(events[6]));
put.addColumn("location".getBytes(), "lat".getBytes(), Bytes.toBytes(events[7]));
put.addColumn("location".getBytes(), "lng".getBytes(), Bytes.toBytes(events[8]));
put.addColumn("remark".getBytes(), "commonwords".getBytes(), Bytes.toBytes(events[9]));
datas.add(put);
}
num = num + datas.size();
System.out.println("---------------------------------num:" + num);
if (datas.size() != 0) {
eventsTable.put(datas);
consumer.commitAsync();// manual commit after the batch has been written to HBase
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
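One caveat that applies to all of these consumers: String.split(",") drops trailing empty fields, so a record whose last columns are empty (for example a missing zip or commonwords value) yields a shorter array and an ArrayIndexOutOfBoundsException when the missing index is accessed. If the source data can contain empty trailing fields, record.value().split(",", -1) preserves them; for example, "a,,".split(",") has length 1 while "a,,".split(",", -1) has length 3.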
7. EventAttendeToHb
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
public class EventAttendeToHb {
static int num = 0;// counter of records written
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// disable auto commit; offsets are committed manually after each successful write
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// auto-commit interval; has no effect while auto commit is disabled
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventattendee_group");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("event_attendees"));
// configure the HBase connection settings and connect to HBase
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
conf.set(HConstants.CLIENT_PORT_STR, "2181");
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
ArrayList<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.value());// eventid,friendid,yes/no/maybe
String[] eventattend = record.value().split(",");
Put put = new Put(Bytes.toBytes((eventattend[0]+eventattend[1]+eventattend[2])));
put.addColumn("euat".getBytes(),"eventid".getBytes(),Bytes.toBytes(eventattend[0]));
put.addColumn("euat".getBytes(),"friendid".getBytes(),Bytes.toBytes(eventattend[1]));
put.addColumn("euat".getBytes(),"state".getBytes(),Bytes.toBytes(eventattend[2]));
datas.add(put);
}
num = num + datas.size();
System.out.println("---------------------------------num:" + num);
if (datas.size() != 0) {
table.put(datas);
consumer.commitAsync();// manual commit after the batch has been written to HBase
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
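The five consumers above differ only in the topic, the target table, and how a CSV record is turned into a Put. If you want to reduce the duplication, the shared polling-and-writing loop can be pulled into one small generic worker. The sketch below is one possible refactor under the same assumptions as the classes above (broker at 192.168.180.147:9092, ZooKeeper at 192.168.180.147:2181); the class and method names are my own, and the user_friends mapping is shown as the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;

public class KafkaToHBaseWorker {
    private final String topic;                 // Kafka topic to consume
    private final String tableName;             // target HBase table, e.g. "events_db:user_friend"
    private final Function<String, Put> mapper; // turns one CSV record into one Put

    public KafkaToHBaseWorker(String topic, String tableName, Function<String, Put> mapper) {
        this.topic = topic;
        this.tableName = tableName;
        this.mapper = mapper;
    }

    public void run() throws Exception {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
             Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            consumer.subscribe(Collections.singleton(topic));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                List<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    datas.add(mapper.apply(record.value()));
                }
                if (!datas.isEmpty()) {
                    table.put(datas);
                    consumer.commitAsync(); // commit offsets only after the batch has been written
                }
            }
        }
    }

    // Example: UserFriendToHB expressed as a topic + table + mapping function
    public static void main(String[] args) throws Exception {
        new KafkaToHBaseWorker("user_friends", "events_db:user_friend", value -> {
            String[] split = value.split(",");
            Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
            put.addColumn("uf".getBytes(), "user_id".getBytes(), split[0].getBytes());
            put.addColumn("uf".getBytes(), "friend_id".getBytes(), split[1].getBytes());
            return put;
        }).run();
    }
}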