IoTDB

IoTDB 是最早由清华大学发起的开源时序数据库项目,现已经是 Apache 的顶级项目。IoTDB 可以为用户提供数据收集、存储和分析等服务。由于其轻量级架构、高性能和高可用的特性,以及与 Hadoop 和 Spark 生态的无缝集成,满足了工业 IoT 领域中海量数据存储、高吞吐量数据写入和复杂数据查询分析的需求。

Mqtt 服务器可以直接将数据存储到 IoTDB 中,配置起来非常简单。

这里有一些应用场景,最后一个例子中,还带有边缘计算的意思。

  • 可以研究更新一下,这个做的还是不错的。有边缘计算的概念

应用制造

  • 车辆网
  • 智能工厂
  • 设备测试制造

1、快速入手

入手非常简单,估计 7 天就能熟练掌握了。直接看文档就可以了,都是中文的,很容易理解。这里就是有一些提示。

1.1 快速部署集群

按照官方的文档,3 分钟就弄好了。在及实际过程中,需要往集群里面添加一些数据,然后故意删除某些节点的数据,然后看看还好用不。

  • 如果以前启动了集群,那么重启后,就必须把所有的节点都启动,否则集群不能启动。

1.2 元数据模板

按照文档的做法没有问题,但是在实际过程中,可以用一段时间后,会添加一些一些项目,这时候会修改元数据模板,修改时候数据不会不会丢失,这个需要自己计算一下。

1.3 MQTT

iotDb 自带 Mqtt 服务,自带的 Mqtt 服务只支持 MQTT v3.1(OASIS 标准)协议。另外不知道自带的 Mqtt 服务效果如何。

所以可以自己做一个实验,不启动自带的服务,而是连接到一个外部的 Mqtt 服务上.

1.4 Zeppelin

参考文档:Zeppelin-IoTDB 集成的方法

前提条件

  • 已经安装了 IotDb
  • 编译了 zeppelin-interpreter

编译 zeppelin-interpreter 的步骤,下载 iotdb 的原代码,然后在根目录执行下面的语句。

mvn clean package -pl iotdb-connector/zeppelin-interpreter -am -DskipTests -P get-jar-with-dependencies

如果用的是 idea,可以使用 idea 的 maven:/home/fanhl/ideaIU-2021.2.3/plugins/maven/lib/maven3/bin/

最后生成的文件是:zeppelin-iotdb-1.3.0-SNAPSHOT-jar-with-dependencies.jar

具体安装

step1: 下载安装包。下载 Zeppelinopen in new window 并解压二进制文件。推荐下载 netinstopen 二进制包,此包由于未编译不相关的 interpreter,因此大小相对较小。

step2: 解压软件包

tar -xzvf zeppelin-0.10.1-bin-netinst.tgz -C ./zeppelin
# 可以将解压出来的文件,移动到父亲目录上
mv * -t ../

step3: 让局域网的机器可以访问

cd conf
cp zeppelin-site.xml.template zeppelin-site.xml
vim zeppelin-site.xml

把下面的地址127.0.0.1修改成0.0.0.0

<property>
<name>zeppelin.server.addr</name>
<value>127.0.0.1</value>
<description>Server binding address</description>
</property>

step4: 复制 jar 文件

进入 zeppelin 目录

mkdir interpreter/iotdb
# 下面的jar文件是自己编译的,路径需要修改成自己的路径
cp /media/sf_share/zeppelin-iotdb-1.3.0-SNAPSHOT-jar-with-dependencies.jar interpreter/iotdb

step5:启动服务

#启动iotdb 进入iotdb目录
./cluster0/sbin/start-confignode.sh -d
./cluster0/sbin/start-datanode.sh -d
# 启动zeppelin
./bin/zeppelin-daemon.sh start

step6:验证结果

输入: http://1277.0.0.1:8080,然后点击 create New Note 就可以了。

​ 如果你的 zeppelin 与 iotdb 不在同一台机器上,可以输入这个链接 http://127.0.0.1:8080/#/interpreter,来配置相关参数

step7:测试入门

我们提供了一些简单的 SQL 来展示 Zeppelin-IoTDB 解释器的使用:

CREATE DATABASE root.ln.wf01.wt01;
CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN;
CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN;
CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN;
INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)
VALUES (1, 1.1, false, 11);
INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)
VALUES (2, 2.2, true, 22);
INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)
VALUES (3, 3.3, false, 33);
INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)
VALUES (4, 4.4, false, 44);
INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)
VALUES (5, 5.5, false, 55);
SELECT *
FROM root.ln.wf01.wt01
WHERE time >= 1
AND time <= 6;

2、常见查询

查询某车辆一天内发生里程变化的时刻

select *
from root.db.车辆Id
where time between now()-1d and now()
and DIFF(mileage)!=0

统计时间段内某台车的定位异常天数

连续为无效的次数大于 10 次(即 GPS 经纬度连续为 0 的次数大于 10 次),算一次定位异常。

select count(abnormal_value_count) as abnormal_day_count
from(
select COUNT_IF(latitude=0 and longitude=0,KEEP>10,ignoreNull=true) as abnormal_value_count
from root.sg.车辆ID
group by ([开始时间,结束时间),1d)
)
where abnormal_value_count>0

找出所有车辆休眠时间的首条数据,并计算出时检差

当 gw_nm 字段不是 1 或者 0 时,都认为车处于休眠时间

SELECT time as start_item_time,
__endtime as end_item_time
DATEDIFF(max_time(gw_nm),min_time(gw_nm),"ss") as sleep_diff
from root.db.**
GROUP BY CONDITION(gw_nm not in ('1','0'),KEEP>=2,ignoreNull=true)
ALIGN BY DEVICE

取出持续 60 秒以上的连续信号的第一条和最后一条数据

上面的标题也可以描述为:找到车辆每一段行程出现最频繁的驾驶模式,以及每段驾驶模式的首末条数据

信号连续的定义为,根据时间戳排序后,一台车的前后两条数据的时间差不超过 24 秒

假设数据结构如下

create table vehicli_table(
item_time timestamp,
vin varchar(32),
trip_id int,
count varchar(32),
city varchar(32),
province varchar(32),
ven_spd int,
latitude flocat,
longitude float,
hcu_drv_mod varchar(32)
)

trip_id 为同一值,表示这些数据属于同一行程,与上面出现不同值表示不同形成。

select __endTime,
mode_with_filter(hcu_drv_mod,hcu_drv_mod!='-') as driving_mode,
first_value(count) as start_county,
last_value(count) as end_county,
first_value(city) as start_city,
last_value(city) as end_city,
first_value(province) as start_province,
last_value(province) as end_province,
max(veh_spd) as max_speed,
avg(veh_spd) as avg_speed,
first_value(latitude) as start_latitude,
last_value(latitude) as end_latitude,
first_value(longitude) as start_longitude,
last_value(longitude) as end_longitude,
from root.db.**
where trip_id is not null
group by variation(trip_id,0);
align by device

设备最新状态

select last *
from root
[ where time >=T ]