一、Apache Kudu
Apache Kudu
是一种列式分布式存储引擎,它的设计目标是支持快速分析和高吞吐量的数据访问,同时也能够支持低延迟、实时查询和更新操作。它被称为Hadoop
生态系统的新一代存储层,能够与Apache Spark、Apache Impala、Apache Hive
等大数据处理框架集成使用。
Kudu
在设计上采用了多版本并发控制(MVCC
)技术,支持ACID
事务,并提供了灵活的数据复制机制,可以在不同的数据中心、机房之间实现数据同步和备份,确保数据的高可靠性和可用性。
Kudu
提供了对多种数据类型的支持,包括结构化数据和半结构化数据,支持复杂的数据模型和数据访问模式,例如时间序列数据、地理空间数据等。此外,Kudu
还提供了多种API
和工具,如Java、Python、SQL
等API
以及Impala Shell、Spark Shell
等工具,方便用户进行数据分析和查询操作。
Apache Kudu
部署可参考下面通过使用 k8s
快速部署方案:
K8s 部署 Apache Kudu 集群
二、Java Api 操作 kudu
这里创建一个 SpringBoot
项目,在 pom
中引入下面依赖:
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.14.0</version>
</dependency>
下面在配置文件中增加配置,将 kudu
信息放置在此:
kudu:
master: 10.218.222.13:30751
defaultSocketReadTimeout: 10000
创建 KuduClient
并注入 Spring
容器中:
@Configuration
public class KuduConfig {
@Value("${kudu.master}")
private String master;
@Value("${kudu.defaultSocketReadTimeout}")
private Integer defaultSocketReadTimeout;
@Bean
public KuduClient kuduClient() {
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(master);
kuduClientBuilder.defaultSocketReadTimeoutMs(defaultSocketReadTimeout);
return kuduClientBuilder.build();
}
}
1. 创建表
1.1 创建表,使用 hash 分区
@Resource
KuduClient kuduClient;
@Test
void createTable() throws KuduException {
String tableName = "user";
if (!kuduClient.tableExists(tableName)) {
//构建创建表的schema信息-----就是表的字段和类型
ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
Schema schema = new Schema(columnSchemas);
//指定创建表的相关属性
CreateTableOptions tableOptions = new CreateTableOptions();
//设置副本数
tableOptions.setNumReplicas(3);
ArrayList<String> partitionList = new ArrayList<String>();
// 指定kudu表的分区字段是什么
partitionList.add("id");
// 按照 id.hashcode % 分区数 = 分区号
tableOptions.addHashPartitions(partitionList, 6);
kuduClient.createTable(tableName, schema, tableOptions);
}
}
1.2 创建表,使用范围分区
@Test
public void testRangePartition() throws KuduException {
String tableName = "user1";
if (!kuduClient.tableExists(tableName)) {
//设置表的schema
LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
//创建schema
Schema schema = new Schema(columnSchemas);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置副本数
tableOptions.setNumReplicas(3);
//设置范围分区的规则
LinkedList<String> parcols = new LinkedList<String>();
parcols.add("id");
//设置按照那个字段进行range分区
tableOptions.setRangePartitionColumns(parcols);
// 设置分区的范围 , 10个范围,每个范围放 10000
int count = 0;
for (int i = 0; i < 10; i++) {
//范围开始
PartialRow lower = schema.newPartialRow();
lower.addInt("id", count);
//范围结束
PartialRow upper = schema.newPartialRow();
count += 10000;
upper.addInt("id", count);
//设置每一个分区的范围
tableOptions.addRangePartition(lower, upper);
}
kuduClient.createTable("student", schema, tableOptions);
}
}
1.3 创建表,同时使用hash分区和range分区
/**
* 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,
* hash分区和range分区结合,可以极大的提升kudu的性能
*/
@Test
public void testMultilevelPartition() throws KuduException {
String tableName = "user2";
if (!kuduClient.tableExists(tableName)) {
//设置表的schema
LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
//创建schema
Schema schema = new Schema(columnSchemas);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置副本数
tableOptions.setNumReplicas(1);
//分区字段
LinkedList<String> parcols = new LinkedList<String>();
parcols.add("id");
//hash分区
tableOptions.addHashPartitions(parcols, 5);
//range分区
int count = 0;
for (int i = 0; i < 10; i++) {
PartialRow lower = schema.newPartialRow();
lower.addInt("id", count);
count += 10000;
PartialRow upper = schema.newPartialRow();
upper.addInt("id", count);
tableOptions.addRangePartition(lower, upper);
}
kuduClient.createTable("cat", schema, tableOptions);
}
}
2. 写入数据
@Test
public void insertTable() throws KuduException {
String tableName = "user";
//构建 kuduSession 对象
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
//构建 Operation 的子类实例对象
KuduTable kuduTable = kuduClient.openTable(tableName);
// 写入数据
for (int i = 1; i <= 10; i++) {
Insert insert = kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addInt("id", i);
row.addString("name", "name-" + i);
row.addInt("age", 20 + i);
// 提交数据
kuduSession.apply(insert);
}
}
3. 查询数据
@Test
public void queryData() throws KuduException {
String tableName = "user";
//构建查询扫描器
KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
// 查询的列
ArrayList<String> columnsList = new ArrayList<String>();
columnsList.add("id");
columnsList.add("name");
columnsList.add("age");
kuduScannerBuilder.setProjectedColumnNames(columnsList);
// 条件筛选
kuduScannerBuilder.addPredicate(
KuduPredicate.newComparisonPredicate(
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build(),
KuduPredicate.ComparisonOp.EQUAL,
"张三"));
// 范围查询
kuduScannerBuilder.addPredicate(
KuduPredicate.newComparisonPredicate(
new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build(),
KuduPredicate.ComparisonOp.LESS,
50));
// in
kuduScannerBuilder.addPredicate(KuduPredicate.newInListPredicate(
new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).build(),
ImmutableList.of(1, 2, 3, 4)));
kuduScannerBuilder.limit(100);
//启用故障容错模式,如果设置为true,则在发生错误时,会尝试重新连接并重试操作。
kuduScannerBuilder.setFaultTolerant(true);
//返回结果集
KuduScanner kuduScanner = kuduScannerBuilder.build();
//遍历数据
while (kuduScanner.hasMoreRows()) {
RowResultIterator rowResults = kuduScanner.nextRows();
while (rowResults.hasNext()) {
RowResult row = rowResults.next();
int id = row.getInt("id");
String name = row.getString("name");
int age = row.getInt("age");
System.out.println("id=" + id + " name=" + name + " age=" + age);
}
}
}
4. 修改数据
@Test
public void updateData() throws KuduException {
String tableName = "user";
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
KuduTable kuduTable = kuduClient.openTable(tableName);
//Update update = kuduTable.newUpdate();
//如果id存在就表示修改,不存在就新增
Upsert upsert = kuduTable.newUpsert();
PartialRow row = upsert.getRow();
row.addInt("id", 1);
row.addString("name", "name-100");
row.addInt("age", 20);
row.addInt("sex", 0);
kuduSession.apply(upsert);
}
5. 删除数据
@Test
public void deleteData() throws KuduException {
String tableName = "user";
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
KuduTable kuduTable = kuduClient.openTable(tableName);
Delete delete = kuduTable.newDelete();
PartialRow row = delete.getRow();
row.addInt("id", 100);
kuduSession.apply(delete);
}
6. 删除表
@Test
public void dropTable() throws KuduException {
String tableName = "student";
if (kuduClient.tableExists(tableName)) {
kuduClient.deleteTable(tableName);
}
}
三、Spark 操作 kudu
在 pom
中新增依赖:
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark3_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>
1. KuduContext
在 Spark
中 kudu
提供了 KuduContext
可进行基本的数据操作:
例如:使用 KuduContext
创建表:
class KuduSparkTests {
public static void main(String[] args) throws AnalysisException, KuduException {
String master = "10.218.222.13:30751";
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
SparkContext sc = spark.sparkContext();
sc.setLogLevel("warn");
KuduContext kuduContext = new KuduContext(master, sc);
// 创建表
String tableName = "user1";
if (!kuduContext.tableExists(tableName)) {
// 声明字段
List<ColumnSchema> columns = Arrays.asList(
new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build(),
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build()
);
Schema schema = new Schema(columns);
CreateTableOptions tableOptions = new CreateTableOptions();
tableOptions.setNumReplicas(3);
ArrayList<String> partitionList = new ArrayList<String>();
// 指定kudu表的分区字段是什么
partitionList.add("id");
// 按照 id.hashcode % 分区数 = 分区号
tableOptions.addHashPartitions(partitionList, 6);
kuduContext.createTable(tableName, schema, tableOptions);
}
spark.stop();
}
}
基于 KuduContext
也可以创建原生的 Session
,使用原生的方式操作数据:
KuduTable table = kuduContext.syncClient().openTable("user");
KuduSession session = kuduContext.syncClient().newSession();
2. SparkSQL
相比 Api
的操作,kudu
还可以使用 SparkSQL
进行操作:
class KuduSparkTests {
public static void main(String[] args) throws AnalysisException, KuduException {
String master = "10.218.222.13:30751";
SparkSession spark = SparkSession
.builder()
.appName("sparksql")
.master("local[*]")
.getOrCreate();
String tableName = "user1";
Dataset<Row> kuduData = spark.read().format("kudu")
.option("kudu.master", master)
.option("kudu.table", tableName)
.load();
kuduData.printSchema();
kuduData.show();
kuduData.createGlobalTempView("user");
spark.sql("select count(*) from user").show();
spark.stop();
}
}