summaryrefslogtreecommitdiffhomepage
path: root/scripts/jinja2/runtime.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/jinja2/runtime.py')
-rw-r--r--scripts/jinja2/runtime.py1166
1 files changed, 802 insertions, 364 deletions
diff --git a/scripts/jinja2/runtime.py b/scripts/jinja2/runtime.py
index 792fb9a..2346cf2 100644
--- a/scripts/jinja2/runtime.py
+++ b/scripts/jinja2/runtime.py
@@ -1,60 +1,116 @@
-# -*- coding: utf-8 -*-
-"""
- jinja2.runtime
- ~~~~~~~~~~~~~~
-
- Runtime helpers.
-
- :copyright: (c) 2010 by the Jinja Team.
- :license: BSD.
-"""
+"""The runtime functions and state used by compiled templates."""
+import functools
import sys
-
+import typing as t
+from collections import abc
from itertools import chain
-from jinja2.nodes import EvalContext, _context_function_types
-from jinja2.utils import Markup, soft_unicode, escape, missing, concat, \
- internalcode, object_type_repr
-from jinja2.exceptions import UndefinedError, TemplateRuntimeError, \
- TemplateNotFound
-from jinja2._compat import imap, text_type, iteritems, \
- implements_iterator, implements_to_string, string_types, PY2
+from markupsafe import escape # noqa: F401
+from markupsafe import Markup
+from markupsafe import soft_str
+
+from .async_utils import auto_aiter
+from .async_utils import auto_await # noqa: F401
+from .exceptions import TemplateNotFound # noqa: F401
+from .exceptions import TemplateRuntimeError # noqa: F401
+from .exceptions import UndefinedError
+from .nodes import EvalContext
+from .utils import _PassArg
+from .utils import concat
+from .utils import internalcode
+from .utils import missing
+from .utils import Namespace # noqa: F401
+from .utils import object_type_repr
+from .utils import pass_eval_context
+
+V = t.TypeVar("V")
+F = t.TypeVar("F", bound=t.Callable[..., t.Any])
+
+if t.TYPE_CHECKING:
+ import logging
+ import typing_extensions as te
+ from .environment import Environment
+
+ class LoopRenderFunc(te.Protocol):
+ def __call__(
+ self,
+ reciter: t.Iterable[V],
+ loop_render_func: "LoopRenderFunc",
+ depth: int = 0,
+ ) -> str:
+ ...
-# these variables are exported to the template runtime
-__all__ = ['LoopContext', 'TemplateReference', 'Macro', 'Markup',
- 'TemplateRuntimeError', 'missing', 'concat', 'escape',
- 'markup_join', 'unicode_join', 'to_string', 'identity',
- 'TemplateNotFound', 'make_logging_undefined']
-#: the name of the function that is used to convert something into
-#: a string. We can just use the text type here.
-to_string = text_type
-
-#: the identity function. Useful for certain things in the environment
-identity = lambda x: x
-
-_last_iteration = object()
+# these variables are exported to the template runtime
+exported = [
+ "LoopContext",
+ "TemplateReference",
+ "Macro",
+ "Markup",
+ "TemplateRuntimeError",
+ "missing",
+ "concat",
+ "escape",
+ "markup_join",
+ "str_join",
+ "identity",
+ "TemplateNotFound",
+ "Namespace",
+ "Undefined",
+ "internalcode",
+]
+async_exported = [
+ "AsyncLoopContext",
+ "auto_aiter",
+ "auto_await",
+]
+
+
+def identity(x: V) -> V:
+ """Returns its argument. Useful for certain things in the
+ environment.
+ """
+ return x
-def markup_join(seq):
- """Concatenation that escapes if necessary and converts to unicode."""
+def markup_join(seq: t.Iterable[t.Any]) -> str:
+ """Concatenation that escapes if necessary and converts to string."""
buf = []
- iterator = imap(soft_unicode, seq)
+ iterator = map(soft_str, seq)
for arg in iterator:
buf.append(arg)
- if hasattr(arg, '__html__'):
- return Markup(u'').join(chain(buf, iterator))
+ if hasattr(arg, "__html__"):
+ return Markup("").join(chain(buf, iterator))
return concat(buf)
-def unicode_join(seq):
- """Simple args to unicode conversion and concatenation."""
- return concat(imap(text_type, seq))
+def str_join(seq: t.Iterable[t.Any]) -> str:
+ """Simple args to string conversion and concatenation."""
+ return concat(map(str, seq))
+
+
+def unicode_join(seq: t.Iterable[t.Any]) -> str:
+ import warnings
+
+ warnings.warn(
+ "This template must be recompiled with at least Jinja 3.0, or"
+ " it will fail in Jinja 3.1.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return str_join(seq)
-def new_context(environment, template_name, blocks, vars=None,
- shared=None, globals=None, locals=None):
- """Internal helper to for context creation."""
+def new_context(
+ environment: "Environment",
+ template_name: t.Optional[str],
+ blocks: t.Dict[str, t.Callable[["Context"], t.Iterator[str]]],
+ vars: t.Optional[t.Dict[str, t.Any]] = None,
+ shared: bool = False,
+ globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
+ locals: t.Optional[t.Mapping[str, t.Any]] = None,
+) -> "Context":
+ """Internal helper for context creation."""
if vars is None:
vars = {}
if shared:
@@ -66,30 +122,38 @@ def new_context(environment, template_name, blocks, vars=None,
# we don't want to modify the dict passed
if shared:
parent = dict(parent)
- for key, value in iteritems(locals):
- if key[:2] == 'l_' and value is not missing:
- parent[key[2:]] = value
- return Context(environment, parent, template_name, blocks)
+ for key, value in locals.items():
+ if value is not missing:
+ parent[key] = value
+ return environment.context_class(
+ environment, parent, template_name, blocks, globals=globals
+ )
-class TemplateReference(object):
+class TemplateReference:
"""The `self` in templates."""
- def __init__(self, context):
+ def __init__(self, context: "Context") -> None:
self.__context = context
- def __getitem__(self, name):
+ def __getitem__(self, name: str) -> t.Any:
blocks = self.__context.blocks[name]
return BlockReference(name, self.__context, blocks, 0)
- def __repr__(self):
- return '<%s %r>' % (
- self.__class__.__name__,
- self.__context.name
- )
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} {self.__context.name!r}>"
+
+
+def _dict_method_all(dict_method: F) -> F:
+ @functools.wraps(dict_method)
+ def f_all(self: "Context") -> t.Any:
+ return dict_method(self.get_all())
+ return t.cast(F, f_all)
-class Context(object):
+
+@abc.Mapping.register
+class Context:
"""The template context holds the variables of a template. It stores the
values passed to the template and also the names the template exports.
Creating instances is neither supported nor useful as it's created
@@ -99,7 +163,7 @@ class Context(object):
The context is immutable. Modifications on :attr:`parent` **must not**
happen and modifications on :attr:`vars` are allowed from generated
template code only. Template filters and global functions marked as
- :func:`contextfunction`\s get the active context passed as first argument
+ :func:`pass_context` get the active context passed as first argument
and are allowed to access the context read-only.
The template context supports read only dict operations (`get`,
@@ -108,341 +172,672 @@ class Context(object):
method that doesn't fail with a `KeyError` but returns an
:class:`Undefined` object for missing variables.
"""
- __slots__ = ('parent', 'vars', 'environment', 'eval_ctx', 'exported_vars',
- 'name', 'blocks', '__weakref__')
- def __init__(self, environment, parent, name, blocks):
+ _legacy_resolve_mode: t.ClassVar[bool] = False
+
+ def __init_subclass__(cls) -> None:
+ if "resolve_or_missing" in cls.__dict__:
+ # If the subclass overrides resolve_or_missing it opts in to
+ # modern mode no matter what.
+ cls._legacy_resolve_mode = False
+ elif "resolve" in cls.__dict__ or cls._legacy_resolve_mode:
+ # If the subclass overrides resolve, or if its base is
+ # already in legacy mode, warn about legacy behavior.
+ import warnings
+
+ warnings.warn(
+ "Overriding 'resolve' is deprecated and will not have"
+ " the expected behavior in Jinja 3.1. Override"
+ " 'resolve_or_missing' instead ",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ cls._legacy_resolve_mode = True
+
+ def __init__(
+ self,
+ environment: "Environment",
+ parent: t.Dict[str, t.Any],
+ name: t.Optional[str],
+ blocks: t.Dict[str, t.Callable[["Context"], t.Iterator[str]]],
+ globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
+ ):
self.parent = parent
- self.vars = {}
- self.environment = environment
+ self.vars: t.Dict[str, t.Any] = {}
+ self.environment: "Environment" = environment
self.eval_ctx = EvalContext(self.environment, name)
- self.exported_vars = set()
+ self.exported_vars: t.Set[str] = set()
self.name = name
+ self.globals_keys = set() if globals is None else set(globals)
# create the initial mapping of blocks. Whenever template inheritance
# takes place the runtime will update this mapping with the new blocks
# from the template.
- self.blocks = dict((k, [v]) for k, v in iteritems(blocks))
+ self.blocks = {k: [v] for k, v in blocks.items()}
- def super(self, name, current):
+ def super(
+ self, name: str, current: t.Callable[["Context"], t.Iterator[str]]
+ ) -> t.Union["BlockReference", "Undefined"]:
"""Render a parent block."""
try:
blocks = self.blocks[name]
index = blocks.index(current) + 1
blocks[index]
except LookupError:
- return self.environment.undefined('there is no parent block '
- 'called %r.' % name,
- name='super')
+ return self.environment.undefined(
+ f"there is no parent block called {name!r}.", name="super"
+ )
return BlockReference(name, self, blocks, index)
- def get(self, key, default=None):
- """Returns an item from the template context, if it doesn't exist
- `default` is returned.
+ def get(self, key: str, default: t.Any = None) -> t.Any:
+ """Look up a variable by name, or return a default if the key is
+ not found.
+
+ :param key: The variable name to look up.
+ :param default: The value to return if the key is not found.
"""
try:
return self[key]
except KeyError:
return default
- def resolve(self, key):
- """Looks up a variable like `__getitem__` or `get` but returns an
- :class:`Undefined` object with the name of the name looked up.
+ def resolve(self, key: str) -> t.Union[t.Any, "Undefined"]:
+ """Look up a variable by name, or return an :class:`Undefined`
+ object if the key is not found.
+
+ If you need to add custom behavior, override
+ :meth:`resolve_or_missing`, not this method. The various lookup
+ functions use that method, not this one.
+
+ :param key: The variable name to look up.
"""
+ if self._legacy_resolve_mode:
+ if key in self.vars:
+ return self.vars[key]
+
+ if key in self.parent:
+ return self.parent[key]
+
+ return self.environment.undefined(name=key)
+
+ rv = self.resolve_or_missing(key)
+
+ if rv is missing:
+ return self.environment.undefined(name=key)
+
+ return rv
+
+ def resolve_or_missing(self, key: str) -> t.Any:
+ """Look up a variable by name, or return a ``missing`` sentinel
+ if the key is not found.
+
+ Override this method to add custom lookup behavior.
+ :meth:`resolve`, :meth:`get`, and :meth:`__getitem__` use this
+ method. Don't call this method directly.
+
+ :param key: The variable name to look up.
+ """
+ if self._legacy_resolve_mode:
+ rv = self.resolve(key)
+
+ if isinstance(rv, Undefined):
+ return missing
+
+ return rv
+
if key in self.vars:
return self.vars[key]
+
if key in self.parent:
return self.parent[key]
- return self.environment.undefined(name=key)
- def get_exported(self):
+ return missing
+
+ def get_exported(self) -> t.Dict[str, t.Any]:
"""Get a new dict with the exported variables."""
- return dict((k, self.vars[k]) for k in self.exported_vars)
+ return {k: self.vars[k] for k in self.exported_vars}
- def get_all(self):
- """Return a copy of the complete context as dict including the
- exported variables.
+ def get_all(self) -> t.Dict[str, t.Any]:
+ """Return the complete context as dict including the exported
+ variables. For optimizations reasons this might not return an
+ actual copy so be careful with using it.
"""
+ if not self.vars:
+ return self.parent
+ if not self.parent:
+ return self.vars
return dict(self.parent, **self.vars)
@internalcode
- def call(__self, __obj, *args, **kwargs):
+ def call(
+ __self, __obj: t.Callable, *args: t.Any, **kwargs: t.Any # noqa: B902
+ ) -> t.Union[t.Any, "Undefined"]:
"""Call the callable with the arguments and keyword arguments
provided but inject the active context or environment as first
- argument if the callable is a :func:`contextfunction` or
- :func:`environmentfunction`.
+ argument if the callable has :func:`pass_context` or
+ :func:`pass_environment`.
"""
if __debug__:
__traceback_hide__ = True # noqa
# Allow callable classes to take a context
- fn = __obj.__call__
- for fn_type in ('contextfunction',
- 'evalcontextfunction',
- 'environmentfunction'):
- if hasattr(fn, fn_type):
- __obj = fn
- break
-
- if isinstance(__obj, _context_function_types):
- if getattr(__obj, 'contextfunction', 0):
- args = (__self,) + args
- elif getattr(__obj, 'evalcontextfunction', 0):
- args = (__self.eval_ctx,) + args
- elif getattr(__obj, 'environmentfunction', 0):
- args = (__self.environment,) + args
+ if (
+ hasattr(__obj, "__call__") # noqa: B004
+ and _PassArg.from_obj(__obj.__call__) is not None # type: ignore
+ ):
+ __obj = __obj.__call__ # type: ignore
+
+ pass_arg = _PassArg.from_obj(__obj)
+
+ if pass_arg is _PassArg.context:
+ # the active context should have access to variables set in
+ # loops and blocks without mutating the context itself
+ if kwargs.get("_loop_vars"):
+ __self = __self.derived(kwargs["_loop_vars"])
+ if kwargs.get("_block_vars"):
+ __self = __self.derived(kwargs["_block_vars"])
+ args = (__self,) + args
+ elif pass_arg is _PassArg.eval_context:
+ args = (__self.eval_ctx,) + args
+ elif pass_arg is _PassArg.environment:
+ args = (__self.environment,) + args
+
+ kwargs.pop("_block_vars", None)
+ kwargs.pop("_loop_vars", None)
+
try:
return __obj(*args, **kwargs)
except StopIteration:
- return __self.environment.undefined('value was undefined because '
- 'a callable raised a '
- 'StopIteration exception')
-
- def derived(self, locals=None):
- """Internal helper function to create a derived context."""
- context = new_context(self.environment, self.name, {},
- self.parent, True, None, locals)
- context.vars.update(self.vars)
+ return __self.environment.undefined(
+ "value was undefined because a callable raised a"
+ " StopIteration exception"
+ )
+
+ def derived(self, locals: t.Optional[t.Dict[str, t.Any]] = None) -> "Context":
+ """Internal helper function to create a derived context. This is
+ used in situations where the system needs a new context in the same
+ template that is independent.
+ """
+ context = new_context(
+ self.environment, self.name, {}, self.get_all(), True, None, locals
+ )
context.eval_ctx = self.eval_ctx
- context.blocks.update((k, list(v)) for k, v in iteritems(self.blocks))
+ context.blocks.update((k, list(v)) for k, v in self.blocks.items())
return context
- def _all(meth):
- proxy = lambda self: getattr(self.get_all(), meth)()
- proxy.__doc__ = getattr(dict, meth).__doc__
- proxy.__name__ = meth
- return proxy
-
- keys = _all('keys')
- values = _all('values')
- items = _all('items')
+ keys = _dict_method_all(dict.keys)
+ values = _dict_method_all(dict.values)
+ items = _dict_method_all(dict.items)
- # not available on python 3
- if PY2:
- iterkeys = _all('iterkeys')
- itervalues = _all('itervalues')
- iteritems = _all('iteritems')
- del _all
-
- def __contains__(self, name):
+ def __contains__(self, name: str) -> bool:
return name in self.vars or name in self.parent
- def __getitem__(self, key):
- """Lookup a variable or raise `KeyError` if the variable is
- undefined.
+ def __getitem__(self, key: str) -> t.Any:
+ """Look up a variable by name with ``[]`` syntax, or raise a
+ ``KeyError`` if the key is not found.
"""
- item = self.resolve(key)
- if isinstance(item, Undefined):
- raise KeyError(key)
- return item
+ item = self.resolve_or_missing(key)
- def __repr__(self):
- return '<%s %s of %r>' % (
- self.__class__.__name__,
- repr(self.get_all()),
- self.name
- )
+ if item is missing:
+ raise KeyError(key)
+ return item
-# register the context as mapping if possible
-try:
- from collections import Mapping
- Mapping.register(Context)
-except ImportError:
- pass
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} {self.get_all()!r} of {self.name!r}>"
-class BlockReference(object):
+class BlockReference:
"""One block on a template reference."""
- def __init__(self, name, context, stack, depth):
+ def __init__(
+ self,
+ name: str,
+ context: "Context",
+ stack: t.List[t.Callable[["Context"], t.Iterator[str]]],
+ depth: int,
+ ) -> None:
self.name = name
self._context = context
self._stack = stack
self._depth = depth
@property
- def super(self):
+ def super(self) -> t.Union["BlockReference", "Undefined"]:
"""Super the block."""
if self._depth + 1 >= len(self._stack):
- return self._context.environment. \
- undefined('there is no parent block called %r.' %
- self.name, name='super')
- return BlockReference(self.name, self._context, self._stack,
- self._depth + 1)
+ return self._context.environment.undefined(
+ f"there is no parent block called {self.name!r}.", name="super"
+ )
+ return BlockReference(self.name, self._context, self._stack, self._depth + 1)
@internalcode
- def __call__(self):
+ async def _async_call(self) -> str:
+ rv = concat(
+ [x async for x in self._stack[self._depth](self._context)] # type: ignore
+ )
+
+ if self._context.eval_ctx.autoescape:
+ return Markup(rv)
+
+ return rv
+
+ @internalcode
+ def __call__(self) -> str:
+ if self._context.environment.is_async:
+ return self._async_call() # type: ignore
+
rv = concat(self._stack[self._depth](self._context))
+
if self._context.eval_ctx.autoescape:
- rv = Markup(rv)
+ return Markup(rv)
+
return rv
-class LoopContext(object):
- """A loop context for dynamic iteration."""
+class LoopContext:
+ """A wrapper iterable for dynamic ``for`` loops, with information
+ about the loop and iteration.
+ """
- def __init__(self, iterable, recurse=None, depth0=0):
- self._iterator = iter(iterable)
+ #: Current iteration of the loop, starting at 0.
+ index0 = -1
+
+ _length: t.Optional[int] = None
+ _after: t.Any = missing
+ _current: t.Any = missing
+ _before: t.Any = missing
+ _last_changed_value: t.Any = missing
+
+ def __init__(
+ self,
+ iterable: t.Iterable[V],
+ undefined: t.Type["Undefined"],
+ recurse: t.Optional["LoopRenderFunc"] = None,
+ depth0: int = 0,
+ ) -> None:
+ """
+ :param iterable: Iterable to wrap.
+ :param undefined: :class:`Undefined` class to use for next and
+ previous items.
+ :param recurse: The function to render the loop body when the
+ loop is marked recursive.
+ :param depth0: Incremented when looping recursively.
+ """
+ self._iterable = iterable
+ self._iterator = self._to_iterator(iterable)
+ self._undefined = undefined
self._recurse = recurse
- self._after = self._safe_next()
- self.index0 = -1
+ #: How many levels deep a recursive loop currently is, starting at 0.
self.depth0 = depth0
- # try to get the length of the iterable early. This must be done
- # here because there are some broken iterators around where there
- # __len__ is the number of iterations left (i'm looking at your
- # listreverseiterator!).
+ @staticmethod
+ def _to_iterator(iterable: t.Iterable[V]) -> t.Iterator[V]:
+ return iter(iterable)
+
+ @property
+ def length(self) -> int:
+ """Length of the iterable.
+
+ If the iterable is a generator or otherwise does not have a
+ size, it is eagerly evaluated to get a size.
+ """
+ if self._length is not None:
+ return self._length
+
try:
- self._length = len(iterable)
- except (TypeError, AttributeError):
- self._length = None
+ self._length = len(self._iterable) # type: ignore
+ except TypeError:
+ iterable = list(self._iterator)
+ self._iterator = self._to_iterator(iterable)
+ self._length = len(iterable) + self.index + (self._after is not missing)
+
+ return self._length
+
+ def __len__(self) -> int:
+ return self.length
+
+ @property
+ def depth(self) -> int:
+ """How many levels deep a recursive loop currently is, starting at 1."""
+ return self.depth0 + 1
+
+ @property
+ def index(self) -> int:
+ """Current iteration of the loop, starting at 1."""
+ return self.index0 + 1
+
+ @property
+ def revindex0(self) -> int:
+ """Number of iterations from the end of the loop, ending at 0.
+
+ Requires calculating :attr:`length`.
+ """
+ return self.length - self.index
+
+ @property
+ def revindex(self) -> int:
+ """Number of iterations from the end of the loop, ending at 1.
+
+ Requires calculating :attr:`length`.
+ """
+ return self.length - self.index0
+
+ @property
+ def first(self) -> bool:
+ """Whether this is the first iteration of the loop."""
+ return self.index0 == 0
+
+ def _peek_next(self) -> t.Any:
+ """Return the next element in the iterable, or :data:`missing`
+ if the iterable is exhausted. Only peeks one item ahead, caching
+ the result in :attr:`_last` for use in subsequent checks. The
+ cache is reset when :meth:`__next__` is called.
+ """
+ if self._after is not missing:
+ return self._after
+
+ self._after = next(self._iterator, missing)
+ return self._after
+
+ @property
+ def last(self) -> bool:
+ """Whether this is the last iteration of the loop.
+
+ Causes the iterable to advance early. See
+ :func:`itertools.groupby` for issues this can cause.
+ The :func:`groupby` filter avoids that issue.
+ """
+ return self._peek_next() is missing
- def cycle(self, *args):
- """Cycles among the arguments with the current loop index."""
+ @property
+ def previtem(self) -> t.Union[t.Any, "Undefined"]:
+ """The item in the previous iteration. Undefined during the
+ first iteration.
+ """
+ if self.first:
+ return self._undefined("there is no previous item")
+
+ return self._before
+
+ @property
+ def nextitem(self) -> t.Union[t.Any, "Undefined"]:
+ """The item in the next iteration. Undefined during the last
+ iteration.
+
+ Causes the iterable to advance early. See
+ :func:`itertools.groupby` for issues this can cause.
+ The :func:`jinja-filters.groupby` filter avoids that issue.
+ """
+ rv = self._peek_next()
+
+ if rv is missing:
+ return self._undefined("there is no next item")
+
+ return rv
+
+ def cycle(self, *args: V) -> V:
+ """Return a value from the given args, cycling through based on
+ the current :attr:`index0`.
+
+ :param args: One or more values to cycle through.
+ """
if not args:
- raise TypeError('no items for cycling given')
+ raise TypeError("no items for cycling given")
+
return args[self.index0 % len(args)]
- first = property(lambda x: x.index0 == 0)
- last = property(lambda x: x._after is _last_iteration)
- index = property(lambda x: x.index0 + 1)
- revindex = property(lambda x: x.length - x.index0)
- revindex0 = property(lambda x: x.length - x.index)
- depth = property(lambda x: x.depth0 + 1)
+ def changed(self, *value: t.Any) -> bool:
+ """Return ``True`` if previously called with a different value
+ (including when called for the first time).
- def __len__(self):
- return self.length
+ :param value: One or more values to compare to the last call.
+ """
+ if self._last_changed_value != value:
+ self._last_changed_value = value
+ return True
- def __iter__(self):
- return LoopContextIterator(self)
+ return False
- def _safe_next(self):
- try:
- return next(self._iterator)
- except StopIteration:
- return _last_iteration
+ def __iter__(self) -> "LoopContext":
+ return self
+
+ def __next__(self) -> t.Tuple[t.Any, "LoopContext"]:
+ if self._after is not missing:
+ rv = self._after
+ self._after = missing
+ else:
+ rv = next(self._iterator)
+
+ self.index0 += 1
+ self._before = self._current
+ self._current = rv
+ return rv, self
@internalcode
- def loop(self, iterable):
+ def __call__(self, iterable: t.Iterable[V]) -> str:
+ """When iterating over nested data, render the body of the loop
+ recursively with the given inner iterable data.
+
+ The loop must have the ``recursive`` marker for this to work.
+ """
if self._recurse is None:
- raise TypeError('Tried to call non recursive loop. Maybe you '
- "forgot the 'recursive' modifier.")
- return self._recurse(iterable, self._recurse, self.depth0 + 1)
+ raise TypeError(
+ "The loop must have the 'recursive' marker to be called recursively."
+ )
- # a nifty trick to enhance the error message if someone tried to call
- # the the loop without or with too many arguments.
- __call__ = loop
- del loop
+ return self._recurse(iterable, self._recurse, depth=self.depth)
+
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} {self.index}/{self.length}>"
+
+
+class AsyncLoopContext(LoopContext):
+ _iterator: t.AsyncIterator[t.Any] # type: ignore
+
+ @staticmethod
+ def _to_iterator( # type: ignore
+ iterable: t.Union[t.Iterable[V], t.AsyncIterable[V]]
+ ) -> t.AsyncIterator[V]:
+ return auto_aiter(iterable)
@property
- def length(self):
- if self._length is None:
- # if was not possible to get the length of the iterator when
- # the loop context was created (ie: iterating over a generator)
- # we have to convert the iterable into a sequence and use the
- # length of that + the number of iterations so far.
- iterable = tuple(self._iterator)
- self._iterator = iter(iterable)
- iterations_done = self.index0 + 2
- self._length = len(iterable) + iterations_done
+ async def length(self) -> int: # type: ignore
+ if self._length is not None:
+ return self._length
+
+ try:
+ self._length = len(self._iterable) # type: ignore
+ except TypeError:
+ iterable = [x async for x in self._iterator]
+ self._iterator = self._to_iterator(iterable)
+ self._length = len(iterable) + self.index + (self._after is not missing)
+
return self._length
- def __repr__(self):
- return '<%s %r/%r>' % (
- self.__class__.__name__,
- self.index,
- self.length
- )
+ @property
+ async def revindex0(self) -> int: # type: ignore
+ return await self.length - self.index
+
+ @property
+ async def revindex(self) -> int: # type: ignore
+ return await self.length - self.index0
+
+ async def _peek_next(self) -> t.Any:
+ if self._after is not missing:
+ return self._after
+
+ try:
+ self._after = await self._iterator.__anext__()
+ except StopAsyncIteration:
+ self._after = missing
+
+ return self._after
+ @property
+ async def last(self) -> bool: # type: ignore
+ return await self._peek_next() is missing
-@implements_iterator
-class LoopContextIterator(object):
- """The iterator for a loop context."""
- __slots__ = ('context',)
+ @property
+ async def nextitem(self) -> t.Union[t.Any, "Undefined"]:
+ rv = await self._peek_next()
- def __init__(self, context):
- self.context = context
+ if rv is missing:
+ return self._undefined("there is no next item")
- def __iter__(self):
+ return rv
+
+ def __aiter__(self) -> "AsyncLoopContext":
return self
- def __next__(self):
- ctx = self.context
- ctx.index0 += 1
- if ctx._after is _last_iteration:
- raise StopIteration()
- next_elem = ctx._after
- ctx._after = ctx._safe_next()
- return next_elem, ctx
+ async def __anext__(self) -> t.Tuple[t.Any, "AsyncLoopContext"]:
+ if self._after is not missing:
+ rv = self._after
+ self._after = missing
+ else:
+ rv = await self._iterator.__anext__()
+ self.index0 += 1
+ self._before = self._current
+ self._current = rv
+ return rv, self
-class Macro(object):
+
+class Macro:
"""Wraps a macro function."""
- def __init__(self, environment, func, name, arguments, defaults,
- catch_kwargs, catch_varargs, caller):
+ def __init__(
+ self,
+ environment: "Environment",
+ func: t.Callable[..., str],
+ name: str,
+ arguments: t.List[str],
+ catch_kwargs: bool,
+ catch_varargs: bool,
+ caller: bool,
+ default_autoescape: t.Optional[bool] = None,
+ ):
self._environment = environment
self._func = func
self._argument_count = len(arguments)
self.name = name
self.arguments = arguments
- self.defaults = defaults
self.catch_kwargs = catch_kwargs
self.catch_varargs = catch_varargs
self.caller = caller
+ self.explicit_caller = "caller" in arguments
+
+ if default_autoescape is None:
+ if callable(environment.autoescape):
+ default_autoescape = environment.autoescape(None)
+ else:
+ default_autoescape = environment.autoescape
+
+ self._default_autoescape = default_autoescape
@internalcode
- def __call__(self, *args, **kwargs):
+ @pass_eval_context
+ def __call__(self, *args: t.Any, **kwargs: t.Any) -> str:
+ # This requires a bit of explanation, In the past we used to
+ # decide largely based on compile-time information if a macro is
+ # safe or unsafe. While there was a volatile mode it was largely
+ # unused for deciding on escaping. This turns out to be
+ # problematic for macros because whether a macro is safe depends not
+ # on the escape mode when it was defined, but rather when it was used.
+ #
+ # Because however we export macros from the module system and
+ # there are historic callers that do not pass an eval context (and
+ # will continue to not pass one), we need to perform an instance
+ # check here.
+ #
+ # This is considered safe because an eval context is not a valid
+ # argument to callables otherwise anyway. Worst case here is
+ # that if no eval context is passed we fall back to the compile
+ # time autoescape flag.
+ if args and isinstance(args[0], EvalContext):
+ autoescape = args[0].autoescape
+ args = args[1:]
+ else:
+ autoescape = self._default_autoescape
+
# try to consume the positional arguments
- arguments = list(args[:self._argument_count])
+ arguments = list(args[: self._argument_count])
off = len(arguments)
+ # For information why this is necessary refer to the handling
+ # of caller in the `macro_body` handler in the compiler.
+ found_caller = False
+
# if the number of arguments consumed is not the number of
# arguments expected we start filling in keyword arguments
# and defaults.
if off != self._argument_count:
- for idx, name in enumerate(self.arguments[len(arguments):]):
+ for name in self.arguments[len(arguments) :]:
try:
value = kwargs.pop(name)
except KeyError:
- try:
- value = self.defaults[idx - self._argument_count + off]
- except IndexError:
- value = self._environment.undefined(
- 'parameter %r was not provided' % name, name=name)
+ value = missing
+ if name == "caller":
+ found_caller = True
arguments.append(value)
+ else:
+ found_caller = self.explicit_caller
# it's important that the order of these arguments does not change
# if not also changed in the compiler's `function_scoping` method.
# the order is caller, keyword arguments, positional arguments!
- if self.caller:
- caller = kwargs.pop('caller', None)
+ if self.caller and not found_caller:
+ caller = kwargs.pop("caller", None)
if caller is None:
- caller = self._environment.undefined('No caller defined',
- name='caller')
+ caller = self._environment.undefined("No caller defined", name="caller")
arguments.append(caller)
+
if self.catch_kwargs:
arguments.append(kwargs)
elif kwargs:
- raise TypeError('macro %r takes no keyword argument %r' %
- (self.name, next(iter(kwargs))))
+ if "caller" in kwargs:
+ raise TypeError(
+ f"macro {self.name!r} was invoked with two values for the special"
+ " caller argument. This is most likely a bug."
+ )
+ raise TypeError(
+ f"macro {self.name!r} takes no keyword argument {next(iter(kwargs))!r}"
+ )
if self.catch_varargs:
- arguments.append(args[self._argument_count:])
+ arguments.append(args[self._argument_count :])
elif len(args) > self._argument_count:
- raise TypeError('macro %r takes not more than %d argument(s)' %
- (self.name, len(self.arguments)))
- return self._func(*arguments)
-
- def __repr__(self):
- return '<%s %s>' % (
- self.__class__.__name__,
- self.name is None and 'anonymous' or repr(self.name)
- )
+ raise TypeError(
+ f"macro {self.name!r} takes not more than"
+ f" {len(self.arguments)} argument(s)"
+ )
+
+ return self._invoke(arguments, autoescape)
+
+ async def _async_invoke(self, arguments: t.List[t.Any], autoescape: bool) -> str:
+ rv = await self._func(*arguments) # type: ignore
+
+ if autoescape:
+ return Markup(rv)
+
+ return rv # type: ignore
+
+ def _invoke(self, arguments: t.List[t.Any], autoescape: bool) -> str:
+ if self._environment.is_async:
+ return self._async_invoke(arguments, autoescape) # type: ignore
+
+ rv = self._func(*arguments)
+ if autoescape:
+ rv = Markup(rv)
+
+ return rv
+
+ def __repr__(self) -> str:
+ name = "anonymous" if self.name is None else repr(self.name)
+ return f"<{type(self).__name__} {name}>"
-@implements_to_string
-class Undefined(object):
+
+class Undefined:
"""The default undefined type. This undefined type can be printed and
iterated over, but every other access will raise an :exc:`UndefinedError`:
@@ -454,80 +849,109 @@ class Undefined(object):
>>> foo + 42
Traceback (most recent call last):
...
- UndefinedError: 'foo' is undefined
+ jinja2.exceptions.UndefinedError: 'foo' is undefined
"""
- __slots__ = ('_undefined_hint', '_undefined_obj', '_undefined_name',
- '_undefined_exception')
- def __init__(self, hint=None, obj=missing, name=None, exc=UndefinedError):
+ __slots__ = (
+ "_undefined_hint",
+ "_undefined_obj",
+ "_undefined_name",
+ "_undefined_exception",
+ )
+
+ def __init__(
+ self,
+ hint: t.Optional[str] = None,
+ obj: t.Any = missing,
+ name: t.Optional[str] = None,
+ exc: t.Type[TemplateRuntimeError] = UndefinedError,
+ ) -> None:
self._undefined_hint = hint
self._undefined_obj = obj
self._undefined_name = name
self._undefined_exception = exc
+ @property
+ def _undefined_message(self) -> str:
+ """Build a message about the undefined value based on how it was
+ accessed.
+ """
+ if self._undefined_hint:
+ return self._undefined_hint
+
+ if self._undefined_obj is missing:
+ return f"{self._undefined_name!r} is undefined"
+
+ if not isinstance(self._undefined_name, str):
+ return (
+ f"{object_type_repr(self._undefined_obj)} has no"
+ f" element {self._undefined_name!r}"
+ )
+
+ return (
+ f"{object_type_repr(self._undefined_obj)!r} has no"
+ f" attribute {self._undefined_name!r}"
+ )
+
@internalcode
- def _fail_with_undefined_error(self, *args, **kwargs):
- """Regular callback function for undefined objects that raises an
- `UndefinedError` on call.
+ def _fail_with_undefined_error(
+ self, *args: t.Any, **kwargs: t.Any
+ ) -> "te.NoReturn":
+ """Raise an :exc:`UndefinedError` when operations are performed
+ on the undefined value.
"""
- if self._undefined_hint is None:
- if self._undefined_obj is missing:
- hint = '%r is undefined' % self._undefined_name
- elif not isinstance(self._undefined_name, string_types):
- hint = '%s has no element %r' % (
- object_type_repr(self._undefined_obj),
- self._undefined_name
- )
- else:
- hint = '%r has no attribute %r' % (
- object_type_repr(self._undefined_obj),
- self._undefined_name
- )
- else:
- hint = self._undefined_hint
- raise self._undefined_exception(hint)
+ raise self._undefined_exception(self._undefined_message)
@internalcode
- def __getattr__(self, name):
- if name[:2] == '__':
+ def __getattr__(self, name: str) -> t.Any:
+ if name[:2] == "__":
raise AttributeError(name)
- return self._fail_with_undefined_error()
- __add__ = __radd__ = __mul__ = __rmul__ = __div__ = __rdiv__ = \
- __truediv__ = __rtruediv__ = __floordiv__ = __rfloordiv__ = \
- __mod__ = __rmod__ = __pos__ = __neg__ = __call__ = \
- __getitem__ = __lt__ = __le__ = __gt__ = __ge__ = __int__ = \
- __float__ = __complex__ = __pow__ = __rpow__ = \
- _fail_with_undefined_error
+ return self._fail_with_undefined_error()
- def __eq__(self, other):
+ __add__ = __radd__ = __sub__ = __rsub__ = _fail_with_undefined_error
+ __mul__ = __rmul__ = __div__ = __rdiv__ = _fail_with_undefined_error
+ __truediv__ = __rtruediv__ = _fail_with_undefined_error
+ __floordiv__ = __rfloordiv__ = _fail_with_undefined_error
+ __mod__ = __rmod__ = _fail_with_undefined_error
+ __pos__ = __neg__ = _fail_with_undefined_error
+ __call__ = __getitem__ = _fail_with_undefined_error
+ __lt__ = __le__ = __gt__ = __ge__ = _fail_with_undefined_error
+ __int__ = __float__ = __complex__ = _fail_with_undefined_error
+ __pow__ = __rpow__ = _fail_with_undefined_error
+
+ def __eq__(self, other: t.Any) -> bool:
return type(self) is type(other)
- def __ne__(self, other):
+ def __ne__(self, other: t.Any) -> bool:
return not self.__eq__(other)
- def __hash__(self):
+ def __hash__(self) -> int:
return id(type(self))
- def __str__(self):
- return u''
+ def __str__(self) -> str:
+ return ""
- def __len__(self):
+ def __len__(self) -> int:
return 0
- def __iter__(self):
- if 0:
- yield None
+ def __iter__(self) -> t.Iterator[t.Any]:
+ yield from ()
+
+ async def __aiter__(self) -> t.AsyncIterator[t.Any]:
+ for _ in ():
+ yield
- def __nonzero__(self):
+ def __bool__(self) -> bool:
return False
- __bool__ = __nonzero__
- def __repr__(self):
- return 'Undefined'
+ def __repr__(self) -> str:
+ return "Undefined"
-def make_logging_undefined(logger=None, base=None):
+def make_logging_undefined(
+ logger: t.Optional["logging.Logger"] = None, base: t.Type[Undefined] = Undefined
+) -> t.Type[Undefined]:
"""Given a logger object this returns a new undefined class that will
log certain failures. It will log iterations and printing. If no
logger is given a default logger is created.
@@ -549,66 +973,69 @@ def make_logging_undefined(logger=None, base=None):
"""
if logger is None:
import logging
+
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stderr))
- if base is None:
- base = Undefined
-
- def _log_message(undef):
- if undef._undefined_hint is None:
- if undef._undefined_obj is missing:
- hint = '%s is undefined' % undef._undefined_name
- elif not isinstance(undef._undefined_name, string_types):
- hint = '%s has no element %s' % (
- object_type_repr(undef._undefined_obj),
- undef._undefined_name)
- else:
- hint = '%s has no attribute %s' % (
- object_type_repr(undef._undefined_obj),
- undef._undefined_name)
- else:
- hint = undef._undefined_hint
- logger.warning('Template variable warning: %s', hint)
- class LoggingUndefined(base):
+ def _log_message(undef: Undefined) -> None:
+ logger.warning( # type: ignore
+ "Template variable warning: %s", undef._undefined_message
+ )
+
+ class LoggingUndefined(base): # type: ignore
+ __slots__ = ()
- def _fail_with_undefined_error(self, *args, **kwargs):
+ def _fail_with_undefined_error( # type: ignore
+ self, *args: t.Any, **kwargs: t.Any
+ ) -> "te.NoReturn":
try:
- return base._fail_with_undefined_error(self, *args, **kwargs)
+ super()._fail_with_undefined_error(*args, **kwargs)
except self._undefined_exception as e:
- logger.error('Template variable error: %s', str(e))
+ logger.error("Template variable error: %s", e) # type: ignore
raise e
- def __str__(self):
- rv = base.__str__(self)
+ def __str__(self) -> str:
_log_message(self)
- return rv
+ return super().__str__() # type: ignore
- def __iter__(self):
- rv = base.__iter__(self)
+ def __iter__(self) -> t.Iterator[t.Any]:
_log_message(self)
- return rv
-
- if PY2:
- def __nonzero__(self):
- rv = base.__nonzero__(self)
- _log_message(self)
- return rv
+ return super().__iter__() # type: ignore
- def __unicode__(self):
- rv = base.__unicode__(self)
- _log_message(self)
- return rv
- else:
- def __bool__(self):
- rv = base.__bool__(self)
- _log_message(self)
- return rv
+ def __bool__(self) -> bool:
+ _log_message(self)
+ return super().__bool__() # type: ignore
return LoggingUndefined
-@implements_to_string
+class ChainableUndefined(Undefined):
+ """An undefined that is chainable, where both ``__getattr__`` and
+ ``__getitem__`` return itself rather than raising an
+ :exc:`UndefinedError`.
+
+ >>> foo = ChainableUndefined(name='foo')
+ >>> str(foo.bar['baz'])
+ ''
+ >>> foo.bar['baz'] + 42
+ Traceback (most recent call last):
+ ...
+ jinja2.exceptions.UndefinedError: 'foo' is undefined
+
+ .. versionadded:: 2.11.0
+ """
+
+ __slots__ = ()
+
+ def __html__(self) -> str:
+ return str(self)
+
+ def __getattr__(self, _: str) -> "ChainableUndefined":
+ return self
+
+ __getitem__ = __getattr__ # type: ignore
+
+
class DebugUndefined(Undefined):
"""An undefined that returns the debug info when printed.
@@ -620,22 +1047,27 @@ class DebugUndefined(Undefined):
>>> foo + 42
Traceback (most recent call last):
...
- UndefinedError: 'foo' is undefined
+ jinja2.exceptions.UndefinedError: 'foo' is undefined
"""
+
__slots__ = ()
- def __str__(self):
- if self._undefined_hint is None:
- if self._undefined_obj is missing:
- return u'{{ %s }}' % self._undefined_name
- return '{{ no such element: %s[%r] }}' % (
- object_type_repr(self._undefined_obj),
- self._undefined_name
+ def __str__(self) -> str:
+ if self._undefined_hint:
+ message = f"undefined value printed: {self._undefined_hint}"
+
+ elif self._undefined_obj is missing:
+ message = self._undefined_name # type: ignore
+
+ else:
+ message = (
+ f"no such element: {object_type_repr(self._undefined_obj)}"
+ f"[{self._undefined_name!r}]"
)
- return u'{{ undefined value printed: %s }}' % self._undefined_hint
+
+ return f"{{{{ {message} }}}}"
-@implements_to_string
class StrictUndefined(Undefined):
"""An undefined that barks on print and iteration as well as boolean
tests and all kinds of comparisons. In other words: you can do nothing
@@ -645,22 +1077,28 @@ class StrictUndefined(Undefined):
>>> str(foo)
Traceback (most recent call last):
...
- UndefinedError: 'foo' is undefined
+ jinja2.exceptions.UndefinedError: 'foo' is undefined
>>> not foo
Traceback (most recent call last):
...
- UndefinedError: 'foo' is undefined
+ jinja2.exceptions.UndefinedError: 'foo' is undefined
>>> foo + 42
Traceback (most recent call last):
...
- UndefinedError: 'foo' is undefined
+ jinja2.exceptions.UndefinedError: 'foo' is undefined
"""
- __slots__ = ()
- __iter__ = __str__ = __len__ = __nonzero__ = __eq__ = \
- __ne__ = __bool__ = __hash__ = \
- Undefined._fail_with_undefined_error
-
-# remove remaining slots attributes, after the metaclass did the magic they
-# are unneeded and irritating as they contain wrong data for the subclasses.
-del Undefined.__slots__, DebugUndefined.__slots__, StrictUndefined.__slots__
+ __slots__ = ()
+ __iter__ = __str__ = __len__ = Undefined._fail_with_undefined_error
+ __eq__ = __ne__ = __bool__ = __hash__ = Undefined._fail_with_undefined_error
+ __contains__ = Undefined._fail_with_undefined_error
+
+
+# Remove slots attributes, after the metaclass is applied they are
+# unneeded and contain wrong data for subclasses.
+del (
+ Undefined.__slots__,
+ ChainableUndefined.__slots__,
+ DebugUndefined.__slots__,
+ StrictUndefined.__slots__,
+)