登录
首页 >  数据库 >  MySQL

EMQ X 规则引擎系列(二)存储消息到 MySQL 数据库

来源:SegmentFault

时间:2023-02-24 21:45:26 190浏览 收藏

编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《EMQ X 规则引擎系列(二)存储消息到 MySQL 数据库》,文章讲解的知识点主要包括MySQL、mqtt、物联网、iot、发布订阅模式,如果你对数据库方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。

场景介绍

该场景需要将 EMQ X 指定主题下且满足条件的消息存储到 MySQL 数据库。为了便于后续分析检索,消息内容需要进行拆分存储。

该场景下设备端上报信息如下:

  • 上报主题:cmd/state/:id,主题中 id 代表车辆客户端识别码
  • 消息体:

    {
      "id": "NXP-058659730253-963945118132721-22", // 客户端识别码
      "speed": 32.12, // 车辆速度
      "direction": 198.33212, // 行驶方向
      "tachometer": 3211, // 发动机转速,数值大于 8000 时才需存储
      "dynamical": 8.93, // 瞬时油耗
      "location": { // GPS 经纬度数据
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202 // 上报时间
    }

当上报数据发动机转速数值大于

CREATE DATABASE `emqx_rule_engine_output` CHARACTER SET utf8mb4;

创建数据表

根据场景需求,创建数据表

CREATE TABLE `use_statistics` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `client_id` varchar(100) DEFAULT NULL COMMENT '客户端识别码',
  `speed` float unsigned DEFAULT '0.00' COMMENT '当前车速',
  `tachometer` int(11) unsigned DEFAULT '0' COMMENT '发动机转速',
  `ts` int(11) unsigned DEFAULT '0' COMMENT '上报时间戳',
  `msg_id` varchar(50) DEFAULT NULL COMMENT 'MQTT 消息 ID',
  PRIMARY KEY (`id`),
  KEY `client_id_index` (`client_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

创建成功后通过 MySQL 命令确认数据表是否存在:

Database changed
mysql> desc use_statistics;
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(11)          | NO   | PRI | NULL    | auto_increment |
| client_id  | varchar(100)     | YES  | MUL | NULL    |                |
| speed      | float unsigned   | YES  |     | 0       |                |
| tachometer | int(11) unsigned | YES  |     | 0       |                |
| ts         | int(11) unsigned | YES  |     | 0       |                |
| msg_id     | varchar(50)      | YES  |     | NULL    |                |
+------------+------------------+------+-----+---------+----------------+
6 rows in set (0.01 sec)

配置说明

创建资源

打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,键入 MySQL 服务器信息进行资源创建。

图片描述

EMQ X 集群中节点所在网络环境可能互不相同,资源创建成功后点击列表中 状态按钮,查看各个节点资源连接状况,如果节点上资源不可用,请检查配置是否正确、网络连通性,并点击 重连 按钮手动重连。

图片描述

创建规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则创建。这里选择触发事件 消息发布,在消息发布时触发该规则进行数据处理。

选定触发事件后,我们可在界面上看到可选字段及示例 SQL:

图片描述

筛选所需字段

规则引擎使用 SQL 语句处理规则条件,该业务中我们需要将

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts, id
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

确立筛选条件

使用 SQL 语句 WHERE 字句进行条件筛选,该业务中我们需要定义两个条件:

  • 仅处理
    SELECT
      payload.id as client_id, payload.speed as speed, 
      payload.tachometer as tachometer,
      payload.ts as ts,
      id
    FROM
      "message.publish"
    WHERE
      topic =~ 'cmd/state/+'
      AND payload.tachometer > 8000

    使用 SQL 测试功能进行输出测试

    借助 SQL 测试功能,我们可以实时查看当前 SQL 处理后的数据输出,该功能需要我们指定 payload 等模拟原始数据。

    payload 数据如下,注意更改

    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 9001,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }

    点击 SQL 测试 切换按钮,更改

    {
      "client_id": "NXP-058659730253-963945118132721-22",
      "id": "589A429E9572FB44B0000057C0001",
      "speed": 32.12,
      "tachometer": 9001,
      "ts": 1563268202
    }

    测试输出与预期相符,我们可以进行后续步骤。

    添加响应动作,存储消息到 MySQL

    SQL 条件输入输出无误后,我们继续添加相应动作,配置写入 SQL 语句,将筛选结果存储到 MySQL。

    点击响应动作中的 添加 按钮,选择 保存数据到 MySQL 动作,选取刚刚选定的资源,我们使用

    INSERT INTO 
        `use_statistics` (`client_id`, `speed`, `tachometer`, `ts`, `msg_id`)
    VALUES 
        (${client_id}, ${speed}, ${tachometer}, ${ts}, ${id});

    图片描述

    测试

    预期结果

    我们成功创建了一条规则,包含一个处理动作,动作期望效果如下:

    1. 设备向
      {
        "id": "NXP-058659730253-963945118132721-22",
        "speed": 32.12,
        "direction": 198.33212,
        "tachometer": 9002,
        "dynamical": 8.93,
        "location": {
          "lng": 116.296011,
          "lat": 40.005091
        },
        "ts": 1563268202
      }

图片描述

点击 发送 按钮,发送成功后查看规则统计 已命中 数据统计值为 1 表明规则已成功命中,MySQL 命令行中查看数据表记录得到数据如下:

至此,我们通过规则引擎实现了使用规则引擎存储消息到 MySQL 数据库的业务开发。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/e...

理论要掌握,实操不能落!以上关于《EMQ X 规则引擎系列(二)存储消息到 MySQL 数据库》的详细介绍,大家都掌握了吧!如果想要继续提升自己的能力,那么就来关注golang学习网公众号吧!

声明:本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>