Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 177 additions & 4 deletions bindings/python/test/test_kv_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
import fluss


async def _upsert_and_wait(writer, row):
    """Upsert *row* through *writer* and wait for the write acknowledgement."""
    pending = writer.upsert(row)
    await pending.wait()


def _assert_float_specials(values):
    """Assert *values* holds NaN, +infinity, -infinity at indices 0, 1, 2."""
    assert math.isnan(values[0])
    assert values[1] == math.inf
    assert values[2] == -math.inf


async def test_upsert_delete_and_lookup(connection, admin):
"""Test upsert, lookup, update, delete, and non-existent key lookup."""
table_path = fluss.TablePath("fluss", "py_test_upsert_and_lookup")
Expand Down Expand Up @@ -335,6 +346,167 @@ async def test_partitioned_table_upsert_and_lookup(connection, admin):
await admin.drop_table(table_path, ignore_if_not_exists=False)


async def test_upsert_and_lookup_with_array(connection, admin):
    """Test upsert and lookup with flat, nested, and null-pattern arrays in KV tables."""
    table_path = fluss.TablePath("fluss", "py_test_kv_arrays")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

    schema = fluss.Schema(
        pa.schema(
            [
                pa.field("id", pa.int32()),
                pa.field("tags", pa.list_(pa.string())),
                pa.field("scores", pa.list_(pa.int32())),
                pa.field("matrix", pa.list_(pa.list_(pa.int32()))),
            ]
        ),
        primary_keys=["id"],
    )
    await admin.create_table(
        table_path, fluss.TableDescriptor(schema), ignore_if_exists=False
    )

    table = await connection.get_table(table_path)
    writer = table.new_upsert().create_writer()

    # Rows cover: plain values, null elements, empty arrays, a null array,
    # and nulls at every nesting level of a nested array.
    rows = [
        {
            "id": 1,
            "tags": ["hello", "world"],
            "scores": [10, 20, 30],
            "matrix": [[1, 2], [3, 4]],
        },
        {"id": 2, "tags": [None], "scores": [], "matrix": None},
        {"id": 3, "tags": None, "scores": [42], "matrix": [[], [5], [6, 7, 8]]},
        {"id": 4, "tags": None, "scores": None, "matrix": [[1, None], None, []]},
    ]
    for row in rows:
        await _upsert_and_wait(writer, row)

    lookuper = table.new_lookup().create_lookuper()

    # Every row must round-trip exactly; null columns must come back as None.
    for expected in rows:
        result = await lookuper.lookup({"id": expected["id"]})
        assert result is not None
        for col in ("tags", "scores", "matrix"):
            if expected[col] is None:
                assert result[col] is None
            else:
                assert result[col] == expected[col]

    await admin.drop_table(table_path, ignore_if_not_exists=False)


async def test_upsert_and_lookup_with_array_rich_types(connection, admin):
    """Test upsert/lookup for arrays with rich element types and encoding edge cases."""
    table_path = fluss.TablePath("fluss", "py_test_kv_arrays_rich_types")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

    fields = [
        pa.field("id", pa.int32()),
        pa.field("arr_bytes", pa.list_(pa.binary())),
        pa.field("arr_date", pa.list_(pa.date32())),
        pa.field("arr_time", pa.list_(pa.time32("ms"))),
        pa.field("arr_ts_ntz", pa.list_(pa.timestamp("us"))),
        pa.field("arr_ts_ltz", pa.list_(pa.timestamp("us", tz="UTC"))),
        pa.field("arr_decimal", pa.list_(pa.decimal128(10, 2))),
        pa.field("arr_long_str", pa.list_(pa.string())),
        pa.field("arr_big_decimal", pa.list_(pa.decimal128(22, 5))),
        pa.field("arr_ts_nano", pa.list_(pa.timestamp("ns"))),
        pa.field("arr_float", pa.list_(pa.float32())),
        pa.field("arr_double", pa.list_(pa.float64())),
        # TODO(fluss-python#524): support PyArrow FixedSizeBinary in schema
        # conversion. Then switch to pa.binary(4).
        pa.field("arr_binary", pa.list_(pa.binary())),
    ]
    schema = fluss.Schema(pa.schema(fields), primary_keys=["id"])
    await admin.create_table(
        table_path, fluss.TableDescriptor(schema), ignore_if_exists=False
    )

    table = await connection.get_table(table_path)
    writer = table.new_upsert().create_writer()

    # One row exercising null elements, inline vs. spilled strings, compact
    # vs. wide decimals, nanosecond timestamps, and float specials.
    row = {
        "id": 1,
        "arr_bytes": [b"\x10\x20\x30", None],
        "arr_date": [date(2026, 1, 23), None],
        "arr_time": [dt_time(10, 13, 47, 123000), None],
        "arr_ts_ntz": [datetime(2026, 1, 23, 10, 13, 47, 123000)],
        "arr_ts_ltz": [
            datetime(2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc)
        ],
        "arr_decimal": [Decimal("123.45"), None],
        "arr_long_str": [
            "abcdefgh",
            "this is a much longer string that definitely exceeds inline",
        ],
        "arr_big_decimal": [
            Decimal("12345678901234567.12345"),
            Decimal("-99999999999999999.99999"),
        ],
        "arr_ts_nano": [datetime(2026, 1, 23, 10, 13, 47, 123456)],
        "arr_float": [float("nan"), float("inf"), float("-inf")],
        "arr_double": [float("nan"), float("inf"), float("-inf")],
        "arr_binary": [b"\xde\xad\xbe\xef", b"\x00\x01\x02\x03"],
    }
    await _upsert_and_wait(writer, row)

    lookuper = table.new_lookup().create_lookuper()
    result = await lookuper.lookup({"id": 1})
    assert result is not None

    # NaN never compares equal, so the float-special columns are verified
    # structurally; everything else must round-trip by equality.
    for col, expected in row.items():
        if col in ("id", "arr_float", "arr_double"):
            continue
        assert result[col] == expected
    _assert_float_specials(result["arr_float"])
    _assert_float_specials(result["arr_double"])

    await admin.drop_table(table_path, ignore_if_not_exists=False)


async def test_all_supported_datatypes(connection, admin):
"""Test upsert/lookup for all supported data types, including nulls."""
table_path = fluss.TablePath("fluss", "py_test_kv_all_datatypes")
Expand All @@ -358,6 +530,7 @@ async def test_all_supported_datatypes(connection, admin):
pa.field("col_timestamp_ntz", pa.timestamp("us")),
pa.field("col_timestamp_ltz", pa.timestamp("us", tz="UTC")),
pa.field("col_bytes", pa.binary()),
pa.field("col_array", pa.list_(pa.string())),
]
),
primary_keys=["pk_int"],
Expand Down Expand Up @@ -385,10 +558,10 @@ async def test_all_supported_datatypes(connection, admin):
"col_timestamp_ntz": datetime(2026, 1, 23, 10, 13, 47, 123000),
"col_timestamp_ltz": datetime(2026, 1, 23, 10, 13, 47, 123000),
"col_bytes": b"binary data",
"col_array": ["fluss", "python"],
}

handle = upsert_writer.upsert(row_data)
await handle.wait()
await _upsert_and_wait(upsert_writer, row_data)

lookuper = table.new_lookup().create_lookuper()
result = await lookuper.lookup({"pk_int": 1})
Expand All @@ -411,14 +584,14 @@ async def test_all_supported_datatypes(connection, admin):
2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc
)
assert result["col_bytes"] == b"binary data"
assert result["col_array"] == ["fluss", "python"]

# Test with null values for all nullable columns
null_row = {"pk_int": 2}
for col in row_data:
if col != "pk_int":
null_row[col] = None
handle = upsert_writer.upsert(null_row)
await handle.wait()
await _upsert_and_wait(upsert_writer, null_row)

result = await lookuper.lookup({"pk_int": 2})
assert result is not None, "Row with nulls should exist"
Expand Down
Loading
Loading