Source code for rdflib.plugins.serializers.patch

from __future__ import annotations

import warnings
from typing import IO, Optional
from uuid import uuid4

from rdflib import Dataset
from rdflib.plugins.serializers.nquads import _nq_row
from rdflib.plugins.serializers.nt import _nt_row
from rdflib.serializer import Serializer

add_remove_methods = {"add": "A", "remove": "D"}


[docs] class PatchSerializer(Serializer): """ Creates an RDF patch file to add and remove triples/quads. Can either: - Create an add or delete patch for a single Dataset. - Create a patch to represent the difference between two Datasets. """
[docs] def __init__( self, store: Dataset, ): self.store: Dataset = store super().__init__(store)
[docs] def serialize( self, stream: IO[bytes], base: Optional[str] = None, encoding: Optional[str] = None, **kwargs, ): """ Serialize the store to the given stream. :param stream: The stream to serialize to. :param base: The base URI to use for the serialization. :param encoding: The encoding to use for the serialization. :param kwargs: Additional keyword arguments. Supported keyword arguments: - operation: The operation to perform. Either 'add' or 'remove'. - target: The target Dataset to compare against. NB: Only one of 'operation' or 'target' should be provided. - header_id: The header ID to use. - header_prev: The previous header ID to use. """ operation = kwargs.get("operation") target = kwargs.get("target") header_id = kwargs.get("header_id") header_prev = kwargs.get("header_prev") if not header_id: header_id = f"uuid:{uuid4()}" encoding = self.encoding if base is not None: warnings.warn("PatchSerializer does not support base.") if encoding is not None and encoding.lower() != self.encoding.lower(): warnings.warn( "PatchSerializer does not use custom encoding. " f"Given encoding was: {encoding}" ) def write_header(): stream.write(f"H id <{header_id}> .\n".encode(encoding, "replace")) if header_prev: stream.write(f"H prev <{header_prev}>\n".encode(encoding, "replace")) stream.write("TX .\n".encode(encoding, "replace")) def write_triples(contexts, op_code, use_passed_contexts=False): for context in contexts: if not use_passed_contexts: context = self.store.get_context(context.identifier) for triple in context: stream.write( self._patch_row(triple, context.identifier, op_code).encode( encoding, "replace" ) ) if operation: assert operation in add_remove_methods, f"Invalid operation: {operation}" elif not target: # No operation specified and no target specified # Fall back to default operation of "add" to prevent a no-op operation = "add" write_header() if operation: operation_code = add_remove_methods.get(operation) write_triples(self.store.contexts(), operation_code) elif target: to_add, to_remove = self._diff(target) write_triples(to_add.contexts(), "A", use_passed_contexts=True) write_triples(to_remove.contexts(), "D", use_passed_contexts=True) stream.write("TC .\n".encode(encoding, "replace"))
def _diff(self, target): rows_to_add = target - self.store rows_to_remove = self.store - target return rows_to_add, rows_to_remove def _patch_row(self, triple, context_id, operation): if context_id == self.store.default_context.identifier: return f"{operation} {_nt_row(triple)}" else: return f"{operation} {_nq_row(triple, context_id)}"