"""Build SHACL shapes from DCTAP TSV profiles and validate RDF/XML data against them.

Each TSV module listed in ``TSV_FILES`` is converted into its own SHACL shapes
graph; ``validate_rdf`` runs pySHACL once per module and returns a combined,
human-readable report.
"""
import os
import logging

import pandas as pd
import rdflib
from rdflib import Namespace, Literal, BNode, RDF, RDFS
from pyshacl import validate

# Set up basic logging (use DEBUG level to see detailed output)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s: %(message)s')

BASE_DIR = os.path.join(os.path.dirname(__file__), "MonographDCTAP")
TSV_FILES = [
    "MonographDCTAP/Monograph_Work_Text.tsv",
    "MonographDCTAP/Monograph_AdminMetadata.tsv",
    "MonographDCTAP/Monograph_Instance_Print.tsv",
    "electronic_MonographDCTAP/Monograph_Instance_Electronic.tsv",
]
PREFIX_FILE = "./MonographDCTAP/Monograph_Prefixes.tsv"

# Fixed prefix -> namespace map used to expand CURIEs found in the TSV files.
FIXED_PREFIXES = {
    "bf": "http://id.loc.gov/ontologies/bibframe/",
    "bflc": "http://id.loc.gov/ontologies/bflc/",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "big": "https://example.org/"
}

SHACL_NS = "http://www.w3.org/ns/shacl#"

# Selects the severity of every validation result in a pySHACL report graph.
_SEVERITY_QUERY = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
SELECT ?severity WHERE {
    ?vr a sh:ValidationResult ;
        sh:resultSeverity ?severity .
}
"""

# Selects the fields shown in the formatted report. sh:resultPath and
# sh:resultMessage are OPTIONAL so that results lacking either are still listed.
_RESULTS_QUERY = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
SELECT ?component ?severity ?sourceShape ?focus ?resultPath ?message WHERE {
    ?vr a sh:ValidationResult ;
        sh:sourceConstraintComponent ?component ;
        sh:resultSeverity ?severity ;
        sh:sourceShape ?sourceShape ;
        sh:focusNode ?focus .
    OPTIONAL { ?vr sh:resultPath ?resultPath . }
    OPTIONAL { ?vr sh:resultMessage ?message . }
}
ORDER BY ?component
"""

# Fallback shapes used when the template is not 'monograph'.
# NOTE(review): the ex: namespace IRI was missing from the original text;
# http://example.org/ is a placeholder -- confirm the intended namespace.
_DEFAULT_SHACL_TEXT = """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex: <http://example.org/> .

ex:DefaultShape a sh:NodeShape ;
    sh:targetNode ex:SomeNode ;
    sh:property [
        sh:path ex:someProperty ;
        sh:datatype xsd:string ;
    ] .
"""


def _sh(local_name):
    """Return the SHACL vocabulary term ``sh:<local_name>`` as a URIRef."""
    return rdflib.URIRef(SHACL_NS + local_name)


def _cell_text(row, column):
    """Return ``row[column]`` as stripped text, or "" when absent or NaN.

    pandas represents empty TSV cells as NaN, and ``str(NaN)`` is the truthy
    string "nan"; this helper keeps empty cells from being mistaken for data.
    """
    value = row.get(column)
    if value is None or pd.isna(value):
        return ""
    return str(value).strip()


def _as_text(serialized):
    """Decode the output of ``Graph.serialize`` when it is returned as bytes."""
    return serialized.decode('utf-8') if isinstance(serialized, bytes) else serialized


def load_prefixes(prefixes_file):
    """Return the hard-coded prefix map.

    ``prefixes_file`` is accepted for interface compatibility but is not read.
    """
    logging.info("Using hardcoded prefixes:")
    for p, ns in FIXED_PREFIXES.items():
        logging.info(f"{p} -> {ns}")
    return FIXED_PREFIXES


def register_prefixes(graph, prefixes):
    """Bind every prefix in ``prefixes`` on ``graph``, overriding existing bindings."""
    for prefix, uri in prefixes.items():
        graph.bind(prefix, Namespace(uri), override=True)


def _bind_namespaces(graph: rdflib.Graph):
    """Bind the fixed bf/bflc/rdfs/big namespaces on the graph's namespace manager."""
    for prefix in ("bf", "bflc", "rdfs", "big"):
        graph.namespace_manager.bind(prefix, Namespace(FIXED_PREFIXES[prefix]))


def _prop_id_to_uri(property_id, prefixes):
    """Convert a TSV identifier into an RDF term.

    A CURIE whose prefix is in ``prefixes`` is expanded to a URIRef; a value
    starting with "http" is used as a URIRef directly; anything else is
    returned as a Literal.
    """
    property_id = str(property_id).strip()
    if ":" in property_id:
        prefix, suffix = property_id.split(":", 1)
        ns = prefixes.get(prefix.strip())
        if ns:
            return rdflib.URIRef(ns + suffix.strip())
    if property_id.startswith("http"):
        return rdflib.URIRef(property_id)
    return Literal(property_id)


def _add_missing_property_target(graph, shape_uri, target_terms, property_term):
    """Attach a sh:SPARQLTarget selecting target instances that lack the property.

    Only IRI terms can be written as ``<...>`` in SPARQL, so the target is
    skipped when the property or every target class is not an IRI.
    NOTE(review): pySHACL documents SPARQL-based targets as an advanced
    feature; the validate() calls in this file do not pass advanced=True, so
    this target may be ignored -- confirm.
    """
    class_iris = [t for t in target_terms if isinstance(t, rdflib.URIRef)]
    if not class_iris or not isinstance(property_term, rdflib.URIRef):
        logging.warning(f"Skipping SPARQLTarget for shape {shape_uri}: no usable IRIs")
        return
    union_clause = " UNION ".join(f"{{ ?this a <{uri}> }}" for uri in class_iris)
    query = (
        f"SELECT ?this WHERE {{ {union_clause} "
        f"FILTER NOT EXISTS {{ ?this <{property_term}> ?o }} }}"
    )
    bnode = BNode()
    graph.add((bnode, RDF.type, _sh("SPARQLTarget")))
    graph.add((bnode, _sh("select"), Literal(query)))
    graph.add((shape_uri, _sh("target"), bnode))
    logging.info(f"Added SPARQLTarget with query: {query} to shape {shape_uri}")


def add_shape_from_row(graph, row, prefixes):
    """Add the node shape and property shape described by one DCTAP row.

    Args:
        graph: SHACL shapes graph to extend (mutated in place).
        row: mapping/Series with the DCTAP columns shapeID, shapeLabel, target,
            propertyID, propertyLabel, mandatory, repeatable and optionally
            severity and valueShape.
        prefixes: prefix -> namespace map used to expand CURIEs.

    Returns:
        The same ``graph`` object.
    """
    shape_uri = rdflib.URIRef(row['shapeID'])
    logging.info(f"Processing shape: {shape_uri}")

    node_shape_triple = (shape_uri, RDF.type, _sh("NodeShape"))
    if node_shape_triple not in graph:
        graph.add(node_shape_triple)
        shape_label = _cell_text(row, 'shapeLabel')
        if shape_label:
            graph.add((shape_uri, RDFS.label, Literal(shape_label)))
        logging.info(f"Added NodeShape: {shape_uri} with label {shape_label}")

    # An empty target cell yields no target classes (not the string "nan").
    targets = [t.strip() for t in _cell_text(row, 'target').split(";") if t.strip()]
    target_terms = [_prop_id_to_uri(t, prefixes) for t in targets]
    for target_uri in target_terms:
        graph.add((shape_uri, _sh("targetClass"), target_uri))
        logging.info(f"Added target '{target_uri}' to shape {shape_uri}")

    property_id = _cell_text(row, 'propertyID')
    if not property_id:
        logging.warning(f"Row for shape {shape_uri} has no propertyID; no property shape added")
        return graph

    path_uri = _prop_id_to_uri(property_id, prefixes)
    is_mandatory = _cell_text(row, 'mandatory').lower() == "true"
    property_label = _cell_text(row, 'propertyLabel')

    # If the property is mandatory, add a SPARQLTarget to force evaluation of
    # nodes missing the property.
    if is_mandatory:
        _add_missing_property_target(graph, shape_uri, target_terms, path_uri)

    property_bnode = BNode()
    graph.add((shape_uri, _sh("property"), property_bnode))
    graph.add((property_bnode, RDF.type, _sh("PropertyShape")))
    if property_label:
        graph.add((property_bnode, RDFS.label, Literal(property_label)))
    graph.add((property_bnode, _sh("path"), path_uri))
    logging.info(f"Added property shape for property {property_id} with label {property_label}")

    if is_mandatory:
        graph.add((property_bnode, _sh("minCount"), Literal(1)))
        logging.info(f"Set minCount 1 for property {property_id}")
    if _cell_text(row, 'repeatable').lower() == "false":
        graph.add((property_bnode, _sh("maxCount"), Literal(1)))
        logging.info(f"Set maxCount 1 for property {property_id}")

    # Only a non-empty severity cell sets sh:severity; any value other than
    # "Violation" or "Warning" maps to sh:Info.
    severity = _cell_text(row, "severity")
    if severity:
        severity_name = severity if severity in ("Violation", "Warning") else "Info"
        graph.add((property_bnode, _sh("severity"), _sh(severity_name)))
        logging.info(f"Set severity {severity} for property {property_id}")

    value_shape = _cell_text(row, "valueShape")
    if value_shape:
        value_shape_uri = _prop_id_to_uri(value_shape, prefixes)
        graph.add((property_bnode, _sh("node"), value_shape_uri))
        logging.info(f"Linked valueShape {value_shape_uri} for property {property_id}")

    return graph


def _resolve_tsv_path(tsv):
    """Return an existing path for ``tsv``, or None when it cannot be found.

    The path is tried as given (relative to the working directory) first and
    then relative to the directory containing this module.
    """
    if os.path.exists(tsv):
        return tsv
    candidate = os.path.join(os.path.dirname(os.path.abspath(__file__)), tsv)
    if os.path.exists(candidate):
        return candidate
    return None


def build_shacl_graphs():
    """Build one SHACL shapes graph per TSV file in ``TSV_FILES``.

    Returns:
        dict mapping each TSV entry (as written in ``TSV_FILES``) to its
        rdflib.Graph. TSV files that cannot be found are logged as errors and
        left out of the result, so they are not validated.
    """
    logging.info("Building individual SHACL graphs from TSV files")
    module_graphs = {}
    prefixes = load_prefixes(PREFIX_FILE)
    for tsv in TSV_FILES:
        tsv_path = _resolve_tsv_path(tsv)
        if tsv_path is None:
            logging.error(f"TSV file not found: {tsv}")
            continue
        logging.info(f"Processing TSV file: {tsv_path}")
        graph = rdflib.Graph()
        register_prefixes(graph, prefixes)
        _bind_namespaces(graph)  # Bind fixed namespaces for the SHACL graph
        # NOTE(review): comment='/' makes pandas discard everything after the
        # first '/' on a line, which would also truncate cells containing
        # full IRIs -- confirm the TSVs only use CURIEs.
        df = pd.read_csv(tsv_path, sep='\t', comment='/')
        for _, row in df.iterrows():
            if pd.isna(row.get("shapeID")):
                continue
            add_shape_from_row(graph, row, prefixes)
        module_graphs[tsv] = graph
    logging.info("Completed building individual SHACL graphs")
    return module_graphs


def parse_results_text(results_text: str) -> str:
    """
    Parse and reformat raw results_text for nicer display.

    Lines starting with "===" (module header) or "Validation Result" are
    preceded by a blank line; every other line is indented with a tab.
    """
    formatted_lines = []
    for raw_line in results_text.strip().splitlines():
        line = raw_line.strip()
        if line.startswith(("===", "Validation Result")):
            formatted_lines.append("\n" + line)
        else:
            formatted_lines.append("\t" + line)
    return "\n".join(formatted_lines)


def _log_declared_targets(data_graph, module_graphs):
    """Debug aid: log each module's sh:targetClass values and matching focus nodes."""
    for tsv, module in module_graphs.items():
        logging.debug(f"Module {tsv} declared targets:")
        for shape in module.subjects(RDF.type, _sh("NodeShape")):
            for target in module.objects(shape, _sh("targetClass")):
                logging.debug(f"Shape {shape} declares target: {target}")
                # A triple-pattern lookup avoids building SPARQL text from
                # terms that may not be IRIs.
                matches = list(data_graph.subjects(RDF.type, target))
                logging.debug(f"Found {len(matches)} focus node(s) for target {target}")
                for match in matches:
                    logging.debug(f"Focus node: {match}")


def _format_results(results_graph):
    """Render the validation results in ``results_graph`` as indented text."""
    entries = []
    for row in results_graph.query(_RESULTS_QUERY):
        entries.append(
            f"Validation Result in {row.component}:\n"
            f"\tSeverity: {row.severity}\n"
            f"\tSource Shape: {row.sourceShape}\n"
            f"\tFocus Node: {row.focus}\n"
            f"\tResult Path: {row.resultPath}\n"
            f"\tMessage: {row.message}\n"
        )
    return f"Results ({len(entries)}):\n" + "".join(entries)


def _validate_module(data_graph, tsv, graph):
    """Validate ``data_graph`` against one module's shapes.

    Returns:
        (module_conforms, module_output) where module_conforms is False only
        when at least one result has severity sh:Violation.
    """
    logging.info(f"Module {tsv} SHACL shapes:")
    logging.info(_as_text(graph.serialize(format='turtle')))
    _conforms, results_graph, _results_text = validate(
        data_graph, shacl_graph=graph, inference='rdfs', debug=True
    )
    # Override conform status: only sh:Violation results count as failures.
    severities = [str(row.severity) for row in results_graph.query(_SEVERITY_QUERY)]
    module_conforms = not any(SHACL_NS + "Violation" in s for s in severities)
    logging.info(f"Module {tsv} - Overridden Conforms: {module_conforms}")
    module_output = (
        f"\n=== Module: {tsv} ===\n"
        f"Overridden Conforms: {module_conforms}\n"
        f"{_format_results(results_graph)}\n"
        "------------------------\n"
    )
    return module_conforms, module_output


def validate_rdf(rdf_data, template):
    """Validate RDF/XML data against the SHACL shapes for ``template``.

    Args:
        rdf_data: RDF/XML document as a string (passed to ``Graph.parse``).
        template: template name; 'monograph' (case-insensitive) validates
            against every TSV module, anything else (including None) uses the
            built-in default shape.

    Returns:
        (conforms, report_text).

    Raises:
        Whatever ``Graph.parse`` raises when ``rdf_data`` cannot be parsed.
    """
    logging.info("Starting validation")
    data_graph = rdflib.Graph()
    logging.info("Parsing RDF data")
    try:
        data_graph.parse(data=rdf_data, format='xml')
    except Exception as e:
        logging.error(f"Error parsing RDF data: {e}")
        raise

    # Bind known namespaces explicitly from the input RDF/XML
    namespaces = {
        "bf": "http://id.loc.gov/ontologies/bibframe/",
        "bflc": "http://id.loc.gov/ontologies/bflc/",
        "bfsimple": "http://id.loc.gov/ontologies/bfsimple/",
        "cc": "http://creativecommons.org/ns#",
        "datatypes": "http://id.loc.gov/datatypes/",
        "dcterms": "http://purl.org/dc/terms/",
        "foaf": "http://xmlns.com/foaf/0.1/",
        "lcc": "http://id.loc.gov/ontologies/lcc#",
        "lclocal": "http://id.loc.gov/ontologies/lclocal/",
        "madsrdf": "http://www.loc.gov/mads/rdf/v1#",
        "mnotetype": "http://id.loc.gov/vocabulary/mnotetype/",
        "mstatus": "https://id.loc.gov/vocabulary/mstatus/",
        "owl": "http://www.w3.org/2002/07/owl#",
        "pmo": "http://performedmusicontology.org/ontology/",
        "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
        "skos": "http://www.w3.org/2004/02/skos/core#",
        "vartitletype": "http://id.loc.gov/vocabulary/vartitletype/",
        "void": "http://rdfs.org/ns/void#",
        "xsd": "http://www.w3.org/2001/XMLSchema#"
    }
    for prefix, uri in namespaces.items():
        data_graph.bind(prefix, uri)

    logging.info(f"Data graph has {len(data_graph)} triples.")
    for s, p, o in list(data_graph)[:10]:
        logging.debug(f"Parsed triple: {s} {p} {o}")

    # Log the full RDF graph in turtle format
    logging.info("Full RDF graph:\n" + _as_text(data_graph.serialize(format='turtle')))

    # Extra debugging: log all rdf:type values from the data graph
    classes = {o for _, o in data_graph.subject_objects(RDF.type)}
    logging.debug(f"Data graph contains these types: {list(classes)}")

    # Debugging aid: show how the expected target identifiers expand.
    prefixes = load_prefixes(PREFIX_FILE)
    expected_targets = ["https:Agent", "big:Contribution"]
    expanded_targets = [_prop_id_to_uri(t, prefixes) for t in expected_targets]
    logging.debug(f"Expected target classes per TSV: {expanded_targets}")

    if (template or "").lower() == 'monograph':
        logging.info("Using Monograph template; processing individual TSV modules")
        module_graphs = build_shacl_graphs()
        _log_declared_targets(data_graph, module_graphs)

        all_results = []
        overall_conforms = True
        for tsv, graph in module_graphs.items():
            module_conforms, module_output = _validate_module(data_graph, tsv, graph)
            all_results.append(module_output)
            if not module_conforms:
                overall_conforms = False

        # Reformat the combined results for easier display.
        combined_results = parse_results_text("\n".join(all_results))
        return overall_conforms, combined_results

    logging.info("Using default SHACL template")
    shacl_graph = rdflib.Graph()
    shacl_graph.parse(data=_DEFAULT_SHACL_TEXT, format='turtle')
    conforms, results_graph, results_text = validate(
        data_graph, shacl_graph=shacl_graph, inference='rdfs', debug=True
    )
    logging.info(f"Validation completed; Conforms: {conforms}")
    logging.info("Results text:")
    logging.info(results_text)
    detailed_results = _as_text(results_graph.serialize(format='turtle'))
    logging.info("Detailed results graph:")
    logging.info(detailed_results)
    combined_results = f"{results_text.strip()}\nDetailed Results:\n{detailed_results}"
    return conforms, combined_results