Question
I’m using pyiceberg v0.11.1 to write DataFrames in a setup that includes Lakekeeper, Iceberg, and MinIO.
When writing a single file, everything works correctly and performance is very good. However, issues start to appear when running tests with pytest-xdist using multiple workers.
With a small number of tests, everything still passes. But when I run around 15 tests in parallel, most of them complete successfully and write to storage as expected. The problem occurs toward the end of the run—some of the remaining tests fail during the write process.
pyiceberg.exceptions.SignError: Failed to sign request 412: {'method': 'PUT', 'region': 'us-east-1', 'uri': ' /my_warehouse/019dca4b-f5bc-71b3-9ccb-a54d0c1a7f87/019dca4c-45b7-7d03-b1b9-f9c899b8db5c/metadata/62127deb-4d73-46a7-bec5-3b9bfc6c6566-m0.avro', 'headers': {'User-Agent': ['aiobotocore/3.3.0 md/Botocore#1.42.70 ua/2.1 os/linux#4.18.0-348.7.1.el8_5.x86_64 md/arch#x86_64 lang/python#3.14.3 md/pyimpl#CPython m/N,D,c,a cfg/retry-mode#legacy botocore/1.42.70'], 'Expect': ['100-continue']}}
`
class IcebergConnProperties(CatalogConnProperties):
    """Connection settings for an Iceberg REST catalog with vended credentials."""

    conn_type: str
    catalog_uri: str
    token: str | None
    warehouse: str
    ssl_verify: str = "true"
    extra: dict[str, str] = field(default_factory=dict)

    def to_catalog_config(self) -> dict[str, str | None]:
        """Return kwargs suitable for ``pyiceberg.catalog.load_catalog``."""
        base: dict[str, str | None] = {
            "type": self.conn_type,
            "uri": self.catalog_uri,
            "token": self.token,
            "warehouse": self.warehouse,
            "ssl.verify": self.ssl_verify,
            "header.X-Iceberg-Access-Delegation": "vended-credentials",
        }
        # Entries from ``extra`` take precedence over the base keys,
        # matching the semantics of dict.update().
        return base | self.extra
`
`
def upload_dataframe_to_storage(
    self,
    dataframe: pl.DataFrame,
    schema: str,
    table_name: str,
) -> None:
    """Replace the Iceberg table ``schema.table_name`` with ``dataframe``.

    Any pre-existing table is dropped, the table is (re)created from the
    DataFrame's Arrow schema, and the rows are appended. If the write
    fails, a table created by this call is dropped again (best effort)
    before the exception is re-raised.
    """
    table_just_created = False
    table_id = f"{schema}.{table_name}"
    catalog = self._connect_catalog(self._conn_properties)
    arrow_table = dataframe.to_arrow()
    try:
        # BUG FIX: the original only created the table in the "does not
        # exist" branch; after dropping an existing table it reached
        # ``iceberg_table.append`` with ``iceberg_table`` unbound
        # (UnboundLocalError). Drop then recreate unconditionally.
        if catalog.table_exists(table_id):
            catalog.drop_table(table_id)
            self.logger.debug(f"Dropped existing table '{table_id}' before recreating")
        # Actually ensure the namespace exists — the original only logged
        # this without performing any namespace call.
        catalog.create_namespace_if_not_exists(schema)
        self.logger.debug(f"Created namespace if not exists '{schema}'")
        iceberg_table = catalog.create_table(
            identifier=table_id,
            schema=arrow_table.schema,
        )
        self.logger.debug(f"Created table '{table_id}'")
        table_just_created = True
        iceberg_table.append(arrow_table)
        self.logger.debug(f"Wrote {len(dataframe)} rows to '{table_id}'")
        self.logger.debug(f"Table '{table_id}' appended")
    except Exception:
        self.logger.exception(f"Failed to append {len(dataframe)} rows to '{table_id}'")
        if table_just_created:
            self.logger.debug(f"Dropping empty table '{table_id}' created in this call")
            try:
                catalog.drop_table(table_id)
            except Exception:
                # Best-effort cleanup: never let the cleanup failure
                # mask the original write error being re-raised below.
                self.logger.exception(f"Cleanup drop of '{table_id}' failed")
        raise
    finally:
        # Release the Arrow copy promptly; it can be large.
        self.logger.debug("Deleted arrow table from memory")
        del arrow_table
def _connect_catalog(self, props: CatalogConnProperties) -> Catalog:
"""Connect to the Iceberg REST catalog."""
self.logger.debug("Loading Iceberg REST catalog")
catalog_kwargs = props.to_catalog_config()
if Config.Trino.USE_JWT_AUTH:
catalog_kwargs["token"] = get_access_token()
self.logger.debug(f"Catalog kwargs: {catalog_kwargs}")
return load_catalog(name=self._CATALOG_NAME, **catalog_kwargs)`
Question
I’m using pyiceberg v0.11.1 to write DataFrames in a setup that includes Lakekeeper, Iceberg, and MinIO.
When writing a single file, everything works correctly and performance is very good. However, issues start to appear when running tests with pytest-xdist using multiple workers.
With a small number of tests, everything still passes. But when I run around 15 tests in parallel, most of them complete successfully and write to storage as expected. The problem occurs toward the end of the run—some of the remaining tests fail during the write process.
`
class IcebergConnProperties(CatalogConnProperties):
    # Catalog implementation type (forwarded as ``type`` to the catalog config).
    conn_type: str
    # Base URI of the Iceberg REST catalog endpoint.
    catalog_uri: str
    # Optional bearer token for catalog authentication; may be None.
    token: str | None
    # Warehouse name/location registered with the catalog.
    warehouse: str
    # TLS verification flag kept as a string ("true"/"false") for the catalog config.
    ssl_verify: str = "true"
    # Free-form extra entries — presumably merged into the catalog kwargs;
    # NOTE(review): confirm precedence against to_catalog_config().
    extra: dict[str, str] = field(default_factory=dict)
`
`
def upload_dataframe_to_storage(
    self,
    dataframe: pl.DataFrame,
    schema: str,
    table_name: str,
) -> None:
    """Replace the Iceberg table ``schema.table_name`` with ``dataframe``.

    Any pre-existing table is dropped, the table is (re)created from the
    DataFrame's Arrow schema, and the rows are appended. If the write
    fails, a table created by this call is dropped again (best effort)
    before the exception is re-raised.
    """
    table_just_created = False
    table_id = f"{schema}.{table_name}"
    catalog = self._connect_catalog(self._conn_properties)
    arrow_table = dataframe.to_arrow()
    try:
        # BUG FIX: the original only created the table in the "does not
        # exist" branch; after dropping an existing table it reached
        # ``iceberg_table.append`` with ``iceberg_table`` unbound
        # (UnboundLocalError). Drop then recreate unconditionally.
        if catalog.table_exists(table_id):
            catalog.drop_table(table_id)
            self.logger.debug(f"Dropped existing table '{table_id}' before recreating")
        # Actually ensure the namespace exists — the original only logged
        # this without performing any namespace call.
        catalog.create_namespace_if_not_exists(schema)
        self.logger.debug(f"Created namespace if not exists '{schema}'")
        iceberg_table = catalog.create_table(
            identifier=table_id,
            schema=arrow_table.schema,
        )
        self.logger.debug(f"Created table '{table_id}'")
        table_just_created = True
        iceberg_table.append(arrow_table)
        self.logger.debug(f"Wrote {len(dataframe)} rows to '{table_id}'")
        self.logger.debug(f"Table '{table_id}' appended")
    except Exception:
        self.logger.exception(f"Failed to append {len(dataframe)} rows to '{table_id}'")
        if table_just_created:
            self.logger.debug(f"Dropping empty table '{table_id}' created in this call")
            try:
                catalog.drop_table(table_id)
            except Exception:
                # Best-effort cleanup: never let the cleanup failure
                # mask the original write error being re-raised below.
                self.logger.exception(f"Cleanup drop of '{table_id}' failed")
        raise
    finally:
        # Release the Arrow copy promptly; it can be large.
        self.logger.debug("Deleted arrow table from memory")
        del arrow_table