"""
Parser plugin interface.
This module defines the parser plugin interface and contains other
related parser support code.
The module is mainly useful for those wanting to write a parser that
can plugin to rdflib. If you are wanting to invoke a parser you likely
want to do so through the Graph class parse method.
"""
from __future__ import annotations
import codecs
import os
import pathlib
import sys
from io import BufferedIOBase, BytesIO, RawIOBase, StringIO, TextIOBase, TextIOWrapper
from typing import (
IO,
TYPE_CHECKING,
Any,
BinaryIO,
List,
Optional,
TextIO,
Tuple,
Union,
cast,
)
from urllib.parse import urljoin
from urllib.request import Request, url2pathname
from xml.sax import xmlreader
import rdflib.util
from rdflib import __version__
from rdflib._networking import _urlopen
from rdflib.namespace import Namespace
from rdflib.term import URIRef
if TYPE_CHECKING:
from email.message import Message
from io import BufferedReader
from urllib.response import addinfourl
from typing_extensions import Buffer
from rdflib.graph import Graph
__all__ = [
"Parser",
"InputSource",
"StringInputSource",
"URLInputSource",
"FileInputSource",
"PythonInputSource",
]
[docs]
class Parser:
__slots__ = ()
[docs]
def __init__(self):
pass
[docs]
def parse(self, source: InputSource, sink: Graph) -> None:
pass
class BytesIOWrapper(BufferedIOBase):
__slots__ = (
"wrapped",
"enc_str",
"text_str",
"encoding",
"encoder",
"has_read1",
"has_seek",
"_name",
"_fileno",
"_isatty",
"_leftover",
"_bytes_per_char",
"_text_bytes_offset",
)
def __init__(self, wrapped: Union[str, StringIO, TextIOBase], encoding="utf-8"):
super(BytesIOWrapper, self).__init__()
self.wrapped = wrapped
self.encoding = encoding
self.encoder = codecs.getencoder(self.encoding)
self.enc_str: Optional[Union[BytesIO, BufferedIOBase]] = None
self.text_str: Optional[Union[StringIO, TextIOBase]] = None
self.has_read1: Optional[bool] = None
self.has_seek: Optional[bool] = None
self._name: Optional[str] = None
self._fileno: Optional[Union[int, BaseException]] = None
self._isatty: Optional[Union[bool, BaseException]] = None
self._leftover: bytes = b""
self._text_bytes_offset: int = 0
norm_encoding = encoding.lower().replace("_", "-")
if norm_encoding in ("utf-8", "utf8", "u8", "cp65001"):
# utf-8 has a variable number of bytes per character, 1-4
self._bytes_per_char: int = 1 # assume average of 1 byte per character
elif norm_encoding in (
"latin1",
"latin-1",
"iso-8859-1",
"iso8859-1",
"ascii",
"us-ascii",
):
# these are all 1-byte-per-character encodings
self._bytes_per_char = 1
elif norm_encoding.startswith("utf-16") or norm_encoding.startswith("utf16"):
# utf-16 has a variable number of bytes per character, 2-3
self._bytes_per_char = 2 # assume average of 2 bytes per character
elif norm_encoding.startswith("utf-32") or norm_encoding.startswith("utf32"):
# utf-32 is fixed length with 4 bytes per character
self._bytes_per_char = 4
else:
# not sure, just assume it is 2 bytes per character
self._bytes_per_char = 2
def _init(self):
name: Optional[str] = None
if isinstance(self.wrapped, str):
b, blen = self.encoder(self.wrapped)
self.enc_str = BytesIO(b)
name = "string"
elif isinstance(self.wrapped, TextIOWrapper):
inner = self.wrapped.buffer
# type error: TextIOWrapper.buffer cannot be a BytesIOWrapper
if isinstance(inner, BytesIOWrapper): # type: ignore[unreachable]
raise Exception(
"BytesIOWrapper cannot be wrapped in TextIOWrapper, "
"then wrapped in another BytesIOWrapper"
)
else:
self.enc_str = cast(BufferedIOBase, inner)
elif isinstance(self.wrapped, (TextIOBase, StringIO)):
self.text_str = self.wrapped
use_stream: Union[BytesIO, StringIO, BufferedIOBase, TextIOBase]
if self.enc_str is not None:
use_stream = self.enc_str
elif self.text_str is not None:
use_stream = self.text_str
else:
raise Exception("No stream to read from")
if name is None:
try:
name = use_stream.name # type: ignore[union-attr]
except AttributeError:
name = "stream"
self.has_read1 = hasattr(use_stream, "read1")
try:
self.has_seek = use_stream.seekable()
except AttributeError:
self.has_seek = hasattr(use_stream, "seek")
self._name = name
def _check_fileno(self):
use_stream: Union[BytesIO, StringIO, BufferedIOBase, TextIOBase]
if self.enc_str is None and self.text_str is None:
self._init()
if self.enc_str is not None:
use_stream = self.enc_str
elif self.text_str is not None:
use_stream = self.text_str
try:
self._fileno = use_stream.fileno()
except OSError as e:
self._fileno = e
except AttributeError:
self._fileno = -1
def _check_isatty(self):
use_stream: Union[BytesIO, StringIO, BufferedIOBase, TextIOBase]
if self.enc_str is None and self.text_str is None:
self._init()
if self.enc_str is not None:
use_stream = self.enc_str
elif self.text_str is not None:
use_stream = self.text_str
try:
self._isatty = use_stream.isatty()
except OSError as e:
self._isatty = e
except AttributeError:
self._isatty = False
@property
def name(self) -> Any:
if self._name is None:
self._init()
return self._name
@property
def closed(self) -> bool:
if self.enc_str is None and self.text_str is None:
return False
closed: Optional[bool] = None
if self.enc_str is not None:
try:
closed = self.enc_str.closed
except AttributeError:
closed = None
elif self.text_str is not None:
try:
closed = self.text_str.closed
except AttributeError:
closed = None
return False if closed is None else closed
def readable(self) -> bool:
return True
def writable(self) -> bool:
return False
def truncate(self, size: Optional[int] = None) -> int:
raise NotImplementedError("Cannot truncate on BytesIOWrapper")
def isatty(self) -> bool:
if self._isatty is None:
self._check_isatty()
if isinstance(self._isatty, BaseException):
raise self._isatty
else:
return bool(self._isatty)
def fileno(self) -> int:
if self._fileno is None:
self._check_fileno()
if isinstance(self._fileno, BaseException):
raise self._fileno
else:
return -1 if self._fileno is None else self._fileno
def close(self):
if self.enc_str is None and self.text_str is None:
return
if self.enc_str is not None:
try:
self.enc_str.close()
except AttributeError:
pass
elif self.text_str is not None:
try:
self.text_str.close()
except AttributeError:
pass
def flush(self):
return # Does nothing on read-only streams
def _read_bytes_from_text_stream(self, size: Optional[int] = -1, /) -> bytes:
if TYPE_CHECKING:
assert self.text_str is not None
if size is None or size < 0:
try:
ret_str: str = self.text_str.read()
except EOFError:
ret_str = ""
ret_encoded, enc_len = self.encoder(ret_str)
if self._leftover:
ret_bytes = self._leftover + ret_encoded
self._leftover = b""
else:
ret_bytes = ret_encoded
elif size == len(self._leftover):
ret_bytes = self._leftover
self._leftover = b""
elif size < len(self._leftover):
ret_bytes = self._leftover[:size]
self._leftover = self._leftover[size:]
else:
d, m = divmod(size, self._bytes_per_char)
get_per_loop = int(d) + (1 if m > 0 else 0)
got_bytes: bytes = self._leftover
while len(got_bytes) < size:
try:
got_str: str = self.text_str.read(get_per_loop)
except EOFError:
got_str = ""
if len(got_str) < 1:
break
ret_encoded, enc_len = self.encoder(got_str)
got_bytes += ret_encoded
if len(got_bytes) == size:
self._leftover = b""
ret_bytes = got_bytes
else:
ret_bytes = got_bytes[:size]
self._leftover = got_bytes[size:]
del got_bytes
self._text_bytes_offset += len(ret_bytes)
return ret_bytes
def read(self, size: Optional[int] = -1, /) -> bytes:
"""
Read at most size bytes, returned as a bytes object.
If the size argument is negative or omitted read until EOF is reached.
Return an empty bytes object if already at EOF.
"""
if size is not None and size == 0:
return b""
if self.enc_str is None and self.text_str is None:
self._init()
if self.enc_str is not None:
ret_bytes = self.enc_str.read(size)
else:
ret_bytes = self._read_bytes_from_text_stream(size)
return ret_bytes
def read1(self, size: Optional[int] = -1, /) -> bytes:
"""
Read at most size bytes, with at most one call to the underlying raw stream’s
read() or readinto() method. Returned as a bytes object.
If the size argument is negative or omitted, read until EOF is reached.
Return an empty bytes object at EOF.
"""
if (self.enc_str is None and self.text_str is None) or self.has_read1 is None:
self._init()
if not self.has_read1:
raise NotImplementedError()
if self.enc_str is not None:
if size is None or size < 0:
return self.enc_str.read1()
return self.enc_str.read1(size)
raise NotImplementedError("read1() not supported for TextIO in BytesIOWrapper")
def readinto(self, b: Buffer, /) -> int:
"""
Read len(b) bytes into buffer b.
Returns number of bytes read (0 for EOF), or error if the object
is set not to block and has no data to read.
"""
if TYPE_CHECKING:
assert isinstance(b, (memoryview, bytearray))
if len(b) == 0:
return 0
if self.enc_str is None and self.text_str is None:
self._init()
if self.enc_str is not None:
return self.enc_str.readinto(b)
else:
size = len(b)
read_data: bytes = self._read_bytes_from_text_stream(size)
read_len = len(read_data)
if read_len == 0:
return 0
b[:read_len] = read_data
return read_len
def readinto1(self, b: Buffer, /) -> int:
"""
Read len(b) bytes into buffer b, with at most one call to the underlying raw
stream's read() or readinto() method.
Returns number of bytes read (0 for EOF), or error if the object
is set not to block and has no data to read.
"""
if TYPE_CHECKING:
assert isinstance(b, (memoryview, bytearray))
if (self.enc_str is None and self.text_str is None) or self.has_read1 is None:
self._init()
if not self.has_read1:
raise NotImplementedError()
if self.enc_str is not None:
return self.enc_str.readinto1(b)
raise NotImplementedError(
"readinto1() not supported for TextIO in BytesIOWrapper"
)
def seek(self, offset: int, whence: int = 0, /) -> int:
if self.has_seek is not None and not self.has_seek:
raise NotImplementedError()
if (self.enc_str is None and self.text_str is None) or self.has_seek is None:
self._init()
if not whence == 0:
raise NotImplementedError("Only SEEK_SET is supported on BytesIOWrapper")
if offset != 0:
raise NotImplementedError(
"Only seeking to zero is supported on BytesIOWrapper"
)
if self.enc_str is not None:
self.enc_str.seek(offset, whence)
elif self.text_str is not None:
self.text_str.seek(offset, whence)
self._text_bytes_offset = 0
self._leftover = b""
return 0
def seekable(self):
if (self.enc_str is None and self.text_str is None) or self.has_seek is None:
self._init()
return self.has_seek
def tell(self) -> int:
if self.has_seek is not None and not self.has_seek:
raise NotImplementedError("Cannot tell() pos because file is not seekable.")
if self.enc_str is not None:
try:
self._text_bytes_offset = self.enc_str.tell()
except AttributeError:
pass
return self._text_bytes_offset
def write(self, b, /):
raise NotImplementedError("Cannot write to a BytesIOWrapper")
headers = {
"User-agent": "rdflib-%s (https://rdflib.github.io/; eikeon@eikeon.com)"
% __version__
}
def create_input_source(
source: Optional[
Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
] = None,
publicID: Optional[str] = None, # noqa: N803
location: Optional[str] = None,
file: Optional[Union[BinaryIO, TextIO]] = None,
data: Optional[Union[str, bytes, dict]] = None,
format: Optional[str] = None,
) -> InputSource:
"""
Return an appropriate InputSource instance for the given
parameters.
"""
# test that exactly one of source, location, file, and data is not None.
non_empty_arguments = list(
filter(
lambda v: v is not None,
[source, location, file, data],
)
)
if len(non_empty_arguments) != 1:
raise ValueError(
"exactly one of source, location, file or data must be given",
)
input_source = None
if source is not None:
if TYPE_CHECKING:
assert file is None
assert data is None
assert location is None
if isinstance(source, InputSource):
input_source = source
else:
if isinstance(source, str):
location = source
elif isinstance(source, pathlib.PurePath):
location = str(source)
elif isinstance(source, bytes):
data = source
elif hasattr(source, "read") and not isinstance(source, Namespace):
f = source
input_source = InputSource()
if hasattr(source, "encoding"):
input_source.setCharacterStream(source)
input_source.setEncoding(source.encoding)
try:
b = source.buffer # type: ignore[union-attr]
input_source.setByteStream(b)
except (AttributeError, LookupError):
input_source.setByteStream(source)
else:
input_source.setByteStream(f)
if f is sys.stdin:
input_source.setSystemId("file:///dev/stdin")
elif hasattr(f, "name"):
input_source.setSystemId(f.name)
else:
raise Exception(
"Unexpected type '%s' for source '%s'" % (type(source), source)
)
absolute_location = None # Further to fix for issue 130
auto_close = False # make sure we close all file handles we open
if location is not None:
if TYPE_CHECKING:
assert file is None
assert data is None
assert source is None
(
absolute_location,
auto_close,
file,
input_source,
) = _create_input_source_from_location(
file=file,
format=format,
input_source=input_source,
location=location,
)
if file is not None:
if TYPE_CHECKING:
assert location is None
assert data is None
assert source is None
input_source = FileInputSource(file)
if data is not None:
if TYPE_CHECKING:
assert location is None
assert file is None
assert source is None
if isinstance(data, dict):
input_source = PythonInputSource(data)
auto_close = True
elif isinstance(data, (str, bytes, bytearray)):
input_source = StringInputSource(data)
auto_close = True
else:
raise RuntimeError(f"parse data can only str, or bytes. not: {type(data)}")
if input_source is None:
raise Exception("could not create InputSource")
else:
input_source.auto_close |= auto_close
if publicID is not None: # Further to fix for issue 130
input_source.setPublicId(publicID)
# Further to fix for issue 130
elif input_source.getPublicId() is None:
input_source.setPublicId(absolute_location or "")
return input_source
def _create_input_source_from_location(
file: Optional[Union[BinaryIO, TextIO]],
format: Optional[str],
input_source: Optional[InputSource],
location: str,
) -> Tuple[URIRef, bool, Optional[Union[BinaryIO, TextIO]], Optional[InputSource]]:
# Fix for Windows problem https://github.com/RDFLib/rdflib/issues/145 and
# https://github.com/RDFLib/rdflib/issues/1430
# NOTE: using pathlib.Path.exists on a URL fails on windows as it is not a
# valid path. However os.path.exists() returns false for a URL on windows
# which is why it is being used instead.
if os.path.exists(location):
location = pathlib.Path(location).absolute().as_uri()
base = pathlib.Path.cwd().as_uri()
absolute_location = URIRef(rdflib.util._iri2uri(location), base=base)
if absolute_location.startswith("file:///"):
filename = url2pathname(absolute_location.replace("file:///", "/"))
file = open(filename, "rb")
else:
input_source = URLInputSource(absolute_location, format)
auto_close = True
# publicID = publicID or absolute_location # Further to fix
# for issue 130
return absolute_location, auto_close, file, input_source