diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py
index 36aa3e46..38a7169e 100644
--- a/bindings/python/test/test_kv_table.py
+++ b/bindings/python/test/test_kv_table.py
@@ -30,6 +30,17 @@
 import fluss
 
 
+async def _upsert_and_wait(writer, row):
+    handle = writer.upsert(row)
+    await handle.wait()
+
+
+def _assert_float_specials(values):
+    assert math.isnan(values[0])
+    assert math.isinf(values[1]) and values[1] > 0
+    assert math.isinf(values[2]) and values[2] < 0
+
+
 async def test_upsert_delete_and_lookup(connection, admin):
     """Test upsert, lookup, update, delete, and non-existent key lookup."""
     table_path = fluss.TablePath("fluss", "py_test_upsert_and_lookup")
@@ -335,6 +346,167 @@ async def test_partitioned_table_upsert_and_lookup(connection, admin):
     await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_upsert_and_lookup_with_array(connection, admin):
+    """Test upsert and lookup with flat, nested, and null-pattern arrays in KV tables."""
+    table_path = fluss.TablePath("fluss", "py_test_kv_arrays")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("tags", pa.list_(pa.string())),
+                pa.field("scores", pa.list_(pa.int32())),
+                pa.field("matrix", pa.list_(pa.list_(pa.int32()))),
+            ]
+        ),
+        primary_keys=["id"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    upsert_writer = table.new_upsert().create_writer()
+
+    await _upsert_and_wait(
+        upsert_writer,
+        {
+            "id": 1,
+            "tags": ["hello", "world"],
+            "scores": [10, 20, 30],
+            "matrix": [[1, 2], [3, 4]],
+        },
+    )
+    await _upsert_and_wait(
+        upsert_writer,
+        {"id": 2, "tags": [None], "scores": [], "matrix": None},
+    )
+    await _upsert_and_wait(
+        upsert_writer,
+        {"id": 3, "tags": None, "scores": [42], "matrix": [[], [5], [6, 7, 8]]},
+    )
+    await _upsert_and_wait(
+        upsert_writer,
+        {"id": 4, "tags": None, "scores": None, "matrix": [[1, None], None, []]},
+    )
+
+    lookuper = table.new_lookup().create_lookuper()
+
+    result1 = await lookuper.lookup({"id": 1})
+    assert result1 is not None
+    assert result1["tags"] == ["hello", "world"]
+    assert result1["scores"] == [10, 20, 30]
+    assert result1["matrix"] == [[1, 2], [3, 4]]
+
+    result2 = await lookuper.lookup({"id": 2})
+    assert result2 is not None
+    assert result2["tags"] == [None]
+    assert result2["scores"] == []
+    assert result2["matrix"] is None
+
+    result3 = await lookuper.lookup({"id": 3})
+    assert result3 is not None
+    assert result3["tags"] is None
+    assert result3["scores"] == [42]
+    assert result3["matrix"] == [[], [5], [6, 7, 8]]
+
+    result4 = await lookuper.lookup({"id": 4})
+    assert result4 is not None
+    assert result4["tags"] is None
+    assert result4["scores"] is None
+    assert result4["matrix"] == [[1, None], None, []]
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
+async def test_upsert_and_lookup_with_array_rich_types(connection, admin):
+    """Test upsert/lookup for arrays with rich element types and encoding edge cases."""
+    table_path = fluss.TablePath("fluss", "py_test_kv_arrays_rich_types")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("arr_bytes", pa.list_(pa.binary())),
+                pa.field("arr_date", pa.list_(pa.date32())),
+                pa.field("arr_time", pa.list_(pa.time32("ms"))),
+                pa.field("arr_ts_ntz", pa.list_(pa.timestamp("us"))),
+                pa.field("arr_ts_ltz", pa.list_(pa.timestamp("us", tz="UTC"))),
+                pa.field("arr_decimal", pa.list_(pa.decimal128(10, 2))),
+                pa.field("arr_long_str", pa.list_(pa.string())),
+                pa.field("arr_big_decimal", pa.list_(pa.decimal128(22, 5))),
+                pa.field("arr_ts_nano", pa.list_(pa.timestamp("ns"))),
+                pa.field("arr_float", pa.list_(pa.float32())),
+                pa.field("arr_double", pa.list_(pa.float64())),
+                # TODO(fluss-python#524): support PyArrow FixedSizeBinary in schema
+                # conversion. Then switch to pa.binary(4).
+                pa.field("arr_binary", pa.list_(pa.binary())),
+            ]
+        ),
+        primary_keys=["id"],
+    )
+    table_descriptor = fluss.TableDescriptor(schema)
+    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)
+
+    table = await connection.get_table(table_path)
+    upsert_writer = table.new_upsert().create_writer()
+
+    await _upsert_and_wait(
+        upsert_writer,
+        {
+            "id": 1,
+            "arr_bytes": [b"\x10\x20\x30", None],
+            "arr_date": [date(2026, 1, 23), None],
+            "arr_time": [dt_time(10, 13, 47, 123000), None],
+            "arr_ts_ntz": [datetime(2026, 1, 23, 10, 13, 47, 123000)],
+            "arr_ts_ltz": [
+                datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc)
+            ],
+            "arr_decimal": [Decimal("123.45"), None],
+            "arr_long_str": [
+                "abcdefgh",
+                "this is a much longer string that definitely exceeds inline",
+            ],
+            "arr_big_decimal": [
+                Decimal("12345678901234567.12345"),
+                Decimal("-99999999999999999.99999"),
+            ],
+            "arr_ts_nano": [datetime(2026, 1, 23, 10, 13, 47, 123456)],
+            "arr_float": [float("nan"), float("inf"), float("-inf")],
+            "arr_double": [float("nan"), float("inf"), float("-inf")],
+            "arr_binary": [b"\xde\xad\xbe\xef", b"\x00\x01\x02\x03"],
+        },
+    )
+
+    lookuper = table.new_lookup().create_lookuper()
+    result = await lookuper.lookup({"id": 1})
+    assert result is not None
+
+    assert result["arr_bytes"] == [b"\x10\x20\x30", None]
+    assert result["arr_date"] == [date(2026, 1, 23), None]
+    assert result["arr_time"] == [dt_time(10, 13, 47, 123000), None]
+    assert result["arr_ts_ntz"] == [datetime(2026, 1, 23, 10, 13, 47, 123000)]
+    assert result["arr_ts_ltz"] == [
+        datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc)
+    ]
+    assert result["arr_decimal"] == [Decimal("123.45"), None]
+    assert result["arr_long_str"] == [
+        "abcdefgh",
+        "this is a much longer string that definitely exceeds inline",
+    ]
+    assert result["arr_big_decimal"] == [
+        Decimal("12345678901234567.12345"),
+        Decimal("-99999999999999999.99999"),
+    ]
+    assert result["arr_ts_nano"] == [datetime(2026, 1, 23, 10, 13, 47, 123456)]
+    _assert_float_specials(result["arr_float"])
+    _assert_float_specials(result["arr_double"])
+    assert result["arr_binary"] == [b"\xde\xad\xbe\xef", b"\x00\x01\x02\x03"]
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
 async def test_all_supported_datatypes(connection, admin):
     """Test upsert/lookup for all supported data types, including nulls."""
     table_path = fluss.TablePath("fluss", "py_test_kv_all_datatypes")
@@ -358,6 +530,7 @@
                 pa.field("col_timestamp_ntz", pa.timestamp("us")),
                 pa.field("col_timestamp_ltz", pa.timestamp("us", tz="UTC")),
                 pa.field("col_bytes", pa.binary()),
+                pa.field("col_array", pa.list_(pa.string())),
             ]
         ),
         primary_keys=["pk_int"],
@@ -385,10 +558,10 @@
         "col_timestamp_ntz": datetime(2026, 1, 23, 10, 13, 47, 123000),
         "col_timestamp_ltz": datetime(2026, 1, 23, 10, 13, 47, 123000),
         "col_bytes": b"binary data",
+        "col_array": ["fluss", "python"],
     }
 
-    handle = upsert_writer.upsert(row_data)
-    await handle.wait()
+    await _upsert_and_wait(upsert_writer, row_data)
 
     lookuper = table.new_lookup().create_lookuper()
     result = await lookuper.lookup({"pk_int": 1})
@@ -411,14 +584,14 @@
         2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc
     )
     assert result["col_bytes"] == b"binary data"
+    assert result["col_array"] == ["fluss", "python"]
 
     # Test with null values for all nullable columns
    null_row = {"pk_int": 2}
     for col in row_data:
         if col != "pk_int":
             null_row[col] = None
-    handle = upsert_writer.upsert(null_row)
-    await handle.wait()
+    await _upsert_and_wait(upsert_writer, null_row)
 
     result = await lookuper.lookup({"pk_int": 2})
     assert result is not None, "Row with nulls should exist"
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index 62e206b6..2f70088b 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -18,12 +18,19 @@
 
 #[cfg(test)]
 mod kv_table_test {
-    use crate::integration::utils::{create_partitions, create_table, get_shared_cluster};
+    use crate::integration::utils::{
+        create_partitions, create_table, get_shared_cluster, make_int_array, make_string_array,
+    };
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
-    use fluss::row::{GenericRow, InternalRow};
+    use fluss::row::binary_array::FlussArrayWriter;
+    use fluss::row::{FlussArray, GenericRow, InternalRow};
 
     fn make_key(id: i32) -> GenericRow<'static> {
-        let mut row = GenericRow::new(3);
+        make_key_with_field_count(id, 3)
+    }
+
+    fn make_key_with_field_count(id: i32, field_count: usize) -> GenericRow<'static> {
+        let mut row = GenericRow::new(field_count);
         row.set_field(0, id);
         row
     }
@@ -606,6 +613,7 @@
                     // Binary types
                     .column("col_bytes", DataTypes::bytes())
                     .column("col_binary", DataTypes::binary(20))
+                    .column("col_array", DataTypes::array(DataTypes::string()))
                     .primary_key(vec!["pk_int"])
                     .build()
                     .expect("Failed to build schema"),
@@ -644,8 +652,10 @@
         let col_bytes: &[u8] = b"binary data";
         let col_binary: &[u8] = b"fixed binary data!!!";
 
+        let col_array = make_string_array(&[Some("fluss"), Some("rust")]);
+
         // Upsert a row with all datatypes
-        let mut row = GenericRow::new(17);
+        let mut row = GenericRow::new(18);
         row.set_field(0, pk_int);
         row.set_field(1, col_boolean);
         row.set_field(2, col_tinyint);
@@ -663,6 +673,7 @@
         row.set_field(14, col_timestamp_ltz);
         row.set_field(15, col_bytes);
         row.set_field(16, col_binary);
+        row.set_field(17, col_array);
 
         upsert_writer
             .upsert(&row)
@@ -677,7 +688,7 @@
             .create_lookuper()
             .expect("Failed to create lookuper");
 
-        let mut key = GenericRow::new(17);
+        let mut key = GenericRow::new(18);
         key.set_field(0, pk_int);
 
         let result = lookuper.lookup(&key).await.expect("Failed to lookup");
@@ -772,10 +783,14 @@
             col_binary,
             "col_binary mismatch"
         );
+        let arr = found_row.get_array(17).unwrap();
+        assert_eq!(arr.size(), 2, "col_array size mismatch");
+        assert_eq!(arr.get_string(0).unwrap(), "fluss", "col_array[0] mismatch");
+        assert_eq!(arr.get_string(1).unwrap(), "rust", "col_array[1] mismatch");
 
         // Test with null values for nullable columns
         let pk_int_2 = 2i32;
-        let mut row_with_nulls = GenericRow::new(17);
+        let mut row_with_nulls = GenericRow::new(18);
         row_with_nulls.set_field(0, pk_int_2);
         row_with_nulls.set_field(1, Datum::Null); // col_boolean
         row_with_nulls.set_field(2, Datum::Null); // col_tinyint
@@ -793,6 +808,7 @@
         row_with_nulls.set_field(14, Datum::Null); // col_timestamp_ltz
         row_with_nulls.set_field(15, Datum::Null); // col_bytes
         row_with_nulls.set_field(16, Datum::Null); // col_binary
+        row_with_nulls.set_field(17, Datum::Null); // col_array
 
         upsert_writer
             .upsert(&row_with_nulls)
@@ -801,7 +817,7 @@
             .expect("Failed to wait for upsert acknowledgment");
 
         // Lookup row with nulls
-        let mut key2 = GenericRow::new(17);
+        let mut key2 = GenericRow::new(18);
         key2.set_field(0, pk_int_2);
 
         let result = lookuper.lookup(&key2).await.expect("Failed to lookup");
@@ -880,6 +896,180 @@
             found_row_nulls.is_null_at(16).unwrap(),
             "col_binary should be null"
         );
+        assert!(
+            found_row_nulls.is_null_at(17).unwrap(),
+            "col_array should be null"
+        );
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    #[tokio::test]
+    async fn upsert_and_lookup_with_array() {
+        use fluss::row::Datum;
+
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss", "test_kv_arrays");
+        let inner_array_type = DataTypes::array(DataTypes::int());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("tags", DataTypes::array(DataTypes::string()))
+                    .column("scores", DataTypes::array(DataTypes::int()))
+                    .column("matrix", DataTypes::array(inner_array_type.clone()))
+                    .primary_key(vec!["id"])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table descriptor");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let upsert = table.new_upsert().expect("Failed to create upsert");
+        let upsert_writer = upsert.create_writer().expect("Failed to create writer");
+
+        // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30], matrix=[[1,2],[3,4]]
+        let mut row1 = GenericRow::new(4);
+        row1.set_field(0, 1_i32);
+        row1.set_field(1, make_string_array(&[Some("hello"), Some("world")]));
+        row1.set_field(2, make_int_array(&[Some(10), Some(20), Some(30)]));
+        let m1 = {
+            let mut w = FlussArrayWriter::new(2, &inner_array_type);
+            w.write_array(0, &make_int_array(&[Some(1), Some(2)]));
+            w.write_array(1, &make_int_array(&[Some(3), Some(4)]));
+            w.complete().expect("matrix1")
+        };
+        row1.set_field(3, m1);
+
+        upsert_writer
+            .upsert(&row1)
+            .expect("upsert row1")
+            .await
+            .expect("ack row1");
+
+        // Row 2: id=2, tags=[null element], scores=[] (empty), matrix=null
+        let mut row2 = GenericRow::new(4);
+        row2.set_field(0, 2_i32);
+        row2.set_field(1, make_string_array(&[None]));
+        row2.set_field(2, make_int_array(&[]));
+        row2.set_field(3, Datum::Null);
+
+        upsert_writer
+            .upsert(&row2)
+            .expect("upsert row2")
+            .await
+            .expect("ack row2");
+
+        // Row 3: id=3, tags=null, scores=[42], matrix=[[5], null, []]
+        let mut row3 = GenericRow::new(4);
+        row3.set_field(0, 3_i32);
+        row3.set_field(1, Datum::Null);
+        row3.set_field(2, make_int_array(&[Some(42)]));
+        let m3 = {
+            let mut w = FlussArrayWriter::new(3, &inner_array_type);
+            w.write_array(0, &make_int_array(&[Some(5)]));
+            w.set_null_at(1);
+            w.write_array(2, &make_int_array(&[]));
+            w.complete().expect("matrix3")
+        };
+        row3.set_field(3, m3);
+
+        upsert_writer
+            .upsert(&row3)
+            .expect("upsert row3")
+            .await
+            .expect("ack row3");
+
+        // Lookup and verify
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Verify row 1: populated flat arrays + nested array
+        let result1 = lookuper
+            .lookup(&make_key_with_field_count(1, 4))
+            .await
+            .expect("lookup row1");
+        let r1 = result1
+            .get_single_row()
+            .expect("get row1")
+            .expect("row1 should exist");
+        assert_eq!(r1.get_int(0).unwrap(), 1);
+        let tags_r1 = r1.get_array(1).unwrap();
+        assert_eq!(tags_r1.size(), 2);
+        assert_eq!(tags_r1.get_string(0).unwrap(), "hello");
+        assert_eq!(tags_r1.get_string(1).unwrap(), "world");
+        let scores_r1 = r1.get_array(2).unwrap();
+        assert_eq!(scores_r1.size(), 3);
+        assert_eq!(scores_r1.get_int(0).unwrap(), 10);
+        assert_eq!(scores_r1.get_int(1).unwrap(), 20);
+        assert_eq!(scores_r1.get_int(2).unwrap(), 30);
+        let matrix_r1: FlussArray = r1.get_array(3).unwrap();
+        assert_eq!(matrix_r1.size(), 2);
+        let mr1_0 = matrix_r1.get_array(0).unwrap();
+        assert_eq!(mr1_0.size(), 2);
+        assert_eq!(mr1_0.get_int(0).unwrap(), 1);
+        assert_eq!(mr1_0.get_int(1).unwrap(), 2);
+        let mr1_1 = matrix_r1.get_array(1).unwrap();
+        assert_eq!(mr1_1.size(), 2);
+        assert_eq!(mr1_1.get_int(0).unwrap(), 3);
+        assert_eq!(mr1_1.get_int(1).unwrap(), 4);
+
+        // Verify row 2: null element in array, empty array, null nested column
+        let result2 = lookuper
+            .lookup(&make_key_with_field_count(2, 4))
+            .await
+            .expect("lookup row2");
+        let r2 = result2
+            .get_single_row()
+            .expect("get row2")
+            .expect("row2 should exist");
+        assert_eq!(r2.get_int(0).unwrap(), 2);
+        let tags_r2 = r2.get_array(1).unwrap();
+        assert_eq!(tags_r2.size(), 1);
+        assert!(tags_r2.is_null_at(0));
+        let scores_r2 = r2.get_array(2).unwrap();
+        assert_eq!(scores_r2.size(), 0);
+        assert!(r2.is_null_at(3).unwrap());
+
+        // Verify row 3: null flat column, nested array with mixed inner (value, null, empty)
+        let result3 = lookuper
+            .lookup(&make_key_with_field_count(3, 4))
+            .await
+            .expect("lookup row3");
+        let r3 = result3
+            .get_single_row()
+            .expect("get row3")
+            .expect("row3 should exist");
+        assert_eq!(r3.get_int(0).unwrap(), 3);
+        assert!(r3.is_null_at(1).unwrap());
+        let scores_r3 = r3.get_array(2).unwrap();
+        assert_eq!(scores_r3.size(), 1);
+        assert_eq!(scores_r3.get_int(0).unwrap(), 42);
+        let matrix_r3 = r3.get_array(3).unwrap();
+        assert_eq!(matrix_r3.size(), 3);
+        let mr3_0 = matrix_r3.get_array(0).unwrap();
+        assert_eq!(mr3_0.size(), 1);
+        assert_eq!(mr3_0.get_int(0).unwrap(), 5);
+        assert!(matrix_r3.is_null_at(1));
+        let mr3_2 = matrix_r3.get_array(2).unwrap();
+        assert_eq!(mr3_2.size(), 0);
 
         admin
             .drop_table(&table_path, false)
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index d10834e8..1a6b514b 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -18,7 +18,9 @@
 
 #[cfg(test)]
 mod table_test {
-    use crate::integration::utils::{create_partitions, create_table, get_shared_cluster};
+    use crate::integration::utils::{
+        create_partitions, create_table, get_shared_cluster, make_int_array, make_string_array,
+    };
     use arrow::array::record_batch;
     use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan};
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
@@ -658,6 +660,7 @@ .column(
                         "col_timestamp_ltz_ns_neg",
                         DataTypes::timestamp_ltz_with_precision(9),
                     )
+                    .column("col_array", DataTypes::array(DataTypes::string()))
                     .build()
                     .expect("Failed to build schema"),
             )
@@ -719,6 +722,8 @@
         let col_timestamp_ltz_ns_neg =
             TimestampLtz::from_millis_nanos(-301234154877, 999_999).unwrap();
 
+        let col_array = make_string_array(&[Some("fluss"), Some("rust")]);
+
         // Append a row with all datatypes
         let mut row = GenericRow::new(field_count);
         row.set_field(0, col_tinyint);
@@ -750,6 +755,7 @@
         row.set_field(26, col_timestamp_ns_neg);
         row.set_field(27, col_timestamp_ltz_us_neg);
         row.set_field(28, col_timestamp_ltz_ns_neg);
+        row.set_field(29, col_array);
 
         append_writer
             .append(&row)
@@ -994,6 +1000,11 @@
             "col_timestamp_ltz_ns_neg nanos mismatch"
         );
 
+        let arr = found_row.get_array(29).unwrap();
+        assert_eq!(arr.size(), 2, "col_array size mismatch");
+        assert_eq!(arr.get_string(0).unwrap(), "fluss", "col_array[0] mismatch");
+        assert_eq!(arr.get_string(1).unwrap(), "rust", "col_array[1] mismatch");
+
         // Verify row with all nulls (record index 1)
         let found_row_nulls = records[1].row();
         for i in 0..field_count {
@@ -1373,4 +1384,430 @@
             .await
             .expect("Failed to drop table");
     }
+
+    #[tokio::test]
+    async fn append_and_scan_with_array() {
+        use fluss::row::binary_array::FlussArrayWriter;
+        use fluss::row::{Datum, FlussArray, GenericRow};
+
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss", "test_log_arrays");
+        let inner_array_type = DataTypes::array(DataTypes::int());
+
+        let schema = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("tags", DataTypes::array(DataTypes::string()))
+            .column("scores", DataTypes::array(DataTypes::int()))
+            .column("matrix", DataTypes::array(inner_array_type.clone()))
+            .build()
+            .expect("Failed to build schema");
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(schema)
+            .build()
+            .expect("Failed to build table descriptor");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let append_writer = table
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Row 1: id=1, tags=["hello", "world"], scores=[10, 20, 30], matrix=[[1,2],[3,4]]
+        let mut row1 = GenericRow::new(4);
+        row1.set_field(0, 1_i32);
+        row1.set_field(1, make_string_array(&[Some("hello"), Some("world")]));
+        row1.set_field(2, make_int_array(&[Some(10), Some(20), Some(30)]));
+        let m1 = {
+            let mut w = FlussArrayWriter::new(2, &inner_array_type);
+            w.write_array(0, &make_int_array(&[Some(1), Some(2)]));
+            w.write_array(1, &make_int_array(&[Some(3), Some(4)]));
+            w.complete().expect("matrix1")
+        };
+        row1.set_field(3, m1);
+
+        // Row 2: id=2, tags=[null], scores=[], matrix=[[5], null, []]
+        let mut row2 = GenericRow::new(4);
+        row2.set_field(0, 2_i32);
+        row2.set_field(1, make_string_array(&[None]));
+        row2.set_field(2, make_int_array(&[]));
+        let m2 = {
+            let mut w = FlussArrayWriter::new(3, &inner_array_type);
+            w.write_array(0, &make_int_array(&[Some(5)]));
+            w.set_null_at(1);
+            w.write_array(2, &make_int_array(&[]));
+            w.complete().expect("matrix2")
+        };
+        row2.set_field(3, m2);
+
+        // Row 3: id=3, tags=null, scores=[42], matrix=null
+        let mut row3 = GenericRow::new(4);
+        row3.set_field(0, 3_i32);
+        row3.set_field(1, Datum::Null);
+        row3.set_field(2, make_int_array(&[Some(42)]));
+        row3.set_field(3, Datum::Null);
+
+        append_writer.append(&row1).expect("append row1");
+        append_writer.append(&row2).expect("append row2");
+        append_writer.append(&row3).expect("append row3");
+        append_writer.flush().await.expect("Failed to flush");
+
+        let records = scan_table(&table, |scan| scan).await;
+        assert_eq!(records.len(), 3, "expected three log records");
+
+        // Verify row 1: populated flat arrays + nested array
+        let r0 = records[0].row();
+        assert_eq!(r0.get_int(0).unwrap(), 1);
+        let tags_r0 = r0.get_array(1).unwrap();
+        assert_eq!(tags_r0.size(), 2);
+        assert_eq!(tags_r0.get_string(0).unwrap(), "hello");
+        assert_eq!(tags_r0.get_string(1).unwrap(), "world");
+        let scores_r0 = r0.get_array(2).unwrap();
+        assert_eq!(scores_r0.size(), 3);
+        assert_eq!(scores_r0.get_int(0).unwrap(), 10);
+        assert_eq!(scores_r0.get_int(1).unwrap(), 20);
+        assert_eq!(scores_r0.get_int(2).unwrap(), 30);
+        let matrix_r0: FlussArray = r0.get_array(3).unwrap();
+        assert_eq!(matrix_r0.size(), 2);
+        let mr0_0 = matrix_r0.get_array(0).unwrap();
+        assert_eq!(mr0_0.size(), 2);
+        assert_eq!(mr0_0.get_int(0).unwrap(), 1);
+        assert_eq!(mr0_0.get_int(1).unwrap(), 2);
+        let mr0_1 = matrix_r0.get_array(1).unwrap();
+        assert_eq!(mr0_1.size(), 2);
+        assert_eq!(mr0_1.get_int(0).unwrap(), 3);
+        assert_eq!(mr0_1.get_int(1).unwrap(), 4);
+
+        // Verify row 2: null element in array, empty array, nested with mixed inner
+        let r1 = records[1].row();
+        assert_eq!(r1.get_int(0).unwrap(), 2);
+        let tags_r1 = r1.get_array(1).unwrap();
+        assert_eq!(tags_r1.size(), 1);
+        assert!(tags_r1.is_null_at(0));
+        let scores_r1 = r1.get_array(2).unwrap();
+        assert_eq!(scores_r1.size(), 0);
+        let matrix_r1 = r1.get_array(3).unwrap();
+        assert_eq!(matrix_r1.size(), 3);
+        let mr1_0 = matrix_r1.get_array(0).unwrap();
+        assert_eq!(mr1_0.size(), 1);
+        assert_eq!(mr1_0.get_int(0).unwrap(), 5);
+        assert!(matrix_r1.is_null_at(1));
+        let mr1_2 = matrix_r1.get_array(2).unwrap();
+        assert_eq!(mr1_2.size(), 0);
+
+        // Verify row 3: null flat column, null nested column
+        let r2 = records[2].row();
+        assert_eq!(r2.get_int(0).unwrap(), 3);
+        assert!(r2.is_null_at(1).unwrap());
+        let scores_r2 = r2.get_array(2).unwrap();
+        assert_eq!(scores_r2.size(), 1);
+        assert_eq!(scores_r2.get_int(0).unwrap(), 42);
+        assert!(r2.is_null_at(3).unwrap());
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    #[tokio::test]
+    async fn append_and_scan_with_array_rich_types() {
+        use fluss::row::binary_array::FlussArrayWriter;
+        use fluss::row::{Date, Decimal, FlussArray, GenericRow, Time, TimestampNtz};
+
+        fn assert_f32_special(actual: f32, expected: f32) {
+            if expected.is_nan() {
+                assert!(actual.is_nan(), "expected NaN");
+            } else if expected.is_infinite() {
+                assert!(actual.is_infinite());
+                assert_eq!(actual.signum(), expected.signum());
+            } else {
+                assert!((actual - expected).abs() < f32::EPSILON);
+            }
+        }
+
+        fn assert_f64_special(actual: f64, expected: f64) {
+            if expected.is_nan() {
+                assert!(actual.is_nan(), "expected NaN");
+            } else if expected.is_infinite() {
+                assert!(actual.is_infinite());
+                assert_eq!(actual.signum(), expected.signum());
+            } else {
+                assert!((actual - expected).abs() < f64::EPSILON);
+            }
+        }
+
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss", "test_log_arrays_rich_types");
+
+        // Compact encodings: DECIMAL(10,2) (precision <= 18); TIMESTAMP(6) with millis + sub-milli nanos
+        let dec_compact = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
+        let ts_compact = TimestampNtz::from_millis_nanos(1769163227123, 456000).unwrap();
+
+        // Non-compact encodings: DECIMAL(22,5) (precision > 18); TIMESTAMP(9) with nanosecond detail
+        let dec_big = Decimal::from_unscaled_bytes(&[66, 237, 18, 59, 11, 216, 31, 4, 244], 22, 5)
+            .expect("big decimal");
+        let ts_nano = TimestampNtz::from_millis_nanos(1769163227123, 999_999).unwrap();
+
+        let d = Date::new(20476);
+        let t = Time::new(36827123);
+        let elem_bytes = &[0_u8, 1, 2, 255];
+        let fixed_a: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
+        let fixed_b: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
+
+        let schema = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("arr_bytes", DataTypes::array(DataTypes::bytes()))
+            .column("arr_date", DataTypes::array(DataTypes::date()))
+            .column(
+                "arr_time",
+                DataTypes::array(DataTypes::time_with_precision(3)),
+            )
+            .column(
+                "arr_ts_compact",
+                DataTypes::array(DataTypes::timestamp_with_precision(6)),
+            )
+            .column(
+                "arr_ts_nano",
+                DataTypes::array(DataTypes::timestamp_with_precision(9)),
+            )
+            .column(
+                "arr_decimal_compact",
+                DataTypes::array(DataTypes::decimal(10, 2)),
+            )
+            .column(
+                "arr_decimal_big",
+                DataTypes::array(DataTypes::decimal(22, 5)),
+            )
+            .column("arr_long_str", DataTypes::array(DataTypes::string()))
+            .column("arr_float", DataTypes::array(DataTypes::float()))
+            .column("arr_double", DataTypes::array(DataTypes::double()))
+            .column("arr_binary", DataTypes::array(DataTypes::binary(4)))
+            .build()
+            .expect("Failed to build schema");
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(schema)
+            .build()
+            .expect("Failed to build table descriptor");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let append_writer = table
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer()
+            .expect("Failed to create writer");
+
+        let mut row = GenericRow::new(12);
+        row.set_field(0, 1_i32);
+
+        // col 1: arr_bytes — binary with null element
+        let arr_bytes = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::bytes());
+            w.write_binary_bytes(0, elem_bytes);
+            w.set_null_at(1);
+            w.complete().expect("arr_bytes")
+        };
+        row.set_field(1, arr_bytes);
+
+        // col 2: arr_date
+        let arr_date = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::date());
+            w.write_date(0, d);
+            w.set_null_at(1);
+            w.complete().expect("arr_date")
+        };
+        row.set_field(2, arr_date);
+
+        // col 3: arr_time
+        let arr_time = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::time_with_precision(3));
+            w.write_time(0, t);
+            w.set_null_at(1);
+            w.complete().expect("arr_time")
+        };
+        row.set_field(3, arr_time);
+
+        // col 4: arr_ts_compact — compact timestamp (precision 6, millis+nanos)
+        let arr_ts_compact = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::timestamp_with_precision(6));
+            w.write_timestamp_ntz(0, &ts_compact, 6);
+            w.set_null_at(1);
+            w.complete().expect("arr_ts_compact")
+        };
+        row.set_field(4, arr_ts_compact);
+
+        // col 5: arr_ts_nano — non-compact timestamp (precision 9)
+        let arr_ts_nano = {
+            let mut w = FlussArrayWriter::new(1, &DataTypes::timestamp_with_precision(9));
+            w.write_timestamp_ntz(0, &ts_nano, 9);
+            w.complete().expect("arr_ts_nano")
+        };
+        row.set_field(5, arr_ts_nano);
+
+        // col 6: arr_decimal_compact — compact decimal (precision 10)
+        let arr_decimal_compact = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::decimal(10, 2));
+            w.write_decimal(0, &dec_compact, 10);
+            w.set_null_at(1);
+            w.complete().expect("arr_decimal_compact")
+        };
+        row.set_field(6, arr_decimal_compact);
+
+        // col 7: arr_decimal_big — non-compact decimal (precision 22)
+        let arr_decimal_big = {
+            let mut w = FlussArrayWriter::new(1, &DataTypes::decimal(22, 5));
+            w.write_decimal(0, &dec_big, 22);
+            w.complete().expect("arr_decimal_big")
+        };
+        row.set_field(7, arr_decimal_big);
+
+        // col 8: arr_long_str — heap-backed strings (>= 8 bytes)
+        let arr_long_str = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::string());
+            w.write_string(0, "abcdefghi");
+            w.write_string(1, "longstring_here");
+            w.complete().expect("arr_long_str")
+        };
+        row.set_field(8, arr_long_str);
+
+        // col 9: arr_float — IEEE 754 specials
+        let arr_float = {
+            let mut w = FlussArrayWriter::new(3, &DataTypes::float());
+            w.write_float(0, f32::NAN);
+            w.write_float(1, f32::INFINITY);
+            w.write_float(2, f32::NEG_INFINITY);
+            w.complete().expect("arr_float")
+        };
+        row.set_field(9, arr_float);
+
+        // col 10: arr_double — IEEE 754 specials
+        let arr_double = {
+            let mut w = FlussArrayWriter::new(3, &DataTypes::double());
+            w.write_double(0, f64::NAN);
+            w.write_double(1, f64::INFINITY);
+            w.write_double(2, f64::NEG_INFINITY);
+            w.complete().expect("arr_double")
+        };
+        row.set_field(10, arr_double);
+
+        // col 11: arr_binary — fixed-size binary(4)
+        let arr_binary = {
+            let mut w = FlussArrayWriter::new(2, &DataTypes::binary(4));
+            w.write_binary_bytes(0, &fixed_a);
+            w.write_binary_bytes(1, &fixed_b);
+            w.complete().expect("arr_binary")
+        };
+        row.set_field(11, arr_binary);
+
+        append_writer.append(&row).expect("append");
+        append_writer.flush().await.expect("Failed to flush");
+
+        let records = scan_table(&table, |scan| scan).await;
+        assert_eq!(records.len(), 1);
+        let r = records[0].row();
+
+        // Verify arr_bytes
+        let ab = r.get_array(1).unwrap();
+        assert_eq!(ab.size(), 2);
+        assert_eq!(ab.get_binary(0).unwrap(), elem_bytes);
+        assert!(ab.is_null_at(1));
+
+        // Verify arr_date
+        let ad = r.get_array(2).unwrap();
+        assert_eq!(ad.size(), 2);
+        assert_eq!(ad.get_date(0).unwrap().get_inner(), d.get_inner());
+        assert!(ad.is_null_at(1));
+
+        // Verify arr_time
+        let at = r.get_array(3).unwrap();
+        assert_eq!(at.size(), 2);
+        assert_eq!(at.get_time(0).unwrap().get_inner(), t.get_inner());
+        assert!(at.is_null_at(1));
+
+        // Verify arr_ts_compact
+        let ats = r.get_array(4).unwrap();
+        assert_eq!(ats.size(), 2);
+        let read_ts_compact = ats.get_timestamp_ntz(0, 6).unwrap();
+        assert_eq!(
+            read_ts_compact.get_millisecond(),
+            ts_compact.get_millisecond()
+        );
+        assert_eq!(
+            read_ts_compact.get_nano_of_millisecond(),
+            ts_compact.get_nano_of_millisecond()
+        );
+        assert!(ats.is_null_at(1));
+
+        // Verify arr_ts_nano
+        let ats_nano = r.get_array(5).unwrap();
+        assert_eq!(ats_nano.size(), 1);
+        let read_ts_nano = ats_nano.get_timestamp_ntz(0, 9).unwrap();
+        assert_eq!(read_ts_nano.get_millisecond(), ts_nano.get_millisecond());
+        assert_eq!(
+            read_ts_nano.get_nano_of_millisecond(),
+            ts_nano.get_nano_of_millisecond()
+        );
+
+        // Verify arr_decimal_compact
+        let adc = r.get_array(6).unwrap();
+        assert_eq!(adc.size(), 2);
+        assert_eq!(adc.get_decimal(0, 10, 2).unwrap(), dec_compact);
+        assert!(adc.is_null_at(1));
+
+        // Verify arr_decimal_big
+        let adb = r.get_array(7).unwrap();
+        assert_eq!(adb.size(), 1);
+        assert_eq!(adb.get_decimal(0, 22, 5).unwrap(), dec_big);
+
+        // Verify arr_long_str
+        let als = r.get_array(8).unwrap();
+        assert_eq!(als.size(), 2);
+        assert_eq!(als.get_string(0).unwrap(), "abcdefghi");
+        assert_eq!(als.get_string(1).unwrap(), "longstring_here");
+
+        // Verify arr_float — IEEE 754 specials
+        let af = r.get_array(9).unwrap();
+        assert_eq!(af.size(), 3);
+        assert_f32_special(af.get_float(0).unwrap(), f32::NAN);
+        assert_f32_special(af.get_float(1).unwrap(), f32::INFINITY);
+        assert_f32_special(af.get_float(2).unwrap(), f32::NEG_INFINITY);
+
+        // Verify arr_double — IEEE 754 specials
+        let adbl = r.get_array(10).unwrap();
+        assert_eq!(adbl.size(), 3);
+        assert_f64_special(adbl.get_double(0).unwrap(), f64::NAN);
+        assert_f64_special(adbl.get_double(1).unwrap(), f64::INFINITY);
+        assert_f64_special(adbl.get_double(2).unwrap(), f64::NEG_INFINITY);
+
+        // Verify arr_binary — fixed-size binary(4)
+        let fb: FlussArray = r.get_array(11).unwrap();
+        assert_eq!(fb.size(), 2);
+        assert_eq!(fb.get_binary(0).unwrap(), fixed_a.as_slice());
+        assert_eq!(fb.get_binary(1).unwrap(), fixed_b.as_slice());
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
 }
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index dc2876f8..81a7c0b1 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -17,7 +17,9 @@
  */
 use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
 use fluss::client::FlussAdmin;
-use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
+use fluss::metadata::{DataTypes, PartitionSpec, TableDescriptor, TablePath};
+use fluss::row::FlussArray;
+use fluss::row::binary_array::FlussArrayWriter;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::LazyLock;
@@ -94,6 +96,30 @@ pub async fn create_table(
         .expect("Failed to create table");
 }
 
+/// Builds a `FlussArray` of nullable strings for test fixtures.
+pub fn make_string_array(values: &[Option<&str>]) -> FlussArray {
+    let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::string());
+    for (idx, value) in values.iter().enumerate() {
+        match value {
+            Some(v) => writer.write_string(idx, v),
+            None => writer.set_null_at(idx),
+        }
+    }
+    writer.complete().expect("Failed to build string array")
+}
+
+/// Builds a `FlussArray` of nullable 32-bit integers for test fixtures.
+pub fn make_int_array(values: &[Option<i32>]) -> FlussArray {
+    let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::int());
+    for (idx, value) in values.iter().enumerate() {
+        match value {
+            Some(v) => writer.write_int(idx, *v),
+            None => writer.set_null_at(idx),
+        }
+    }
+    writer.complete().expect("Failed to build int array")
+}
+
 /// Similar to wait_for_cluster_ready but connects with SASL credentials.
 pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) {
     let timeout = Duration::from_secs(30);
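
Note on the array-building pattern these tests rely on: a `FlussArrayWriter` is sized up front, every slot is then written exactly once or explicitly nulled, and `complete()` produces the final `FlussArray`. The sketch below condenses that pattern for the nested `ARRAY<ARRAY<INT>>` fixtures, using only calls that appear in the diff (`FlussArrayWriter::new`, `write_array`, `set_null_at`, `complete`) plus the new `make_int_array` helper from `utils.rs`; the `make_example_matrix` function itself is illustrative and not part of the patch.

    use fluss::metadata::DataTypes;
    use fluss::row::FlussArray;
    use fluss::row::binary_array::FlussArrayWriter;

    use crate::integration::utils::make_int_array;

    /// Illustrative only: builds the ARRAY<ARRAY<INT>> value [[1, 2], null, []],
    /// the same shape the row fixtures above exercise.
    fn make_example_matrix() -> FlussArray {
        // The second argument is the element type; for a nested array, each
        // element is itself an ARRAY<INT>.
        let inner_type = DataTypes::array(DataTypes::int());
        let mut w = FlussArrayWriter::new(3, &inner_type);
        w.write_array(0, &make_int_array(&[Some(1), Some(2)]));
        w.set_null_at(1); // a null inner array, distinct from an empty one
        w.write_array(2, &make_int_array(&[])); // an empty inner array
        w.complete().expect("failed to build nested array")
    }

The resulting value is set on a row like any scalar (`row.set_field(3, make_example_matrix())`), which is why the null-vs-empty distinction shows up at two levels in the assertions: a null column, a null inner array, and an empty inner array are all different cases.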