# Source code for rdflib.plugins.shared.jsonld.util

# https://github.com/RDFLib/rdflib-jsonld/blob/feature/json-ld-1.1/rdflib_jsonld/util.py
from __future__ import annotations

import json
import pathlib
from html.parser import HTMLParser
from io import StringIO, TextIOBase, TextIOWrapper
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, TextIO, Tuple, Union

if TYPE_CHECKING:
    import json
else:
    try:
        import json

        assert json  # workaround for pyflakes issue #13
    except ImportError:
        import simplejson as json

from posixpath import normpath, sep
from typing import TYPE_CHECKING, cast
from urllib.parse import urljoin, urlsplit, urlunsplit

try:
    import orjson

    _HAS_ORJSON = True
except ImportError:
    orjson = None  # type: ignore[assignment, unused-ignore]
    _HAS_ORJSON = False


from rdflib.parser import (
    BytesIOWrapper,
    InputSource,
    PythonInputSource,
    StringInputSource,
    URLInputSource,
    create_input_source,
)


def source_to_json(
    source: Optional[
        Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
    ],
    fragment_id: Optional[str] = None,
    extract_all_scripts: Optional[bool] = False,
) -> Tuple[Union[Dict, List[Dict]], Any]:
    """Extract JSON from a source document.

    The source document can be JSON or HTML with embedded JSON script
    elements (type attribute = "application/ld+json"). To process as HTML
    ``source.content_type`` must be set to "text/html" or
    "application/xhtml+xml".

    :param source: the input source document (JSON or HTML)
    :param fragment_id: if source is an HTML document then extract only the
        script element with matching id attribute, defaults to None
    :param extract_all_scripts: if source is an HTML document then extract
        all script elements (unless fragment_id is provided), defaults to
        False (extract only the first script element)
    :return: Tuple with the extracted JSON document and value of the HTML
        base element
    """
    if isinstance(source, PythonInputSource):
        # Already-parsed Python data structure; nothing to decode.
        return source.data, None

    if isinstance(source, StringInputSource):
        # A StringInputSource is assumed to be never a HTMLJSON doc
        html_base: Any = None
        # We can get the original string from the StringInputSource.
        # It's hidden in the BytesIOWrapper 'wrapped' attribute.
        b_stream = source.getByteStream()
        original_string: Optional[str] = None
        json_dict: Union[Dict, List[Dict]]
        if isinstance(b_stream, BytesIOWrapper):
            wrapped_inner = cast(Union[str, StringIO, TextIOBase], b_stream.wrapped)
            if isinstance(wrapped_inner, str):
                original_string = wrapped_inner
            elif isinstance(wrapped_inner, StringIO):
                original_string = wrapped_inner.getvalue()
        if _HAS_ORJSON:
            if original_string is not None:
                json_dict = orjson.loads(original_string)
            elif isinstance(b_stream, BytesIOWrapper):
                # use the CharacterStream instead
                c_stream = source.getCharacterStream()
                json_dict = orjson.loads(c_stream.read())
            else:
                # orjson assumes its in utf-8 encoding so
                # don't bother to check the source.getEncoding()
                json_dict = orjson.loads(b_stream.read())
        else:
            if original_string is not None:
                json_dict = json.loads(original_string)
            else:
                json_dict = json.load(source.getCharacterStream())
        return json_dict, html_base

    # TODO: conneg for JSON (fix support in rdflib's URLInputSource!)
    source = create_input_source(source, format="json-ld")

    try:
        content_type = source.content_type
    except (AttributeError, LookupError):
        content_type = None

    is_html = content_type is not None and content_type.lower() in (
        "text/html",
        "application/xhtml+xml",
    )
    if is_html:
        html_docparser: Optional[HTMLJSONParser] = HTMLJSONParser(
            fragment_id=fragment_id, extract_all_scripts=extract_all_scripts
        )
    else:
        html_docparser = None

    try:
        b_stream = source.getByteStream()
    except (AttributeError, LookupError):
        b_stream = None
    try:
        c_stream = source.getCharacterStream()
    except (AttributeError, LookupError):
        c_stream = None
    if b_stream is None and c_stream is None:
        raise ValueError(
            f"Source does not have a character stream or a byte stream and cannot be used {type(source)}"
        )
    try:
        b_encoding: Optional[str] = None if b_stream is None else source.getEncoding()
    except (AttributeError, LookupError):
        b_encoding = None

    # Try to find an underlying wrapped Unicode string to use, to avoid a
    # needless decode round-trip through the byte stream.
    underlying_string: Optional[str] = None
    if b_stream is not None and isinstance(b_stream, BytesIOWrapper):
        wrapped_inner = b_stream.wrapped
        if isinstance(wrapped_inner, str):
            underlying_string = wrapped_inner
        elif isinstance(wrapped_inner, StringIO):
            underlying_string = wrapped_inner.getvalue()

    try:
        if is_html and html_docparser is not None:
            # Offload parsing to the HTMLJSONParser
            if underlying_string is not None:
                html_string: str = underlying_string
            elif c_stream is not None:
                html_string = c_stream.read()
            else:
                if TYPE_CHECKING:
                    assert b_stream is not None
                if b_encoding is None:
                    b_encoding = "utf-8"
                html_string = TextIOWrapper(b_stream, encoding=b_encoding).read()
            html_docparser.feed(html_string)
            json_dict, html_base = html_docparser.get_json(), html_docparser.get_base()
        elif _HAS_ORJSON:
            html_base = None
            if underlying_string is not None:
                json_dict = orjson.loads(underlying_string)
            elif (
                (b_stream is not None and isinstance(b_stream, BytesIOWrapper))
                or b_stream is None
            ) and c_stream is not None:
                # use the CharacterStream instead
                json_dict = orjson.loads(c_stream.read())
            else:
                if TYPE_CHECKING:
                    assert b_stream is not None
                # b_stream is not None
                json_dict = orjson.loads(b_stream.read())
        else:
            html_base = None
            if underlying_string is not None:
                # FIX: previously this returned the bare parsed document
                # (``return json.loads(underlying_string)``) instead of the
                # declared ``(json_dict, html_base)`` tuple, breaking callers
                # that unpack the result when orjson is unavailable.
                json_dict = json.loads(underlying_string)
            else:
                if c_stream is not None:
                    use_stream = c_stream
                else:
                    if TYPE_CHECKING:
                        assert b_stream is not None
                    # b_stream is not None
                    if b_encoding is None:
                        b_encoding = "utf-8"
                    use_stream = TextIOWrapper(b_stream, encoding=b_encoding)
                json_dict = json.load(use_stream)
        return json_dict, html_base
    finally:
        # Close whichever streams were opened; some stream-like objects may
        # not implement close(), hence the AttributeError guards.
        if b_stream is not None:
            try:
                b_stream.close()
            except AttributeError:
                pass
        if c_stream is not None:
            try:
                c_stream.close()
            except AttributeError:
                pass
VOCAB_DELIMS = ("#", "/", ":")


def split_iri(iri: str) -> Tuple[str, Optional[str]]:
    """Split an IRI into a vocabulary part and a leaf name.

    The split happens after the last occurrence of a delimiter, trying
    "#" first, then "/", then ":". The delimiter stays attached to the
    vocabulary part. If no delimiter occurs, the whole IRI is returned
    with ``None`` as the leaf.
    """
    for delim in VOCAB_DELIMS:
        prefix, found, leaf = iri.rpartition(delim)
        if found:
            return prefix + found, leaf
    return iri, None
def norm_url(base: str, url: str) -> str:
    """Resolve *url* against *base* and normalize the resulting path.

    >>> norm_url('http://example.org/', '/one')
    'http://example.org/one'
    >>> norm_url('http://example.org/', '/one#')
    'http://example.org/one#'
    >>> norm_url('http://example.org/one', 'two')
    'http://example.org/two'
    >>> norm_url('http://example.org/one/', 'two')
    'http://example.org/one/two'
    >>> norm_url('http://example.org/', 'http://example.net/one')
    'http://example.net/one'
    >>> norm_url('http://example.org/', 'http://example.org//one')
    'http://example.org//one'
    """
    # Absolute URLs pass through untouched (normpath would collapse '//').
    if "://" in url:
        return url

    # Fix for URNs
    base_parts = urlsplit(base)
    url_parts = urlsplit(url)
    if url_parts.scheme:
        # Assume full URL
        return url

    if base_parts.scheme in ("urn", "urn-x"):
        # No scheme -> assume relative and join paths
        nss_head, slash, nss_tail = base_parts.path.partition("/")
        joined = urljoin("/" + nss_tail if slash else "/", url_parts.path)
        frag = f"#{url_parts.fragment}" if url_parts.fragment else ""
        result = f"{base_parts.scheme}:{nss_head}{joined}{frag}"
    else:
        merged = urlsplit(urljoin(base, url))
        norm_path = normpath(merged.path)
        if sep != "/":
            # posixpath.sep is "/", but keep the original defensive mapping.
            norm_path = norm_path.replace(sep, "/")
        # normpath strips a trailing slash; restore it if the input had one.
        if merged.path.endswith("/") and not norm_path.endswith("/"):
            norm_path += "/"
        result = urlunsplit(
            (merged.scheme, merged.netloc, norm_path, merged.query, merged.fragment)
        )

    # urlsplit/urlunsplit drop an empty fragment marker; put it back.
    if url.endswith("#") and not result.endswith("#"):
        result += "#"
    return result
def context_from_urlinputsource(source: URLInputSource) -> Optional[str]:
    """Extract a remote context URL from a Link header, if applicable.

    Please note that JSON-LD documents served with the application/ld+json
    media type MUST have all context information, including references to
    external contexts, within the body of the document. Contexts linked via
    a http://www.w3.org/ns/json-ld#context HTTP Link Header MUST be ignored
    for such documents.
    """
    if source.content_type == "application/ld+json":
        return None
    try:
        # source.links is the new way of getting Link headers from URLInputSource
        links = source.links
    except AttributeError:
        return None
    for link in links:
        if ' rel="http://www.w3.org/ns/json-ld#context"' not in link:
            continue
        start, end = link.index("<"), link.index(">")
        if start > -1 and end > -1:
            # type error: Value of type variable "AnyStr" of "urljoin" cannot be "Optional[str]"
            return urljoin(source.url, link[start + 1 : end])  # type: ignore[type-var]
    return None
__all__ = [ "json", "source_to_json", "split_iri", "norm_url", "context_from_urlinputsource", "orjson", "_HAS_ORJSON", ] class HTMLJSONParser(HTMLParser): def __init__( self, fragment_id: Optional[str] = None, extract_all_scripts: Optional[bool] = False, ): super().__init__() self.fragment_id = fragment_id self.json: List[Dict] = [] self.contains_json = False self.fragment_id_does_not_match = False self.base = None self.extract_all_scripts = extract_all_scripts self.script_count = 0 def handle_starttag(self, tag, attrs): self.contains_json = False self.fragment_id_does_not_match = False # Only set self. contains_json to True if the # type is 'application/ld+json' if tag == "script": for attr, value in attrs: if attr == "type" and value == "application/ld+json": self.contains_json = True elif attr == "id" and self.fragment_id and value != self.fragment_id: self.fragment_id_does_not_match = True elif tag == "base": for attr, value in attrs: if attr == "href": self.base = value def handle_data(self, data): # Only do something when we know the context is a # script element containing application/ld+json if self.contains_json is True and self.fragment_id_does_not_match is False: if not self.extract_all_scripts and self.script_count > 0: return if data.strip() == "": # skip empty data elements return # Try to parse the json if _HAS_ORJSON: # orjson can load a unicode string # if that's the only thing we have, # its not worth encoding it to bytes parsed = orjson.loads(data) else: parsed = json.loads(data) # Add to the result document if isinstance(parsed, list): self.json.extend(parsed) else: self.json.append(parsed) self.script_count += 1 def get_json(self) -> List[Dict]: return self.json def get_base(self): return self.base