1、引入依赖
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.14.0-preview1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
2、iotdb 配置工具类
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {
private static Session session;
private static final String LOCAL_HOST = "127.0.0.1"; //改为自己服务器ip
@Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");
// 设置时区
session.setTimeZone("+08:00");
}
return session;
}
}
3、入参
@Data
public class IotDbParam {
/***
* 产品PK
*/
private String pk;
/***
* 设备号
*/
private String sn;
/***
* 时间
*/
private Long time;
/***
* 实时呼吸
*/
private String breath;
/***
* 实时心率
*/
private String heart;
/***
* 查询开始时间
*/
private String startTime;
/***
* 查询结束时间
*/
private String endTime;
}
4、响应
@Data
public class IotDbResult {
/***
* 时间
*/
private String time;
/***
* 产品PK
*/
private String pk;
/***
* 设备号
*/
private String sn;
/***
* 实时呼吸
*/
private String breath;
/***
* 实时心率
*/
private String heart;
}
5、控制层
@Log4j2
@RestController
public class IotDbController {
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
/**
* 插入数据
* @param iotDbParam
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(iotDbParam);
return ResponseData.success();
}
/**
* 查询数据
* @param iotDbParam
*/
@PostMapping("/api/device/queryData")
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
}
/**
* 删除分组
* @return
*/
@PostMapping("/api/device/deleteGroup")
public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
return ResponseData.success();
}
}
6、接口
public interface IotDbServer {
/**
* 添加数据
*/
void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException;
/**
* 查询数据
*/
Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
}
7、实现层
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Override
public void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>();
measurementsList.add("heart");
measurementsList.add("breath");
List<String> valuesList = new ArrayList<>();
valuesList.add(String.valueOf(iotDbParam.getHeart()));
valuesList.add(String.valueOf(iotDbParam.getBreath()));
iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
}
@Override
public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
List<IotDbResult> iotDbResultList = new ArrayList<>();
if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= "
+ iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
} else {
log.info("PK或者SN不能为空!!");
}
return iotDbResultList;
}
/**
* 封装处理数据
* @param iotDbParam
* @param iotDbResultList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
IotDbResult iotDbResult = new IotDbResult();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
iotDbResult.setTime(timeString);
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
iotDbResult.setTime(timeString);
iotDbResult.setPk(iotDbParam.getPk());
iotDbResult.setSn(iotDbParam.getSn());
iotDbResult.setHeart(map.get("heart"));
iotDbResult.setBreath(map.get("breath"));
iotDbResultList.add(iotDbResult);
}
}
}
}
8、测试api
1.添加一条记录
接口:localhost:8080/api/device/insert
入参:
{
"time":1660573444672,
"pk":"a1TTQK9TbKT",
"sn":"SN202208120945QGJLD",
"breath":"17",
"heart":"68"
}
2.根据SQL查询时间区间记录
接口:localhost:8080/api/device/queryData
入参:
{
"pk":"a1TTQK9TbKT",
"sn":"SN202208120945QGJLD",
"startTime":"2022-08-14 00:00:00",
"endTime":"2022-08-16 00:00:00"
}
结果