+
Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions src/omcp/sql_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sqlglot.expressions as exp
import typing as t
import omcp.exceptions as ex
from sqlglot.optimizer.scope import build_scope

OMOP_TABLES = [
"care_site",
Expand Down Expand Up @@ -91,12 +92,31 @@ def _check_is_select_query(
"Only SELECT statements are allowed for security reasons."
)

def _check_is_omop_table(self, tables: t.List[exp.Table]) -> ex.TableNotFoundError:
def _check_is_omop_table(self, parsed_sql: exp.Expression) -> ex.TableNotFoundError:
"""
Check that all real table references in the query are OMOP CDM tables,
ignoring CTEs (defined in WITH clauses).

Args:
parsed_sql (exp.Expression): The parsed SQL expression.

Return:
TableNotFoundError: If any non-OMOP tables are found.
"""
root = build_scope(parsed_sql)
tables = [
source
for scope in root.traverse()
for alias, (node, source) in scope.selected_sources.items()
if isinstance(source, exp.Table)
]

not_omop_tables = [
table.name.lower()
for table in tables
if table.name.lower() not in OMOP_TABLES
]

if not_omop_tables:
return ex.TableNotFoundError(
f"Tables not found in OMOP CDM: {', '.join(not_omop_tables)}"
Expand Down Expand Up @@ -216,7 +236,7 @@ def validate_sql(self, sql: str):
errors.append(ex.ColumnNotFoundError("No columns found in the query."))

# Check is OMOP table
errors.append(self._check_is_omop_table(tables))
errors.append(self._check_is_omop_table(parsed_sql))

# Check for excluded tables
errors.append(self._check_unauthorized_tables(tables))
Expand All @@ -234,3 +254,31 @@ def validate_sql(self, sql: str):
finally:
errors = list(filter(None, errors)) # Remove None values from the list
return errors


if __name__ == "__main__":
    # Manual smoke run: build a validator and feed it a query that joins
    # schema-qualified tables with a CTE (lisinopril_patients).
    validator = SQLValidator(
        allow_source_value_columns=False,
        exclude_tables=None,
        exclude_columns=None,
    )

    # The closing quotes stay at column 0 so the literal's content is not
    # padded with trailing indentation.
    demo_sql = """
WITH lisinopril_patients AS (
SELECT DISTINCT person_id
FROM base.drug_exposure d
JOIN base.concept c ON d.drug_concept_id = c.concept_id
WHERE c.concept_name LIKE '%lisinopril%'
)SELECT
c.concept_name as condition_name,
COUNT(DISTINCT co.person_id) as patient_count,
COUNT(*) as occurrence_count
FROM
base.condition_occurrence co
JOIN
base.concept c ON co.condition_concept_id = c.concept_id
JOIN
lisinopril_patients lp ON co.person_id = lp.person_id
GROUP BY
c.concept_name
ORDER BY
patient_count DESC, occurrence_count DESC
LIMIT 20
"""

    # NOTE: the value returned by validate_sql is not inspected here.
    validator.validate_sql(demo_sql)

Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载