时序数据库全称为时间序列数据库。 即时间序列数据,按时间维度顺序记录且索引的数据。
时间序列数据主要由 电力行业
、化工行业
、气象行业
、地理信息
等各类型实时监测、检查与分析设备所采集、产生的数据,这些工业数据的典型特点是:产生频率快(每一个监测点一秒钟内可产生多条数据)、严重依赖于采集时间(每一条数据均要求对应唯一的时间)、测点多信息量大(常规的实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB的数据量)。
例如:每秒都会上报的汽车 GPS
点位、无人驾驶每秒都需要进行采集的车辆状态信息、证券交易、虚拟机容器以及程序的状态等;上述的数据都会带有很明显的时间序列,每时每刻都在进行采集,跟随时间进行改变,而且数据量很大
所有采集的数据都是时序的
每个采集点的数据源唯一
通常不会对数据进行删除、修改操作
数据存储一般有保留期限设置,按照日期进行删除
数据以写入为主,查询操作主要是按照指定时间段进行查询、分析
往往需要对存储的数据进行统计、聚合等进行实时计算
存储的数据量巨大
时序数据库一般有很高的数据压缩率,即使海量数据存储的情况下,也可以对数据进行压缩存放,存储成本很低,这得益于时序数据库存储方式,一般是采用 列式存储
,存储成本低;例如:https://www.taosdata.com/user-cases/3626.html
与普通数据库相比,写入性能
更好,即使在海量设备、测点
的情况下,依然能有很优秀的性能。
时序数据库的数据采集频率较快,存储的数据量也巨大。用户一般可以根据自己业务要求设置数据的保留期限,比如 10 年、50 年。 普通的数据建库不会有这种功能
索引结构不同,关系型数据库一般采用 B+
和 B
树;但在时序数据库中,写入明显高于查询,大多数是采用 LSM Tree(Log Structured Merge Tree),LSM Tree 通常能够承受比 Btree 更高的 写入吞吐量
TDengine 是一款开源、高性能、云原生的 时序数据库
,且针对物联网、车联网、工业互联网、金融、IT 运维等场景进行了优化。除核心的时序数据库功能外,TDengine 还提供 缓存、数据订阅、流式计算
等其它功能以降低系统复杂度及研发和运维成本。
开源、免费、性能高、文档详细(提供了中文)、sql语法(上手难度低)
与典型的 NoSQL 存储模型相比,Tdengine 将标签数据与时序数据完全分离存储,其中具有的优势:
本地开发环境部署需要依赖 c 环境的库,安装比较麻烦,生态还不是很完全,对java而言没有提供对应的 orm 框架,mybatis不支持需要使用pgsql的方言进行支持(对现有业务的改造很麻烦)
TDengine与InfluxDB的查询测试报告:https://www.taosdata.com/engineering/5969.html
TDengine与InfluxDB的写入测试报告:https://www.taosdata.com/engineering/3248.html
https://docs.taosdata.com/get-started/package/ 官网网站提供了很详细的安装文档,提供了多种安装方式,我这里采用 docker-compose
的方式进行安装
version: '3'
services:tdengine:image: tdengine/tdengine:3.0.1.5container_name: tdengine-serverhostname: tdengine-serverports:- "6030:6030"- "6041:6041"
# - "6043-6049:6043-6049" 提供给第三方的接口可以根据需要是否打开
# - "6043-6049:6043-6049/udp"volumes:- "/usr/local/soft/tdengine/conf/taos.cfg:/etc/taos/taos.cfg"
# - "/usr/local/soft/tdengine/data:/var/lib/taos" #数据目录
# - "/usr/local/soft/tdengine/log:/var/log/taos" #日志目录environment:TAOS_FQDN: tdengine-server
Tdegine
采用 FQDN
的方式进行解析,FQDN
由两部分组成 hostname.domain:port hostname是主机的名称,domain则是 Tdengine
所在的域名,解析时通过 domain
找到所在主机,通过 hostname
找到对应的节点;hostname
就是配置在 /etc/hosts
中,这样的目的时就算 ip地址改了,只需要修改 /etc/hosts
中的地址就行;如果是采用的安装包安装,这里将ip配置到 /etc/hosts
就行
# 配置tdengine-server(这里是直接配置容器的名称)
firstEp tdengine-server:6030# 配置tdengine的域名解析
fqdn tdengine-server# 服务端口
serverPort 6030
如果是安装包就到安装的路径下 bin
目录执行 taos
命令,如果是容器,进入容器直接执行 taos
就行,注意默认的账号密码是 taos
与 taosdata
如果不修改这里会自动填入
taos -u root -p #密码 taosdata
alter user root pass ‘root’ #修改密码
可以使用 TDengine 的自带工具 taosBenchmark 快速体验 TDengine 的写入速度,通过 taosBenchmark 可以快速插入1亿条数据,具体体验可以看看官方文档 https://docs.taosdata.com/get-started/docker/
如果使用 Java Connector 进行连接 taos 数据,需要使用到 taos.dll 的c语言库的支持,这里就需要在win10系统上面安装客户端,访问下面的连接进行客户端下载
https://www.taosdata.com/assets-download/3.0/TDengine-client-3.0.1.5-Windows-x64.exe
上面客户端安装成功之后,路径就是下面的结构,在安装成功之后会默认将 driver 路径下面的 taos.dll 文件拷贝到 win10 的系统库路径下面
打开 cfg/taos.cfg 文件,只需要将 firstEp 改成服务端的ip地址和端口就行了,注意 tdengine 是我配置在 hosts 中的,配置好就可以使用 taos.exe 来进行操作了
firstEp tdengine:6030
使用 Java Connector 进行操作,如果出现了包路径的问题,先检查一下 C:\Windows\System32 路径下是否存在 taos.dll
3.0.0
com.taosdata.jdbc taos-jdbcdriver ${tdengine.version}
public class TdEngineDemo {public static void main(String[] args) throws SQLException, InterruptedException {TdEngineDemo tdEngineDemo = new TdEngineDemo();Connection conn = tdEngineDemo.getConn();}private Connection getConn() throws SQLException {String jdbcUrl = "jdbc:TAOS://tencent.server:6030?user=root&password=taosdata";Properties properties = new Properties();properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");return DriverManager.getConnection(jdbcUrl, properties);}
}
模拟采集汽车的点位的各项状态信息
在 Tdengine 中所谓的超级表,简单理解可以算是一个 模板集合表 ,所有的 子表 都要依靠 超级表 来进行创建,所谓的 集合 就是通过对应的 TAG 我们可以从超级表中找到对应的子表信息,具体例子可以看下面例子
子表 真正存放数据集合的表
下面是实战代码,具体步骤:
public class TdEngineDemo {public static void main(String[] args) throws SQLException, InterruptedException {TdEngineDemo tdEngineDemo = new TdEngineDemo();Connection conn = tdEngineDemo.getConn();tdEngineDemo.insert(conn);}private Connection getConn() throws SQLException {String jdbcUrl = "jdbc:TAOS://tencent.server:6030?user=root&password=taosdata";Properties properties = new Properties();properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");return DriverManager.getConnection(jdbcUrl, properties);}private void createTable(Connection connection) {try (Statement sta = connection.createStatement()) {sta.execute("create database if not exists iot");sta.executeUpdate("use iot");//创建超级表,idsta.execute("create stable if not exists device_stat(time timestamp, deviceName VARCHAR(32), position VARCHAR(32), speed INT)" +" " +"tags (deviceId VARCHAR(32))");} catch (SQLException throwables) {throwables.printStackTrace();}}private void insert(Connection conn) throws InterruptedException {//创建表createTable(conn);String sql = "insert into ? using device_stat tags(?) values(?, ?, ?, ?)";run(conn, sql);}//执行数据的插入private void run(Connection conn, String sql) throws InterruptedException {//遍历5次,每次给2个设备表里面插入10条数据for (int i = 0; i < 5; i++) {extracted(conn, sql);}try {conn.close();} catch (SQLException e) {e.printStackTrace();}}private void extracted(Connection conn, String sql) {try (TSDBPreparedStatement pst = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {//创建两个设备表for (int i = 1; i <= 2; i++) {//每个设备插入10条数据for (int i1 = 0; i1 < 10; i1++) {String deviceId = "d" + i;String tableName = "device_stat_" + deviceId;pst.setTableName(tableName);//设置tagpst.setTagNString(0, deviceId);pst.setTimestamp(0, toArray(Timestamp.from(Instant.now()).getTime()));pst.setString(1, toArray(("设备:" + deviceId)), 30);//设置随机点位pst.setString(2, toArray(LocationUtils.getRandomPoint("104.070799,30.545794")), 30);//设置速度pst.setInt(3, toArray(new Random().nextInt(50)));pst.columnDataAddBatch();}}pst.columnDataExecuteBatch();System.out.println("执行插入数据............");} catch (SQLException throwables) {throwables.printStackTrace();}}private static ArrayList toArray(T v) {ArrayList result = new ArrayList<>();result.add(v);return result;}
}
执行完代码后,会发现插入了两张子表 device_stat_d1、device_stat_d2 两个设备的信息
如果要查询某一个设备的信息,我们可以通过两种方式进行查询,第一张是通过超级表指定 tag 进行查询
select * from device_stat where deviceId = ‘d1’;
第二种方式直接按照指定数据库名称进行查询
select * from device_stat_d1;
https://docs.taosdata.com/taos-sql/function/
更多的例子:https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC
加入 maven 依赖,使用了 mybatis plus 作为 orm 框架,druid 作为数据源连接池
3.0.0 3.1.2 1.1.17 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web com.baomidou mybatis-plus-boot-starter ${mybatis.plus.version} com.alibaba druid-spring-boot-starter ${druid.version} com.taosdata.jdbc taos-jdbcdriver ${tdengine.version} org.springframework.boot spring-boot-starter-test test
spring:datasource:driver-class-name: com.taosdata.jdbc.TSDBDriverurl: jdbc:TAOS://tencent-server:6030/iot?charset=UTF-8&locale=en_US.UTF-8&timezone=UTC-8password: taosdatausername: root
mybatis-plus:configuration:map-underscore-to-camel-case: falsemapper-locations: classpath:mapper/*.xml
分页插件使用 pstgresql 的方言
@Configuration
public class MybatisPlusConfig {@Beanpublic PaginationInterceptor paginationInterceptor() {PaginationInterceptor paginationInterceptor = new PaginationInterceptor();paginationInterceptor.setDialectType("postgresql");return paginationInterceptor;}
}
public interface DeviceStatMapper extends BaseMapper {/*** 创建超级表*/@Update("create stable if not exists device_stat(time timestamp, deviceName VARCHAR(32), position VARCHAR(32), speed INT) tags (deviceId VARCHAR(32))")int createSuperTable();@Insert("insert into ${tbName}(time, deviceName, position, speed) values(#{device.time}, #{device.deviceName}, #{device.position}, " +"#{device.speed})")int insertOne(@Param("tbName") String tbName, @Param("device") DeviceStat deviceStat);/*** 通过xml进行查询*/List selectListForXml(@Param("tableName") String tableName);
}
@Data
public class DeviceStat {private Timestamp time;private String deviceName;private String position;private Integer speed;
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class TdengineTest {@Resourceprivate DeviceStatMapper deviceStatMapper;@Testpublic void testSelectList() {List deviceStats = deviceStatMapper.selectList(null);deviceStats.forEach(System.out::println);}@Testpublic void insertOne() {DeviceStat deviceStat = new DeviceStat();deviceStat.setDeviceName("设备:d1");deviceStat.setTime(Timestamp.from(Instant.now()));deviceStat.setSpeed(new Random().nextInt(50));deviceStat.setPosition(LocationUtils.getRandomPoint("104.070799,30.545794"));int one = this.deviceStatMapper.insertOne("device_stat_d1", deviceStat);System.out.println(one);}@Testpublic void testSelectListForXml() {List deviceStats = this.deviceStatMapper.selectListForXml("d1");deviceStats.forEach(System.out::println);}}
Tdengine 还提供了消息订阅功能用于指定 topic 进行对应数据的消费
https://docs.taosdata.com/develop/tmq/
create topic test as select * from iot.device_stat; # as后面跟随查询的子句
public class DeviceStatDeserializer extends ReferenceDeserializer {
}
private void subscribe() throws SQLException, InterruptedException {Properties properties = new Properties();properties.setProperty("enable.auto.commit", "true");properties.setProperty("auto.commit.interval.ms", "1000");properties.setProperty("group.id", "cgrpName");properties.setProperty("bootstrap.servers", "tdengine-server:6030");properties.setProperty("td.connect.user", "root");properties.setProperty("td.connect.pass", "taosdata");properties.setProperty("auto.offset.reset", "earliest");properties.setProperty("msg.with.table.name", "true");properties.setProperty("value.deserializer", "com.zhj.DeviceStatDeserializer");TaosConsumer consumer = new TaosConsumer<>(properties);consumer.subscribe(Arrays.asList("test"));while(true){ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));for (DeviceStat meter : meters) {System.out.println(meter);}}
}
流式计算用于对数据进行 清洗 和 预处理,在传统的方案中一般都通过 Kafka、Flink 等流式处理系统进行处理;而 Tdengine 在 3.0的流式计算引擎提供了实时处理写入的数据流的能力,下面我们用一个例子来实战一下流式处理;
流式查询的语法跟 特色查询 一致,可以参考 特色查询 的语法:https://docs.taosdata.com/taos-sql/distinguished/ 注意:流式查询只能其中的子查询必须要配合聚合函数进行使用,普通的子查询函数会出现不支持的异常
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options触发模式:
subquery: SELECT select_list from_clause [WHERE condition] [PARTITION BY tag_list] [window_clause]
subquery查询子句:
在设备上报了车辆的定位和速度之后,由于车辆的定位和速度都是实时的进行数据上报数据量很大,这时候我们需要整理一些数据用于单独存储起来使用;
查询5秒为窗口中速度最大的数据
create stream max_speed into max_speed_output_stb as select _wstart as start, _wend as wend, max(speed) from device_stat interval(5s) order by 1 desc
查询出每10秒为窗口 20 - 30 速度的数量,并且计算平均值数据写入库中
create stream select_thirty_speed into select_thirty_output_stb as select avg(speed), count(*) from device_stat where speed > 20 and speed < 30 interval(10s);
配置两个数据源用于进行测试使用,一个 mysql 、一个 Tdengine
@Configuration
public class MybatisPlusConfig {@Beanpublic PaginationInterceptor paginationInterceptor() {PaginationInterceptor paginationInterceptor = new PaginationInterceptor();paginationInterceptor.setDialectType("postgresql");return paginationInterceptor;}@Beanpublic DataSource dataSource(DataSourceProperties dataSourceProperties) {MultiDataSource dataSource = new MultiDataSource();Map
public class MultiDataSource extends AbstractRoutingDataSource {@Overrideprotected Object determineCurrentLookupKey() {return ThreadDataSource.get();}
}
用本地线程的方式选择指定的数据源
public class ThreadDataSource {private static final ThreadLocal datasourceKey = new ThreadLocal<>();public static void set(String key) {datasourceKey.set(key);}public static String get() {return datasourceKey.get();}}
@Mapper
public interface DeviceStatMapper extends BaseMapper {/*** 创建超级表*/@Update("create stable if not exists device_stat(time timestamp, deviceName VARCHAR(32), position VARCHAR(32), speed INT) tags (deviceId VARCHAR(32))")int createSuperTable();@Insert("insert into ${tbName} using device_stat tags (#{device.deviceId}) values(#{device.time}, #{device" +".deviceName}, #{device.position}, " +"#{device.speed})")int insertOne(@Param("tbName") String tbName, @Param("device") DeviceStat deviceStat);/*** 通过xml进行查询*/List selectListForXml(@Param("tableName") String tableName);
}
@Data
public class DeviceStat {private String deviceId;private Timestamp time;private String deviceName;private String position;private Integer speed;
}
mysql建表语句
create table device_stat
(id int auto_incrementprimary key,time timestamp default CURRENT_TIMESTAMP null,device_name varchar(32) null,position varchar(32) null,speed int(32) null,device_id varchar(32) null
);
测试文件
@RunWith(SpringRunner.class)
@SpringBootTest
public class TdengineTest {@Resourceprivate DeviceStatMapper deviceStatMapper;private final ExecutorCompletionService executorService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(10));@Testpublic void testSelectList() throws InterruptedException {CountDownLatch count = new CountDownLatch(2);new Thread(() -> {selectTdengine();count.countDown();}).start();new Thread(() -> {selectMysql();count.countDown();}).start();count.await();}private void selectMysql() {long start = System.currentTimeMillis();ThreadDataSource.set("mysql");QueryWrapper wrapper = new QueryWrapper<>();List deviceStats = this.deviceStatMapper.selectList(wrapper);long end = System.currentTimeMillis();System.out.println("mysql耗时:" + (end - start) + "查询条数:" + deviceStats.size());}private void selectTdengine() {long start = System.currentTimeMillis();ThreadDataSource.set("tdengine");List deviceStats = this.deviceStatMapper.selectListForXml("d1");long end = System.currentTimeMillis();System.out.println("Tdengine耗时:" + (end - start) + "查询条数:" + deviceStats.size());}@Testpublic void insertTdengine() {ThreadDataSource.set("tdengine");this.deviceStatMapper.createSuperTable();long start = System.currentTimeMillis();int size = 100000;int deviceNum = 100000;for (int i = 0; i < size; i++) {String deviceId = "d" + i;executorService.submit(() -> {ThreadDataSource.set("tdengine");int num = 0;for (int i1 = 0; i1 < deviceNum; i1++) {DeviceStat deviceStat = new DeviceStat();deviceStat.setDeviceId(deviceId);deviceStat.setDeviceName("设备:" + deviceId);deviceStat.setTime(Timestamp.from(Instant.now()));deviceStat.setSpeed(new Random().nextInt(50));deviceStat.setPosition(LocationUtils.getRandomPoint("104.070799,30.545794"));int one = this.deviceStatMapper.insertOne("device_stat_" + deviceId, deviceStat);num += one;}return num;});}int num = 0;int completionTask = 0;do {try {num += executorService.take().get();completionTask++;} catch (InterruptedException | ExecutionException e) {System.out.println("Tdengine异常信息:" + e.getMessage());}System.out.println("Tdengine完成任务数:" + completionTask);} while (completionTask != size);long end = System.currentTimeMillis();System.out.println("Tdengine耗时:" + (end - start) + "插入条数:" + num);}@Testpublic void insertMysql() {long start = System.currentTimeMillis();int size = 100000;int deviceNum = 100000;for (int i = 0; i < size; i++) {String deviceId = "d" + i;executorService.submit(() -> {ThreadDataSource.set("mysql");int num = 0;for (int i1 = 0; i1 < deviceNum; i1++) {DeviceStat deviceStat = new DeviceStat();deviceStat.setDeviceId(deviceId);deviceStat.setDeviceName("设备:" + deviceId);deviceStat.setTime(Timestamp.from(Instant.now()));deviceStat.setSpeed(new Random().nextInt(50));deviceStat.setPosition(LocationUtils.getRandomPoint("104.070799,30.545794"));int one = this.deviceStatMapper.insert(deviceStat);num += one;}return num;});}int num = 0;int completionTask = 0;do {try {num += executorService.take().get();completionTask++;} catch (InterruptedException | ExecutionException e) {System.out.println("Mysql异常信息:" + e.getMessage());}System.out.println("Mysql完成任务数:" + completionTask);} while (completionTask != size);long end = System.currentTimeMillis();System.out.println("Mysql耗时:" + (end - start) + "插入条数:" + num);}@Testpublic void testSelectListForXml() {List deviceStats = this.deviceStatMapper.selectListForXml("d1");deviceStats.forEach(System.out::println);}}
开启两个线程对数据库通过 ORM 框架插入 10W 数据,Tdengine 耗时为 3分钟 ,mysql 耗时为 10分钟
Tdengine插入10w的数据:由于是内存存储数据,默认内存会写入96M 的数据,超过之后就执行持久化,目前这里不太好估算
mysql插入10w的数据,8M 左右
analyze table device_stat; #分析表大小select concat(round(data_length/1024/1024,2),'MB') as data, TABLE_ROWS as tr from information_schema.tables where TABLE_NAME = 'device_stat';
同时查询 10w 条数据:Tdengine的查询效率是mysql的一倍
同时查询 20w 数据
Tdeinge查询500w的数据时间差不多17秒左右