##// END OF EJS Templates
Working on session manager.
Working on session manager.

File last commit:

r0:1aa783210b8e default
r93:0dab228f7441 default
Show More
lexer.py
697 lines | 23.9 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
"""
pygments.lexer
~~~~~~~~~~~~~~
Base lexer classes.
:copyright: Copyright 2006-2012 by the Pygments team, see AUTHORS.
:license: BSD, see LICENSE for details.
"""
import re
from pygments.filter import apply_filters, Filter
from pygments.filters import get_filter_by_name
from pygments.token import Error, Text, Other, _TokenType
from pygments.util import get_bool_opt, get_int_opt, get_list_opt, \
make_analysator
__all__ = ['Lexer', 'RegexLexer', 'ExtendedRegexLexer', 'DelegatingLexer',
'LexerContext', 'include', 'bygroups', 'using', 'this']
_encoding_map = [('\xef\xbb\xbf', 'utf-8'),
('\xff\xfe\0\0', 'utf-32'),
('\0\0\xfe\xff', 'utf-32be'),
('\xff\xfe', 'utf-16'),
('\xfe\xff', 'utf-16be')]
_default_analyse = staticmethod(lambda x: 0.0)
class LexerMeta(type):
"""
This metaclass automagically converts ``analyse_text`` methods into
static methods which always return float values.
"""
def __new__(cls, name, bases, d):
if 'analyse_text' in d:
d['analyse_text'] = make_analysator(d['analyse_text'])
return type.__new__(cls, name, bases, d)
class Lexer(object):
"""
Lexer for a specific language.
Basic options recognized:
``stripnl``
Strip leading and trailing newlines from the input (default: True).
``stripall``
Strip all leading and trailing whitespace from the input
(default: False).
``ensurenl``
Make sure that the input ends with a newline (default: True). This
is required for some lexers that consume input linewise.
*New in Pygments 1.3.*
``tabsize``
If given and greater than 0, expand tabs in the input (default: 0).
``encoding``
If given, must be an encoding name. This encoding will be used to
convert the input string to Unicode, if it is not already a Unicode
string (default: ``'utf-8'``).
Can also be ``'guess'`` to use a simple UTF-8 / Latin1 detection, or
``'chardet'`` to use the chardet library, if it is installed.
"""
#: Name of the lexer
name = None
#: Shortcuts for the lexer
aliases = []
#: fn match rules
filenames = []
#: fn alias filenames
alias_filenames = []
#: mime types
mimetypes = []
__metaclass__ = LexerMeta
def __init__(self, **options):
self.options = options
self.stripnl = get_bool_opt(options, 'stripnl', True)
self.stripall = get_bool_opt(options, 'stripall', False)
self.ensurenl = get_bool_opt(options, 'ensurenl', True)
self.tabsize = get_int_opt(options, 'tabsize', 0)
self.encoding = options.get('encoding', 'utf-8')
# self.encoding = options.get('inencoding', None) or self.encoding
self.filters = []
for filter_ in get_list_opt(options, 'filters', ()):
self.add_filter(filter_)
def __repr__(self):
if self.options:
return '<pygments.lexers.%s with %r>' % (self.__class__.__name__,
self.options)
else:
return '<pygments.lexers.%s>' % self.__class__.__name__
def add_filter(self, filter_, **options):
"""
Add a new stream filter to this lexer.
"""
if not isinstance(filter_, Filter):
filter_ = get_filter_by_name(filter_, **options)
self.filters.append(filter_)
def analyse_text(text):
"""
Has to return a float between ``0`` and ``1`` that indicates
if a lexer wants to highlight this text. Used by ``guess_lexer``.
If this method returns ``0`` it won't highlight it in any case, if
it returns ``1`` highlighting with this lexer is guaranteed.
The `LexerMeta` metaclass automatically wraps this function so
that it works like a static method (no ``self`` or ``cls``
parameter) and the return value is automatically converted to
`float`. If the return value is an object that is boolean `False`
it's the same as if the return values was ``0.0``.
"""
def get_tokens(self, text, unfiltered=False):
"""
Return an iterable of (tokentype, value) pairs generated from
`text`. If `unfiltered` is set to `True`, the filtering mechanism
is bypassed even if filters are defined.
Also preprocess the text, i.e. expand tabs and strip it if
wanted and applies registered filters.
"""
if not isinstance(text, unicode):
if self.encoding == 'guess':
try:
text = text.decode('utf-8')
if text.startswith(u'\ufeff'):
text = text[len(u'\ufeff'):]
except UnicodeDecodeError:
text = text.decode('latin1')
elif self.encoding == 'chardet':
try:
import chardet
except ImportError:
raise ImportError('To enable chardet encoding guessing, '
'please install the chardet library '
'from http://chardet.feedparser.org/')
# check for BOM first
decoded = None
for bom, encoding in _encoding_map:
if text.startswith(bom):
decoded = unicode(text[len(bom):], encoding,
errors='replace')
break
# no BOM found, so use chardet
if decoded is None:
enc = chardet.detect(text[:1024]) # Guess using first 1KB
decoded = unicode(text, enc.get('encoding') or 'utf-8',
errors='replace')
text = decoded
else:
text = text.decode(self.encoding)
# text now *is* a unicode string
text = text.replace('\r\n', '\n')
text = text.replace('\r', '\n')
if self.stripall:
text = text.strip()
elif self.stripnl:
text = text.strip('\n')
if self.tabsize > 0:
text = text.expandtabs(self.tabsize)
if self.ensurenl and not text.endswith('\n'):
text += '\n'
def streamer():
for i, t, v in self.get_tokens_unprocessed(text):
yield t, v
stream = streamer()
if not unfiltered:
stream = apply_filters(stream, self.filters, self)
return stream
def get_tokens_unprocessed(self, text):
"""
Return an iterable of (tokentype, value) pairs.
In subclasses, implement this method as a generator to
maximize effectiveness.
"""
raise NotImplementedError
class DelegatingLexer(Lexer):
"""
This lexer takes two lexer as arguments. A root lexer and
a language lexer. First everything is scanned using the language
lexer, afterwards all ``Other`` tokens are lexed using the root
lexer.
The lexers from the ``template`` lexer package use this base lexer.
"""
def __init__(self, _root_lexer, _language_lexer, _needle=Other, **options):
self.root_lexer = _root_lexer(**options)
self.language_lexer = _language_lexer(**options)
self.needle = _needle
Lexer.__init__(self, **options)
def get_tokens_unprocessed(self, text):
buffered = ''
insertions = []
lng_buffer = []
for i, t, v in self.language_lexer.get_tokens_unprocessed(text):
if t is self.needle:
if lng_buffer:
insertions.append((len(buffered), lng_buffer))
lng_buffer = []
buffered += v
else:
lng_buffer.append((i, t, v))
if lng_buffer:
insertions.append((len(buffered), lng_buffer))
return do_insertions(insertions,
self.root_lexer.get_tokens_unprocessed(buffered))
#-------------------------------------------------------------------------------
# RegexLexer and ExtendedRegexLexer
#
class include(str):
"""
Indicates that a state should include rules from another state.
"""
pass
class combined(tuple):
"""
Indicates a state combined from multiple states.
"""
def __new__(cls, *args):
return tuple.__new__(cls, args)
def __init__(self, *args):
# tuple.__init__ doesn't do anything
pass
class _PseudoMatch(object):
"""
A pseudo match object constructed from a string.
"""
def __init__(self, start, text):
self._text = text
self._start = start
def start(self, arg=None):
return self._start
def end(self, arg=None):
return self._start + len(self._text)
def group(self, arg=None):
if arg:
raise IndexError('No such group')
return self._text
def groups(self):
return (self._text,)
def groupdict(self):
return {}
def bygroups(*args):
"""
Callback that yields multiple actions for each group in the match.
"""
def callback(lexer, match, ctx=None):
for i, action in enumerate(args):
if action is None:
continue
elif type(action) is _TokenType:
data = match.group(i + 1)
if data:
yield match.start(i + 1), action, data
else:
data = match.group(i + 1)
if data is not None:
if ctx:
ctx.pos = match.start(i + 1)
for item in action(lexer, _PseudoMatch(match.start(i + 1),
data), ctx):
if item:
yield item
if ctx:
ctx.pos = match.end()
return callback
class _This(object):
"""
Special singleton used for indicating the caller class.
Used by ``using``.
"""
this = _This()
def using(_other, **kwargs):
"""
Callback that processes the match with a different lexer.
The keyword arguments are forwarded to the lexer, except `state` which
is handled separately.
`state` specifies the state that the new lexer will start in, and can
be an enumerable such as ('root', 'inline', 'string') or a simple
string which is assumed to be on top of the root state.
Note: For that to work, `_other` must not be an `ExtendedRegexLexer`.
"""
gt_kwargs = {}
if 'state' in kwargs:
s = kwargs.pop('state')
if isinstance(s, (list, tuple)):
gt_kwargs['stack'] = s
else:
gt_kwargs['stack'] = ('root', s)
if _other is this:
def callback(lexer, match, ctx=None):
# if keyword arguments are given the callback
# function has to create a new lexer instance
if kwargs:
# XXX: cache that somehow
kwargs.update(lexer.options)
lx = lexer.__class__(**kwargs)
else:
lx = lexer
s = match.start()
for i, t, v in lx.get_tokens_unprocessed(match.group(), **gt_kwargs):
yield i + s, t, v
if ctx:
ctx.pos = match.end()
else:
def callback(lexer, match, ctx=None):
# XXX: cache that somehow
kwargs.update(lexer.options)
lx = _other(**kwargs)
s = match.start()
for i, t, v in lx.get_tokens_unprocessed(match.group(), **gt_kwargs):
yield i + s, t, v
if ctx:
ctx.pos = match.end()
return callback
class RegexLexerMeta(LexerMeta):
"""
Metaclass for RegexLexer, creates the self._tokens attribute from
self.tokens on the first instantiation.
"""
def _process_regex(cls, regex, rflags):
"""Preprocess the regular expression component of a token definition."""
return re.compile(regex, rflags).match
def _process_token(cls, token):
"""Preprocess the token component of a token definition."""
assert type(token) is _TokenType or callable(token), \
'token type must be simple type or callable, not %r' % (token,)
return token
def _process_new_state(cls, new_state, unprocessed, processed):
"""Preprocess the state transition action of a token definition."""
if isinstance(new_state, str):
# an existing state
if new_state == '#pop':
return -1
elif new_state in unprocessed:
return (new_state,)
elif new_state == '#push':
return new_state
elif new_state[:5] == '#pop:':
return -int(new_state[5:])
else:
assert False, 'unknown new state %r' % new_state
elif isinstance(new_state, combined):
# combine a new state from existing ones
tmp_state = '_tmp_%d' % cls._tmpname
cls._tmpname += 1
itokens = []
for istate in new_state:
assert istate != new_state, 'circular state ref %r' % istate
itokens.extend(cls._process_state(unprocessed,
processed, istate))
processed[tmp_state] = itokens
return (tmp_state,)
elif isinstance(new_state, tuple):
# push more than one state
for istate in new_state:
assert (istate in unprocessed or
istate in ('#pop', '#push')), \
'unknown new state ' + istate
return new_state
else:
assert False, 'unknown new state def %r' % new_state
def _process_state(cls, unprocessed, processed, state):
"""Preprocess a single state definition."""
assert type(state) is str, "wrong state name %r" % state
assert state[0] != '#', "invalid state name %r" % state
if state in processed:
return processed[state]
tokens = processed[state] = []
rflags = cls.flags
for tdef in unprocessed[state]:
if isinstance(tdef, include):
# it's a state reference
assert tdef != state, "circular state reference %r" % state
tokens.extend(cls._process_state(unprocessed, processed,
str(tdef)))
continue
assert type(tdef) is tuple, "wrong rule def %r" % tdef
try:
rex = cls._process_regex(tdef[0], rflags)
except Exception, err:
raise ValueError("uncompilable regex %r in state %r of %r: %s" %
(tdef[0], state, cls, err))
token = cls._process_token(tdef[1])
if len(tdef) == 2:
new_state = None
else:
new_state = cls._process_new_state(tdef[2],
unprocessed, processed)
tokens.append((rex, token, new_state))
return tokens
def process_tokendef(cls, name, tokendefs=None):
"""Preprocess a dictionary of token definitions."""
processed = cls._all_tokens[name] = {}
tokendefs = tokendefs or cls.tokens[name]
for state in tokendefs.keys():
cls._process_state(tokendefs, processed, state)
return processed
def __call__(cls, *args, **kwds):
"""Instantiate cls after preprocessing its token definitions."""
if '_tokens' not in cls.__dict__:
cls._all_tokens = {}
cls._tmpname = 0
if hasattr(cls, 'token_variants') and cls.token_variants:
# don't process yet
pass
else:
cls._tokens = cls.process_tokendef('', cls.tokens)
return type.__call__(cls, *args, **kwds)
class RegexLexer(Lexer):
"""
Base for simple stateful regular expression-based lexers.
Simplifies the lexing process so that you need only
provide a list of states and regular expressions.
"""
__metaclass__ = RegexLexerMeta
#: Flags for compiling the regular expressions.
#: Defaults to MULTILINE.
flags = re.MULTILINE
#: Dict of ``{'state': [(regex, tokentype, new_state), ...], ...}``
#:
#: The initial state is 'root'.
#: ``new_state`` can be omitted to signify no state transition.
#: If it is a string, the state is pushed on the stack and changed.
#: If it is a tuple of strings, all states are pushed on the stack and
#: the current state will be the topmost.
#: It can also be ``combined('state1', 'state2', ...)``
#: to signify a new, anonymous state combined from the rules of two
#: or more existing ones.
#: Furthermore, it can be '#pop' to signify going back one step in
#: the state stack, or '#push' to push the current state on the stack
#: again.
#:
#: The tuple can also be replaced with ``include('state')``, in which
#: case the rules from the state named by the string are included in the
#: current one.
tokens = {}
def get_tokens_unprocessed(self, text, stack=('root',)):
"""
Split ``text`` into (tokentype, text) pairs.
``stack`` is the inital stack (default: ``['root']``)
"""
pos = 0
tokendefs = self._tokens
statestack = list(stack)
statetokens = tokendefs[statestack[-1]]
while 1:
for rexmatch, action, new_state in statetokens:
m = rexmatch(text, pos)
if m:
if type(action) is _TokenType:
yield pos, action, m.group()
else:
for item in action(self, m):
yield item
pos = m.end()
if new_state is not None:
# state transition
if isinstance(new_state, tuple):
for state in new_state:
if state == '#pop':
statestack.pop()
elif state == '#push':
statestack.append(statestack[-1])
else:
statestack.append(state)
elif isinstance(new_state, int):
# pop
del statestack[new_state:]
elif new_state == '#push':
statestack.append(statestack[-1])
else:
assert False, "wrong state def: %r" % new_state
statetokens = tokendefs[statestack[-1]]
break
else:
try:
if text[pos] == '\n':
# at EOL, reset state to "root"
pos += 1
statestack = ['root']
statetokens = tokendefs['root']
yield pos, Text, u'\n'
continue
yield pos, Error, text[pos]
pos += 1
except IndexError:
break
class LexerContext(object):
"""
A helper object that holds lexer position data.
"""
def __init__(self, text, pos, stack=None, end=None):
self.text = text
self.pos = pos
self.end = end or len(text) # end=0 not supported ;-)
self.stack = stack or ['root']
def __repr__(self):
return 'LexerContext(%r, %r, %r)' % (
self.text, self.pos, self.stack)
class ExtendedRegexLexer(RegexLexer):
"""
A RegexLexer that uses a context object to store its state.
"""
def get_tokens_unprocessed(self, text=None, context=None):
"""
Split ``text`` into (tokentype, text) pairs.
If ``context`` is given, use this lexer context instead.
"""
tokendefs = self._tokens
if not context:
ctx = LexerContext(text, 0)
statetokens = tokendefs['root']
else:
ctx = context
statetokens = tokendefs[ctx.stack[-1]]
text = ctx.text
while 1:
for rexmatch, action, new_state in statetokens:
m = rexmatch(text, ctx.pos, ctx.end)
if m:
if type(action) is _TokenType:
yield ctx.pos, action, m.group()
ctx.pos = m.end()
else:
for item in action(self, m, ctx):
yield item
if not new_state:
# altered the state stack?
statetokens = tokendefs[ctx.stack[-1]]
# CAUTION: callback must set ctx.pos!
if new_state is not None:
# state transition
if isinstance(new_state, tuple):
ctx.stack.extend(new_state)
elif isinstance(new_state, int):
# pop
del ctx.stack[new_state:]
elif new_state == '#push':
ctx.stack.append(ctx.stack[-1])
else:
assert False, "wrong state def: %r" % new_state
statetokens = tokendefs[ctx.stack[-1]]
break
else:
try:
if ctx.pos >= ctx.end:
break
if text[ctx.pos] == '\n':
# at EOL, reset state to "root"
ctx.pos += 1
ctx.stack = ['root']
statetokens = tokendefs['root']
yield ctx.pos, Text, u'\n'
continue
yield ctx.pos, Error, text[ctx.pos]
ctx.pos += 1
except IndexError:
break
def do_insertions(insertions, tokens):
"""
Helper for lexers which must combine the results of several
sublexers.
``insertions`` is a list of ``(index, itokens)`` pairs.
Each ``itokens`` iterable should be inserted at position
``index`` into the token stream given by the ``tokens``
argument.
The result is a combined token stream.
TODO: clean up the code here.
"""
insertions = iter(insertions)
try:
index, itokens = insertions.next()
except StopIteration:
# no insertions
for item in tokens:
yield item
return
realpos = None
insleft = True
# iterate over the token stream where we want to insert
# the tokens from the insertion list.
for i, t, v in tokens:
# first iteration. store the postition of first item
if realpos is None:
realpos = i
oldi = 0
while insleft and i + len(v) >= index:
tmpval = v[oldi:index - i]
yield realpos, t, tmpval
realpos += len(tmpval)
for it_index, it_token, it_value in itokens:
yield realpos, it_token, it_value
realpos += len(it_value)
oldi = index - i
try:
index, itokens = insertions.next()
except StopIteration:
insleft = False
break # not strictly necessary
yield realpos, t, v[oldi:]
realpos += len(v) - oldi
# leftover tokens
while insleft:
# no normal tokens, set realpos to zero
realpos = realpos or 0
for p, t, v in itokens:
yield realpos, t, v
realpos += len(v)
try:
index, itokens = insertions.next()
except StopIteration:
insleft = False
break # not strictly necessary