Skip to content

domain_config

This module contains classes to load RDF domain configuration files (DCAT-like catalogs) defining how to find and select files for processing.

ConfigurationEntryList

Bases: list[CE]

Source code in ogc/na/domain_config.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
class ConfigurationEntryList(list[CE]):
    """
    A list of configuration entries that can be queried by file.
    """

    def find_entry_for_file(self, fn: str | Path) -> ConfigurationEntry | None:
        """
        Look up the first entry in this list whose `matches()` accepts a file.

        :param fn: the file name (converted to a Path if it is not one already)
        :return: the first matching entry, or None if no entry matches
        """
        candidate = fn if isinstance(fn, Path) else Path(fn)
        return next((entry for entry in self if entry.matches(candidate)), None)

    def find_entries_for_files(self, fns: list[str | Path]) -> 'dict[Path, ConfigurationEntry]':
        """
        Look up the configuration entries for several files at once. Works like
        [find_entry_for_file()][ogc.na.domain_config.ConfigurationEntryList.find_entry_for_file],
        applied to every file in a list.

        :param fns: a list of files to find
        :return: a resolved path \u2192 entry dict; files without a (truthy) entry are left out
        """
        # Every file name is resolved to an absolute path before it is matched,
        # and that resolved path is what ends up as the dict key.
        resolved_paths = (Path(fn).resolve() for fn in fns)
        lookups = ((path, self.find_entry_for_file(path)) for path in resolved_paths)
        return {path: found for path, found in lookups if found}

    def find_all(self) -> 'dict[Path, ConfigurationEntry]':
        """
        Collect every file reported by the entries in this list, together with
        the entry that reported it.

        :return: a path to entry mapping (dict) including all files; if several
                 entries report the same path, the last one in the list wins
        """
        return {path: entry for entry in self for path in entry.find_all()}

find_all()

Find all the files referenced by this configuration entry list, including their DomainConfigurationEntry.

Returns:

Type Description
'dict[Path, ConfigurationEntry]'

a path to DomainConfigurationEntry mapping (dict) including all files

Source code in ogc/na/domain_config.py
324
325
326
327
328
329
330
331
332
333
334
def find_all(self) -> 'dict[Path, ConfigurationEntry]':
    """
    Collect every file reported by the entries in this list, together with
    the entry that reported it.

    :return: a path to entry mapping (dict) including all files; if several
             entries report the same path, the last one in the list wins
    """
    # Entries are visited in list order, so a later entry overrides an
    # earlier one for the same path.
    return {path: entry for entry in self for path in entry.find_all()}

find_entries_for_files(fns)

Find the configuration entries associated to a list of files. Similar to find_entry_for_file() but with a list of files.

Parameters:

Name Type Description Default
fns list[str | Path]

a list of files to find

required

Returns:

Type Description
'dict[Path, ConfigurationEntry]'

a path → DomainConfigurationEntry dict for each file that is found

Source code in ogc/na/domain_config.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def find_entries_for_files(self, fns: list[str | Path]) -> 'dict[Path, ConfigurationEntry]':
    """
    Find the configuration entries associated to a list of files. Similar
    to [find_entry_for_file()][ogc.na.domain_config.ConfigurationEntryList.find_entry_for_file]
    but with a list of files.

    :param fns: a list of files to find
    :return: a path \u2192 DomainConfigurationEntry dict for each file that is found
    """
    result: dict[Path, ConfigurationEntry] = {}
    for fn in fns:
        p = Path(fn).resolve()
        e = self.find_entry_for_file(p)
        if e:
            result[p] = e
    return result

find_entry_for_file(fn)

Find the configuration entry that corresponds to a file, if any.

Parameters:

Name Type Description Default
fn str | Path

the file name

required

Returns:

Type Description
ConfigurationEntry | None

a DomainConfigurationEntry, or None if none is found

Source code in ogc/na/domain_config.py
293
294
295
296
297
298
299
300
301
302
303
304
305
def find_entry_for_file(self, fn: str | Path) -> ConfigurationEntry | None:
    """
    Look up the first entry in this list whose `matches()` accepts a file.

    :param fn: the file name (converted to a Path if it is not one already)
    :return: the first matching entry, or None if no entry matches
    """
    candidate = fn if isinstance(fn, Path) else Path(fn)
    # Entries are tested in list order; the search stops at the first match.
    return next((entry for entry in self if entry.matches(candidate)), None)

DomainConfiguration

The DomainConfiguration class can load a collection of ConfigurationEntry instances detailing which files need to be processed and where they can be found, as well as a list of profiles for entailment, validation, and (potentially) other operations.

Domain configurations use the http://www.example.org/ogc/domain-cfg# (dcfg) prefix.

A domain configuration must include, at least, a dcfg:glob (glob expression to find/filter files inside the base directory). If present, a dcfg:uriRootFilter will be used to determine which is the main concept scheme in the file (if more than one is found). Profiles for validation, entailment, etc. can be specified using dcterms:conformsTo.

dcfg:hasUpliftDefinition can also be used to declare (ordered) semantic uplift definitions, either from profile artifacts or from files.

Source code in ogc/na/domain_config.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class DomainConfiguration:
    """
    The DomainConfiguration class can load a collection of ConfigurationEntry instances
    detailing which files need to be processed and where they can be found, as well
    as a list of profiles for entailment, validation, and (potentially)
    other operations.

    Domain configurations use the `http://www.example.org/ogc/domain-cfg#` (dcfg) prefix.

    A domain configuration must include, at least, a `dcfg:glob` (glob expression to find/filter
    files inside the base directory). If present, a `dcfg:uriRootFilter` will be used to determine
    which is the main concept scheme in the file (if more than one is found). Profiles for
    validation, entailment, etc. can be specified using `dcterms:conformsTo`.

    `dcfg:hasUpliftDefinition` can also be used to declare (ordered) semantic uplift definitions, either
    from profile artifacts or from files.
    """

    def __init__(self, source: Union[Graph, str, Path, IO], working_directory: str | Path | None = None,
                 profile_sources: str | Path | Iterable[str | Path] | None = None,
                 ignore_artifact_errors=False, local_artifacts_mappings: dict | None = None):
        """
        Creates a new DomainConfiguration, optionally specifying the working directory.

        :param source: Graph or Turtle file to load. A string starting with `sparql:`
            is not parsed as a file; see `_load()`.
        :param working_directory: the working directory to use for local paths. If omitted,
            the parent directory of `source` is used when `source` is a str or Path,
            and the current directory otherwise.
        :param profile_sources: additional profile source (or iterable of sources) handed to
            the profile registry together with the ones declared in the configuration.
        :param ignore_artifact_errors: forwarded to the profile registry as
            `ignore_artifact_errors`; a catalog in the configuration can also turn it on.
        :param local_artifacts_mappings: base URI to local path mappings. These take
            precedence over mappings for the same base URI found in the configuration.
        """
        if working_directory:
            self.working_directory = Path(working_directory).resolve()
        elif isinstance(source, (str, Path)):
            self.working_directory = Path(source).parent.resolve()
        else:
            self.working_directory = Path().resolve()
        logger.info("Working directory: %s", self.working_directory)
        self.entries = ConfigurationEntryList()
        self.uplift_entries = UpliftConfigurationEntryList()
        self.local_artifacts_mappings = {}
        if local_artifacts_mappings:
            self.local_artifacts_mappings.update(local_artifacts_mappings)
        self.profile_registry: ProfileRegistry | None = None
        self._profile_sources = profile_sources
        self._ignore_artifact_errors = ignore_artifact_errors

        self._load(source)

    def _load(self, source: Union[Graph, str, Path, IO]):
        """
        Load entries from a Graph or Turtle document.

        If `source` is a string starting with `sparql:`, the remainder of the string
        replaces the `__SERVICE__` placeholder in the configuration query, and the
        query is run against an empty graph instead of a parsed document.

        :param source: Graph or Turtle file to load
        :return: this DomainConfiguration instance
        """
        service = ''
        if isinstance(source, Graph):
            g = source
        elif isinstance(source, str) and source.startswith('sparql:'):
            service = source[len('sparql:'):]
            g = Graph()
        else:
            g = Graph().parse(source)

        cfg_graph = g.query(DOMAIN_CFG_QUERY.replace('__SERVICE__', service)).graph

        ignore_profile_artifact_errors = self._ignore_artifact_errors

        prof_sources: set[str | Path] = set()
        for catalog_ref in cfg_graph.subjects(DCAT.dataset):
            logger.debug("Found catalog %s", catalog_ref)

            # Any catalog can switch error tolerance on; none can switch it off
            if bool(cfg_graph.value(catalog_ref, DCFG.ignoreProfileArtifactErrors)):
                ignore_profile_artifact_errors = True

            # Local artifacts mapping
            for mapping_ref in cfg_graph.objects(catalog_ref, DCFG.localArtifactMapping):
                base_uri = str(cfg_graph.value(mapping_ref, DCFG.baseURI))
                if base_uri in self.local_artifacts_mappings:
                    # Mappings that are already known (e.g., from the constructor) win
                    logger.debug("Local artifact mapping for %s overriden", base_uri)
                    continue
                local_path = Path(str(cfg_graph.value(mapping_ref, DCFG.localPath)))
                logger.debug("Found local artifact mapping: %s -> %s", base_uri, local_path)
                self.local_artifacts_mappings[base_uri] = local_path

            # Profile sources
            for p in cfg_graph.objects(catalog_ref, DCFG.hasProfileSource):
                if not isinstance(p, Literal):
                    continue
                if p.value.startswith('sparql:'):
                    prof_sources.add(p.value)
                else:
                    # Anything else is a glob relative to the working directory
                    prof_sources.update(self.working_directory.glob(p.value))

        # Profile sources passed to the constructor are merged once, outside the catalog
        # loop, so that they are honored even when the configuration declares no catalog.
        # A single str or Path must be add()ed: set.update() would split a str into
        # its characters and fail on a Path (which is not iterable).
        extra_sources = self._profile_sources
        if extra_sources:
            if isinstance(extra_sources, (str, Path)):
                prof_sources.add(extra_sources)
            else:
                prof_sources.update(extra_sources)

        self.profile_registry = ProfileRegistry(prof_sources,
                                                ignore_artifact_errors=ignore_profile_artifact_errors,
                                                local_artifact_mappings=self.local_artifacts_mappings)

        for cfg_ref in cfg_graph.objects(predicate=DCAT.dataset):

            globs = [str(glob_value) for glob_value in cfg_graph.objects(cfg_ref, DCFG.glob)]

            # DomainConfigurationEntry specific properties
            uri_root_filter = cfg_graph.value(cfg_ref, DCFG.uriRootFilter)
            profile_refs = cast(list[URIRef], list(cfg_graph.objects(cfg_ref, DCTERMS.conformsTo)))

            # UpliftConfigurationEntry specific properties
            found_uplift_defs = []
            for uplift_def_ref in cfg_graph.objects(cfg_ref, DCFG.hasUpliftDefinition):
                order_value = cfg_graph.value(uplift_def_ref, DCFG.order)
                # Normalize to int so that all sort keys are mutually comparable
                order = int(order_value) if order_value is not None else None
                target_prof = cfg_graph.value(uplift_def_ref, DCFG.profile)
                target_file = cfg_graph.value(uplift_def_ref, DCFG.file)
                if target_prof:
                    found_uplift_defs.append((order, target_prof))
                elif target_file:
                    found_uplift_defs.append((order, self.working_directory.joinpath(str(target_file)).resolve()))
            # Definitions with an explicit order come first (ascending); those without
            # one follow, keeping their discovery order (sorted() is stable). The tuple
            # key works even if no definition declares an order at all.
            uplift_defs = [target for _, target in
                           sorted(found_uplift_defs,
                                  key=lambda u: (u[0] is None, u[0] if u[0] is not None else 0))]

            # Fall back to the entry's own reference when no dcterms:identifier is given
            identifier = cfg_graph.value(cfg_ref, DCTERMS.identifier) or str(cfg_ref)

            if (cfg_ref, RDF.type, DCFG.DomainConfiguration) in cfg_graph:
                self.entries.append(DomainConfigurationEntry(
                    working_directory=self.working_directory,
                    glob=globs,
                    identifier=identifier,
                    uri_root_filter=uri_root_filter,
                    conforms_to=profile_refs,
                ))

            # The same dataset can yield both a domain entry and an uplift entry
            if uplift_defs:
                self.uplift_entries.append(UpliftConfigurationEntry(
                    working_directory=self.working_directory,
                    glob=globs,
                    identifier=identifier,
                    uplift_definitions=uplift_defs,
                ))

        logger.info("Found %d domain configurations and %d uplift configurations",
                    len(self.entries),
                    len(self.uplift_entries))

        return self

    def __len__(self):
        """
        Return the total number of entries (domain configurations plus uplift configurations).
        """
        return len(self.entries) + len(self.uplift_entries)

__init__(source, working_directory=None, profile_sources=None, ignore_artifact_errors=False, local_artifacts_mappings=None)

Creates a new DomainConfiguration, optionally specifying the working directory.

Parameters:

Name Type Description Default
source Union[Graph, str, Path, IO]

Graph or Turtle file to load

required
working_directory str | Path

the working directory to use for local paths.

None
Source code in ogc/na/domain_config.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def __init__(self, source: Union[Graph, str, Path, IO], working_directory: str | Path = None,
             profile_sources: str | Path | Iterable[str | Path] | None = None,
             ignore_artifact_errors=False, local_artifacts_mappings: dict | None = None):
    """
    Set up a new DomainConfiguration and load its entries from `source`.

    :param source: Graph or Turtle file to load
    :param working_directory: the working directory to use for local paths. If omitted,
        the parent directory of `source` is used when `source` is a str or Path,
        and the current directory otherwise.
    :param profile_sources: additional profile source(s), stored for use while loading
    :param ignore_artifact_errors: stored for use while loading
    :param local_artifacts_mappings: initial contents for the local artifacts mappings
    """
    # Choose the base directory first; it is made absolute in a single place below
    if working_directory:
        base_dir = Path(working_directory)
    elif isinstance(source, (str, Path)):
        base_dir = Path(source).parent
    else:
        base_dir = Path()
    self.working_directory = base_dir.resolve()
    logger.info("Working directory: %s", self.working_directory)

    self.entries = ConfigurationEntryList()
    self.uplift_entries = UpliftConfigurationEntryList()

    # Copy the caller's mappings so that the caller's dict is never modified
    self.local_artifacts_mappings = {}
    self.local_artifacts_mappings.update(local_artifacts_mappings or {})

    self.profile_registry: ProfileRegistry | None = None
    self._profile_sources = profile_sources
    self._ignore_artifact_errors = ignore_artifact_errors

    self._load(source)