コンテンツにスキップ

3. MQTTコレクタ#

MQTTコレクタの参照/登録/更新を行います。

3.1 MQTTの登録#

MQTTの登録を行います。

  1. MQTTの追加ボタンをクリックします。
    MQTTコレクタ設定画面
  2. 下表の項目を入力し、保存をクリックします。

    入力項目 説明
    設定名 コレクタ名
    暗号化 暗号化通信の有効化
    匿名アクセス 匿名によるアクセスの有効化
    ホスト MQTTブローカーのIPアドレス
    ポート MQTTブローカーのポート番号
    MQTT QoS MQTTのQuality of Service
    ユーザー名 認証ユーザー名(匿名アクセス無効時に入力可能)
    パスワード 認証パスワード(匿名アクセス向渕に入力可能)
    トピック 収集対象のトピック
    キープアライブ MQTTブローカーとのキープアライブ設定
    リトライ回数 MQTTブローカー接続失敗時の再試行回数
    タイムアウト MQTTブローカー接続時のタイムアウト
    無効にする コレクタの稼働を無効にする(設定のみ行う場合に利用)
    CA証明書 暗号化通信に使用するCA証明書
  3. 暗号化通信を有効にして、CA証明書をアップロードする場合、ファイル送信のファイル選択後に保存をクリックします。

3.2 MQTTの参照・更新・削除#

MQTTの参照・更新・削除を行います。

  1. サブメニューから参照するMQTTをクリックし参照します。
    MQTTコレクタ設定画面
  2. 変更項目を入力後、「保存」をクリックし、設定を更新します。 データを削除したい場合は、「削除」をクリックし、設定を削除します。

Note

MQTTを削除すると、データで設定した収集項目がなくなります。 エミッタにて、そのデータ項目を利用している場合、自動削除されます。

3.3 データの編集#

MQTTで扱うデータの収集設定を行います。

3.3.1 登録#

  1. サブメニューから参照するMQTTをクリックして、データ一覧の「編集」をクリックします。
    MQTTコレクタ設定画面
  2. 下記の項目を入力し、「新規」をクリックします。

    入力項目 説明
    データ名 取得するデータの名称
    データ型 取得するデータの型
    無効にする 情報収集を無効にする(設定のみ行う場合に利用)
    永続化する ストレージ上に保存する
    リアルタイム分析を使用する リアルタイム分析を利用する。

    各項目の詳細は ユーザガイド > 補足 をご参照ください。

    MQTTコレクタ設定画面

  3. データ一覧に入力したデータが追加されます。

    MQTTコレクタ設定画面

    Note

    入力項目を変更して、「新規」をクリックする事で連続登録できます。

3.3.2 更新#

  1. 変更するデータを選択します。
    MQTTコレクタ設定画面
  2. 変更する項目を入力し、更新をクリックします。(入力項目は新規データと同様です)

3.3.3 削除#

  1. データ一覧の右にあるチェックボックスをONにし、「削除」をクリックします。
    MQTTコレクタ設定画面

3.4 MQTTテスト接続#

設定したMQTTの接続テストを行います。

  1. 対象となるMQTTを選択して、「Test」をクリックします。
    MQTTコレクタ設定画面
  2. テスト結果が表示されます。
    MQTTコレクタテスト接続結果画面

3.5 データフォーマット#

MQTTコレクタへ外部からデータを送信する際には以下のフォーマットのJSONデータを使用します。

MQTTコレクタが受け付けるJSONデータフォーマット
{
    "data": [
        {
            "timestamp": "1252581133014",
            "name": "num_data",
            "value": 12
        },
        {
            "timestamp": "1252581133019",
            "name": "note",
            "value": string data…”
        },
        : //  登録するデータ数分の繰り返し
    ]
}
属性名 説明
data 配列 登録データを格納する配列
timestamp 文字列 通算ナノ秒
name 文字列 データ一覧で登録したデータ名
value 文字列/数字 データ型に合わせて、文字列または数字で値を指定

3.6 サンプルプログラム#

MQTTコレクタへデータ登録するサンプルプログラムを以下に示します。 サンプルプログラムはPython 3.8.5を使用しています。 paho-mqttのライブラリをpip3でインストールしてください。

MQTTコレクタへ送信するサンプルプログラム
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
simple publish
"""
import json
import os
import random
import sys
import paho.mqtt.client as mqtt
import time
from datetime import datetime
from multiprocessing import Process

# MQTTブローカーの接続設定
HOST = 'localhost'
PORT = 1883
KEEPALIVE = 60  # in seconds
# 必要に応じて認証情報や証明書パスを設定してください。
USER = ''
PASS = ''
CA_FILE = ''
INTERVAL_SEC = 5


# connect callback
def on_connect(client, userdata, flags, response_code):
    print(f"response_code : {response_code} ")
    if response_code == 0:
        print("connected.")

# publish callback
def on_publish(client, userdata, mid):
    print(f"published message : {mid}")

def _get_data(pid):
    # 設定したデータに合わせてJSONを生成する必要があります
    ts = int(time.time() * 1000)
    value = int(random.uniform(1, 100))
    dump = json.dumps({ 'data' : [
        { 'timestamp' : ts, 'name' : 'dt1', 'value' : value },
        { 'timestamp' : ts, 'name' : 'dt2', 'value' : value * 2 }
    ]})
    print(dump)
    return dump

def spawn_client(topic="default"):
    pid = os.getpid()
    print(f"create client (pid : {pid})")
    client = mqtt.Client()

    # 認証
    if len(USER) > 0 and len(PASS) > 0:
        client.username_pw_set(USER, password=PASS)

    # 暗号化通信時
    if len(CA_FILE) > 0:
        client.tls_set(CA_FILE)
        client.tls_insecure_set(True)

    client.spawned_from_pid = pid
    client.on_connect = on_connect
    client.on_publish = on_publish

    client.connect(HOST, PORT, KEEPALIVE)
    client.loop_start()

    try:
        while True:
            client.publish(topic, payload=_get_data(pid), qos=0, retain=False)
            time.sleep(INTERVAL_SEC)
    except KeyboardInterrupt:
        pass

    print(f"stop client (pid : {pid})")
    client.loop_stop()

if __name__ == "__main__":
    # 引数にてトピックを指定します
    if len(sys.argv) > 1:
        topic = sys.argv[1]
        p = Process(target=spawn_client, args=(topic, ))
        p.start()
        p.join()
    else:
        print("usage: ./pub_test.py <topic>")