-
Notifications
You must be signed in to change notification settings - Fork 145
[Issue 305] add new json parser #366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
296816b
cd095e2
f2f91fd
190cd5a
2834000
6297673
4741a43
080d848
5826922
e6332cb
1f6d5b6
fc980b1
3fe3e11
0be1780
c5b8d3c
03cce38
2dcd125
50c3038
562f288
a722036
c8851d8
347051a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Signed-off-by: Meret Behrens <meret.behrens@tngtech.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# Copyright (c) 2022 spdx contributors | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from typing import List | ||
|
||
|
||
class SPDXParsingError(Exception): | ||
messages: List[str] | ||
|
||
def __init__(self, messages: List[str]): | ||
self.messages = messages | ||
|
||
def get_messages(self): | ||
return self.messages | ||
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Copyright (c) 2022 spdx contributors | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
import re | ||
from typing import Union, Pattern, Match | ||
|
||
from src.model.actor import Actor, ActorType | ||
from src.model.spdx_no_assertion import SpdxNoAssertion | ||
from src.model.typing.constructor_type_errors import ConstructorTypeErrors | ||
from src.parser.error import SPDXParsingError | ||
|
||
|
||
class ActorParser: | ||
def parse_actor_or_no_assertion(self, actor_or_no_assertion: str) -> Union[SpdxNoAssertion, Actor]: | ||
if actor_or_no_assertion == SpdxNoAssertion.__str__: | ||
return SpdxNoAssertion() | ||
else: | ||
return self.parse_actor(actor_or_no_assertion) | ||
|
||
@staticmethod | ||
def parse_actor(actor: str) -> Actor: | ||
tool_re: Pattern = re.compile(r"Tool:\s*(.+)", re.UNICODE) | ||
person_re: Pattern = re.compile(r"Person:\s*(([^(])+)(\((.*)\))?", re.UNICODE) | ||
org_re: Pattern = re.compile(r"Organization:\s*(([^(])+)(\((.*)\))?", re.UNICODE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Best to include the "start" symbol There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, changed the regex There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we even want to include the "end" symbol. Not so sure about this one; but I mean,
is probably not valid |
||
tool_match: Match = tool_re.match(actor) | ||
person_match: Match = person_re.match(actor) | ||
org_match: Match = org_re.match(actor) | ||
|
||
if tool_match: | ||
name: str = tool_match.group(1).strip() | ||
try: | ||
creator = Actor(ActorType.TOOL, name=name) | ||
except ConstructorTypeErrors as err: | ||
raise SPDXParsingError(err.get_messages()) | ||
elif person_match: | ||
name: str = person_match.group(1).strip() | ||
email: str = person_match.group(4).strip() if person_match.group(4) else None | ||
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
creator = Actor(ActorType.PERSON, name=name, email=email) | ||
except ConstructorTypeErrors as err: | ||
raise SPDXParsingError(err.get_messages()) | ||
elif org_match: | ||
name: str = org_match.group(1).strip() | ||
email: str = org_match.group(4).strip() if org_match.group(4) else None | ||
try: | ||
creator = Actor(ActorType.ORGANIZATION, name=name, email=email) | ||
except ConstructorTypeErrors as err: | ||
raise SPDXParsingError(err.get_messages()) | ||
|
||
else: | ||
raise SPDXParsingError([f"Actor {actor} doesn't match any of person, organization or tool."]) | ||
|
||
return creator |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
# Copyright (c) 2022 spdx contributors | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from datetime import datetime | ||
from typing import Dict, Optional, List | ||
|
||
from src.model.actor import Actor | ||
from src.model.annotation import Annotation, AnnotationType | ||
from src.model.typing.constructor_type_errors import ConstructorTypeErrors | ||
from src.parser.error import SPDXParsingError | ||
from src.parser.json.actor_parser import ActorParser | ||
from src.parser.json.dict_parsing_functions import datetime_from_str | ||
from src.parser.logger import Logger | ||
|
||
|
||
class AnnotationParser: | ||
logger: Logger | ||
actor_parser: ActorParser | ||
|
||
def __init__(self): | ||
self.logger = Logger() | ||
self.actor_parser = ActorParser() | ||
|
||
def parse_all_annotations(self, input_doc_dict: Dict) -> List[Annotation]: | ||
annotations_list = [] | ||
doc_spdx_id: str = input_doc_dict.get("SPDXID") | ||
document_annotations: List[Dict] = input_doc_dict.get("annotations") | ||
if document_annotations: | ||
try: | ||
annotations_list.extend(self.parse_annotations(document_annotations, spdx_id=doc_spdx_id)) | ||
except SPDXParsingError as err: | ||
self.logger.append_all(err.get_messages()) | ||
|
||
reviews: List[Dict] = input_doc_dict.get("revieweds") | ||
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if reviews: | ||
for review in reviews: | ||
try: | ||
review_annotation: Annotation = self.parse_review(review, spdx_id=doc_spdx_id) | ||
if review_annotation: | ||
annotations_list.append(review_annotation) | ||
except SPDXParsingError as err: | ||
self.logger.append_all(err.get_messages()) | ||
packages: List[Dict] = input_doc_dict.get("packages") | ||
if packages: | ||
for package in packages: | ||
package_spdx_id: str = package.get("SPDXID") | ||
package_annotations: List[Dict] = package.get("annotations") | ||
if package_annotations: | ||
try: | ||
annotations_list.extend(self.parse_annotations(package_annotations, spdx_id=package_spdx_id)) | ||
except SPDXParsingError as err: | ||
self.logger.append_all(err.get_messages()) | ||
files: List[Dict] = input_doc_dict.get("files") | ||
if files: | ||
for file in files: | ||
file_spdx_id: str = file.get("SPDXID") | ||
file_annotations:List[Dict] = file.get("annotations") | ||
if file_annotations: | ||
try: | ||
annotations_list.extend(self.parse_annotations(file_annotations, spdx_id=file_spdx_id)) | ||
except SPDXParsingError as err: | ||
self.logger.append_all(err.get_messages()) | ||
|
||
snippets: List[Dict] = input_doc_dict.get("snippets") | ||
if snippets: | ||
for snippet in snippets: | ||
snippet_spdx_id: str = snippet.get("SPDXID") | ||
snippet_annotations: List[Dict] = snippet.get("annotations") | ||
if snippet_annotations: | ||
try: | ||
annotations_list.extend(self.parse_annotations(snippet_annotations, spdx_id=snippet_spdx_id)) | ||
except SPDXParsingError as err: | ||
self.logger.append_all(err.get_messages()) | ||
|
||
if self.logger.has_messages(): | ||
raise SPDXParsingError(self.logger.get_messages()) | ||
return annotations_list | ||
|
||
def parse_annotations(self, annotations_dict_list: List[Dict], spdx_id: Optional[str] = None) -> List[Annotation]: | ||
logger = Logger() | ||
annotations_list = [] | ||
for annotation_dict in annotations_dict_list: | ||
try: | ||
annotation: Annotation = self.parse_annotation(annotation_dict, spdx_id=spdx_id) | ||
annotations_list.append(annotation) | ||
except SPDXParsingError as err: | ||
logger.append_all(err.get_messages()) | ||
if logger.has_messages(): | ||
raise SPDXParsingError(logger.get_messages()) | ||
|
||
return annotations_list | ||
|
||
def parse_annotation(self, annotation: Dict, spdx_id: Optional[str] = None) -> Annotation: | ||
logger = Logger() | ||
spdx_id: str = annotation.get("SPDXID") or spdx_id | ||
try: | ||
annotation_type: Optional[AnnotationType] = self.parse_annotation_type(annotation.get("annotationType")) | ||
except SPDXParsingError as err: | ||
logger.append_all(err.get_messages()) | ||
annotation_type = None | ||
try: | ||
annotator: Optional[Actor] = self.actor_parser.parse_actor(annotation.get("annotator")) | ||
except SPDXParsingError as err: | ||
logger.append_all(err.get_messages()) | ||
annotator = None | ||
try: | ||
annotation_date: Optional[datetime] = datetime_from_str(annotation.get("annotationDate")) | ||
except TypeError: | ||
logger.append("ValueError while parsing annotationDate.") | ||
annotation_date = None | ||
annotation_comment: str = annotation.get("comment") | ||
if logger.has_messages(): | ||
raise SPDXParsingError([f"Error while parsing annotation: {logger.get_messages()}"]) | ||
try: | ||
annotation = Annotation(spdx_id, annotation_type, annotator, annotation_date, annotation_comment) | ||
except ConstructorTypeErrors as err: | ||
raise SPDXParsingError([f"Error while constructing annotation: {err.get_messages()}"]) | ||
return annotation | ||
|
||
@staticmethod | ||
def parse_annotation_type(annotation_type: str) -> AnnotationType: | ||
try: | ||
return AnnotationType[annotation_type] | ||
except KeyError: | ||
raise SPDXParsingError([f"Invalid annotation type: {annotation_type}"]) | ||
|
||
|
||
def parse_review(self, review_dict: Dict, spdx_id: str) -> Annotation: | ||
logger = Logger() | ||
try: | ||
annotator: Optional[Actor] = self.actor_parser.parse_actor(review_dict.get("reviewer")) | ||
except SPDXParsingError as err: | ||
logger.append_all(err.get_messages()) | ||
annotator = None | ||
try: | ||
annotation_date: Optional[datetime] = datetime_from_str(review_dict.get("reviewDate")) | ||
except TypeError: | ||
logger.append("ValueError while parsing reviewDate.") | ||
annotation_date = None | ||
annotation_type = AnnotationType.REVIEW | ||
comment: str = review_dict.get("comment") | ||
if logger.has_messages(): | ||
raise SPDXParsingError([f"Error while parsing review: {logger.get_messages()}"]) | ||
|
||
try: | ||
return Annotation(spdx_id=spdx_id, annotator=annotator, annotation_date=annotation_date, | ||
annotation_type=annotation_type, annotation_comment=comment) | ||
except ConstructorTypeErrors as err: | ||
raise SPDXParsingError([f"Error while constructing review: {err.get_messages()}"]) |
10000 Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# Copyright (c) 2022 spdx contributors | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from typing import Dict, List | ||
|
||
from src.model.checksum import Checksum, ChecksumAlgorithm | ||
from src.model.typing.constructor_type_errors import ConstructorTypeErrors | ||
from src.parser.error import SPDXParsingError | ||
from src.parser.json.dict_parsing_functions import transform_json_str_to_enum_name | ||
from src.parser.logger import Logger | ||
|
||
|
||
class ChecksumParser: | ||
auxiliary_logger: Logger | ||
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def __init__(self): | ||
self.auxiliary_logger = Logger() | ||
|
||
def parse_checksums(self, checksum_dicts_list: List[Dict]) -> List[Checksum]: | ||
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not checksum_dicts_list: | ||
raise SPDXParsingError([f"No checksums provided, checksums are mandatory for files."]) | ||
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
armintaenzertng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
checksum_list = [] | ||
for checksum_dict in checksum_dicts_list: | ||
try: | ||
checksum_list.append(self.parse_checksum(checksum_dict)) | ||
except SPDXParsingError as err: | ||
self.auxiliary_logger.append_all(err.get_messages()) | ||
continue | ||
if self.auxiliary_logger.has_messages(): | ||
raise SPDXParsingError(self.auxiliary_logger.get_messages()) | ||
|
||
return checksum_list | ||
|
||
@staticmethod | ||
def parse_checksum(checksum_dict: Dict) -> Checksum: | ||
logger = Logger() | ||
algorithm = transform_json_str_to_enum_name(checksum_dict.get("algorithm")) | ||
try: | ||
checksum_algorithm = ChecksumAlgorithm[algorithm] | ||
except KeyError: | ||
logger.append(f"Algorithm {algorithm} not valid for checksum.") | ||
checksum_algorithm = None | ||
checksum_value = checksum_dict.get("checksumValue") | ||
if logger.has_messages(): | ||
raise SPDXParsingError([f"Error while parsing checksum: {logger.get_messages()}"]) | ||
try: | ||
checksum = Checksum(algorithm=checksum_algorithm, value=checksum_value) | ||
except ConstructorTypeErrors as err: | ||
raise SPDXParsingError([f"Error while constructing checksum: {err.get_messages()}"]) | ||
return checksum |
Uh oh!
There was an error while loading. Please reload this page.