コンテンツにスキップ

5.1.2 Pythonカスタムコンポーネント作成

本項では、Pythonによるカスタムコンポーネントの作成方法について説明します。

5.1.2.1 前提知識#

カスタムコンポーネントを開発する上での前提知識として、Python言語のチュートリアルの内容は大方把握できていること、併せてクラス定義、メソッド定義、クラスの継承等について理解していることを想定しています。 入門レベルの情報については、下記の Python公式ドキュメントを参照してください。

Python公式ドキュメント: https://docs.python.org/ja/3/

5.1.2.2 サンプルカスタムコンポーネント#

Pythonによるカスタムコンポーネントの実装方法説明のため、シンプルな仕様のカスタムコンポーネントをサンプルとして提供しています。

サンプルカスタムコンポーネントのダウンロード

もしくは、SpeeDBee Synapseをインストール済みの場合、カスタムコンポーネントサンプルも下記の場所にインストールされています。

$ ls /usr/local/speedbeesynapse/share/sample_component_py/
sin_and_random.py  read_and_log.py

5.1.2.2.1 サンプルの内容#

カスタムコンポーネントのサンプルとして本章では下記のサンプルを説明します。

  • sin_and_random.py

    出力ポートに対して以下の2つのカラムを作成し、データを出力し続けるコンポーネントです。

    カラム名 データ型 内容
    sin DOUBLE 1HzのSIN波の値を秒間10件登録します。SIN波の周期はパラメータで変更可能です
    rand INT32 1秒に1回、[0, 100)の範囲内のいずれかの値をランダムに登録します
  • read_and_log.py

    入力ポートからデータを受け取り、それを文字列型カラムとログに出力します。

5.1.2.2.2 実行#

*.pyファイルをそのまま画面から登録することで、エラー等がなければコンポーネントベースが追加された状態になります。登録方法は「カスタムコンポーネントを使用する」を参照してください。

登録後は、通常のコンポーネントと同様にD&Dでコンポーネントインスタンスを生成できます。

カスタムコンポーネント選択

sin_and_randomおよびread_and_logを繋いで実行することで、それぞれの出力したカラムを表示できます。 下記はsin_and_randomの出力ポートをモニタリングしています。

カスタムコンポーネント同士の接続

5.1.2.3 カスタムコンポーネント実装詳細#

以降では、サンプルのsin_and_random、および、read_and_logのソースコードを例に、カスタムコンポーネントの実装内容を説明していきます。

5.1.2.3.1 カスタムコンポーネントクラス定義#

Pythonによるカスタムコンポーネントは、下記の通りHiveComponentBaseクラスを継承した新しいクラスを定義することでそのコンポーネントベースの名前や識別情報、実行時の処理を定義することができます。

from speedbeesynapse.component.base import HiveComponentBase, HiveComponentInfo, DataType
  :

@HiveComponentInfo(uuid='00000000-0000-0000-0000-000000000000', name='コンポーネント名', inports=0, outports=1)
class HiveComponent(HiveComponentBase):
    def main(self, param):
        :

クラス名は必ずHiveComponentとしてください。 また、コンポーネントの情報を@HiveComponentInfoデコレータでラップする必要があります。 ここに定義した情報により、画面でもコンポーネントベースの情報を扱うことができるようになります。 下記の説明を参考に設定してください。

メンバ名 定義情報 説明
uuid UUID コンポーネントを識別するためのUUIDです。ランダムなIDを生成してここに設定してください。
name コンポーネント名 コンポーネントの名前を設定します。画面ではこの名前が表示されますのでなるべく他のコンポーネントと被らず、分かりやすい名前にしてください。
tag タグ コンポーネントへタグを付与します。
"collector","emitter","serializer","action","logic"のいずれかの指定によりコンポーネントベースの配置先を指定できます。省略した場合は、Customへ配置されます。
outports 出力ポート数 このコンポーネントが持つ出力ポートの数を設定します。他のコンポーネントの入力ポートにデータを渡す場合はこれを1としてください。
inports 入力ポート数 このコンポーネントが持つ入力ポートの数を設定します。他のコンポーネントの出力ポートからデータを受け取る場合は、これを1としてください。

UUIDについては必ずランダムなIDを生成してください。 他のコンポーネントと同じUUIDにしてしまうと、どちらのコンポーネントも使えなくなることがあります。

5.1.2.3.2 HiveComponentクラスのライフサイクル#

ここで定義したHiveComponentクラスが、1つのコンポーネントベースとなります。 Pythonスクリプトファイル内で、このクラスをインスタンス化するような処理は必要ありません。 かわりに、画面上でコンポーネントベースをD&Dしてコンポーネントインスタンスを生成すると、自動でここで定義したHiveComponentクラスのインスタンスが生成されます。 また、画面上でコンポーネントインスタンスを削除すると、このクラスのインスタンスが破棄されます。

そのため、HiveComponentクラス内に__init__()メソッドや__del__()メソッドを定義しておけば、画面上でのコンポーネントインスタンスの生成・削除に合わせてそのメソッドが実行されることになります。

@HiveComponentInfo(uuid='00000000-0000-0000-0000-000000000000', name='サンプルコンポ', inports=0, outports=0)
class HiveComponent(HiveComponentBase):
    def __init__(self):
        # called at creating component instance
        pass
    def __del__(self):
        # called at deleting component instance
        pass
    def premain(self, param):
        # called before main()
        pass
    def postmain(self, param):
        # called after main() finished
        pass
    def main(self, param):
        # called at starting component instance
        while self.is_runnable():
          pass
    def notify_stop(self):
        # called before the component received stop request from system
        pass

他にも、画面からのコンポーネントの操作により、コールされるメソッドがあります。 コールされるタイミングとそのメソッドについては下図、下表を参照してください。

コンポーネントライフサイクル

コールバック関数 説明
__init__ コンポーネントのインスタンスが生成されたときにコールされます
__del__ コンポーネントのインスタンスが破棄されるときにコールされます
premain コンポーネントを開始するときにmainの前にコールされます
main コンポーネントのメインの処理を実装する関数です
コンポーネントの終了要求を受けるまで、この関数は終了せずにループ処理し続ける必要があります
postmain コンポーネントの終了時、mainの終了直後にコールされます
notify_stop mainの実行中、コンポーネントの停止要求を受けたときに実行されます
このメソッドはmain()の実行中に呼ばれるため、別スレッドで実行されることに注意してください。

mainは、コンポーネントの処理の本体になりますので、必ず定義する必要があります。 以降ではサンプルプログラムのmain関数の実装方法を説明します。

5.1.2.3.3 出力ポートのカラム作成/データ登録(sin_and_random)#

下記はカスタムコンポーネントのサンプルsin_and_randomのメインの処理です。 これをもとにデータを出力する流れを説明します。

def main(self, param):
     :
    count = 0

    # Create columns.
    self.sin = self.out_port1.Column('sin', DataType.DOUBLE)
    self.rand = self.out_port1.Column('rand', DataType.INT32)

    # Repeat at 100msec intervals.
    for [ts, skip] in self.interval_iteration(100000000):

        # Compute sin wave value and insert it into 'sin' column.
        val = math.sin(2*math.pi*sin_hz*count/10)
        self.sin.insert(val, ts)

        # Insert random value into 'rand' column only once in 10.
        if count % 10 == 0:
            self.rand.insert(random.randrange(0, 100), ts)

        count += 1
  1. カラムの作成

    初めに、カラムを作成します。

    self.sin = self.out_port1.Column('sin', DataType.DOUBLE)
    self.rand = self.out_port1.Column('rand', DataType.INT32)
    

    このコンポーネントでは倍精度浮動小数点数のカラムsinと、32bit整数のカラムrandを利用しますので、 self.out_port1にあるColumn()メソッドにてカラム名、データ型等を指定して、カラムオブジェクトを生成しています。 戻り値は後にカラムにデータを登録する際に使用しますので、変数に格納しておきます。

    コンポーネントが出力するすべてのデータは、このように生成したカラムに対して登録していく必要があります。

  2. 定期的な処理の繰り返し

    定期的にデータを登録するため、for文で繰り返し処理を実装しています。

    for [ts, skip] in self.interval_iteration(100000000):
       :
    

    ループの判定条件で呼び出しているself.interval_iteration()は、引数に指定した時間間隔で繰り返し値を返すジェネレータになっています。 それにより、このfor文の内部は一定間隔で実行されることになります。

    引数に指定するのはナノ秒単位になりますので、この例の場合は0.1秒(100000000ナノ秒)に1回、ループ処理が実行されることになります。

    画面からコンポーネントを停止しない限り、このfor文のループは終わることはありません。

  3. カラムへのデータ登録

    繰り返し処理の中で、カラムに対してデータを登録します。

    val = math.sin(2*math.pi*sin_hz*count/10)
    self.sin.insert(val, ts)
    
    if count % 10 == 0:
        self.rand.insert(random.randrange(0, 100), ts)
    

    データの登録は、事前に作成したカラムオブジェクトのinsert()メソッドの呼び出しで行なえます。 第一引数には登録する値を、第二引数にはその値を登録する時刻を指定してください。第二引数は省略すれば現在時刻として扱われます。

5.1.2.3.4 入力ポートからのデータ読み込み(read_and_log#

下記はカスタムコンポーネントのサンプルread_and_logのメインの処理です。 これをもとに別のコンポーネントが出力したデータを受け取る処理の流れを説明します。

def main(self, param):
    # Create a column for logging received data.
    self.logclm = self.out_port1.Column('log', DataType.STRING)

    # Logging column information received from 'in_port1'.
    columns = self.in_port1.get_columns()
    for column in columns:
        self.log.info(column)

    # Start receving column values from 'in_port1'.
    with self.in_port1.ContinuousReader(start=self.get_timestamp()) as reader:
        # Loop until the component received 'stop' request.
        while self.is_runnable():
            # Get a group of records of column data (called as window_data).
            window_data = reader.read()
            if not window_data:
                # window_data may be None.
                self.log.info("no data yet")
                continue

            # Process all records in a window_data one by one.
            for record in window_data.records:
                # Convert timestamp as a string.
                ts = datetime.datetime.fromtimestamp(record.timestamp / 1000000000)

                # Process all column's value in a record one by one.
                for columnvalue in record.data:
                    if columnvalue.value:
                        # Make string from columnvalue, logging it and insert it into the column
                        logmsg = f"{columnvalue.column}[{str(ts)}]: {columnvalue.value}"
                        self.log.info(logmsg)
                        self.logclm.insert(logmsg)
                    else:
                        pass # ignore None

先頭で作成しているlogカラムは、文字列として情報を出力するためのものです。 別コンポーネントからデータを受けるだけなら必須ではないため割愛します。

  1. 入力ポートのカラム情報の取得

    in_port1にあるget_columns()メソッドにより、そのコンポーネントの入力ポートに接続されたコンポーネントが出力しているデータのカラム情報を取得できます。

    columns = self.in_port1.get_columns()
    for column in columns:
        self.log.info(column)
    

    上記の例では取得したカラムをイテレーションループでログに出力しています。

    コンポーネントへの接続を変えたり、接続されたコンポーネントを停止したりすると、このget_columns()で取得できるカラム情報が変化します。 常に最新の情報が必要な場合には定期的に取得して内容をチェックするようにしてください。

  2. ContinuousReaderの開始とループ

    入力ポートから継続的にデータを取得し続ける場合、ContinuousReaderオブジェクトを利用します。 下記のようにwith句で使用し、さらに繰り返しread()をコールすることで入力ポートに届いたデータを取得することができます。

    with self.in_port1.ContinuousReader(start=self.get_timestamp()) as reader:
        while self.is_runnable():
            window_data = reader.read()
            if not window_data:
                self.log.info("no data yet")
                continue
            :
    

    read()の戻り値はNoneになることもあるのでその場合は読み飛ばしてください

  3. 取得したデータのレコードを参照

    ContinuousReaderread()メソッドは、HiveWindowDataオブジェクトを返します。 このオブジェクトは、入力ポートに入ってきたデータをある時間幅分まとめて保持したもので、下図のように同一時刻のデータを収めた複数のレコードからなり、1つのレコードに複数のカラムのデータが収められています。

    そのため、下記のように二重のforループ文によりすべてのデータにアクセスすることができます。

    for record in window_data.records:
        ts = datetime.datetime.fromtimestamp(record.timestamp / 1000000000)
    
        for columnvalue in record.data:
            if columnvalue.value:
                logmsg = f"{columnvalue.column}[{str(ts)}]: {columnvalue.value}"
                self.log.info(logmsg)
                self.logclm.insert(logmsg)
            else:
                pass # ignore None
    

    各データの詳細はHiveRecordDataオブジェクトを参照してください。

5.1.2.3.5 ログ出力#

カスタムコンポーネントでは、下記のようにself.logのメソッドを利用することで任意のタイミングでログを出力することができます。 詳細はHiveLogを参照してください。

self.log.info('create columns')
self.log.info(f'insert value={val} into {column_name}')
self.log.error('invalid data received')

出力したログは、画面からダウンロードすることができます。「各種ログ」を参照してください。

5.1.2.3.6 コンポーネントパラメータ#

画面上からコンポーネントインスタンスを生成すると、その設定項目としてコンポーネントの実行時パラメータを設定することができます。 インスタンスごとにパラメータ設定を変えることで、同じコンポーネントベースの実装でも、コンポーネントの動作を変えることができます。

カラムコンポーネント設定画面

画面で指定する際、パラメータのタイプはSTRING、もしくは、JSONを選択できます。実装時に扱いやすいタイプを選択してください。

パラメータタイプ 説明
STRING 任意の文字列を設定できます。
コンポーネントの各メソッドの第二引数ではstr型として参照できます。
JSON JSON文字列を設定できます。
コンポーネントの各メソッドの第二引数ではdict型として参照できます。

コンポーネントインスタンスに設定したパラメータは、定義したHiveComponentクラスのpremain(),main(),postmain()メソッドの第二引数として取得することができます。

@HiveComponentInfo(uuid='00000000-0000-0000-0000-000000000000', name='サンプルコンポ', inports=0, outports=0)
class HiveComponent(HiveComponentBase):
    def premain(self, param):
        // use 'param' here.
        pass
    def postmain(self, param):
        // use 'param' here.
        pass
    def main(self, param):
        // use 'param' here.
        while ...

これらの扱いはカスタムコンポーネントの実装者次第です。 例としてsin_and_randomでは、パラメータに数値文字列をsinカラムの波形の周波数として利用していますので参考にしてください。

5.1.2.4 カスタムコンポーネント用API#

本項で紹介した関数以外にも、多数のAPI関数が用意されています。 詳細は付録の「カスタムコンポーネントPython-APIリファレンス」を参照してください。

5.1.2.5 カスタムコンポーネントを使用する#

ここでは、作成したカスタムコンポーネントをSpeeDBee Synapseで使用する方法について説明します。

5.1.2.5.1 登録#

次の手順で、作成したカスタムコンポーネントをSpeeDBee Synapseに登録することができます。

  1. 設定メニューアイコンを押下し、「カスタム(Python)」を選択します。

  2. カスタムコンポーネントの「追加」を押下し、カスタムコンポーネントのファイルを登録します。

  3. 「閉じる」を押下します。

  4. 確認ダイアログで「はい」を選択し、変更を反映するために、システムを再起動させます。

  5. 左メニューに、登録したカスタムコンポーネントが反映されます。

    ファイルの保存先

    登録したカスタムコンポーネントのファイルは、次のディレクトリに保存されます。

    /var/speedbeesynapse/components_python_python1/speedbeesynapse/component/custom

    保存先のディレクトリは、画面項目「保存先」に表示されます。

    Pythonバージョン

    カスタムコンポーネントを実行するPythonのバージョンは、画面項目「Python基本情報」の「Pythonバージョン」に表示されます。

    Pythonファイル名

    カスタムコンポーネントはSpeeDBee Synapseではモジュールとして扱っており、ファイル名をモジュール名としています。 したがって、ファイル名に"-"を使用した場合正しく扱えません。
    Pythonの命名規則に従いファイル名は短くすべて小文字で区切りは"_"を使用してください。

5.1.2.5.2 設定画面#

カスタムコンポーネントの設定画面の項目は次の通りです。

項目 説明
名称 コンポーネントの名前を入力
※他のコンポーネント名と重複する事はできません。
自動起動無効 コンポーネントの自動起動を無効にする場合ON
パラメータータイプ パラメータの種類を次の中から選択
・STRING:文字列
・JSON:JSON文字列
パラメーター コンポーネントパラメータを入力
スクリプトで使用するファイルをアップロードする(証明書など) カスタムコンポーネントの処理で使用する、証明書などのファイルをアップロードする場合ON
ファイル追加 押下すると、スクリプトで使用するファイルをアップロードすることができます。
※新しく追加されたファイルは、コンポーネントの設定を保存した時にアップロードされます。
ファイルパス アップロードしたファイルの保存先
※パラメータでファイルパスを使用する場合、変数を用いることができます。詳細は、表題の「ファイルパス」の右のアイコンをクリックすると確認することができます。

設定画面のパラメータ入力部分に、ユーザーが定義した画面項目を表示するように変更することができます。 詳細は「カスタムUI作成」をご覧ください。

5.1.2.5.3 削除#

次の手順で、登録したカスタムコンポーネントをSpeeDBee Synapseから削除することができます。

  1. 設定メニューアイコンを押下し、「カスタム(Python)」を選択します。

  2. カスタムコンポーネントの「削除」を押下します。

  3. 確認ダイアログで「はい」を選択します。

  4. カスタムコンポーネントが削除されます。

5.1.2.5.4 表示#

次の手順で、登録したカスタムコンポーネントの内容を表示することができます。

  1. 設定メニューアイコンを押下し、「カスタム(Python)」を選択します。

  2. カスタムコンポーネントの「表示」を押下します。

  3. カスタムコンポーネントのソースコードが画面に表示されます。

5.1.2.5.5 ダウンロード#

次の手順で、登録したカスタムコンポーネントをダウンロードすることができます。

  1. 設定メニューアイコンを押下し、「カスタム(Python)」を選択します。

  2. カスタムコンポーネントの「取得」を押下します。

  3. カスタムコンポーネントのファイルがダウンロードされます。

5.1.2.5.6 外部ライブラリの追加#

次の手順で、カスタムコンポーネントが使用する外部ライブラリを、SpeeDBee Synapseに追加することができます。

  1. 設定メニューアイコンを押下し、「カスタム(Python)」を選択します。

  2. Pythonモジュールの入力欄に、登録するライブラリの名前を入力します。

  3. Pythonモジュールの「追加」を押下します。

  4. 確認ダイアログで「はい」を選択します。

  5. ライブラリが登録されます。

    登録した外部ライブラリは、次のディレクトリに保存されます。

    /var/speedbeesynapse/dynlibs/pyvenv

    保存先のディレクトリは、画面項目「保存先」に表示されます。

    .whlファイルから追加する

    Pythonモジュールの「whl」を押下し、.whlファイルを選択します。

    pipバージョン

    外部ライブラリの管理で使用するpipのバージョンは、画面項目「Python基本情報」の「pipバージョン」に表示されます。

5.1.2.5.7 外部ライブラリの削除#

次の手順で、追加した外部ライブラリを、SpeeDBee Synapseから削除することができます。

  1. 設定メニューアイコンを押下し、「カスタム(Python)」を選択します。

  2. 削除するライブラリをクリックします。

  3. Pythonモジュールの「削除」を押下します。

  4. 確認ダイアログで「はい」を選択します。

  5. ライブラリが削除されます。