Pyramid-Temporal integration library.

This package provides automatic transaction management for Temporal activities using pyramid_tm, similar to how it works for web requests.

Main components:

  • Worker: Pyramid-aware Temporal Worker with automatic context binding
  • PyramidEnvironment: Wrapper for the Pyramid bootstrap environment
  • activity: Decorator module for defining pyramid-temporal activities
  • ActivityContext: Context object providing real Pyramid requests to activities

Activities receive real Pyramid Request objects (via Pyramid's request factory), so all request methods configured via add_request_method work automatically (dbsession, tm, etc.).

Example

    from pyramid.paster import bootstrap
    from pyramid_temporal import Worker, activity, ActivityContext, PyramidEnvironment

    @activity.defn
    async def enrich_user(context: ActivityContext, user_id: int) -> bool:
        # Real Pyramid request with all configured methods
        session = context.request.dbsession
        user = session.query(User).get(user_id)
        user.enriched = True
        return True

In worker setup:

    env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
    worker = Worker(
        client,
        env,
        task_queue="my-queue",
        activities=[enrich_user],
        workflows=[MyWorkflow],
    )
    await worker.run()

ActivityContext

Context object providing Pyramid integration for Temporal activities.

This is the main context object passed to pyramid-temporal activities. It provides access to the Pyramid environment, registry, and creates real Pyramid requests for each activity execution using Pyramid's request factory and request extension APIs.

The request has all the same properties and methods as a web request, including any configured via add_request_method (like dbsession, tm, etc.).

Example

    @activity.defn
    async def my_activity(context: ActivityContext, user_id: int) -> bool:
        # Real Pyramid request with all configured methods
        session = context.request.dbsession
        user = session.query(User).get(user_id)
        return user is not None

Source code in pyramid_temporal/context.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class ActivityContext:
    """Context object providing Pyramid integration for Temporal activities.

    This is the main context object passed to pyramid-temporal activities.
    It provides access to the Pyramid environment, registry, and creates
    real Pyramid requests for each activity execution using Pyramid's
    request factory and request extension APIs.

    The request has all the same properties and methods as a web request,
    including any configured via add_request_method (like dbsession, tm, etc.).

    Example:
        @activity.defn
        async def my_activity(context: ActivityContext, user_id: int) -> bool:
            # Real Pyramid request with all configured methods
            session = context.request.dbsession
            user = session.query(User).get(user_id)
            return user is not None
    """

    def __init__(self, env: PyramidEnvironment) -> None:
        """Initialize the activity context.

        Args:
            env: PyramidEnvironment instance
        """
        self._env = env
        # Both stay None until create_request() runs, and are reset to None
        # again by close_request().
        self._request: Optional[Request] = None
        self._request_context: Optional[RequestContext] = None

    @property
    def env(self) -> PyramidEnvironment:
        """Get the Pyramid environment."""
        return self._env

    @property
    def registry(self) -> "Registry":
        """Get the Pyramid registry."""
        return self._env.registry

    @property
    def settings(self) -> dict:
        """Get application settings (shortcut to registry.settings)."""
        return self._env.registry.settings

    @property
    def request(self) -> Request:
        """Get the current activity request.

        This is a real Pyramid Request object with all configured
        request methods (dbsession, tm, etc.) available.

        Note: This should only be accessed during activity execution,
        after create_request() has been called by the interceptor.

        Raises:
            RuntimeError: If accessed outside of activity execution
        """
        if self._request is None:
            raise RuntimeError(
                "ActivityContext.request accessed outside of activity execution. "
                "The request is only available during activity execution."
            )
        return self._request

    def create_request(self) -> Request:
        """Create a new Pyramid Request for an activity execution.

        This is called by the interceptor at the start of each activity.
        It uses Pyramid's request factory to create a real request, applies
        request extensions (add_request_method), and sets up threadlocals.

        If applying the request extensions fails, the request context that
        was just begun is ended again before the error propagates, so a
        failed setup does not leave a stale context behind.

        Returns:
            A real Pyramid Request instance
        """
        registry = self._env.registry
        request_factory = registry.queryUtility(IRequestFactory, default=Request)
        request = request_factory.blank("/")
        request.registry = registry

        # Carry over the WSGI environ of the bootstrap request when present.
        if self._env.request is not None:
            request.environ.update(self._env.request.environ)

        request_context = RequestContext(request)
        request_context.begin()
        try:
            apply_request_extensions(request)
        except BaseException:
            # close_request() is a no-op while self._request is None, so the
            # context must be ended here or it would never be ended at all.
            request_context.end()
            raise

        # Publish the state only once setup fully succeeded.
        self._request_context = request_context
        self._request = request

        logger.debug(
            "Created Pyramid Request for activity (request id: %s)",
            id(self._request),
        )
        return self._request

    def close_request(self) -> None:
        """Close the current activity request and clean up resources.

        This is called by the interceptor after activity execution completes.
        It processes finished callbacks and tears down the threadlocal context.

        The request context is ended even when a finished callback raises.
        Errors during cleanup are logged as warnings and not re-raised. Does
        nothing when no request is active.
        """
        if self._request is None:
            return

        request = self._request
        request_context = self._request_context
        try:
            try:
                if request.finished_callbacks:
                    request._process_finished_callbacks()
            finally:
                # Must run regardless of callback failures; otherwise the
                # context begun in create_request() is never ended.
                if request_context is not None:
                    request_context.end()
            logger.debug("Closed Pyramid Request context")
        except Exception as e:
            # Best-effort cleanup: report the problem but do not fail the caller.
            logger.warning("Error closing Pyramid Request context: %s", e)
        finally:
            self._request_context = None
            self._request = None

env property

Get the Pyramid environment.

registry property

Get the Pyramid registry.

request property

Get the current activity request.

This is a real Pyramid Request object with all configured request methods (dbsession, tm, etc.) available.

Note: This should only be accessed during activity execution, after create_request() has been called by the interceptor.

Raises:
  • RuntimeError

    If accessed outside of activity execution

settings property

Get application settings (shortcut to registry.settings).

__init__(env)

Initialize the activity context.

Parameters:
  • env (PyramidEnvironment) –

    PyramidEnvironment instance

Source code in pyramid_temporal/context.py
43
44
45
46
47
48
49
50
51
def __init__(self, env: PyramidEnvironment) -> None:
    """Bind the context to a Pyramid environment with no active request.

    Args:
        env: PyramidEnvironment instance
    """
    # Nothing is active at construction time: both request slots start
    # out as None.
    self._request_context: Optional[RequestContext] = None
    self._request: Optional[Request] = None
    self._env = env

close_request()

Close the current activity request and clean up resources.

This is called by the interceptor after activity execution completes. It processes finished callbacks and tears down the threadlocal context.

Source code in pyramid_temporal/context.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def close_request(self) -> None:
    """Close the current activity request and clean up resources.

    This is called by the interceptor after activity execution completes.
    It processes finished callbacks and tears down the threadlocal context.

    The request context is ended even when a finished callback raises.
    Errors during cleanup are logged as warnings and not re-raised. Does
    nothing when no request is active.
    """
    if self._request is None:
        return

    request = self._request
    request_context = self._request_context
    try:
        try:
            if request.finished_callbacks:
                request._process_finished_callbacks()
        finally:
            # Must run regardless of callback failures; otherwise the
            # context that was begun for this request is never ended.
            if request_context is not None:
                request_context.end()
        logger.debug("Closed Pyramid Request context")
    except Exception as e:
        # Best-effort cleanup: report the problem but do not fail the caller.
        logger.warning("Error closing Pyramid Request context: %s", e)
    finally:
        self._request_context = None
        self._request = None

create_request()

Create a new Pyramid Request for an activity execution.

This is called by the interceptor at the start of each activity. It uses Pyramid's request factory to create a real request, applies request extensions (add_request_method), and sets up threadlocals.

Returns:
  • Request

    A real Pyramid Request instance

Source code in pyramid_temporal/context.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def create_request(self) -> Request:
    """Create a new Pyramid Request for an activity execution.

    This is called by the interceptor at the start of each activity.
    It uses Pyramid's request factory to create a real request, applies
    request extensions (add_request_method), and sets up threadlocals.

    If applying the request extensions fails, the request context that
    was just begun is ended again before the error propagates, so a
    failed setup does not leave a stale context behind.

    Returns:
        A real Pyramid Request instance
    """
    registry = self._env.registry
    request_factory = registry.queryUtility(IRequestFactory, default=Request)
    request = request_factory.blank("/")
    request.registry = registry

    # Carry over the WSGI environ of the bootstrap request when present.
    if self._env.request is not None:
        request.environ.update(self._env.request.environ)

    request_context = RequestContext(request)
    request_context.begin()
    try:
        apply_request_extensions(request)
    except BaseException:
        # self._request is still None at this point, so a later cleanup
        # pass keyed on it would not end this context; end it here.
        request_context.end()
        raise

    # Publish the state only once setup fully succeeded.
    self._request_context = request_context
    self._request = request

    logger.debug(
        "Created Pyramid Request for activity (request id: %s)",
        id(self._request),
    )
    return self._request

PyramidActivity

Wrapper for pyramid-temporal activities.

This class wraps an activity function and provides the ability to bind it to an ActivityContext for execution.

Source code in pyramid_temporal/activity.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class PyramidActivity:
    """Holder for a pyramid-temporal activity function.

    Keeps the original activity function together with its activity name
    and can produce a Temporal-registrable callable that passes an
    ActivityContext as the function's first argument.
    """

    def __init__(
        self,
        fn: Callable[..., Any],
        name: Optional[str] = None,
        no_thread_cancel_default: bool = False,
    ) -> None:
        """Wrap an activity function.

        Args:
            fn: The original activity function
            name: Optional custom activity name; falls back to fn.__name__
            no_thread_cancel_default: Thread cancellation setting (stored on
                the wrapper; bind() in this class does not read it)
        """
        self._fn = fn
        self._no_thread_cancel_default = no_thread_cancel_default
        self._name = name if name else fn.__name__

        # Mirror the function's metadata onto the wrapper for debugging.
        functools.update_wrapper(self, fn)

        # Flag this object as a pyramid-temporal activity.
        setattr(self, PYRAMID_ACTIVITY_MARKER, True)

    @property
    def name(self) -> str:
        """Activity name used for registration."""
        return self._name

    @property
    def fn(self) -> Callable[..., Any]:
        """The wrapped, original function."""
        return self._fn

    def bind(self, context: ActivityContext) -> Callable[..., Any]:
        """Produce a Temporal-compatible activity bound to a context.

        A small local class is defined whose ``execute`` method is decorated
        as a Temporal activity under this activity's name and awaits the
        wrapped function with the context prepended to the arguments.

        Args:
            context: The ActivityContext to bind to

        Returns:
            The bound ``execute`` method, suitable for registering with a
            Temporal Worker
        """
        target = self._fn
        activity_name = self._name

        class BoundActivity:
            """Carrier object pairing the activity with its context."""

            def __init__(self, bound_context: ActivityContext) -> None:
                self.context = bound_context

            @temporal_activity.defn(name=activity_name)
            async def execute(self, *args: Any, **kwargs: Any) -> Any:
                """Run the wrapped function with the context as first argument."""
                return await target(self.context, *args, **kwargs)

        return BoundActivity(context).execute

    def __repr__(self) -> str:
        """Return a short representation showing the activity name."""
        return f"<PyramidActivity '{self._name}'>"

fn property

Get the original function.

name property

Get the activity name.

__init__(fn, name=None, no_thread_cancel_default=False)

Initialize the pyramid activity wrapper.

Parameters:
  • fn (Callable[..., Any]) –

    The original activity function

  • name (Optional[str], default: None ) –

    Optional custom activity name

  • no_thread_cancel_default (bool, default: False ) –

    Thread cancellation setting

Source code in pyramid_temporal/activity.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def __init__(
    self,
    fn: Callable[..., Any],
    name: Optional[str] = None,
    no_thread_cancel_default: bool = False,
) -> None:
    """Wrap an activity function.

    Args:
        fn: The original activity function
        name: Optional custom activity name; falls back to fn.__name__
        no_thread_cancel_default: Thread cancellation setting
    """
    self._fn = fn
    self._no_thread_cancel_default = no_thread_cancel_default
    self._name = name if name else fn.__name__

    # Mirror the function's metadata onto the wrapper for debugging.
    functools.update_wrapper(self, fn)

    # Flag this object as a pyramid-temporal activity.
    setattr(self, PYRAMID_ACTIVITY_MARKER, True)

bind(context)

Bind this activity to a context, returning a Temporal-compatible activity.

This creates a wrapper class that injects the context as the first argument when the activity is executed.

Parameters:
  • context (ActivityContext) –

    The ActivityContext to bind to

Returns:
  • Callable[..., Any]

    A bound activity method that can be registered with Temporal Worker

Source code in pyramid_temporal/activity.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def bind(self, context: ActivityContext) -> Callable[..., Any]:
    """Produce a Temporal-compatible activity bound to a context.

    A small local class is defined whose ``execute`` method is decorated
    as a Temporal activity under this activity's name and awaits the
    wrapped function with the context prepended to the arguments.

    Args:
        context: The ActivityContext to bind to

    Returns:
        The bound ``execute`` method, suitable for registering with a
        Temporal Worker
    """
    target = self._fn
    activity_name = self._name

    class BoundActivity:
        """Carrier object pairing the activity with its context."""

        def __init__(self, bound_context: ActivityContext) -> None:
            self.context = bound_context

        @temporal_activity.defn(name=activity_name)
        async def execute(self, *args: Any, **kwargs: Any) -> Any:
            """Run the wrapped function with the context as first argument."""
            return await target(self.context, *args, **kwargs)

    return BoundActivity(context).execute

PyramidEnvironment

Wrapper for Pyramid bootstrap environment.

This class wraps the output of pyramid.paster.bootstrap, providing structured access to the Pyramid application components.

Attributes:
  • registry (Registry) –

    The Pyramid registry

  • app (Optional[Any]) –

    The WSGI application

  • request (Optional[Any]) –

    The base request object

  • root (Optional[Any]) –

    The root object (for traversal-based applications)

Example

    from pyramid.paster import bootstrap
    from pyramid_temporal import PyramidEnvironment

    # Create from bootstrap output
    env_dict = bootstrap('development.ini')
    env = PyramidEnvironment.from_bootstrap(env_dict)

    # Access components
    settings = env.settings
    registry = env.registry

    # Clean up when done
    env.close()

Source code in pyramid_temporal/environment.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class PyramidEnvironment:
    """Wrapper for Pyramid bootstrap environment.

    This class wraps the output of pyramid.paster.bootstrap, providing
    structured access to the Pyramid application components.

    Attributes:
        registry: The Pyramid registry
        app: The WSGI application
        request: The base request object
        root: The root object (for traversal-based applications)

    Example:
        from pyramid.paster import bootstrap
        from pyramid_temporal import PyramidEnvironment

        # Create from bootstrap output
        env_dict = bootstrap('development.ini')
        env = PyramidEnvironment.from_bootstrap(env_dict)

        # Access components
        settings = env.settings
        registry = env.registry

        # Clean up when done
        env.close()
    """

    def __init__(
        self,
        registry: "Registry",
        app: Optional[Any] = None,
        request: Optional[Any] = None,
        root: Optional[Any] = None,
        closer: Optional[Callable[[], None]] = None,
    ) -> None:
        """Initialize the Pyramid environment.

        Args:
            registry: Pyramid registry instance (required)
            app: WSGI application instance
            request: Base request object from bootstrap
            root: Root object for traversal-based applications
            closer: Cleanup callable from bootstrap
        """
        self._registry = registry
        self._app = app
        self._request = request
        self._root = root
        self._closer = closer

        logger.debug("Created PyramidEnvironment with registry: %s", registry)

    @classmethod
    def from_bootstrap(cls, env: dict) -> "PyramidEnvironment":
        """Create a PyramidEnvironment from bootstrap output.

        This is the preferred way to create a PyramidEnvironment when
        using pyramid.paster.bootstrap.

        Args:
            env: Dictionary returned by pyramid.paster.bootstrap()

        Returns:
            PyramidEnvironment instance

        Raises:
            KeyError: If ``env`` has no "registry" entry (the other entries
                are optional and default to None)

        Example:
            from pyramid.paster import bootstrap
            from pyramid_temporal import PyramidEnvironment

            env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
        """
        return cls(
            registry=env["registry"],
            app=env.get("app"),
            request=env.get("request"),
            root=env.get("root"),
            closer=env.get("closer"),
        )

    @property
    def registry(self) -> "Registry":
        """Get the Pyramid registry."""
        return self._registry

    @property
    def app(self) -> Optional[Any]:
        """Get the WSGI application."""
        return self._app

    @property
    def request(self) -> Optional[Any]:
        """Get the base request object from bootstrap."""
        return self._request

    @property
    def root(self) -> Optional[Any]:
        """Get the root object for traversal-based applications."""
        return self._root

    @property
    def settings(self) -> dict:
        """Get application settings (shortcut to registry.settings)."""
        return self._registry.settings

    def close(self) -> None:
        """Clean up resources.

        This calls the closer function from bootstrap to properly
        clean up the Pyramid application.

        The closer is invoked at most once: it is detached before being
        called, so repeated close() calls (for example an explicit call
        followed by one in a ``finally`` block) are harmless no-ops.
        """
        # Detach first so the closer cannot run a second time, even if it
        # raises part-way through.
        closer, self._closer = self._closer, None
        if closer is not None:
            logger.debug("Closing PyramidEnvironment")
            closer()

    def __repr__(self) -> str:
        """Return string representation."""
        return f"<PyramidEnvironment registry={self._registry}>"

app property

Get the WSGI application.

registry property

Get the Pyramid registry.

request property

Get the base request object from bootstrap.

root property

Get the root object for traversal-based applications.

settings property

Get application settings (shortcut to registry.settings).

__init__(registry, app=None, request=None, root=None, closer=None)

Initialize the Pyramid environment.

Parameters:
  • registry (Registry) –

    Pyramid registry instance (required)

  • app (Optional[Any], default: None ) –

    WSGI application instance

  • request (Optional[Any], default: None ) –

    Base request object from bootstrap

  • root (Optional[Any], default: None ) –

    Root object for traversal-based applications

  • closer (Optional[Callable[[], None]], default: None ) –

    Cleanup callable from bootstrap

Source code in pyramid_temporal/environment.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(
    self,
    registry: "Registry",
    app: Optional[Any] = None,
    request: Optional[Any] = None,
    root: Optional[Any] = None,
    closer: Optional[Callable[[], None]] = None,
) -> None:
    """Store the components of a bootstrapped Pyramid application.

    Args:
        registry: Pyramid registry instance (required)
        app: WSGI application instance
        request: Base request object from bootstrap
        root: Root object for traversal-based applications
        closer: Cleanup callable from bootstrap
    """
    # Optional pieces first; each may legitimately be None.
    self._closer = closer
    self._root = root
    self._request = request
    self._app = app
    # The registry is the only mandatory component.
    self._registry = registry

    logger.debug("Created PyramidEnvironment with registry: %s", registry)

__repr__()

Return string representation.

Source code in pyramid_temporal/environment.py
131
132
133
def __repr__(self) -> str:
    """Describe this environment by the registry it wraps."""
    registry = self._registry
    return f"<PyramidEnvironment registry={registry}>"

close()

Clean up resources.

This calls the closer function from bootstrap to properly clean up the Pyramid application.

Source code in pyramid_temporal/environment.py
121
122
123
124
125
126
127
128
129
def close(self) -> None:
    """Clean up resources.

    This calls the closer function from bootstrap to properly
    clean up the Pyramid application.

    The closer is invoked at most once: it is detached before being
    called, so repeated close() calls (for example an explicit call
    followed by one in a ``finally`` block) are harmless no-ops.
    """
    # Detach first so the closer cannot run a second time, even if it
    # raises part-way through.
    closer, self._closer = self._closer, None
    if closer is not None:
        logger.debug("Closing PyramidEnvironment")
        closer()

from_bootstrap(env) classmethod

Create a PyramidEnvironment from bootstrap output.

This is the preferred way to create a PyramidEnvironment when using pyramid.paster.bootstrap.

Parameters:
  • env (dict) –

    Dictionary returned by pyramid.paster.bootstrap()

Returns:
  • PyramidEnvironment

    PyramidEnvironment instance

Example

    from pyramid.paster import bootstrap
    from pyramid_temporal import PyramidEnvironment

    env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))

Source code in pyramid_temporal/environment.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@classmethod
def from_bootstrap(cls, env: dict) -> "PyramidEnvironment":
    """Build a PyramidEnvironment out of a bootstrap result mapping.

    This is the preferred constructor when the application was set up
    with pyramid.paster.bootstrap.

    Args:
        env: Dictionary returned by pyramid.paster.bootstrap(). The
            "registry" key is read with indexing; "app", "request",
            "root" and "closer" are read with ``get`` and so may be absent.

    Returns:
        PyramidEnvironment instance

    Example:
        from pyramid.paster import bootstrap
        from pyramid_temporal import PyramidEnvironment

        env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
    """
    # Look up the mandatory entry first so a missing registry fails early.
    registry = env["registry"]
    optional_parts = {key: env.get(key) for key in ("app", "request", "root", "closer")}
    return cls(registry=registry, **optional_parts)

PyramidTemporalInterceptor

Bases: Interceptor

Main interceptor class for pyramid-temporal integration.

This is the main entry point for integrating pyramid-temporal transaction management with Temporal workers.

Source code in pyramid_temporal/interceptor.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class PyramidTemporalInterceptor(Interceptor):
    """Worker interceptor that plugs pyramid-temporal into Temporal.

    Registering this interceptor on a worker makes every activity pass
    through a TransactionalActivityInterceptor that shares this
    interceptor's ActivityContext.
    """

    def __init__(
        self,
        context: Optional["ActivityContext"] = None,
    ) -> None:
        """Remember the context to hand to activity interceptors.

        Args:
            context: ActivityContext for request/session management
        """
        self._context = context
        has_context = "yes" if context else "no"
        logger.info("Initialized PyramidTemporalInterceptor with context: %s", has_context)

    def intercept_activity(self, next_interceptor: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        """Wrap the next activity interceptor with transaction management.

        Args:
            next_interceptor: The next interceptor in the chain

        Returns:
            Transactional activity interceptor
        """
        transactional = TransactionalActivityInterceptor(next_interceptor, context=self._context)
        return transactional

__init__(context=None)

Initialize the interceptor.

Parameters:
  • context (Optional[ActivityContext], default: None ) –

    ActivityContext for request/session management

Source code in pyramid_temporal/interceptor.py
123
124
125
126
127
128
129
130
131
132
133
def __init__(
    self,
    context: Optional["ActivityContext"] = None,
) -> None:
    """Remember the context to hand to activity interceptors.

    Args:
        context: ActivityContext for request/session management
    """
    self._context = context
    has_context = "yes" if context else "no"
    logger.info("Initialized PyramidTemporalInterceptor with context: %s", has_context)

intercept_activity(next_interceptor)

Intercept activity execution to add transaction management.

Parameters:
  • next_interceptor (ActivityInboundInterceptor) –

    The next interceptor in the chain

Returns:
  • ActivityInboundInterceptor

    Transactional activity interceptor

Source code in pyramid_temporal/interceptor.py
135
136
137
138
139
140
141
142
143
144
145
146
147
def intercept_activity(self, next_interceptor: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
    """Wrap the next activity interceptor with transaction management.

    Args:
        next_interceptor: The next interceptor in the chain

    Returns:
        Transactional activity interceptor
    """
    transactional = TransactionalActivityInterceptor(next_interceptor, context=self._context)
    return transactional

Worker

Pyramid-aware Temporal Worker.

This worker wraps the standard Temporal Worker and provides automatic context binding for pyramid-temporal activities. It also includes transaction management via the PyramidTemporalInterceptor.

Example

    from pyramid.paster import bootstrap
    from pyramid_temporal import Worker, activity, ActivityContext, PyramidEnvironment

    @activity.defn
    async def my_activity(context: ActivityContext, user_id: int) -> bool:
        session = context.request.dbsession
        # ... do work ...
        return True

    # Create environment from bootstrap
    env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))

    worker = Worker(
        client,
        env,
        task_queue="my-queue",
        activities=[my_activity],
        workflows=[MyWorkflow],
    )

    # Run the worker
    await worker.run()

Source code in pyramid_temporal/worker.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class Worker:
    """Temporal Worker wrapper that knows about Pyramid.

    On construction it creates one ActivityContext for the given
    environment, binds every pyramid-temporal activity to that context,
    puts a PyramidTemporalInterceptor in front of any user-supplied
    interceptors, and builds the underlying Temporal Worker.

    Example:
        from pyramid.paster import bootstrap
        from pyramid_temporal import Worker, activity, ActivityContext, PyramidEnvironment

        @activity.defn
        async def my_activity(context: ActivityContext, user_id: int) -> bool:
            session = context.request.dbsession
            # ... do work ...
            return True

        # Create environment from bootstrap
        env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))

        worker = Worker(
            client,
            env,
            task_queue="my-queue",
            activities=[my_activity],
            workflows=[MyWorkflow],
        )

        # Run the worker
        await worker.run()
    """

    def __init__(
        self,
        client: Client,
        env: PyramidEnvironment,
        *,
        task_queue: str,
        activities: Sequence[Any] = (),
        workflows: Sequence[type] = (),
        interceptors: Sequence["Interceptor"] = (),
        **kwargs: Any,
    ) -> None:
        """Set up the context, activities, interceptors and inner worker.

        Args:
            client: Temporal client instance
            env: PyramidEnvironment instance (from bootstrap)
            task_queue: Name of the task queue to poll
            activities: List of activities (both pyramid-temporal and plain Temporal)
            workflows: List of workflow classes
            interceptors: Additional interceptors to include (pyramid-temporal
                         interceptor is automatically added)
            **kwargs: Additional arguments passed to Temporal Worker
        """
        self._client = client
        self._env = env
        self._task_queue = task_queue
        self._workflows = workflows
        self._extra_kwargs = kwargs

        # The steps below depend on each other: binding and the interceptor
        # both need the context, and the inner worker needs all of them.
        self._context = ActivityContext(env=env)
        self._bound_activities = self._bind_activities(activities)
        self._interceptors = self._create_interceptors(interceptors)
        self._worker = self._create_worker()

        activity_count = len(self._bound_activities)
        workflow_count = len(workflows)
        logger.info(
            "Created Pyramid Worker for task queue '%s' with %d activities and %d workflows",
            task_queue,
            activity_count,
            workflow_count,
        )

    def _bind_activities(self, activities: Sequence[Any]) -> list:
        """Prepare activities for registration with the Temporal worker.

        Pyramid-temporal activities are bound to this worker's context;
        anything else is passed through unchanged. Input order is preserved.

        Args:
            activities: List of activities (mixed pyramid and plain)

        Returns:
            List of bound/processed activities
        """
        prepared = []
        for candidate in activities:
            if not is_pyramid_activity(candidate):
                # Plain Temporal activity: register it as given.
                logger.debug("Passing through plain activity: %s", getattr(candidate, "__name__", candidate))
                prepared.append(candidate)
                continue

            # Pyramid-temporal activity: inject the shared context.
            pyramid_act: PyramidActivity = candidate
            bound_act = pyramid_act.bind(self._context)
            logger.debug("Bound pyramid activity: %s", pyramid_act.name)
            prepared.append(bound_act)
        return prepared

    def _create_interceptors(self, extra_interceptors: Sequence["Interceptor"]) -> list["Interceptor"]:
        """Build the interceptor list, pyramid-temporal interceptor first.

        Args:
            extra_interceptors: Additional interceptors from user

        Returns:
            List of interceptors with PyramidTemporalInterceptor included
        """
        # Our interceptor goes first and shares this worker's context.
        chain = [PyramidTemporalInterceptor(context=self._context)]
        chain.extend(extra_interceptors)
        return chain

    def _create_worker(self) -> TemporalWorker:
        """Instantiate the wrapped Temporal worker from the stored settings.

        Returns:
            Configured Temporal Worker instance
        """
        workflow_classes = list(self._workflows)
        return TemporalWorker(
            self._client,
            task_queue=self._task_queue,
            activities=self._bound_activities,
            workflows=workflow_classes,
            interceptors=self._interceptors,
            **self._extra_kwargs,
        )

    @property
    def env(self) -> PyramidEnvironment:
        """The Pyramid environment this worker was created with."""
        return self._env

    @property
    def context(self) -> ActivityContext:
        """The ActivityContext shared by all bound activities."""
        return self._context

    @property
    def task_queue(self) -> str:
        """Name of the task queue."""
        return self._task_queue

    async def run(self) -> None:
        """Run the worker until shutdown is requested.

        Logs the start and then delegates to the wrapped Temporal
        worker's ``run()``.
        """
        logger.info("Starting Pyramid Worker on task queue '%s'", self._task_queue)
        inner_worker = self._worker
        await inner_worker.run()

    async def __aenter__(self) -> "Worker":
        """Enter the wrapped worker's async context and return this wrapper."""
        inner_worker = self._worker
        await inner_worker.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Leave the wrapped worker's async context, forwarding the exit arguments."""
        inner_worker = self._worker
        await inner_worker.__aexit__(*args)

context property

Get the activity context.

env property

Get the Pyramid environment.

task_queue property

Get the task queue name.

__aenter__() async

Async context manager entry.

Source code in pyramid_temporal/worker.py
183
184
185
186
async def __aenter__(self) -> "Worker":
    """Enter the wrapped Temporal worker's async context.

    Returns:
        This Worker itself, so ``async with Worker(...) as w`` binds the wrapper
    """
    temporal_worker = self._worker
    await temporal_worker.__aenter__()
    return self

__aexit__(*args) async

Async context manager exit.

Source code in pyramid_temporal/worker.py
188
189
190
async def __aexit__(self, *args: Any) -> None:
    """Leave the async context by delegating to the wrapped Temporal worker.

    Args:
        *args: Exit arguments supplied by ``async with``, forwarded unchanged
    """
    temporal_worker = self._worker
    await temporal_worker.__aexit__(*args)

__init__(client, env, *, task_queue, activities=(), workflows=(), interceptors=(), **kwargs)

Initialize the Pyramid-aware worker.

Parameters:
  • client (Client) –

    Temporal client instance

  • env (PyramidEnvironment) –

    PyramidEnvironment instance (from bootstrap)

  • task_queue (str) –

    Name of the task queue to poll

  • activities (Sequence[Any], default: () ) –

    List of activities (both pyramid-temporal and plain Temporal)

  • workflows (Sequence[type], default: () ) –

    List of workflow classes

  • interceptors (Sequence[Interceptor], default: () ) –

    Additional interceptors to include (pyramid-temporal interceptor is automatically added)

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to Temporal Worker

Source code in pyramid_temporal/worker.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self,
    client: Client,
    env: PyramidEnvironment,
    *,
    task_queue: str,
    activities: Sequence[Any] = (),
    workflows: Sequence[type] = (),
    interceptors: Sequence["Interceptor"] = (),
    **kwargs: Any,
) -> None:
    """Set up a Temporal worker wired to a Pyramid environment.

    Args:
        client: Temporal client instance
        env: PyramidEnvironment instance (from bootstrap)
        task_queue: Name of the task queue to poll
        activities: Activities to register (both pyramid-temporal and
                    plain Temporal)
        workflows: Workflow classes to register
        interceptors: Extra interceptors; the pyramid-temporal interceptor
                      is added automatically, ahead of these
        **kwargs: Additional arguments passed through to the Temporal Worker
    """
    # Store the plain configuration first: _create_worker() reads all of it.
    self._client, self._env = client, env
    self._task_queue = task_queue
    self._workflows = workflows
    self._extra_kwargs = kwargs

    # The steps below are order-sensitive: _create_interceptors() reads
    # self._context, and _create_worker() reads everything assigned so far.
    self._context = ActivityContext(env=env)
    self._bound_activities = self._bind_activities(activities)
    self._interceptors = self._create_interceptors(interceptors)
    self._worker = self._create_worker()

    activity_count = len(self._bound_activities)
    workflow_count = len(workflows)
    logger.info(
        "Created Pyramid Worker for task queue '%s' with %d activities and %d workflows",
        task_queue,
        activity_count,
        workflow_count,
    )

run() async

Run the worker until shutdown is requested.

This is the main entry point for running the worker. It will poll the task queue and execute activities/workflows.

Source code in pyramid_temporal/worker.py
174
175
176
177
178
179
180
181
async def run(self) -> None:
    """Log the start-up, then run the wrapped Temporal worker.

    Main entry point for running the worker: the underlying worker polls
    the task queue and executes activities/workflows until shutdown is
    requested. This coroutine completes when the wrapped ``run()`` returns.
    """
    logger.info("Starting Pyramid Worker on task queue '%s'", self._task_queue)
    temporal_worker = self._worker
    await temporal_worker.run()

defn(fn=None, *, name=None, no_thread_cancel_default=False)

Decorator to define a pyramid-temporal activity.

This decorator marks a function as a pyramid-temporal activity that will receive an ActivityContext as its first argument. The context is automatically injected when the activity is executed via the pyramid-temporal Worker.

The decorated function should have ActivityContext as its first parameter:

@activity.defn
async def my_activity(context: ActivityContext, user_id: int) -> bool:
    session = context.request.dbsession
    # ... do work ...
    return True
Parameters:
  • fn (Optional[F], default: None ) –

    The activity function (when used without parentheses)

  • name (Optional[str], default: None ) –

    Optional custom name for the activity. Defaults to function name.

  • no_thread_cancel_default (bool, default: False ) –

    Whether to disable thread cancellation by default.

Returns:
  • Any

    A decorated activity that can be bound to a context via the Worker.

Example

@activity.defn
async def process_order(context: ActivityContext, order_id: int) -> bool:
    session = context.request.dbsession
    order = session.query(Order).get(order_id)
    # Process the order...
    return True

Or with custom name:

@activity.defn(name="custom-activity-name")
async def my_activity(context: ActivityContext, data: str) -> str:
    return data.upper()

Source code in pyramid_temporal/activity.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def defn(
    fn: Optional[F] = None,
    *,
    name: Optional[str] = None,
    no_thread_cancel_default: bool = False,
) -> Any:
    """Mark a function as a pyramid-temporal activity.

    Usable bare (``@activity.defn``) or with options
    (``@activity.defn(name=...)``). Either way the function ends up wrapped
    in a ``PyramidActivity``. The wrapped function is expected to take an
    ActivityContext as its first parameter; the context is injected
    automatically when the activity runs under the pyramid-temporal Worker:

        @activity.defn
        async def my_activity(context: ActivityContext, user_id: int) -> bool:
            session = context.request.dbsession
            # ... do work ...
            return True

    Args:
        fn: The activity function (set only when used without parentheses)
        name: Optional custom activity name. Defaults to the function name.
        no_thread_cancel_default: Whether to disable thread cancellation by default.

    Returns:
        The wrapped activity when used bare, otherwise a decorator that
        produces one. The result can be bound to a context via the Worker.

    Example:
        @activity.defn
        async def process_order(context: ActivityContext, order_id: int) -> bool:
            session = context.request.dbsession
            order = session.query(Order).get(order_id)
            # Process the order...
            return True

        # With a custom name:
        @activity.defn(name="custom-activity-name")
        async def my_activity(context: ActivityContext, data: str) -> str:
            return data.upper()
    """

    def decorator(func: F) -> "PyramidActivity":
        # name / no_thread_cancel_default are captured from the enclosing call.
        return PyramidActivity(
            func,
            name=name,
            no_thread_cancel_default=no_thread_cancel_default,
        )

    if fn is None:
        # Used with parentheses: hand back the decorator to be applied next.
        return decorator
    # Used bare: the function arrived directly, so wrap it right away.
    return decorator(fn)

includeme(config)

Pyramid configuration include function.

This function can be called via config.include('pyramid_temporal') to register pyramid-temporal with a Pyramid application.

Configuration settings:

- pyramid_temporal.temporal_host: Temporal server host (default: localhost:7233)
- pyramid_temporal.log_level: Logging level (default: INFO)
- pyramid_temporal.auto_connect: Auto-connect to Temporal on startup (default: True)

Parameters:
  • config (Configurator) –

    Pyramid configurator instance

Example
from pyramid.config import Configurator

def main():
    config = Configurator()
    config.include('pyramid_temporal')

    # Optional: Configure Temporal connection
    config.registry.settings['pyramid_temporal.temporal_host'] = 'localhost:7233'

    # ... rest of configuration
Source code in pyramid_temporal/__init__.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def includeme(config: "Configurator") -> None:
    """Pyramid configuration include function.

    Call via ``config.include('pyramid_temporal')`` to register
    pyramid-temporal with a Pyramid application. The settings below are read
    while this function runs, so they must be in place before the include.

    Configuration settings:
    - pyramid_temporal.temporal_host: Temporal server host (default: localhost:7233)
    - pyramid_temporal.log_level: Logging level name such as DEBUG (any case),
      or an int level; unrecognised values fall back to INFO (default: INFO)
    - pyramid_temporal.auto_connect: Auto-connect to Temporal on startup.
      Accepts a bool or one of true/t/yes/y/on/1 (any case); every other
      value disables auto-connect (default: true)

    Args:
        config: Pyramid configurator instance

    Example:
        ```python
        from pyramid.config import Configurator

        def main():
            # Optional: configure the Temporal connection before including
            settings = {'pyramid_temporal.temporal_host': 'localhost:7233'}
            config = Configurator(settings=settings)
            config.include('pyramid_temporal')

            # ... rest of configuration
        ```
    """
    logger.info("Including pyramid-temporal configuration")

    settings = config.get_settings()

    # Fill in defaults only for keys the application has not set itself.
    settings.setdefault("pyramid_temporal.log_level", "INFO")
    settings.setdefault("pyramid_temporal.temporal_host", "localhost:7233")
    settings.setdefault("pyramid_temporal.auto_connect", "true")

    # Resolve the logging level. An int is used as-is; anything else is looked
    # up by name on the logging module. The isinstance check on the lookup
    # result rejects names that exist on the module but are not levels
    # (e.g. BASIC_FORMAT), which would otherwise make setLevel() raise.
    raw_level = settings["pyramid_temporal.log_level"]
    if isinstance(raw_level, int) and not isinstance(raw_level, bool):
        level = raw_level
    else:
        level = getattr(logging, str(raw_level).strip().upper(), None)
        if not isinstance(level, int):
            level = logging.INFO
    logging.getLogger("pyramid_temporal").setLevel(level)

    # Parse auto_connect without assuming a string: settings assigned in code
    # may already be bools. The accepted spellings mirror Pyramid's asbool().
    raw_auto_connect = settings["pyramid_temporal.auto_connect"]
    if isinstance(raw_auto_connect, bool):
        auto_connect = raw_auto_connect
    else:
        auto_connect = str(raw_auto_connect).strip().lower() in (
            "t",
            "true",
            "y",
            "yes",
            "on",
            "1",
        )

    if auto_connect:
        _setup_temporal_client(config, settings)

    # request.temporal_client is registered whether or not auto_connect is on.
    config.add_request_method(_get_temporal_client, "temporal_client", reify=True)

    logger.info("pyramid-temporal configuration complete")

is_pyramid_activity(obj)

Check if an object is a pyramid-temporal activity.

Parameters:
  • obj (Any) –

    Object to check

Returns:
  • bool

    True if the object is a pyramid-temporal activity

Source code in pyramid_temporal/activity.py
154
155
156
157
158
159
160
161
162
163
def is_pyramid_activity(obj: Any) -> bool:
    """Check if an object is a pyramid-temporal activity.

    Args:
        obj: Object to check

    Returns:
        True if the object carries a truthy ``PYRAMID_ACTIVITY_MARKER``
        attribute, False if the attribute is missing or falsy
    """
    # Normalise to a real bool so the declared return type holds whatever
    # value the marker attribute happens to store.
    return bool(getattr(obj, PYRAMID_ACTIVITY_MARKER, False))