From ee5afb691f8aa12f74876ab63ad9b0d421d6f66c Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Wed, 22 Apr 2026 08:11:34 +0800 Subject: [PATCH 1/3] Rewrite SQLAlchemy dialect to support IoTDB 2.0+ table model The old SQLAlchemy dialect was built for the tree model (path-based schema). This rewrites it to support the table model with standard relational SQL, including: - Column categories (TAG, ATTRIBUTE, FIELD, TIME) via dialect-specific args - DDL generation with CREATE TABLE categories and TTL support - Table model reflection (SHOW TABLES, SHOW COLUMNS FROM) - Updated type mappings (STRING, BLOB, TIMESTAMP, DATE) - Simplified SQL compiler (table model supports standard SQL) - DBAPI layer: add sql_dialect parameter, propagate exceptions Co-Authored-By: Claude Opus 4.7 (1M context) --- .../client-py/iotdb/dbapi/Connection.py | 6 + iotdb-client/client-py/iotdb/dbapi/Cursor.py | 61 ++-- .../iotdb/sqlalchemy/IoTDBDDLCompiler.py | 67 ++++ .../iotdb/sqlalchemy/IoTDBDialect.py | 154 +++++---- .../iotdb/sqlalchemy/IoTDBSQLCompiler.py | 303 +----------------- .../iotdb/sqlalchemy/IoTDBTypeCompiler.py | 80 ++++- .../client-py/iotdb/sqlalchemy/__init__.py | 4 + .../integration/sqlalchemy/test_dialect.py | 292 ++++++++++++----- 8 files changed, 482 insertions(+), 485 deletions(-) create mode 100644 iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDDLCompiler.py diff --git a/iotdb-client/client-py/iotdb/dbapi/Connection.py b/iotdb-client/client-py/iotdb/dbapi/Connection.py index aee5520e9af92..caec91364a503 100644 --- a/iotdb-client/client-py/iotdb/dbapi/Connection.py +++ b/iotdb-client/client-py/iotdb/dbapi/Connection.py @@ -37,9 +37,15 @@ def __init__( zone_id=Session.DEFAULT_ZONE_ID, enable_rpc_compression=False, sqlalchemy_mode=False, + sql_dialect=None, + database=None, ): self.__session = Session(host, port, username, password, fetch_size, zone_id) self.__sqlalchemy_mode = sqlalchemy_mode + if sql_dialect: + self.__session.sql_dialect = sql_dialect + if database: + self.__session.database = database self.__is_close = True try: self.__session.open(enable_rpc_compression) diff --git a/iotdb-client/client-py/iotdb/dbapi/Cursor.py b/iotdb-client/client-py/iotdb/dbapi/Cursor.py index a1d6e2caabac9..018ade6e99ac2 100644 --- a/iotdb-client/client-py/iotdb/dbapi/Cursor.py +++ b/iotdb-client/client-py/iotdb/dbapi/Cursor.py @@ -129,41 +129,32 @@ def execute(self, operation, parameters=None): sql_seqs.append(seq) sql = "\n".join(sql_seqs) - try: - data_set = self.__session.execute_statement(sql) - col_names = None - col_types = None - rows = [] - - if data_set: - data = data_set.todf() - - if self.__sqlalchemy_mode and time_index: - time_column = data.columns[0] - time_column_value = data.Time - del data[time_column] - for i in range(len(time_index)): - data.insert(time_index[i], time_names[i], time_column_value) - - col_names = data.columns.tolist() - col_types = data_set.get_column_types() - rows = data.values.tolist() - data_set.close_operation_handle() - - self.__result = { - "col_names": col_names, - "col_types": col_types, - "rows": rows, - "row_count": len(rows), - } - except Exception: - logger.error("failed to execute statement:{}".format(sql)) - self.__result = { - "col_names": None, - "col_types": None, - "rows": [], - "row_count": -1, - } + data_set = self.__session.execute_statement(sql) + col_names = None + col_types = None + rows = [] + + if data_set: + data = data_set.todf() + + if self.__sqlalchemy_mode and time_index: + time_column = data.columns[0] + time_column_value = data.Time + del data[time_column] + for i in range(len(time_index)): + data.insert(time_index[i], time_names[i], time_column_value) + + col_names = data.columns.tolist() + col_types = data_set.get_column_types() + rows = data.values.tolist() + data_set.close_operation_handle() + + self.__result = { + "col_names": col_names, + "col_types": col_types, + "rows": rows, + "row_count": len(rows), + } self.__rows = iter(self.__result["rows"]) def executemany(self, operation, seq_of_parameters=None): diff --git a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDDLCompiler.py b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDDLCompiler.py new file mode 100644 index 0000000000000..0d8f377cb1a41 --- /dev/null +++ b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDDLCompiler.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from sqlalchemy.sql.compiler import DDLCompiler + + +class IoTDBDDLCompiler(DDLCompiler): + def visit_create_column(self, create, first_pk=False, **kw): + column = create.element + + if column.system: + return None + + category = column.dialect_options["iotdb"].get("category") + + if category and category.upper() == "TIME": + colspec = self.preparer.format_column(column) + " TIME" + return colspec + + colspec = ( + self.preparer.format_column(column) + + " " + + self.dialect.type_compiler_instance.process( + column.type, type_expression=column + ) + ) + + if category: + colspec += " " + category.upper() + + return colspec + + def post_create_table(self, table): + ttl = table.dialect_options["iotdb"].get("ttl") + if ttl is not None: + return " WITH (TTL=%d)" % int(ttl) + return "" + + def create_table_constraints(self, table, **kw): + return "" + + def visit_primary_key_constraint(self, constraint, **kw): + return None + + def visit_foreign_key_constraint(self, constraint, **kw): + return None + + def visit_unique_constraint(self, constraint, **kw): + return None + + def visit_check_constraint(self, constraint, **kw): + return None diff --git a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py index 912e23e9f7a14..44f9e860fea69 100644 --- a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py +++ b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py @@ -16,58 +16,64 @@ # under the License. # -from sqlalchemy import types, util +from sqlalchemy import schema as sa_schema, types from sqlalchemy.engine import default from sqlalchemy.sql import text -from sqlalchemy.sql.sqltypes import String from iotdb import dbapi +from .IoTDBDDLCompiler import IoTDBDDLCompiler from .IoTDBIdentifierPreparer import IoTDBIdentifierPreparer from .IoTDBSQLCompiler import IoTDBSQLCompiler from .IoTDBTypeCompiler import IoTDBTypeCompiler -TYPES_MAP = { +IOTDB_CATEGORY_TIME = "TIME" +IOTDB_CATEGORY_TAG = "TAG" +IOTDB_CATEGORY_ATTRIBUTE = "ATTRIBUTE" +IOTDB_CATEGORY_FIELD = "FIELD" + +ischema_names = { "BOOLEAN": types.Boolean, "INT32": types.Integer, "INT64": types.BigInteger, "FLOAT": types.Float, "DOUBLE": types.Float, + "STRING": types.String, "TEXT": types.Text, - "LONG": types.BigInteger, + "BLOB": types.LargeBinary, + "TIMESTAMP": types.DateTime, + "DATE": types.Date, } class IoTDBDialect(default.DefaultDialect): name = "iotdb" - driver = "iotdb-python" + driver = "iotdb" + statement_compiler = IoTDBSQLCompiler - type_compiler = IoTDBTypeCompiler + ddl_compiler = IoTDBDDLCompiler + type_compiler_cls = IoTDBTypeCompiler preparer = IoTDBIdentifierPreparer - convert_unicode = True - supports_unicode_statements = True - supports_unicode_binds = True - supports_simple_order_by_label = False + supports_alter = True supports_schemas = True - supports_right_nested_joins = False - description_encoding = None - - if hasattr(String, "RETURNS_UNICODE"): - returns_unicode_strings = String.RETURNS_UNICODE - else: - - def _check_unicode_returns(self, connection, additional_tests=None): - return True - - _check_unicode_returns = _check_unicode_returns - - def create_connect_args(self, url): - # inherits the docstring from interfaces.Dialect.create_connect_args - opts = url.translate_connect_args() - opts.update(url.query) - opts.update({"sqlalchemy_mode": True}) - return [[], opts] + supports_sequences = False + supports_native_boolean = True + supports_native_enum = False + supports_statement_cache = True + insert_returning = False + update_returning = False + delete_returning = False + supports_default_values = False + supports_empty_insert = False + postfetch_lastrowid = False + supports_sane_rowcount = False + supports_sane_multi_rowcount = False + + construct_arguments = [ + (sa_schema.Column, {"category": None}), + (sa_schema.Table, {"ttl": None}), + ] @classmethod def import_dbapi(cls): @@ -77,8 +83,23 @@ def import_dbapi(cls): def dbapi(cls): return dbapi - def has_schema(self, connection, schema): - return schema in self.get_schema_names(connection) + def create_connect_args(self, url): + opts = url.translate_connect_args() + opts.update(url.query) + opts["sql_dialect"] = "table" + return ([], opts) + + def initialize(self, connection): + pass + + def _get_server_version_info(self, connection): + return None + + def _get_default_schema_name(self, connection): + return None + + def has_schema(self, connection, schema_name, **kw): + return schema_name in self.get_schema_names(connection) def has_table(self, connection, table_name, schema=None, **kw): return table_name in self.get_table_names(connection, schema=schema) @@ -88,22 +109,41 @@ def get_schema_names(self, connection, **kw): return [row[0] for row in cursor.fetchall()] def get_table_names(self, connection, schema=None, **kw): - cursor = connection.execute( - text("SHOW DEVICES %s.**" % (schema or self.default_schema_name)) - ) - return [row[0].replace(schema + ".", "", 1) for row in cursor.fetchall()] + if schema: + connection.execute(text("USE %s" % schema)) + cursor = connection.execute(text("SHOW TABLES")) + return [row[0] for row in cursor.fetchall()] def get_columns(self, connection, table_name, schema=None, **kw): + if schema: + connection.execute(text("USE %s" % schema)) cursor = connection.execute( - text("SHOW TIMESERIES %s.%s.*" % (schema, table_name)) + text("SHOW COLUMNS FROM %s" % table_name) ) - columns = [self._general_time_column_info()] + columns = [] for row in cursor.fetchall(): - columns.append(self._create_column_info(row, schema, table_name)) + col_name = row[0] + col_type_str = row[1] + col_category = row[2] if len(row) > 2 else None + + sa_type = ischema_names.get(col_type_str.upper(), types.UserDefinedType) + + col_info = { + "name": col_name, + "type": sa_type() if isinstance(sa_type, type) else sa_type, + "nullable": True, + "default": None, + } + + if col_category: + col_info["iotdb_category"] = col_category.upper() + + columns.append(col_info) + return columns def get_pk_constraint(self, connection, table_name, schema=None, **kw): - pass + return {"constrained_columns": [], "name": None} def get_foreign_keys(self, connection, table_name, schema=None, **kw): return [] @@ -111,33 +151,11 @@ def get_foreign_keys(self, connection, table_name, schema=None, **kw): def get_indexes(self, connection, table_name, schema=None, **kw): return [] - @util.memoized_property - def _dialect_specific_select_one(self): - # IoTDB does not support select 1 - # so replace the statement with "show version" - return "SHOW VERSION" - - def _general_time_column_info(self): - """ - Treat Time as a column - """ - return { - "name": "Time", - "type": self._resolve_type("LONG"), - "nullable": False, - "default": None, - } - - def _create_column_info(self, row, schema, table_name): - """ - Generate description information for each column - """ - return { - "name": row[0].replace(schema + "." + table_name + ".", "", 1), - "type": self._resolve_type(row[3]), - "nullable": True, - "default": None, - } - - def _resolve_type(self, type_): - return TYPES_MAP.get(type_, types.UserDefinedType) + def get_view_names(self, connection, schema=None, **kw): + return [] + + def do_commit(self, dbapi_connection): + pass + + def do_rollback(self, dbapi_connection): + pass diff --git a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBSQLCompiler.py b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBSQLCompiler.py index 008a314e683d4..08482ac41b948 100644 --- a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBSQLCompiler.py +++ b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBSQLCompiler.py @@ -17,308 +17,7 @@ # from sqlalchemy.sql.compiler import SQLCompiler -from sqlalchemy.sql.compiler import OPERATORS -from sqlalchemy.sql import operators class IoTDBSQLCompiler(SQLCompiler): - def order_by_clause(self, select, **kw): - """allow dialects to customize how ORDER BY is rendered.""" - - order_by = select._order_by_clause._compiler_dispatch(self, **kw) - if "Time" in order_by: - return " ORDER BY " + order_by.replace('"', "") - else: - return "" - - def group_by_clause(self, select, **kw): - """allow dialects to customize how GROUP BY is rendered.""" - return "" - - def visit_select( - self, - select, - asfrom=False, - parens=True, - fromhints=None, - compound_index=0, - nested_join_translation=False, - select_wraps_for=None, - lateral=False, - **kwargs, - ): - """ - Override this method to solve two problems - 1. IoTDB does not support querying Time as a measurement name (e.g. select Time from root.storagegroup.device) - 2. IoTDB does not support path.measurement format to determine a column (e.g. select root.storagegroup.device.temperature from root.storagegroup.device) - """ - assert select_wraps_for is None, ( - "SQLAlchemy 1.4 requires use of " - "the translate_select_structure hook for structural " - "translations of SELECT objects" - ) - - # initial setup of SELECT. the compile_state_factory may now - # be creating a totally different SELECT from the one that was - # passed in. for ORM use this will convert from an ORM-state - # SELECT to a regular "Core" SELECT. other composed operations - # such as computation of joins will be performed. - - kwargs["within_columns_clause"] = False - - compile_state = select_stmt._compile_state_factory(select_stmt, self, **kwargs) - select_stmt = compile_state.statement - - toplevel = not self.stack - - if toplevel and not self.compile_state: - self.compile_state = compile_state - - is_embedded_select = compound_index is not None or insert_into - - # translate step for Oracle, SQL Server which often need to - # restructure the SELECT to allow for LIMIT/OFFSET and possibly - # other conditions - if self.translate_select_structure: - new_select_stmt = self.translate_select_structure( - select_stmt, asfrom=asfrom, **kwargs - ) - - # if SELECT was restructured, maintain a link to the originals - # and assemble a new compile state - if new_select_stmt is not select_stmt: - compile_state_wraps_for = compile_state - select_wraps_for = select_stmt - select_stmt = new_select_stmt - - compile_state = select_stmt._compile_state_factory( - select_stmt, self, **kwargs - ) - select_stmt = compile_state.statement - - entry = self._default_stack_entry if toplevel else self.stack[-1] - - populate_result_map = need_column_expressions = ( - toplevel - or entry.get("need_result_map_for_compound", False) - or entry.get("need_result_map_for_nested", False) - ) - - # indicates there is a CompoundSelect in play and we are not the - # first select - if compound_index: - populate_result_map = False - - # this was first proposed as part of #3372; however, it is not - # reached in current tests and could possibly be an assertion - # instead. - if not populate_result_map and "add_to_result_map" in kwargs: - del kwargs["add_to_result_map"] - - froms = self._setup_select_stack( - select_stmt, compile_state, entry, asfrom, lateral, compound_index - ) - - column_clause_args = kwargs.copy() - column_clause_args.update( - {"within_label_clause": False, "within_columns_clause": False} - ) - - text = "SELECT " # we're off to a good start ! - - if select_stmt._hints: - hint_text, byfrom = self._setup_select_hints(select_stmt) - if hint_text: - text += hint_text + " " - else: - byfrom = None - - if select_stmt._independent_ctes: - for cte in select_stmt._independent_ctes: - cte._compiler_dispatch(self, **kwargs) - - if select_stmt._prefixes: - text += self._generate_prefixes( - select_stmt, select_stmt._prefixes, **kwargs - ) - - text += self.get_select_precolumns(select_stmt, **kwargs) - # the actual list of columns to print in the SELECT column list. - inner_columns = [ - c - for c in [ - self._label_select_column( - select_stmt, - column, - populate_result_map, - asfrom, - column_clause_args, - name=name, - proxy_name=proxy_name, - fallback_label_name=fallback_label_name, - column_is_repeated=repeated, - need_column_expressions=need_column_expressions, - ) - for ( - name, - proxy_name, - fallback_label_name, - column, - repeated, - ) in compile_state.columns_plus_names - ] - if c is not None - ] - - if populate_result_map and select_wraps_for is not None: - # if this select was generated from translate_select, - # rewrite the targeted columns in the result map - - translate = dict( - zip( - [ - name - for ( - key, - proxy_name, - fallback_label_name, - name, - repeated, - ) in compile_state.columns_plus_names - ], - [ - name - for ( - key, - proxy_name, - fallback_label_name, - name, - repeated, - ) in compile_state_wraps_for.columns_plus_names - ], - ) - ) - - self._result_columns = [ - (key, name, tuple(translate.get(o, o) for o in obj), type_) - for key, name, obj, type_ in self._result_columns - ] - - # change the superset aggregate function name into iotdb aggregate function name - # by matching the head of aggregate function name and replace it. - for i in range(len(inner_columns)): - if inner_columns[i].startswith("max("): - inner_columns[i] = inner_columns[i].replace("max(", "max_value(") - if inner_columns[i].startswith("min("): - inner_columns[i] = inner_columns[i].replace("min(", "min_value(") - if inner_columns[i].startswith("count(DISTINCT"): - inner_columns[i] = inner_columns[i].replace("count(DISTINCT", "count(") - - # IoTDB does not allow to query Time as column, - # need to filter out Time and pass Time and Time's alias to DBAPI separately - # to achieve the query of Time by encoding. - time_column_index = [] - time_column_names = [] - for i in range(len(inner_columns)): - column_strs = ( - inner_columns[i].replace(self.preparer.initial_quote, "").split() - ) - if "Time" in column_strs: - time_column_index.append(str(i)) - time_column_names.append( - column_strs[2] - if OPERATORS[operators.as_] in column_strs - else column_strs[0] - ) - # delete Time column - inner_columns = list( - filter( - lambda x: "Time" - not in x.replace(self.preparer.initial_quote, "").split(), - inner_columns, - ) - ) - - if inner_columns and time_column_index: - inner_columns[-1] = ( - inner_columns[-1] - + " \n FROM Time Index " - + " ".join(time_column_index) - + " \n FROM Time Name " - + " ".join(time_column_names) - ) - - text = self._compose_select_body( - text, - select_stmt, - compile_state, - inner_columns, - froms, - byfrom, - toplevel, - kwargs, - ) - - if select_stmt._statement_hints: - per_dialect = [ - ht - for (dialect_name, ht) in select_stmt._statement_hints - if dialect_name in ("*", self.dialect.name) - ] - if per_dialect: - text += " " + self.get_statement_hint_text(per_dialect) - - # In compound query, CTEs are shared at the compound level - if self.ctes and (not is_embedded_select or toplevel): - nesting_level = len(self.stack) if not toplevel else None - text = ( - self._render_cte_clause( - nesting_level=nesting_level, - visiting_cte=kwargs.get("visiting_cte"), - ) - + text - ) - - if select_stmt._suffixes: - text += " " + self._generate_prefixes( - select_stmt, select_stmt._suffixes, **kwargs - ) - - self.stack.pop(-1) - return text - - def visit_table( - self, - table, - asfrom=False, - iscrud=False, - ashint=False, - fromhints=None, - use_schema=True, - **kwargs, - ): - """ - IoTDB's table does not support quotation marks (e.g. select ** from `root.`) - need to override this method - """ - if asfrom or ashint: - effective_schema = self.preparer.schema_for_object(table) - - if use_schema and effective_schema: - ret = effective_schema + "." + table.name - else: - ret = table.name - if fromhints and table in fromhints: - ret = self.format_from_hint_text(ret, table, fromhints[table], iscrud) - return ret - else: - return "" - - def visit_column( - self, column, add_to_result_map=None, include_table=True, **kwargs - ): - """ - IoTDB's where statement does not support "table".column format(e.g. "table".column > 1) - need to override this method to return the name of column directly - """ - return column.name + pass diff --git a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBTypeCompiler.py b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBTypeCompiler.py index 4cfd2480bd4b7..1fb80185bd53b 100644 --- a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBTypeCompiler.py +++ b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBTypeCompiler.py @@ -20,11 +20,17 @@ class IoTDBTypeCompiler(GenericTypeCompiler): + def visit_BOOLEAN(self, type_, **kw): + return "BOOLEAN" + def visit_FLOAT(self, type_, **kw): return "FLOAT" + def visit_REAL(self, type_, **kw): + return "FLOAT" + def visit_NUMERIC(self, type_, **kw): - return "INT64" + return "DOUBLE" def visit_DECIMAL(self, type_, **kw): return "DOUBLE" @@ -36,10 +42,76 @@ def visit_SMALLINT(self, type_, **kw): return "INT32" def visit_BIGINT(self, type_, **kw): - return "LONG" + return "INT64" def visit_TIMESTAMP(self, type_, **kw): - return "LONG" + return "TIMESTAMP" + + def visit_DATETIME(self, type_, **kw): + return "TIMESTAMP" + + def visit_DATE(self, type_, **kw): + return "DATE" + + def visit_TEXT(self, type_, **kw): + return "STRING" + + def visit_VARCHAR(self, type_, **kw): + return "STRING" + + def visit_NVARCHAR(self, type_, **kw): + return "STRING" + + def visit_CHAR(self, type_, **kw): + return "STRING" + + def visit_BLOB(self, type_, **kw): + return "BLOB" + + def visit_BINARY(self, type_, **kw): + return "BLOB" + + def visit_VARBINARY(self, type_, **kw): + return "BLOB" + + def visit_LARGE_BINARY(self, type_, **kw): + return "BLOB" + + def visit_large_binary(self, type_, **kw): + return "BLOB" + + def visit_boolean(self, type_, **kw): + return "BOOLEAN" + + def visit_string(self, type_, **kw): + return "STRING" + + def visit_unicode(self, type_, **kw): + return "STRING" def visit_text(self, type_, **kw): - return "TEXT" + return "STRING" + + def visit_unicode_text(self, type_, **kw): + return "STRING" + + def visit_float(self, type_, **kw): + return "FLOAT" + + def visit_numeric(self, type_, **kw): + return "DOUBLE" + + def visit_integer(self, type_, **kw): + return "INT32" + + def visit_big_integer(self, type_, **kw): + return "INT64" + + def visit_timestamp(self, type_, **kw): + return "TIMESTAMP" + + def visit_datetime(self, type_, **kw): + return "TIMESTAMP" + + def visit_date(self, type_, **kw): + return "DATE" diff --git a/iotdb-client/client-py/iotdb/sqlalchemy/__init__.py b/iotdb-client/client-py/iotdb/sqlalchemy/__init__.py index 2a1e720805f29..7a6b7223a6fe0 100644 --- a/iotdb-client/client-py/iotdb/sqlalchemy/__init__.py +++ b/iotdb-client/client-py/iotdb/sqlalchemy/__init__.py @@ -15,3 +15,7 @@ # specific language governing permissions and limitations # under the License. # + +from .IoTDBDialect import IoTDBDialect + +__all__ = ["IoTDBDialect"] diff --git a/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py b/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py index 9ed0c808917ee..c5bb2fa30290f 100644 --- a/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py +++ b/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py @@ -16,96 +16,236 @@ # under the License. # -import operator - -from sqlalchemy import create_engine, inspect +from sqlalchemy import ( + create_engine, + inspect, + Column, + Float, + Integer, + BigInteger, + String, + Boolean, + Table, + MetaData, +) from sqlalchemy.dialects import registry -from sqlalchemy.orm import Session from sqlalchemy.sql import text from tests.integration.iotdb_container import IoTDBContainer from urllib.parse import quote_plus as urlquote -final_flag = True -failed_count = 0 - - -def test_fail(): - global failed_count - global final_flag - final_flag = False - failed_count += 1 +TEST_DB = "test_sqlalchemy" -def print_message(message): - print("*********") - print(message) - print("*********") - assert False - -def test_dialect(): +def test_table_model_dialect(): with IoTDBContainer("iotdb:dev") as db: db: IoTDBContainer - password = urlquote("root") + password = urlquote("TimechoDB@2021") host = db.get_container_host_ip() port = db.get_exposed_port(6667) - url = f"iotdb://root:{password}@{host}:{port}" + url = f"iotdb://root:{password}@{host}:{port}/{TEST_DB}" registry.register("iotdb", "iotdb.sqlalchemy.IoTDBDialect", "IoTDBDialect") - eng = create_engine(url) - - with Session(eng) as session: - session.execute(text("create database root.cursor")) - session.execute(text("create database root.cursor_s1")) - session.execute( - text( - "create timeseries root.cursor.device1.temperature with datatype=FLOAT,encoding=RLE" - ) + engine = create_engine(url) + + with engine.connect() as conn: + conn.execute(text("CREATE DATABASE %s" % TEST_DB)) + + _test_ddl(engine) + _test_dml(engine) + _test_reflection(engine) + _test_time_column(engine) + + with engine.connect() as conn: + conn.execute(text("DROP DATABASE %s" % TEST_DB)) + + engine.dispose() + print("All table model dialect tests passed!") + + +def _test_ddl(engine): + metadata = MetaData() + + sensors = Table( + "sensors", + metadata, + Column("region", String, iotdb_category="TAG"), + Column("device_id", String, iotdb_category="TAG"), + Column("model", String, iotdb_category="ATTRIBUTE"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + Column("status", Boolean, iotdb_category="FIELD"), + schema=TEST_DB, + iotdb_ttl=3600000, + ) + + metadata.create_all(engine) + + insp = inspect(engine) + table_names = insp.get_table_names(schema=TEST_DB) + assert "sensors" in table_names, ( + "CREATE TABLE failed: 'sensors' not in %s" % table_names + ) + + sensors.drop(engine) + table_names = insp.get_table_names(schema=TEST_DB) + assert "sensors" not in table_names, ( + "DROP TABLE failed: 'sensors' still in %s" % table_names + ) + + print(" DDL tests passed") + + +def _test_dml(engine): + metadata = MetaData() + sensors = Table( + "sensors_dml", + metadata, + Column("region", String, iotdb_category="TAG"), + Column("device_id", String, iotdb_category="TAG"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + with engine.connect() as conn: + conn.execute( + sensors.insert().values( + region="asia", + device_id="d001", + temperature=25.5, + humidity=60.0, ) - session.execute( - text( - "create timeseries root.cursor.device1.status with datatype=FLOAT,encoding=RLE" - ) + ) + conn.execute( + sensors.insert().values( + region="europe", + device_id="d002", + temperature=18.3, + humidity=75.0, ) - session.execute( - text( - "create timeseries root.cursor.device2.temperature with datatype=FLOAT,encoding=RLE" - ) + ) + + result = conn.execute(sensors.select()).fetchall() + assert len(result) == 2, "INSERT/SELECT failed: expected 2 rows, got %d" % len( + result + ) + + result = conn.execute( + sensors.select().where(sensors.c.region == "asia") + ).fetchall() + assert len(result) == 1, ( + "SELECT WHERE failed: expected 1 row, got %d" % len(result) + ) + + result = conn.execute( + sensors.select().order_by(sensors.c.temperature).limit(1) + ).fetchall() + assert len(result) == 1, "LIMIT failed: expected 1 row, got %d" % len(result) + + conn.execute( + sensors.delete().where(sensors.c.device_id == "d002") + ) + result = conn.execute(sensors.select()).fetchall() + assert len(result) == 1, ( + "DELETE failed: expected 1 row after delete, got %d" % len(result) + ) + + sensors.drop(engine) + print(" DML tests passed") + + +def _test_reflection(engine): + with engine.connect() as conn: + conn.execute(text("USE %s" % TEST_DB)) + conn.execute( + text( + "CREATE TABLE reflect_test (" + "region STRING TAG, " + "device STRING TAG, " + "model STRING ATTRIBUTE, " + "temperature FLOAT FIELD, " + "humidity DOUBLE FIELD" + ")" ) - - insp = inspect(eng) - # test get_schema_names - schema_names = insp.get_schema_names() - if not operator.ge( - schema_names, ["root.__audit", "root.cursor", "root.cursor_s1"] - ): - test_fail() - print_message("Actual result " + str(schema_names)) - print_message("test get_schema_names failed!") - # test get_table_names - table_names = insp.get_table_names("root.cursor") - if not operator.eq(table_names, ["device1", "device2"]): - test_fail() - print_message("Actual result " + str(table_names)) - print_message("test get_table_names failed!") - # test get_columns - columns = insp.get_columns(table_name="device1", schema="root.cursor") - if len(columns) != 3: - test_fail() - print_message("Actual result " + str(columns)) - print_message("test get_columns failed!") - - with Session(eng) as session: - session.execute(text("delete database root.cursor")) - session.execute(text("delete database root.cursor_s1")) - - # close engine - eng.dispose() - - -if final_flag: - print("All executions done!!") -else: - print("Some test failed, please have a check") - print("failed count: ", failed_count) - exit(1) + ) + + insp = inspect(engine) + + schemas = insp.get_schema_names() + assert TEST_DB in schemas, "%s not in schemas: %s" % (TEST_DB, schemas) + + tables = insp.get_table_names(schema=TEST_DB) + assert "reflect_test" in tables, "reflect_test not in tables: %s" % tables + + columns = insp.get_columns(table_name="reflect_test", schema=TEST_DB) + col_names = [c["name"] for c in columns] + assert "region" in col_names, "region not in columns: %s" % col_names + assert "temperature" in col_names, "temperature not in columns: %s" % col_names + + pk = insp.get_pk_constraint(table_name="reflect_test", schema=TEST_DB) + assert pk["constrained_columns"] == [], "Expected empty PK constraint" + + fks = insp.get_foreign_keys(table_name="reflect_test", schema=TEST_DB) + assert fks == [], "Expected empty FK list" + + indexes = insp.get_indexes(table_name="reflect_test", schema=TEST_DB) + assert indexes == [], "Expected empty index list" + + with engine.connect() as conn: + conn.execute(text("USE %s" % TEST_DB)) + conn.execute(text("DROP TABLE reflect_test")) + + print(" Reflection tests passed") + + +def _test_time_column(engine): + metadata = MetaData() + + table_implicit = Table( + "time_implicit", + metadata, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + with engine.connect() as conn: + conn.execute( + table_implicit.insert().values(device="d001", value=42.0) + ) + result = conn.execute(table_implicit.select()).fetchall() + assert len(result) == 1, ( + "Implicit TIME insert failed: expected 1 row, got %d" % len(result) + ) + + table_implicit.drop(engine) + + metadata2 = MetaData() + table_explicit = Table( + "time_explicit", + metadata2, + Column("ts", BigInteger, iotdb_category="TIME"), + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata2.create_all(engine) + + with engine.connect() as conn: + conn.execute( + table_explicit.insert().values(ts=1000000, device="d001", value=42.0) + ) + result = conn.execute(table_explicit.select()).fetchall() + assert len(result) == 1, ( + "Explicit TIME insert failed: expected 1 row, got %d" % len(result) + ) + + table_explicit.drop(engine) + print(" TIME column tests passed") + + +if __name__ == "__main__": + test_table_model_dialect() From dfe9b27184c66dfcb0c07fb47282da535edc9dd9 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Wed, 22 Apr 2026 09:32:10 +0800 Subject: [PATCH 2/3] Add SQLAlchemy example, Chinese README, and comprehensive dialect tests - Add sqlalchemy_example.py covering DDL/DML/reflection/raw SQL - Add README_ZH.md as full Chinese translation of README.md - Update README.md SQLAlchemy section for IoTDB 2.0+ table model - Add tests for all data types, advanced queries (aggregation, AND/OR, NULL, batch insert, LIMIT+OFFSET), schema operations (has_schema, has_table, multiple databases, column category reflection, get_view_names), raw SQL, and URL-with-database --- iotdb-client/client-py/README.md | 206 +++-- iotdb-client/client-py/README_ZH.md | 671 +++++++++++++++ iotdb-client/client-py/sqlalchemy_example.py | 243 ++++++ .../integration/sqlalchemy/test_dialect.py | 781 +++++++++++++++++- 4 files changed, 1811 insertions(+), 90 deletions(-) create mode 100644 iotdb-client/client-py/README_ZH.md create mode 100644 iotdb-client/client-py/sqlalchemy_example.py diff --git a/iotdb-client/client-py/README.md b/iotdb-client/client-py/README.md index c7dc1b33cc9f1..f3c404916233d 100644 --- a/iotdb-client/client-py/README.md +++ b/iotdb-client/client-py/README.md @@ -413,93 +413,161 @@ cursor.close() conn.close() ``` -### IoTDB SQLAlchemy Dialect (Experimental) -The SQLAlchemy dialect of IoTDB is written to adapt to Apache Superset. -This part is still being improved. -Please do not use it in the production environment! -#### Mapping of the metadata -The data model used by SQLAlchemy is a relational data model, which describes the relationships between different entities through tables. -While the data model of IoTDB is a hierarchical data model, which organizes the data through a tree structure. -In order to adapt IoTDB to the dialect of SQLAlchemy, the original data model in IoTDB needs to be reorganized. -Converting the data model of IoTDB into the data model of SQLAlchemy. - -The metadata in the IoTDB are: - -1. Database -2. Path -3. Entity -4. Measurement - -The metadata in the SQLAlchemy are: -1. Schema -2. Table -3. Column - -The mapping relationship between them is: - -| The metadata in the SQLAlchemy | The metadata in the IoTDB | -| -------------------- | ---------------------------------------------- | -| Schema | Database | -| Table | Path ( from database to entity ) + Entity | -| Column | Measurement | - -The following figure shows the relationship between the two more intuitively: - -![sqlalchemy-to-iotdb](https://github.com/apache/iotdb-bin-resources/blob/main/docs/UserGuide/API/IoTDB-SQLAlchemy/sqlalchemy-to-iotdb.png?raw=true) - -#### Data type mapping -| data type in IoTDB | data type in SQLAlchemy | -|--------------------|-------------------------| -| BOOLEAN | Boolean | -| INT32 | Integer | -| INT64 | BigInteger | -| FLOAT | Float | -| DOUBLE | Float | -| TEXT | Text | -| LONG | BigInteger | -#### Example - -+ execute statement +### IoTDB SQLAlchemy Dialect + +The IoTDB SQLAlchemy dialect provides a standard SQLAlchemy interface for IoTDB's **table model** (IoTDB 2.0+). +It supports DDL (CREATE/DROP TABLE), DML (INSERT/SELECT/DELETE), and schema reflection. +A complete runnable example is available at: [SQLAlchemy Example](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/sqlalchemy_example.py) + +#### Prerequisites + +```bash +pip install apache-iotdb sqlalchemy +``` + +#### Connection URL + +``` +iotdb://username:password@host:port/database +``` + +The `/database` part is optional. If omitted, you can specify the database using `schema=` on tables or via `USE` statements. ```python from sqlalchemy import create_engine engine = create_engine("iotdb://root:root@127.0.0.1:6667") -connect = engine.connect() -result = connect.execute("SELECT ** FROM root") -for row in result.fetchall(): - print(row) ``` -+ ORM (now only simple queries are supported) +#### Metadata Mapping + +| SQLAlchemy | IoTDB | +|------------|----------| +| Schema | Database | +| Table | Table | +| Column | Column | + +#### Column Categories + +IoTDB table model columns have categories that must be specified via the `iotdb_category` dialect option: + +| Category | Description | +|-------------|------------------------------------------------------| +| `TIME` | Timestamp column (auto-generated if not specified) | +| `TAG` | Identifier/indexing columns (e.g., region, device) | +| `ATTRIBUTE` | Descriptive columns (e.g., model, firmware version) | +| `FIELD` | Measurement/metric columns (e.g., temperature) | + +#### Data Type Mapping + +| IoTDB | SQLAlchemy | +|-----------|--------------| +| BOOLEAN | Boolean | +| INT32 | Integer | +| INT64 | BigInteger | +| FLOAT | Float | +| DOUBLE | Float | +| STRING | String | +| TEXT | Text | +| BLOB | LargeBinary | +| TIMESTAMP | DateTime | +| DATE | Date | + +#### DDL — Create Table + +Use `iotdb_category` on each column and `iotdb_ttl` on the table: ```python -from sqlalchemy import create_engine, Column, Float, BigInteger, MetaData -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker +from sqlalchemy import Table, Column, Float, String, Boolean, MetaData + +metadata = MetaData() +sensors = Table( + "sensors", + metadata, + Column("region", String, iotdb_category="TAG"), + Column("device_id", String, iotdb_category="TAG"), + Column("model", String, iotdb_category="ATTRIBUTE"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + Column("status", Boolean, iotdb_category="FIELD"), + schema="my_database", + iotdb_ttl=86400000, # TTL in milliseconds (1 day) +) -metadata = MetaData( - schema='root.factory' +metadata.create_all(engine) +``` + +To define an explicit TIME column instead of using the auto-generated one: + +```python +from sqlalchemy import BigInteger + +events = Table( + "events", + metadata, + Column("ts", BigInteger, iotdb_category="TIME"), + Column("device_id", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema="my_database", ) -Base = declarative_base(metadata=metadata) +``` +#### DML — Insert, Query, Delete -class Device(Base): - __tablename__ = "room2.device1" - Time = Column(BigInteger, primary_key=True) - temperature = Column(Float) - status = Column(Float) +```python +with engine.connect() as conn: + # Insert + conn.execute( + sensors.insert().values( + region="asia", device_id="d001", temperature=25.5, humidity=60.0, status=True, + ) + ) + # Select all + result = conn.execute(sensors.select()) + for row in result: + print(row) -engine = create_engine("iotdb://root:root@127.0.0.1:6667") + # Select with WHERE, ORDER BY, LIMIT + result = conn.execute( + sensors.select() + .where(sensors.c.region == "asia") + .order_by(sensors.c.temperature) + .limit(10) + ) -DbSession = sessionmaker(bind=engine) -session = DbSession() + # Delete + conn.execute(sensors.delete().where(sensors.c.device_id == "d001")) +``` -res = session.query(Device.status).filter(Device.temperature > 1) +#### Schema Reflection -for row in res: - print(row) +```python +from sqlalchemy import inspect + +insp = inspect(engine) + +# List databases +schemas = insp.get_schema_names() + +# List tables in a database +tables = insp.get_table_names(schema="my_database") + +# Get column details +columns = insp.get_columns(table_name="sensors", schema="my_database") +for col in columns: + print(col["name"], col["type"], col.get("iotdb_category")) +``` + +#### Raw SQL + +```python +from sqlalchemy.sql import text + +with engine.connect() as conn: + result = conn.execute(text("SELECT * FROM my_database.sensors")) + for row in result: + print(row) ``` diff --git a/iotdb-client/client-py/README_ZH.md b/iotdb-client/client-py/README_ZH.md new file mode 100644 index 0000000000000..3524ccbb463af --- /dev/null +++ b/iotdb-client/client-py/README_ZH.md @@ -0,0 +1,671 @@ + + +# Apache IoTDB + +[![GitHub release](https://img.shields.io/github/release/apache/iotdb.svg)](https://github.com/apache/iotdb/releases) +[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) +![](https://github-size-badge.herokuapp.com/apache/iotdb.svg) +![](https://img.shields.io/github/downloads/apache/iotdb/total.svg) +![](https://img.shields.io/badge/platform-win%20%7C%20macos%20%7C%20linux-yellow.svg) +[![IoTDB Website](https://img.shields.io/website-up-down-green-red/https/shields.io.svg?label=iotdb-website)](https://iotdb.apache.org/) + + +Apache IoTDB(物联网数据库)是一款物联网原生数据库,具有高性能的数据管理和分析能力,可部署在边缘和云端。 +凭借其轻量级架构、高性能和丰富的功能集,以及与 Apache Hadoop、Spark 和 Flink 的深度集成, +Apache IoTDB 能够满足物联网工业领域中海量数据存储、高速数据写入和复杂数据分析的需求。 + +## Python 原生 API + +### 环境要求 + +使用本包之前,需要先安装 thrift (>=0.14.1)。 + + + +### 使用方法(示例) + +首先,安装最新的包:`pip3 install apache-iotdb` + +读写数据的使用示例请参考:[示例](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/SessionExample.py) + +对齐时间序列的示例:[对齐时间序列 Session 示例](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/SessionAlignedTimeseriesExample.py) + +(需要在文件开头添加 `import iotdb`) + +或者: + +```python +from iotdb.Session import Session + +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_) +session.open(False) +zone = session.get_time_zone() +session.close() +``` + +### 初始化 + +* 初始化 Session + +```python +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="Asia/Shanghai") +``` + +* 打开 Session,可通过参数指定是否启用 RPC 压缩 + +```python +session.open(enable_rpc_compression=False) +``` + +注意:客户端的 RPC 压缩状态必须与 IoTDB 服务器一致。 + +* 关闭 Session + +```python +session.close() +``` + +### 数据定义接口(DDL 接口) + +#### 数据库管理 + +* 创建数据库 + +```python +session.set_storage_group(group_name) +``` + +* 删除一个或多个数据库 + +```python +session.delete_storage_group(group_name) +session.delete_storage_groups(group_name_lst) +``` +#### 时间序列管理 + +* 创建一个或多个时间序列 + +```python +session.create_time_series(ts_path, data_type, encoding, compressor, + props=None, tags=None, attributes=None, alias=None) + +session.create_multi_time_series( + ts_path_lst, data_type_lst, encoding_lst, compressor_lst, + props_lst=None, tags_lst=None, attributes_lst=None, alias_lst=None +) +``` + +* 创建对齐时间序列 + +```python +session.create_aligned_time_series( + device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst +) +``` + +注意:目前**不支持**度量(measurement)的别名。 + +* 删除一个或多个时间序列 + +```python +session.delete_time_series(paths_list) +``` + +* 检查指定的时间序列是否存在 + +```python +session.check_time_series_exists(path) +``` + +### 数据操作接口(DML 接口) + +#### 写入 + +建议使用 insertTablet 以提高写入效率。 + +* 插入一个 Tablet,即一个设备的多行数据,每行具有相同的度量 + + * **更好的写入性能** + * **支持空值**:用任意值填充空值位置,然后通过 BitMap 标记空值(v0.13 起支持) + + +Python API 中有两种 Tablet 实现。 + +* 普通 Tablet + +```python +values_ = [ + [False, 10, 11, 1.1, 10011.1, "test01"], + [True, 100, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, 188.1, 688.25, "test03"], + [True, 0, 0, 0, 6.25, "test04"], +] +timestamps_ = [1, 2, 3, 4] +tablet_ = Tablet( + device_id, measurements_, data_types_, values_, timestamps_ +) +session.insert_tablet(tablet_) + +values_ = [ + [None, 10, 11, 1.1, 10011.1, "test01"], + [True, None, 11111, 1.25, 101.0, "test02"], + [False, 100, None, 188.1, 688.25, "test03"], + [True, 0, 0, 0, None, None], +] +timestamps_ = [16, 17, 18, 19] +tablet_ = Tablet( + device_id, measurements_, data_types_, values_, timestamps_ +) +session.insert_tablet(tablet_) +``` +* Numpy Tablet + +与普通 Tablet 相比,Numpy Tablet 使用 [numpy.ndarray](https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html) 来存储数据。 +由于内存占用更少且序列化开销更低,写入性能更好。 + +**注意** +1. Tablet 中的时间列和值列均为 ndarray。 +2. 建议为每个 ndarray 使用特定的 dtype,参见下方示例(使用默认 dtype 也可以)。 + +```python +import numpy as np +data_types_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +np_values_ = [ + np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), + np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), + np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), + np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), + np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()), + np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()), +] +np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype()) +np_tablet_ = NumpyTablet( + device_id, measurements_, data_types_, np_values_, np_timestamps_ +) +session.insert_tablet(np_tablet_) + +# 插入一个包含空值的 numpy tablet +np_values_ = [ + np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), + np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), + np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), + np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), + np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()), + np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()), +] +np_timestamps_ = np.array([98, 99, 100, 101], TSDataType.INT64.np_dtype()) +np_bitmaps_ = [] +for i in range(len(measurements_)): + np_bitmaps_.append(BitMap(len(np_timestamps_))) +np_bitmaps_[0].mark(0) +np_bitmaps_[1].mark(1) +np_bitmaps_[2].mark(2) +np_bitmaps_[4].mark(3) +np_bitmaps_[5].mark(3) +np_tablet_with_none = NumpyTablet( + device_id, measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_ +) +session.insert_tablet(np_tablet_with_none) +``` + +* 插入多个 Tablet + +```python +session.insert_tablets(tablet_lst) +``` + +* 插入一条记录 + +```python +session.insert_record(device_id, timestamp, measurements_, data_types_, values_) +``` + +* 插入多条记录 + +```python +session.insert_records( + device_ids_, time_list_, measurements_list_, data_type_list_, values_list_ +) +``` + +* 插入属于同一设备的多条记录。 + 提供类型信息后,服务器无需进行类型推断,可获得更好的性能。 + + +```python +session.insert_records_of_one_device(device_id, time_list, measurements_list, data_types_list, values_list) +``` + +#### 带类型推断的写入 + +当数据为 String 类型时,可以使用以下接口根据值本身进行类型推断。例如,值为 "true" 时会自动推断为布尔类型,值为 "3.2" 时会自动推断为浮点类型。不提供类型信息时,服务器需要进行类型推断,这可能会消耗一定时间。 + +* 插入一条记录,包含一个设备在某个时间戳的多个度量值 + +```python +session.insert_str_record(device_id, timestamp, measurements, string_values) +``` + +#### 对齐时间序列的写入 + +对齐时间序列的写入使用 insert_aligned_XXX 系列接口,其余与上述接口类似: + +* insert_aligned_record +* insert_aligned_records +* insert_aligned_records_of_one_device +* insert_aligned_tablet +* insert_aligned_tablets + + +### IoTDB-SQL 接口 + +* 执行查询语句 + +```python +session.execute_query_statement(sql) +``` + +* 执行非查询语句 + +```python +session.execute_non_query_statement(sql) +``` + +* 执行语句 + +```python +session.execute_statement(sql) +``` + + + +### Pandas 支持 + +为了方便地将查询结果转换为 [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html), +SessionDataSet 提供了 `.todf()` 方法,可以消费数据集并将其转换为 Pandas DataFrame。 + +示例: + +```python +from iotdb.Session import Session + +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_) +session.open(False) +result = session.execute_query_statement("SELECT * FROM root.*") + +# 转换为 Pandas DataFrame +df = result.todf() + +session.close() + +# 现在可以操作 DataFrame +df = ... +``` + + +### IoTDB 测试容器 + +测试支持基于 `testcontainers` 库 (https://testcontainers-python.readthedocs.io/en/latest/index.html),如果需要使用该功能,需要在项目中安装此库。 + +在 Docker 容器中启动(和停止)IoTDB 数据库: +```python +class MyTestCase(unittest.TestCase): + + def test_something(self): + with IoTDBContainer() as c: + session = Session("localhost", c.get_exposed_port(6667), "root", "root") + session.open(False) + result = session.execute_query_statement("SHOW TIMESERIES") + print(result) + session.close() +``` + +默认会加载 `apache/iotdb:latest` 镜像,如果需要特定版本,可以传入版本号,如 `IoTDBContainer("apache/iotdb:0.12.0")`。 + + +### IoTDB DBAPI + +IoTDB DBAPI 实现了 Python DB API 2.0 规范 (https://peps.python.org/pep-0249/),定义了 Python 中访问数据库的通用接口。 + +#### 示例 ++ 初始化 + +初始化参数与 Session 部分一致(sqlalchemy_mode 除外)。 +```python +from iotdb.dbapi import connect + +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +conn = connect(ip, port_, username_, password_,fetch_size=1024,zone_id="Asia/Shanghai",sqlalchemy_mode=False) +cursor = conn.cursor() +``` ++ 简单 SQL 语句执行 +```python +cursor.execute("SELECT * FROM root.*") +for row in cursor.fetchall(): + print(row) +``` + ++ 带参数执行 SQL + +IoTDB DBAPI 支持 pyformat 风格的参数 +```python +cursor.execute("SELECT * FROM root.* WHERE time < %(time)s",{"time":"2017-11-01T00:08:00.000"}) +for row in cursor.fetchall(): + print(row) +``` + ++ 批量执行 SQL +```python +seq_of_parameters = [ + {"timestamp": 1, "temperature": 1}, + {"timestamp": 2, "temperature": 2}, + {"timestamp": 3, "temperature": 3}, + {"timestamp": 4, "temperature": 4}, + {"timestamp": 5, "temperature": 5}, +] +sql = "insert into root.cursor(timestamp,temperature) values(%(timestamp)s,%(temperature)s)" +cursor.executemany(sql,seq_of_parameters) +``` + ++ 关闭连接和游标 +```python +cursor.close() +conn.close() +``` + +### IoTDB SQLAlchemy 方言 + +IoTDB SQLAlchemy 方言为 IoTDB 的**表模型**(IoTDB 2.0+)提供了标准的 SQLAlchemy 接口, +支持 DDL(CREATE/DROP TABLE)、DML(INSERT/SELECT/DELETE)和 Schema 反射。 + +完整可运行示例请参考:[SQLAlchemy 示例](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/sqlalchemy_example.py) + +#### 环境要求 + +```bash +pip install apache-iotdb sqlalchemy +``` + +#### 连接 URL + +``` +iotdb://username:password@host:port/database +``` + +`/database` 部分是可选的。如果省略,可以通过在表上指定 `schema=` 或使用 `USE` 语句来指定数据库。 + +```python +from sqlalchemy import create_engine + +engine = create_engine("iotdb://root:root@127.0.0.1:6667") +``` + +#### 元数据映射 + +| SQLAlchemy | IoTDB | +|------------|--------| +| Schema | 数据库 | +| Table | 表 | +| Column | 列 | + +#### 列类别 + +IoTDB 表模型的列具有类别属性,需要通过 `iotdb_category` 方言选项指定: + +| 类别 | 描述 | +|-------------|---------------------------------------------| +| `TIME` | 时间戳列(未指定时自动生成) | +| `TAG` | 标识/索引列(如区域、设备 ID) | +| `ATTRIBUTE` | 描述性列(如型号、固件版本) | +| `FIELD` | 度量/指标列(如温度、湿度) | + +#### 数据类型映射 + +| IoTDB | SQLAlchemy | +|-----------|--------------| +| BOOLEAN | Boolean | +| INT32 | Integer | +| INT64 | BigInteger | +| FLOAT | Float | +| DOUBLE | Float | +| STRING | String | +| TEXT | Text | +| BLOB | LargeBinary | +| TIMESTAMP | DateTime | +| DATE | Date | + +#### DDL — 创建表 + +在每个列上使用 `iotdb_category`,在表上使用 `iotdb_ttl`: + +```python +from sqlalchemy import Table, Column, Float, String, Boolean, MetaData + +metadata = MetaData() +sensors = Table( + "sensors", + metadata, + Column("region", String, iotdb_category="TAG"), + Column("device_id", String, iotdb_category="TAG"), + Column("model", String, iotdb_category="ATTRIBUTE"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + Column("status", Boolean, iotdb_category="FIELD"), + schema="my_database", + iotdb_ttl=86400000, # TTL 单位为毫秒(1 天) +) + +metadata.create_all(engine) +``` + +如需显式定义 TIME 列而非使用自动生成的时间列: + +```python +from sqlalchemy import BigInteger + +events = Table( + "events", + metadata, + Column("ts", BigInteger, iotdb_category="TIME"), + Column("device_id", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema="my_database", +) +``` + +#### DML — 插入、查询、删除 + +```python +with engine.connect() as conn: + # 插入 + conn.execute( + sensors.insert().values( + region="asia", device_id="d001", temperature=25.5, humidity=60.0, status=True, + ) + ) + + # 查询全部 + result = conn.execute(sensors.select()) + for row in result: + print(row) + + # 带 WHERE、ORDER BY、LIMIT 的查询 + result = conn.execute( + sensors.select() + .where(sensors.c.region == "asia") + .order_by(sensors.c.temperature) + .limit(10) + ) + + # 删除 + conn.execute(sensors.delete().where(sensors.c.device_id == "d001")) +``` + +#### Schema 反射 + +```python +from sqlalchemy import inspect + +insp = inspect(engine) + +# 列出所有数据库 +schemas = insp.get_schema_names() + +# 列出数据库中的表 +tables = insp.get_table_names(schema="my_database") + +# 获取列详情 +columns = insp.get_columns(table_name="sensors", schema="my_database") +for col in columns: + print(col["name"], col["type"], col.get("iotdb_category")) +``` + +#### 原生 SQL + +```python +from sqlalchemy.sql import text + +with engine.connect() as conn: + result = conn.execute(text("SELECT * FROM my_database.sensors")) + for row in result: + print(row) +``` + + +## 开发者指南 + +### 简介 + +这是一个使用 Thrift RPC 接口连接 IoTDB 的 Python 示例。Windows 和 Linux 上的操作基本相同,但请注意路径分隔符等差异。 + + + +### 前提条件 + +推荐使用 Python 3.6 或更高版本。 + +需要安装 Thrift(0.14.1 或更高版本)来将 Thrift 文件编译为 Python 代码。以下是官方安装教程,最终你需要有一个 thrift 可执行文件。 + +``` +http://thrift.apache.org/docs/install/ +``` + +开始之前,需要在 Python 环境中安装 `requirements_dev.txt`,例如: +```shell +pip install -r requirements_dev.txt +``` + + + +### 编译 Thrift 库和调试 + +在 IoTDB 源代码根目录下,运行 `mvn clean generate-sources -pl client-py -am`。 + +这将自动删除并重新生成 `iotdb/thrift` 目录中的 Thrift 文件。 +该目录已被 git 忽略,**不应推送到 git!** + +**注意** 不要将 `iotdb/thrift` 上传到 git 仓库。 + + + + +### Session 客户端和示例 + +我们将 Thrift 接口封装在 `client-py/src/iotdb/Session.py` 中(类似于 Java 版本),并提供了示例文件 `client-py/src/SessionExample.py`,展示如何使用 Session 模块,请仔细阅读。 + + +或者,一个简单的示例: + +```python +from iotdb.Session import Session + +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_) +session.open(False) +zone = session.get_time_zone() +session.close() +``` + + + +### 测试 + +请将自定义测试添加到 `tests` 目录中。 + +在根目录下运行 `pytest .` 即可执行所有已定义的测试。 + +**注意** 部分测试需要系统上运行 Docker,因为会使用 [testcontainers](https://testcontainers-python.readthedocs.io/en/latest/index.html) 在 Docker 容器中启动测试实例。 + + + +### 其他工具 + +[black](https://pypi.org/project/black/) 和 [flake8](https://pypi.org/project/flake8/) 已安装,分别用于自动格式化和代码检查。 +可以分别通过 `black .` 或 `flake8 .` 运行。 + + + +## 发布 + +发布前,请确保拥有正确的 Thrift 生成文件。 +然后运行代码检查和自动格式化。 +确保所有测试通过(通过 `pytest .`)。 +然后即可进行发布! + + + +### 准备环境 + +首先,通过 `pip install -r requirements_dev.txt` 安装所有必要的开发依赖。 + + + +### 执行发布 + +提供了一个便捷脚本 `release.sh` 来执行发布的所有步骤,包括: + +* 删除上次发布的临时目录(如果存在) +* 通过 mvn(重新)生成所有源文件 +* 通过 pytest 运行测试(可选) +* 构建 +* 发布到 PyPI diff --git a/iotdb-client/client-py/sqlalchemy_example.py b/iotdb-client/client-py/sqlalchemy_example.py new file mode 100644 index 0000000000000..21800a125ae00 --- /dev/null +++ b/iotdb-client/client-py/sqlalchemy_example.py @@ -0,0 +1,243 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +IoTDB SQLAlchemy Dialect Example + +This example demonstrates how to use the IoTDB SQLAlchemy dialect with IoTDB's +table model (IoTDB 2.0+). It covers: + 1. Creating an engine and connecting to IoTDB + 2. DDL: Creating tables with IoTDB-specific column categories and TTL + 3. DML: Inserting, querying, and deleting data + 4. Schema reflection via inspect() + 5. Raw SQL execution via text() + +Prerequisites: + - IoTDB 2.0+ server running on localhost:6667 + - pip install apache-iotdb sqlalchemy +""" + +from sqlalchemy import ( + create_engine, + inspect, + Column, + Float, + Integer, + BigInteger, + String, + Boolean, + Table, + MetaData, +) +from sqlalchemy.sql import text + +DATABASE = "sqlalchemy_example_db" + +# --------------------------------------------------------------- +# 1. Create an Engine +# --------------------------------------------------------------- +# Connection URL format: iotdb://username:password@host:port/database +# The database part is optional; you can also set it later with USE. +engine = create_engine("iotdb://root:root@127.0.0.1:6667") +print("Engine created successfully.") + +# --------------------------------------------------------------- +# 2. Create a Database +# --------------------------------------------------------------- +with engine.connect() as conn: + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % DATABASE)) + print("Database '%s' created." % DATABASE) + +# --------------------------------------------------------------- +# 3. DDL — Define and Create a Table +# --------------------------------------------------------------- +# IoTDB table model columns have categories: +# TIME — the timestamp column (auto-generated if not specified) +# TAG — identifier/indexing columns (e.g., region, device id) +# ATTRIBUTE — descriptive columns (e.g., model, firmware version) +# FIELD — measurement/metric columns (e.g., temperature, humidity) +# +# Use iotdb_category on each Column and iotdb_ttl on the Table. +metadata = MetaData() + +sensors = Table( + "sensors", + metadata, + Column("region", String, iotdb_category="TAG"), + Column("device_id", String, iotdb_category="TAG"), + Column("model", String, iotdb_category="ATTRIBUTE"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + Column("status", Boolean, iotdb_category="FIELD"), + schema=DATABASE, + iotdb_ttl=86400000, # TTL in milliseconds (1 day) +) + +metadata.create_all(engine) +print("Table 'sensors' created with TTL=86400000ms.") + +# --------------------------------------------------------------- +# 4. DML — Insert Data +# --------------------------------------------------------------- +with engine.connect() as conn: + conn.execute( + sensors.insert().values( + region="asia", + device_id="d001", + model="ModelA", + temperature=25.5, + humidity=60.0, + status=True, + ) + ) + conn.execute( + sensors.insert().values( + region="asia", + device_id="d002", + model="ModelB", + temperature=27.3, + humidity=55.0, + status=True, + ) + ) + conn.execute( + sensors.insert().values( + region="europe", + device_id="d003", + model="ModelA", + temperature=18.1, + humidity=75.0, + status=False, + ) + ) + print("Inserted 3 rows into 'sensors'.") + +# --------------------------------------------------------------- +# 5. DML — Query Data +# --------------------------------------------------------------- +with engine.connect() as conn: + # SELECT all rows + print("\n--- All rows ---") + result = conn.execute(sensors.select()) + for row in result: + print(row) + + # SELECT with WHERE + print("\n--- Rows where region='asia' ---") + result = conn.execute(sensors.select().where(sensors.c.region == "asia")) + for row in result: + print(row) + + # SELECT with ORDER BY and LIMIT + print("\n--- Top 2 by temperature (ascending) ---") + result = conn.execute(sensors.select().order_by(sensors.c.temperature).limit(2)) + for row in result: + print(row) + +# --------------------------------------------------------------- +# 6. DML — Delete Data +# --------------------------------------------------------------- +with engine.connect() as conn: + conn.execute(sensors.delete().where(sensors.c.device_id == "d003")) + print("\nDeleted rows where device_id='d003'.") + + remaining = conn.execute(sensors.select()).fetchall() + print("Remaining rows: %d" % len(remaining)) + +# --------------------------------------------------------------- +# 7. Schema Reflection via inspect() +# --------------------------------------------------------------- +insp = inspect(engine) + +schemas = insp.get_schema_names() +print("\n--- Databases (schemas) ---") +print(schemas) + +tables = insp.get_table_names(schema=DATABASE) +print("\n--- Tables in '%s' ---" % DATABASE) +print(tables) + +columns = insp.get_columns(table_name="sensors", schema=DATABASE) +print("\n--- Columns of 'sensors' ---") +for col in columns: + print( + " %s: type=%s, category=%s" + % ( + col["name"], + col["type"], + col.get("iotdb_category", "N/A"), + ) + ) + +# --------------------------------------------------------------- +# 8. Raw SQL via text() +# --------------------------------------------------------------- +with engine.connect() as conn: + print("\n--- Raw SQL query ---") + result = conn.execute(text("SELECT * FROM %s.sensors" % DATABASE)) + for row in result: + print(row) + +# --------------------------------------------------------------- +# 9. Explicit TIME Column +# --------------------------------------------------------------- +# By default, IoTDB auto-generates the TIME column. You can also +# define it explicitly to control the timestamp values. +metadata2 = MetaData() + +events = Table( + "events", + metadata2, + Column("ts", BigInteger, iotdb_category="TIME"), + Column("device_id", String, iotdb_category="TAG"), + Column("event_type", String, iotdb_category="ATTRIBUTE"), + Column("value", Float, iotdb_category="FIELD"), + schema=DATABASE, +) + +metadata2.create_all(engine) +print("\nTable 'events' created with explicit TIME column.") + +with engine.connect() as conn: + conn.execute( + events.insert().values( + ts=1000000, device_id="d001", event_type="alert", value=99.9 + ) + ) + conn.execute( + events.insert().values( + ts=2000000, device_id="d002", event_type="info", value=42.0 + ) + ) + + print("\n--- Events ---") + result = conn.execute(events.select()) + for row in result: + print(row) + +# --------------------------------------------------------------- +# 10. Cleanup +# --------------------------------------------------------------- +sensors.drop(engine) +events.drop(engine) +with engine.connect() as conn: + conn.execute(text("DROP DATABASE IF EXISTS %s" % DATABASE)) +print("\nCleanup done. Database '%s' dropped." % DATABASE) + +engine.dispose() +print("Example finished.") diff --git a/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py b/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py index c5bb2fa30290f..185e2f5b73a24 100644 --- a/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py +++ b/iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py @@ -25,16 +25,32 @@ BigInteger, String, Boolean, + Text, + LargeBinary, + DateTime, + Date, Table, MetaData, + func, + and_, + or_, ) from sqlalchemy.dialects import registry from sqlalchemy.sql import text from tests.integration.iotdb_container import IoTDBContainer from urllib.parse import quote_plus as urlquote - TEST_DB = "test_sqlalchemy" +TEST_DB2 = "test_sqlalchemy2" + + +def _make_engine(host, port, password, database=None): + if database: + url = f"iotdb://root:{password}@{host}:{port}/{database}" + else: + url = f"iotdb://root:{password}@{host}:{port}" + registry.register("iotdb", "iotdb.sqlalchemy.IoTDBDialect", "IoTDBDialect") + return create_engine(url) def test_table_model_dialect(): @@ -44,9 +60,7 @@ def test_table_model_dialect(): password = urlquote("TimechoDB@2021") host = db.get_container_host_ip() port = db.get_exposed_port(6667) - url = f"iotdb://root:{password}@{host}:{port}/{TEST_DB}" - registry.register("iotdb", "iotdb.sqlalchemy.IoTDBDialect", "IoTDBDialect") - engine = create_engine(url) + engine = _make_engine(host, port, password, TEST_DB) with engine.connect() as conn: conn.execute(text("CREATE DATABASE %s" % TEST_DB)) @@ -63,6 +77,125 @@ def test_table_model_dialect(): print("All table model dialect tests passed!") +def test_all_data_types(): + + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + password = urlquote("TimechoDB@2021") + host = db.get_container_host_ip() + port = db.get_exposed_port(6667) + engine = _make_engine(host, port, password, TEST_DB) + + with engine.connect() as conn: + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % TEST_DB)) + + _test_all_data_types(engine) + + with engine.connect() as conn: + conn.execute(text("DROP DATABASE %s" % TEST_DB)) + + engine.dispose() + print("All data type tests passed!") + + +def test_advanced_queries(): + + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + password = urlquote("TimechoDB@2021") + host = db.get_container_host_ip() + port = db.get_exposed_port(6667) + engine = _make_engine(host, port, password, TEST_DB) + + with engine.connect() as conn: + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % TEST_DB)) + + _test_select_specific_columns(engine) + _test_multiple_where_conditions(engine) + _test_aggregation(engine) + _test_batch_insert(engine) + _test_null_values(engine) + + with engine.connect() as conn: + conn.execute(text("DROP DATABASE %s" % TEST_DB)) + + engine.dispose() + print("All advanced query tests passed!") + + +def test_schema_operations(): + + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + password = urlquote("TimechoDB@2021") + host = db.get_container_host_ip() + port = db.get_exposed_port(6667) + engine = _make_engine(host, port, password, TEST_DB) + + with engine.connect() as conn: + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % TEST_DB)) + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % TEST_DB2)) + + _test_has_schema_and_table(engine) + _test_get_view_names(engine) + _test_multiple_databases(engine) + _test_column_category_reflection(engine) + _test_multiple_tables(engine) + _test_table_without_ttl(engine) + + with engine.connect() as conn: + conn.execute(text("DROP DATABASE IF EXISTS %s" % TEST_DB)) + conn.execute(text("DROP DATABASE IF EXISTS %s" % TEST_DB2)) + + engine.dispose() + print("All schema operation tests passed!") + + +def test_raw_sql(): + + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + password = urlquote("TimechoDB@2021") + host = db.get_container_host_ip() + port = db.get_exposed_port(6667) + engine = _make_engine(host, port, password, TEST_DB) + + with engine.connect() as conn: + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % TEST_DB)) + + _test_raw_sql(engine) + + with engine.connect() as conn: + conn.execute(text("DROP DATABASE %s" % TEST_DB)) + + engine.dispose() + print("All raw SQL tests passed!") + + +def test_url_with_database(): + + with IoTDBContainer("iotdb:dev") as db: + db: IoTDBContainer + password = urlquote("TimechoDB@2021") + host = db.get_container_host_ip() + port = db.get_exposed_port(6667) + + engine_with_db = _make_engine(host, port, password, TEST_DB) + engine_without_db = _make_engine(host, port, password) + + with engine_without_db.connect() as conn: + conn.execute(text("CREATE DATABASE IF NOT EXISTS %s" % TEST_DB)) + + _test_url_with_database(engine_with_db, engine_without_db) + + with engine_without_db.connect() as conn: + conn.execute(text("DROP DATABASE %s" % TEST_DB)) + + engine_with_db.dispose() + engine_without_db.dispose() + print("URL with database tests passed!") + + def _test_ddl(engine): metadata = MetaData() @@ -135,8 +268,8 @@ def _test_dml(engine): result = conn.execute( sensors.select().where(sensors.c.region == "asia") ).fetchall() - assert len(result) == 1, ( - "SELECT WHERE failed: expected 1 row, got %d" % len(result) + assert len(result) == 1, "SELECT WHERE failed: expected 1 row, got %d" % len( + result ) result = conn.execute( @@ -144,13 +277,11 @@ def _test_dml(engine): ).fetchall() assert len(result) == 1, "LIMIT failed: expected 1 row, got %d" % len(result) - conn.execute( - sensors.delete().where(sensors.c.device_id == "d002") - ) + conn.execute(sensors.delete().where(sensors.c.device_id == "d002")) result = conn.execute(sensors.select()).fetchall() - assert len(result) == 1, ( - "DELETE failed: expected 1 row after delete, got %d" % len(result) - ) + assert ( + len(result) == 1 + ), "DELETE failed: expected 1 row after delete, got %d" % len(result) sensors.drop(engine) print(" DML tests passed") @@ -213,13 +344,11 @@ def _test_time_column(engine): metadata.create_all(engine) with engine.connect() as conn: - conn.execute( - table_implicit.insert().values(device="d001", value=42.0) - ) + conn.execute(table_implicit.insert().values(device="d001", value=42.0)) result = conn.execute(table_implicit.select()).fetchall() - assert len(result) == 1, ( - "Implicit TIME insert failed: expected 1 row, got %d" % len(result) - ) + assert ( + len(result) == 1 + ), "Implicit TIME insert failed: expected 1 row, got %d" % len(result) table_implicit.drop(engine) @@ -239,9 +368,9 @@ def _test_time_column(engine): table_explicit.insert().values(ts=1000000, device="d001", value=42.0) ) result = conn.execute(table_explicit.select()).fetchall() - assert len(result) == 1, ( - "Explicit TIME insert failed: expected 1 row, got %d" % len(result) - ) + assert ( + len(result) == 1 + ), "Explicit TIME insert failed: expected 1 row, got %d" % len(result) table_explicit.drop(engine) print(" TIME column tests passed") @@ -249,3 +378,613 @@ def _test_time_column(engine): if __name__ == "__main__": test_table_model_dialect() + + +# --------------------------------------------------------------- +# All data types test +# --------------------------------------------------------------- +def _test_all_data_types(engine): + metadata = MetaData() + + all_types = Table( + "all_types", + metadata, + Column("device", String, iotdb_category="TAG"), + Column("col_boolean", Boolean, iotdb_category="FIELD"), + Column("col_int32", Integer, iotdb_category="FIELD"), + Column("col_int64", BigInteger, iotdb_category="FIELD"), + Column("col_float", Float, iotdb_category="FIELD"), + Column("col_double", Float(precision=53), iotdb_category="FIELD"), + Column("col_string", String, iotdb_category="FIELD"), + Column("col_text", Text, iotdb_category="FIELD"), + Column("col_date", Date, iotdb_category="FIELD"), + Column("col_timestamp", DateTime, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + with engine.connect() as conn: + conn.execute( + all_types.insert().values( + device="d001", + col_boolean=True, + col_int32=42, + col_int64=9999999999, + col_float=3.14, + col_double=2.718281828459045, + col_string="hello", + col_text="world", + col_date="2024-01-15", + col_timestamp=1700000000000, + ) + ) + + result = conn.execute(all_types.select()).fetchall() + assert ( + len(result) == 1 + ), "All types insert failed: expected 1 row, got %d" % len(result) + + insp = inspect(engine) + columns = insp.get_columns(table_name="all_types", schema=TEST_DB) + col_names = [c["name"] for c in columns] + for expected in [ + "device", + "col_boolean", + "col_int32", + "col_int64", + "col_float", + "col_double", + "col_string", + "col_text", + "col_date", + "col_timestamp", + ]: + assert expected in col_names, "%s not in columns: %s" % (expected, col_names) + + all_types.drop(engine) + print(" All data types tests passed") + + +# --------------------------------------------------------------- +# Advanced query tests +# --------------------------------------------------------------- +def _create_sensor_table(engine, table_name): + metadata = MetaData() + table = Table( + table_name, + metadata, + Column("region", String, iotdb_category="TAG"), + Column("device_id", String, iotdb_category="TAG"), + Column("model", String, iotdb_category="ATTRIBUTE"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + Column("status", Boolean, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + return table + + +def _insert_sample_data(engine, table): + with engine.connect() as conn: + rows = [ + ("asia", "d001", "ModelA", 25.5, 60.0, True), + ("asia", "d002", "ModelB", 27.3, 55.0, True), + ("europe", "d003", "ModelA", 18.1, 75.0, False), + ("europe", "d004", "ModelC", 22.0, 65.0, True), + ("na", "d005", "ModelB", 30.2, 45.0, False), + ] + for region, device_id, model, temp, hum, status in rows: + conn.execute( + table.insert().values( + region=region, + device_id=device_id, + model=model, + temperature=temp, + humidity=hum, + status=status, + ) + ) + + +def _test_select_specific_columns(engine): + table = _create_sensor_table(engine, "select_cols") + _insert_sample_data(engine, table) + + with engine.connect() as conn: + result = conn.execute( + table.select().with_only_columns(table.c.region, table.c.temperature) + ).fetchall() + assert len(result) == 5, "Select columns failed: expected 5 rows, got %d" % len( + result + ) + + result = conn.execute( + table.select().with_only_columns(table.c.device_id) + ).fetchall() + assert ( + len(result) == 5 + ), "Select single column failed: expected 5 rows, got %d" % len(result) + + table.drop(engine) + print(" Select specific columns tests passed") + + +def _test_multiple_where_conditions(engine): + table = _create_sensor_table(engine, "where_cond") + _insert_sample_data(engine, table) + + with engine.connect() as conn: + # AND condition + result = conn.execute( + table.select().where( + and_(table.c.region == "asia", table.c.temperature > 26.0) + ) + ).fetchall() + assert len(result) == 1, "AND condition failed: expected 1 row, got %d" % len( + result + ) + + # OR condition + result = conn.execute( + table.select().where(or_(table.c.region == "asia", table.c.region == "na")) + ).fetchall() + assert len(result) == 3, "OR condition failed: expected 3 rows, got %d" % len( + result + ) + + # Range condition + result = conn.execute( + table.select().where( + and_(table.c.temperature >= 20.0, table.c.temperature <= 28.0) + ) + ).fetchall() + assert ( + len(result) == 3 + ), "Range condition failed: expected 3 rows, got %d" % len(result) + + # NOT EQUAL + result = conn.execute(table.select().where(table.c.region != "asia")).fetchall() + assert ( + len(result) == 3 + ), "NOT EQUAL condition failed: expected 3 rows, got %d" % len(result) + + table.drop(engine) + print(" Multiple WHERE conditions tests passed") + + +def _test_aggregation(engine): + table = _create_sensor_table(engine, "agg_test") + _insert_sample_data(engine, table) + + with engine.connect() as conn: + # COUNT + result = conn.execute(table.select().with_only_columns(func.count())).scalar() + assert result == 5, "COUNT failed: expected 5, got %s" % result + + # AVG + result = conn.execute( + table.select().with_only_columns(func.avg(table.c.temperature)) + ).scalar() + assert result is not None, "AVG returned None" + assert abs(result - 24.62) < 0.1, "AVG failed: expected ~24.62, got %s" % result + + # SUM + result = conn.execute( + table.select().with_only_columns(func.sum(table.c.humidity)) + ).scalar() + assert result is not None, "SUM returned None" + assert abs(result - 300.0) < 0.1, "SUM failed: expected 300.0, got %s" % result + + # MIN / MAX + min_val = conn.execute( + table.select().with_only_columns(func.min(table.c.temperature)) + ).scalar() + max_val = conn.execute( + table.select().with_only_columns(func.max(table.c.temperature)) + ).scalar() + assert abs(min_val - 18.1) < 0.1, "MIN failed: expected 18.1, got %s" % min_val + assert abs(max_val - 30.2) < 0.1, "MAX failed: expected 30.2, got %s" % max_val + + table.drop(engine) + print(" Aggregation tests passed") + + +def _test_batch_insert(engine): + metadata = MetaData() + table = Table( + "batch_test", + metadata, + Column("device_id", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + with engine.connect() as conn: + for i in range(20): + conn.execute(table.insert().values(device_id="d%03d" % i, value=float(i))) + + result = conn.execute(table.select()).fetchall() + assert len(result) == 20, "Batch insert failed: expected 20 rows, got %d" % len( + result + ) + + result = conn.execute( + table.select().order_by(table.c.value).limit(5) + ).fetchall() + assert len(result) == 5, "Batch limit failed: expected 5 rows, got %d" % len( + result + ) + + result = conn.execute( + table.select().order_by(table.c.value).limit(10).offset(5) + ).fetchall() + assert len(result) == 10, "Batch offset failed: expected 10 rows, got %d" % len( + result + ) + + table.drop(engine) + print(" Batch insert tests passed") + + +def _test_null_values(engine): + metadata = MetaData() + table = Table( + "null_test", + metadata, + Column("device_id", String, iotdb_category="TAG"), + Column("temperature", Float, iotdb_category="FIELD"), + Column("humidity", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + with engine.connect() as conn: + conn.execute( + table.insert().values(device_id="d001", temperature=25.5, humidity=60.0) + ) + conn.execute(table.insert().values(device_id="d002", temperature=18.0)) + conn.execute(table.insert().values(device_id="d003", humidity=70.0)) + + result = conn.execute(table.select()).fetchall() + assert len(result) == 3, "Null insert failed: expected 3 rows, got %d" % len( + result + ) + + table.drop(engine) + print(" NULL value tests passed") + + +# --------------------------------------------------------------- +# Schema operation tests +# --------------------------------------------------------------- +def _test_has_schema_and_table(engine): + insp = inspect(engine) + dialect = engine.dialect + + with engine.connect() as conn: + assert dialect.has_schema(conn, TEST_DB), ( + "has_schema('%s') returned False" % TEST_DB + ) + assert not dialect.has_schema( + conn, "nonexistent_db_xyz" + ), "has_schema('nonexistent_db_xyz') returned True" + + metadata = MetaData() + t = Table( + "has_check", + metadata, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + with engine.connect() as conn: + assert dialect.has_table( + conn, "has_check", schema=TEST_DB + ), "has_table('has_check') returned False" + assert not dialect.has_table( + conn, "nonexistent_table_xyz", schema=TEST_DB + ), "has_table('nonexistent_table_xyz') returned True" + + t.drop(engine) + print(" has_schema/has_table tests passed") + + +def _test_get_view_names(engine): + insp = inspect(engine) + views = insp.get_view_names(schema=TEST_DB) + assert views == [], "Expected empty view list, got %s" % views + print(" get_view_names tests passed") + + +def _test_multiple_databases(engine): + insp = inspect(engine) + schemas = insp.get_schema_names() + assert TEST_DB in schemas, "%s not in schemas: %s" % (TEST_DB, schemas) + assert TEST_DB2 in schemas, "%s not in schemas: %s" % (TEST_DB2, schemas) + + metadata1 = MetaData() + t1 = Table( + "table_in_db1", + metadata1, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata1.create_all(engine) + + metadata2 = MetaData() + t2 = Table( + "table_in_db2", + metadata2, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB2, + ) + metadata2.create_all(engine) + + tables1 = insp.get_table_names(schema=TEST_DB) + tables2 = insp.get_table_names(schema=TEST_DB2) + assert "table_in_db1" in tables1, "table_in_db1 not in %s tables: %s" % ( + TEST_DB, + tables1, + ) + assert "table_in_db2" in tables2, "table_in_db2 not in %s tables: %s" % ( + TEST_DB2, + tables2, + ) + assert "table_in_db2" not in tables1, ( + "table_in_db2 should not be in %s tables" % TEST_DB + ) + + with engine.connect() as conn: + conn.execute(t1.insert().values(device="d1", value=1.0)) + conn.execute(t2.insert().values(device="d2", value=2.0)) + + r1 = conn.execute(t1.select()).fetchall() + r2 = conn.execute(t2.select()).fetchall() + assert ( + len(r1) == 1 and len(r2) == 1 + ), "Cross-db insert failed: db1=%d, db2=%d" % (len(r1), len(r2)) + + t1.drop(engine) + t2.drop(engine) + print(" Multiple databases tests passed") + + +def _test_column_category_reflection(engine): + with engine.connect() as conn: + conn.execute(text("USE %s" % TEST_DB)) + conn.execute( + text( + "CREATE TABLE cat_reflect (" + "region STRING TAG, " + "device STRING TAG, " + "model STRING ATTRIBUTE, " + "temperature FLOAT FIELD, " + "humidity DOUBLE FIELD" + ")" + ) + ) + + insp = inspect(engine) + columns = insp.get_columns(table_name="cat_reflect", schema=TEST_DB) + + category_map = {} + for col in columns: + if "iotdb_category" in col: + category_map[col["name"]] = col["iotdb_category"] + + assert category_map.get("time") == "TIME" or "time" in [ + c["name"] for c in columns + ], "TIME column not found" + + if "region" in category_map: + assert category_map["region"] == "TAG", ( + "Expected TAG for region, got %s" % category_map["region"] + ) + if "model" in category_map: + assert category_map["model"] == "ATTRIBUTE", ( + "Expected ATTRIBUTE for model, got %s" % category_map["model"] + ) + if "temperature" in category_map: + assert category_map["temperature"] == "FIELD", ( + "Expected FIELD for temperature, got %s" % category_map["temperature"] + ) + + with engine.connect() as conn: + conn.execute(text("USE %s" % TEST_DB)) + conn.execute(text("DROP TABLE cat_reflect")) + print(" Column category reflection tests passed") + + +def _test_multiple_tables(engine): + metadata = MetaData() + + t1 = Table( + "multi_a", + metadata, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + t2 = Table( + "multi_b", + metadata, + Column("region", String, iotdb_category="TAG"), + Column("score", Integer, iotdb_category="FIELD"), + schema=TEST_DB, + ) + t3 = Table( + "multi_c", + metadata, + Column("name", String, iotdb_category="TAG"), + Column("active", Boolean, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + insp = inspect(engine) + tables = insp.get_table_names(schema=TEST_DB) + for name in ["multi_a", "multi_b", "multi_c"]: + assert name in tables, "%s not in tables: %s" % (name, tables) + + with engine.connect() as conn: + conn.execute(t1.insert().values(device="d1", value=1.0)) + conn.execute(t2.insert().values(region="asia", score=100)) + conn.execute(t3.insert().values(name="sensor1", active=True)) + + assert len(conn.execute(t1.select()).fetchall()) == 1 + assert len(conn.execute(t2.select()).fetchall()) == 1 + assert len(conn.execute(t3.select()).fetchall()) == 1 + + t1.drop(engine) + t2.drop(engine) + t3.drop(engine) + + tables = insp.get_table_names(schema=TEST_DB) + for name in ["multi_a", "multi_b", "multi_c"]: + assert name not in tables, "%s still in tables after drop" % name + + print(" Multiple tables tests passed") + + +def _test_table_without_ttl(engine): + metadata = MetaData() + table = Table( + "no_ttl", + metadata, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine) + + insp = inspect(engine) + tables = insp.get_table_names(schema=TEST_DB) + assert "no_ttl" in tables, "no_ttl not in tables: %s" % tables + + with engine.connect() as conn: + conn.execute(table.insert().values(device="d1", value=42.0)) + result = conn.execute(table.select()).fetchall() + assert ( + len(result) == 1 + ), "Table without TTL failed: expected 1 row, got %d" % len(result) + + table.drop(engine) + print(" Table without TTL tests passed") + + +# --------------------------------------------------------------- +# Raw SQL tests +# --------------------------------------------------------------- +def _test_raw_sql(engine): + with engine.connect() as conn: + conn.execute(text("USE %s" % TEST_DB)) + conn.execute( + text( + "CREATE TABLE raw_sql_test (" + "region STRING TAG, " + "device STRING TAG, " + "temperature FLOAT FIELD, " + "humidity DOUBLE FIELD" + ")" + ) + ) + + conn.execute( + text( + "INSERT INTO raw_sql_test (region, device, temperature, humidity) " + "VALUES ('asia', 'd001', 25.5, 60.0)" + ) + ) + conn.execute( + text( + "INSERT INTO raw_sql_test (region, device, temperature, humidity) " + "VALUES ('europe', 'd002', 18.1, 75.0)" + ) + ) + conn.execute( + text( + "INSERT INTO raw_sql_test (region, device, temperature, humidity) " + "VALUES ('na', 'd003', 30.2, 45.0)" + ) + ) + + result = conn.execute( + text("SELECT * FROM %s.raw_sql_test" % TEST_DB) + ).fetchall() + assert len(result) == 3, "Raw SQL select all failed: expected 3, got %d" % len( + result + ) + + result = conn.execute( + text("SELECT * FROM %s.raw_sql_test WHERE region = 'asia'" % TEST_DB) + ).fetchall() + assert len(result) == 1, "Raw SQL where failed: expected 1, got %d" % len( + result + ) + + result = conn.execute( + text("SELECT * FROM %s.raw_sql_test ORDER BY temperature LIMIT 2" % TEST_DB) + ).fetchall() + assert len(result) == 2, "Raw SQL order+limit failed: expected 2, got %d" % len( + result + ) + + result = conn.execute( + text("SELECT COUNT(*) FROM %s.raw_sql_test" % TEST_DB) + ).fetchall() + assert result[0][0] == 3, ( + "Raw SQL count failed: expected 3, got %s" % result[0][0] + ) + + conn.execute( + text("DELETE FROM %s.raw_sql_test WHERE device = 'd003'" % TEST_DB) + ) + result = conn.execute( + text("SELECT * FROM %s.raw_sql_test" % TEST_DB) + ).fetchall() + assert len(result) == 2, "Raw SQL delete failed: expected 2, got %d" % len( + result + ) + + conn.execute(text("DROP TABLE raw_sql_test")) + + print(" Raw SQL tests passed") + + +# --------------------------------------------------------------- +# URL with database tests +# --------------------------------------------------------------- +def _test_url_with_database(engine_with_db, engine_without_db): + metadata = MetaData() + table = Table( + "url_db_test", + metadata, + Column("device", String, iotdb_category="TAG"), + Column("value", Float, iotdb_category="FIELD"), + schema=TEST_DB, + ) + metadata.create_all(engine_with_db) + + with engine_with_db.connect() as conn: + conn.execute(table.insert().values(device="d1", value=1.0)) + result = conn.execute(table.select()).fetchall() + assert len(result) == 1, "URL with DB insert failed: expected 1, got %d" % len( + result + ) + + insp_with = inspect(engine_with_db) + insp_without = inspect(engine_without_db) + + tables_with = insp_with.get_table_names(schema=TEST_DB) + tables_without = insp_without.get_table_names(schema=TEST_DB) + assert "url_db_test" in tables_with, "url_db_test not visible with db engine" + assert "url_db_test" in tables_without, "url_db_test not visible without db engine" + + table.drop(engine_with_db) + print(" URL with database tests passed") From ae5bc1b4edfc84f6d7af3fea38af7375bca98972 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Wed, 22 Apr 2026 09:34:51 +0800 Subject: [PATCH 3/3] format code --- iotdb-client/client-py/iotdb/dbapi/Cursor.py | 27 ------------------- .../iotdb/sqlalchemy/IoTDBDialect.py | 22 +++++++++------ 2 files changed, 14 insertions(+), 35 deletions(-) diff --git a/iotdb-client/client-py/iotdb/dbapi/Cursor.py b/iotdb-client/client-py/iotdb/dbapi/Cursor.py index 018ade6e99ac2..8897720ccc8db 100644 --- a/iotdb-client/client-py/iotdb/dbapi/Cursor.py +++ b/iotdb-client/client-py/iotdb/dbapi/Cursor.py @@ -110,25 +110,6 @@ def execute(self, operation, parameters=None): else: sql = operation % parameters - time_index = [] - time_names = [] - if self.__sqlalchemy_mode: - sql_seqs = [] - seqs = sql.split("\n") - for seq in seqs: - if seq.find("FROM Time Index") >= 0: - time_index = [ - int(index) - for index in seq.replace("FROM Time Index", "").split() - ] - elif seq.find("FROM Time Name") >= 0: - time_names = [ - name for name in seq.replace("FROM Time Name", "").split() - ] - else: - sql_seqs.append(seq) - sql = "\n".join(sql_seqs) - data_set = self.__session.execute_statement(sql) col_names = None col_types = None @@ -136,14 +117,6 @@ def execute(self, operation, parameters=None): if data_set: data = data_set.todf() - - if self.__sqlalchemy_mode and time_index: - time_column = data.columns[0] - time_column_value = data.Time - del data[time_column] - for i in range(len(time_index)): - data.insert(time_index[i], time_names[i], time_column_value) - col_names = data.columns.tolist() col_types = data_set.get_column_types() rows = data.values.tolist() diff --git a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py index 44f9e860fea69..cc32649529957 100644 --- a/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py +++ b/iotdb-client/client-py/iotdb/sqlalchemy/IoTDBDialect.py @@ -37,7 +37,7 @@ "INT32": types.Integer, "INT64": types.BigInteger, "FLOAT": types.Float, - "DOUBLE": types.Float, + "DOUBLE": types.Float(precision=53), "STRING": types.String, "TEXT": types.Text, "BLOB": types.LargeBinary, @@ -86,7 +86,9 @@ def dbapi(cls): def create_connect_args(self, url): opts = url.translate_connect_args() opts.update(url.query) + opts["sqlalchemy_mode"] = True opts["sql_dialect"] = "table" + self._url_database = opts.get("database") return ([], opts) def initialize(self, connection): @@ -96,7 +98,7 @@ def _get_server_version_info(self, connection): return None def _get_default_schema_name(self, connection): - return None + return getattr(self, "_url_database", None) def has_schema(self, connection, schema_name, **kw): return schema_name in self.get_schema_names(connection) @@ -110,16 +112,20 @@ def get_schema_names(self, connection, **kw): def get_table_names(self, connection, schema=None, **kw): if schema: - connection.execute(text("USE %s" % schema)) - cursor = connection.execute(text("SHOW TABLES")) + quoted = self.identifier_preparer.quote_identifier(schema) + cursor = connection.execute(text("SHOW TABLES FROM %s" % quoted)) + else: + cursor = connection.execute(text("SHOW TABLES")) return [row[0] for row in cursor.fetchall()] def get_columns(self, connection, table_name, schema=None, **kw): + quoted_table = self.identifier_preparer.quote_identifier(table_name) if schema: - connection.execute(text("USE %s" % schema)) - cursor = connection.execute( - text("SHOW COLUMNS FROM %s" % table_name) - ) + quoted_schema = self.identifier_preparer.quote_identifier(schema) + qualified = "%s.%s" % (quoted_schema, quoted_table) + else: + qualified = quoted_table + cursor = connection.execute(text("DESC %s" % qualified)) columns = [] for row in cursor.fetchall(): col_name = row[0]