From 07f6659435958d385936f2f014da036f76bd2464 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 2 May 2026 16:36:50 +0200 Subject: [PATCH 1/4] Add integration tests for array data type --- Cargo.lock | 118 +++++ bindings/python/test/test_kv_table.py | 242 ++++++++- crates/fluss/tests/integration/kv_table.rs | 268 +++++++++- crates/fluss/tests/integration/log_table.rs | 557 ++++++++++++++++++++ 4 files changed, 1175 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac5b27f8..72a86180 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -746,6 +746,15 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -975,6 +984,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "equivalent" version = "1.0.2" @@ -1104,6 +1119,8 @@ dependencies = [ "jiff", "linked-hash-map", "log", + "metrics", + "metrics-util", "opendal", "ordered-float", "parking_lot", @@ -1403,6 +1420,9 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", +] [[package]] name = "heck" @@ -2054,6 +2074,37 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff56c2e7dce6bd462e3b8919986a617027481b1dcc703175b58cf9dd98a2f071" +dependencies = [ + "portable-atomic", + "rapidhash", +] + +[[package]] +name = "metrics-util" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e56997f084e57b045edf17c3ed8ba7f9f779c670df8206dfd1c736f4c02dc4a" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap 2.13.1", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand 0.9.2", + "rand_xoshiro", + "rapidhash", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -2077,6 +2128,15 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "num" version = "0.4.3" @@ -2546,6 +2606,21 @@ dependencies = [ "cc", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.37.5" @@ -2648,6 +2723,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2709,6 +2794,33 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.5", +] + +[[package]] +name = "rapidhash" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e48930979c155e2f33aa36ab3119b5ee81332beb6482199a8ecd6029b80b59" +dependencies = [ + "rustversion", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -3214,6 +3326,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.12" diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py index 36aa3e46..7fd85228 100644 --- a/bindings/python/test/test_kv_table.py +++ b/bindings/python/test/test_kv_table.py @@ -30,6 +30,17 @@ import fluss +async def _upsert_and_wait(writer, row): + handle = writer.upsert(row) + await handle.wait() + + +def _assert_float_specials(values): + assert math.isnan(values[0]) + assert math.isinf(values[1]) and values[1] > 0 + assert math.isinf(values[2]) and values[2] < 0 + + async def test_upsert_delete_and_lookup(connection, admin): """Test upsert, lookup, update, delete, and non-existent key lookup.""" table_path = fluss.TablePath("fluss", "py_test_upsert_and_lookup") @@ -335,6 +346,228 @@ async def test_partitioned_table_upsert_and_lookup(connection, admin): await admin.drop_table(table_path, ignore_if_not_exists=False) +async def test_upsert_and_lookup_with_array(connection, admin): + """Test upsert and lookup with array columns in KV tables.""" + table_path = fluss.TablePath("fluss", "py_test_kv_arrays_basic") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("tags", pa.list_(pa.string())), + pa.field("scores", pa.list_(pa.int32())), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + # Row 1: standard arrays + await _upsert_and_wait( + upsert_writer, {"id": 1, "tags": ["hello", "world"], "scores": [10, 20, 30]} + ) + + # Row 2: null element in array + empty array + await _upsert_and_wait(upsert_writer, {"id": 2, "tags": [None], "scores": []}) + + # Row 3: null array column + await _upsert_and_wait(upsert_writer, {"id": 3, "tags": None, "scores": [42]}) + + lookuper = table.new_lookup().create_lookuper() + + result1 = await lookuper.lookup({"id": 1}) + assert result1 is not None + assert result1["tags"] == ["hello", "world"] + assert result1["scores"] == [10, 20, 30] + + result2 = await lookuper.lookup({"id": 2}) + assert result2 is not None + assert result2["tags"] == [None] + assert result2["scores"] == [] + + result3 = await lookuper.lookup({"id": 3}) + assert result3 is not None + assert result3["tags"] is None + assert result3["scores"] == [42] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_upsert_and_lookup_with_nested_array(connection, admin): + """Test upsert and lookup with nested array (ARRAY>) in KV tables.""" + table_path = fluss.TablePath("fluss", "py_test_kv_arrays_nested") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("matrix", pa.list_(pa.list_(pa.int32()))), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + await _upsert_and_wait(upsert_writer, {"id": 1, "matrix": [[1, 2], [3, 4]]}) + + await _upsert_and_wait(upsert_writer, {"id": 2, "matrix": [[], [5], [6, 7, 8]]}) + + await _upsert_and_wait(upsert_writer, {"id": 3, "matrix": None}) + + await _upsert_and_wait(upsert_writer, {"id": 4, "matrix": [[1, None], None, []]}) + + lookuper = table.new_lookup().create_lookuper() + + result1 = await lookuper.lookup({"id": 1}) + assert result1 is not None + assert result1["matrix"] == [[1, 2], [3, 4]] + + result2 = await lookuper.lookup({"id": 2}) + assert result2 is not None + assert result2["matrix"] == [[], [5], [6, 7, 8]] + + result3 = await lookuper.lookup({"id": 3}) + assert result3 is not None + assert result3["matrix"] is None + + result4 = await lookuper.lookup({"id": 4}) + assert result4 is not None + assert result4["matrix"] == [[1, None], None, []] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_upsert_and_lookup_with_array_rich_types(connection, admin): + """Test upsert/lookup for arrays with rich element types in KV tables.""" + table_path = fluss.TablePath("fluss", "py_test_kv_arrays_rich_types") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("arr_bytes", pa.list_(pa.binary())), + pa.field("arr_date", pa.list_(pa.date32())), + pa.field("arr_time", pa.list_(pa.time32("ms"))), + pa.field("arr_ts_ntz", pa.list_(pa.timestamp("us"))), + pa.field("arr_ts_ltz", pa.list_(pa.timestamp("us", tz="UTC"))), + pa.field("arr_decimal", pa.list_(pa.decimal128(10, 2))), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + await _upsert_and_wait( + upsert_writer, + { + "id": 1, + "arr_bytes": [b"\x10\x20\x30", None], + "arr_date": [date(2026, 1, 23), None], + "arr_time": [dt_time(10, 13, 47, 123000), None], + "arr_ts_ntz": [datetime(2026, 1, 23, 10, 13, 47, 123000)], + "arr_ts_ltz": [datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc)], + "arr_decimal": [Decimal("123.45"), None], + }, + ) + + lookuper = table.new_lookup().create_lookuper() + result = await lookuper.lookup({"id": 1}) + assert result is not None + + assert result["arr_bytes"] == [b"\x10\x20\x30", None] + assert result["arr_date"] == [date(2026, 1, 23), None] + assert result["arr_time"] == [dt_time(10, 13, 47, 123000), None] + assert result["arr_ts_ntz"] == [datetime(2026, 1, 23, 10, 13, 47, 123000)] + assert result["arr_ts_ltz"] == [ + datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc) + ] + assert result["arr_decimal"] == [Decimal("123.45"), None] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_upsert_and_lookup_with_array_encoding_edge_cases(connection, admin): + """Test array encoding edge cases in KV table upsert/lookup.""" + table_path = fluss.TablePath("fluss", "py_test_kv_arrays_edge_cases") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("arr_long_str", pa.list_(pa.string())), + pa.field("arr_big_decimal", pa.list_(pa.decimal128(22, 5))), + pa.field("arr_ts_nano", pa.list_(pa.timestamp("ns"))), + pa.field("arr_float", pa.list_(pa.float32())), + pa.field("arr_double", pa.list_(pa.float64())), + # TODO(fluss-python): support PyArrow FixedSizeBinary in schema conversion. + # Then switch this back to fixed-size binary: + # pa.field("arr_binary", pa.list_(pa.binary(4))), + pa.field("arr_binary", pa.list_(pa.binary())), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + await _upsert_and_wait( + upsert_writer, + { + "id": 1, + "arr_long_str": [ + "abcdefgh", + "this is a much longer string that definitely exceeds inline", + ], + "arr_big_decimal": [ + Decimal("12345678901234567.12345"), + Decimal("-99999999999999999.99999"), + ], + "arr_ts_nano": [datetime(2026, 1, 23, 10, 13, 47, 123456)], + "arr_float": [float("nan"), float("inf"), float("-inf")], + "arr_double": [float("nan"), float("inf"), float("-inf")], + "arr_binary": [b"\xde\xad\xbe\xef", b"\x00\x01\x02\x03"], + }, + ) + + lookuper = table.new_lookup().create_lookuper() + result = await lookuper.lookup({"id": 1}) + assert result is not None + + assert result["arr_long_str"] == [ + "abcdefgh", + "this is a much longer string that definitely exceeds inline", + ] + assert result["arr_big_decimal"] == [ + Decimal("12345678901234567.12345"), + Decimal("-99999999999999999.99999"), + ] + assert result["arr_ts_nano"] == [datetime(2026, 1, 23, 10, 13, 47, 123456)] + _assert_float_specials(result["arr_float"]) + _assert_float_specials(result["arr_double"]) + assert result["arr_binary"] == [b"\xde\xad\xbe\xef", b"\x00\x01\x02\x03"] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + async def test_all_supported_datatypes(connection, admin): """Test upsert/lookup for all supported data types, including nulls.""" table_path = fluss.TablePath("fluss", "py_test_kv_all_datatypes") @@ -358,6 +591,7 @@ async def test_all_supported_datatypes(connection, admin): pa.field("col_timestamp_ntz", pa.timestamp("us")), pa.field("col_timestamp_ltz", pa.timestamp("us", tz="UTC")), pa.field("col_bytes", pa.binary()), + pa.field("col_array", pa.list_(pa.string())), ] ), primary_keys=["pk_int"], @@ -385,10 +619,10 @@ async def test_all_supported_datatypes(connection, admin): "col_timestamp_ntz": datetime(2026, 1, 23, 10, 13, 47, 123000), "col_timestamp_ltz": datetime(2026, 1, 23, 10, 13, 47, 123000), "col_bytes": b"binary data", + "col_array": ["fluss", "python"], } - handle = upsert_writer.upsert(row_data) - await handle.wait() + await _upsert_and_wait(upsert_writer, row_data) lookuper = table.new_lookup().create_lookuper() result = await lookuper.lookup({"pk_int": 1}) @@ -411,14 +645,14 @@ async def test_all_supported_datatypes(connection, admin): 2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc ) assert result["col_bytes"] == b"binary data" + assert result["col_array"] == ["fluss", "python"] # Test with null values for all nullable columns null_row = {"pk_int": 2} for col in row_data: if col != "pk_int": null_row[col] = None - handle = upsert_writer.upsert(null_row) - await handle.wait() + await _upsert_and_wait(upsert_writer, null_row) result = await lookuper.lookup({"pk_int": 2}) assert result is not None, "Row with nulls should exist" diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index 62e206b6..79f162b1 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -20,14 +20,41 @@ mod kv_table_test { use crate::integration::utils::{create_partitions, create_table, get_shared_cluster}; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; - use fluss::row::{GenericRow, InternalRow}; + use fluss::row::binary_array::FlussArrayWriter; + use fluss::row::{FlussArray, GenericRow, InternalRow}; fn make_key(id: i32) -> GenericRow<'static> { - let mut row = GenericRow::new(3); + make_key_with_len(id, 3) + } + + fn make_key_with_len(id: i32, field_count: usize) -> GenericRow<'static> { + let mut row = GenericRow::new(field_count); row.set_field(0, id); row } + fn make_string_array(values: &[Option<&str>]) -> FlussArray { + let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::string()); + for (idx, value) in values.iter().enumerate() { + match value { + Some(v) => writer.write_string(idx, v), + None => writer.set_null_at(idx), + } + } + writer.complete().expect("Failed to build string array") + } + + fn make_int_array(values: &[Option]) -> FlussArray { + let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::int()); + for (idx, value) in values.iter().enumerate() { + match value { + Some(v) => writer.write_int(idx, *v), + None => writer.set_null_at(idx), + } + } + writer.complete().expect("Failed to build int array") + } + #[tokio::test] async fn upsert_delete_and_lookup() { let cluster = get_shared_cluster(); @@ -606,6 +633,7 @@ mod kv_table_test { // Binary types .column("col_bytes", DataTypes::bytes()) .column("col_binary", DataTypes::binary(20)) + .column("col_array", DataTypes::array(DataTypes::string())) .primary_key(vec!["pk_int"]) .build() .expect("Failed to build schema"), @@ -644,8 +672,10 @@ mod kv_table_test { let col_bytes: &[u8] = b"binary data"; let col_binary: &[u8] = b"fixed binary data!!!"; + let col_array = make_string_array(&[Some("fluss"), Some("rust")]); + // Upsert a row with all datatypes - let mut row = GenericRow::new(17); + let mut row = GenericRow::new(18); row.set_field(0, pk_int); row.set_field(1, col_boolean); row.set_field(2, col_tinyint); @@ -663,6 +693,7 @@ mod kv_table_test { row.set_field(14, col_timestamp_ltz); row.set_field(15, col_bytes); row.set_field(16, col_binary); + row.set_field(17, col_array); upsert_writer .upsert(&row) @@ -677,7 +708,7 @@ mod kv_table_test { .create_lookuper() .expect("Failed to create lookuper"); - let mut key = GenericRow::new(17); + let mut key = GenericRow::new(18); key.set_field(0, pk_int); let result = lookuper.lookup(&key).await.expect("Failed to lookup"); @@ -772,10 +803,14 @@ mod kv_table_test { col_binary, "col_binary mismatch" ); + let arr = found_row.get_array(17).unwrap(); + assert_eq!(arr.size(), 2, "col_array size mismatch"); + assert_eq!(arr.get_string(0).unwrap(), "fluss", "col_array[0] mismatch"); + assert_eq!(arr.get_string(1).unwrap(), "rust", "col_array[1] mismatch"); // Test with null values for nullable columns let pk_int_2 = 2i32; - let mut row_with_nulls = GenericRow::new(17); + let mut row_with_nulls = GenericRow::new(18); row_with_nulls.set_field(0, pk_int_2); row_with_nulls.set_field(1, Datum::Null); // col_boolean row_with_nulls.set_field(2, Datum::Null); // col_tinyint @@ -793,6 +828,7 @@ mod kv_table_test { row_with_nulls.set_field(14, Datum::Null); // col_timestamp_ltz row_with_nulls.set_field(15, Datum::Null); // col_bytes row_with_nulls.set_field(16, Datum::Null); // col_binary + row_with_nulls.set_field(17, Datum::Null); // col_array upsert_writer .upsert(&row_with_nulls) @@ -801,7 +837,7 @@ mod kv_table_test { .expect("Failed to wait for upsert acknowledgment"); // Lookup row with nulls - let mut key2 = GenericRow::new(17); + let mut key2 = GenericRow::new(18); key2.set_field(0, pk_int_2); let result = lookuper.lookup(&key2).await.expect("Failed to lookup"); @@ -880,6 +916,226 @@ mod kv_table_test { found_row_nulls.is_null_at(16).unwrap(), "col_binary should be null" ); + assert!( + found_row_nulls.is_null_at(17).unwrap(), + "col_array should be null" + ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + #[tokio::test] + async fn upsert_and_lookup_with_array() { + use fluss::row::Datum; + + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_kv_arrays_basic"); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("tags", DataTypes::array(DataTypes::string())) + .column("scores", DataTypes::array(DataTypes::int())) + .primary_key(vec!["id"]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table descriptor"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let upsert = table.new_upsert().expect("Failed to create upsert"); + let upsert_writer = upsert.create_writer().expect("Failed to create writer"); + + // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30] + let mut row1 = GenericRow::new(3); + row1.set_field(0, 1_i32); + let tags1 = make_string_array(&[Some("hello"), Some("world")]); + let scores1 = make_int_array(&[Some(10), Some(20), Some(30)]); + row1.set_field(1, tags1); + row1.set_field(2, scores1); + + upsert_writer + .upsert(&row1) + .expect("upsert row1") + .await + .expect("ack row1"); + + // Row 2: id=2, tags=[null element], scores=[] (empty) + let mut row2 = GenericRow::new(3); + row2.set_field(0, 2_i32); + let tags2 = make_string_array(&[None]); + let scores2 = make_int_array(&[]); + row2.set_field(1, tags2); + row2.set_field(2, scores2); + + upsert_writer + .upsert(&row2) + .expect("upsert row2") + .await + .expect("ack row2"); + + // Row 3: id=3, tags=null, scores=[42] + let mut row3 = GenericRow::new(3); + row3.set_field(0, 3_i32); + row3.set_field(1, Datum::Null); + let scores3 = make_int_array(&[Some(42)]); + row3.set_field(2, scores3); + + upsert_writer + .upsert(&row3) + .expect("upsert row3") + .await + .expect("ack row3"); + + // Lookup and verify + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + // Verify row 1 + let result1 = lookuper.lookup(&make_key(1)).await.expect("lookup row1"); + let r1 = result1 + .get_single_row() + .expect("get row1") + .expect("row1 should exist"); + assert_eq!(r1.get_int(0).unwrap(), 1); + let tags_r1 = r1.get_array(1).unwrap(); + assert_eq!(tags_r1.size(), 2); + assert_eq!(tags_r1.get_string(0).unwrap(), "hello"); + assert_eq!(tags_r1.get_string(1).unwrap(), "world"); + let scores_r1 = r1.get_array(2).unwrap(); + assert_eq!(scores_r1.size(), 3); + assert_eq!(scores_r1.get_int(0).unwrap(), 10); + assert_eq!(scores_r1.get_int(1).unwrap(), 20); + assert_eq!(scores_r1.get_int(2).unwrap(), 30); + + // Verify row 2 + let result2 = lookuper.lookup(&make_key(2)).await.expect("lookup row2"); + let r2 = result2 + .get_single_row() + .expect("get row2") + .expect("row2 should exist"); + assert_eq!(r2.get_int(0).unwrap(), 2); + let tags_r2 = r2.get_array(1).unwrap(); + assert_eq!(tags_r2.size(), 1); + assert!(tags_r2.is_null_at(0)); + let scores_r2 = r2.get_array(2).unwrap(); + assert_eq!(scores_r2.size(), 0); + + // Verify row 3 + let result3 = lookuper.lookup(&make_key(3)).await.expect("lookup row3"); + let r3 = result3 + .get_single_row() + .expect("get row3") + .expect("row3 should exist"); + assert_eq!(r3.get_int(0).unwrap(), 3); + assert!(r3.is_null_at(1).unwrap()); + let scores_r3 = r3.get_array(2).unwrap(); + assert_eq!(scores_r3.size(), 1); + assert_eq!(scores_r3.get_int(0).unwrap(), 42); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + #[tokio::test] + async fn upsert_and_lookup_with_nested_array() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_kv_arrays_nested"); + let inner_array_type = DataTypes::array(DataTypes::int()); + let matrix_type = DataTypes::array(inner_array_type.clone()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("matrix", matrix_type) + .primary_key(vec!["id"]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table descriptor"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let upsert = table.new_upsert().expect("Failed to create upsert"); + let upsert_writer = upsert.create_writer().expect("Failed to create writer"); + + // Row: id=1, matrix=[[1, 2], [3, 4]] + let mut row = GenericRow::new(2); + row.set_field(0, 1_i32); + let inner1 = make_int_array(&[Some(1), Some(2)]); + let inner2 = make_int_array(&[Some(3), Some(4)]); + let outer = { + let mut w = FlussArrayWriter::new(2, &inner_array_type); + w.write_array(0, &inner1); + w.write_array(1, &inner2); + w.complete().expect("outer") + }; + row.set_field(1, outer); + + upsert_writer + .upsert(&row) + .expect("upsert") + .await + .expect("ack"); + + // Lookup and verify nested structure + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + let result = lookuper + .lookup(&make_key_with_len(1, 2)) + .await + .expect("lookup"); + let r = result + .get_single_row() + .expect("get row") + .expect("row should exist"); + + assert_eq!(r.get_int(0).unwrap(), 1); + let matrix: FlussArray = r.get_array(1).unwrap(); + assert_eq!(matrix.size(), 2); + + let row0 = matrix.get_array(0).unwrap(); + assert_eq!(row0.size(), 2); + assert_eq!(row0.get_int(0).unwrap(), 1); + assert_eq!(row0.get_int(1).unwrap(), 2); + + let row1 = matrix.get_array(1).unwrap(); + assert_eq!(row1.size(), 2); + assert_eq!(row1.get_int(0).unwrap(), 3); + assert_eq!(row1.get_int(1).unwrap(), 4); admin .drop_table(&table_path, false) diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index d10834e8..e9438570 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -579,6 +579,7 @@ mod table_test { /// in log tables. #[tokio::test] async fn all_supported_datatypes() { + use fluss::row::binary_array::FlussArrayWriter; use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz}; let cluster = get_shared_cluster(); @@ -658,6 +659,7 @@ mod table_test { "col_timestamp_ltz_ns_neg", DataTypes::timestamp_ltz_with_precision(9), ) + .column("col_array", DataTypes::array(DataTypes::string())) .build() .expect("Failed to build schema"), ) @@ -719,6 +721,13 @@ mod table_test { let col_timestamp_ltz_ns_neg = TimestampLtz::from_millis_nanos(-301234154877, 999_999).unwrap(); + let col_array = { + let mut w = FlussArrayWriter::new(2, &DataTypes::string()); + w.write_string(0, "fluss"); + w.write_string(1, "rust"); + w.complete().expect("col_array") + }; + // Append a row with all datatypes let mut row = GenericRow::new(field_count); row.set_field(0, col_tinyint); @@ -750,6 +759,7 @@ mod table_test { row.set_field(26, col_timestamp_ns_neg); row.set_field(27, col_timestamp_ltz_us_neg); row.set_field(28, col_timestamp_ltz_ns_neg); + row.set_field(29, col_array); append_writer .append(&row) @@ -994,6 +1004,11 @@ mod table_test { "col_timestamp_ltz_ns_neg nanos mismatch" ); + let arr = found_row.get_array(29).unwrap(); + assert_eq!(arr.size(), 2, "col_array size mismatch"); + assert_eq!(arr.get_string(0).unwrap(), "fluss", "col_array[0] mismatch"); + assert_eq!(arr.get_string(1).unwrap(), "rust", "col_array[1] mismatch"); + // Verify row with all nulls (record index 1) let found_row_nulls = records[1].row(); for i in 0..field_count { @@ -1373,4 +1388,546 @@ mod table_test { .await .expect("Failed to drop table"); } + + #[tokio::test] + async fn append_and_scan_with_array() { + use fluss::row::binary_array::FlussArrayWriter; + use fluss::row::{Datum, GenericRow}; + + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_log_arrays_basic"); + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("tags", DataTypes::array(DataTypes::string())) + .column("scores", DataTypes::array(DataTypes::int())) + .build() + .expect("Failed to build schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(schema) + .build() + .expect("Failed to build table descriptor"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30] + let mut row1 = GenericRow::new(3); + row1.set_field(0, 1_i32); + + let tags1 = { + let mut w = FlussArrayWriter::new(2, &DataTypes::string()); + w.write_string(0, "hello"); + w.write_string(1, "world"); + w.complete().expect("tags1") + }; + let scores1 = { + let mut w = FlussArrayWriter::new(3, &DataTypes::int()); + w.write_int(0, 10); + w.write_int(1, 20); + w.write_int(2, 30); + w.complete().expect("scores1") + }; + row1.set_field(1, tags1); + row1.set_field(2, scores1); + + // Row 2: id=2, tags=[null], scores=[] + let mut row2 = GenericRow::new(3); + row2.set_field(0, 2_i32); + + let tags2 = { + let mut w = FlussArrayWriter::new(1, &DataTypes::string()); + w.set_null_at(0); + w.complete().expect("tags2") + }; + let scores2 = { + let w = FlussArrayWriter::new(0, &DataTypes::int()); + w.complete().expect("scores2") + }; + row2.set_field(1, tags2); + row2.set_field(2, scores2); + + // Row 3: id=3, tags=null, scores=[42] + let mut row3 = GenericRow::new(3); + row3.set_field(0, 3_i32); + row3.set_field(1, Datum::Null); + let scores3 = { + let mut w = FlussArrayWriter::new(1, &DataTypes::int()); + w.write_int(0, 42); + w.complete().expect("scores3") + }; + row3.set_field(2, scores3); + + append_writer.append(&row1).expect("append row1"); + append_writer.append(&row2).expect("append row2"); + append_writer.append(&row3).expect("append row3"); + append_writer.flush().await.expect("Failed to flush"); + + let records = scan_table(&table, |scan| scan).await; + assert_eq!(records.len(), 3, "expected three log records"); + + let r0 = records[0].row(); + assert_eq!(r0.get_int(0).unwrap(), 1); + let tags_r0 = r0.get_array(1).unwrap(); + assert_eq!(tags_r0.size(), 2); + assert_eq!(tags_r0.get_string(0).unwrap(), "hello"); + assert_eq!(tags_r0.get_string(1).unwrap(), "world"); + let scores_r0 = r0.get_array(2).unwrap(); + assert_eq!(scores_r0.size(), 3); + assert_eq!(scores_r0.get_int(0).unwrap(), 10); + assert_eq!(scores_r0.get_int(1).unwrap(), 20); + assert_eq!(scores_r0.get_int(2).unwrap(), 30); + + let r1 = records[1].row(); + assert_eq!(r1.get_int(0).unwrap(), 2); + let tags_r1 = r1.get_array(1).unwrap(); + assert_eq!(tags_r1.size(), 1); + assert!(tags_r1.is_null_at(0)); + let scores_r1 = r1.get_array(2).unwrap(); + assert_eq!(scores_r1.size(), 0); + + let r2 = records[2].row(); + assert_eq!(r2.get_int(0).unwrap(), 3); + assert!(r2.is_null_at(1).unwrap()); + let scores_r2 = r2.get_array(2).unwrap(); + assert_eq!(scores_r2.size(), 1); + assert_eq!(scores_r2.get_int(0).unwrap(), 42); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + #[tokio::test] + async fn append_and_scan_with_nested_array() { + use fluss::row::binary_array::FlussArrayWriter; + use fluss::row::{FlussArray, GenericRow}; + + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_log_arrays_nested"); + let inner_array_type = DataTypes::array(DataTypes::int()); + let matrix_type = DataTypes::array(inner_array_type.clone()); + + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("matrix", matrix_type.clone()) + .build() + .expect("Failed to build schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(schema) + .build() + .expect("Failed to build table descriptor"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + fn build_inner_ints(vals: &[i32]) -> FlussArray { + let mut w = FlussArrayWriter::new(vals.len(), &DataTypes::int()); + for (i, v) in vals.iter().enumerate() { + w.write_int(i, *v); + } + w.complete().expect("inner array") + } + + // Row 1: matrix = [[1,2],[3,4]] + let mut row1 = GenericRow::new(2); + row1.set_field(0, 1_i32); + let m1 = { + let mut w = FlussArrayWriter::new(2, &inner_array_type); + w.write_array(0, &build_inner_ints(&[1, 2])); + w.write_array(1, &build_inner_ints(&[3, 4])); + w.complete().expect("matrix1") + }; + row1.set_field(1, m1); + + // Row 2: matrix = [[5], null, []] + let mut row2 = GenericRow::new(2); + row2.set_field(0, 2_i32); + let m2 = { + let mut w = FlussArrayWriter::new(3, &inner_array_type); + w.write_array(0, &build_inner_ints(&[5])); + w.set_null_at(1); + let empty: FlussArray = FlussArrayWriter::new(0, &DataTypes::int()) + .complete() + .unwrap(); + w.write_array(2, &empty); + w.complete().expect("matrix2") + }; + row2.set_field(1, m2); + + append_writer.append(&row1).expect("append row1"); + append_writer.append(&row2).expect("append row2"); + append_writer.flush().await.expect("Failed to flush"); + + let records = scan_table(&table, |scan| scan).await; + assert_eq!(records.len(), 2); + + let r0 = records[0].row(); + assert_eq!(r0.get_int(0).unwrap(), 1); + let mat0 = r0.get_array(1).unwrap(); + assert_eq!(mat0.size(), 2); + let a00 = mat0.get_array(0).unwrap(); + assert_eq!(a00.size(), 2); + assert_eq!(a00.get_int(0).unwrap(), 1); + assert_eq!(a00.get_int(1).unwrap(), 2); + let a01 = mat0.get_array(1).unwrap(); + assert_eq!(a01.size(), 2); + assert_eq!(a01.get_int(0).unwrap(), 3); + assert_eq!(a01.get_int(1).unwrap(), 4); + + let r1 = records[1].row(); + assert_eq!(r1.get_int(0).unwrap(), 2); + let mat1 = r1.get_array(1).unwrap(); + assert_eq!(mat1.size(), 3); + let b0 = mat1.get_array(0).unwrap(); + assert_eq!(b0.size(), 1); + assert_eq!(b0.get_int(0).unwrap(), 5); + assert!(mat1.is_null_at(1)); + let b2 = mat1.get_array(2).unwrap(); + assert_eq!(b2.size(), 0); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + #[tokio::test] + async fn append_and_scan_with_array_rich_types() { + use fluss::row::binary_array::FlussArrayWriter; + use fluss::row::{Date, Decimal, GenericRow, Time, TimestampNtz}; + + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_log_arrays_rich_types"); + let ts_val = TimestampNtz::from_millis_nanos(1769163227123, 456000).unwrap(); + let dec_val = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); + + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("arr_bytes", DataTypes::array(DataTypes::bytes())) + .column("arr_date", DataTypes::array(DataTypes::date())) + .column( + "arr_time", + DataTypes::array(DataTypes::time_with_precision(3)), + ) + .column( + "arr_ts", + DataTypes::array(DataTypes::timestamp_with_precision(6)), + ) + .column("arr_decimal", DataTypes::array(DataTypes::decimal(10, 2))) + .build() + .expect("Failed to build schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(schema) + .build() + .expect("Failed to build table descriptor"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + let mut row = GenericRow::new(6); + row.set_field(0, 1_i32); + + let elem_bytes = &[0_u8, 1, 2, 255]; + let d = Date::new(20476); + let t = Time::new(36827123); + + let arr_bytes = { + let mut w = FlussArrayWriter::new(2, &DataTypes::bytes()); + w.write_binary_bytes(0, elem_bytes); + w.set_null_at(1); + w.complete().expect("arr_bytes") + }; + let arr_date = { + let mut w = FlussArrayWriter::new(2, &DataTypes::date()); + w.write_date(0, d); + w.set_null_at(1); + w.complete().expect("arr_date") + }; + let arr_time = { + let mut w = FlussArrayWriter::new(2, &DataTypes::time_with_precision(3)); + w.write_time(0, t); + w.set_null_at(1); + w.complete().expect("arr_time") + }; + let arr_ts = { + let mut w = FlussArrayWriter::new(2, &DataTypes::timestamp_with_precision(6)); + w.write_timestamp_ntz(0, &ts_val, 6); + w.set_null_at(1); + w.complete().expect("arr_ts") + }; + let arr_decimal = { + let mut w = FlussArrayWriter::new(2, &DataTypes::decimal(10, 2)); + w.write_decimal(0, &dec_val, 10); + w.set_null_at(1); + w.complete().expect("arr_decimal") + }; + + row.set_field(1, arr_bytes); + row.set_field(2, arr_date); + row.set_field(3, arr_time); + row.set_field(4, arr_ts); + row.set_field(5, arr_decimal); + + append_writer.append(&row).expect("append"); + append_writer.flush().await.expect("Failed to flush"); + + let records = scan_table(&table, |scan| scan).await; + assert_eq!(records.len(), 1); + let r = records[0].row(); + + let ab = r.get_array(1).unwrap(); + assert_eq!(ab.size(), 2); + assert_eq!(ab.get_binary(0).unwrap(), elem_bytes); + assert!(ab.is_null_at(1)); + + let ad = r.get_array(2).unwrap(); + assert_eq!(ad.size(), 2); + assert_eq!(ad.get_date(0).unwrap().get_inner(), d.get_inner()); + assert!(ad.is_null_at(1)); + + let at = r.get_array(3).unwrap(); + assert_eq!(at.size(), 2); + assert_eq!(at.get_time(0).unwrap().get_inner(), t.get_inner()); + assert!(at.is_null_at(1)); + + let ats = r.get_array(4).unwrap(); + assert_eq!(ats.size(), 2); + let read_ts = ats.get_timestamp_ntz(0, 6).unwrap(); + assert_eq!(read_ts.get_millisecond(), ts_val.get_millisecond()); + assert_eq!( + read_ts.get_nano_of_millisecond(), + ts_val.get_nano_of_millisecond() + ); + assert!(ats.is_null_at(1)); + + let adc = r.get_array(5).unwrap(); + assert_eq!(adc.size(), 2); + assert_eq!(adc.get_decimal(0, 10, 2).unwrap(), dec_val); + assert!(adc.is_null_at(1)); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } + + #[tokio::test] + async fn append_and_scan_with_array_encoding_edge_cases() { + use fluss::row::binary_array::FlussArrayWriter; + use fluss::row::{Decimal, FlussArray, GenericRow, TimestampNtz}; + + fn assert_f32_special(actual: f32, expected: f32) { + if expected.is_nan() { + assert!(actual.is_nan(), "expected NaN"); + } else if expected.is_infinite() { + assert_eq!(actual.is_infinite(), true); + assert_eq!(actual.signum(), expected.signum()); + } else { + assert!((actual - expected).abs() < f32::EPSILON); + } + } + + fn assert_f64_special(actual: f64, expected: f64) { + if expected.is_nan() { + assert!(actual.is_nan(), "expected NaN"); + } else if expected.is_infinite() { + assert_eq!(actual.is_infinite(), true); + assert_eq!(actual.signum(), expected.signum()); + } else { + assert!((actual - expected).abs() < f64::EPSILON); + } + } + + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_log_arrays_edge_cases"); + + // DECIMAL(22,5) non-compact: unscaled 1234567890123456709876 (22 digits) + let edge_decimal = + Decimal::from_unscaled_bytes(&[66, 237, 18, 59, 11, 216, 31, 4, 244], 22, 5) + .expect("edge decimal"); + + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("arr_long_str", DataTypes::array(DataTypes::string())) + .column( + "arr_big_decimal", + DataTypes::array(DataTypes::decimal(22, 5)), + ) + .column( + "arr_ts_nano", + DataTypes::array(DataTypes::timestamp_with_precision(9)), + ) + .column("arr_float", DataTypes::array(DataTypes::float())) + .column("arr_double", DataTypes::array(DataTypes::double())) + .column("arr_binary", DataTypes::array(DataTypes::binary(4))) + .build() + .expect("Failed to build schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(schema) + .build() + .expect("Failed to build table descriptor"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + let ts_edge = TimestampNtz::from_millis_nanos(1769163227123, 999_999).unwrap(); + + let arr_long_str = { + let mut w = FlussArrayWriter::new(2, &DataTypes::string()); + // >= 8 bytes to exercise heap-backed string slots + w.write_string(0, "abcdefghi"); + w.write_string(1, "longstring_here"); + w.complete().expect("arr_long_str") + }; + + let arr_big_decimal = { + let mut w = FlussArrayWriter::new(1, &DataTypes::decimal(22, 5)); + w.write_decimal(0, &edge_decimal, 22); + w.complete().expect("arr_big_decimal") + }; + + let arr_ts_nano = { + let mut w = FlussArrayWriter::new(1, &DataTypes::timestamp_with_precision(9)); + w.write_timestamp_ntz(0, &ts_edge, 9); + w.complete().expect("arr_ts_nano") + }; + + let arr_float = { + let mut w = FlussArrayWriter::new(3, &DataTypes::float()); + w.write_float(0, f32::NAN); + w.write_float(1, f32::INFINITY); + w.write_float(2, f32::NEG_INFINITY); + w.complete().expect("arr_float") + }; + + let arr_double = { + let mut w = FlussArrayWriter::new(3, &DataTypes::double()); + w.write_double(0, f64::NAN); + w.write_double(1, f64::INFINITY); + w.write_double(2, f64::NEG_INFINITY); + w.complete().expect("arr_double") + }; + + let fixed_a: Vec = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let fixed_b: Vec = vec![0x01, 0x02, 0x03, 0x04]; + let arr_binary = { + let mut w = FlussArrayWriter::new(2, &DataTypes::binary(4)); + w.write_binary_bytes(0, &fixed_a); + w.write_binary_bytes(1, &fixed_b); + w.complete().expect("arr_binary") + }; + + let mut row = GenericRow::new(7); + row.set_field(0, 1_i32); + row.set_field(1, arr_long_str); + row.set_field(2, arr_big_decimal); + row.set_field(3, arr_ts_nano); + row.set_field(4, arr_float); + row.set_field(5, arr_double); + row.set_field(6, arr_binary); + + append_writer.append(&row).expect("append"); + append_writer.flush().await.expect("Failed to flush"); + + let records = scan_table(&table, |scan| scan).await; + assert_eq!(records.len(), 1); + let r = records[0].row(); + + let s = r.get_array(1).unwrap(); + assert_eq!(s.size(), 2); + assert_eq!(s.get_string(0).unwrap(), "abcdefghi"); + assert_eq!(s.get_string(1).unwrap(), "longstring_here"); + + let dec_a = r.get_array(2).unwrap(); + assert_eq!(dec_a.size(), 1); + assert_eq!(dec_a.get_decimal(0, 22, 5).unwrap(), edge_decimal); + + let ts_a = r.get_array(3).unwrap(); + assert_eq!(ts_a.size(), 1); + let read_ts = ts_a.get_timestamp_ntz(0, 9).unwrap(); + assert_eq!(read_ts.get_millisecond(), ts_edge.get_millisecond()); + assert_eq!( + read_ts.get_nano_of_millisecond(), + ts_edge.get_nano_of_millisecond() + ); + + let f_a = r.get_array(4).unwrap(); + assert_eq!(f_a.size(), 3); + assert_f32_special(f_a.get_float(0).unwrap(), f32::NAN); + assert_f32_special(f_a.get_float(1).unwrap(), f32::INFINITY); + assert_f32_special(f_a.get_float(2).unwrap(), f32::NEG_INFINITY); + + let d_a = r.get_array(5).unwrap(); + assert_eq!(d_a.size(), 3); + assert_f64_special(d_a.get_double(0).unwrap(), f64::NAN); + assert_f64_special(d_a.get_double(1).unwrap(), f64::INFINITY); + assert_f64_special(d_a.get_double(2).unwrap(), f64::NEG_INFINITY); + + let fb: FlussArray = r.get_array(6).unwrap(); + assert_eq!(fb.size(), 2); + assert_eq!(fb.get_binary(0).unwrap(), fixed_a.as_slice()); + assert_eq!(fb.get_binary(1).unwrap(), fixed_b.as_slice()); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } } From baf6a4defe74b14ada55a01c102d96a090fa313c Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sun, 3 May 2026 11:14:11 +0200 Subject: [PATCH 2/4] Address review comments --- crates/fluss/tests/integration/kv_table.rs | 32 +++----------- crates/fluss/tests/integration/log_table.rs | 49 +++++---------------- crates/fluss/tests/integration/utils.rs | 26 ++++++++++- 3 files changed, 42 insertions(+), 65 deletions(-) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index 79f162b1..d1bfd6d6 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -18,43 +18,23 @@ #[cfg(test)] mod kv_table_test { - use crate::integration::utils::{create_partitions, create_table, get_shared_cluster}; + use crate::integration::utils::{ + create_partitions, create_table, get_shared_cluster, make_int_array, make_string_array, + }; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::binary_array::FlussArrayWriter; use fluss::row::{FlussArray, GenericRow, InternalRow}; fn make_key(id: i32) -> GenericRow<'static> { - make_key_with_len(id, 3) + make_key_with_field_count(id, 3) } - fn make_key_with_len(id: i32, field_count: usize) -> GenericRow<'static> { + fn make_key_with_field_count(id: i32, field_count: usize) -> GenericRow<'static> { let mut row = GenericRow::new(field_count); row.set_field(0, id); row } - fn make_string_array(values: &[Option<&str>]) -> FlussArray { - let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::string()); - for (idx, value) in values.iter().enumerate() { - match value { - Some(v) => writer.write_string(idx, v), - None => writer.set_null_at(idx), - } - } - writer.complete().expect("Failed to build string array") - } - - fn make_int_array(values: &[Option]) -> FlussArray { - let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::int()); - for (idx, value) in values.iter().enumerate() { - match value { - Some(v) => writer.write_int(idx, *v), - None => writer.set_null_at(idx), - } - } - writer.complete().expect("Failed to build int array") - } - #[tokio::test] async fn upsert_delete_and_lookup() { let cluster = get_shared_cluster(); @@ -1115,7 +1095,7 @@ mod kv_table_test { .expect("Failed to create lookuper"); let result = lookuper - .lookup(&make_key_with_len(1, 2)) + .lookup(&make_key_with_field_count(1, 2)) .await .expect("lookup"); let r = result diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index e9438570..97b27b10 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -18,7 +18,9 @@ #[cfg(test)] mod table_test { - use crate::integration::utils::{create_partitions, create_table, get_shared_cluster}; + use crate::integration::utils::{ + create_partitions, create_table, get_shared_cluster, make_int_array, make_string_array, + }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan}; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; @@ -579,7 +581,6 @@ mod table_test { /// in log tables. #[tokio::test] async fn all_supported_datatypes() { - use fluss::row::binary_array::FlussArrayWriter; use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz}; let cluster = get_shared_cluster(); @@ -721,12 +722,7 @@ mod table_test { let col_timestamp_ltz_ns_neg = TimestampLtz::from_millis_nanos(-301234154877, 999_999).unwrap(); - let col_array = { - let mut w = FlussArrayWriter::new(2, &DataTypes::string()); - w.write_string(0, "fluss"); - w.write_string(1, "rust"); - w.complete().expect("col_array") - }; + let col_array = make_string_array(&[Some("fluss"), Some("rust")]); // Append a row with all datatypes let mut row = GenericRow::new(field_count); @@ -1391,7 +1387,6 @@ mod table_test { #[tokio::test] async fn append_and_scan_with_array() { - use fluss::row::binary_array::FlussArrayWriter; use fluss::row::{Datum, GenericRow}; let cluster = get_shared_cluster(); @@ -1428,19 +1423,8 @@ mod table_test { let mut row1 = GenericRow::new(3); row1.set_field(0, 1_i32); - let tags1 = { - let mut w = FlussArrayWriter::new(2, &DataTypes::string()); - w.write_string(0, "hello"); - w.write_string(1, "world"); - w.complete().expect("tags1") - }; - let scores1 = { - let mut w = FlussArrayWriter::new(3, &DataTypes::int()); - w.write_int(0, 10); - w.write_int(1, 20); - w.write_int(2, 30); - w.complete().expect("scores1") - }; + let tags1 = make_string_array(&[Some("hello"), Some("world")]); + let scores1 = make_int_array(&[Some(10), Some(20), Some(30)]); row1.set_field(1, tags1); row1.set_field(2, scores1); @@ -1448,15 +1432,8 @@ mod table_test { let mut row2 = GenericRow::new(3); row2.set_field(0, 2_i32); - let tags2 = { - let mut w = FlussArrayWriter::new(1, &DataTypes::string()); - w.set_null_at(0); - w.complete().expect("tags2") - }; - let scores2 = { - let w = FlussArrayWriter::new(0, &DataTypes::int()); - w.complete().expect("scores2") - }; + let tags2 = make_string_array(&[None]); + let scores2 = make_int_array(&[]); row2.set_field(1, tags2); row2.set_field(2, scores2); @@ -1464,11 +1441,7 @@ mod table_test { let mut row3 = GenericRow::new(3); row3.set_field(0, 3_i32); row3.set_field(1, Datum::Null); - let scores3 = { - let mut w = FlussArrayWriter::new(1, &DataTypes::int()); - w.write_int(0, 42); - w.complete().expect("scores3") - }; + let scores3 = make_int_array(&[Some(42)]); row3.set_field(2, scores3); append_writer.append(&row1).expect("append row1"); @@ -1763,7 +1736,7 @@ mod table_test { if expected.is_nan() { assert!(actual.is_nan(), "expected NaN"); } else if expected.is_infinite() { - assert_eq!(actual.is_infinite(), true); + assert!(actual.is_infinite()); assert_eq!(actual.signum(), expected.signum()); } else { assert!((actual - expected).abs() < f32::EPSILON); @@ -1774,7 +1747,7 @@ mod table_test { if expected.is_nan() { assert!(actual.is_nan(), "expected NaN"); } else if expected.is_infinite() { - assert_eq!(actual.is_infinite(), true); + assert!(actual.is_infinite()); assert_eq!(actual.signum(), expected.signum()); } else { assert!((actual - expected).abs() < f64::EPSILON); diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index dc2876f8..81a7c0b1 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -17,7 +17,9 @@ */ use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use fluss::client::FlussAdmin; -use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath}; +use fluss::metadata::{DataTypes, PartitionSpec, TableDescriptor, TablePath}; +use fluss::row::FlussArray; +use fluss::row::binary_array::FlussArrayWriter; use std::collections::HashMap; use std::sync::Arc; use std::sync::LazyLock; @@ -94,6 +96,28 @@ pub async fn create_table( .expect("Failed to create table"); } +pub fn make_string_array(values: &[Option<&str>]) -> FlussArray { + let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::string()); + for (idx, value) in values.iter().enumerate() { + match value { + Some(v) => writer.write_string(idx, v), + None => writer.set_null_at(idx), + } + } + writer.complete().expect("Failed to build string array") +} + +pub fn make_int_array(values: &[Option]) -> FlussArray { + let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::int()); + for (idx, value) in values.iter().enumerate() { + match value { + Some(v) => writer.write_int(idx, *v), + None => writer.set_null_at(idx), + } + } + writer.complete().expect("Failed to build int array") +} + /// Similar to wait_for_cluster_ready but connects with SASL credentials. pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) { let timeout = Duration::from_secs(30); From 28ee28fa469783ce0556b193885f6d7e8df23b74 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Tue, 5 May 2026 20:08:24 +0200 Subject: [PATCH 3/4] Consolidate tests --- crates/fluss/tests/integration/kv_table.rs | 176 +++---- crates/fluss/tests/integration/log_table.rs | 535 ++++++++------------ 2 files changed, 286 insertions(+), 425 deletions(-) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index d1bfd6d6..2f70088b 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -915,7 +915,8 @@ mod kv_table_test { let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().expect("Failed to get admin"); - let table_path = TablePath::new("fluss", "test_kv_arrays_basic"); + let table_path = TablePath::new("fluss", "test_kv_arrays"); + let inner_array_type = DataTypes::array(DataTypes::int()); let table_descriptor = TableDescriptor::builder() .schema( @@ -923,6 +924,7 @@ mod kv_table_test { .column("id", DataTypes::int()) .column("tags", DataTypes::array(DataTypes::string())) .column("scores", DataTypes::array(DataTypes::int())) + .column("matrix", DataTypes::array(inner_array_type.clone())) .primary_key(vec!["id"]) .build() .expect("Failed to build schema"), @@ -940,13 +942,18 @@ mod kv_table_test { let upsert = table.new_upsert().expect("Failed to create upsert"); let upsert_writer = upsert.create_writer().expect("Failed to create writer"); - // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30] - let mut row1 = GenericRow::new(3); + // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30], matrix=[[1,2],[3,4]] + let mut row1 = GenericRow::new(4); row1.set_field(0, 1_i32); - let tags1 = make_string_array(&[Some("hello"), Some("world")]); - let scores1 = make_int_array(&[Some(10), Some(20), Some(30)]); - row1.set_field(1, tags1); - row1.set_field(2, scores1); + row1.set_field(1, make_string_array(&[Some("hello"), Some("world")])); + row1.set_field(2, make_int_array(&[Some(10), Some(20), Some(30)])); + let m1 = { + let mut w = FlussArrayWriter::new(2, &inner_array_type); + w.write_array(0, &make_int_array(&[Some(1), Some(2)])); + w.write_array(1, &make_int_array(&[Some(3), Some(4)])); + w.complete().expect("matrix1") + }; + row1.set_field(3, m1); upsert_writer .upsert(&row1) @@ -954,13 +961,12 @@ mod kv_table_test { .await .expect("ack row1"); - // Row 2: id=2, tags=[null element], scores=[] (empty) - let mut row2 = GenericRow::new(3); + // Row 2: id=2, tags=[null element], scores=[] (empty), matrix=null + let mut row2 = GenericRow::new(4); row2.set_field(0, 2_i32); - let tags2 = make_string_array(&[None]); - let scores2 = make_int_array(&[]); - row2.set_field(1, tags2); - row2.set_field(2, scores2); + row2.set_field(1, make_string_array(&[None])); + row2.set_field(2, make_int_array(&[])); + row2.set_field(3, Datum::Null); upsert_writer .upsert(&row2) @@ -968,12 +974,19 @@ mod kv_table_test { .await .expect("ack row2"); - // Row 3: id=3, tags=null, scores=[42] - let mut row3 = GenericRow::new(3); + // Row 3: id=3, tags=null, scores=[42], matrix=[[5], null, []] + let mut row3 = GenericRow::new(4); row3.set_field(0, 3_i32); row3.set_field(1, Datum::Null); - let scores3 = make_int_array(&[Some(42)]); - row3.set_field(2, scores3); + row3.set_field(2, make_int_array(&[Some(42)])); + let m3 = { + let mut w = FlussArrayWriter::new(3, &inner_array_type); + w.write_array(0, &make_int_array(&[Some(5)])); + w.set_null_at(1); + w.write_array(2, &make_int_array(&[])); + w.complete().expect("matrix3") + }; + row3.set_field(3, m3); upsert_writer .upsert(&row3) @@ -988,8 +1001,11 @@ mod kv_table_test { .create_lookuper() .expect("Failed to create lookuper"); - // Verify row 1 - let result1 = lookuper.lookup(&make_key(1)).await.expect("lookup row1"); + // Verify row 1: populated flat arrays + nested array + let result1 = lookuper + .lookup(&make_key_with_field_count(1, 4)) + .await + .expect("lookup row1"); let r1 = result1 .get_single_row() .expect("get row1") @@ -1004,9 +1020,22 @@ mod kv_table_test { assert_eq!(scores_r1.get_int(0).unwrap(), 10); assert_eq!(scores_r1.get_int(1).unwrap(), 20); assert_eq!(scores_r1.get_int(2).unwrap(), 30); - - // Verify row 2 - let result2 = lookuper.lookup(&make_key(2)).await.expect("lookup row2"); + let matrix_r1: FlussArray = r1.get_array(3).unwrap(); + assert_eq!(matrix_r1.size(), 2); + let mr1_0 = matrix_r1.get_array(0).unwrap(); + assert_eq!(mr1_0.size(), 2); + assert_eq!(mr1_0.get_int(0).unwrap(), 1); + assert_eq!(mr1_0.get_int(1).unwrap(), 2); + let mr1_1 = matrix_r1.get_array(1).unwrap(); + assert_eq!(mr1_1.size(), 2); + assert_eq!(mr1_1.get_int(0).unwrap(), 3); + assert_eq!(mr1_1.get_int(1).unwrap(), 4); + + // Verify row 2: null element in array, empty array, null nested column + let result2 = lookuper + .lookup(&make_key_with_field_count(2, 4)) + .await + .expect("lookup row2"); let r2 = result2 .get_single_row() .expect("get row2") @@ -1017,9 +1046,13 @@ mod kv_table_test { assert!(tags_r2.is_null_at(0)); let scores_r2 = r2.get_array(2).unwrap(); assert_eq!(scores_r2.size(), 0); + assert!(r2.is_null_at(3).unwrap()); - // Verify row 3 - let result3 = lookuper.lookup(&make_key(3)).await.expect("lookup row3"); + // Verify row 3: null flat column, nested array with mixed inner (value, null, empty) + let result3 = lookuper + .lookup(&make_key_with_field_count(3, 4)) + .await + .expect("lookup row3"); let r3 = result3 .get_single_row() .expect("get row3") @@ -1029,93 +1062,14 @@ mod kv_table_test { let scores_r3 = r3.get_array(2).unwrap(); assert_eq!(scores_r3.size(), 1); assert_eq!(scores_r3.get_int(0).unwrap(), 42); - - admin - .drop_table(&table_path, false) - .await - .expect("Failed to drop table"); - } - - #[tokio::test] - async fn upsert_and_lookup_with_nested_array() { - let cluster = get_shared_cluster(); - let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().expect("Failed to get admin"); - - let table_path = TablePath::new("fluss", "test_kv_arrays_nested"); - let inner_array_type = DataTypes::array(DataTypes::int()); - let matrix_type = DataTypes::array(inner_array_type.clone()); - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("id", DataTypes::int()) - .column("matrix", matrix_type) - .primary_key(vec!["id"]) - .build() - .expect("Failed to build schema"), - ) - .build() - .expect("Failed to build table descriptor"); - - create_table(&admin, &table_path, &table_descriptor).await; - - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let upsert = table.new_upsert().expect("Failed to create upsert"); - let upsert_writer = upsert.create_writer().expect("Failed to create writer"); - - // Row: id=1, matrix=[[1, 2], [3, 4]] - let mut row = GenericRow::new(2); - row.set_field(0, 1_i32); - let inner1 = make_int_array(&[Some(1), Some(2)]); - let inner2 = make_int_array(&[Some(3), Some(4)]); - let outer = { - let mut w = FlussArrayWriter::new(2, &inner_array_type); - w.write_array(0, &inner1); - w.write_array(1, &inner2); - w.complete().expect("outer") - }; - row.set_field(1, outer); - - upsert_writer - .upsert(&row) - .expect("upsert") - .await - .expect("ack"); - - // Lookup and verify nested structure - let mut lookuper = table - .new_lookup() - .expect("Failed to create lookup") - .create_lookuper() - .expect("Failed to create lookuper"); - - let result = lookuper - .lookup(&make_key_with_field_count(1, 2)) - .await - .expect("lookup"); - let r = result - .get_single_row() - .expect("get row") - .expect("row should exist"); - - assert_eq!(r.get_int(0).unwrap(), 1); - let matrix: FlussArray = r.get_array(1).unwrap(); - assert_eq!(matrix.size(), 2); - - let row0 = matrix.get_array(0).unwrap(); - assert_eq!(row0.size(), 2); - assert_eq!(row0.get_int(0).unwrap(), 1); - assert_eq!(row0.get_int(1).unwrap(), 2); - - let row1 = matrix.get_array(1).unwrap(); - assert_eq!(row1.size(), 2); - assert_eq!(row1.get_int(0).unwrap(), 3); - assert_eq!(row1.get_int(1).unwrap(), 4); + let matrix_r3 = r3.get_array(3).unwrap(); + assert_eq!(matrix_r3.size(), 3); + let mr3_0 = matrix_r3.get_array(0).unwrap(); + assert_eq!(mr3_0.size(), 1); + assert_eq!(mr3_0.get_int(0).unwrap(), 5); + assert!(matrix_r3.is_null_at(1)); + let mr3_2 = matrix_r3.get_array(2).unwrap(); + assert_eq!(mr3_2.size(), 0); admin .drop_table(&table_path, false) diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 97b27b10..1a6b514b 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -1387,17 +1387,21 @@ mod table_test { #[tokio::test] async fn append_and_scan_with_array() { - use fluss::row::{Datum, GenericRow}; + use fluss::row::binary_array::FlussArrayWriter; + use fluss::row::{Datum, FlussArray, GenericRow}; let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().expect("Failed to get admin"); - let table_path = TablePath::new("fluss", "test_log_arrays_basic"); + let table_path = TablePath::new("fluss", "test_log_arrays"); + let inner_array_type = DataTypes::array(DataTypes::int()); + let schema = Schema::builder() .column("id", DataTypes::int()) .column("tags", DataTypes::array(DataTypes::string())) .column("scores", DataTypes::array(DataTypes::int())) + .column("matrix", DataTypes::array(inner_array_type.clone())) .build() .expect("Failed to build schema"); @@ -1419,30 +1423,39 @@ mod table_test { .create_writer() .expect("Failed to create writer"); - // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30] - let mut row1 = GenericRow::new(3); + // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30], matrix=[[1,2],[3,4]] + let mut row1 = GenericRow::new(4); row1.set_field(0, 1_i32); + row1.set_field(1, make_string_array(&[Some("hello"), Some("world")])); + row1.set_field(2, make_int_array(&[Some(10), Some(20), Some(30)])); + let m1 = { + let mut w = FlussArrayWriter::new(2, &inner_array_type); + w.write_array(0, &make_int_array(&[Some(1), Some(2)])); + w.write_array(1, &make_int_array(&[Some(3), Some(4)])); + w.complete().expect("matrix1") + }; + row1.set_field(3, m1); - let tags1 = make_string_array(&[Some("hello"), Some("world")]); - let scores1 = make_int_array(&[Some(10), Some(20), Some(30)]); - row1.set_field(1, tags1); - row1.set_field(2, scores1); - - // Row 2: id=2, tags=[null], scores=[] - let mut row2 = GenericRow::new(3); + // Row 2: id=2, tags=[null], scores=[], matrix=[[5], null, []] + let mut row2 = GenericRow::new(4); row2.set_field(0, 2_i32); + row2.set_field(1, make_string_array(&[None])); + row2.set_field(2, make_int_array(&[])); + let m2 = { + let mut w = FlussArrayWriter::new(3, &inner_array_type); + w.write_array(0, &make_int_array(&[Some(5)])); + w.set_null_at(1); + w.write_array(2, &make_int_array(&[])); + w.complete().expect("matrix2") + }; + row2.set_field(3, m2); - let tags2 = make_string_array(&[None]); - let scores2 = make_int_array(&[]); - row2.set_field(1, tags2); - row2.set_field(2, scores2); - - // Row 3: id=3, tags=null, scores=[42] - let mut row3 = GenericRow::new(3); + // Row 3: id=3, tags=null, scores=[42], matrix=null + let mut row3 = GenericRow::new(4); row3.set_field(0, 3_i32); row3.set_field(1, Datum::Null); - let scores3 = make_int_array(&[Some(42)]); - row3.set_field(2, scores3); + row3.set_field(2, make_int_array(&[Some(42)])); + row3.set_field(3, Datum::Null); append_writer.append(&row1).expect("append row1"); append_writer.append(&row2).expect("append row2"); @@ -1452,6 +1465,7 @@ mod table_test { let records = scan_table(&table, |scan| scan).await; assert_eq!(records.len(), 3, "expected three log records"); + // Verify row 1: populated flat arrays + nested array let r0 = records[0].row(); assert_eq!(r0.get_int(0).unwrap(), 1); let tags_r0 = r0.get_array(1).unwrap(); @@ -1463,7 +1477,18 @@ mod table_test { assert_eq!(scores_r0.get_int(0).unwrap(), 10); assert_eq!(scores_r0.get_int(1).unwrap(), 20); assert_eq!(scores_r0.get_int(2).unwrap(), 30); - + let matrix_r0: FlussArray = r0.get_array(3).unwrap(); + assert_eq!(matrix_r0.size(), 2); + let mr0_0 = matrix_r0.get_array(0).unwrap(); + assert_eq!(mr0_0.size(), 2); + assert_eq!(mr0_0.get_int(0).unwrap(), 1); + assert_eq!(mr0_0.get_int(1).unwrap(), 2); + let mr0_1 = matrix_r0.get_array(1).unwrap(); + assert_eq!(mr0_1.size(), 2); + assert_eq!(mr0_1.get_int(0).unwrap(), 3); + assert_eq!(mr0_1.get_int(1).unwrap(), 4); + + // Verify row 2: null element in array, empty array, nested with mixed inner let r1 = records[1].row(); assert_eq!(r1.get_int(0).unwrap(), 2); let tags_r1 = r1.get_array(1).unwrap(); @@ -1471,13 +1496,23 @@ mod table_test { assert!(tags_r1.is_null_at(0)); let scores_r1 = r1.get_array(2).unwrap(); assert_eq!(scores_r1.size(), 0); - + let matrix_r1 = r1.get_array(3).unwrap(); + assert_eq!(matrix_r1.size(), 3); + let mr1_0 = matrix_r1.get_array(0).unwrap(); + assert_eq!(mr1_0.size(), 1); + assert_eq!(mr1_0.get_int(0).unwrap(), 5); + assert!(matrix_r1.is_null_at(1)); + let mr1_2 = matrix_r1.get_array(2).unwrap(); + assert_eq!(mr1_2.size(), 0); + + // Verify row 3: null flat column, null nested column let r2 = records[2].row(); assert_eq!(r2.get_int(0).unwrap(), 3); assert!(r2.is_null_at(1).unwrap()); let scores_r2 = r2.get_array(2).unwrap(); assert_eq!(scores_r2.size(), 1); assert_eq!(scores_r2.get_int(0).unwrap(), 42); + assert!(r2.is_null_at(3).unwrap()); admin .drop_table(&table_path, false) @@ -1486,125 +1521,52 @@ mod table_test { } #[tokio::test] - async fn append_and_scan_with_nested_array() { + async fn append_and_scan_with_array_rich_types() { use fluss::row::binary_array::FlussArrayWriter; - use fluss::row::{FlussArray, GenericRow}; - - let cluster = get_shared_cluster(); - let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().expect("Failed to get admin"); - - let table_path = TablePath::new("fluss", "test_log_arrays_nested"); - let inner_array_type = DataTypes::array(DataTypes::int()); - let matrix_type = DataTypes::array(inner_array_type.clone()); - - let schema = Schema::builder() - .column("id", DataTypes::int()) - .column("matrix", matrix_type.clone()) - .build() - .expect("Failed to build schema"); - - let table_descriptor = TableDescriptor::builder() - .schema(schema) - .build() - .expect("Failed to build table descriptor"); - - create_table(&admin, &table_path, &table_descriptor).await; + use fluss::row::{Date, Decimal, FlussArray, GenericRow, Time, TimestampNtz}; - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let append_writer = table - .new_append() - .expect("Failed to create append") - .create_writer() - .expect("Failed to create writer"); - - fn build_inner_ints(vals: &[i32]) -> FlussArray { - let mut w = FlussArrayWriter::new(vals.len(), &DataTypes::int()); - for (i, v) in vals.iter().enumerate() { - w.write_int(i, *v); + fn assert_f32_special(actual: f32, expected: f32) { + if expected.is_nan() { + assert!(actual.is_nan(), "expected NaN"); + } else if expected.is_infinite() { + assert!(actual.is_infinite()); + assert_eq!(actual.signum(), expected.signum()); + } else { + assert!((actual - expected).abs() < f32::EPSILON); } - w.complete().expect("inner array") } - // Row 1: matrix = [[1,2],[3,4]] - let mut row1 = GenericRow::new(2); - row1.set_field(0, 1_i32); - let m1 = { - let mut w = FlussArrayWriter::new(2, &inner_array_type); - w.write_array(0, &build_inner_ints(&[1, 2])); - w.write_array(1, &build_inner_ints(&[3, 4])); - w.complete().expect("matrix1") - }; - row1.set_field(1, m1); - - // Row 2: matrix = [[5], null, []] - let mut row2 = GenericRow::new(2); - row2.set_field(0, 2_i32); - let m2 = { - let mut w = FlussArrayWriter::new(3, &inner_array_type); - w.write_array(0, &build_inner_ints(&[5])); - w.set_null_at(1); - let empty: FlussArray = FlussArrayWriter::new(0, &DataTypes::int()) - .complete() - .unwrap(); - w.write_array(2, &empty); - w.complete().expect("matrix2") - }; - row2.set_field(1, m2); - - append_writer.append(&row1).expect("append row1"); - append_writer.append(&row2).expect("append row2"); - append_writer.flush().await.expect("Failed to flush"); - - let records = scan_table(&table, |scan| scan).await; - assert_eq!(records.len(), 2); - - let r0 = records[0].row(); - assert_eq!(r0.get_int(0).unwrap(), 1); - let mat0 = r0.get_array(1).unwrap(); - assert_eq!(mat0.size(), 2); - let a00 = mat0.get_array(0).unwrap(); - assert_eq!(a00.size(), 2); - assert_eq!(a00.get_int(0).unwrap(), 1); - assert_eq!(a00.get_int(1).unwrap(), 2); - let a01 = mat0.get_array(1).unwrap(); - assert_eq!(a01.size(), 2); - assert_eq!(a01.get_int(0).unwrap(), 3); - assert_eq!(a01.get_int(1).unwrap(), 4); - - let r1 = records[1].row(); - assert_eq!(r1.get_int(0).unwrap(), 2); - let mat1 = r1.get_array(1).unwrap(); - assert_eq!(mat1.size(), 3); - let b0 = mat1.get_array(0).unwrap(); - assert_eq!(b0.size(), 1); - assert_eq!(b0.get_int(0).unwrap(), 5); - assert!(mat1.is_null_at(1)); - let b2 = mat1.get_array(2).unwrap(); - assert_eq!(b2.size(), 0); - - admin - .drop_table(&table_path, false) - .await - .expect("Failed to drop table"); - } - - #[tokio::test] - async fn append_and_scan_with_array_rich_types() { - use fluss::row::binary_array::FlussArrayWriter; - use fluss::row::{Date, Decimal, GenericRow, Time, TimestampNtz}; + fn assert_f64_special(actual: f64, expected: f64) { + if expected.is_nan() { + assert!(actual.is_nan(), "expected NaN"); + } else if expected.is_infinite() { + assert!(actual.is_infinite()); + assert_eq!(actual.signum(), expected.signum()); + } else { + assert!((actual - expected).abs() < f64::EPSILON); + } + } let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_log_arrays_rich_types"); - let ts_val = TimestampNtz::from_millis_nanos(1769163227123, 456000).unwrap(); - let dec_val = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); + + // Compact types: DECIMAL(10,2) precision<=18, TIMESTAMP(6) precision<=3 for millis + let dec_compact = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); + let ts_compact = TimestampNtz::from_millis_nanos(1769163227123, 456000).unwrap(); + + // Non-compact types: DECIMAL(22,5) precision>18, TIMESTAMP(9) precision>3 + let dec_big = Decimal::from_unscaled_bytes(&[66, 237, 18, 59, 11, 216, 31, 4, 244], 22, 5) + .expect("big decimal"); + let ts_nano = TimestampNtz::from_millis_nanos(1769163227123, 999_999).unwrap(); + + let d = Date::new(20476); + let t = Time::new(36827123); + let elem_bytes = &[0_u8, 1, 2, 255]; + let fixed_a: Vec = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let fixed_b: Vec = vec![0x01, 0x02, 0x03, 0x04]; let schema = Schema::builder() .column("id", DataTypes::int()) @@ -1615,10 +1577,25 @@ mod table_test { DataTypes::array(DataTypes::time_with_precision(3)), ) .column( - "arr_ts", + "arr_ts_compact", DataTypes::array(DataTypes::timestamp_with_precision(6)), ) - .column("arr_decimal", DataTypes::array(DataTypes::decimal(10, 2))) + .column( + "arr_ts_nano", + DataTypes::array(DataTypes::timestamp_with_precision(9)), + ) + .column( + "arr_decimal_compact", + DataTypes::array(DataTypes::decimal(10, 2)), + ) + .column( + "arr_decimal_big", + DataTypes::array(DataTypes::decimal(22, 5)), + ) + .column("arr_long_str", DataTypes::array(DataTypes::string())) + .column("arr_float", DataTypes::array(DataTypes::float())) + .column("arr_double", DataTypes::array(DataTypes::double())) + .column("arr_binary", DataTypes::array(DataTypes::binary(4))) .build() .expect("Failed to build schema"); @@ -1640,188 +1617,80 @@ mod table_test { .create_writer() .expect("Failed to create writer"); - let mut row = GenericRow::new(6); + let mut row = GenericRow::new(12); row.set_field(0, 1_i32); - let elem_bytes = &[0_u8, 1, 2, 255]; - let d = Date::new(20476); - let t = Time::new(36827123); - + // col 1: arr_bytes — binary with null element let arr_bytes = { let mut w = FlussArrayWriter::new(2, &DataTypes::bytes()); w.write_binary_bytes(0, elem_bytes); w.set_null_at(1); w.complete().expect("arr_bytes") }; + row.set_field(1, arr_bytes); + + // col 2: arr_date let arr_date = { let mut w = FlussArrayWriter::new(2, &DataTypes::date()); w.write_date(0, d); w.set_null_at(1); w.complete().expect("arr_date") }; + row.set_field(2, arr_date); + + // col 3: arr_time let arr_time = { let mut w = FlussArrayWriter::new(2, &DataTypes::time_with_precision(3)); w.write_time(0, t); w.set_null_at(1); w.complete().expect("arr_time") }; - let arr_ts = { + row.set_field(3, arr_time); + + // col 4: arr_ts_compact — compact timestamp (precision 6, millis+nanos) + let arr_ts_compact = { let mut w = FlussArrayWriter::new(2, &DataTypes::timestamp_with_precision(6)); - w.write_timestamp_ntz(0, &ts_val, 6); + w.write_timestamp_ntz(0, &ts_compact, 6); w.set_null_at(1); - w.complete().expect("arr_ts") + w.complete().expect("arr_ts_compact") }; - let arr_decimal = { + row.set_field(4, arr_ts_compact); + + // col 5: arr_ts_nano — non-compact timestamp (precision 9) + let arr_ts_nano = { + let mut w = FlussArrayWriter::new(1, &DataTypes::timestamp_with_precision(9)); + w.write_timestamp_ntz(0, &ts_nano, 9); + w.complete().expect("arr_ts_nano") + }; + row.set_field(5, arr_ts_nano); + + // col 6: arr_decimal_compact — compact decimal (precision 10) + let arr_decimal_compact = { let mut w = FlussArrayWriter::new(2, &DataTypes::decimal(10, 2)); - w.write_decimal(0, &dec_val, 10); + w.write_decimal(0, &dec_compact, 10); w.set_null_at(1); - w.complete().expect("arr_decimal") + w.complete().expect("arr_decimal_compact") }; + row.set_field(6, arr_decimal_compact); - row.set_field(1, arr_bytes); - row.set_field(2, arr_date); - row.set_field(3, arr_time); - row.set_field(4, arr_ts); - row.set_field(5, arr_decimal); - - append_writer.append(&row).expect("append"); - append_writer.flush().await.expect("Failed to flush"); - - let records = scan_table(&table, |scan| scan).await; - assert_eq!(records.len(), 1); - let r = records[0].row(); - - let ab = r.get_array(1).unwrap(); - assert_eq!(ab.size(), 2); - assert_eq!(ab.get_binary(0).unwrap(), elem_bytes); - assert!(ab.is_null_at(1)); - - let ad = r.get_array(2).unwrap(); - assert_eq!(ad.size(), 2); - assert_eq!(ad.get_date(0).unwrap().get_inner(), d.get_inner()); - assert!(ad.is_null_at(1)); - - let at = r.get_array(3).unwrap(); - assert_eq!(at.size(), 2); - assert_eq!(at.get_time(0).unwrap().get_inner(), t.get_inner()); - assert!(at.is_null_at(1)); - - let ats = r.get_array(4).unwrap(); - assert_eq!(ats.size(), 2); - let read_ts = ats.get_timestamp_ntz(0, 6).unwrap(); - assert_eq!(read_ts.get_millisecond(), ts_val.get_millisecond()); - assert_eq!( - read_ts.get_nano_of_millisecond(), - ts_val.get_nano_of_millisecond() - ); - assert!(ats.is_null_at(1)); - - let adc = r.get_array(5).unwrap(); - assert_eq!(adc.size(), 2); - assert_eq!(adc.get_decimal(0, 10, 2).unwrap(), dec_val); - assert!(adc.is_null_at(1)); - - admin - .drop_table(&table_path, false) - .await - .expect("Failed to drop table"); - } - - #[tokio::test] - async fn append_and_scan_with_array_encoding_edge_cases() { - use fluss::row::binary_array::FlussArrayWriter; - use fluss::row::{Decimal, FlussArray, GenericRow, TimestampNtz}; - - fn assert_f32_special(actual: f32, expected: f32) { - if expected.is_nan() { - assert!(actual.is_nan(), "expected NaN"); - } else if expected.is_infinite() { - assert!(actual.is_infinite()); - assert_eq!(actual.signum(), expected.signum()); - } else { - assert!((actual - expected).abs() < f32::EPSILON); - } - } - - fn assert_f64_special(actual: f64, expected: f64) { - if expected.is_nan() { - assert!(actual.is_nan(), "expected NaN"); - } else if expected.is_infinite() { - assert!(actual.is_infinite()); - assert_eq!(actual.signum(), expected.signum()); - } else { - assert!((actual - expected).abs() < f64::EPSILON); - } - } - - let cluster = get_shared_cluster(); - let connection = cluster.get_fluss_connection().await; - let admin = connection.get_admin().expect("Failed to get admin"); - - let table_path = TablePath::new("fluss", "test_log_arrays_edge_cases"); - - // DECIMAL(22,5) non-compact: unscaled 1234567890123456709876 (22 digits) - let edge_decimal = - Decimal::from_unscaled_bytes(&[66, 237, 18, 59, 11, 216, 31, 4, 244], 22, 5) - .expect("edge decimal"); - - let schema = Schema::builder() - .column("id", DataTypes::int()) - .column("arr_long_str", DataTypes::array(DataTypes::string())) - .column( - "arr_big_decimal", - DataTypes::array(DataTypes::decimal(22, 5)), - ) - .column( - "arr_ts_nano", - DataTypes::array(DataTypes::timestamp_with_precision(9)), - ) - .column("arr_float", DataTypes::array(DataTypes::float())) - .column("arr_double", DataTypes::array(DataTypes::double())) - .column("arr_binary", DataTypes::array(DataTypes::binary(4))) - .build() - .expect("Failed to build schema"); - - let table_descriptor = TableDescriptor::builder() - .schema(schema) - .build() - .expect("Failed to build table descriptor"); - - create_table(&admin, &table_path, &table_descriptor).await; - - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let append_writer = table - .new_append() - .expect("Failed to create append") - .create_writer() - .expect("Failed to create writer"); - - let ts_edge = TimestampNtz::from_millis_nanos(1769163227123, 999_999).unwrap(); + // col 7: arr_decimal_big — non-compact decimal (precision 22) + let arr_decimal_big = { + let mut w = FlussArrayWriter::new(1, &DataTypes::decimal(22, 5)); + w.write_decimal(0, &dec_big, 22); + w.complete().expect("arr_decimal_big") + }; + row.set_field(7, arr_decimal_big); + // col 8: arr_long_str — heap-backed strings (>= 8 bytes) let arr_long_str = { let mut w = FlussArrayWriter::new(2, &DataTypes::string()); - // >= 8 bytes to exercise heap-backed string slots w.write_string(0, "abcdefghi"); w.write_string(1, "longstring_here"); w.complete().expect("arr_long_str") }; + row.set_field(8, arr_long_str); - let arr_big_decimal = { - let mut w = FlussArrayWriter::new(1, &DataTypes::decimal(22, 5)); - w.write_decimal(0, &edge_decimal, 22); - w.complete().expect("arr_big_decimal") - }; - - let arr_ts_nano = { - let mut w = FlussArrayWriter::new(1, &DataTypes::timestamp_with_precision(9)); - w.write_timestamp_ntz(0, &ts_edge, 9); - w.complete().expect("arr_ts_nano") - }; - + // col 9: arr_float — IEEE 754 specials let arr_float = { let mut w = FlussArrayWriter::new(3, &DataTypes::float()); w.write_float(0, f32::NAN); @@ -1829,7 +1698,9 @@ mod table_test { w.write_float(2, f32::NEG_INFINITY); w.complete().expect("arr_float") }; + row.set_field(9, arr_float); + // col 10: arr_double — IEEE 754 specials let arr_double = { let mut w = FlussArrayWriter::new(3, &DataTypes::double()); w.write_double(0, f64::NAN); @@ -1837,24 +1708,16 @@ mod table_test { w.write_double(2, f64::NEG_INFINITY); w.complete().expect("arr_double") }; + row.set_field(10, arr_double); - let fixed_a: Vec = vec![0xDE, 0xAD, 0xBE, 0xEF]; - let fixed_b: Vec = vec![0x01, 0x02, 0x03, 0x04]; + // col 11: arr_binary — fixed-size binary(4) let arr_binary = { let mut w = FlussArrayWriter::new(2, &DataTypes::binary(4)); w.write_binary_bytes(0, &fixed_a); w.write_binary_bytes(1, &fixed_b); w.complete().expect("arr_binary") }; - - let mut row = GenericRow::new(7); - row.set_field(0, 1_i32); - row.set_field(1, arr_long_str); - row.set_field(2, arr_big_decimal); - row.set_field(3, arr_ts_nano); - row.set_field(4, arr_float); - row.set_field(5, arr_double); - row.set_field(6, arr_binary); + row.set_field(11, arr_binary); append_writer.append(&row).expect("append"); append_writer.flush().await.expect("Failed to flush"); @@ -1863,37 +1726,81 @@ mod table_test { assert_eq!(records.len(), 1); let r = records[0].row(); - let s = r.get_array(1).unwrap(); - assert_eq!(s.size(), 2); - assert_eq!(s.get_string(0).unwrap(), "abcdefghi"); - assert_eq!(s.get_string(1).unwrap(), "longstring_here"); + // Verify arr_bytes + let ab = r.get_array(1).unwrap(); + assert_eq!(ab.size(), 2); + assert_eq!(ab.get_binary(0).unwrap(), elem_bytes); + assert!(ab.is_null_at(1)); + + // Verify arr_date + let ad = r.get_array(2).unwrap(); + assert_eq!(ad.size(), 2); + assert_eq!(ad.get_date(0).unwrap().get_inner(), d.get_inner()); + assert!(ad.is_null_at(1)); - let dec_a = r.get_array(2).unwrap(); - assert_eq!(dec_a.size(), 1); - assert_eq!(dec_a.get_decimal(0, 22, 5).unwrap(), edge_decimal); + // Verify arr_time + let at = r.get_array(3).unwrap(); + assert_eq!(at.size(), 2); + assert_eq!(at.get_time(0).unwrap().get_inner(), t.get_inner()); + assert!(at.is_null_at(1)); - let ts_a = r.get_array(3).unwrap(); - assert_eq!(ts_a.size(), 1); - let read_ts = ts_a.get_timestamp_ntz(0, 9).unwrap(); - assert_eq!(read_ts.get_millisecond(), ts_edge.get_millisecond()); + // Verify arr_ts_compact + let ats = r.get_array(4).unwrap(); + assert_eq!(ats.size(), 2); + let read_ts_compact = ats.get_timestamp_ntz(0, 6).unwrap(); + assert_eq!( + read_ts_compact.get_millisecond(), + ts_compact.get_millisecond() + ); assert_eq!( - read_ts.get_nano_of_millisecond(), - ts_edge.get_nano_of_millisecond() + read_ts_compact.get_nano_of_millisecond(), + ts_compact.get_nano_of_millisecond() ); + assert!(ats.is_null_at(1)); - let f_a = r.get_array(4).unwrap(); - assert_eq!(f_a.size(), 3); - assert_f32_special(f_a.get_float(0).unwrap(), f32::NAN); - assert_f32_special(f_a.get_float(1).unwrap(), f32::INFINITY); - assert_f32_special(f_a.get_float(2).unwrap(), f32::NEG_INFINITY); + // Verify arr_ts_nano + let ats_nano = r.get_array(5).unwrap(); + assert_eq!(ats_nano.size(), 1); + let read_ts_nano = ats_nano.get_timestamp_ntz(0, 9).unwrap(); + assert_eq!(read_ts_nano.get_millisecond(), ts_nano.get_millisecond()); + assert_eq!( + read_ts_nano.get_nano_of_millisecond(), + ts_nano.get_nano_of_millisecond() + ); - let d_a = r.get_array(5).unwrap(); - assert_eq!(d_a.size(), 3); - assert_f64_special(d_a.get_double(0).unwrap(), f64::NAN); - assert_f64_special(d_a.get_double(1).unwrap(), f64::INFINITY); - assert_f64_special(d_a.get_double(2).unwrap(), f64::NEG_INFINITY); + // Verify arr_decimal_compact + let adc = r.get_array(6).unwrap(); + assert_eq!(adc.size(), 2); + assert_eq!(adc.get_decimal(0, 10, 2).unwrap(), dec_compact); + assert!(adc.is_null_at(1)); - let fb: FlussArray = r.get_array(6).unwrap(); + // Verify arr_decimal_big + let adb = r.get_array(7).unwrap(); + assert_eq!(adb.size(), 1); + assert_eq!(adb.get_decimal(0, 22, 5).unwrap(), dec_big); + + // Verify arr_long_str + let als = r.get_array(8).unwrap(); + assert_eq!(als.size(), 2); + assert_eq!(als.get_string(0).unwrap(), "abcdefghi"); + assert_eq!(als.get_string(1).unwrap(), "longstring_here"); + + // Verify arr_float — IEEE 754 specials + let af = r.get_array(9).unwrap(); + assert_eq!(af.size(), 3); + assert_f32_special(af.get_float(0).unwrap(), f32::NAN); + assert_f32_special(af.get_float(1).unwrap(), f32::INFINITY); + assert_f32_special(af.get_float(2).unwrap(), f32::NEG_INFINITY); + + // Verify arr_double — IEEE 754 specials + let adbl = r.get_array(10).unwrap(); + assert_eq!(adbl.size(), 3); + assert_f64_special(adbl.get_double(0).unwrap(), f64::NAN); + assert_f64_special(adbl.get_double(1).unwrap(), f64::INFINITY); + assert_f64_special(adbl.get_double(2).unwrap(), f64::NEG_INFINITY); + + // Verify arr_binary — fixed-size binary(4) + let fb: FlussArray = r.get_array(11).unwrap(); assert_eq!(fb.size(), 2); assert_eq!(fb.get_binary(0).unwrap(), fixed_a.as_slice()); assert_eq!(fb.get_binary(1).unwrap(), fixed_b.as_slice()); From 8c535123dfe9759c29fcc0b063fc7b597cadc183 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Wed, 6 May 2026 21:15:30 +0200 Subject: [PATCH 4/4] Conslidate tests --- bindings/python/test/test_kv_table.py | 153 ++++++++------------------ 1 file changed, 46 insertions(+), 107 deletions(-) diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py index 7fd85228..38a7169e 100644 --- a/bindings/python/test/test_kv_table.py +++ b/bindings/python/test/test_kv_table.py @@ -347,8 +347,8 @@ async def test_partitioned_table_upsert_and_lookup(connection, admin): async def test_upsert_and_lookup_with_array(connection, admin): - """Test upsert and lookup with array columns in KV tables.""" - table_path = fluss.TablePath("fluss", "py_test_kv_arrays_basic") + """Test upsert and lookup with flat, nested, and null-pattern arrays in KV tables.""" + table_path = fluss.TablePath("fluss", "py_test_kv_arrays") await admin.drop_table(table_path, ignore_if_not_exists=True) schema = fluss.Schema( @@ -357,6 +357,7 @@ async def test_upsert_and_lookup_with_array(connection, admin): pa.field("id", pa.int32()), pa.field("tags", pa.list_(pa.string())), pa.field("scores", pa.list_(pa.int32())), + pa.field("matrix", pa.list_(pa.list_(pa.int32()))), ] ), primary_keys=["id"], @@ -367,16 +368,27 @@ async def test_upsert_and_lookup_with_array(connection, admin): table = await connection.get_table(table_path) upsert_writer = table.new_upsert().create_writer() - # Row 1: standard arrays await _upsert_and_wait( - upsert_writer, {"id": 1, "tags": ["hello", "world"], "scores": [10, 20, 30]} + upsert_writer, + { + "id": 1, + "tags": ["hello", "world"], + "scores": [10, 20, 30], + "matrix": [[1, 2], [3, 4]], + }, + ) + await _upsert_and_wait( + upsert_writer, + {"id": 2, "tags": [None], "scores": [], "matrix": None}, + ) + await _upsert_and_wait( + upsert_writer, + {"id": 3, "tags": None, "scores": [42], "matrix": [[], [5], [6, 7, 8]]}, + ) + await _upsert_and_wait( + upsert_writer, + {"id": 4, "tags": None, "scores": None, "matrix": [[1, None], None, []]}, ) - - # Row 2: null element in array + empty array - await _upsert_and_wait(upsert_writer, {"id": 2, "tags": [None], "scores": []}) - - # Row 3: null array column - await _upsert_and_wait(upsert_writer, {"id": 3, "tags": None, "scores": [42]}) lookuper = table.new_lookup().create_lookuper() @@ -384,71 +396,31 @@ async def test_upsert_and_lookup_with_array(connection, admin): assert result1 is not None assert result1["tags"] == ["hello", "world"] assert result1["scores"] == [10, 20, 30] + assert result1["matrix"] == [[1, 2], [3, 4]] result2 = await lookuper.lookup({"id": 2}) assert result2 is not None assert result2["tags"] == [None] assert result2["scores"] == [] + assert result2["matrix"] is None result3 = await lookuper.lookup({"id": 3}) assert result3 is not None assert result3["tags"] is None assert result3["scores"] == [42] - - await admin.drop_table(table_path, ignore_if_not_exists=False) - - -async def test_upsert_and_lookup_with_nested_array(connection, admin): - """Test upsert and lookup with nested array (ARRAY>) in KV tables.""" - table_path = fluss.TablePath("fluss", "py_test_kv_arrays_nested") - await admin.drop_table(table_path, ignore_if_not_exists=True) - - schema = fluss.Schema( - pa.schema( - [ - pa.field("id", pa.int32()), - pa.field("matrix", pa.list_(pa.list_(pa.int32()))), - ] - ), - primary_keys=["id"], - ) - table_descriptor = fluss.TableDescriptor(schema) - await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) - - table = await connection.get_table(table_path) - upsert_writer = table.new_upsert().create_writer() - - await _upsert_and_wait(upsert_writer, {"id": 1, "matrix": [[1, 2], [3, 4]]}) - - await _upsert_and_wait(upsert_writer, {"id": 2, "matrix": [[], [5], [6, 7, 8]]}) - - await _upsert_and_wait(upsert_writer, {"id": 3, "matrix": None}) - - await _upsert_and_wait(upsert_writer, {"id": 4, "matrix": [[1, None], None, []]}) - - lookuper = table.new_lookup().create_lookuper() - - result1 = await lookuper.lookup({"id": 1}) - assert result1 is not None - assert result1["matrix"] == [[1, 2], [3, 4]] - - result2 = await lookuper.lookup({"id": 2}) - assert result2 is not None - assert result2["matrix"] == [[], [5], [6, 7, 8]] - - result3 = await lookuper.lookup({"id": 3}) - assert result3 is not None - assert result3["matrix"] is None + assert result3["matrix"] == [[], [5], [6, 7, 8]] result4 = await lookuper.lookup({"id": 4}) assert result4 is not None + assert result4["tags"] is None + assert result4["scores"] is None assert result4["matrix"] == [[1, None], None, []] await admin.drop_table(table_path, ignore_if_not_exists=False) async def test_upsert_and_lookup_with_array_rich_types(connection, admin): - """Test upsert/lookup for arrays with rich element types in KV tables.""" + """Test upsert/lookup for arrays with rich element types and encoding edge cases.""" table_path = fluss.TablePath("fluss", "py_test_kv_arrays_rich_types") await admin.drop_table(table_path, ignore_if_not_exists=True) @@ -462,62 +434,13 @@ async def test_upsert_and_lookup_with_array_rich_types(connection, admin): pa.field("arr_ts_ntz", pa.list_(pa.timestamp("us"))), pa.field("arr_ts_ltz", pa.list_(pa.timestamp("us", tz="UTC"))), pa.field("arr_decimal", pa.list_(pa.decimal128(10, 2))), - ] - ), - primary_keys=["id"], - ) - table_descriptor = fluss.TableDescriptor(schema) - await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) - - table = await connection.get_table(table_path) - upsert_writer = table.new_upsert().create_writer() - - await _upsert_and_wait( - upsert_writer, - { - "id": 1, - "arr_bytes": [b"\x10\x20\x30", None], - "arr_date": [date(2026, 1, 23), None], - "arr_time": [dt_time(10, 13, 47, 123000), None], - "arr_ts_ntz": [datetime(2026, 1, 23, 10, 13, 47, 123000)], - "arr_ts_ltz": [datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc)], - "arr_decimal": [Decimal("123.45"), None], - }, - ) - - lookuper = table.new_lookup().create_lookuper() - result = await lookuper.lookup({"id": 1}) - assert result is not None - - assert result["arr_bytes"] == [b"\x10\x20\x30", None] - assert result["arr_date"] == [date(2026, 1, 23), None] - assert result["arr_time"] == [dt_time(10, 13, 47, 123000), None] - assert result["arr_ts_ntz"] == [datetime(2026, 1, 23, 10, 13, 47, 123000)] - assert result["arr_ts_ltz"] == [ - datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc) - ] - assert result["arr_decimal"] == [Decimal("123.45"), None] - - await admin.drop_table(table_path, ignore_if_not_exists=False) - - -async def test_upsert_and_lookup_with_array_encoding_edge_cases(connection, admin): - """Test array encoding edge cases in KV table upsert/lookup.""" - table_path = fluss.TablePath("fluss", "py_test_kv_arrays_edge_cases") - await admin.drop_table(table_path, ignore_if_not_exists=True) - - schema = fluss.Schema( - pa.schema( - [ - pa.field("id", pa.int32()), pa.field("arr_long_str", pa.list_(pa.string())), pa.field("arr_big_decimal", pa.list_(pa.decimal128(22, 5))), pa.field("arr_ts_nano", pa.list_(pa.timestamp("ns"))), pa.field("arr_float", pa.list_(pa.float32())), pa.field("arr_double", pa.list_(pa.float64())), - # TODO(fluss-python): support PyArrow FixedSizeBinary in schema conversion. - # Then switch this back to fixed-size binary: - # pa.field("arr_binary", pa.list_(pa.binary(4))), + # TODO(fluss-python#524): support PyArrow FixedSizeBinary in schema + # conversion. Then switch to pa.binary(4). pa.field("arr_binary", pa.list_(pa.binary())), ] ), @@ -533,6 +456,14 @@ async def test_upsert_and_lookup_with_array_encoding_edge_cases(connection, admi upsert_writer, { "id": 1, + "arr_bytes": [b"\x10\x20\x30", None], + "arr_date": [date(2026, 1, 23), None], + "arr_time": [dt_time(10, 13, 47, 123000), None], + "arr_ts_ntz": [datetime(2026, 1, 23, 10, 13, 47, 123000)], + "arr_ts_ltz": [ + datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc) + ], + "arr_decimal": [Decimal("123.45"), None], "arr_long_str": [ "abcdefgh", "this is a much longer string that definitely exceeds inline", @@ -552,6 +483,14 @@ async def test_upsert_and_lookup_with_array_encoding_edge_cases(connection, admi result = await lookuper.lookup({"id": 1}) assert result is not None + assert result["arr_bytes"] == [b"\x10\x20\x30", None] + assert result["arr_date"] == [date(2026, 1, 23), None] + assert result["arr_time"] == [dt_time(10, 13, 47, 123000), None] + assert result["arr_ts_ntz"] == [datetime(2026, 1, 23, 10, 13, 47, 123000)] + assert result["arr_ts_ltz"] == [ + datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc) + ] + assert result["arr_decimal"] == [Decimal("123.45"), None] assert result["arr_long_str"] == [ "abcdefgh", "this is a much longer string that definitely exceeds inline",