Preface
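I have recently been running experiments on spatio-temporal big data. This post shows how to use the GeoWave framework to import vector data into NoSQL databases such as HBase and Accumulo.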
Software versions:
Hadoop: 2.10.2
ZooKeeper: 3.6.4
GeoWave: 1.2.0
Accumulo: 1.9.3
HBase: 1.4.0
Java: 1.8
Preparation
Download geowave-hbase-1.2.0-apache.jar from the GeoWave website and copy it into HBase's lib directory. (For Accumulo, use geowave-accumulo-1.2.0-apache-accumulo1.7.jar instead.)
Code
1. Add the dependencies
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-datastore-hbase</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-adapter-vector</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-format-vector</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-datastore-accumulo</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.6.10</version>
</dependency>
<dependency>
    <groupId>org.apache.accumulo</groupId>
    <artifactId>accumulo-core</artifactId>
    <version>1.9.3</version>
</dependency>
2. Importing vector data into HBase
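This post uses AIS data stored as JSON with one record per line (the ingest code below reads the file line by line and parses each line as a JSON object). Each record carries fields such as uuid, mmsi, timestamp, longitude and latitude.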
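The ingest code later parses the timestamp and system_timestamp fields with SimpleDateFormat and the pattern yyyy-MM-dd'T'HH:mm:ss. That pattern carries no time zone, so the same string maps to different instants on machines with different default zones. The minimal sketch below pins the zone to UTC. Whether the AIS timestamps really are UTC is an assumption to check against your data, and the class and method names (AisTimestampParsing, parseUtc) are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

class AisTimestampParsing {
    // Same pattern the ingest code uses; it contains no time-zone designator.
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss";

    // Parses an AIS timestamp, ASSUMING the values are UTC.
    // A new SimpleDateFormat is created per call because the class is not thread-safe.
    static Date parseUtc(String text) {
        SimpleDateFormat sdf = new SimpleDateFormat(PATTERN);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return sdf.parse(text);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + text, e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample value, not taken from the article's data set
        System.out.println(parseUtc("2021-01-01T00:00:00").getTime()); // prints 1609459200000
    }
}
```

In the ingest method, the equivalent change is a single sdf.setTimeZone(...) call right after the SimpleDateFormat is created.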
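First, write the code that configures a SimpleFeatureTypeBuilder. This builder creates the SimpleFeatureType, which defines the geometry type of your vector data and the attribute fields it contains.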
public static SimpleFeatureTypeBuilder getSimpleFeatureBuilder(String typeName) throws IOException, JSONException {
    // Create the SimpleFeatureTypeBuilder
    SimpleFeatureTypeBuilder featureTypeBuilder = new SimpleFeatureTypeBuilder();
    AttributeTypeBuilder attributeBuilder = new AttributeTypeBuilder();
    // Geometry attribute: a non-nullable Point named "the_geom"
    featureTypeBuilder.add(attributeBuilder.binding(Point.class).nillable(false).buildDescriptor("the_geom"));
    // Add the attribute fields
    featureTypeBuilder.add("uuid", String.class);
    featureTypeBuilder.add("mmsi", Integer.class);
    featureTypeBuilder.add("timestamp", Date.class);
    featureTypeBuilder.add("system_timestamp", Date.class);
    featureTypeBuilder.add("nav_status", Integer.class);
    featureTypeBuilder.add("rot", Double.class);
    featureTypeBuilder.add("sog", Double.class);
    featureTypeBuilder.add("pos_acc", Double.class);
    featureTypeBuilder.add("longitude", Double.class);
    featureTypeBuilder.add("latitude", Double.class);
    featureTypeBuilder.add("cog", Double.class);
    featureTypeBuilder.add("true_head", Double.class);
    featureTypeBuilder.add("eta", String.class);
    featureTypeBuilder.add("destid", Integer.class);
    featureTypeBuilder.add("dest", String.class);
    featureTypeBuilder.add("srcid", Integer.class);
    featureTypeBuilder.add("distance", Double.class);
    featureTypeBuilder.add("speed", Double.class);
    featureTypeBuilder.add("draught", Double.class);
    featureTypeBuilder.add("ship_type", Integer.class);
    // Coordinate reference system: WGS84 longitude/latitude
    featureTypeBuilder.setCRS(DefaultGeographicCRS.WGS84);
    featureTypeBuilder.setName(typeName);
    return featureTypeBuilder;
}
Next comes the ingest method. The typeName parameter sets the name of the vector data type, and indexName sets the name of the index.
public static void IngestData(String typeName, String indexName)
        throws IOException, JSONException, ParseException {
    HBaseRequiredOptions hBaseRequiredOptions = new HBaseRequiredOptions();
    // ZooKeeper address (host:port)
    hBaseRequiredOptions.setZookeeper("localhost:2181");
    HBaseDataStore hbaseStore = (HBaseDataStore)
            DataStoreFactory.createDataStore(hBaseRequiredOptions);
    // Path of the JSON data file
    String filePath = "D:\\轨迹数据\\5h.json";
    long startTimeStamp = System.currentTimeMillis();
    // Build the SimpleFeatureType that describes the JSON records
    SimpleFeatureTypeBuilder featureTypeBuilder = getSimpleFeatureBuilder(typeName);
    SimpleFeatureType pointType = featureTypeBuilder.buildFeatureType();
    // Create the SimpleFeatureBuilder
    SimpleFeatureBuilder pointFeatureBuilder = new SimpleFeatureBuilder(pointType);
    // Create an adapter for the point type
    FeatureDataAdapter pointTypeAdapter = new FeatureDataAdapter(pointType);
    // Create the spatio-temporal index
    Index spatialTemporalIndex = new SpatialTemporalIndexBuilder()
            .setMaxDuplicates(-1)
            .setNumPartitions(3)
            .setPeriodicity(TemporalBinningStrategy.Unit.DAY)
            .setPartitionStrategy(IndexPluginOptions.PartitionStrategy.HASH)
            .setName(indexName).createIndex();
    // Register the point type with the data store under the spatio-temporal index
    hbaseStore.addType(pointTypeAdapter, spatialTemporalIndex);
    hbaseStore.getIndexStore().addIndex(spatialTemporalIndex);
    Writer<SimpleFeature> writer = hbaseStore.createWriter(pointTypeAdapter.getTypeName());
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
    // A single GeometryFactory can be reused for every point
    GeometryFactory factory = new GeometryFactory();
    // Read the JSON file line by line; each line is one record
    try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            JSONObject jsonObject = JSONObject.parseObject(line);
            String uuId = jsonObject.get("uuid").toString();
            String mmsi = jsonObject.get("mmsi").toString();
            double lon = Double.parseDouble(jsonObject.get("longitude").toString());
            double lat = Double.parseDouble(jsonObject.get("latitude").toString());
            pointFeatureBuilder.set("the_geom", factory.createPoint(new Coordinate(lon, lat)));
            pointFeatureBuilder.set("uuid", uuId);
            pointFeatureBuilder.set("mmsi", mmsi);
            pointFeatureBuilder.set("timestamp", sdf.parse(jsonObject.get("timestamp").toString()));
            pointFeatureBuilder.set("system_timestamp", sdf.parse(jsonObject.get("system_timestamp").toString()));
            pointFeatureBuilder.set("nav_status", jsonObject.get("nav_status"));
            pointFeatureBuilder.set("rot", jsonObject.get("rot"));
            pointFeatureBuilder.set("sog", jsonObject.get("sog"));
            pointFeatureBuilder.set("pos_acc", jsonObject.get("pos_acc"));
            // The schema declares longitude/latitude attributes, so fill them as well
            pointFeatureBuilder.set("longitude", lon);
            pointFeatureBuilder.set("latitude", lat);
            pointFeatureBuilder.set("cog", jsonObject.get("cog"));
            pointFeatureBuilder.set("true_head", jsonObject.get("true_head"));
            pointFeatureBuilder.set("eta", jsonObject.get("eta"));
            pointFeatureBuilder.set("destid", jsonObject.get("destid"));
            pointFeatureBuilder.set("dest", jsonObject.get("dest"));
            pointFeatureBuilder.set("srcid", jsonObject.get("srcid"));
            pointFeatureBuilder.set("distance", jsonObject.get("distance"));
            pointFeatureBuilder.set("speed", jsonObject.get("speed"));
            pointFeatureBuilder.set("draught", jsonObject.get("draught"));
            pointFeatureBuilder.set("ship_type", jsonObject.get("ship_type"));
            // Feature ID: first 3 + last 3 characters of the UUID, then "-" and the MMSI
            String id = uuId.substring(0, 3) + uuId.substring(uuId.length() - 3) + "-" + mmsi;
            writer.write(pointFeatureBuilder.buildFeature(id));
        }
    } finally {
        // Flush buffered features and release the writer before stopping the clock
        writer.flush();
        writer.close();
    }
    long endTimeStamp = System.currentTimeMillis();
    System.out.println("ingest finished!!!!");
    System.out.println("Ingest time: " + (endTimeStamp - startTimeStamp) + " ms");
}
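In the loop above, each feature's ID is built from the first three and last three characters of uuid plus the MMSI. Pulling that rule into a standalone helper makes it easy to test. The helper name featureId is hypothetical. The example also shows that only six characters of the UUID survive, so two distinct records with the same MMSI can end up with the same ID.

```java
class AisFeatureId {
    // Same rule as the ingest loop: first 3 chars of uuid + last 3 chars + "-" + mmsi.
    static String featureId(String uuid, String mmsi) {
        if (uuid.length() < 3) {
            throw new IllegalArgumentException("uuid too short: " + uuid);
        }
        return uuid.substring(0, 3) + uuid.substring(uuid.length() - 3) + "-" + mmsi;
    }

    public static void main(String[] args) {
        // Hypothetical values, not from the article's data set
        System.out.println(featureId("abc123-xyz789", "413000000")); // prints abc789-413000000
        // Different UUIDs sharing the first/last three characters, same MMSI: same ID
        System.out.println(featureId("abc111789", "1").equals(featureId("abc222789", "1"))); // prints true
    }
}
```

If every record needs its own ID, one option is to pass the full uuid to buildFeature instead.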
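The index in the ingest method combines two settings worth picturing. setPeriodicity(TemporalBinningStrategy.Unit.DAY) groups time values into day-sized bins. setNumPartitions(3) with PartitionStrategy.HASH spreads rows over three partitions by hashing. The standard-library sketch below only illustrates those two ideas and is not GeoWave's implementation. The UTC day boundary is an assumption, String.hashCode stands in for whatever hash GeoWave actually uses, and the names dayBin and hashPartition are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

class IndexBinningSketch {
    // Illustration only: the calendar day (UTC assumed) a timestamp falls into,
    // i.e. the kind of bin a DAY periodicity groups records by.
    static LocalDate dayBin(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Illustration only: map a row key to one of numPartitions buckets by hashing.
    // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
    static int hashPartition(String rowKey, int numPartitions) {
        return Math.floorMod(rowKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(dayBin(86_399_999L)); // prints 1970-01-01
        System.out.println(dayBin(86_400_000L)); // prints 1970-01-02
        System.out.println(hashPartition("abc789-413000000", 3));
    }
}
```

Hash partitioning spreads writes for neighbouring keys across several regions or tablets instead of concentrating them in one. The trade-off is that a query has to consult every partition.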
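3. Importing vector data into Accumulo
The code is essentially the same as above. Replace HBaseRequiredOptions with AccumuloRequiredOptions, which takes the Accumulo instance name, user and password in addition to the ZooKeeper address. Then replace HBaseDataStore with AccumuloDataStore.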