Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Register custom markers
markers =
stress: marks tests as stress tests (long-running, resource-intensive)
slow: marks tests as extra-slow (sustained load, multi-minute duration)

# Default options applied to all pytest runs
# Default: pytest -v → Skips stress tests (fast)
Expand Down
350 changes: 336 additions & 14 deletions tests/test_011_singlethreaded_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
- Exception handling during batch processing
- Thousands of empty string allocations
- 10MB+ LOB data handling
- Concurrent fetch data integrity (multi-cursor)
- executemany() with 10,000 mixed-type rows
- NULL-heavy result sets (50,000 rows, 6/8 columns NULL)
- Cursor re-use across 5,000 execute/fetch cycles
- fetchone() loop vs fetchall() parity at 100,000 rows

Tests are marked with @pytest.mark.stress and may be skipped in regular CI runs.
"""
Expand Down Expand Up @@ -262,7 +267,7 @@ def test_thousands_of_empty_strings_allocation_stress(cursor, db_connection):
@pytest.mark.stress
def test_large_result_set_100k_rows_no_overflow(cursor, db_connection):
"""
Test #5: Fetch very large result sets (100,000+ rows) to test buffer overflow protection.
Test #4: Fetch very large result sets (100,000+ rows) to test buffer overflow protection.

Tests that large rowIdx values don't cause buffer overflow when calculating
rowIdx × fetchBufferSize. Verifies data integrity across all rows - no crashes,
Expand Down Expand Up @@ -356,7 +361,7 @@ def test_large_result_set_100k_rows_no_overflow(cursor, db_connection):
@pytest.mark.stress
def test_very_large_lob_10mb_data_integrity(cursor, db_connection):
"""
Test #6: Fetch VARCHAR(MAX), NVARCHAR(MAX), VARBINARY(MAX) with 10MB+ data.
Test #5: Fetch VARCHAR(MAX), NVARCHAR(MAX), VARBINARY(MAX) with 10MB+ data.

Verifies:
1. Correct LOB detection
Expand Down Expand Up @@ -458,7 +463,7 @@ def test_very_large_lob_10mb_data_integrity(cursor, db_connection):
@pytest.mark.stress
def test_concurrent_fetch_data_integrity_no_corruption(db_connection, conn_str):
"""
Test #7: Multiple threads/cursors fetching data simultaneously.
Test #6: Multiple threads/cursors fetching data simultaneously.

Verifies:
1. No data corruption occurs
Expand Down Expand Up @@ -547,18 +552,18 @@ def worker_thread(thread_id: int, conn_str: str, results_list: List, errors_list
for thread in threads:
thread.join()

# Verify results
print(f"\nConcurrent fetch results:")
for result in results:
print(
f" Thread {result['thread_id']}: Fetched {result['rows_fetched']} rows - {'OK' if result['success'] else 'FAILED'}"
)
# Verify results (after ALL threads have finished)
print(f"\nConcurrent fetch results:")
for result in results:
print(
f" Thread {result['thread_id']}: Fetched {result['rows_fetched']} rows - {'OK' if result['success'] else 'FAILED'}"
)

if errors:
print(f"\nErrors encountered:")
for error in errors:
print(f" Thread {error['thread_id']}: {error['error']}")
pytest.fail(f"Concurrent fetch had {len(errors)} errors")
if errors:
print(f"\nErrors encountered:")
for error in errors:
print(f" Thread {error['thread_id']}: {error['error']}")
pytest.fail(f"Concurrent fetch had {len(errors)} errors")

# All threads should have succeeded
assert (
Expand All @@ -574,3 +579,320 @@ def worker_thread(thread_id: int, conn_str: str, results_list: List, errors_list
print(
f"\n[OK] Concurrent fetch test passed: {num_threads} threads, no corruption, no race conditions"
)


# ============================================================================
# New Stress Tests
# ============================================================================


@pytest.mark.stress
def test_executemany_large_batch_mixed_types(cursor, db_connection):
    """
    Test #7: Stress executemany() with 10,000 rows of mixed parameter types in a
    single call.

    Pushes INT, FLOAT, NVARCHAR, VARBINARY, DECIMAL, and NULL through one large
    executemany batch to exercise parameter serialization at scale. Afterwards the
    total row count is checked and five representative rows are verified
    value-by-value against recomputed expected values.
    """
    num_rows = 10000

    def _expected_row(i):
        # Deterministic per-row values so any row can be recomputed for verification.
        return (
            i,
            i * 2,
            float(i) * 1.5,
            f"str_{i}",
            bytes([i % 256]) * 10,
            decimal.Decimal(str(i)) / decimal.Decimal("1000"),
            None,  # always NULL to exercise the NULL serialization path
        )

    try:
        drop_table_if_exists(cursor, "#pytest_executemany_stress")

        cursor.execute("""
            CREATE TABLE #pytest_executemany_stress (
                id INT,
                int_col INT,
                float_col FLOAT,
                str_col NVARCHAR(100),
                bytes_col VARBINARY(50),
                dec_col DECIMAL(18, 6),
                null_col NVARCHAR(50)
            )
        """)
        db_connection.commit()

        # One executemany call carrying all 10,000 rows — the stress point of this test.
        cursor.executemany(
            "INSERT INTO #pytest_executemany_stress VALUES (?, ?, ?, ?, ?, ?, ?)",
            [_expected_row(i) for i in range(num_rows)],
        )
        db_connection.commit()

        # Total row count must match what we sent
        cursor.execute("SELECT COUNT(*) FROM #pytest_executemany_stress")
        count = cursor.fetchone()[0]
        assert count == num_rows, f"Expected {num_rows} rows, got {count}"
        print(f"[OK] executemany stress: {num_rows} rows inserted")

        # Five representative rows, checked column-by-column
        for idx in (0, 1, 500, 5000, 9999):
            cursor.execute(
                "SELECT id, int_col, float_col, str_col, bytes_col, dec_col, null_col"
                " FROM #pytest_executemany_stress WHERE id = ?",
                (idx,),
            )
            row = cursor.fetchone()
            assert row is not None, f"Row {idx} not found after executemany"
            expected = _expected_row(idx)
            assert row[0] == idx, f"Row {idx}: id mismatch"
            assert row[1] == expected[1], f"Row {idx}: int_col mismatch"
            assert abs(row[2] - expected[2]) < 1e-9, f"Row {idx}: float_col mismatch"
            assert row[3] == expected[3], f"Row {idx}: str_col mismatch"
            assert row[4] == expected[4], f"Row {idx}: bytes_col mismatch"
            expected_dec = expected[5]
            assert (
                row[5] == expected_dec
            ), f"Row {idx}: dec_col mismatch: got {row[5]}, expected {expected_dec}"
            assert row[6] is None, f"Row {idx}: null_col should be None, got {row[6]}"

        print(
            "[OK] executemany stress: all 5 spot-checks passed (int, float, str, bytes, decimal, NULL)"
        )

    except Exception as e:
        pytest.fail(f"executemany large batch stress failed: {e}")
    finally:
        drop_table_if_exists(cursor, "#pytest_executemany_stress")
        db_connection.commit()


@pytest.mark.stress
def test_null_heavy_large_result_set(cursor, db_connection):
    """
    Test #8: Fetch 50,000 rows where 6 of 8 columns are always NULL.

    Nullable columns dominate real-world schemas and SQL NULL takes its own code
    path in the fetch layer. This test drives that path at scale and checks that:
    - every NULL column comes back as Python None (no corruption)
    - the two non-null sentinel columns survive intact
    - no crashes and no partial rows occur

    Note: VARBINARY is excluded because the driver cannot reliably infer the SQL
    type from a Python None, causing an implicit-conversion error on SQL Server.
    Binary-NULL handling is covered by test_thousands_of_empty_strings_allocation_stress.
    """
    num_rows = 50000

    try:
        drop_table_if_exists(cursor, "#pytest_null_heavy")

        # Note: VARBINARY is intentionally excluded — passing None for a VARBINARY
        # column causes the driver to infer SQL_C_CHAR, which SQL Server rejects with
        # an implicit-conversion error. NULL handling for binary data is covered by
        # test_thousands_of_empty_strings_allocation_stress instead.
        cursor.execute("""
            CREATE TABLE #pytest_null_heavy (
                id INT NOT NULL,
                non_null_str NVARCHAR(30) NOT NULL,
                null_int INT,
                null_float FLOAT,
                null_str NVARCHAR(100),
                null_nvarchar NVARCHAR(MAX),
                null_datetime DATETIME,
                null_bit BIT
            )
        """)
        db_connection.commit()

        # Batched inserts (1000 rows each) keep any single parameter array modest
        batch_size = 1000
        for lo in range(0, num_rows, batch_size):
            hi = min(lo + batch_size, num_rows)
            batch = [(i, f"ID_{i}", None, None, None, None, None, None) for i in range(lo, hi)]
            cursor.executemany(
                "INSERT INTO #pytest_null_heavy VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                batch,
            )
        db_connection.commit()
        print(f"[OK] Inserted {num_rows} NULL-heavy rows")

        # One big fetch, then verify every row
        cursor.execute("SELECT * FROM #pytest_null_heavy ORDER BY id")
        rows = cursor.fetchall()

        assert len(rows) == num_rows, f"Expected {num_rows} rows, got {len(rows)}"

        for i, row in enumerate(rows):
            # Two non-null sentinel columns first
            assert row[0] == i, f"Row {i}: id mismatch (got {row[0]})"
            assert row[1] == f"ID_{i}", f"Row {i}: non_null_str mismatch (got {row[1]})"
            # Columns 2–7 must be Python None (SQL NULL)
            for col_idx in range(2, 8):
                assert row[col_idx] is None, f"Row {i} col {col_idx}: expected None, got {row[col_idx]!r}"

        print(
            f"[OK] NULL-heavy stress: {num_rows} rows, all 6 nullable cols are None, "
            "no corruption in non-null sentinel columns"
        )

    except Exception as e:
        pytest.fail(f"NULL-heavy result set stress failed: {e}")
    finally:
        drop_table_if_exists(cursor, "#pytest_null_heavy")
        db_connection.commit()


@pytest.mark.stress
def test_cursor_reuse_high_iteration(db_connection):
    """
    Test #9: Re-use a single cursor for 5,000 sequential execute/fetch cycles.

    ORM frameworks and connection pools re-use cursors heavily. This test verifies
    that cursor state is correctly reset between executes, results are accurate after
    thousands of prior queries, and memory does not grow unboundedly (potential leak).

    Three alternating query patterns exercise different internal code paths each cycle.

    Requires the optional ``psutil`` package for RSS measurement; the test is
    skipped (not errored) when psutil is unavailable.
    """
    import gc

    # FIX: a plain `import psutil` makes this test ERROR with ImportError on
    # environments without the optional dependency; importorskip turns that
    # into a clean pytest SKIP instead.
    psutil = pytest.importorskip("psutil")

    iterations = 5000
    stress_cursor = db_connection.cursor()

    try:
        # Collect garbage first so the baseline RSS is not inflated by prior tests
        gc.collect()
        process = psutil.Process()
        baseline_rss_mb = process.memory_info().rss / (1024 * 1024)

        for i in range(iterations):
            pattern = i % 3

            if pattern == 0:
                # Parametrized WHERE clause — exercises parameter binding path
                stress_cursor.execute(
                    "SELECT COUNT(*) FROM sys.objects WHERE object_id > ?",
                    (i % 1000,),
                )
                row = stress_cursor.fetchone()
                assert row is not None, f"Iter {i}: fetchone returned None"
                assert isinstance(row[0], int), f"Iter {i}: COUNT(*) not an int"

            elif pattern == 1:
                # Multi-row result — exercises fetchall path
                stress_cursor.execute(
                    "SELECT TOP 5 name, object_id FROM sys.objects ORDER BY object_id"
                )
                rows = stress_cursor.fetchall()
                assert len(rows) <= 5, f"Iter {i}: got {len(rows)} rows, expected ≤5"
                assert all(r[0] is not None for r in rows), f"Iter {i}: NULL name in result"

            else:
                # Scalar query — simplest fetch path
                stress_cursor.execute("SELECT 1 AS n, 'hello' AS s")
                row = stress_cursor.fetchone()
                assert row[0] == 1, f"Iter {i}: scalar mismatch (got {row[0]})"
                assert row[1] == "hello", f"Iter {i}: string mismatch (got {row[1]})"

        # Collect again before measuring so only genuinely-retained memory counts
        gc.collect()
        final_rss_mb = process.memory_info().rss / (1024 * 1024)
        mem_growth_mb = final_rss_mb - baseline_rss_mb

        # 50MB growth limit across 5,000 iterations is generous but detects real leaks
        assert (
            mem_growth_mb < 50
        ), f"Potential cursor memory leak: {mem_growth_mb:.1f}MB growth over {iterations} iterations"

        print(
            f"[OK] Cursor re-use stress: {iterations} iterations, "
            f"memory delta {mem_growth_mb:+.1f}MB, all results correct"
        )

    except Exception as e:
        pytest.fail(f"Cursor re-use stress failed: {e}")
    finally:
        stress_cursor.close()


@pytest.mark.stress
def test_fetchone_loop_vs_fetchall_parity(cursor, db_connection):
    """
    Test #10: Verify fetchone() loop and fetchall() produce bit-identical results
    for 100,000 rows.

    fetchone() and fetchall() travel through separate internal implementations.
    Any divergence between them — wrong values, swapped columns, missing rows —
    points at a bug in one of the two paths, and only shows up at a scale small
    unit tests never reach.
    """
    num_rows = 100000

    try:
        drop_table_if_exists(cursor, "#pytest_fetch_parity")

        cursor.execute("""
            CREATE TABLE #pytest_fetch_parity (
                id INT,
                val NVARCHAR(20),
                num INT
            )
        """)
        db_connection.commit()

        # Load the table in 1,000-row executemany batches
        batch_size = 1000
        for lo in range(0, num_rows, batch_size):
            hi = min(lo + batch_size, num_rows)
            batch = [(i, f"V_{i}", i * 3) for i in range(lo, hi)]
            cursor.executemany("INSERT INTO #pytest_fetch_parity VALUES (?, ?, ?)", batch)
        db_connection.commit()
        print(f"[OK] Inserted {num_rows} rows for parity test")

        # Path A: fetchone() loop — iter() with a None sentinel drains the cursor
        cursor.execute("SELECT id, val, num FROM #pytest_fetch_parity ORDER BY id")
        fetchone_rows = list(iter(cursor.fetchone, None))

        assert (
            len(fetchone_rows) == num_rows
        ), f"fetchone loop got {len(fetchone_rows)} rows, expected {num_rows}"
        print(f"[OK] fetchone loop: {len(fetchone_rows)} rows collected")

        # Path B: fetchall() over the identical query
        cursor.execute("SELECT id, val, num FROM #pytest_fetch_parity ORDER BY id")
        fetchall_rows = cursor.fetchall()

        assert (
            len(fetchall_rows) == num_rows
        ), f"fetchall got {len(fetchall_rows)} rows, expected {num_rows}"
        print(f"[OK] fetchall: {len(fetchall_rows)} rows collected")

        # Row-by-row comparison of the two paths (lengths already proven equal)
        for i, (fo, fa) in enumerate(zip(fetchone_rows, fetchall_rows)):
            assert fo[0] == fa[0] == i, f"Row {i}: id mismatch (fetchone={fo[0]}, fetchall={fa[0]})"
            assert (
                fo[1] == fa[1] == f"V_{i}"
            ), f"Row {i}: val mismatch (fetchone={fo[1]!r}, fetchall={fa[1]!r})"
            assert (
                fo[2] == fa[2] == i * 3
            ), f"Row {i}: num mismatch (fetchone={fo[2]}, fetchall={fa[2]})"

        print(f"[OK] fetchone/fetchall parity: {num_rows} rows identical across both fetch paths")

    except Exception as e:
        pytest.fail(f"fetchone vs fetchall parity test failed: {e}")
    finally:
        drop_table_if_exists(cursor, "#pytest_fetch_parity")
        db_connection.commit()
Loading
Loading