From 215c2c380fcbb5371f0b7734363eecbf0bf85a71 Mon Sep 17 00:00:00 2001
From: JOLIMAITRE Matthieu
Date: Mon, 20 May 2024 17:48:14 +0200
Subject: [PATCH] init

---
 .gitignore            |   2 +
 .vscode/settings.json |   4 +
 setup.sh              |   7 +
 src/party/__init__.py |   4 +
 src/party/forward.py  |  35 +++++
 src/party/parser.py   | 222 +++++++++++++++++++++++++++++++
 src/party/result.py   |  63 +++++++++
 tests/json.py         |  88 ++++++++++++
 tests/test.py         |  61 ++++++++
 9 files changed, 486 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 .vscode/settings.json
 create mode 100755 setup.sh
 create mode 100644 src/party/__init__.py
 create mode 100644 src/party/forward.py
 create mode 100644 src/party/parser.py
 create mode 100644 src/party/result.py
 create mode 100644 tests/json.py
 create mode 100755 tests/test.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..27009d8
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/venv
+__pycache__
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..dcb1530
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+    "python.analysis.typeCheckingMode": "basic",
+    "python.analysis.autoImportCompletions": true
+}
\ No newline at end of file
diff --git a/setup.sh b/setup.sh
new file mode 100755
index 0000000..685be52
--- /dev/null
+++ b/setup.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+set -e
+cd "$(dirname "$(realpath "$0")")"
+
+python3 -m venv venv
+# POSIX sh has no `source` builtin; `.` is the portable spelling.
+. venv/bin/activate
\ No newline at end of file
diff --git a/src/party/__init__.py b/src/party/__init__.py
new file mode 100644
index 0000000..2899a23
--- /dev/null
+++ b/src/party/__init__.py
@@ -0,0 +1,4 @@
+
+from .parser import Parser, just, regex, end
+from .result import Result, ParseError
+from .forward import FwDeclaration
diff --git a/src/party/forward.py b/src/party/forward.py
new file mode 100644
index 0000000..fd59dd0
--- /dev/null
+++ b/src/party/forward.py
@@ -0,0 +1,35 @@
+from typing import Generic, TypeVar
+from .result import Result
+from .parser import Parser
+
+
+P = TypeVar("P")
+class FwDeclaration(Generic[P]):
+    """Placeholder for a parser that is referenced before it is defined.
+
+    Call `p()` to get a parser usable inside a recursive grammar, then call
+    `define()` exactly once to supply the real parser.
+    """
+
+    parser: None | Parser[P]
+
+    def __init__(self) -> None:
+        self.parser = None
+
+    def p(self) -> Parser[P]:
+        """Return a parser that defers to the parser passed to `define()`.
+
+        The lookup happens at parse time, so `define()` may be called after
+        the returned parser has been combined into a grammar.
+        """
+        def inner(stream: str, index: int) -> Result[P]:
+            if self.parser is None:
+                raise Exception("Using forwarded definition of parser without defining first.")
+            return self.parser.inner(stream, index)
+        return Parser(inner)
+
+    def define(self, parser: Parser[P]) -> None:
+        """Bind the real parser; raises Exception if one is already bound."""
+        if self.parser is not None:
+            raise Exception("Redefinition of parser.")
+        self.parser = parser
diff --git a/src/party/parser.py b/src/party/parser.py
new file mode 100644
index 0000000..fce3065
--- /dev/null
+++ b/src/party/parser.py
@@ -0,0 +1,222 @@
+from typing import Callable, Generic, TypeVar, Union
+from .result import Result, Success, Failure, ParseError
+import re
+
+
+def regex(pattern: str):
+    """Build a parser that matches the regular expression `pattern`.
+
+    The pattern is matched at the current index; on success the parser
+    yields the matched text.
+    """
+    compiled = re.compile(pattern)  # compile once, not on every attempt
+
+    def inner(stream: str, index: int) -> Result[str]:
+        # Match on the remaining input so `^` anchors at the current index.
+        do_match = compiled.match(stream[index:])
+        if do_match is None:
+            # One label in the set; set(str) would split it into characters.
+            return Result.failure(index, {f"matching /{pattern}/"})
+        match = do_match[0]
+        return Result.success(match, index + len(match))
+    return Parser(inner)
+
+
+def just(text: str):
+    """Build a parser that matches the literal `text` and yields it."""
+    length = len(text)
+
+    def inner(stream: str, index: int) -> Result[str]:
+        stop = index + length
+        if stream[index:stop] == text:
+            return Result.success(text, stop)
+        return Result.failure(index, {text})
+    return Parser(inner)
+
+
+def end() -> "Parser[None]":
+    """Build a parser that succeeds, consuming nothing, only at end of input."""
+    def inner(stream: str, index: int) -> Result[None]:
+        if index >= len(stream):
+            return Result.success(None, index)
+        return Result.failure(index, {"end of input"})
+    return Parser(inner)
+
+
+P = TypeVar("P")
+O = TypeVar("O")
+T = TypeVar("T")
+
+
+class Parser(Generic[P]):
+    """Wraps a function `(stream, index) -> Result` and offers combinators."""
+
+    inner: Callable[[str, int], Result[P]]
+
+    def __init__(self, inner: Callable[[str, int], Result[P]]):
+        self.inner = inner
+
+    def parse(self, stream: str):
+        """Parse all of `stream` and return the value.
+
+        Raises ParseError if parsing fails or input is left over.
+        """
+        (result, _rest) = self.and_then(end()).parse_part(stream)
+        (value, _end) = result
+        return value
+
+    def parse_part(self, stream: str):
+        """Parse a prefix of `stream`; return `(value, remaining text)`.
+
+        Raises ParseError if parsing fails.
+        """
+        result = self.inner(stream, 0)
+        if isinstance(result.actual, Success):
+            rest = stream[result.actual.next_index:]
+            return (result.actual.value, rest)
+        raise ParseError(result.actual, stream)
+
+    def map(self, transform: Callable[[P], O]) -> "Parser[O]":
+        """Apply `transform` to the parsed value; failures pass through."""
+        def inner(stream: str, index: int) -> Result[O]:
+            # Continue from `index`; starting at 0 would re-read the stream
+            # from its beginning whenever a mapped parser is used mid-input.
+            result = self.inner(stream, index)
+            if isinstance(result.actual, Success):
+                mapped = transform(result.actual.value)
+                return Result.success(mapped, result.actual.next_index)
+            return Result(result.actual)
+        return Parser(inner)
+
+    def and_then(self, other: "Parser[T]"):
+        """Run this parser then `other`; yield the pair of both values."""
+        def inner(stream: str, index: int) -> Result[tuple[P, T]]:
+            result = self.inner(stream, index)
+            if isinstance(result.actual, Failure):
+                return Result(result.actual)
+            value_left = result.actual.value
+            next_index = result.actual.next_index
+
+            result = other.inner(stream, next_index)
+            if isinstance(result.actual, Failure):
+                return Result(result.actual)
+            value_right = result.actual.value
+            next_index = result.actual.next_index
+
+            return Result.success((value_left, value_right), next_index)
+        return Parser(inner)
+
+    def or_else(self, other: "Parser[T]"):
+        """Try this parser, then `other` at the same index if this one fails.
+
+        When both fail, the failure lists what either parser expected.
+        """
+        def inner(stream: str, index: int) -> Result[Union[P, T]]:
+            result_left = self.inner(stream, index)
+            if isinstance(result_left.actual, Success):
+                return Result.success(result_left.actual.value, result_left.actual.next_index)
+            result_right = other.inner(stream, index)
+            if isinstance(result_right.actual, Success):
+                return Result.success(result_right.actual.value, result_right.actual.next_index)
+            return Result.failure(index, result_left.actual.expected.union(result_right.actual.expected))
+        return Parser(inner)
+
+    def repeat(self):
+        """Apply this parser zero or more times; yield the list of values.
+
+        Raises Exception if this parser succeeds without consuming input,
+        because repeating it would never end.
+        """
+        def inner(stream: str, index: int) -> Result[list[P]]:
+            values: list[P] = []
+            while True:
+                result = self.inner(stream, index)
+                if isinstance(result.actual, Failure):
+                    break
+                values.append(result.actual.value)
+                if result.actual.next_index == index:
+                    raise Exception("Parsing empty patterns repeatedly.")
+                index = result.actual.next_index
+            return Result.success(values, index)
+        return Parser(inner)
+
+    def or_not(self):
+        """Make this parser optional: yield None, consuming nothing, on failure."""
+        def inner(stream: str, index: int) -> Result[Union[P, None]]:
+            result = self.inner(stream, index)
+            if isinstance(result.actual, Failure):
+                return Result.success(None, index)
+            return Result.success(result.actual.value, result.actual.next_index)
+        return Parser(inner)
+
+    def value(self, value: T) -> "Parser[T]":
+        """Replace the parsed value with the constant `value`."""
+        return self.map(lambda _: value)
+
+    def sep_by(self, other: "Parser[T]"):
+        """Parse zero or more items separated by `other`; yield the items.
+
+        Separator values are discarded.
+        """
+        following = other.and_then(self).map(lambda pair: pair[1]).repeat()
+
+        def inner(stream: str, index: int) -> Result[list[P]]:
+            first = self.inner(stream, index)
+            if isinstance(first.actual, Failure):
+                # No first item: an empty list, consuming nothing.
+                return Result.success([], index)
+            # Testing for Failure (not `value is None`) keeps items whose
+            # parsed value is None, e.g. a JSON null.
+            rest = following.inner(stream, first.actual.next_index)
+            if isinstance(rest.actual, Failure):
+                return Result(rest.actual)
+            items = [first.actual.value, *rest.actual.value]
+            return Result.success(items, rest.actual.next_index)
+        return Parser(inner)
+
+    # TODO: add a skip_until(other) combinator.
+
+    # a | b: alias of or_else
+    def __or__(self, other: "Parser[T]"):
+        return self.or_else(other)
+
+    # a & b: alias of and_then
+    def __and__(self, other: "Parser[T]"):
+        return self.and_then(other)
+
+    # a >> b: parse both, keep the value of b
+    def __rshift__(self, other: "Parser[T]"):
+        return self.and_then(other).map(lambda v: v[1])
+
+    # a << b: parse both, keep the value of a
+    def __lshift__(self, other: "Parser[T]"):
+        return self.and_then(other).map(lambda v: v[0])
+
+
+class Transformer(Generic[P]):
+    """Base class for class-based parsers; subclasses override `parse`."""
+
+    def parse(self, stream: str, at_index: int) -> Result[P]:
+        raise NotImplementedError("Abstract method.")
+
+
+L = TypeVar("L")
+R = TypeVar("R")
+
+
+class AndParser(Transformer[tuple[L, R]]):
+    """Runs `left` then `right` and yields the pair of their values."""
+
+    def __init__(self, left: Transformer[L], right: Transformer[R]):
+        self.left = left
+        self.right = right
+
+    def parse(self, stream: str, at_index: int) -> Result[tuple[L, R]]:
+        result_left = self.left.parse(stream, at_index)
+        if isinstance(result_left.actual, Failure):
+            return Result(result_left.actual)
+        result_right = self.right.parse(stream, result_left.actual.next_index)
+        if isinstance(result_right.actual, Failure):
+            return Result(result_right.actual)
+        values = (result_left.actual.value, result_right.actual.value)
+        return Result.success(values, result_right.actual.next_index)
diff --git a/src/party/result.py b/src/party/result.py
new file mode 100644
index 0000000..3149290
--- /dev/null
+++ b/src/party/result.py
@@ -0,0 +1,63 @@
+from dataclasses import dataclass
+from typing import Any, Union, Callable, Generic, TypeVar
+
+
+P = TypeVar("P")
+T = TypeVar("T")
+
+
+@dataclass
+class Success(Generic[P]):
+    """A successful parse: the value produced and the index to resume at."""
+
+    value: P
+    next_index: int
+
+
+@dataclass
+class Failure:
+    """A failed parse: the failing index and labels of what was expected."""
+
+    at_index: int
+    expected: set[str]
+    # Not set by any parser in this package yet, hence the default.
+    depth: int | None = None
+
+
+@dataclass
+class Result(Generic[P]):
+    """Outcome of running a parser; `actual` is a Success or a Failure."""
+
+    actual: Union[Failure, Success[P]]
+
+    @staticmethod
+    def success(value: T, next_index: int) -> "Result[T]":
+        """Wrap `value` and the index to resume at in a successful Result."""
+        return Result(Success(value, next_index))
+
+    @staticmethod
+    def failure(at_index: int, expected: set[str]) -> "Result[Any]":
+        """Build a failed Result at `at_index` with the `expected` labels."""
+        return Result(Failure(at_index, expected))
+
+
+class ParseError(Exception):
+    """Raised when parsing fails; keeps the Failure and the input stream."""
+
+    failure: Failure
+    stream: str
+
+    def __init__(self, failure: Failure, stream: str) -> None:
+        self.failure = failure
+        self.stream = stream
+
+        # Show up to 80 characters starting at the failure position.
+        fail_section = stream[failure.at_index:failure.at_index + 80]
+        message = f"""
+Parsing failed at position {self.failure.at_index} of stream :
+{fail_section}
+
+Expected one of:
+{failure.expected}
+"""
+        super().__init__(message)
diff --git a/tests/json.py b/tests/json.py
new file mode 100644
index 0000000..a08535a
--- /dev/null
+++ b/tests/json.py
@@ -0,0 +1,88 @@
+from os.path import dirname
+import sys
+
+sys.path.append(f"{dirname(__file__)}/../src")
+from party import regex, Parser, just, FwDeclaration
+
+from typing import TypeVar, Any
+
+# Utilities
+# whitespace = regex(r"\s*")
+whitespace = just(" ").or_else(just("\n")).repeat()
+
+T = TypeVar("T")
+
+
+def lexeme(p: Parser[T]) -> Parser[T]:
+    """Parse `p`, then skip any whitespace that follows it."""
+    return p << whitespace
+
+
+# Punctuation
+lbrace = lexeme(just("{"))
+rbrace = lexeme(just("}"))
+lbrack = lexeme(just("["))
+rbrack = lexeme(just("]"))
+colon = lexeme(just(":"))
+comma = lexeme(just(","))
+
+# Primitives
+true = lexeme(just("true")).value(True)
+false = lexeme(just("false")).value(False)
+null = lexeme(just("null")).value(None)
+number = lexeme(regex(r"-?(0|[1-9][0-9]*)([.][0-9]+)?([eE][+-]?[0-9]+)?")).map(float)
+just_part = regex(r'[^"\\]+')
+just_esc = just("\\") >> (
+    just("\\")
+    | just("/")
+    | just('"')
+    | just("b").value("\b")
+    | just("f").value("\f")
+    | just("n").value("\n")
+    | just("r").value("\r")
+    | just("t").value("\t")
+    | regex(r"u[0-9a-fA-F]{4}").map(lambda s: chr(int(s[1:], 16)))
+)
+quoted = lexeme(just('"') >> (just_part | just_esc).repeat().map(lambda l: "".join(l)) << just('"'))
+
+# Data structures
+json_value = FwDeclaration[Any]()
+object_pair = (quoted << colon) & json_value.p()
+json_object = lbrace >> object_pair.sep_by(comma).map(dict) << rbrace
+array = lbrack >> json_value.p().sep_by(comma) << rbrack
+
+# Everything
+json_value.define(quoted | number | json_object | array | true | false | null)
+json_doc = whitespace >> json_value.p()
+
+
+def test():
+    """Check that a small JSON document parses to the expected dict."""
+    assert (
+        json_doc.parse(
+            r"""
+        {
+            "int": 1,
+            "just": "hello",
+            "a list": [1, 2, 3],
+            "escapes": "\n \u24D2",
+            "nested": {"x": "y"},
+            "other": [true, false, null]
+        }
+"""
+        )
+        == {
+            "int": 1,
+            "just": "hello",
+            "a list": [1, 2, 3],
+            "escapes": "\n ⓒ",
+            "nested": {"x": "y"},
+            "other": [True, False, None],
+        }
+    )
+
+
+if __name__ == "__main__":
+    from sys import stdin
+    test()
+    # print(repr(json_doc.parse(stdin.read())))
diff --git a/tests/test.py b/tests/test.py
new file mode 100755
index 0000000..689c89f
--- /dev/null
+++ b/tests/test.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+from os.path import dirname
+import sys
+
+sys.path.append(f"{dirname(__file__)}/../src")
+from party import regex, Parser, just, FwDeclaration, end
+
+
+# input = "abc"
+# parser = just("a").or_else(just("b")).or_else(just("c"))
+# (parsed1, rest1) = parser.parse_part(input)
+# (parsed2, rest2) = parser.parse_part(rest1)
+# (parsed3, rest3) = parser.parse_part(rest2)
+
+# print(
+#     "parsed1", parsed1,
+#     "rest1", rest1,
+#     "parsed2", parsed2,
+#     "rest2", rest2,
+#     "parsed3", parsed3,
+#     "rest3", rest3,
+# )
+
+
+# input = "a"
+# parser = just("a").and_then(end())
+# parsed = parser.parse(input)
+
+# print("parsed", parsed)
+
+
+# input = "...a"
+# parser = just(".").repeat() >> just("a")
+# parsed = parser.parse(input)
+
+# print("parsed", parsed)
+
+
+# input = r"{a}"
+# parser = just("{") >> just("a") << just("}")
+# parsed = parser.parse(input)
+
+# print("parsed", parsed)
+
+
+# input = r"a,a,a,a"
+# parser = just("a").sep_by(just(","))
+# parsed = parser.parse(input)
+
+# print("parsed", parsed)
+
+
+#td
+source = r"{a,a,a,a"
+# parser = just("{") >> just("a").sep_by(just(","))
+# parser = just("{").and_then(just("a").sep_by(just(",")))
+parser = just("{").and_then(just("a").sep_by(just(",")))
+parsed = parser.parse(source)
+
+print("parsed", parsed)