カスタムコンポーネントPython-APIリファレンス
カスタムコンポーネントをPython言語で開発する際のAPI仕様です。 カスタムコンポーネントの開発手順については下記を参照してください。
speedbeesynapse.component.base#
Hiveコンポーネントベースモジュール
このモジュールはSpeeDBee SynapseのコンポーネントをPythonで実装する場合にインポートするモジュールです。
HiveComponentBase Objects#
class HiveComponentBase()
コンポーネント用基底クラス
すべてのコンポーネントはこのクラスを継承して定義する必要があります。
Attributes:
in_portXHiveInPort - 入力ポートX番を表すインスタンス変数。実際にはin_port1, in_port2のように、Xには数字が入ります。out_portXHiveOutPort - 出力ポートX番を表すインスタンス変数。実際にはout_port1, out_port2のように、Xには数字が入ります。
premain#
def premain(param: str | dict) -> typing.Optional[ErrorInfoBase]
コンポーネントメイン処理直前に呼ばれる関数
コンポーネントの開始時、メイン処理の直前に呼ばれます。 ここでパラメータを読み取ってカラムの作成や独自の初期化処理等を行うことが可能です。
ただし、この関数における処理は全コンポーネント内で排他制御が掛けられますので長時間処理に時間がかかると、その間、他のコンポーネントのpremainが動作できないことに留意してください。その代わり、ここでカラムを作成する場合はメイン処理で作成する場合に比べて高速に動作します。
正常に初期化処理等が終了した場合は、何も返さず本メソッドを終了してください。その後mainメソッドがコールされます。
何らかの理由により、コンポーネントの実行を継続できない場合は、ErrorInfoBaseを継承したクラスのインスタンスを返却してください。それにより画面上にもエラーの内容が表示され、コンポーネントは停止します。(main,postmainメソッドは呼ばれません)
このメソッドの定義は必須ではありません。
Arguments:
paramstr | dict - コンポーネントへのパラメータ
Returns:
None- 正常に終了したErrorInfoBase- なんらかの理由でコンポーネントを開始できない場合、ErrorInfoBaseを継承したエラーオブジェクトを返す
main#
def main(param: str | dict)
コンポーネントメイン処理関数
コンポーネントのメイン処理を実装するためのメソッドです。 HiveComponentBaseを継承したクラスではこのメソッドを必ずオーバーライドして、実装してください。
この関数の終了は、コンポーネントの処理が完了したことを意味します。正常に終了する際は何も返す必要はありません。 エラーにより終了する場合は、ErrorInfoBaseを継承したクラスのインスタンスを返してください。返されたインスタンスの情報は画面上に停止理由として表示されます。
このメソッドの外に例外が送出された場合、不明なエラーとしてコンポーネントは停止します。その挙動が望ましくない場合は、例外をキャッチして処理を継続するか、ErrorInfoBaseを継承したクラスのインスタンスを返して本メソッドを終了してください。
Arguments:
paramstr | dict - コンポーネントへのパラメータ
Returns:
None- 正常に終了したErrorInfoBase- なんらかの理由でコンポーネントを開始できない場合、ErrorInfoBaseを継承したエラーオブジェクトを返す
postmain#
def postmain(param: str | dict)
コンポーネントメイン処理直後に呼ばれる関数
コンポーネントの停止時など、メイン処理関数が終了した直後に呼ばれます。
メイン処理がエラー等で終了した場合も必ず呼ばれるため、使用したリソースの解放等を行うことを想定しています。
このメソッドの定義は必須ではありません。
Arguments:
paramstr | dict - コンポーネントへのパラメータ
is_runnable#
def is_runnable()
コンポーネントの実行継続可否判定
このコンポーネントのメイン処理を継続してよいかを判定します。 コンポーネントは定期的(少なくとも5秒以内)にこの関数をコールし、メインの処理を継続してよいか確認する必要があります。 この関数がTrueを返すときはそのままメイン処理を継続することができますが、 Falseを返すときは、速やかにメイン処理を終了する必要があります。
Returns:
- Trueなら継続可能、Flaseならコンポーネントの停止要求が出ている
get_timestamp#
def get_timestamp() -> int
現在時刻の取得
ナノ秒単位のUNIXタイムで表現された現在時刻を取得します。
Returns:
- 現在時刻のタイムスタンプ。
notify_stop#
def notify_stop()
コンポーネントの停止要求通知
このコンポーネントがシステムから停止要求を受けた場合にコールされます。
HiveComponentBaseを継承したクラスでは必要に応じてこのメソッドをオーバーライドしてください。
メソッドがコールされた場合には、速やかにコンポーネントのメイン処理を停止させる必要があります。
メイン処理内でis_runnable()を定期的にコールして継続可否をチェックするのであれば、この関数を実装する必要はありません。
register_status#
def register_status(status: RunningStatus)
ステータス情報の登録
エラー情報が追加されたステータスを登録します
Arguments:
status- エラー情報が登録されたステータスオブジェクト
Status#
def Status()
ステータス情報の生成
エラー情報を登録するためのステータスオブジェクトを生成します。
interval_iteration#
def interval_iteration(interval, base_time=0)
定期処理用イテレータ
コンポーネントにて一定時間間隔で何らかの処理をするためのイテレータです。
指定された基準時刻より、指定した定期処理間隔でイテレータから値がyieldされます。ただし実際に処理が開始されるタイミングは厳密に予定時刻と一致するわけではなく、OSやデバイスの負荷やクロック精度に影響されます。
また、ユーザーが実装する定期的な処理の実行に、定期処理間隔より長い時間を要した場合、次の定期処理がスキップされることがあります。この場合、スキップされた定期処理の回数がyieldの第二要素として受け取れます。
本関数はコンポーネントの停止要求を受けた場合にはイテレーションを終了します。
Arguments:
interval- 定期処理間隔(単位:ナノ秒)base_time- イテレーション開始基準時刻を表すUNIXタイム(単位:ナノ秒)
Yields:
[0]- その時のイテレーションが実行される予定だった時刻を表すUNIXタイム(単位:ナノ秒)[1]- 直前のイテレーションのスキップ数
Examples:
以下のようにfor文でイテレータとして使用できます。
for [ts, skip] in self.interval_iteration(1000000000):
#
# 定期的に実行したい処理
#
save_running_info#
def save_running_info(key: str, data: bytes)
任意の情報を保存する
任意のバイト列をシステムに記録します。
保存されたデータは、同一のコンポーネントであればシステムの再起動後もload_running_info()で参照する事ができます。また、指定されたkeyにより異なるデータとして保存されるため、複数保存したい場合はそれぞれkeyを分けてください。
Arguments:
keystr - 保存する情報を識別するためのキーdatabytes - bytes型の保存したい情報
Raises:
HiveApiError- Hiveフレームワークの各種エラー
load_running_info#
def load_running_info(key: str) -> typing.Optional[bytes]
任意の情報を読み込む
Arguments:
keystr - 読み込みたい情報を識別するためのキー
Returns:
data- byte型の読み込んだデータNone- データがない
Raises:
HiveApiError- Hiveフレームワークの各種エラー
get_common_variable#
def get_common_variable(varname: str) -> typing.Optional[str]
コンポーネント共通変数取得
Arguments:
varnamestr - 取得したい共通変数の変数名
Returns:
- 指定した共通変数の値
None- 指定した変数が存在しない
Raises:
HiveApiError- Hiveフレームワークの各種エラー
create_lock#
def create_lock(key: typing.Union[str, bytes],
interbase: bool = False,
timeout: typing.Optional[float] = None) -> SynapseLock
コンポーネント間での排他制御のためのロック獲得関数
本メソッドは、引数keyで指定された文字列orバイト列で識別されるリソースの排他制御のためのロックを獲得します。
同一のkeyにおけるロックは1つのスレッドでしか獲得できません。
獲得したロックは、返却されるロックオブジェクトのunlock()メソッドで解放できます。
もしくは、本メソッドをwith句で使用すればスコープを抜けた際に解放されます。
また、ロックを獲得したままコンポーネントが停止した場合にも自動でロックが解放されます。
引数interbaseがtrueの場合、同じkeyを使うことで異なるコンポーネントベース間でも排他制御が可能になります。
この場合はC-APIのhive_lock()を使ったカスタムコンポーネントとの排他制御も利用可能です。
interbaseがfalseの場合、同一のコンポーネントベースでのみ排他制御されます。
本メソッドを呼び出したときにすでに外のコンポーネントインスタンスがロックを獲得している場合、メソッド内で実行がブロックされます。
ブロックする最大時間は、引数timeoutにて秒単位で指定可能です。タイムアウト時間を過ぎてもロックを獲得できない場合はHiveTimeoutError例外が送出されます。
timeoutに0以下の値を指定した場合は、実行がブロックされることはなく、ロックを獲得できなければ即時タイムアウトとなります。
timeoutにNoneを指定した場合はタイムアウトせずにロック獲得できるまで待ち続けます。
ロック獲得を待っている間に実行しているコンポーネントインスタンスが停止要求を受けた場合、 タイムアウト時間にかかわらず、HiveLockCanceled例外を送出します。
同一スレッドで同じkeyに対して二重にロックすることはできません。その場合はロック解放待ち状態になりますのでタイムアウトするか停止要求を受けるまでブロックし続けます。
Arguments:
keystr|bytes - 排他制御対象を識別するための任意の文字列orバイト列interbasebool - 異なるコンポーネントベース間で排他制御を行う場合はTruetimeoutfloat|None - ロックが即時獲得できない場合に解放を待つ場合のタイムアウト時間
Returns:
SynapseLock オブジェクト
Raises:
HiveTimeoutError- 指定されたタイムアウト時間内にロックを獲得できなかったHiveLockCanceled- ロックを獲得する前にコンポーネントの停止要求を受けたHiveApiError- Hiveフレームワークの各種エラー
Examples:
以下のようにwith句を使う事ができます。
while self.is_runnable();
try:
with self.create_lock('lock-serial-device', False, 10.0):
#
# 排他制御が必要な処理
#
except HiveTimeoutError as exc:
self.log.debug('wait lock')
continue
except HiveLockCanceled:
return
DataType Objects#
class DataType(IntEnum)
データ型定義Enumクラス
データ型を示すEnumクラスです。カラム作成時に指定するDataTypeとして、このアトリビュートを使用してください。
Attributes:
NONE(0)- どの型にも一致しない未定義の型BOOLEAN(1)- 真偽値型INT8(2)- 8bit符号あり整数型INT16(3)- 16bit符号あり整数型INT32(4)- 32bit符号あり整数型INT64(5)- 64bit符号あり整数型UINT8(6)- 8bit符号なし整数型UINT16(7)- 16bit符号なし整数型UINT32(8)- 32bit符号なし整数型UINT64(9)- 64bit符号なし整数型FLOAT(10)- 32bit浮動小数点型DOUBLE(11)- 64bit浮動小数点型STRING(13)- 文字列BINARY(14)- バイナリFILE(16)- ファイル
ColumnOption Objects#
class ColumnOption()
カラムオプション
出力ポートのカラムに対するオプション設定
Attributes:
column_typeColumnType - カラムタイプ(LOW、MIDDLE、HIGH)samplingratefloat - サンプリングレートcompression_unitint - 圧縮単位fixedsize_binaryint - カラムデータサイズmax_binarysizeint - LO可変長タイプのデータ最大長
FileTypeMetaInfo Objects#
class FileTypeMetaInfo(typing.TypedDict)
ファイル型データメタ情報
ファイル型カラムに登録するファイルのメタ情報をもつクラスです。 いずれのメンバも省略可能です。
Attributes:
media_typeColumnType - メディアタイプ(application/jsonやtext/csvなどを設定できる)filenamefloat - ファイル名のヒント(ファイルエミッタに繋いだ場合にこのファイル名が使われる)begin_timestampint - このファイルに収められたデータの時間範囲の先頭時刻(UNIXタイム、単位:ナノ秒)end_timestampint - このファイルに収められたデータの時間範囲の末尾時刻(UNIXタイム、単位:ナノ秒)
OutColumn Objects#
class OutColumn()
出力ポートのカラム
出力ポートのカラムに対するデータの登録等はこのクラスのインスタンスを介して行います
Attributes:
namestr - 作成時に指定したカラム名
※使用できない文字については下記「使用できないカラム名について」をご確認くださいdata_typeDataType - 作成時に指定したカラムのデータ型data_arrayint - 作成時に指定したカラムの要素数 スカラ型の場合は0となりますoptionsColumnOption - 作成時に指定したカラムのオプション
使用できないカラム名について: 以下の文字はカラム名に使用することができません
\- バックスラッシュ/- スラッシュ*- アスタリスク?- クエスチョン"- ダブルクォート< >- 不等号|- パイプ'- シングルクォート- スペース
insert#
def insert(*args) -> None
カラムへの値登録
このカラムに指定したタイムスタンプtsの値を登録します。
タイムスタンプを省略した場合は現在時刻として扱われます。
dataには、このカラム作成時のデータ型と一致するデータを渡してください。異なるデータを渡した場合、暗黙に変換可能なものは変換されますが、そうでない場合はエラーとなります。
Arguments:
data- 登録する値ts- この値を登録するタイムスタンプ
Raises:
TypeError- このカラムのデータ型に一致しない、不正なデータが渡されたInvalidTimestamp- タイムスタンプが現在時刻と著しく離れている、もしくは前に登録したタイムスタンプより過去になっている
open_file#
@contextmanager
def open_file()
新規ファイルの作成
ファイル型として作成されたカラムに登録する新規のファイルをオープンし、そのファイルオブジェクトを返します。
Returns:
FileObject- オープンされたファイルオブジェクト
Raises:
TypeError- FILE型でないカラムに対してこのメソッドが呼ばれたHiveApiError- Hiveフレームワークの各種エラーOSError- ファイルオープンに関連する各種エラー
Examples:
以下のようにwith句で使用できます。
file_column = self.out_port1.Column("clmfile", DataType.FILE)
with file_column.open_file() as fo:
fo.write("filedata - line 1\n".encode('utf-8))
fo.write("filedata - line 2\n".encode('utf-8))
fo.write("filedata - line 3\n".encode('utf-8))
file_column.insert_file(fo, ts)
insert_file#
def insert_file(fo: typing.IO[typing.AnyStr],
ts: int = 0,
meta: typing.Optional[FileTypeMetaInfo] = None) -> None
ファイルのDB登録
ファイルをファイル型のカラムへ登録します。
Arguments:
file- オープンされたファイルオブジェクトts- この値を登録するタイムスタンプmeta- 登録するファイルのメタ情報
Raises:
ValueError- 登録しようとしたファイルがすでにクローズされているHiveApiError- Hiveフレームワークの各種エラーOSError- ファイル操作に関連する各種エラー
insert_file_move#
def insert_file_move(pathlike,
ts: int = 0,
meta: typing.Optional[FileTypeMetaInfo] = None) -> None
既存ファイルのDB登録(移動)
すでに作成済みのファイルをファイル型のカラムへ登録します。 指定されたファイルは、DBの管理ディレクトリに移動されます。引数で指定したパスにはファイルが残らないことに注意してください。 ディレクトリは登録できません。
Arguments:
pathlike- 登録対象のファイルのパスを表すpath-likeオブジェクトts- この値を登録するタイムスタンプmeta- 登録するファイルのメタ情報
Raises:
ValueError- 指定したパスHiveApiError- Hiveフレームワークの各種エラーOSError- ファイル操作に関連する各種エラー
insert_file_copy#
def insert_file_copy(pathlike,
ts: int = 0,
meta: typing.Optional[FileTypeMetaInfo] = None) -> None
既存ファイルのDB登録(コピー)
すでに作成済みのファイルをファイル型のカラムへ登録します。 指定されたファイルはDBの管理ディレクトリにコピーされ、そのコピー先のファイルがカラムに登録されます。 以降、引数で指定した元ファイルのパスは参照されません。 ディレクトリは登録できません。
Arguments:
pathlike- 登録対象のファイルのパスを表すpath-likeオブジェクトts- この値を登録するタイムスタンプmeta- 登録するファイルのメタ情報
Raises:
ValueError- 指定したパスHiveApiError- Hiveフレームワークの各種エラーOSError- ファイル操作に関連する各種エラー
insert_file_ref#
def insert_file_ref(pathlike,
ts: int = 0,
meta: typing.Optional[FileTypeMetaInfo] = None) -> None
既存ファイルのDB登録(参照)
すでに作成済みのファイルをファイル型のカラムへ登録します。 指定されたファイルは、そのパスのままカラムに登録されます。 そのためファイルを削除してしまうと、このカラムの情報を他のコンポーネントが参照する際にもファイルを参照できなくなります。 ディレクトリは登録できません。
Arguments:
pathlike- 登録対象のファイルのパスを表すpath-likeオブジェクトts- この値を登録するタイムスタンプmeta- 登録するファイルのメタ情報
Raises:
ValueError- 指定したパスHiveApiError- Hiveフレームワークの各種エラーOSError- ファイル操作に関連する各種エラー
AggregationType Objects#
class AggregationType(IntEnum)
集約種別定義Enumクラス
入力ポートから取得するカラムデータの集約種別を表します
Attributes:
NONE(0)- なしOLDER(1)- 最古値NEWER(2)- 最新値COUNT(3)- 件数SUM(4)- 総和SUMSQ(5)- 2乗和SUMSQD(6)- 偏差平方和MIN(7)- 最小MAX(8)- 最大RANGE(9)- 範囲MEAN(10)- 算術平均値VAR(11)- 分散STDEV(12)- 標準偏差UVAR(13)- 不偏分散USTDEV(14)- 標本不偏標準偏差STDER(15)- 標準誤差CV(16)- 変動係数MEDIAN(17)- 中央値QUARTMIN(18)- 四分位数QUART1(19)- 四分位数QUART2(20)- 四分位数QUART3(21)- 四分位数QUARTMAX(22)- 四分位数MODE(23)- 最頻値MODECOUNT(24)- 最頻値個数MDEV(25)- 平均偏差HMEAN(26)- 調和平均GMEAN(27)- 幾何平均KURT(28)- 尖度SKEW(29)- 歪度
AggregationTypeSet Objects#
class AggregationTypeSet()
入力ポートのカラムからデータ取得する際に有効になっている統計情報を示すフラグのセット。
AggregationTypeの属性をtest()メソッドに渡すことでこの統計情報が有効になっているかを確認できます。
また、イテレーターを使って有効になっている統計情報のリストを取得することも可能です。
Examples:
AggregationTypeの属性をtest()メソッドに渡すことでこの統計情報が有効になっているかを確認できます。
if a_type_set.test(AggregationType.MAX):
pring(max: ON")
else:
pring(max: OFF")
また、イテレーターを使って有効になっている統計情報のリストを取得することも可能です。
for a_type in aggregation_type_set:
print(a_type.name)
InColumn Objects#
class InColumn()
入力ポートのカラム
このクラスからは情報を取得するのみです。カラムに対してデータを登録するなどの操作はできません。
Attributes:
source_namestr - カラムを生成したコンポーネントの名前data_namestr - カラムの名前data_typeDataType - カラムの型情報array_sizeint - カラムの配列要素数(スカラの場合は0)raw_databool - カラムからのデータ取得時に未加工のデータが含まれるか否かを示す真偽値stat_type_setAggregationTypeSet - カラムからのデータ取得時に有効となっている統計情報を示すフラグセットoutput_namestr - カラムの出力名。指定がない場合はNone
get_latest_value#
def get_latest_value(use_storage: bool = False) -> (int, typing.Any)
カラムの最新値取得
カラムに登録されている最新のデータを読み込み、そのタイムスタンプと値のタプルを返します。
Arguments:
use_storagebool - ※現時点では未サポート
ストレージに永続化されたデータからの読み込みを行う場合にTrueを指定してください。
Falseの場合、メモリ上からのみ読み込みを行うため、永続化されたデータが有ってもデータなしを返す可能性があります。
Trueの場合でも、メモリ上に最新データがあればストレージのアクセスは行いません。
Returns:
(0, None): カラムから読み込める値がない場合(>0, !=None): 読み込まれたタイムスタンプと値
Raises:
HiveApiError- Hiveフレームワークの各種エラー
InPortReader Objects#
class InPortReader()
入力ポートリーダー
入力ポートのカラムから、データを読み込むクラスです。
このクラスは直接インスタンス化せず、self.in_portX.ContinuousReader()、もしくは、self.in_portX.TimeRangeReader()関数を使用してください。
read#
def read() -> typing.Optional[HiveWindowData]
カラムデータ読み込み
入力ポートからカラムデータを読み込みます。
Returns:
HiveWindowData- 読み込んだウィンドウデータを返します。None- 現時点で読み込みできない場合はNoneが返されます。
Raises:
HiveApiError- Hiveフレームワークの各種エラー
HiveOutPort Objects#
class HiveOutPort()
出力ポートを表すクラス
出力ポートに対するカラムの作成等はこのクラスのインスタンスを介して行います。
このクラスのインスタンスを直接生成することはできません。コンポーネントの開始時に、アトリビュートself.out_port1のように最初からアクセスできますのでこれを使ってください。
Column#
def Column(name: str,
data_type: DataType,
data_array: int = 0,
opt: typing.Optional[ColumnOption] = None) -> OutColumn
カラム生成
この出力ポート内に、指定された名称のカラムを生成します。
Arguments:
name- 作成するカラム名data_type- 作成するカラムのデータ型data_array- 作成するカラムの要素数(スカラ型の場合は0を指定)opt- カラムオプション
Returns:
- OutColumn
Raises:
HiveApiError- Hiveフレームワークの各種エラー
register_confirmed_time#
def register_confirmed_time(ts: int = 0) -> None
出力ポートへの確定時刻登録
この出力ポートのカラムへのデータ登録が、指定した時刻tsまでは確定したことを通知します。この登録により、指定した時刻tsより前の時刻のデータは読み込むことができることを、フローリンクを通して繋がっている別のコンポーネントが知ることができます。
※このAPIの呼び出しは必須ではありません。
Arguments:
ts- 出力ポートのカラムへのデータ登録が確定している時刻 ※省略した場合は現在時刻になります
HiveInPort Objects#
class HiveInPort()
入力ポートを表すクラス
入力ポートからデータを取得するような処理はこのクラスのインスタンスを介して行います
get_columns#
def get_columns() -> list[InColumn]
カラムリストの取得
この入力ポートに接続されている全カラムの情報を取得します
Returns:
- list[InColumn]
ContinuousReader#
def ContinuousReader(start: int) -> InPortReader
継続カラムリーダーの生成
この入力ポートに接続されているカラムから、継続的にデータを読み込むContinuousReaderを生成します
Arguments:
start- 入力ポートからの読み込みを開始する時刻(UNIXタイム、単位:ナノ秒)
Returns:
- ContinuousReader
Examples:
以下のようにwith句でreaderを取得して使用できます。
with self.in_port1.ContinuousReader(start=self.get_timestamp()) as reader:
while self.is_runnable():
window_data = reader.read()
if not window_data:
continue
:
TimeRangeReader#
def TimeRangeReader(start: int, end: int) -> InPortReader
時間範囲データリーダーの生成
指定したカラム、もしくは入力ポートに接続されている全カラムから、指定した時間範囲のデータを読み込むTimeRangeReaderを生成します
Arguments:
start- 入力ポートからの読み込みを開始する時刻(UNIXタイム、単位:ナノ秒)end- 入力ポートからの読み込みを終了する時刻(UNIXタイム、単位:ナノ秒)column- データを取得するカラム(指定がない場合には入力ポートの全カラム)
Returns:
- data
Examples:
以下のようにwith句でreaderを取得して使用できます。
with self.in_port1.ContinuousReader(start=self.get_timestamp()) as reader:
for window_data in reader:
:
RunningStatus Objects#
class RunningStatus()
add_error#
def add_error(err: ErrorInfoBase)
エラー情報登録
ステータスオブジェクトにエラー情報を登録します。 このオブジェクトに登録しただけでは保存されません。最終的にはHiveComponentBase.register_statusを呼び出してシステムに登録してください。
Arguments:
err- エラー情報
ErrorType#
def ErrorType(error_id: str, *param_names: str)
エラー情報クラス生成
エラー情報を管理するクラスを生成します。 実際のエラー情報は、この関数で生成したクラスからインスタンスを生成してください。
Arguments:
error_id- エラー情報を識別する文字列param_names- エラー情報のパラメータの名称
ErrorInfoBase Objects#
class ErrorInfoBase(Exception)
ステータスに登録するためのエラー情報
コンポーネントのエラーの種別や詳細を保持するためのクラスです。
このクラスは、直接インスタンス化したり継承したりせずに、ErrorType関数を使って生成したクラスをインスタンス化してください。
Attributes:
errorTypeIndexint - エラーパラメータのタイプparamslist[ErrorParam] - エラーパラメータ
HiveLogLevel Objects#
class HiveLogLevel(IntEnum)
ログレベル型定義Enumクラス
ログレベルを示すEnumクラスです。
Attributes:
ERROR(0)- エラーログWARNING(1)- 警告ログINFO(2)- 通常情報ログDEBUG(3)- デバッグ用ログTRACE(4)- トレースログ
HiveLog Objects#
class HiveLog()
ログ出力クラス
get_current_level#
def get_current_level()
ログレベル取得
Returns:
HiveLogLevel- 現在のログレベルEnum
error#
def error(*msg)
エラーログ出力
Arguments:
msg- ログ出力する文字列
warning#
def warning(*msg)
警告ログ出力
Arguments:
msg- ログ出力する文字列
info#
def info(*msg)
通常情報ログ出力
Arguments:
msg- ログ出力する文字列
debug#
def debug(*msg)
デバッグログ出力
Arguments:
msg- ログ出力する文字列
trace#
def trace(*msg)
トレースログ出力
Arguments:
msg- ログ出力する文字列
exception#
def exception(exc: Exception)
例外専用エラーログ出力
Arguments:
exc- ログ出力する例外
HiveWindowData Objects#
class HiveWindowData()
ウィンドウデータクラス
入力ポートからカラムのデータを取得する際に、特定の時間単位でまとめられたデータのかたまりです。 時間単位がどの程度になるかは入力ポートの設定により異なります。
Attributes:
time_rangetuple[int:2] - このウィンドウの時間範囲を表す2要素のタプル。最初の要素が開始時刻、2番目の要素が終了時刻を表す。どちらもナノ秒単位。window_idint - ウィンドウを識別する数値。ウィンドウ毎に異なる値が取得されるが、常に1つずつ増加するとは限らないことに注意。event_idint - イベント範囲を識別する数値。イベント定義されていない場合やイベント範囲外では0に固定。イベント中は1以上の値が入る。recordslist[HiveRecord] - このウィンドウ内のレコードリストcolumnslist[InColumn] - 取得した入力ポートに接続されたカラムの一覧。HiveInPort.get_columns()で取得したものと同じ。eventHiveInPortEventSetting - 入力ポートに設定されたイベント情報
HiveRecordType Objects#
class HiveRecordType(IntEnum)
レコード種別Enumクラス
レコード種別を示すEnumクラスです。
Attributes:
RAW_DATA(0)- 未加工データSTATISTICS(1)- 集約データ
HiveRecord Objects#
class HiveRecord()
レコードクラス
入力ポートからカラムのデータを取得する際に、同一時刻のデータをまとめたもの。
別カラムのデータでも同一時刻のものがあれば全て1つのインスタンスにまとめられます。レコードは2種類、未加工データと集約データがあり、record_typeで識別できます。
Attributes:
record_typeHiveRecordType - このウィンドウ内のレコードリストtimestampint - このレコードに含まれるデータのタイムスタンプ。UNIXタイム(単位ナノ秒)datalist[HiveRecordData] - このレコードのデータのリスト
HiveRecordData Objects#
class HiveRecordData()
レコードデータクラス
1レコード内の各カラムのデータ。
Attributes:
valueany - 実データ。数値、文字列、配列など、カラムの型に応じて異なる形式。columnInColumn - 対象カラムを示すオブジェクトstat_typeAggregationType - このデータの集約種別
HiveCompError Objects#
class HiveCompError()
コンポーネントエラー情報クラス
コンポーネント実行中、もしくは、エラーで停止した場合のエラー情報
Attributes:
error_typestr - コンポーネントのエラーの種別を示す文字列parameterslist[any] - エラーの付加情報
HiveCompStatus Objects#
class HiveCompStatus()
コンポーネントステータスクラス
コンポーネントのステータスとエラー情報
Attributes:
statusstr - コンポーネントの実行状態を表す文字列errorslist[HiveCompError] - コンポーネント実行中、もしくは、エラーで停止した場合のエラー情報
HiveInPortEventSetting Objects#
class HiveInPortEventSetting()
入力ポートのイベント設定情報クラス
入力ポートに設定されたイベント情報を保持するクラス このクラスの属性値を変更しても実際の入力ポート設定には反映されません
Attributes:
source_namestr - イベント判定の対象となっているコンポーネント名data_namestr - イベント判定の対象となっているカラム名pre_timeint - イベント区間前方の拡張時間範囲post_timeint - イベント区間後方の拡張時間範囲
SynapseLock Objects#
class SynapseLock()
コンポーネント間排他制御のためのロックオブジェクト
HiveComponentBase.create_lock()でロックを獲得したときに取得できるオブジェクトです。
通常はこのオブジェクトを操作する必要はありません。
create_lock()の使用例のように、with句を使用していればそのスコープを抜けた際に自動でロックが解放されます。
with句を使用しない場合は、このクラスのメソッドacquire(), release()でロックの獲得解放が可能です。
acquire#
def acquire() -> None
ロックを獲得する
このオブジェクトが生成されたときの引数key, timeout, interbaseを使ってロックを要求します。
ロック獲得成功した場合は何も返さずにメソッドが終了します。
Raises:
HiveTimeoutError- 指定されたタイムアウト時間内にロックを獲得できなかったHiveLockCanceled- ロックを獲得する前にコンポーネントの停止要求を受けたHiveApiError- Hiveフレームワークの各種エラー
release#
def release() -> None
ロックを解放する
このオブジェクトが保持しているロックを解放し、他のコンポーネント/スレッドがロックを獲得できるようにします。
HiveTimeoutError Objects#
class HiveTimeoutError(Exception)
関数、メソッド内でブロッキングする場合のタイムアウト時間に達した場合に送出される例外です
HiveLockCanceled Objects#
class HiveLockCanceled(Exception)
SynapseLockオブジェクトにおいて、ロック獲得要求中にコンポーネントが停止された場合に送出される例外です