コンテンツにスキップ

16. Pythonカスタムエミッタ#

16.1 カスタムエミッタとは#

SpeeDBee Hiveでは、収集したデータを出力または外部へ送信するモジュールのことを"エミッタ"と呼びます。 標準ではAWS等のクラウドサービス向けへデータ送信する"エミッタ"が用意されていますが、 ユーザーが独自にエミッタを開発、運用することも可能です。 エミッタの開発にはPythonを使用できます。
本ドキュメントではPythonによるカスタムエミッタの開発 方法を説明します。

16.1.1 対象読者と前提知識#

本書は、SpeeDBee Hiveを使用する上で、Python言語を用いて独自のエミッタを開発することを目的とするユーザーを対象としています。 本書を読む上での前提知識として、Python言語のチュートリアルの内容は大方把握できていること、 併せてクラス定義、メソッド定義、クラスの継承等について理解していることを想定しています。
入門レベルの情報については、下記の Python公式ドキュメントを参照してください。
Python公式ドキュメント: https://docs.python.org/ja/3/

16.1.2 Pythonカスタムエミッタの実行環境#

本書で説明するPythonカスタムエミッタは、SpeeDBee Hiveをインストールした環境により実行するPythonバージョンが異なります。

プラットフォーム Pythonバージョン
Windows 3.9.6
Linux:Ubuntu18 3.6.9
Linux:Ubuntu20 3.8.10
Linux:Ubuntu22(Arm64) 3.10.12
Linux:Raspberry Pi(buster) 3.7.3
Linux:Raspberry Pi(bullseye) 3.9.2
Linux:BlackBear 3.5.3

実際のバージョンについては実行環境でも確認可能です。 カスタムエミッタ登録の画面の最下部から確認してください。

16.1.3 制限事項#

現バージョンにおいて、Pythonカスタムエミッタには以下の制限事項があります。ご注意ください。

  • RaspberryPi環境において、オプションコレクタの OCRコレクタ(カメラ画像からテキストを抽出)とは同時に利用できない

16.2 エミッタの基本処理フロー#

カスタムエミッタをコーディングする上で、SpeeDBee Hiveの処理フローを理解する必要があります。 実装すべき処理が、どのタイミングで呼び出しされるか確認してください。 本節で、掲載している処理フローは説明のために簡略化しています。実際には、より複雑な処理を行いますがエミッタの処理フローを学ぶ上では、簡略化したフローが理解できれば問題ありません。

SpeeDBee Hive全体の処理フローを示します。エミッタの起動・停止するまでを含めた全体の流れとなります。

Hiveメイン処理フロー

SpeeDBee Hiveは、起動すると設定ファイルを読み込み、各種コンポーネントを初期化します。 設定情報を元にデータ収集のためのコレクタを起動します。 エミッタは、コレクタ(データ収集)が起動された後に起動されます。 次に、エミッタのデータ送信を含めた処理フローの詳細を示します。

Emitterメイン処理フロー

エミッタは、起動されると設定ファイルを読み込み、設定に従って各エミッタインスタンスを生成します。 各エミッタインスタンスは、専用のスレッドでデータ送信のデータ送信ループを開始します。 フロー図青色の処理が、ユーザが実装すべき関数です。

名称 説明
HiveEmitterコンストラクタ エミッタインスタンスの初期化を実装します。
HiveEmitter.connect データ送信前の接続処理など前準備を実装します。
HiveEmitter.upload 実際のデータ送信などデータ出力処理を実装します。
HiveEmitter.disconnect データ送信後の切断処理など後始末を実装します。
HiveEmitter.destroy エミッタインスタンスの解放処理を実装します。リソースの破棄などを行います。

上記のほかに、エミッタのデータ送信タイミングを「イベント」とした場合、イベントの検知およびイベント処理期間終了時に呼び出しされる関数があります。 イベントを交えた場合の処理フローを次に示します。フロー図紫色の処理に注目してください。

Emitterメイン処理フロー(イベント)

名称 説明
HiveEmitter.event_open イベント検知時に呼び出しされる関数です。イベントが検知されたタイミングで前準備があれば実装します。
HiveEmitter.event_close 検知されたイベントの処理期間が終了すると呼び出しされる関数です。イベント終了時に、後始末があれば実装します。

上記、処理フローの基本を理解した上で、カスタムエミッタを実装してください。 関数の詳細については後述します。

16.3 Pythonカスタムエミッタのサンプル#

カスタムエミッタの説明のため、単純なデータ出力or送信を行うシンプルなカスタムエミッタをサンプルとして提供しています。 SpeeDBee Hive Web画面の下記より、参照、もしくはダウンロードが可能です。

Pythonエミッタサンプル

16.3.1 Pythonカスタムエミッタのサンプル内容#

下記のPython カスタムエミッタのサンプルを提供しています。 それぞれ特に実用的なものではありませんので、あくまでサンプルとして参照してください。

  • csv_sample.py
    収集したデータをCSVファイルへ出力するサンプルです。 このサンプルで生成したCSVファイルは自動で削除されません。必要に応じて手動で削除する必要があります。
  • http_sample.py
    HTTPリクエスト&レスポンスを確認するサービスを使って、収集データを送信するサンプルです。 このサンプルの利用時には、データ送信間隔は、60秒以上に設定するなど配慮してください。
  • json_sample.py
    収集したデータをjson形式でファイル出力するサンプルです。 このサンプルでは、収集したデータをJSONファイルで出力します。 さらに、送信タイミングをイベントとした場合に、出力ファイルを分ける例も実装されています。 また、保存したファイルを保存期間がすぎた場合に削除する簡易実装も含んでいます。
  • sqlite_sample.py
    収集したデータをSQLite(オープンソースのデータベース)へ登録するサンプルです。 テーブル「collect_data」へ収集したデータを登録します。

参考

A simple HTTP Request & Response Service.
url : https://httpbin.org/
developer : - https://kennethreitz.org/

16.3.1.1 カスタムエミッタ設定#

すでに登録済みのPythonカスタムエミッタを利用する場合は下記の手順で設定してください。

  1. 出力先メニューの右側の「+」を選択し、出力先設定画面を表示します。

    カスタムエミッタ設定

  2. タイプで Custom(Python)を選択します。

  3. その他項目は、下表を参考に入力してください。

    設定項目 内容
    出力先名 任意の名前を入力
    タイムゾーン データを出力するディレクトリに使われる日時のタイムゾーンを指定
    リトライ回数 送信失敗した場合のリトライ回数を入力
    スクリプト名 Pythonカスタムエミッタのスクリプト名
    表示ボタンを押下すると、スクリプトの内容を確認できます。
    パラメーター Pythonカスタムエミッタに渡す任意のパラメーター
  4. 保存を選択します

設定した出力先にデータを保存するには、送信データメニューで送信データを設定する際に出力先を上記で設定した出力先名を選択してください。 送信データ設定方法については 「ユーザーガイド」の「エミッタの登録」 を参照してください。

16.3.1.2 カスタムエミッタ登録#

独自にPython カスタムエミッタを開発した場合、下記の手順で登録してください。

  1. 「システム」タブ→「外部連携設定」→「カスタム(Python)」より、Pythonカスタム管理画面を開きます。

    カスタムエミッタ登録1

  2. 「カスタムエミッタ」欄の「追加」をクリックし、開発したPythonカスタムエミッタを選択、「開く」をクリックします。

    カスタムエミッタ登録2

  3. 登録が完了するとカスタムエミッタの一覧に表示されます。

    カスタムエミッタ登録3

    この後は、「カスタムエミッタ設定」の手順によりカスタムエミッタを実行することができます。

    Note

    同一名称のエミッタがすでに登録されていた場合は上書きされます。 削除する場合は上記の画面から該当エミッタの「削除」をクリックしてください。 ※base_emitter.pyは、上書き及び削除ができません。

Python カスタムエミッタでは、初期状態でも Python の標準モジュールを使用可能ですが、 もし標準モジュールではなく、外部モジュールを利用したい場合は、この画面からインストールを行ってください。

  1. Pythonモジュールのテキスト欄に、使用するモジュール名を入力し「追加」をクリックします。

    カスタムエミッタ登録4

  2. ダイアログにて「OK」をクリックします。

    カスタムエミッタ登録5

  3. インストールが完了すると、モジュールが一覧表に追加されます。

    カスタムエミッタ登録6

    Note

    指定したモジュールに加え、その依存モジュールも自動的にインストールされます。

モジュールのインストールに長時間かかる場合や、 そのモジュールを実行するために必要なライブラリがインストールされていない場合など、 様々な要因でインストールに失敗する可能性があります。
もしこの画面からPythonモジュールのインストールに失敗する場合は、 SpeeDBee Hiveの動作するデバイスにログインし直接pipコマンドでインストールしてください。

$ sudo /var/speedbeehive/dynlibs/pyvenv/bin/pip3 install {モジュール名}
PS> C:\ProgramData\SALTYSTER\SpeeDBeeHive\dynlibs\pyvenv\Scripts\pip3.exe install {モジュール名}

16.4 カスタムエミッタのソースコード説明#

サンプルの「http_sample.py」を例に、カスタムエミッタの実装について説明します。 このカスタムエミッタは、コレクタの収集したデータをクラウドサービス向けにHTTPリクエストでデータ送信します。 ソースコードを以下に掲載します。

import json
import requests
from requests.auth import HTTPBasicAuth
from base_emitter import HiveEmitterBase

#
# このサンプルはhttps://httpbin.orgへ収集データを送信するカスタムエミッタのサンプルです。
# サンプルのためエラー処理などは簡略化しています。
#
# パラメーターは、ユーザIDとパスワードを指定します。※値は何でも可
#  parameter : {"user" : "xxx", "password" : "xxx" }
#
class HiveEmitter(HiveEmitterBase):

    BASE_URL = 'https://httpbin.org'

    def __init__(self, param, basic_meta=None, columns_meta=None) :
        super().__init__(param, basic_meta, columns_meta)
        self._load_param(param)
        self.access_token = None

    #def destroy(self):
    # このエミッタは解放処理不要のためオーバーライドしない
    #   pass

    def connect(self):
        # アクセストークンが取得済みなら再取得は行わないと仮定
        if self.access_token is not None:
            return True

        # BASIC認証に成功したらアクセストークン取得すると仮定(実際には固定文字をセットする)
        try:
            response = requests.get(self.BASE_URL + '/basic-auth/' + self.user + '/' + self.password,
                auth=HTTPBasicAuth(self.user, self.password) )
            self.logger.debug('status_code=' + str(response.status_code))

            if response.status_code != 200:
                return False

            self.access_token = "T7q7W92a%4DoMtCeLvLaQo6bTrgtp7"
            return True

        except Exception as e:
            self.logger.error('connect failed.')
            self.logger.error(str(e))
            return False

    #def disconnect(self):
    # このエミッタは切断処理不要のためオーバーライドしない
    #   pass

    def upload(self, records, event_info):
        try:
            # アクセストークンをセットするサンプル(実際は指定していなくてもエラーにはならない)
            if self.access_token is not None:
                headers = {'Authorization': 'Bearer {}'.format(self.access_token)}
            else:
                headers = None

            # JSON文字列で収集データを送信
            json_data = json.dumps({ "records" : records, "event_info" : event_info }, indent=2)
            response = requests.post(self.BASE_URL + '/post', data=json_data, headers=headers)
            self.logger.debug('status_code=' + str(response.status_code))
            self.logger.debug('response=' + response.text)

            # 200でない場合はアクセストークンを破棄
            # 次回通信時のconnectで再取得する
            if response.status_code != 200:
                self.access_token = None
                return False

            return True
        except Exception as e:
            self.access_token = None
            self.logger.error('upload failed.')
            self.logger.error(str(e))
            return False

    def try_connect(self):
        # このメソッドはエミッタ送信先のTest押下時に呼び出しされます
        # 注意点としてパラメータは使用できますが、その他、basic_infoなどは仮設定となります
        # また、self.loggerの関数が使用できません
        return self.connect()

    #def event_open(self, event_info):
    # このエミッタはイベント検知時に特別な処理不要なためオーバーライドしない
    #   return True

    #def event_close(self, event_info):
    # このエミッタはイベント期間終了時に特別な処理不要なためオーバーライドしない
    #   return True

    def _load_param(self, param):
        try:
            param_dict = json.loads(param)
            self.user = param_dict['user']
            self.password = param_dict['password']
        except Exception as e:
            self.logger.error('parameter failed.')
            self.logger.error('parameter example { "user" : "saltyster", "password" : "fooboo" }')
            raise e

16.4.1 専用モジュールのインポート#

Pythonカスタムエミッタを開発する際にはかならずHiveEmitterBaseクラスをインポートする必要があります。

from base_emitter import HiveEmitterBase

16.4.2 エミッタクラスの定義#

Pythonカスタムエミッタでは、HiveEmitterBaseクラスを継承したHiveEmitterという名称のクラスを、スクリプトファイル内で必ず定義する必要があります。
下記がその定義概要です。(各メソッドの詳細は後述)

class HiveEmitter(HiveEmitterBase):
    def __init__(self, param, basic_meta=None, columns_meta=None) :
        super().__init__(param, basic_meta, columns_meta)

    def destroy(self):
        pass

    def connect(self):
        return True

    def disconnect(self):
        return True

    def upload(self, records, event_info):
        return True

    def try_connect(self):
        return True

    def event_open(self, event_info):
        return True

    def event_close(self, event_info):
        return True

16.4.2.1 HiveEmitter.init … 初期化#

エミッタ開始時に最初に実行されるコンストラクタ(HiveEmitter.__init__)です。 コンストラクタ内では、必ずスーパークラスのコンストラクタを呼び出してください。

    super().__init__(param, basic_meta, columns_meta)

書式

HiveEmitter.__init__(self, param, basic_meta=None, columns_meta=None)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
__init__実行後、他のメソッドでも共有する必要のある情報はself内のプロパティとして追加してください。
(例)http_sampleのaccess_token
2 param パラメータ文字列
このPythonエミッタを実行する上で、設定画面にて登録したパラメータが文字列としてこの引数に渡されます。
この使用方法は特に決まりはありませんので、実装するコレクタの内容に合わせて適宜参照してください。
各サンプルでは、JSON文字列から辞書へ変換して扱っています。
3 basic_meta エミッタインスタンスの基本情報です。辞書型で格納されます。
 { "client_id" : "dummy", "emitter_name" : "test", "aggregation" : False }
- client_id 設定画面で設定したクライアントID。インスタンスの識別に利用できます。
- emitter_name 設定画面で設定したエミッタの名称。
- aggregation データの送信で集計処理を使用している場合はTrue、集計なしの場合はFalse
4 columns_meta 設定画面で指定した送信データの定義情報です。通常は使用する必要はありません。

戻り値 - なし

説明

エミッタ開始時に、自動的に呼び出されるコンストラクタです。 このメソッドが呼び出されることでエミッタインスタンスに指定されたパラメータ文字列を受け取ることができます。 本メソッドが例外を送出する場合、エミッタインスタンスは実行されません。パラメータの不備や環境不備 等で、以降の処理が実行できない場合は、下記のように例外を送出してください。

class HiveEmitter(HiveEmitterBase):
    def __init__(self, param, basic_meta=None, columns_meta=None) :
        super().__init__(param, basic_meta, columns_meta)
        :
        if self.check_problem():
            raise Exception(some error message)

16.4.2.2 HiveEmitter.connect … 接続・前処理#

エミッタインスタンスの送信タイミング時に、最初に呼び出しされる関数です。 例えば、ソケットのオープン、認証用APIのコール、ファイル出力系であればファイルのオープン処理などを 実装することを想定した関数です。

書式

HiveEmitter.connect(self)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
コンストラクタで初期化したパラメータ等を利用して前処理を行います。

戻り値

  • True: 接続・前処理に成功(送信準備完了)
  • False: 接続・前処理に失敗(データ送信を行わず処理中断)
    Falseを返した場合、リトライ回数分、再度connectが呼び出しされます。

16.4.2.3 HiveEmitter.upload … データ送信#

実際のデータ送信・出力を行うための関数です。 connect成功後に呼び出しされ、送信対象データが複数回分ある場合、connect後に複数回呼び出しされる可能性があります。 送信すべきデータはエミッタ本体より送出されるため、引数で渡ってくるデータを参照して、処理を実装します。

書式

HiveEmitter.upload(self, records, event_info)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
2 records 処理するデータセット(辞書)
3 event_info 発生したイベントの簡易情報

recordの辞書構造は次の形式となります。

  • 集約なし

    {
      "gateway_id": "サーバーの名称",
      "records": [
        {
          "timestamp": "タイムスタンプ文字列",
          "送信データ名": 
        },
        // .... 繰り返し
      ],
      "records_count": レコードの件数,
      "aggregate": false
    }
    

  • 集約あり

      "gateway_id": "サーバーの名称",
      "records": [
        {
          "timestamp": "タイムスタンプ文字列",
          "送信データ名": ,
          "送信データ名(統計名)":値,
          // .... 集約有効な設定分
        },
        // .... 繰り返し
      ],
      "records_count": レコードの件数,
      "aggregate": true
    }
    

event_infoの辞書構造は次の通りです。 一部の内部の状態を示す属性は省略しています。 event_infoはevent_openとevent_closeの引数でも参照できます。

{
  "schedule_id": イベントの発生ごとに付与されるID(数値),
  "event_id": 開始時のイベントID(数値),
  "event_id_end": 終了時のイベントID(数値),
  "before": イベント発生前の記録時間(秒),
  "after": イベント発生後の記録時間(秒),
  "max": イベントの最大記録時間(秒),
  "event_st": イベント開始時のタイムスタンプ(ナノ秒),
  "event_et": イベント終了時のタイムスタンプ(ナノ秒),
  "notice": {
    "column_array_size": 配列の要素数(配列以外では0),
    "column_name": "カラム名称",
    "column_data": "イベント発生時の値(文字列)"
  },
  "notice_end": {
    "column_array_size": 配列の要素数(配列以外では0),
    "column_name": "カラム名称",
    "column_data": "終了イベント発生時の値(文字列)"
  }
}

戻り値

  • True: データ送信・出力に成功
  • False: データ送信・出力に失敗
    データ送信に失敗した場合、同一データが再度uploadに引き渡されます

16.4.2.4 HiveEmitter.disconnect … 切断・後処理#

エミッタインスタンスのデータ送信が終了したタイミングで呼び出しされる関数です。 例えば、ソケットのクローズ、ファイル出力系であればファイルのクローズ処理などを 実装することを想定した関数です。

書式

HiveEmitter.disconnect(self)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
connectで使用したプロパティなどを参照して、処理を実装します。

戻り値

  • True: 切断・後処理に成功
  • False: 切断・後処理に失敗(エラーログ出力)

16.4.2.5 HiveEmitter.destroy … リソース解放#

エミッタインスタンスの停止後に呼び出しされる関数です。 インスタンス内で確保しているリソースなどの解放処理を実装することを想定した関数です。 エミッタ起動時中は、リソース解放を行わず処理終了時にのみ破棄したいリソースの解放などを行ってください。

書式

HiveEmitter.destroy(self)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
リソースの解放など必要なプロパティを参照して、解放処理を行います。

戻り値 - なし

16.4.2.6 HiveEmitter.event_open … イベント処理開始#

送信タイミングを「イベント」にしている場合に呼び出しされる関数です。 設定したイベントを検知した時に、呼び出しされます。 イベントの開始を検知することになるので、イベント発生時に特別な処理を行いたい場合に実装します。 例えば、イベント発生時の出力先ファイルを分けるなどが挙げられます。

書式

HiveEmitter.event_open(self, event_info)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
2 event_info 発生したイベントの簡易情報

戻り値

  • True: イベント開始処理に成功
  • False: イベント開始処理に失敗
{
  "event_id": 開始イベントID,
  "event_id_end": 終了イベントのID,
  "schedule_id": スケジュールID,
  "before": 記録するイベント発生前の対象時間(秒),
  "after": 記録するイベント発生後の対象期間(秒),
  "max": イベントを最大で記録する時間(秒),
  "event_st": 1673538675586751700,
  "event_et": 18446744073709551615,
  "notice": {
    "trigger_type": 4,
    "column_id": 11,
    "column_type": 10,
    "column_array_size": 0,
    "trigger_condition": "",
    "column_name": "$resource:cpu_usage",
    "column_data": "42.30"
  },
  "notice_end": {
    "trigger_type": 0,
    "column_id": 0,
    "column_type": 0,
    "column_array_size": 0,
    "trigger_condition": null,
    "column_name": null,
    "column_data": null
  }
}

16.4.2.7 HiveEmitter.event_close … イベント処理終了#

送信タイミングを「イベント」にしている場合に呼び出しされる関数です。 設定したイベントの終了イベント発生時に、呼び出しされます。 または、イベントの最大処理時間に達した場合も呼び出しされます。 例えば、イベントでの記録処理の終了処理などの実装を想定しています。

書式

HiveEmitter.event_close(self, event_info)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス
2 event_info 発生したイベントの簡易情報

戻り値

  • True: イベント終了処理に成功
  • False: イベント終了処理に失敗

16.4.2.8 HiveEmitter.try_connect … テスト接続#

エミッタ設定画面内のテスト接続押下時に呼び出しされる関数です。 オーバーライドしない場合は、常にTrueを返します。 この関数をオーバーライドして、何らかの接続処理を試行することで 画面からの簡易動作確認が可能となります。 注意点としてパラメータは使用できますが、その他、basic_infoなどは仮設定となります。 また、loggerの関数が使用できません。

書式

HiveEmitter.try_connect(self)

引数

No. 引数 説明
1 self Hiveエミッタのインスタンス

戻り値

  • True: テスト接続処理に成功
  • False: テスト接続処理に失敗