init
This commit is contained in:
commit
215c2c380f
9 changed files with 390 additions and 0 deletions
4
src/party/__init__.py
Normal file
4
src/party/__init__.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
|
||||
from .parser import Parser, just, regex, end
|
||||
from .result import Result, ParseError
|
||||
from .forward import FwDeclaration
|
23
src/party/forward.py
Normal file
23
src/party/forward.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
from typing import Generic, TypeVar
|
||||
from .result import Result
|
||||
from .parser import Parser
|
||||
|
||||
|
||||
P = TypeVar("P")
|
||||
class FwDeclaration(Generic[P]):
|
||||
parser: None | Parser[P]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.parser = None
|
||||
|
||||
def p(self):
|
||||
def inner(stream: str, index: int) -> Result[P]:
|
||||
if self.parser is None:
|
||||
raise Exception("Using forwarded definition of parser without defining first.")
|
||||
return self.parser.inner(stream, index)
|
||||
return Parser(inner)
|
||||
|
||||
def define(self, parser: Parser[P]):
|
||||
if self.parser is not None:
|
||||
raise Exception("Redefinition of parser.")
|
||||
self.parser = parser
|
165
src/party/parser.py
Normal file
165
src/party/parser.py
Normal file
|
@ -0,0 +1,165 @@
|
|||
from typing import Callable, Generic, TypeVar, Union
|
||||
from .result import Result, Success, Failure, ParseError
|
||||
import re
|
||||
|
||||
|
||||
def regex(pattern: str):
|
||||
def inner(stream: str, index: int) -> Result[str]:
|
||||
do_match = re.match(pattern, stream[index:])
|
||||
if do_match is None:
|
||||
return Result.failure(index, set(f"matching /{pattern}/"))
|
||||
else:
|
||||
match = do_match[0]
|
||||
return Result.success(match, index + len(match))
|
||||
return Parser(inner)
|
||||
|
||||
def just(text: str):
|
||||
length = len(text)
|
||||
def inner(stream: str, index: int) -> Result[str]:
|
||||
end = index + length
|
||||
if stream[index:end] == text:
|
||||
return Result.success(text, end)
|
||||
else:
|
||||
return Result.failure(index, set([text]))
|
||||
return Parser(inner)
|
||||
|
||||
|
||||
def end() -> "Parser[None]":
|
||||
def inner(stream: str, index: int):
|
||||
if stream[index:] == "":
|
||||
return Result.success(None, index)
|
||||
else:
|
||||
return Result.failure(index, set())
|
||||
return Parser(inner)
|
||||
|
||||
|
||||
P = TypeVar("P")
|
||||
O = TypeVar("O")
|
||||
T = TypeVar("T")
|
||||
|
||||
class Parser(Generic[P]):
|
||||
inner: Callable[[str, int], Result[P]]
|
||||
|
||||
def __init__(self, inner: Callable[[str, int], Result[P]]):
|
||||
self.inner = inner
|
||||
|
||||
def parse(self, stream: str):
|
||||
(result, _rest) = self.and_then(end()).parse_part(stream)
|
||||
(value, _end) = result
|
||||
return value
|
||||
|
||||
def parse_part(self, stream: str):
|
||||
result = self.inner(stream, 0)
|
||||
if isinstance(result.actual, Success):
|
||||
rest = stream[result.actual.next_index:]
|
||||
return (result.actual.value, rest)
|
||||
else:
|
||||
raise ParseError(result.actual, stream)
|
||||
|
||||
def map(self, transform: Callable[[P], O]) -> "Parser[O]":
|
||||
def inner(stream: str, index: int):
|
||||
result = self.inner(stream, 0)
|
||||
if isinstance(result.actual, Success):
|
||||
mapped = transform(result.actual.value)
|
||||
return Result.success(mapped, result.actual.next_index)
|
||||
return result
|
||||
return Parser(inner)
|
||||
|
||||
def and_then(self, other: "Parser[T]"):
|
||||
def inner(stream: str, index: int) -> Result[tuple[P, T]]:
|
||||
result = self.inner(stream, index)
|
||||
if isinstance(result.actual, Failure):
|
||||
return Result(result.actual)
|
||||
value_left = result.actual.value
|
||||
next_index = result.actual.next_index
|
||||
|
||||
result = other.inner(stream, next_index)
|
||||
if isinstance(result.actual, Failure):
|
||||
return Result(result.actual)
|
||||
value_right = result.actual.value
|
||||
next_index = result.actual.next_index
|
||||
|
||||
return Result.success((value_left, value_right), next_index)
|
||||
return Parser(inner)
|
||||
|
||||
def or_else(self, other: "Parser[T]"):
|
||||
def inner(stream: str, index: int) -> Result[Union[P, T]]:
|
||||
result_left = self.inner(stream, index)
|
||||
if isinstance(result_left.actual, Success):
|
||||
return Result.success(result_left.actual.value, result_left.actual.next_index)
|
||||
result_right = other.inner(stream, index)
|
||||
if isinstance(result_right.actual, Success):
|
||||
return Result.success(result_right.actual.value, result_right.actual.next_index)
|
||||
return Result.failure(index, result_left.actual.expected.union(result_right.actual.expected))
|
||||
return Parser(inner)
|
||||
|
||||
def repeat(self):
|
||||
def inner(stream: str, index: int) -> Result[list[P]]:
|
||||
values = list[P]()
|
||||
while True:
|
||||
result = self.inner(stream, index)
|
||||
if isinstance(result.actual, Failure):
|
||||
break
|
||||
values.append(result.actual.value)
|
||||
if result.actual.next_index == index:
|
||||
raise Exception("Parsing empty patterns repeatedly.")
|
||||
index = result.actual.next_index
|
||||
return Result.success(values, index)
|
||||
return Parser(inner)
|
||||
|
||||
def or_not(self):
|
||||
def inner(stream: str, index: int) -> Result[Union[P, None]]:
|
||||
result = self.inner(stream, index)
|
||||
if isinstance(result.actual, Failure):
|
||||
return Result.success(None, index)
|
||||
return Result.success(result.actual.value, result.actual.next_index)
|
||||
return Parser(inner)
|
||||
|
||||
def value(self, value: T) -> "Parser[T]":
|
||||
return self.map(lambda _: value)
|
||||
|
||||
def sep_by(self, other: "Parser[T]"):
|
||||
parser = self.or_not().and_then(other.and_then(self).repeat())
|
||||
def mapping(value: tuple[P | None, list[tuple[T, P]]]):
|
||||
(first, rest) = value
|
||||
if first is None: return list[P]()
|
||||
return [first, *(value for (_sep, value) in rest)]
|
||||
mapped = parser.map(mapping)
|
||||
return mapped
|
||||
|
||||
# def skip_until(self, other: "Parser[T]"):
|
||||
# def inner(stream: str, index: int) -> Result[tuple[P, str, T]]:
|
||||
# pass
|
||||
# return Parser(inner)
|
||||
|
||||
# |
|
||||
def __or__(self, other: "Parser[T]"):
|
||||
return self.or_else(other)
|
||||
|
||||
# &
|
||||
def __and__(self, other: "Parser[T]"):
|
||||
return self.and_then(other)
|
||||
|
||||
# >>
|
||||
def __rshift__(self, other: "Parser[T]"):
|
||||
return self.and_then(other).map(lambda v: v[1])
|
||||
|
||||
# <<
|
||||
def __lshift__(self, other: "Parser[T]"):
|
||||
return self.and_then(other).map(lambda v: v[0])
|
||||
|
||||
P = TypeVar("P")
|
||||
class Transformer(Generic[P]):
|
||||
def parse(self, stream: str, at_index: int) -> Result[P]:
|
||||
raise Exception("Abstract method.")
|
||||
|
||||
L = TypeVar("L")
|
||||
R = TypeVar("R")
|
||||
class AndParser(Transformer[tuple[L, R]]):
|
||||
def __init__(self, left: Transformer[L], right: Transformer[R]):
|
||||
self.left = left
|
||||
self.right = right
|
||||
|
||||
def parse(self, stream: str, at_index: int) -> Result[tuple[L, R]]:
|
||||
result_left = self.left.parse(stream, at_index)
|
||||
if result_left.actual
|
39
src/party/result.py
Normal file
39
src/party/result.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Union, Callable, Generic, TypeVar
|
||||
|
||||
|
||||
P = TypeVar("P")
|
||||
@dataclass
|
||||
class Success(Generic[P]):
|
||||
value: P
|
||||
next_index: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class Failure:
|
||||
at_index: int
|
||||
expected: set[str]
|
||||
depth: int | None
|
||||
|
||||
|
||||
P = TypeVar("P")
|
||||
Result = Union["Failure", "Success[P]"]
|
||||
|
||||
|
||||
class ParseError(BaseException):
|
||||
failure: Failure
|
||||
stream: str
|
||||
|
||||
def __init__(self, failure: Failure, stream: str) -> None:
|
||||
self.failure = failure
|
||||
self.stream = stream
|
||||
|
||||
fail_section = stream[failure.at_index:80]
|
||||
message = f"""
|
||||
Parsing failed at position {self.failure.at_index} of stream :
|
||||
${fail_section}
|
||||
|
||||
Expected one of:
|
||||
{failure.expected}
|
||||
"""
|
||||
super().__init__(message)
|
Loading…
Add table
Add a link
Reference in a new issue