IoTDB 是最早由清华大学发起的开源时序数据库项目,现已经是 Apache 的顶级项目。IoTDB 可以为用户提供数据收集、存储和分析等服务。由于其轻量级架构、高性能和高可用的特性,以及与 Hadoop 和 Spark 生态的无缝集成,满足了工业 IoT 领域中海量数据存储、高吞吐量数据写入和复杂数据查询分析的需求。
Mqtt 服务器可以直接将数据存储到 IoTDB 中,配置起来非常简单。
这里有一些应用场景,最后一个例子中,还带有边缘计算的意思。
应用制造
- 车辆网
- 智能工厂
- 设备测试制造
入手非常简单,估计 7 天就能熟练掌握了。直接看文档就可以了,都是中文的,很容易理解。这里就是有一些提示。
按照官方的文档,3 分钟就弄好了。在及实际过程中,需要往集群里面添加一些数据,然后故意删除某些节点的数据,然后看看还好用不。
按照文档的做法没有问题,但是在实际过程中,可以用一段时间后,会添加一些一些项目,这时候会修改元数据模板,修改时候数据不会不会丢失,这个需要自己计算一下。
iotDb 自带 Mqtt 服务,自带的 Mqtt 服务只支持 MQTT v3.1(OASIS 标准)协议。另外不知道自带的 Mqtt 服务效果如何。
所以可以自己做一个实验,不启动自带的服务,而是连接到一个外部的 Mqtt 服务上.
参考文档:Zeppelin-IoTDB 集成的方法。
编译 zeppelin-interpreter 的步骤,下载 iotdb 的原代码,然后在根目录执行下面的语句。
mvn clean package -pl iotdb-connector/zeppelin-interpreter -am -DskipTests -P get-jar-with-dependencies
如果用的是 idea,可以使用 idea 的 maven:/home/fanhl/ideaIU-2021.2.3/plugins/maven/lib/maven3/bin/
最后生成的文件是:zeppelin-iotdb-1.3.0-SNAPSHOT-jar-with-dependencies.jar
step1: 下载安装包。下载 Zeppelinopen in new window 并解压二进制文件。推荐下载 netinstopen 二进制包,此包由于未编译不相关的 interpreter,因此大小相对较小。
step2: 解压软件包
tar -xzvf zeppelin-0.10.1-bin-netinst.tgz -C ./zeppelin# 可以将解压出来的文件,移动到父亲目录上mv * -t ../
step3: 让局域网的机器可以访问
cd confcp zeppelin-site.xml.template zeppelin-site.xmlvim zeppelin-site.xml
把下面的地址127.0.0.1
修改成0.0.0.0
<property><name>zeppelin.server.addr</name><value>127.0.0.1</value><description>Server binding address</description></property>
step4: 复制 jar 文件
进入 zeppelin 目录
mkdir interpreter/iotdb# 下面的jar文件是自己编译的,路径需要修改成自己的路径cp /media/sf_share/zeppelin-iotdb-1.3.0-SNAPSHOT-jar-with-dependencies.jar interpreter/iotdb
step5:启动服务
#启动iotdb 进入iotdb目录./cluster0/sbin/start-confignode.sh -d./cluster0/sbin/start-datanode.sh -d# 启动zeppelin./bin/zeppelin-daemon.sh start
step6:验证结果
输入: http://1277.0.0.1:8080
,然后点击 create New Note
就可以了。
如果你的 zeppelin 与 iotdb 不在同一台机器上,可以输入这个链接 http://127.0.0.1:8080/#/interpreter
,来配置相关参数
step7:测试入门
我们提供了一些简单的 SQL 来展示 Zeppelin-IoTDB 解释器的使用:
CREATE DATABASE root.ln.wf01.wt01;CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN;CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN;CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN;INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)VALUES (1, 1.1, false, 11);INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)VALUES (2, 2.2, true, 22);INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)VALUES (3, 3.3, false, 33);INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)VALUES (4, 4.4, false, 44);INSERT INTO root.ln.wf01.wt01 (timestamp, temperature, status, hardware)VALUES (5, 5.5, false, 55);SELECT *FROM root.ln.wf01.wt01WHERE time >= 1AND time <= 6;
select *from root.db.车辆Idwhere time between now()-1d and now()and DIFF(mileage)!=0
连续为无效的次数大于 10 次(即 GPS 经纬度连续为 0 的次数大于 10 次),算一次定位异常。
select count(abnormal_value_count) as abnormal_day_countfrom(select COUNT_IF(latitude=0 and longitude=0,KEEP>10,ignoreNull=true) as abnormal_value_countfrom root.sg.车辆IDgroup by ([开始时间,结束时间),1d))where abnormal_value_count>0
当 gw_nm 字段不是 1 或者 0 时,都认为车处于休眠时间
SELECT time as start_item_time,__endtime as end_item_timeDATEDIFF(max_time(gw_nm),min_time(gw_nm),"ss") as sleep_difffrom root.db.**GROUP BY CONDITION(gw_nm not in ('1','0'),KEEP>=2,ignoreNull=true)ALIGN BY DEVICE
上面的标题也可以描述为:找到车辆每一段行程出现最频繁的驾驶模式,以及每段驾驶模式的首末条数据
信号连续的定义为,根据时间戳排序后,一台车的前后两条数据的时间差不超过 24 秒
假设数据结构如下
create table vehicli_table(item_time timestamp,vin varchar(32),trip_id int,count varchar(32),city varchar(32),province varchar(32),ven_spd int,latitude flocat,longitude float,hcu_drv_mod varchar(32))
trip_id 为同一值,表示这些数据属于同一行程,与上面出现不同值表示不同形成。
select __endTime,mode_with_filter(hcu_drv_mod,hcu_drv_mod!='-') as driving_mode,first_value(count) as start_county,last_value(count) as end_county,first_value(city) as start_city,last_value(city) as end_city,first_value(province) as start_province,last_value(province) as end_province,max(veh_spd) as max_speed,avg(veh_spd) as avg_speed,first_value(latitude) as start_latitude,last_value(latitude) as end_latitude,first_value(longitude) as start_longitude,last_value(longitude) as end_longitude,from root.db.**where trip_id is not nullgroup by variation(trip_id,0);align by device
select last *from root[ where time >=T ]