summaryrefslogtreecommitdiffhomepage
path: root/scripts/jinja2/bccache.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/jinja2/bccache.py')
-rw-r--r--scripts/jinja2/bccache.py242
1 files changed, 122 insertions, 120 deletions
diff --git a/scripts/jinja2/bccache.py b/scripts/jinja2/bccache.py
index f5bd314..3bb61b7 100644
--- a/scripts/jinja2/bccache.py
+++ b/scripts/jinja2/bccache.py
@@ -1,63 +1,47 @@
-# -*- coding: utf-8 -*-
-"""
- jinja2.bccache
- ~~~~~~~~~~~~~~
-
- This module implements the bytecode cache system Jinja is optionally
- using. This is useful if you have very complex template situations and
- the compiliation of all those templates slow down your application too
- much.
-
- Situations where this is useful are often forking web applications that
- are initialized on the first request.
+"""The optional bytecode cache system. This is useful if you have very
+complex template situations and the compilation of all those templates
+slows down your application too much.
- :copyright: (c) 2010 by the Jinja Team.
- :license: BSD.
+Situations where this is useful are often forking web applications that
+are initialized on the first request.
"""
-from os import path, listdir
-import os
-import sys
-import stat
import errno
+import fnmatch
import marshal
+import os
+import pickle
+import stat
+import sys
import tempfile
-import fnmatch
+import typing as t
from hashlib import sha1
-from jinja2.utils import open_if_exists
-from jinja2._compat import BytesIO, pickle, PY2, text_type
+from io import BytesIO
+from types import CodeType
+if t.TYPE_CHECKING:
+ import typing_extensions as te
+ from .environment import Environment
-# marshal works better on 3.x, one hack less required
-if not PY2:
- marshal_dump = marshal.dump
- marshal_load = marshal.load
-else:
+ class _MemcachedClient(te.Protocol):
+ def get(self, key: str) -> bytes:
+ ...
- def marshal_dump(code, f):
- if isinstance(f, file):
- marshal.dump(code, f)
- else:
- f.write(marshal.dumps(code))
+ def set(self, key: str, value: bytes, timeout: t.Optional[int] = None) -> None:
+ ...
- def marshal_load(f):
- if isinstance(f, file):
- return marshal.load(f)
- return marshal.loads(f.read())
+bc_version = 5
+# Magic bytes to identify Jinja bytecode cache files. Contains the
+# Python major and minor version to avoid loading incompatible bytecode
+# if a project upgrades its Python version.
+bc_magic = (
+ b"j2"
+ + pickle.dumps(bc_version, 2)
+ + pickle.dumps((sys.version_info[0] << 24) | sys.version_info[1], 2)
+)
-bc_version = 2
-# magic version used to only change with new jinja versions. With 2.6
-# we change this to also take Python version changes into account. The
-# reason for this is that Python tends to segfault if fed earlier bytecode
-# versions because someone thought it would be a good idea to reuse opcodes
-# or make Python incompatible with earlier versions.
-bc_magic = 'j2'.encode('ascii') + \
- pickle.dumps(bc_version, 2) + \
- pickle.dumps((sys.version_info[0] << 24) | sys.version_info[1])
-
-
-class Bucket(object):
+class Bucket:
"""Buckets are used to store the bytecode for one template. It's created
and initialized by the bytecode cache and passed to the loading functions.
@@ -66,17 +50,17 @@ class Bucket(object):
cache subclasses don't have to care about cache invalidation.
"""
- def __init__(self, environment, key, checksum):
+ def __init__(self, environment: "Environment", key: str, checksum: str) -> None:
self.environment = environment
self.key = key
self.checksum = checksum
self.reset()
- def reset(self):
+ def reset(self) -> None:
"""Resets the bucket (unloads the bytecode)."""
- self.code = None
+ self.code: t.Optional[CodeType] = None
- def load_bytecode(self, f):
+ def load_bytecode(self, f: t.BinaryIO) -> None:
"""Loads bytecode from a file or file like object."""
# make sure the magic header is correct
magic = f.read(len(bc_magic))
@@ -90,31 +74,31 @@ class Bucket(object):
return
# if marshal_load fails then we need to reload
try:
- self.code = marshal_load(f)
+ self.code = marshal.load(f)
except (EOFError, ValueError, TypeError):
self.reset()
return
- def write_bytecode(self, f):
+ def write_bytecode(self, f: t.BinaryIO) -> None:
"""Dump the bytecode into the file or file like object passed."""
if self.code is None:
- raise TypeError('can\'t write empty bucket')
+ raise TypeError("can't write empty bucket")
f.write(bc_magic)
pickle.dump(self.checksum, f, 2)
- marshal_dump(self.code, f)
+ marshal.dump(self.code, f)
- def bytecode_from_string(self, string):
- """Load bytecode from a string."""
+ def bytecode_from_string(self, string: bytes) -> None:
+ """Load bytecode from bytes."""
self.load_bytecode(BytesIO(string))
- def bytecode_to_string(self):
- """Return the bytecode as string."""
+ def bytecode_to_string(self) -> bytes:
+ """Return the bytecode as bytes."""
out = BytesIO()
self.write_bytecode(out)
return out.getvalue()
-class BytecodeCache(object):
+class BytecodeCache:
"""To implement your own bytecode cache you have to subclass this class
and override :meth:`load_bytecode` and :meth:`dump_bytecode`. Both of
these methods are passed a :class:`~jinja2.bccache.Bucket`.
@@ -140,44 +124,51 @@ class BytecodeCache(object):
bucket.write_bytecode(f)
A more advanced version of a filesystem based bytecode cache is part of
- Jinja2.
+ Jinja.
"""
- def load_bytecode(self, bucket):
+ def load_bytecode(self, bucket: Bucket) -> None:
"""Subclasses have to override this method to load bytecode into a
bucket. If they are not able to find code in the cache for the
bucket, it must not do anything.
"""
raise NotImplementedError()
- def dump_bytecode(self, bucket):
+ def dump_bytecode(self, bucket: Bucket) -> None:
"""Subclasses have to override this method to write the bytecode
from a bucket back to the cache. If it unable to do so it must not
fail silently but raise an exception.
"""
raise NotImplementedError()
- def clear(self):
- """Clears the cache. This method is not used by Jinja2 but should be
+ def clear(self) -> None:
+ """Clears the cache. This method is not used by Jinja but should be
implemented to allow applications to clear the bytecode cache used
by a particular environment.
"""
- def get_cache_key(self, name, filename=None):
+ def get_cache_key(
+ self, name: str, filename: t.Optional[t.Union[str]] = None
+ ) -> str:
"""Returns the unique hash key for this template name."""
- hash = sha1(name.encode('utf-8'))
+ hash = sha1(name.encode("utf-8"))
+
if filename is not None:
- filename = '|' + filename
- if isinstance(filename, text_type):
- filename = filename.encode('utf-8')
- hash.update(filename)
+ hash.update(f"|{filename}".encode())
+
return hash.hexdigest()
- def get_source_checksum(self, source):
+ def get_source_checksum(self, source: str) -> str:
"""Returns a checksum for the source."""
- return sha1(source.encode('utf-8')).hexdigest()
-
- def get_bucket(self, environment, name, filename, source):
+ return sha1(source.encode("utf-8")).hexdigest()
+
+ def get_bucket(
+ self,
+ environment: "Environment",
+ name: str,
+ filename: t.Optional[str],
+ source: str,
+ ) -> Bucket:
"""Return a cache bucket for the given template. All arguments are
mandatory but filename may be `None`.
"""
@@ -187,7 +178,7 @@ class BytecodeCache(object):
self.load_bytecode(bucket)
return bucket
- def set_bucket(self, bucket):
+ def set_bucket(self, bucket: Bucket) -> None:
"""Put the bucket into the cache."""
self.dump_bytecode(bucket)
@@ -210,27 +201,31 @@ class FileSystemBytecodeCache(BytecodeCache):
This bytecode cache supports clearing of the cache using the clear method.
"""
- def __init__(self, directory=None, pattern='__jinja2_%s.cache'):
+ def __init__(
+ self, directory: t.Optional[str] = None, pattern: str = "__jinja2_%s.cache"
+ ) -> None:
if directory is None:
directory = self._get_default_cache_dir()
self.directory = directory
self.pattern = pattern
- def _get_default_cache_dir(self):
- def _unsafe_dir():
- raise RuntimeError('Cannot determine safe temp directory. You '
- 'need to explicitly provide one.')
+ def _get_default_cache_dir(self) -> str:
+ def _unsafe_dir() -> "te.NoReturn":
+ raise RuntimeError(
+ "Cannot determine safe temp directory. You "
+ "need to explicitly provide one."
+ )
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
- if os.name == 'nt':
+ if os.name == "nt":
return tmpdir
- if not hasattr(os, 'getuid'):
+ if not hasattr(os, "getuid"):
_unsafe_dir()
- dirname = '_jinja2-cache-%d' % os.getuid()
+ dirname = f"_jinja2-cache-{os.getuid()}"
actual_dir = os.path.join(tmpdir, dirname)
try:
@@ -241,49 +236,50 @@ class FileSystemBytecodeCache(BytecodeCache):
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
- if actual_dir_stat.st_uid != os.getuid() \
- or not stat.S_ISDIR(actual_dir_stat.st_mode) \
- or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
+ if (
+ actual_dir_stat.st_uid != os.getuid()
+ or not stat.S_ISDIR(actual_dir_stat.st_mode)
+ or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU
+ ):
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
- if actual_dir_stat.st_uid != os.getuid() \
- or not stat.S_ISDIR(actual_dir_stat.st_mode) \
- or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
+ if (
+ actual_dir_stat.st_uid != os.getuid()
+ or not stat.S_ISDIR(actual_dir_stat.st_mode)
+ or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU
+ ):
_unsafe_dir()
return actual_dir
- def _get_cache_filename(self, bucket):
- return path.join(self.directory, self.pattern % bucket.key)
+ def _get_cache_filename(self, bucket: Bucket) -> str:
+ return os.path.join(self.directory, self.pattern % (bucket.key,))
- def load_bytecode(self, bucket):
- f = open_if_exists(self._get_cache_filename(bucket), 'rb')
- if f is not None:
- try:
+ def load_bytecode(self, bucket: Bucket) -> None:
+ filename = self._get_cache_filename(bucket)
+
+ if os.path.exists(filename):
+ with open(filename, "rb") as f:
bucket.load_bytecode(f)
- finally:
- f.close()
- def dump_bytecode(self, bucket):
- f = open(self._get_cache_filename(bucket), 'wb')
- try:
+ def dump_bytecode(self, bucket: Bucket) -> None:
+ with open(self._get_cache_filename(bucket), "wb") as f:
bucket.write_bytecode(f)
- finally:
- f.close()
- def clear(self):
+ def clear(self) -> None:
# imported lazily here because google app-engine doesn't support
# write access on the file system and the function does not exist
# normally.
from os import remove
- files = fnmatch.filter(listdir(self.directory), self.pattern % '*')
+
+ files = fnmatch.filter(os.listdir(self.directory), self.pattern % ("*",))
for filename in files:
try:
- remove(path.join(self.directory, filename))
+ remove(os.path.join(self.directory, filename))
except OSError:
pass
@@ -296,12 +292,11 @@ class MemcachedBytecodeCache(BytecodeCache):
Libraries compatible with this class:
- - `werkzeug <http://werkzeug.pocoo.org/>`_.contrib.cache
- - `python-memcached <http://www.tummy.com/Community/software/python-memcached/>`_
- - `cmemcache <http://gijsbert.org/cmemcache/>`_
+ - `cachelib <https://github.com/pallets/cachelib>`_
+ - `python-memcached <https://pypi.org/project/python-memcached/>`_
(Unfortunately the django cache interface is not compatible because it
- does not support storing binary data, only unicode. You can however pass
+ does not support storing binary data, only text. You can however pass
the underlying cache client to the bytecode cache which is available
as `django.core.cache.cache._client`.)
@@ -334,29 +329,36 @@ class MemcachedBytecodeCache(BytecodeCache):
`ignore_memcache_errors` parameter.
"""
- def __init__(self, client, prefix='jinja2/bytecode/', timeout=None,
- ignore_memcache_errors=True):
+ def __init__(
+ self,
+ client: "_MemcachedClient",
+ prefix: str = "jinja2/bytecode/",
+ timeout: t.Optional[int] = None,
+ ignore_memcache_errors: bool = True,
+ ):
self.client = client
self.prefix = prefix
self.timeout = timeout
self.ignore_memcache_errors = ignore_memcache_errors
- def load_bytecode(self, bucket):
+ def load_bytecode(self, bucket: Bucket) -> None:
try:
code = self.client.get(self.prefix + bucket.key)
except Exception:
if not self.ignore_memcache_errors:
raise
- code = None
- if code is not None:
+ else:
bucket.bytecode_from_string(code)
- def dump_bytecode(self, bucket):
- args = (self.prefix + bucket.key, bucket.bytecode_to_string())
- if self.timeout is not None:
- args += (self.timeout,)
+ def dump_bytecode(self, bucket: Bucket) -> None:
+ key = self.prefix + bucket.key
+ value = bucket.bytecode_to_string()
+
try:
- self.client.set(*args)
+ if self.timeout is not None:
+ self.client.set(key, value, self.timeout)
+ else:
+ self.client.set(key, value)
except Exception:
if not self.ignore_memcache_errors:
raise