diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index ba5065d5..9841a673 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -1638,6 +1638,14 @@ def fetchall_with_mapping(): self.fetchmany = fetchmany_with_mapping self.fetchall = fetchall_with_mapping + # Initialize rownumber tracking so fetchone() and iteration work + self._reset_rownumber() + + # Metadata methods produce a new result set, so rowcount is unknown + # until rows are fetched. Reset it to avoid exposing a stale value + # from a previous statement. + self.rowcount = -1 + # Return the cursor itself for method chaining return self diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 6ac15738..fe3436d1 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -15961,3 +15961,152 @@ def reader(reader_id): finally: stop_event.set() mssql_python.native_uuid = original + + +@pytest.fixture(scope="module") +def catalog_fetch_schema(cursor, db_connection): + """Create and tear down test objects for catalog fetchone/iteration testing.""" + cursor.execute( + "IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'pytest_cat_fetch') " + "EXEC('CREATE SCHEMA pytest_cat_fetch')" + ) + cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test_child") + cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test") + cursor.execute("DROP PROCEDURE IF EXISTS pytest_cat_fetch.fetch_test_proc") + + cursor.execute(""" + CREATE TABLE pytest_cat_fetch.fetch_test ( + id INT PRIMARY KEY, + name VARCHAR(100) NOT NULL, + value DECIMAL(10,2), + ts DATETIME DEFAULT GETDATE() + ) + """) + cursor.execute(""" + CREATE TABLE pytest_cat_fetch.fetch_test_child ( + child_id INT PRIMARY KEY, + parent_id INT NOT NULL, + CONSTRAINT fk_parent FOREIGN KEY (parent_id) + REFERENCES pytest_cat_fetch.fetch_test(id) + ) + """) + cursor.execute(""" + CREATE PROCEDURE pytest_cat_fetch.fetch_test_proc + AS + BEGIN + SELECT 1 AS result + END + """) + db_connection.commit() + + yield + + cursor.execute("DROP PROCEDURE IF EXISTS pytest_cat_fetch.fetch_test_proc") + cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test_child") + cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test") + cursor.execute("DROP SCHEMA IF EXISTS pytest_cat_fetch") + db_connection.commit() + + +def test_tables_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on tables() result set (GH-505)""" + cursor.tables(table="fetch_test", schema="pytest_cat_fetch") + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row" + assert row.table_name.lower() == "fetch_test" + assert row.table_schem.lower() == "pytest_cat_fetch" + assert cursor.fetchone() is None + + +def test_tables_iteration(cursor, db_connection, catalog_fetch_schema): + """Test that 'for row in cursor.tables()' works (GH-505)""" + rows = list(cursor.tables(table="fetch_test", schema="pytest_cat_fetch")) + assert len(rows) == 1, "Iteration should yield 1 row" + assert rows[0].table_name.lower() == "fetch_test" + + +def test_columns_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on columns() result set (GH-505)""" + cursor.columns(table="fetch_test", schema="pytest_cat_fetch") + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row from columns()" + assert hasattr(row, "column_name") + assert row.table_name.lower() == "fetch_test" + + +def test_primarykeys_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on primaryKeys() result set (GH-505)""" + cursor.primaryKeys(table="fetch_test", schema="pytest_cat_fetch") + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row from primaryKeys()" + assert row.column_name.lower() == "id" + assert cursor.fetchone() is None + + +def test_foreignkeys_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on foreignKeys() result set (GH-505)""" + cursor.foreignKeys( + table="fetch_test_child", + schema="pytest_cat_fetch", + ) + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row from foreignKeys()" + assert row.pkcolumn_name.lower() == "id" + assert row.fkcolumn_name.lower() == "parent_id" + assert cursor.fetchone() is None + + +def test_statistics_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on statistics() result set (GH-505)""" + cursor.statistics(table="fetch_test", schema="pytest_cat_fetch") + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row from statistics()" + assert row.table_name.lower() == "fetch_test" + + +def test_procedures_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on procedures() result set (GH-505)""" + cursor.procedures(procedure="fetch_test_proc", schema="pytest_cat_fetch") + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row from procedures()" + assert row.procedure_name.lower().startswith("fetch_test_proc") + assert cursor.fetchone() is None + + +def test_rowid_columns_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on rowIdColumns() result set (GH-505)""" + cursor.rowIdColumns(table="fetch_test", schema="pytest_cat_fetch") + # May or may not have rowid columns; just verify no InterfaceError + row = cursor.fetchone() + if row is not None: + assert hasattr(row, "column_name") + + +def test_rowver_columns_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on rowVerColumns() result set (GH-505)""" + cursor.rowVerColumns(table="fetch_test", schema="pytest_cat_fetch") + # May or may not have rowver columns; just verify no InterfaceError + row = cursor.fetchone() + if row is not None: + assert hasattr(row, "column_name") + + +def test_gettypeinfo_fetchone(cursor, db_connection, catalog_fetch_schema): + """Test that fetchone() works on getTypeInfo() result set (GH-505)""" + cursor.getTypeInfo() + row = cursor.fetchone() + assert row is not None, "fetchone() should return a row from getTypeInfo()" + assert hasattr(row, "type_name") + + +def test_catalog_rownumber_increments_correctly(cursor, db_connection, catalog_fetch_schema): + """Test that rownumber increments correctly during fetchone() on catalog results (GH-505)""" + cursor.columns(table="fetch_test", schema="pytest_cat_fetch") + assert cursor.rownumber == -1 + + for expected_idx in range(4): + row = cursor.fetchone() + assert row is not None, f"Expected row at index {expected_idx}" + assert cursor.rownumber == expected_idx + + assert cursor.fetchone() is None