DRAFT: RLMeta: a VM based approach

Published on 2019-07-17.

This is a work in progress that will change. Like to see it finished? Let me know by sending me an email.

In this article I present an alternative implementation of RLMeta in which grammars are compiled into instructions for a virtual machine (VM).

The VM based version builds upon the optimized version and the implementation is inspired by Regular Expression Matching: the Virtual Machine Approach and A Text Pattern-Matching Tool based on Parsing Expression Grammars.

Big picture difference

The optimized version compiles grammars into Python classes which can be used like this:

g = Grammar()
result = g.run("foo", "input string")

The VM based version also compiles grammars into Python classes with the same interface. The difference is how the run method is implemented. In the optimized version, it calls methods that represent rules in the grammar. The above call would result in _rule_foo being called. That in turn would make calls to other methods representing other rules in the grammar. The VM based version instead has a sequence of instructions and a program counter that keeps track of which instruction to execute. The VM is invoked from the run method.

To make the difference clearer, let's look at the Scream grammar that turns its input into a screaming equivalent (hello to HELLO!! for example):

Scream {
    scream = char*:xs -> { xs "!!" }
    char   = .:x      -> upper(x)
}

The optimized version compiles it into the following class with one method per rule in the grammar:

class Scream(_Grammar):

    def _rule_scream(self):
        return (lambda: (lambda _vars:
            (lambda: self._and([
                (lambda: _vars.bind('xs', (lambda: self._star((lambda: self._match_rule('char'))))())),
                (lambda: _SemanticAction(lambda: _Builder.create([
                    _vars.lookup('xs').eval(),
                    '!!',
                ]))),
            ]))()
        )(_Vars()))()

    def _rule_char(self):
        return (lambda: (lambda _vars:
            (lambda: self._and([
                (lambda: _vars.bind('x', self._match_any())),
                (lambda: _SemanticAction(lambda: upper(
                    _vars.lookup('x').eval(),
                ))),
            ]))()
        )(_Vars()))()

The VM based version compiles it into the following class with a sequence of instructions (don't worry about understanding them now, it will be clear later what the instructions mean):

class Scream(_Grammar):

    def __init__(self):
        self._instructions = i = []
        self._labels = l = {}
        def I(name, x=None, y=None):
            i.append((name, x, y))
        def LABEL(name):
            l[name] = len(i)
        LABEL('scream')
        I('PUSH_SCOPE')
        I('LIST_START')
        LABEL(0)
        I('BACKTRACK', 1)
        I('CALL', 'char')
        I('LIST_APPEND')
        I('COMMIT', 0)
        LABEL(1)
        I('LIST_END')
        I('BIND', 'xs')
        I('ACTION', lambda scope: _Builder.create([scope['xs'].eval(), '!!']))
        I('POP_SCOPE')
        I('RETURN')
        LABEL('char')
        I('PUSH_SCOPE')
        I('MATCH_ANY')
        I('BIND', 'x')
        I('ACTION', lambda scope: upper(scope['x'].eval()))
        I('POP_SCOPE')
        I('RETURN')

The external interface of the classes is exactly the same, but internally they look rather different. The run method in the optimized version looks like this:

def run(self, rule_name, input_object):
    self._memo = _Memo()
    self._stream = _Stream.from_object(self._memo, input_object)
    result = self._match_rule(rule_name).eval()
    if isinstance(result, _Builder):
        return result.build_string()
    else:
        return result

It does some setup and then calls _match_rule to start matching.

The run method in the VM based version looks like this:

  1. support.py
  2. classes
class _Grammar(object):

    def run(self, rule_name, input_object):
        if isinstance(input_object, basestring):
            stream = input_object
        else:
            stream = [input_object]
        result = rlmeta_vm(self._instructions, self._labels, rule_name, stream)
        if isinstance(result, _Builder):
            return result.build_string()
        else:
            return result

It also does some setup, but then it hands over the instructions (that are created in the __init__ method) to the VM that then executes them.

In the rest of this article I explain how grammars are compiled into VM instructions and how the VM is implemented.

Parser

The parser in the VM based version has one additional rule for labels:

  1. parser.rlmeta
  2. expr1
| space '#' -> ["Label"]

A label returns a semantic action that evaluates to a unique number. You will see later (in Or, Star, and Not) how it is used in the code generator.

The label feature first had to be added to the optimized version before it could be used to compile the VM based version. That implementation is not shown in this article, but the implementation of labels in the VM based version is similar.

The rest of the parser is exactly the same as in the optimized version:

  1. parser.rlmeta
Parser {
  grammar =
    | name:x space '{' rule*:ys space '}'      -> ["Grammar" x ~ys]
  rule =
    | name:x space '=' choice:y                -> ["Rule" x y]
  choice =
    | (space '|')?
      sequence:x (space '|' sequence)*:xs      -> ["Or" x ~xs]
  sequence =
    | expr:x expr*:xs                          -> ["Scope" ["And" x ~xs]]
  expr =
    | expr1:x space ':' name:y                 -> ["Bind" y x]
    | expr1
  expr1 =
    | expr2:x space '*'                        -> ["Star" x]
    | expr2:x space '?'                        -> ["Or" x ["And"]]
    | space '!' expr2:x                        -> ["Not" x]
    | space '%'                                -> ["MatchCallRule"]
    <<expr1>>
    | expr2
  expr2 =
    | space '->' hostExpr:x                    -> ["SemanticAction" x]
    | name:x !(space '=')                      -> ["MatchRule" x]
    | space char:x '-' char:y                  -> ["MatchRange" x y]
    | space string:x                           -> ["MatchString" x]
    | space charseq:x                          -> ["MatchCharseq" x]
    | space '.'                                -> ["MatchAny"]
    | space '(' choice:x space ')'             -> x
    | space '[' expr*:xs space ']'             -> ["MatchList" ["And" ~xs]]
  hostExpr =
    | space string:x                           -> ["String" x]
    | space '[' hostExprListItem*:xs space ']' -> ["List" ~xs]
    | space '{' buildExpr*:xs space '}'        -> ["Builder" ~xs]
    | name:x space '(' hostExpr*:ys space ')'  -> ["FnCall" x ~ys]
    | name:x                                   -> ["VarLookup" x]
  hostExprListItem =
    | space '~' hostExpr:x                     -> ["ListItemSplice" x]
    | hostExpr
  buildExpr =
    | space '>'                                -> ["IndentBuilder"]
    | space '<'                                -> ["DedentBuilder"]
    | hostExpr
  string    = '"'  (!'"'  innerChar)*:xs '"'   -> join(xs)
  charseq   = '\'' (!'\'' innerChar)*:xs '\''  -> join(xs)
  char      = '\''  !'\'' innerChar  :x  '\''  -> x
  innerChar = '\\' escape | .
  escape    = '\\' -> "\\" | '\'' -> "'"
            | '"'  -> "\"" | 'n'  -> "\n"
  name      = space nameStart:x nameChar*:xs   -> join([x ~xs])
  nameStart = 'a'-'z' | 'A'-'Z'
  nameChar  = 'a'-'z' | 'A'-'Z' | '0'-'9'
  space     = (' ' | '\n')*
}

Code generator

The code generator in the VM based version is similarly structured to the code generator in the optimized version with a grammar and a support library:

  1. codegenerator.rlmeta
CodeGenerator {
  <<rules>>
}
  1. support.py
<<imports>>

<<vm>>

<<classes>>

The ast rule is exactly the same as in the optimized version:

  1. codegenerator.rlmeta
  2. rules
ast = [%:x] -> x

Then there is an additional rule for when a Python representation of a value is needed:

  1. codegenerator.rlmeta
  2. rules
py = .:x -> repr(x)

Let's move on and look at how VM instructions are generated.

Grammar

When a Grammar AST node is matched, a Python class inheriting _Grammar is generated:

  1. codegenerator.rlmeta
  2. rules
Grammar = .:x ast*:ys -> { "class " x "(_Grammar):\n\n" >
                             "def __init__(self):\n" >
                               "self._instructions = i = []\n"
                               "self._labels = l = {}\n"
                               "def I(name, x=None, y=None):\n" >
                                 "i.append((name, x, y))\n"
                               <
                               "def LABEL(name):\n" >
                                 "l[name] = len(i)\n"
                               <
                               ys
                             <
                           < }

The name of the class is the same as the name of the grammar.

The __init__ method has functionality for creating instructions. An instruction is represented as a Python tuple with three elements: the name, the first argument, and the second argument. Arguments can be None. Instructions are stored in a list. Labels are stored in a dictionary that maps label names to positions in the instruction list.

The shorthand names i and l are used instead of self._instructions and self._labels because they are faster to access. Not going through self saves one attribute lookup every time the list or the dictionary is used.

The child AST nodes of Grammar are assumed to use the I and LABEL functions to create instructions.
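The instruction representation described above can be tried out in isolation. The following is a minimal sketch that uses module-level names (instructions and labels, chosen here for illustration) in place of self._instructions and self._labels, and adds the instructions for the char rule of the Scream grammar without its ACTION instruction:

```python
instructions = []
labels = {}

def I(name, x=None, y=None):
    # An instruction is a (name, arg1, arg2) tuple. Unused arguments are None.
    instructions.append((name, x, y))

def LABEL(name):
    # A label records the position of the next instruction to be added.
    labels[name] = len(instructions)

LABEL('char')
I('PUSH_SCOPE')
I('MATCH_ANY')
I('BIND', 'x')
I('POP_SCOPE')
I('RETURN')

print(labels)
print(instructions[2])
```

Because LABEL stores len(instructions) at the time it is called, a label always points at the instruction that is added right after it.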

Rule

When a Rule AST node is matched, instructions representing a function are generated:

  1. codegenerator.rlmeta
  2. rules
Rule = py:x ast:y -> { "LABEL(" x ")\n"
                       y
                       "I('RETURN')\n" }

In assembly-like notation (where labels are in the first column and instructions are in the second column) it looks like this:

<x>:
    <y instructions>
    RETURN

The label name is the name of the function. RETURN instructs the VM to resume execution where it was before the function was called.
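To illustrate the call and return mechanism on its own, here is a toy VM that is much simpler than the real one. It is only a sketch: instructions have a single argument, there is no matching or backtracking, and the EMIT instruction is made up for this example to make the execution order visible.

```python
def run(instructions, labels, start):
    pc = labels[start]
    call_stack = []
    output = []
    while True:
        name, arg = instructions[pc]
        if name == 'CALL':
            call_stack.append(pc + 1)  # where to resume after the call
            pc = labels[arg]
        elif name == 'RETURN':
            if not call_stack:
                return output          # returning from the start rule
            pc = call_stack.pop()
        elif name == 'EMIT':           # hypothetical instruction for tracing
            output.append(arg)
            pc += 1
        else:
            raise ValueError("unknown instruction {}".format(name))

instructions = [
    ('EMIT', 'a'),         # main:
    ('CALL', 'helper'),
    ('EMIT', 'c'),
    ('RETURN', None),
    ('EMIT', 'b'),         # helper:
    ('RETURN', None),
]
labels = {'main': 0, 'helper': 4}

result = run(instructions, labels, 'main')
print(result)
```

As in the real VM, a RETURN with an empty stack ends execution.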

Or

When an Or AST node is matched, instructions representing a choice are generated:

  1. codegenerator.rlmeta
  2. rules
Or =
  | ast:x Or:y #:a #:b -> { "I('BACKTRACK', " a ")\n"
                            x
                            "I('COMMIT', " b ")\n"
                            "LABEL(" a ")\n"
                            y
                            "LABEL(" b ")\n" }
  | ast

In assembly-like notation it looks like this:

    BACKTRACK a
    <x instructions>
    COMMIT b
a:
    <y instructions>
b:

BACKTRACK instructs the VM to push a backtrack entry onto the stack so that it can try matching again at label a if the x instructions fail. COMMIT instructs the VM to pop the backtrack entry off the stack and continue execution at label b. If the x instructions fail, the second choice at label a is tried; otherwise, execution continues at label b. The y instructions might represent another choice or the last choice. If there is only one choice, only instructions for that choice are generated. In that case, no BACKTRACK and COMMIT are needed.
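The control flow of a choice can be sketched with a toy VM. This is a simplified illustration and not the real VM: a backtrack entry only stores a program counter and a stream position, and MATCH is a made-up instruction that matches a single character.

```python
def run(instructions, labels, stream):
    pc, pos = 0, 0
    backtrack_stack = []
    while pc < len(instructions):
        name, arg = instructions[pc]
        if name == 'BACKTRACK':
            backtrack_stack.append((labels[arg], pos))
            pc += 1
        elif name == 'COMMIT':
            backtrack_stack.pop()
            pc = labels[arg]
        elif name == 'MATCH':          # hypothetical single character match
            if pos < len(stream) and stream[pos] == arg:
                pos += 1
                pc += 1
            elif backtrack_stack:
                # Failure: restore position and jump to the next choice.
                pc, pos = backtrack_stack.pop()
            else:
                return None            # no choice left to try
        else:
            raise ValueError("unknown instruction {}".format(name))
    return pos

# Instructions for the choice: 'a' 'b' | 'a' 'c'
instructions = [
    ('BACKTRACK', 0),
    ('MATCH', 'a'),
    ('MATCH', 'b'),
    ('COMMIT', 1),
    ('MATCH', 'a'),        # label 0
    ('MATCH', 'c'),
]                          # label 1 points just past the end
labels = {0: 4, 1: 6}

print(run(instructions, labels, "ab"))
print(run(instructions, labels, "ac"))
print(run(instructions, labels, "ad"))
```

For the input "ac", the first choice consumes the a before failing on b. The backtrack entry resets the position to 0 so that the second choice starts from the beginning.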

Scope

When a Scope AST node is matched, instructions creating a new scope are generated:

  1. codegenerator.rlmeta
  2. rules
Scope = ast:x -> { "I('PUSH_SCOPE')\n"
                   x
                   "I('POP_SCOPE')\n" }

In assembly-like notation it looks like this:

    PUSH_SCOPE
    <x instructions>
    POP_SCOPE

PUSH_SCOPE instructs the VM to push a new scope onto the stack so that all bindings that are done by x instructions happen in this new scope. POP_SCOPE instructs the VM to pop the current scope off the stack.

And

When an And AST node is matched, instructions for all items in the sequence are generated:

  1. codegenerator.rlmeta
  2. rules
And = ast*

Bind

When a Bind AST node is matched, instructions binding the last result to a name are generated:

  1. codegenerator.rlmeta
  2. rules
Bind = py:x ast:y -> { y
                       "I('BIND', " x ")\n" }

In assembly-like notation it looks like this:

    <y instructions>
    BIND <x>

BIND instructs the VM to bind the last result to the name x in the current scope.

Star

When a Star AST node is matched, instructions representing a repetition are generated:

  1. codegenerator.rlmeta
  2. rules
Star = ast:x #:a #:b -> { "I('LIST_START')\n"
                          "LABEL(" a ")\n"
                          "I('BACKTRACK', " b ")\n"
                          x
                          "I('LIST_APPEND')\n"
                          "I('COMMIT', " a ")\n"
                          "LABEL(" b ")\n"
                          "I('LIST_END')\n" }

In assembly-like notation it looks like this:

    LIST_START
a:
    BACKTRACK b
    <x instructions>
    LIST_APPEND
    COMMIT a
b:
    LIST_END

LIST_START instructs the VM to create a new list for accumulating results. LIST_APPEND instructs the VM to append the last result to the list. LIST_END instructs the VM to make the list itself the last result. The BACKTRACK and COMMIT instructions are used to create control flow for a loop. As long as x instructions succeed, the program loops between label a and the COMMIT instruction. As soon as x instructions fail, the program continues execution at label b.
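The loop that these instructions form corresponds roughly to the following plain Python. It is a sketch of the control flow only. The helper match_digit and the convention that a matcher returns a (value, new position) pair or None on failure are assumptions made for this example.

```python
def match_digit(stream, pos):
    # Hypothetical matcher: returns (value, new_pos) or None on failure.
    if pos < len(stream) and stream[pos].isdigit():
        return stream[pos], pos + 1
    return None

def star(match_one, stream, pos):
    results = []                          # LIST_START
    while True:                           # label a
        saved_pos = pos                   # BACKTRACK b
        outcome = match_one(stream, pos)  # <x instructions>
        if outcome is None:
            pos = saved_pos               # failure: continue at label b
            break
        value, pos = outcome
        results.append(value)             # LIST_APPEND, then COMMIT a
    return results, pos                   # LIST_END

print(star(match_digit, "123abc", 0))
print(star(match_digit, "abc", 0))
```

A repetition never fails. If the x instructions fail on the first attempt, the result is an empty list and no input is consumed.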

Not

When a Not AST node is matched, instructions representing negative lookahead are generated:

  1. codegenerator.rlmeta
  2. rules
Not = ast:x #:a #:b -> { "I('BACKTRACK', " b ")\n"
                         x
                         "I('COMMIT', " a ")\n"
                         "LABEL(" a ")\n"
                         "I('FAIL', 'no match expected')\n"
                         "LABEL(" b ")\n" }

In assembly-like notation it looks like this:

    BACKTRACK b
    <x instructions>
    COMMIT a
a:
    FAIL 'no match expected'
b:

FAIL instructs the VM that it should backtrack and try matching the next choice. The BACKTRACK and COMMIT instructions are used to create control flow for negative lookahead. If x instructions succeed, the COMMIT instruction makes the program continue at label a. That immediately fails because the negative lookahead does not expect a match. If x instructions fail, the program continues execution at label b, and the FAIL instruction is skipped.
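Expressed as plain Python, the control flow of negative lookahead looks roughly like this. It is a sketch under the same kind of assumptions as before: match_quote is a made-up matcher that returns a (value, new position) pair or None on failure, and negative_lookahead returns the unchanged position on success or None on failure.

```python
def match_quote(stream, pos):
    # Hypothetical matcher: returns (value, new_pos) or None on failure.
    if pos < len(stream) and stream[pos] == '"':
        return stream[pos], pos + 1
    return None

def negative_lookahead(match_one, stream, pos):
    saved_pos = pos                        # BACKTRACK b
    if match_one(stream, pos) is None:     # <x instructions> failed
        return saved_pos                   # label b: continue, nothing consumed
    return None                            # COMMIT a, then FAIL

print(negative_lookahead(match_quote, 'abc', 0))
print(negative_lookahead(match_quote, '"abc', 0))
```

In both outcomes the input position is left where it was, which is what makes it a lookahead.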

MatchCallRule

When a MatchCallRule AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
MatchCallRule = -> { "I('MATCH_CALL_RULE')\n" }

In assembly-like notation it looks like this:

    MATCH_CALL_RULE

MATCH_CALL_RULE instructs the VM to call the rule denoted by the current input object.

Label

When a Label AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
Label = -> { "I('LABEL')\n" }

In assembly-like notation it looks like this:

    LABEL

LABEL instructs the VM to make the last result a semantic action that returns a unique number.

SemanticAction

When a SemanticAction AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
SemanticAction = ast:x -> { "I('ACTION', lambda scope: " x ")\n" }

In assembly-like notation it looks like this:

    ACTION <python lambda>

ACTION instructs the VM to make the last result a semantic action.

Semantic actions are not evaluated by the VM, but rather by Python. The VM is only responsible for matching and creating semantic actions as result.

The lambda expression is generated by the following rules similarly to how it was done in the optimized version:

  1. codegenerator.rlmeta
  2. rules
String        = py
List          = astList
Builder       = astItems:x      -> { "_Builder.create([" x "])" }
IndentBuilder =                 -> { "_IndentBuilder()"         }
DedentBuilder =                 -> { "_DedentBuilder()"         }
FnCall        = .:x astItems:y  -> { x "(" y ")"                }
VarLookup     = py:x            -> { "scope[" x "].eval()"      }
astItems      =
  | ast:x astItem*:xs           -> { x xs                       }
  |                             -> {                            }
astItem       = ast:x           -> { ", " x                     }
astList       = astListItem*:xs -> { "(" xs "[])"               }
astListItem   =
  | ["ListItemSplice" ast:x]    -> {     x  "+"                 }
  | ast:x                       -> { "[" x "]+"                 }
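As an example of what the list rules produce, the host expression [x ~ys] should, following the rules above, generate the Python expression ([scope['x'].eval()]+scope['ys'].eval()+[]). The sketch below evaluates that expression by hand. The class Constant is a stand-in written for this example for the constant semantic action that the VM puts in scopes.

```python
class Constant(object):
    # Stand-in for a semantic action that evaluates to a fixed value.
    def __init__(self, value):
        self.value = value

    def eval(self):
        return self.value

scope = {'x': Constant(1), 'ys': Constant([2, 3])}

# The lambda body is the expression expected from the rules for [x ~ys].
action = lambda scope: ([scope['x'].eval()]+scope['ys'].eval()+[])

result = action(scope)
print(result)
```

Every list item generates a trailing +, and the final [] gives the last + something to concatenate with. That way astList does not need to treat the last item specially, and an empty list generates ([]).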

The related pieces in the support library are exactly the same as in the optimized version:

  1. support.py
  2. imports
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
  1. support.py
  2. classes
class _Builder(object):

    def build_string(self):
        output = _Output()
        self.write(output)
        return output.value

    @classmethod
    def create(cls, item):
        if isinstance(item, _Builder):
            return item
        elif isinstance(item, list):
            return _ListBuilder([_Builder.create(x) for x in item])
        else:
            return _AtomBuilder(item)

class _Output(object):

    def __init__(self):
        self.buffer = StringIO()
        self.indentation = 0
        self.on_newline = True

    @property
    def value(self):
        return self.buffer.getvalue()

    def write(self, value):
        for ch in value:
            is_linebreak = ch == "\n"
            if self.indentation and self.on_newline and not is_linebreak:
                self.buffer.write("    "*self.indentation)
            self.buffer.write(ch)
            self.on_newline = is_linebreak

class _ListBuilder(_Builder):

    def __init__(self, builders):
        self.builders = builders

    def write(self, output):
        for builder in self.builders:
            builder.write(output)

class _AtomBuilder(_Builder):

    def __init__(self, atom):
        self.atom = atom

    def write(self, output):
        output.write(str(self.atom))

class _IndentBuilder(_Builder):

    def write(self, output):
        output.indentation += 1

class _DedentBuilder(_Builder):

    def write(self, output):
        output.indentation -= 1
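To show how indentation is handled when a builder is written out, here is a small sketch that drives the output object directly. The class Output repeats the logic of _Output above, but uses io.StringIO so that the example also runs on Python 3. Changing the indentation attribute by hand stands in for what _IndentBuilder and _DedentBuilder do.

```python
from io import StringIO

class Output(object):
    # Same logic as _Output above, written against io.StringIO.

    def __init__(self):
        self.buffer = StringIO()
        self.indentation = 0
        self.on_newline = True

    def write(self, value):
        for ch in value:
            is_linebreak = ch == "\n"
            # Indent only at the start of non-empty lines.
            if self.indentation and self.on_newline and not is_linebreak:
                self.buffer.write("    "*self.indentation)
            self.buffer.write(ch)
            self.on_newline = is_linebreak

output = Output()
output.write("class Scream(_Grammar):\n\n")
output.indentation += 1    # what _IndentBuilder does
output.write("def __init__(self):\n")
output.indentation += 1
output.write("pass\n")
output.indentation -= 2    # what two _DedentBuilder items do

text = output.buffer.getvalue()
print(text)
```

Indentation is written lazily, right before the first character of a line, so blank lines do not get trailing whitespace.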

MatchRule

When a MatchRule AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
MatchRule = py:x -> { "I('CALL', " x ")\n" }

In assembly-like notation it looks like this:

    CALL <x>

CALL instructs the VM to call the given rule.

MatchRange

When a MatchRange AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
MatchRange = py:x py:y -> { "I('MATCH_RANGE', " x ", " y ")\n" }

In assembly-like notation it looks like this:

    MATCH_RANGE <x> <y>

MATCH_RANGE instructs the VM to match an object in the given range.

MatchString

When a MatchString AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
MatchString = py:x -> { "I('MATCH_STRING', " x ")\n" }

In assembly-like notation it looks like this:

    MATCH_STRING <x>

MATCH_STRING instructs the VM to match the given string.

MatchCharseq

When a MatchCharseq AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
MatchCharseq = py:x -> { "I('MATCH_CHARSEQ', " x ")\n" }

In assembly-like notation it looks like this:

    MATCH_CHARSEQ <x>

MATCH_CHARSEQ instructs the VM to match the given sequence of characters.

MatchAny

When a MatchAny AST node is matched, an instruction representing that operation is generated:

  1. codegenerator.rlmeta
  2. rules
MatchAny = -> { "I('MATCH_ANY')\n" }

In assembly-like notation it looks like this:

    MATCH_ANY

MATCH_ANY instructs the VM to match any object.

MatchList

When a MatchList AST node is matched, instructions changing the input stream are generated:

  1. codegenerator.rlmeta
  2. rules
MatchList = ast:x -> { "I('PUSH_STREAM')\n"
                       x
                       "I('POP_STREAM')\n" }

In assembly-like notation it looks like this:

    PUSH_STREAM
    <x instructions>
    POP_STREAM

PUSH_STREAM instructs the VM to push the current input object onto the stack so that x instructions see it as the current input stream. POP_STREAM instructs the VM to pop the current input stream off the stack.
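The effect on the input stream can be sketched step by step in plain Python. The variable names follow the ones used by the VM later in the article. The input list is made up for this example, and the x instructions are replaced by reading the two nested items directly.

```python
stream, pos, stream_pos_stack = (["Rule", ["x", "y"], "end"], 1, [])

# PUSH_STREAM: the current input object must be a list. Remember where to
# continue in the outer stream and make the nested list the current stream.
assert isinstance(stream[pos], list)
stream_pos_stack.append((stream, pos + 1))
stream, pos = (stream[pos], 0)

# <x instructions>: here they consume both items of the nested list.
inner_items = [stream[pos], stream[pos + 1]]
pos += 2

# POP_STREAM: the nested list must be fully consumed. Continue after it
# in the outer stream.
assert pos >= len(stream)
stream, pos = stream_pos_stack.pop()

next_item = stream[pos]
print(inner_items, next_item)
```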

Example revisited

The generated instructions for the Scream grammar from the beginning of the article should now make more sense:

Scream {
    scream = char*:xs -> { xs "!!" }
    char   = .:x      -> upper(x)
}
class Scream(_Grammar):

    def __init__(self):
        self._instructions = i = []
        self._labels = l = {}
        def I(name, x=None, y=None):
            i.append((name, x, y))
        def LABEL(name):
            l[name] = len(i)
        LABEL('scream')
        I('PUSH_SCOPE')
        I('LIST_START')
        LABEL(0)
        I('BACKTRACK', 1)
        I('CALL', 'char')
        I('LIST_APPEND')
        I('COMMIT', 0)
        LABEL(1)
        I('LIST_END')
        I('BIND', 'xs')
        I('ACTION', lambda scope: _Builder.create([scope['xs'].eval(), '!!']))
        I('POP_SCOPE')
        I('RETURN')
        LABEL('char')
        I('PUSH_SCOPE')
        I('MATCH_ANY')
        I('BIND', 'x')
        I('ACTION', lambda scope: upper(scope['x'].eval()))
        I('POP_SCOPE')
        I('RETURN')

There are two labels for the two rules in the grammar. Both blocks of instructions end with a RETURN instruction so that those labels/functions can be called and returned from. The blocks are also wrapped in PUSH_SCOPE/POP_SCOPE instructions so that variable bindings for different calls happen in different scopes. Otherwise they would overwrite each other. The scream rule has a repetition and therefore also has the LIST_* instructions, along with generated label names (0 and 1) that create the loop. The BIND instructions bind the last result to a name in the current scope. The ACTION instructions have Python lambdas as first argument that are evaluated when there is a match. They get one argument, which is the scope that was active when the action was defined.

Now let's move on to the implementation of the VM to understand how the execution of these instructions works.

VM

The VM is implemented as a single Python function with the following definition:

  1. support.py
  2. vm
def rlmeta_vm(instructions, labels, start_rule, stream):
    <<init>>
    <<loop>>

It takes a list of instructions to execute, a dictionary of labels, the name of the start rule, and the input stream. The init section sets up the VM state:

  1. support.py
  2. vm
  3. init
label_counter = 0
last_action = _ConstantSemanticAction(None)
pc = labels[start_rule]
call_backtrack_stack = []
stream, pos, stream_pos_stack = (stream, 0, [])
scope, scope_stack = (None, [])
fail_message = None
latest_fail_message, latest_fail_pos = (None, tuple())
memo = {}

The VM loop looks like this:

  1. support.py
  2. vm
  3. loop
while True:
    name, arg1, arg2 = instructions[pc]
    <<cases>>
    else:
        raise Exception("unknown command {}".format(name))
    <<fail case>>

It fetches the instruction pointed to by the program counter. Then it switches on the type of instruction. If the instruction is not recognized, an exception is raised. Finally the fail case is handled at the end of the loop since it is common to many instructions.

The order in which the cases appear in the if-chain is important. To get to the last case, all previous cases need to be tested, which takes time. So it is important that the more common cases appear earlier in the if-chain. I did an instruction frequency analysis when RLMeta compiled itself to determine the most common instructions. They are presented in that order.
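The analysis itself is not shown in this article. A minimal sketch of how such a count could be made, assuming the VM loop is changed to record the name of every executed instruction in a list, is the following. The trace below is made up for illustration and is not measured data.

```python
from collections import Counter

def instruction_frequencies(executed_names):
    # Returns (name, count) pairs with the most common instruction first.
    return Counter(executed_names).most_common()

# Hypothetical trace of executed instruction names.
trace = [
    "PUSH_SCOPE", "BACKTRACK", "CALL", "PUSH_SCOPE",
    "MATCH_ANY", "PUSH_SCOPE", "BACKTRACK",
]

for name, count in instruction_frequencies(trace):
    print(name, count)
```

The resulting order is the order in which the cases would be placed in the if-chain.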

The VM makes use of two semantic actions: one for constant values and one for user defined functions:

  1. support.py
  2. classes
class _ConstantSemanticAction(object):

    def __init__(self, value):
        self.value = value

    def eval(self):
        return self.value
  1. support.py
  2. classes
class _UserSemanticAction(object):

    def __init__(self, fn, value):
        self.fn = fn
        self.value = value

    def eval(self):
        return self.fn(self.value)

Let's move on to the implementation of each instruction.

PUSH_SCOPE

The most common instruction when RLMeta compiles itself is PUSH_SCOPE, and it therefore appears first. It looks like this:

  1. support.py
  2. vm
  3. loop
  4. cases
if name == "PUSH_SCOPE":
    scope_stack.append(scope)
    scope = {}
    pc += 1
    continue

It pushes the current scope onto the scope stack and makes a new, empty scope the current one. A scope is a dictionary mapping names to values.

The program counter is then incremented so that the next instruction will be executed. continue ensures that the failure case is skipped and the loop starts over immediately.

Most instructions end by modifying the program counter followed by a continue. If continue is not used, the code after the if-chain, which is the failure handling, is executed.

BACKTRACK

The next most common instruction is BACKTRACK:

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "BACKTRACK":
    call_backtrack_stack.append((labels[arg1], pos, len(stream_pos_stack), len(scope_stack)))
    pc += 1
    continue

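It pushes a backtrack entry onto the stack. A backtrack entry consists of the position of the instruction to continue at, the current position in the stream, the length of the stream stack, and the length of the scope stack.

Is saving the lengths enough? Can the stacks never change, only the topmost item?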

CALL

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "CALL":
    key = (arg1, tuple([x[1] for x in stream_pos_stack]+[pos]))
    if key in memo:
        last_action, stream_pos_stack = memo[key]
        stream_pos_stack = stream_pos_stack[:]
        stream, pos = stream_pos_stack.pop()
        pc += 1
    else:
        call_backtrack_stack.append((pc+1, key))
        pc = labels[arg1]
    continue
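
The memo key used above combines the rule name with the position in every stream on the stream stack plus the current position. The sketch below extracts the key construction into a function, memo_key, which is a name introduced for this example:

```python
def memo_key(rule_name, stream_pos_stack, pos):
    # Same expression as in the CALL case: the rule name together with the
    # saved position of every outer stream and the current position.
    return (rule_name, tuple([x[1] for x in stream_pos_stack] + [pos]))

# Matching at position 3 in the top-level stream.
top_level = memo_key('char', [], 3)

# Matching at position 0 inside a nested list. PUSH_STREAM saved the outer
# stream together with the position to continue at (here 2).
outer_stream = ["Rule", ["x", "y"]]
nested = memo_key('char', [(outer_stream, 2)], 0)

print(top_level)
print(nested)
```

Including the outer positions keeps results from different nested lists apart, even when the position inside each list is the same.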

MATCH_CHARSEQ

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "MATCH_CHARSEQ":
    for char in arg1:
        if pos >= len(stream) or stream[pos] != char:
            fail_message = ("expected {!r}", char)
            break
        pos += 1
    else:
        last_action = _ConstantSemanticAction(arg1)
        pc += 1
        continue

COMMIT

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "COMMIT":
    call_backtrack_stack.pop()
    pc = labels[arg1]
    continue

POP_SCOPE

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "POP_SCOPE":
    scope = scope_stack.pop()
    pc += 1
    continue

RETURN

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "RETURN":
    if len(call_backtrack_stack) == 0:
        return last_action.eval()
    pc, key = call_backtrack_stack.pop()
    memo[key] = (last_action, stream_pos_stack+[(stream, pos)])
    continue

LIST_APPEND

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "LIST_APPEND":
    scope.append(last_action)
    pc += 1
    continue

BIND

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "BIND":
    scope[arg1] = last_action
    pc += 1
    continue

ACTION

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "ACTION":
    last_action = _UserSemanticAction(arg1, scope)
    pc += 1
    continue

MATCH_RANGE

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "MATCH_RANGE":
    if pos >= len(stream) or not (arg1 <= stream[pos] <= arg2):
        fail_message = ("expected range {!r}-{!r}", arg1, arg2)
    else:
        last_action = _ConstantSemanticAction(stream[pos])
        pos += 1
        pc += 1
        continue

LIST_START

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "LIST_START":
    scope_stack.append(scope)
    scope = []
    pc += 1
    continue

LIST_END

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "LIST_END":
    last_action = _UserSemanticAction(lambda xs: [x.eval() for x in xs], scope)
    scope = scope_stack.pop()
    pc += 1
    continue

MATCH_ANY

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "MATCH_ANY":
    if pos >= len(stream):
        fail_message = ("expected any",)
    else:
        last_action = _ConstantSemanticAction(stream[pos])
        pos += 1
        pc += 1
        continue

PUSH_STREAM

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "PUSH_STREAM":
    if pos >= len(stream) or not isinstance(stream[pos], list):
        fail_message = ("expected list",)
    else:
        stream_pos_stack.append((stream, pos+1))
        stream = stream[pos]
        pos = 0
        pc += 1
        continue

POP_STREAM

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "POP_STREAM":
    if pos < len(stream):
        fail_message = ("expected end of list",)
    else:
        stream, pos = stream_pos_stack.pop()
        pc += 1
        continue

MATCH_CALL_RULE

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "MATCH_CALL_RULE":
    if pos >= len(stream):
        fail_message = ("expected any",)
    else:
        fn_name = str(stream[pos])
        key = (fn_name, tuple([x[1] for x in stream_pos_stack]+[pos]))
        if key in memo:
            last_action, stream_pos_stack = memo[key]
            stream_pos_stack = stream_pos_stack[:]
            stream, pos = stream_pos_stack.pop()
            pc += 1
        else:
            call_backtrack_stack.append((pc+1, key))
            pc = labels[fn_name]
            pos += 1
        continue

FAIL

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "FAIL":
    fail_message = (arg1,)

LABEL

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "LABEL":
    last_action = _ConstantSemanticAction(label_counter)
    label_counter += 1
    pc += 1
    continue

MATCH_STRING

  1. support.py
  2. vm
  3. loop
  4. cases
elif name == "MATCH_STRING":
    if pos >= len(stream) or stream[pos] != arg1:
        fail_message = ("expected {!r}", arg1)
    else:
        last_action = _ConstantSemanticAction(arg1)
        pos += 1
        pc += 1
        continue

Handling failure

  1. support.py
  2. vm
  3. loop
  4. fail case
fail_pos = tuple([x[1] for x in stream_pos_stack]+[pos])
if fail_pos >= latest_fail_pos:
    latest_fail_message = fail_message
    latest_fail_pos = fail_pos
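
The comparison above relies on Python comparing tuples element by element from the left. The following sketch runs the same update on a made-up series of failure positions:

```python
latest_fail_pos = tuple()
history = []

# Hypothetical failure positions in the order they occur.
for fail_pos in [(3,), (1,), (3, 0), (2, 5), (3, 0)]:
    if fail_pos >= latest_fail_pos:
        latest_fail_pos = fail_pos
    history.append(latest_fail_pos)

print(history)
```

The initial empty tuple compares as smaller than or equal to every other tuple, so the first failure is always recorded. Because of the >= comparison, a later failure at the same position replaces an earlier one.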
  1. support.py
  2. vm
  3. loop
  4. fail case
call_backtrack_entry = tuple()
while call_backtrack_stack:
    call_backtrack_entry = call_backtrack_stack.pop()
    if len(call_backtrack_entry) == 4:
        break
if len(call_backtrack_entry) != 4:
    raise _MatchError(latest_fail_message, pos, stream)
(pc, pos, stream_stack_len, scope_stack_len) = call_backtrack_entry
if len(stream_pos_stack) > stream_stack_len:
    stream = stream_pos_stack[stream_stack_len][0]
stream_pos_stack = stream_pos_stack[:stream_stack_len]
if len(scope_stack) > scope_stack_len:
    scope = scope_stack[scope_stack_len]
scope_stack = scope_stack[:scope_stack_len]
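
The stack holds two kinds of entries: call entries with two elements (pushed by CALL and MATCH_CALL_RULE) and backtrack entries with four elements (pushed by BACKTRACK). The search for the nearest backtrack entry can be sketched as a function. The name find_backtrack_entry and the example stack contents are made up for this illustration.

```python
def find_backtrack_entry(call_backtrack_stack):
    # Pop entries until a backtrack entry (four elements) is found.
    # Call entries (two elements) on the way are discarded, which
    # abandons the rule calls that were in progress.
    while call_backtrack_stack:
        entry = call_backtrack_stack.pop()
        if len(entry) == 4:
            return entry
    return None

stack = [
    (20, ('scream', (0,))),   # call entry: (return pc, memo key)
    (10, 0, 0, 1),            # backtrack entry: (pc, pos, stream len, scope len)
    (5, ('char', (0,))),      # call entry
    (7, ('name', (1,))),      # call entry
]

entry = find_backtrack_entry(stack)
print(entry)
print(stack)
```

If no backtrack entry is found, there is nothing left to try. That is the case in which the code above raises _MatchError.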
  1. support.py
  2. classes
class _MatchError(Exception):

    def __init__(self, message, pos, stream):
        Exception.__init__(self)
        self.message = message
        self.pos = pos
        self.stream = stream

    def describe(self):
        message = ""
        if isinstance(self.stream, basestring):
            pos1, error_line_before = self._extract_line(self.stream, self.pos, -1)
            pos2, error_line_after = self._extract_line(self.stream, self.pos+1, 1)
            _, context_before = self._extract_line(self.stream, pos1, -1)
            _, context_after = self._extract_line(self.stream, pos2, 1)
            if context_before:
                message += "> "
                message += context_before
                message += "\n"
            message += "> "
            message += error_line_before
            message += error_line_after
            message += "\n"
            message += "--"
            message += "-"*(len(error_line_before)-1)
            message += "^\n"
            if context_after:
                message += "> "
                message += context_after
                message += "\n"
        else:
            message += "todo: list failure context\n"
        message += "Error: "
        message += self.message[0].format(*self.message[1:])
        message += "\n"
        return message

    def _extract_line(self, text, pos, direction):
        line = []
        while pos >= 0:
            try:
                if text[pos] == "\n":
                    if line:
                        break
                else:
                    if direction == 1:
                        line.append(text[pos])
                    else:
                        line.insert(0, text[pos])
                pos += direction
            except IndexError:
                break
        return (pos+direction, "".join(line))
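
The core of the formatting is the line with the caret under the failing position. The following is a simplified sketch of that idea only: it is not _extract_line, it uses str.rfind and str.find instead of walking character by character, and it leaves out the context lines before and after. The name caret_context is made up for this example.

```python
def caret_context(text, pos):
    """Return the line containing pos, followed by a caret line."""
    # Index of the first character on the line that contains pos.
    start = text.rfind("\n", 0, pos) + 1
    # Index just past the last character on that line.
    end = text.find("\n", pos)
    if end == -1:
        end = len(text)
    line = text[start:end]
    # Two dashes cover the "> " prefix, the rest cover the column offset.
    return "> " + line + "\n" + "-" * (2 + pos - start) + "^\n"

example = caret_context("hello\nworld", 8)
```

Here position 8 is the r in world, so the caret ends up under the fifth column of the line "> world".
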

Todo: fix the failure messages for list input and clean up the failure message formatting.

Note on size

wc -l ...

Note on performance

todo: make performance comparison

Code listings for RLMeta

parser.rlmeta

Parser {
  grammar =
    | name:x space '{' rule*:ys space '}'      -> ["Grammar" x ~ys]
  rule =
    | name:x space '=' choice:y                -> ["Rule" x y]
  choice =
    | (space '|')?
      sequence:x (space '|' sequence)*:xs      -> ["Or" x ~xs]
  sequence =
    | expr:x expr*:xs                          -> ["Scope" ["And" x ~xs]]
  expr =
    | expr1:x space ':' name:y                 -> ["Bind" y x]
    | expr1
  expr1 =
    | expr2:x space '*'                        -> ["Star" x]
    | expr2:x space '?'                        -> ["Or" x ["And"]]
    | space '!' expr2:x                        -> ["Not" x]
    | space '%'                                -> ["MatchCallRule"]
    | space '#'                                -> ["Label"]
    | expr2
  expr2 =
    | space '->' hostExpr:x                    -> ["SemanticAction" x]
    | name:x !(space '=')                      -> ["MatchRule" x]
    | space char:x '-' char:y                  -> ["MatchRange" x y]
    | space string:x                           -> ["MatchString" x]
    | space charseq:x                          -> ["MatchCharseq" x]
    | space '.'                                -> ["MatchAny"]
    | space '(' choice:x space ')'             -> x
    | space '[' expr*:xs space ']'             -> ["MatchList" ["And" ~xs]]
  hostExpr =
    | space string:x                           -> ["String" x]
    | space '[' hostExprListItem*:xs space ']' -> ["List" ~xs]
    | space '{' buildExpr*:xs space '}'        -> ["Builder" ~xs]
    | name:x space '(' hostExpr*:ys space ')'  -> ["FnCall" x ~ys]
    | name:x                                   -> ["VarLookup" x]
  hostExprListItem =
    | space '~' hostExpr:x                     -> ["ListItemSplice" x]
    | hostExpr
  buildExpr =
    | space '>'                                -> ["IndentBuilder"]
    | space '<'                                -> ["DedentBuilder"]
    | hostExpr
  string    = '"'  (!'"'  innerChar)*:xs '"'   -> join(xs)
  charseq   = '\'' (!'\'' innerChar)*:xs '\''  -> join(xs)
  char      = '\''  !'\'' innerChar  :x  '\''  -> x
  innerChar = '\\' escape | .
  escape    = '\\' -> "\\" | '\'' -> "'"
            | '"'  -> "\"" | 'n'  -> "\n"
  name      = space nameStart:x nameChar*:xs   -> join([x ~xs])
  nameStart = 'a'-'z' | 'A'-'Z'
  nameChar  = 'a'-'z' | 'A'-'Z' | '0'-'9'
  space     = (' ' | '\n')*
}

codegenerator.rlmeta

CodeGenerator {
  ast            = [%:x]           -> x
  py             = .:x             -> repr(x)
  Grammar        = .:x ast*:ys     -> { "class " x "(_Grammar):\n\n" >
                                          "def __init__(self):\n" >
                                            "self._instructions = i = []\n"
                                            "self._labels = l = {}\n"
                                            "def I(name, x=None, y=None):\n" >
                                              "i.append((name, x, y))\n"
                                            <
                                            "def LABEL(name):\n" >
                                              "l[name] = len(i)\n"
                                            <
                                            ys
                                          <
                                        <                                    }
  Rule           = py:x ast:y      -> { "LABEL(" x ")\n"
                                        y
                                        "I('RETURN')\n"                      }
  Or             =
    | ast:x Or:y #:a #:b           -> { "I('BACKTRACK', " a ")\n"
                                        x
                                        "I('COMMIT', " b ")\n"
                                        "LABEL(" a ")\n"
                                        y
                                        "LABEL(" b ")\n"                     }
    | ast
  Scope          = ast:x           -> { "I('PUSH_SCOPE')\n"
                                        x
                                        "I('POP_SCOPE')\n"                   }
  And            = ast*
  Bind           = py:x ast:y      -> { y
                                        "I('BIND', " x ")\n"                 }
  Star           = ast:x #:a #:b   -> { "I('LIST_START')\n"
                                        "LABEL(" a ")\n"
                                        "I('BACKTRACK', " b ")\n"
                                        x
                                        "I('LIST_APPEND')\n"
                                        "I('COMMIT', " a ")\n"
                                        "LABEL(" b ")\n"
                                        "I('LIST_END')\n"                    }
  Not            = ast:x #:a #:b   -> { "I('BACKTRACK', " b ")\n"
                                        x
                                        "I('COMMIT', " a ")\n"
                                        "LABEL(" a ")\n"
                                        "I('FAIL', 'no match expected')\n"
                                        "LABEL(" b ")\n"                     }
  MatchCallRule  =                 -> { "I('MATCH_CALL_RULE')\n"             }
  Label          =                 -> { "I('LABEL')\n"                       }
  SemanticAction = ast:x           -> { "I('ACTION', lambda scope: " x ")\n" }
  String         = py
  List           = astList
  Builder        = astItems:x      -> { "_Builder.create([" x "])"           }
  IndentBuilder  =                 -> { "_IndentBuilder()"                   }
  DedentBuilder  =                 -> { "_DedentBuilder()"                   }
  FnCall         = .:x astItems:y  -> { x "(" y ")"                          }
  VarLookup      = py:x            -> { "scope[" x "].eval()"                }
  astItems       =
    | ast:x astItem*:xs            -> { x xs                                 }
    |                              -> {                                      }
  astItem        = ast:x           -> { ", " x                               }
  astList        = astListItem*:xs -> { "(" xs "[])"                         }
  astListItem    =
    | ["ListItemSplice" ast:x]     -> {     x  "+"                           }
    | ast:x                        -> { "[" x "]+"                           }
  MatchRule      = py:x            -> { "I('CALL', " x ")\n"                 }
  MatchRange     = py:x py:y       -> { "I('MATCH_RANGE', " x ", " y ")\n"   }
  MatchString    = py:x            -> { "I('MATCH_STRING', " x ")\n"         }
  MatchCharseq   = py:x            -> { "I('MATCH_CHARSEQ', " x ")\n"        }
  MatchAny       =                 -> { "I('MATCH_ANY')\n"                   }
  MatchList      = ast:x           -> { "I('PUSH_STREAM')\n"
                                        x
                                        "I('POP_STREAM')\n"                  }
}
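
Worked out by hand from the rules above (not copied from real compiler output), the char rule of the Scream grammar should come out roughly like this. The I and LABEL helpers are the ones the Grammar rule emits into __init__; upper is a stand-in host function defined here only to make the sketch self-contained.

```python
instructions = i = []
labels = l = {}

def I(name, x=None, y=None):
    # Append one instruction as a (name, arg1, arg2) triple.
    i.append((name, x, y))

def LABEL(name):
    # A label points at the index of the next instruction to be added.
    l[name] = len(i)

def upper(text):
    # Stand-in for the host function used by the Scream grammar.
    return text.upper()

# char = .:x -> upper(x)
LABEL('char')
I('PUSH_SCOPE')
I('MATCH_ANY')
I('BIND', 'x')
I('ACTION', lambda scope: upper(scope['x'].eval()))
I('POP_SCOPE')
I('RETURN')

names = [name for name, _, _ in instructions]
```

A rule with a single alternative falls through to the second case of Or, so no BACKTRACK or COMMIT instructions are expected here.
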

support.py

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

def rlmeta_vm(instructions, labels, start_rule, stream):
    label_counter = 0
    last_action = _ConstantSemanticAction(None)
    pc = labels[start_rule]
    call_backtrack_stack = []
    stream, pos, stream_pos_stack = (stream, 0, [])
    scope, scope_stack = (None, [])
    fail_message = None
    latest_fail_message, latest_fail_pos = (None, tuple())
    memo = {}
    while True:
        name, arg1, arg2 = instructions[pc]
        if name == "PUSH_SCOPE":
            scope_stack.append(scope)
            scope = {}
            pc += 1
            continue
        elif name == "BACKTRACK":
            call_backtrack_stack.append((labels[arg1], pos, len(stream_pos_stack), len(scope_stack)))
            pc += 1
            continue
        elif name == "CALL":
            key = (arg1, tuple([x[1] for x in stream_pos_stack]+[pos]))
            if key in memo:
                last_action, stream_pos_stack = memo[key]
                stream_pos_stack = stream_pos_stack[:]
                stream, pos = stream_pos_stack.pop()
                pc += 1
            else:
                call_backtrack_stack.append((pc+1, key))
                pc = labels[arg1]
            continue
        elif name == "MATCH_CHARSEQ":
            for char in arg1:
                if pos >= len(stream) or stream[pos] != char:
                    fail_message = ("expected {!r}", char)
                    break
                pos += 1
            else:
                last_action = _ConstantSemanticAction(arg1)
                pc += 1
                continue
        elif name == "COMMIT":
            call_backtrack_stack.pop()
            pc = labels[arg1]
            continue
        elif name == "POP_SCOPE":
            scope = scope_stack.pop()
            pc += 1
            continue
        elif name == "RETURN":
            if len(call_backtrack_stack) == 0:
                return last_action.eval()
            pc, key = call_backtrack_stack.pop()
            memo[key] = (last_action, stream_pos_stack+[(stream, pos)])
            continue
        elif name == "LIST_APPEND":
            scope.append(last_action)
            pc += 1
            continue
        elif name == "BIND":
            scope[arg1] = last_action
            pc += 1
            continue
        elif name == "ACTION":
            last_action = _UserSemanticAction(arg1, scope)
            pc += 1
            continue
        elif name == "MATCH_RANGE":
            if pos >= len(stream) or not (arg1 <= stream[pos] <= arg2):
                fail_message = ("expected range {!r}-{!r}", arg1, arg2)
            else:
                last_action = _ConstantSemanticAction(stream[pos])
                pos += 1
                pc += 1
                continue
        elif name == "LIST_START":
            scope_stack.append(scope)
            scope = []
            pc += 1
            continue
        elif name == "LIST_END":
            last_action = _UserSemanticAction(lambda xs: [x.eval() for x in xs], scope)
            scope = scope_stack.pop()
            pc += 1
            continue
        elif name == "MATCH_ANY":
            if pos >= len(stream):
                fail_message = ("expected any",)
            else:
                last_action = _ConstantSemanticAction(stream[pos])
                pos += 1
                pc += 1
                continue
        elif name == "PUSH_STREAM":
            if pos >= len(stream) or not isinstance(stream[pos], list):
                fail_message = ("expected list",)
            else:
                stream_pos_stack.append((stream, pos+1))
                stream = stream[pos]
                pos = 0
                pc += 1
                continue
        elif name == "POP_STREAM":
            if pos < len(stream):
                fail_message = ("expected end of list",)
            else:
                stream, pos = stream_pos_stack.pop()
                pc += 1
                continue
        elif name == "MATCH_CALL_RULE":
            if pos >= len(stream):
                fail_message = ("expected any",)
            else:
                fn_name = str(stream[pos])
                key = (fn_name, tuple([x[1] for x in stream_pos_stack]+[pos]))
                if key in memo:
                    last_action, stream_pos_stack = memo[key]
                    stream_pos_stack = stream_pos_stack[:]
                    stream, pos = stream_pos_stack.pop()
                    pc += 1
                else:
                    call_backtrack_stack.append((pc+1, key))
                    pc = labels[fn_name]
                    pos += 1
                continue
        elif name == "FAIL":
            fail_message = (arg1,)
        elif name == "LABEL":
            last_action = _ConstantSemanticAction(label_counter)
            label_counter += 1
            pc += 1
            continue
        elif name == "MATCH_STRING":
            if pos >= len(stream) or stream[pos] != arg1:
                fail_message = ("expected {!r}", arg1)
            else:
                last_action = _ConstantSemanticAction(arg1)
                pos += 1
                pc += 1
                continue
        else:
            raise Exception("unknown command {}".format(name))
        fail_pos = tuple([x[1] for x in stream_pos_stack]+[pos])
        if fail_pos >= latest_fail_pos:
            latest_fail_message = fail_message
            latest_fail_pos = fail_pos
        call_backtrack_entry = tuple()
        while call_backtrack_stack:
            call_backtrack_entry = call_backtrack_stack.pop()
            if len(call_backtrack_entry) == 4:
                break
        if len(call_backtrack_entry) != 4:
            raise _MatchError(latest_fail_message, pos, stream)
        (pc, pos, stream_stack_len, scope_stack_len) = call_backtrack_entry
        if len(stream_pos_stack) > stream_stack_len:
            stream = stream_pos_stack[stream_stack_len][0]
        stream_pos_stack = stream_pos_stack[:stream_stack_len]
        if len(scope_stack) > scope_stack_len:
            scope = scope_stack[scope_stack_len]
        scope_stack = scope_stack[:scope_stack_len]

class _Grammar(object):

    def run(self, rule_name, input_object):
        if isinstance(input_object, basestring):
            stream = input_object
        else:
            stream = [input_object]
        result = rlmeta_vm(self._instructions, self._labels, rule_name, stream)
        if isinstance(result, _Builder):
            return result.build_string()
        else:
            return result

class _Builder(object):

    def build_string(self):
        output = _Output()
        self.write(output)
        return output.value

    @classmethod
    def create(cls, item):
        if isinstance(item, _Builder):
            return item
        elif isinstance(item, list):
            return _ListBuilder([_Builder.create(x) for x in item])
        else:
            return _AtomBuilder(item)

class _Output(object):

    def __init__(self):
        self.buffer = StringIO()
        self.indentation = 0
        self.on_newline = True

    @property
    def value(self):
        return self.buffer.getvalue()

    def write(self, value):
        for ch in value:
            is_linebreak = ch == "\n"
            if self.indentation and self.on_newline and not is_linebreak:
                self.buffer.write("    "*self.indentation)
            self.buffer.write(ch)
            self.on_newline = is_linebreak

class _ListBuilder(_Builder):

    def __init__(self, builders):
        self.builders = builders

    def write(self, output):
        for builder in self.builders:
            builder.write(output)

class _AtomBuilder(_Builder):

    def __init__(self, atom):
        self.atom = atom

    def write(self, output):
        output.write(str(self.atom))

class _IndentBuilder(_Builder):

    def write(self, output):
        output.indentation += 1

class _DedentBuilder(_Builder):

    def write(self, output):
        output.indentation -= 1

class _ConstantSemanticAction(object):

    def __init__(self, value):
        self.value = value

    def eval(self):
        return self.value

class _UserSemanticAction(object):

    def __init__(self, fn, value):
        self.fn = fn
        self.value = value

    def eval(self):
        return self.fn(self.value)

class _MatchError(Exception):

    def __init__(self, message, pos, stream):
        Exception.__init__(self)
        self.message = message
        self.pos = pos
        self.stream = stream

    def describe(self):
        message = ""
        if isinstance(self.stream, basestring):
            pos1, error_line_before = self._extract_line(self.stream, self.pos, -1)
            pos2, error_line_after = self._extract_line(self.stream, self.pos+1, 1)
            _, context_before = self._extract_line(self.stream, pos1, -1)
            _, context_after = self._extract_line(self.stream, pos2, 1)
            if context_before:
                message += "> "
                message += context_before
                message += "\n"
            message += "> "
            message += error_line_before
            message += error_line_after
            message += "\n"
            message += "--"
            message += "-"*(len(error_line_before)-1)
            message += "^\n"
            if context_after:
                message += "> "
                message += context_after
                message += "\n"
        else:
            message += "todo: list failure context\n"
        message += "Error: "
        message += self.message[0].format(*self.message[1:])
        message += "\n"
        return message

    def _extract_line(self, text, pos, direction):
        line = []
        while pos >= 0:
            try:
                if text[pos] == "\n":
                    if line:
                        break
                else:
                    if direction == 1:
                        line.append(text[pos])
                    else:
                        line.insert(0, text[pos])
                pos += direction
            except IndexError:
                break
        return (pos+direction, "".join(line))
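
To get a feel for how BACKTRACK and COMMIT cooperate in the loop generated for Star, here is a much reduced sketch. It is not the VM above: it knows only four instructions, takes a single argument per instruction, and collects matched characters in a list instead of building semantic actions. The name tiny_vm and the sample program are assumptions made for this example.

```python
def tiny_vm(instructions, labels, stream):
    pc, pos = 0, 0
    matched = []
    backtrack_stack = []
    while True:
        name, arg = instructions[pc]
        if name == "BACKTRACK":
            # Remember where to resume, and what state to restore, on failure.
            backtrack_stack.append((labels[arg], pos, len(matched)))
            pc += 1
        elif name == "COMMIT":
            # The guarded part succeeded: drop the entry and jump.
            backtrack_stack.pop()
            pc = labels[arg]
        elif name == "MATCH_RANGE":
            low, high = arg
            if pos < len(stream) and low <= stream[pos] <= high:
                matched.append(stream[pos])
                pos += 1
                pc += 1
            elif backtrack_stack:
                # Failure: restore the most recent backtrack entry.
                pc, pos, matched_len = backtrack_stack.pop()
                del matched[matched_len:]
            else:
                raise ValueError("no match at position {}".format(pos))
        elif name == "RETURN":
            return matched, pos
        else:
            raise ValueError("unknown instruction {}".format(name))

# Roughly the shape of the loop for 'a'-'z'*
labels = {"loop": 0, "end": 3}
instructions = [
    ("BACKTRACK", "end"),
    ("MATCH_RANGE", ("a", "z")),
    ("COMMIT", "loop"),
    ("RETURN", None),
]
result = tiny_vm(instructions, labels, "ab1")
```

Each successful iteration commits and jumps back to the loop label, where a fresh backtrack entry is pushed. The first failing match pops that entry and continues after the loop with the position it had before the failed attempt.
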

compile.sh

#!/bin/bash

set -e

rlmeta_compiler="$(pwd)/$1"

cd "$(dirname "$0")"

to_python_string() {
    python -c 'import sys; sys.stdout.write(repr(sys.stdin.read()))'
}

support_py_string=$(to_python_string < support.py)
support_py=$(python "$rlmeta_compiler" --support)
parser_py=$(python "$rlmeta_compiler" < parser.rlmeta)
codegenerator_py=$(python "$rlmeta_compiler" < codegenerator.rlmeta)

cat <<EOF
import sys

SUPPORT = $support_py_string

$support_py

$parser_py

$codegenerator_py

join = "".join

def compile_grammar(grammar):
    parser = Parser()
    code_generator = CodeGenerator()
    return code_generator.run("ast", parser.run("grammar", grammar))

if __name__ == "__main__":
    if "--support" in sys.argv:
        sys.stdout.write(SUPPORT)
    else:
        try:
            sys.stdout.write(compile_grammar(sys.stdin.read()))
        except _MatchError as e:
            sys.stderr.write(e.describe())
            sys.exit(1)
EOF

meta_compile.sh

#!/bin/bash

set -e

cd "$(dirname "$0")"

./compile.sh rlmeta.py > rlmeta1.py

./compile.sh rlmeta1.py > rlmeta2.py

./compile.sh rlmeta2.py > rlmeta3.py

diff rlmeta2.py rlmeta3.py

diff support.py <(python rlmeta3.py --support)

mv rlmeta3.py rlmeta2.py

mv rlmeta2.py rlmeta1.py

mv rlmeta1.py rlmeta.py

echo OK
