sentry_sdk module
The Sentry SDK is the new-style SDK for sentry.io. It implements the unified API that all modern SDKs follow for Python 2.7 and 3.5 or later.
The user documentation can be found on docs.sentry.io.
Quickstart
The only thing needed to get going is to call sentry_sdk.init(). When not passed any
arguments the default options are used and the DSN is picked up from the SENTRY_DSN
environment variable. Otherwise the DSN can be passed with the dsn keyword
or first argument.
    import sentry_sdk
    sentry_sdk.init()
This initializes the default integrations which will automatically pick up any uncaught exceptions. Additionally you can report arbitrary other exceptions:
    try:
        my_failing_function()
    except Exception as e:
        sentry_sdk.capture_exception(e)
""" The Sentry SDK is the new-style SDK for [sentry.io](https://sentry.io/). It implements the unified API that all modern SDKs follow for Python 2.7 and 3.5 or later. The user documentation can be found on [docs.sentry.io](https://docs.sentry.io/). ## Quickstart The only thing to get going is to call `sentry_sdk.init()`. When not passed any arguments the default options are used and the DSN is picked up from the `SENTRY_DSN` environment variable. Otherwise the DSN can be passed with the `dsn` keyword or first argument. import sentry_sdk sentry_sdk.init() This initializes the default integrations which will automatically pick up any uncaught exceptions. Additionally you can report arbitrary other exceptions: try: my_failing_function() except Exception as e: sentry_sdk.capture_exception(e) """ from sentry_sdk.hub import Hub, init from sentry_sdk.scope import Scope from sentry_sdk.transport import Transport, HttpTransport from sentry_sdk.client import Client from sentry_sdk.api import * # noqa from sentry_sdk.api import __all__ as api_all from sentry_sdk.consts import VERSION # noqa __all__ = api_all + [ # noqa "Hub", "Scope", "Client", "Transport", "HttpTransport", "init", "integrations", ] # Initialize the debug support after everything is loaded from sentry_sdk.debug import init_debug_support init_debug_support() del init_debug_support
Functions
def add_breadcrumb(
crumb=None, hint=None, **kwargs)
Alias for Hub.add_breadcrumb
Adds a breadcrumb. The breadcrumbs are a dictionary with the
data as the sentry v7/v8 protocol expects. hint is an optional
value that can be used by before_breadcrumb to customize the
breadcrumbs that are emitted.
def capture_event(
event, hint=None)
Alias for Hub.capture_event
Captures an event. The return value is the ID of the event.
The event is a dictionary following the Sentry v7/v8 protocol specification. Optionally an event hint dict can be passed that is used by processors to extract additional information from it. Typically the event hint object would contain exception information.
@hubmethod
def capture_event(event, hint=None):
    # type: (Event, Optional[Hint]) -> Optional[str]
    # Module-level shortcut: forward the event and hint to the current hub
    # and hand back whatever it returns; without a hub the result is None.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.capture_event(event, hint)
def capture_exception(
error=None)
Alias for Hub.capture_exception
Captures an exception.
The argument passed can be None in which case the last exception
will be reported, otherwise an exception object or an exc_info
tuple.
@hubmethod
def capture_exception(error=None):
    # type: (Optional[BaseException]) -> Optional[str]
    # Module-level shortcut: forward `error` to the current hub's
    # capture_exception; without a hub the result is None.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.capture_exception(error)
def capture_message(
message, level=None)
Alias for Hub.capture_message
Captures a message. The message is just a string. If no level
is provided the default level is info.
@hubmethod
def capture_message(message, level=None):
    # type: (str, Optional[str]) -> Optional[str]
    # Module-level shortcut: forward the message and level to the current
    # hub's capture_message; without a hub the result is None.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.capture_message(message, level)
def configure_scope(
callback=None)
Alias for Hub.configure_scope
Reconfigures the scope.
@hubmethod  # noqa
def configure_scope(
    callback=None  # type: Optional[Callable[[Scope], None]]
):
    # type: (...) -> Optional[ContextManager[Scope]]
    # With a hub available, it does all the work.
    current_hub = Hub.current
    if current_hub is not None:
        return current_hub.configure_scope(callback)

    # No hub and a user-provided callback: the callback is not invoked.
    if callback is not None:
        return None

    # No hub and no callback: hand out a context manager that yields a
    # fresh Scope instance.
    @contextmanager
    def _detached_scope():
        yield Scope()

    return _detached_scope()
def flush(
timeout=None, callback=None)
Alias for Hub.flush
Hub.flush is in turn an alias for self.client.flush
@hubmethod
def flush(timeout=None, callback=None):
    # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
    # Module-level shortcut: forward both arguments by keyword to the current
    # hub's flush; without a hub nothing happens.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.flush(timeout=timeout, callback=callback)
def init(
*args, **kwargs)
Initializes the SDK and optionally integrations.
This takes the same arguments as the client constructor.
def init(*args, **kwargs):
    # type: (*str, **Any) -> ContextManager[Any]
    # TODO: https://github.com/getsentry/sentry-python/issues/272
    """Initializes the SDK and optionally integrations.

    All positional and keyword arguments are passed straight through to the
    `Client` constructor.  The new client is bound to the current hub and the
    return value is an `_InitGuard` wrapping that client.
    """
    global _initial_client

    new_client = Client(*args, **kwargs)
    Hub.current.bind_client(new_client)
    guard = _InitGuard(new_client)

    # Only a weak reference is stored, so this global does not keep the
    # client alive on its own.
    if new_client is not None:
        _initial_client = weakref.ref(new_client)

    return guard
def last_event_id(
)
Alias for Hub.last_event_id
Returns the last event ID.
@hubmethod
def last_event_id():
    # type: () -> Optional[str]
    # Module-level shortcut: ask the current hub for its last event ID;
    # without a hub the result is None.
    current_hub = Hub.current
    if current_hub is None:
        return None
    return current_hub.last_event_id()
def push_scope(
callback=None)
Alias for Hub.push_scope
Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.
@hubmethod  # noqa
def push_scope(
    callback=None  # type: Optional[Callable[[Scope], None]]
):
    # type: (...) -> Optional[ContextManager[Scope]]
    # With a hub available, it does all the work.
    current_hub = Hub.current
    if current_hub is not None:
        return current_hub.push_scope(callback)

    # No hub and a user-provided callback: the callback is not invoked.
    if callback is not None:
        return None

    # No hub and no callback: hand out a context manager that yields a
    # fresh Scope instance.
    @contextmanager
    def _detached_scope():
        yield Scope()

    return _detached_scope()
Classes
class Client
The client is internally responsible for capturing the events and forwarding them to sentry through the configured transport. It takes the client options as keyword arguments and optionally the DSN as first argument.
class Client(object):
    """The client is internally responsible for capturing the events and
    forwarding them to sentry through the configured transport.  It takes
    the client options as keyword arguments and optionally the DSN as first
    argument.
    """

    def __init__(self, *args, **kwargs):
        # type: (*str, **ClientOptions) -> None
        """Build the options, transport and integrations for this client.

        Raises `ValueError` when the `request_bodies` option is not one of
        `"always"`, `"never"`, `"small"` or `"medium"`.
        """
        # Remember the previous value so the `finally` clause can restore it
        # no matter how initialization ends.
        old_debug = _client_init_debug.get(False)
        try:
            self.options = options = get_options(*args, **kwargs)
            _client_init_debug.set(options["debug"])
            self.transport = make_transport(options)

            request_bodies = ("always", "never", "small", "medium")
            if options["request_bodies"] not in request_bodies:
                raise ValueError(
                    "Invalid value for request_bodies. Must be one of {}".format(
                        request_bodies
                    )
                )

            self.integrations = setup_integrations(
                options["integrations"], with_defaults=options["default_integrations"]
            )
        finally:
            _client_init_debug.set(old_debug)

    @property
    def dsn(self):
        # type: () -> Optional[str]
        """Returns the configured DSN as string."""
        return self.options["dsn"]

    def _prepare_event(
        self,
        event,  # type: Event
        hint,  # type: Optional[Hint]
        scope,  # type: Optional[Scope]
    ):
        # type: (...) -> Optional[Event]
        """Fill in defaults on the event, apply the scope, serialize it and
        run the `before_send` option.

        Returns the event to hand to the transport, or `None` when
        `scope.apply_to_event` or `before_send` produced `None`.
        """
        if event.get("timestamp") is None:
            event["timestamp"] = datetime.utcnow()

        # Work on a private dict so a missing (None) hint becomes empty.
        hint = dict(hint or ())  # type: Hint

        if scope is not None:
            event_ = scope.apply_to_event(event, hint)
            if event_ is None:
                return None
            event = event_

        # Only attach the current stacktrace when the event carries no
        # exception, stacktrace or threads information of its own.
        if (
            self.options["attach_stacktrace"]
            and "exception" not in event
            and "stacktrace" not in event
            and "threads" not in event
        ):
            with capture_internal_exceptions():
                event["threads"] = {
                    "values": [
                        {
                            "stacktrace": current_stacktrace(
                                self.options["with_locals"]
                            ),
                            "crashed": False,
                            "current": True,
                        }
                    ]
                }

        # Copy these client options onto the event unless the event already
        # has a value for them.
        for key in "release", "environment", "server_name", "dist":
            if event.get(key) is None and self.options[key] is not None:  # type: ignore
                event[key] = text_type(self.options[key]).strip()  # type: ignore
        if event.get("sdk") is None:
            sdk_info = dict(SDK_INFO)
            sdk_info["integrations"] = sorted(self.integrations.keys())
            event["sdk"] = sdk_info

        if event.get("platform") is None:
            event["platform"] = "python"

        event = handle_in_app(
            event, self.options["in_app_exclude"], self.options["in_app_include"]
        )

        # Postprocess the event here so that annotated types do
        # generally not surface in before_send
        if event is not None:
            event = Serializer().serialize_event(event)

        before_send = self.options["before_send"]
        if before_send is not None:
            # new_event stays None when before_send returns None.
            # NOTE(review): presumably also when it raises and
            # capture_internal_exceptions() swallows the error -- confirm.
            new_event = None
            with capture_internal_exceptions():
                new_event = before_send(event, hint or {})
            if new_event is None:
                logger.info("before send dropped event (%s)", event)
            event = new_event  # type: ignore

        return event

    def _is_ignored_error(self, event, hint):
        # type: (Event, Hint) -> bool
        """Return True when the exception type in `hint["exc_info"]` matches
        an entry of the `ignore_errors` option."""
        exc_info = hint.get("exc_info")
        if exc_info is None:
            return False

        type_name = get_type_name(exc_info[0])
        full_name = "%s.%s" % (exc_info[0].__module__, type_name)

        for errcls in self.options["ignore_errors"]:
            # String types are matched against the type name in the
            # exception only
            if isinstance(errcls, string_types):
                if errcls == full_name or errcls == type_name:
                    return True
            else:
                if issubclass(exc_info[0], errcls):  # type: ignore
                    return True

        return False

    def _should_capture(
        self,
        event,  # type: Event
        hint,  # type: Hint
        scope=None,  # type: Optional[Scope]
    ):
        # type: (...) -> bool
        """Decide whether the event is sent at all.

        Returns False when the scope's `_should_capture` flag is falsy, when
        the random draw against the `sample_rate` option fails, or when the
        error is ignored; True otherwise.
        """
        if scope is not None and not scope._should_capture:
            return False

        # The random draw is only made when sample_rate is below 1.0.
        if (
            self.options["sample_rate"] < 1.0
            and random.random() >= self.options["sample_rate"]
        ):
            return False

        if self._is_ignored_error(event, hint):
            return False

        return True

    def capture_event(self, event, hint=None, scope=None):
        # type: (Dict[str, Any], Optional[Any], Optional[Scope]) -> Optional[str]
        """Captures an event.

        This takes the ready made event and an optional hint and scope.  The
        hint is internally used to further customize the representation of the
        error.  When provided it's a dictionary of optional information such
        as exception info.

        If the transport is not set nothing happens, otherwise the return
        value of this function will be the ID of the captured event.
        """
        if self.transport is None:
            return None
        if hint is None:
            hint = {}
        # Reuse an ID supplied by the caller, otherwise generate one.
        rv = event.get("event_id")
        if rv is None:
            event["event_id"] = rv = uuid.uuid4().hex
        if not self._should_capture(event, hint, scope):
            return None
        event = self._prepare_event(event, hint, scope)
        if event is None:
            return None
        self.transport.capture_event(event)
        return rv

    def close(self, timeout=None, callback=None):
        # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
        """
        Close the client and shut down the transport. Arguments have the same
        semantics as `self.flush()`.
        """
        if self.transport is not None:
            self.flush(timeout=timeout, callback=callback)
            self.transport.kill()
            # With the transport cleared, capture_event/flush/close become
            # no-ops (each checks for None first).
            self.transport = None

    def flush(self, timeout=None, callback=None):
        # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
        """
        Wait `timeout` seconds for the current events to be sent. If no
        `timeout` is provided, the `shutdown_timeout` option value is used.

        The `callback` is invoked with two arguments: the number of pending
        events and the configured timeout.  For instance the default atexit
        integration will use this to render out a message on stderr.
        """
        if self.transport is not None:
            if timeout is None:
                timeout = self.options["shutdown_timeout"]
            self.transport.flush(timeout=timeout, callback=callback)

    def __enter__(self):
        # type: () -> Client
        """Return the client itself so it can be used in a `with` block."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # type: (Any, Any, Any) -> None
        """Close the client when the `with` block is left."""
        self.close()
Ancestors (in MRO)
- Client
- __builtin__.object
Instance variables
var dsn
Returns the configured DSN as string.
Methods
def __init__(
self, *args, **kwargs)
def __init__(self, *args, **kwargs):
    # type: (*str, **ClientOptions) -> None
    """Build the options, transport and integrations for this client.

    Raises `ValueError` when the `request_bodies` option is not one of the
    accepted values.
    """
    # Saved so the debug flag can be restored however initialization ends.
    previous_debug = _client_init_debug.get(False)
    try:
        options = get_options(*args, **kwargs)
        self.options = options
        _client_init_debug.set(options["debug"])
        self.transport = make_transport(options)

        allowed_bodies = ("always", "never", "small", "medium")
        if options["request_bodies"] not in allowed_bodies:
            message = "Invalid value for request_bodies. Must be one of {}".format(
                allowed_bodies
            )
            raise ValueError(message)

        self.integrations = setup_integrations(
            options["integrations"],
            with_defaults=options["default_integrations"],
        )
    finally:
        _client_init_debug.set(previous_debug)
def capture_event(
self, event, hint=None, scope=None)
Captures an event.
This takes the ready made event and an optional hint and scope. The hint is internally used to further customize the representation of the error. When provided it's a dictionary of optional information such as exception info.
If the transport is not set nothing happens, otherwise the return value of this function will be the ID of the captured event.
def capture_event(self, event, hint=None, scope=None):
    # type: (Dict[str, Any], Optional[Any], Optional[Scope]) -> Optional[str]
    """Send a ready-made event through the transport.

    `hint` is an optional dictionary of extra information (such as exception
    info) and `scope` the optional scope to apply.  Returns the event ID, or
    `None` when there is no transport, the event is filtered out by
    `_should_capture`, or `_prepare_event` drops it.
    """
    if self.transport is None:
        return None

    hint = {} if hint is None else hint

    # Keep a caller-supplied ID; otherwise mint one and store it on the event.
    event_id = event.get("event_id")
    if event_id is None:
        event_id = uuid.uuid4().hex
        event["event_id"] = event_id

    if not self._should_capture(event, hint, scope):
        return None

    prepared = self._prepare_event(event, hint, scope)
    if prepared is None:
        return None

    self.transport.capture_event(prepared)
    return event_id
def close(
self, timeout=None, callback=None)
Close the client and shut down the transport. Arguments have the same
semantics as self.flush().
def close(self, timeout=None, callback=None):
    # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
    """
    Flush pending events, kill the transport and detach it from the client.
    `timeout` and `callback` are passed on to `self.flush()`.  Does nothing
    when no transport is set.
    """
    if self.transport is None:
        return

    self.flush(timeout=timeout, callback=callback)
    self.transport.kill()
    self.transport = None
def flush(
self, timeout=None, callback=None)
Wait timeout seconds for the current events to be sent. If no
timeout is provided, the shutdown_timeout option value is used.
The callback is invoked with two arguments: the number of pending
events and the configured timeout. For instance the default atexit
integration will use this to render out a message on stderr.
def flush(self, timeout=None, callback=None):
    # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
    """
    Ask the transport to wait up to `timeout` seconds for pending events.
    When `timeout` is `None` the `shutdown_timeout` option is used instead.

    `callback` is handed to the transport unchanged; it is invoked with the
    number of pending events and the configured timeout.  Does nothing when
    no transport is set.
    """
    if self.transport is None:
        return

    if timeout is None:
        effective_timeout = self.options["shutdown_timeout"]
    else:
        effective_timeout = timeout
    self.transport.flush(timeout=effective_timeout, callback=callback)
class HttpTransport
The default HTTP transport.
class HttpTransport(Transport):
    """The default HTTP transport."""

    def __init__(self, options):
        # type: (ClientOptions) -> None
        """Set up the background worker, auth object, retry helper and
        connection pool from the client options."""
        Transport.__init__(self, options)
        assert self.parsed_dsn is not None
        self._worker = BackgroundWorker()
        self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
        # While set and in the future, _send_event() returns without sending.
        self._disabled_until = None  # type: Optional[datetime]
        self._retry = urllib3.util.Retry()
        self.options = options

        self._pool = self._make_pool(
            self.parsed_dsn,
            http_proxy=options["http_proxy"],
            https_proxy=options["https_proxy"],
            ca_certs=options["ca_certs"],
        )

        # NOTE(review): imported here rather than at module level, presumably
        # to avoid a circular import -- confirm.
        from sentry_sdk import Hub

        self.hub_cls = Hub

    def _send_event(self, event):
        # type: (Event) -> None
        """POST one gzip-compressed JSON event to the store endpoint.

        Returns without sending while `self._disabled_until` lies in the
        future.  A 429 response sets `self._disabled_until`; any other
        response clears it.
        """
        if self._disabled_until is not None:
            if datetime.utcnow() < self._disabled_until:
                return
            self._disabled_until = None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            f.write(json.dumps(event, allow_nan=False).encode("utf-8"))

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending %s event [%s] to %s project:%s"
            % (
                event.get("level") or "error",
                event["event_id"],
                self.parsed_dsn.host,
                self.parsed_dsn.project_id,
            )
        )
        response = self._pool.request(
            "POST",
            str(self._auth.store_api_url),
            body=body.getvalue(),
            headers={
                "X-Sentry-Auth": str(self._auth.to_header()),
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
            },
        )

        try:
            if response.status == 429:
                # Pause sending for the delay reported by
                # self._retry.get_retry_after(response), or 60 seconds when
                # that value is falsy.
                self._disabled_until = datetime.utcnow() + timedelta(
                    seconds=self._retry.get_retry_after(response) or 60
                )
                return

            elif response.status >= 300 or response.status < 200:
                logger.error(
                    "Unexpected status code: %s (body: %s)",
                    response.status,
                    response.data,
                )
        finally:
            response.close()

        # Not reached for a 429 response (that branch returns above).
        self._disabled_until = None

    def _get_pool_options(self, ca_certs):
        # type: (Optional[Any]) -> Dict[str, Any]
        """Return the keyword arguments used to construct the urllib3 pool;
        `certifi.where()` is used when `ca_certs` is falsy."""
        return {
            "num_pools": 2,
            "cert_reqs": "CERT_REQUIRED",
            "ca_certs": ca_certs or certifi.where(),
        }

    def _make_pool(
        self,
        parsed_dsn,  # type: Dsn
        http_proxy,  # type: Optional[str]
        https_proxy,  # type: Optional[str]
        ca_certs,  # type: Optional[Any]
    ):
        # type: (...) -> Union[PoolManager, ProxyManager]
        """Create the urllib3 manager, going through a proxy when one is
        configured or found via `getproxies()`.

        An empty-string proxy option skips that proxy lookup entirely,
        including the `getproxies()` fallback.
        """
        proxy = None

        # try HTTPS first
        if parsed_dsn.scheme == "https" and (https_proxy != ""):
            proxy = https_proxy or getproxies().get("https")

        # maybe fallback to HTTP proxy
        if not proxy and (http_proxy != ""):
            proxy = http_proxy or getproxies().get("http")

        opts = self._get_pool_options(ca_certs)

        if proxy:
            return urllib3.ProxyManager(proxy, **opts)
        else:
            return urllib3.PoolManager(**opts)

    def capture_event(self, event):
        # type: (Event) -> None
        """Submit the event to the background worker for sending."""
        # Read the current hub now; the wrapper below enters this same hub
        # object when the worker eventually runs it.
        hub = self.hub_cls.current

        def send_event_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_event(event)

        self._worker.submit(send_event_wrapper)

    def flush(self, timeout, callback=None):
        # type: (float, Optional[Any]) -> None
        """Flush the background worker; skipped unless `timeout` is
        greater than zero."""
        logger.debug("Flushing HTTP transport")
        if timeout > 0:
            self._worker.flush(timeout, callback)

    def kill(self):
        # type: () -> None
        """Kill the background worker."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()
Ancestors (in MRO)
- HttpTransport
- Transport
- __builtin__.object
Class variables
Instance variables
var hub_cls
Methods
def __init__(
self, options)
Inheritance:
Transport.__init__
def __init__(self, options):
    # type: (ClientOptions) -> None
    """Set up the background worker, auth object, retry helper and
    connection pool from the client options."""
    Transport.__init__(self, options)

    dsn = self.parsed_dsn
    assert dsn is not None

    self._worker = BackgroundWorker()
    self._auth = dsn.to_auth("sentry.python/%s" % VERSION)
    self._disabled_until = None  # type: Optional[datetime]
    self._retry = urllib3.util.Retry()
    self.options = options

    # Proxy and certificate settings come straight from the options.
    pool_settings = {
        "http_proxy": options["http_proxy"],
        "https_proxy": options["https_proxy"],
        "ca_certs": options["ca_certs"],
    }
    self._pool = self._make_pool(dsn, **pool_settings)

    # Function-scope import, kept exactly where the original had it.
    from sentry_sdk import Hub

    self.hub_cls = Hub
def capture_event(
self, event)
Inheritance:
Transport.capture_event
This gets invoked with the event dictionary when an event should be sent to sentry.
def capture_event(self, event):
    # type: (Event) -> None
    """Queue the event on the background worker.

    The hub that is current at call time is captured and entered again
    around the actual send when the worker runs the queued job.
    """
    submitting_hub = self.hub_cls.current

    def send_event_wrapper():
        # type: () -> None
        with submitting_hub, capture_internal_exceptions():
            self._send_event(event)

    self._worker.submit(send_event_wrapper)
def flush(
self, timeout, callback=None)
Wait timeout seconds for the current events to be sent out.
def flush(self, timeout, callback=None):
    # type: (float, Optional[Any]) -> None
    """Flush the background worker, passing it `timeout` and `callback`.

    The worker is only asked to flush when `timeout` is greater than zero.
    """
    logger.debug("Flushing HTTP transport")
    if not timeout > 0:
        return
    self._worker.flush(timeout, callback)
def kill(
self)
Forcefully kills the transport.
def kill(self):
    # type: () -> None
    """Log the shutdown and kill the background worker."""
    logger.debug("Killing HTTP transport")
    worker = self._worker
    worker.kill()
class Hub
The hub wraps the concurrency management of the SDK. Each thread has its own hub but the hub might transfer with the flow of execution if context vars are available.
If the hub is used with a with statement it's temporarily activated.
class Hub(with_metaclass(HubMeta)):  # type: ignore
    """The hub wraps the concurrency management of the SDK.  Each thread has
    its own hub but the hub might transfer with the flow of execution if
    context vars are available.

    If the hub is used with a with statement it's temporarily activated.
    """

    # Stack of (client, scope) layers; the last entry is the active one.
    _stack = None  # type: List[Tuple[Optional[Client], Scope]]

    # Mypy doesn't pick up on the metaclass.
    if False:
        current = None  # type: Hub
        main = None  # type: Hub

    def __init__(self, client_or_hub=None, scope=None):
        # type: (Optional[Union[Hub, Client]], Optional[Any]) -> None
        """Create a hub from a client or from another hub.

        When another hub is given, its top-of-stack client is reused and,
        unless `scope` is passed, its top-of-stack scope is copied with
        `copy.copy`.  Otherwise the argument is taken as the client (it may
        be `None`) and a new `Scope` is created unless `scope` is passed.
        """
        if isinstance(client_or_hub, Hub):
            hub = client_or_hub
            client, other_scope = hub._stack[-1]
            if scope is None:
                scope = copy.copy(other_scope)
        else:
            client = client_or_hub
            if scope is None:
                scope = Scope()
        self._stack = [(client, scope)]
        self._last_event_id = None  # type: Optional[str]
        # Hubs that were current before each __enter__, restored by __exit__.
        self._old_hubs = []  # type: List[Hub]

    def __enter__(self):
        # type: () -> Hub
        """Remember the hub that is current right now and install this hub
        through `_local.set`."""
        self._old_hubs.append(Hub.current)
        _local.set(self)
        return self

    def __exit__(
        self,
        exc_type,  # type: Optional[type]
        exc_value,  # type: Optional[BaseException]
        tb,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Reinstall the hub remembered by the matching `__enter__`."""
        old = self._old_hubs.pop()
        _local.set(old)

    def run(self, callback):
        # type: (Callable[[], T]) -> T
        """Runs a callback in the context of the hub.  Alternatively the
        with statement can be used on the hub directly.
        """
        with self:
            return callback()

    def get_integration(self, name_or_class):
        # type: (Union[str, Type[Integration]]) -> Any
        """Returns the integration for this hub by name or class.  If there
        is no client bound or the client does not have that integration
        then `None` is returned.

        If the return value is not `None` the hub is guaranteed to have a
        client attached.
        """
        if isinstance(name_or_class, str):
            integration_name = name_or_class
        elif name_or_class.identifier is not None:
            integration_name = name_or_class.identifier
        else:
            raise ValueError("Integration has no name")

        client = self._stack[-1][0]
        if client is not None:
            rv = client.integrations.get(integration_name)
            if rv is not None:
                return rv

        # _initial_client, when set, is called to obtain the client (or None).
        if _initial_client is not None:
            initial_client = _initial_client()
        else:
            initial_client = None

        # Warn when the integration exists on the init() client but not on
        # the client bound to this hub; None is still returned.
        if (
            initial_client is not None
            and initial_client is not client
            and initial_client.integrations.get(integration_name) is not None
        ):
            warning = (
                "Integration %r attempted to run but it was only "
                "enabled on init() but not the client that "
                "was bound to the current flow.  Earlier versions of "
                "the SDK would consider these integrations enabled but "
                "this is no longer the case." % (name_or_class,)
            )
            warn(Warning(warning), stacklevel=3)
            logger.warning(warning)

    @property
    def client(self):
        # type: () -> Optional[Client]
        """Returns the current client on the hub."""
        return self._stack[-1][0]

    def last_event_id(self):
        # type: () -> Optional[str]
        """Returns the last event ID."""
        return self._last_event_id

    def bind_client(self, new):
        # type: (Optional[Client]) -> None
        """Binds a new client to the hub."""
        # Replace only the client of the top layer; its scope is kept.
        top = self._stack[-1]
        self._stack[-1] = (new, top[1])

    def capture_event(self, event, hint=None):
        # type: (Event, Optional[Hint]) -> Optional[str]
        """Captures an event.  The return value is the ID of the event.

        The event is a dictionary following the Sentry v7/v8 protocol
        specification.  Optionally an event hint dict can be passed that
        is used by processors to extract additional information from it.
        Typically the event hint object would contain exception information.
        """
        client, scope = self._stack[-1]
        if client is not None:
            rv = client.capture_event(event, hint, scope)
            if rv is not None:
                self._last_event_id = rv
            return rv
        return None

    def capture_message(self, message, level=None):
        # type: (str, Optional[str]) -> Optional[str]
        """Captures a message.  The message is just a string.  If no level
        is provided the default level is `info`.
        """
        if self.client is None:
            return None
        if level is None:
            level = "info"
        return self.capture_event({"message": message, "level": level})

    def capture_exception(self, error=None):
        # type: (Optional[BaseException]) -> Optional[str]
        """Captures an exception.

        The argument passed can be `None` in which case the last exception
        will be reported, otherwise an exception object or an `exc_info`
        tuple.
        """
        client = self.client
        if client is None:
            return None
        if error is None:
            exc_info = sys.exc_info()
        else:
            exc_info = exc_info_from_error(error)

        event, hint = event_from_exception(exc_info, client_options=client.options)
        try:
            return self.capture_event(event, hint=hint)
        except Exception:
            # A failure while capturing is logged instead of propagated.
            self._capture_internal_exception(sys.exc_info())

        return None

    def _capture_internal_exception(self, exc_info):
        # type: (_OptExcInfo) -> Any
        """Capture an exception that is likely caused by a bug in the SDK
        itself."""
        logger.error("Internal error in sentry_sdk", exc_info=exc_info)  # type: ignore

    def add_breadcrumb(self, crumb=None, hint=None, **kwargs):
        # type: (Optional[Breadcrumb], Optional[BreadcrumbHint], **Any) -> None
        """Adds a breadcrumb.  The breadcrumbs are a dictionary with the
        data as the sentry v7/v8 protocol expects.  `hint` is an optional
        value that can be used by `before_breadcrumb` to customize the
        breadcrumbs that are emitted.
        """
        client, scope = self._stack[-1]
        if client is None:
            logger.info("Dropped breadcrumb because no client bound")
            return

        # Build a new dict from the passed crumb and the keyword arguments.
        crumb = dict(crumb or ())  # type: Breadcrumb
        crumb.update(kwargs)
        if not crumb:
            return

        hint = dict(hint or ())  # type: Hint

        if crumb.get("timestamp") is None:
            crumb["timestamp"] = datetime.utcnow()
        if crumb.get("type") is None:
            crumb["type"] = "default"

        if client.options["before_breadcrumb"] is not None:
            new_crumb = client.options["before_breadcrumb"](crumb, hint)
        else:
            new_crumb = crumb

        if new_crumb is not None:
            scope._breadcrumbs.append(new_crumb)
        else:
            logger.info("before breadcrumb dropped breadcrumb (%s)", crumb)

        # Discard from the left until at most max_breadcrumbs remain.
        max_breadcrumbs = client.options["max_breadcrumbs"]  # type: int
        while len(scope._breadcrumbs) > max_breadcrumbs:
            scope._breadcrumbs.popleft()

    @overload  # noqa
    def push_scope(self, callback=None):
        # type: (Optional[None]) -> ContextManager[Scope]
        pass

    @overload  # noqa
    def push_scope(self, callback):
        # type: (Callable[[Scope], None]) -> None
        pass

    def push_scope(  # noqa
        self, callback=None  # type: Optional[Callable[[Scope], None]]
    ):
        # type: (...) -> Optional[ContextManager[Scope]]
        """Pushes a new layer on the scope stack. Returns a context manager
        that should be used to pop the scope again.  Alternatively a callback
        can be provided that is executed in the context of the scope.
        """
        if callback is not None:
            with self.push_scope() as scope:
                callback(scope)
            return None

        # The new layer keeps the client and gets a copy of the top scope.
        client, scope = self._stack[-1]
        new_layer = (client, copy.copy(scope))
        self._stack.append(new_layer)

        return _ScopeManager(self)

    # `scope` is a second name for the same method.
    scope = push_scope

    def pop_scope_unsafe(self):
        # type: () -> Tuple[Optional[Client], Scope]
        """Pops a scope layer from the stack. Try to use the context manager
        `push_scope()` instead."""
        rv = self._stack.pop()
        assert self._stack, "stack must have at least one layer"
        return rv

    @overload  # noqa
    def configure_scope(self, callback=None):
        # type: (Optional[None]) -> ContextManager[Scope]
        pass

    @overload  # noqa
    def configure_scope(self, callback):
        # type: (Callable[[Scope], None]) -> None
        pass

    def configure_scope(  # noqa
        self, callback=None  # type: Optional[Callable[[Scope], None]]
    ):  # noqa
        # type: (...) -> Optional[ContextManager[Scope]]
        """Reconfigures the scope."""

        client, scope = self._stack[-1]
        if callback is not None:
            # The callback only runs when a client is bound.
            if client is not None:
                callback(scope)

            return None

        @contextmanager
        def inner():
            # type: () -> Generator[Scope, None, None]
            # Without a client a fresh Scope is yielded instead of the
            # hub's own scope.
            if client is not None:
                yield scope
            else:
                yield Scope()

        return inner()

    def flush(self, timeout=None, callback=None):
        # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
        """Alias for self.client.flush"""
        client, scope = self._stack[-1]
        if client is not None:
            return client.flush(timeout=timeout, callback=callback)

    def iter_trace_propagation_headers(self):
        # type: () -> Generator[Tuple[str, str], None, None]
        """Yield the items of `scope._span.iter_headers()`.

        Nothing is yielded when the top scope has no span, no client is
        bound, or the client's `propagate_traces` option is falsy.
        """
        client, scope = self._stack[-1]
        if scope._span is None:
            return

        propagate_traces = client and client.options["propagate_traces"]
        if not propagate_traces:
            return

        for item in scope._span.iter_headers():
            yield item
Ancestors (in MRO)
- Hub
- __builtin__.object
Instance variables
var client
Returns the current client on the hub.
Methods
def __init__(
self, client_or_hub=None, scope=None)
def __init__(self, client_or_hub=None, scope=None):
    # type: (Optional[Union[Hub, Client]], Optional[Any]) -> None
    """Create a hub from a client or from another hub.

    When another hub is given, its top-of-stack client is reused and, unless
    `scope` is passed, its top-of-stack scope is copied with `copy.copy`.
    Otherwise the argument is taken as the client (it may be `None`) and a
    new `Scope` is created unless `scope` is passed.
    """
    if isinstance(client_or_hub, Hub):
        client, parent_scope = client_or_hub._stack[-1]
        initial_scope = copy.copy(parent_scope) if scope is None else scope
    else:
        client = client_or_hub
        initial_scope = Scope() if scope is None else scope

    self._stack = [(client, initial_scope)]
    self._last_event_id = None  # type: Optional[str]
    self._old_hubs = []  # type: List[Hub]
def add_breadcrumb(
self, crumb=None, hint=None, **kwargs)
Adds a breadcrumb. The breadcrumbs are a dictionary with the
data as the sentry v7/v8 protocol expects. hint is an optional
value that can be used by before_breadcrumb to customize the
breadcrumbs that are emitted.
def bind_client(
self, new)
Binds a new client to the hub.
def bind_client(self, new):
    # type: (Optional[Client]) -> None
    """Replace the client of the top stack layer, keeping its scope."""
    current_scope = self._stack[-1][1]
    self._stack[-1] = (new, current_scope)
def capture_event(
self, event, hint=None)
Captures an event. The return value is the ID of the event.
The event is a dictionary following the Sentry v7/v8 protocol specification. Optionally an event hint dict can be passed that is used by processors to extract additional information from it. Typically the event hint object would contain exception information.
def capture_event(self, event, hint=None):
    # type: (Event, Optional[Hint]) -> Optional[str]
    """Hand an event to the bound client and return the event ID.

    The event is a dictionary following the Sentry v7/v8 protocol
    specification; `hint` is an optional dict with additional information
    (typically exception information) for processors.  A non-`None` ID is
    also stored as the hub's last event ID.  Returns `None` when no client
    is bound.
    """
    client, scope = self._stack[-1]
    if client is None:
        return None

    event_id = client.capture_event(event, hint, scope)
    if event_id is not None:
        self._last_event_id = event_id
    return event_id
def capture_exception(
self, error=None)
Captures an exception.
The argument passed can be None in which case the last exception
will be reported, otherwise an exception object or an exc_info
tuple.
def capture_exception(self, error=None):
    # type: (Optional[BaseException]) -> Optional[str]
    """Turn an exception into an event and capture it.

    With `error=None` the exception currently being handled
    (`sys.exc_info()`) is reported; otherwise `error` is an exception
    object or an `exc_info` tuple.  Returns `None` when no client is bound
    or when capturing the event itself raised.
    """
    client = self.client
    if client is None:
        return None

    exc_info = sys.exc_info() if error is None else exc_info_from_error(error)
    event, hint = event_from_exception(exc_info, client_options=client.options)

    try:
        return self.capture_event(event, hint=hint)
    except Exception:
        # Failures while capturing are reported internally, not raised.
        self._capture_internal_exception(sys.exc_info())
    return None
def capture_message(
self, message, level=None)
Captures a message. The message is just a string. If no level
is provided the default level is info.
def capture_message(self, message, level=None):
    # type: (str, Optional[str]) -> Optional[str]
    """Capture a plain string message as an event.

    `level` defaults to `info` when not given.  Returns `None` when no
    client is bound, otherwise the result of `capture_event`.
    """
    if self.client is None:
        return None

    payload = {
        "message": message,
        "level": "info" if level is None else level,
    }
    return self.capture_event(payload)
def configure_scope(
self, callback=None)
Reconfigures the scope.
def configure_scope(  # noqa
    self, callback=None  # type: Optional[Callable[[Scope], None]]
):  # noqa
    # type: (...) -> Optional[ContextManager[Scope]]
    """Reconfigure the scope of the top stack layer.

    With a callback, the callback is invoked with the scope (only when a
    client is bound) and `None` is returned.  Without one, a context manager
    is returned that yields the scope, or a fresh `Scope` when no client is
    bound.
    """
    client, scope = self._stack[-1]
    has_client = client is not None

    if callback is not None:
        if has_client:
            callback(scope)
        return None

    @contextmanager
    def inner():
        # type: () -> Generator[Scope, None, None]
        yield scope if has_client else Scope()

    return inner()
def flush(
self, timeout=None, callback=None)
Alias for self.client.flush
def flush(self, timeout=None, callback=None):
    # type: (Optional[float], Optional[Callable[[int, float], None]]) -> None
    """Forward to the bound client's `flush`; does nothing without a client."""
    client, _scope = self._stack[-1]
    if client is None:
        return None
    return client.flush(timeout=timeout, callback=callback)
def get_integration(
self, name_or_class)
Returns the integration for this hub by name or class. If there
is no client bound or the client does not have that integration
then None is returned.
If the return value is not None the hub is guaranteed to have a
client attached.
def get_integration(self, name_or_class):
    # type: (Union[str, Type[Integration]]) -> Any
    """Look up an integration on the currently bound client.

    :param name_or_class: the integration name, or an object exposing
        the name as ``identifier``.
    :returns: the integration registered on the bound client, or
        ``None`` when no client is bound or it lacks that integration.
        A non-``None`` result therefore implies a bound client.
    :raises ValueError: if ``identifier`` is ``None``.
    """
    if isinstance(name_or_class, str):
        integration_name = name_or_class
    else:
        integration_name = name_or_class.identifier
        if integration_name is None:
            raise ValueError("Integration has no name")

    client = self._stack[-1][0]
    if client is not None:
        found = client.integrations.get(integration_name)
        if found is not None:
            return found

    initial_client = _initial_client() if _initial_client is not None else None
    if initial_client is None or initial_client is client:
        return None
    if initial_client.integrations.get(integration_name) is None:
        return None

    # The integration exists on the initial client only, not on the one
    # bound here: warn and report it as not enabled.
    warning = (
        "Integration %r attempted to run but it was only "
        "enabled on init() but not the client that "
        "was bound to the current flow.  Earlier versions of "
        "the SDK would consider these integrations enabled but "
        "this is no longer the case." % (name_or_class,)
    )
    warn(Warning(warning), stacklevel=3)
    logger.warning(warning)
    return None
def iter_trace_propagation_headers(
self)
def iter_trace_propagation_headers(self):
    # type: () -> Generator[Tuple[str, str], None, None]
    """Yield the header pairs produced by the top scope's span.

    Nothing is yielded when the scope has no span, when no client is
    bound, or when the client's ``propagate_traces`` option is falsy.
    """
    client, scope = self._stack[-1]
    span = scope._span
    if span is None:
        return
    if not (client and client.options["propagate_traces"]):
        return
    for header in span.iter_headers():
        yield header
def last_event_id(
self)
Returns the last event ID.
def last_event_id(self):
    # type: () -> Optional[str]
    """Return the value stored in ``_last_event_id`` (may be ``None``)."""
    return self._last_event_id
def pop_scope_unsafe(
self)
Pops a scope layer from the stack. Try to use the context manager
push_scope() instead.
def pop_scope_unsafe(self):
    # type: () -> Tuple[Optional[Client], Scope]
    """Pop the top layer from the scope stack and return it.

    Prefer the ``push_scope()`` context manager, which pops for you.

    :returns: the removed ``(client, scope)`` layer.
    :raises AssertionError: if the pop would remove the last remaining
        layer. The stack is left untouched in that case.
    """
    # Check before mutating: the other hub methods index
    # ``self._stack[-1]``, so the stack must never end up empty, even
    # when the call is rejected or assertions are disabled with ``-O``.
    if len(self._stack) == 1:
        raise AssertionError("stack must have at least one layer")
    return self._stack.pop()
def push_scope(
self, callback=None)
Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.
def push_scope(  # noqa
    self, callback=None  # type: Optional[Callable[[Scope], None]]
):
    # type: (...) -> Optional[ContextManager[Scope]]
    """Push a new layer onto the scope stack.

    Without a ``callback`` the new layer is pushed and a context manager
    is returned that should be used to pop it again. With a
    ``callback`` the scope is pushed, the callback is run with it inside
    that context manager, and ``None`` is returned.
    """
    if callback is None:
        client, scope = self._stack[-1]
        # The new layer keeps the client and works on a copy of the scope.
        self._stack.append((client, copy.copy(scope)))
        return _ScopeManager(self)

    with self.push_scope() as scope:
        callback(scope)
    return None
def run(
self, callback)
Runs a callback in the context of the hub. Alternatively the with statement can be used on the hub directly.
def run(self, callback):
    # type: (Callable[[], T]) -> T
    """Call ``callback`` with this hub entered as a context manager.

    Equivalent to using ``with hub:`` around the call yourself.

    :returns: whatever ``callback`` returns.
    """
    # The return stays inside the ``with`` so the hub's ``__exit__``
    # sees any exception raised by the callback.
    with self:
        return callback()
def scope(
self, callback=None)
Pushes a new layer on the scope stack. Returns a context manager that should be used to pop the scope again. Alternatively a callback can be provided that is executed in the context of the scope.
def push_scope(  # noqa
    self, callback=None  # type: Optional[Callable[[Scope], None]]
):
    # type: (...) -> Optional[ContextManager[Scope]]
    """Push a new layer onto the scope stack.

    Without a ``callback`` the new layer is pushed and a context manager
    is returned that should be used to pop it again. With a
    ``callback`` the scope is pushed, the callback is run with it inside
    that context manager, and ``None`` is returned.
    """
    if callback is None:
        client, scope = self._stack[-1]
        # The new layer keeps the client and works on a copy of the scope.
        self._stack.append((client, copy.copy(scope)))
        return _ScopeManager(self)

    with self.push_scope() as scope:
        callback(scope)
    return None
class Scope
The scope holds extra information that should be sent with all events that belong to it.
class Scope(object):
    """The scope holds extra information that should be sent with all
    events that belong to it.
    """

    __slots__ = (
        "_level",
        "_name",
        "_fingerprint",
        "_transaction",
        "_user",
        "_tags",
        "_contexts",
        "_extras",
        "_breadcrumbs",
        "_event_processors",
        "_error_processors",
        "_should_capture",
        "_span",
    )

    def __init__(self):
        # type: () -> None
        # Processors and the name are set up here rather than in
        # ``clear()``, so clearing the scope leaves them in place.
        self._event_processors = []  # type: List[EventProcessor]
        self._error_processors = []  # type: List[ErrorProcessor]

        self._name = None  # type: Optional[str]
        self.clear()

    # NOTE(review): ``_attr_setter`` is defined elsewhere; the rendered
    # docs list these four as instance variables -- confirm its exact
    # semantics there.
    @_attr_setter
    def level(self, value):
        """When set this overrides the level."""
        self._level = value

    @_attr_setter
    def fingerprint(self, value):
        """When set this overrides the default fingerprint."""
        self._fingerprint = value

    @_attr_setter
    def transaction(self, value):
        """When set this forces a specific transaction name to be set."""
        self._transaction = value

    @_attr_setter
    def user(self, value):
        """When set a specific user is bound to the scope."""
        self._user = value

    def set_span_context(self, span_context):
        """Sets the span context."""
        self._span = span_context

    def set_tag(self, key, value):
        # type: (str, Any) -> None
        """Sets a tag for a key to a specific value."""
        self._tags[key] = value

    def remove_tag(self, key):
        # type: (str) -> None
        """Removes a specific tag. Unknown keys are ignored."""
        self._tags.pop(key, None)

    def set_context(self, key, value):
        # type: (str, Any) -> None
        """Binds a context at a certain key to a specific value."""
        self._contexts[key] = value

    def remove_context(self, key):
        # type: (str) -> None
        """Removes a context. Unknown keys are ignored."""
        self._contexts.pop(key, None)

    def set_extra(self, key, value):
        # type: (str, Any) -> None
        """Sets an extra key to a specific value."""
        self._extras[key] = value

    def remove_extra(self, key):
        # type: (str) -> None
        """Removes a specific extra key. Unknown keys are ignored."""
        self._extras.pop(key, None)

    def clear(self):
        # type: () -> None
        """Clears the entire scope.

        Registered event/error processors and the scope name are kept.
        """
        self._level = None
        self._fingerprint = None
        self._transaction = None
        self._user = None

        self._tags = {}  # type: Dict[str, Any]
        self._contexts = {}  # type: Dict[str, Dict[str, Any]]
        self._extras = {}  # type: Dict[str, Any]

        self.clear_breadcrumbs()
        self._should_capture = True

        self._span = None

    def clear_breadcrumbs(self):
        # type: () -> None
        """Clears breadcrumb buffer."""
        self._breadcrumbs = deque()  # type: Deque[Breadcrumb]

    def add_event_processor(self, func):
        # type: (EventProcessor) -> None
        """Register a scope local event processor on the scope.

        This function behaves like `before_send`.
        """
        self._event_processors.append(func)

    def add_error_processor(self, func, cls=None):
        # type: (ErrorProcessor, Optional[type]) -> None
        """Register a scope local error processor on the scope.

        The error processor works similarly to an event processor but is
        invoked with the original exception info triple as second
        argument.

        :param func: callable taking ``(event, exc_info)``.
        :param cls: when given, ``func`` is only invoked if
            ``exc_info[1]`` is an instance of ``cls``; other events are
            passed through unchanged.
        """
        if cls is not None:
            cls_ = cls  # For mypy.
            real_func = func

            # Deliberately rebinds ``func`` to the filtering wrapper; the
            # original callable stays reachable through ``real_func``.
            def func(event, exc_info):
                try:
                    is_inst = isinstance(exc_info[1], cls_)
                except Exception:
                    is_inst = False
                if is_inst:
                    return real_func(event, exc_info)
                return event

        self._error_processors.append(func)

    # NOTE(review): ``_disable_capture`` is defined elsewhere in the file.
    @_disable_capture
    def apply_to_event(self, event, hint):
        # type: (Event, Hint) -> Optional[Event]
        """Applies the information contained on the scope to the given event.

        :returns: the processed event, or ``None`` when an error or
            event processor returned ``None`` (the event is dropped).
        """

        def _drop(event, cause, ty):
            # type: (Dict[str, Any], Any, str) -> Optional[Any]
            logger.info("%s (%s) dropped event (%s)", ty, cause, event)
            return None

        # The scope level always wins over the event's own level.
        if self._level is not None:
            event["level"] = self._level

        event.setdefault("breadcrumbs", []).extend(self._breadcrumbs)

        # user / transaction / fingerprint only fill in missing values.
        if event.get("user") is None and self._user is not None:
            event["user"] = self._user

        if event.get("transaction") is None and self._transaction is not None:
            event["transaction"] = self._transaction

        if event.get("fingerprint") is None and self._fingerprint is not None:
            event["fingerprint"] = self._fingerprint

        # extras / tags / contexts are merged, scope values overriding
        # same-named keys already on the event.
        if self._extras:
            event.setdefault("extra", {}).update(self._extras)

        if self._tags:
            event.setdefault("tags", {}).update(self._tags)

        if self._contexts:
            event.setdefault("contexts", {}).update(self._contexts)

        if self._span is not None:
            event.setdefault("contexts", {})["trace"] = {
                "trace_id": self._span.trace_id,
                "span_id": self._span.span_id,
            }

        exc_info = hint.get("exc_info")
        if exc_info is not None:
            for error_processor in self._error_processors:
                new_event = error_processor(event, exc_info)
                if new_event is None:
                    return _drop(event, error_processor, "error processor")
                event = new_event

        for event_processor in chain(global_event_processors, self._event_processors):
            # Pre-set to the current event so the event survives if the
            # context manager below swallows an exception from the
            # processor.
            new_event = event
            with capture_internal_exceptions():
                new_event = event_processor(event, hint)
            if new_event is None:
                return _drop(event, event_processor, "event processor")
            event = new_event

        return event

    def __copy__(self):
        # type: () -> Scope
        """Return a copy with its own tag/context/extra dicts, breadcrumb
        buffer and processor lists; the contained values and the span
        are shared with the original.
        """
        # Bypass ``__init__`` and fill every slot explicitly.
        rv = object.__new__(self.__class__)  # type: Scope

        rv._level = self._level
        rv._name = self._name
        rv._fingerprint = self._fingerprint
        rv._transaction = self._transaction
        rv._user = self._user

        rv._tags = dict(self._tags)
        rv._contexts = dict(self._contexts)
        rv._extras = dict(self._extras)

        rv._breadcrumbs = copy(self._breadcrumbs)
        rv._event_processors = list(self._event_processors)
        rv._error_processors = list(self._error_processors)

        rv._should_capture = self._should_capture
        rv._span = self._span

        return rv

    def __repr__(self):
        # type: () -> str
        return "<%s id=%s name=%s>" % (
            self.__class__.__name__,
            hex(id(self)),
            self._name,
        )
Ancestors (in MRO)
- Scope
- __builtin__.object
Instance variables
var fingerprint
When set this overrides the default fingerprint.
var level
When set this overrides the level.
var transaction
When set this forces a specific transaction name to be set.
var user
When set a specific user is bound to the scope.
Methods
def __init__(
self)
def __init__(self):
    # type: () -> None
    """Create an empty scope with no processors and no name."""
    self._name = None  # type: Optional[str]
    # Processor lists are created here, not in ``clear()``.
    self._error_processors = []  # type: List[ErrorProcessor]
    self._event_processors = []  # type: List[EventProcessor]
    # ``clear()`` initialises the remaining attributes.
    self.clear()
def add_error_processor(
self, func, cls=None)
Register a scope local error processor on the scope.
The error processor works similarly to an event processor but is invoked with the original exception info triple as second argument.
def add_error_processor(self, func, cls=None):
    # type: (ErrorProcessor, Optional[type]) -> None
    """Register a scope local error processor on the scope.

    The error processor works similarly to an event processor but is
    invoked with the original exception info triple as second argument.

    :param func: callable taking ``(event, exc_info)``.
    :param cls: when given, ``func`` is only invoked if ``exc_info[1]``
        is an instance of ``cls``; other events are passed through
        unchanged.
    """
    if cls is not None:
        cls_ = cls  # For mypy.
        real_func = func

        # Deliberately rebinds ``func`` to the filtering wrapper; the
        # original callable stays reachable through ``real_func``.
        def func(event, exc_info):
            try:
                is_inst = isinstance(exc_info[1], cls_)
            except Exception:
                # An ``exc_info`` that cannot be inspected counts as
                # "no match".
                is_inst = False
            if is_inst:
                return real_func(event, exc_info)
            return event

    self._error_processors.append(func)
def add_event_processor(
self, func)
Register a scope local event processor on the scope.
This function behaves like before_send.
def add_event_processor(self, func):
    # type: (EventProcessor) -> None
    """Register a scope local event processor on the scope.

    This function behaves like `before_send`.

    :param func: the processor; it is appended to the scope's list of
        event processors.
    """
    self._event_processors.append(func)
def apply_to_event(
self, *args, **kwargs)
Applies the information contained on the scope to the given event.
@wraps(fn)
def wrapper(self, *args, **kwargs):
    # type: (Any, *Dict[str, Any], **Any) -> Any
    # Call ``fn`` with ``_should_capture`` switched off. A call made
    # while the flag is already off returns ``None`` without calling
    # ``fn``.
    if self._should_capture:
        try:
            self._should_capture = False
            return fn(self, *args, **kwargs)
        finally:
            # Re-enable on both normal return and exception.
            self._should_capture = True
    return None
def clear(
self)
Clears the entire scope.
def clear(self):
    # type: () -> None
    """Reset the scope's data to its empty state.

    Level, fingerprint, transaction, user and span become ``None``;
    tags, contexts and extras become fresh empty dicts; breadcrumbs are
    cleared and capturing is re-enabled.
    """
    self._tags = {}  # type: Dict[str, Any]
    self._contexts = {}  # type: Dict[str, Dict[str, Any]]
    self._extras = {}  # type: Dict[str, Any]

    self._level = self._fingerprint = self._transaction = self._user = None
    self._span = None
    self._should_capture = True

    self.clear_breadcrumbs()
def remove_context(
self, key)
Removes a context.
def remove_context(self, key):
    # type: (str) -> None
    """Drop the context stored under ``key``; unknown keys are ignored."""
    self._contexts.pop(key, None)
def remove_extra(
self, key)
Removes a specific extra key.
def remove_extra(self, key):
    # type: (str) -> None
    """Drop the extra stored under ``key``; unknown keys are ignored."""
    self._extras.pop(key, None)
def remove_tag(
self, key)
Removes a specific tag.
def remove_tag(self, key):
    # type: (str) -> None
    """Drop the tag stored under ``key``; unknown keys are ignored."""
    self._tags.pop(key, None)
def set_context(
self, key, value)
Binds a context at a certain key to a specific value.
def set_context(self, key, value):
    # type: (str, Any) -> None
    """Store ``value`` as the context for ``key``, replacing any
    context already bound to that key.
    """
    self._contexts[key] = value
def set_extra(
self, key, value)
Sets an extra key to a specific value.
def set_extra(self, key, value):
    # type: (str, Any) -> None
    """Store ``value`` as the extra for ``key``, replacing any previous
    value.
    """
    self._extras[key] = value
def set_span_context(
self, span_context)
Sets the span context.
def set_span_context(self, span_context):
    """Store ``span_context`` as the scope's current span."""
    self._span = span_context
def set_tag(
self, key, value)
Sets a tag for a key to a specific value.
def set_tag(self, key, value):
    # type: (str, Any) -> None
    """Store ``value`` as the tag for ``key``, replacing any previous
    value.
    """
    self._tags[key] = value
class Transport
Baseclass for all transports.
A transport is used to send an event to sentry.
class Transport(object):
    """Baseclass for all transports.

    A transport is used to send an event to sentry. Subclasses must
    implement ``capture_event``; ``flush`` and ``kill`` default to
    no-ops.
    """

    parsed_dsn = None  # type: Optional[Dsn]

    def __init__(self, options=None):
        # type: (Optional[ClientOptions]) -> None
        """Keep ``options`` and parse its ``"dsn"`` entry when one is set.

        ``parsed_dsn`` stays ``None`` when ``options`` is empty/``None``
        or the DSN is ``None`` or empty.
        """
        self.options = options
        dsn = options["dsn"] if options else None
        self.parsed_dsn = Dsn(dsn) if dsn else None

    def capture_event(self, event):
        # type: (Event) -> None
        """This gets invoked with the event dictionary when an event
        should be sent to sentry.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def flush(self, timeout, callback=None):
        # type: (float, Optional[Any]) -> None
        """Wait `timeout` seconds for the current events to be sent out.

        The base implementation does nothing.
        """
        pass

    def kill(self):
        # type: () -> None
        """Forcefully kills the transport.

        The base implementation does nothing.
        """
        pass

    def __del__(self):
        # type: () -> None
        # Best effort: errors raised by ``kill()`` during finalization
        # are ignored.
        try:
            self.kill()
        except Exception:
            pass
Ancestors (in MRO)
- Transport
- __builtin__.object
Class variables
var parsed_dsn
Instance variables
var options
Methods
def __init__(
self, options=None)
def __init__(self, options=None): # type: (Optional[ClientOptions]) -> None self.options = options if options and options["dsn"] is not None and options["dsn"]: self.parsed_dsn = Dsn(options["dsn"]) else: self.parsed_dsn = None
def capture_event(
self, event)
This gets invoked with the event dictionary when an event should be sent to sentry.
def capture_event(self, event):
    # type: (Event) -> None
    """Hook invoked with the event dictionary when an event should be
    sent to sentry.

    :raises NotImplementedError: always; there is no default
        implementation.
    """
    raise NotImplementedError()
def flush(
self, timeout, callback=None)
Wait timeout seconds for the current events to be sent out.
def flush(self, timeout, callback=None):
    # type: (float, Optional[Any]) -> None
    """Wait `timeout` seconds for the current events to be sent out.

    This implementation does nothing and returns ``None``.
    """
    return None
def kill(
self)
Forcefully kills the transport.
def kill(self):
    # type: () -> None
    """Forcefully kills the transport.

    This implementation does nothing and returns ``None``.
    """
    return None
Sub-modules
This package