Pyramid-Temporal integration library.
This package provides automatic transaction management for Temporal activities
using pyramid_tm, similar to how it works for web requests.
Main components:
- Worker: Pyramid-aware Temporal Worker with automatic context binding
- PyramidEnvironment: Wrapper for Pyramid bootstrap environment
- activity: Decorator module for defining pyramid-temporal activities
- ActivityContext: Context object providing real Pyramid requests to activities
Activities receive real Pyramid Request objects (via Pyramid's request factory),
so all request methods configured via add_request_method work automatically
(dbsession, tm, etc.).
Example
from pyramid.paster import bootstrap
from pyramid_temporal import Worker, activity, ActivityContext, PyramidEnvironment
@activity.defn
async def enrich_user(context: ActivityContext, user_id: int) -> bool:
# Real Pyramid request with all configured methods
session = context.request.dbsession
user = session.query(User).get(user_id)
user.enriched = True
return True
In worker setup:
env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
worker = Worker(
client,
env,
task_queue="my-queue",
activities=[enrich_user],
workflows=[MyWorkflow],
)
await worker.run()
ActivityContext
Context object providing Pyramid integration for Temporal activities.
This is the main context object passed to pyramid-temporal activities.
It provides access to the Pyramid environment, registry, and creates
real Pyramid requests for each activity execution using Pyramid's
request factory and request extension APIs.
The request has all the same properties and methods as a web request,
including any configured via add_request_method (like dbsession, tm, etc.).
Example
@activity.defn
async def my_activity(context: ActivityContext, user_id: int) -> bool:
# Real Pyramid request with all configured methods
session = context.request.dbsession
user = session.query(User).get(user_id)
return user is not None
Source code in pyramid_temporal/context.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class ActivityContext:
    """Context object providing Pyramid integration for Temporal activities.

    This is the main context object passed to pyramid-temporal activities.
    It provides access to the Pyramid environment and registry, and creates
    real Pyramid requests for each activity execution using Pyramid's
    request factory and request extension APIs.

    The request has all the same properties and methods as a web request,
    including any configured via add_request_method (like dbsession, tm, etc.).

    NOTE(review): the instance stores a single "current" request, so it
    appears to expect one activity at a time per context -- confirm how the
    interceptor shares it between concurrently running activities.

    Example:
        @activity.defn
        async def my_activity(context: ActivityContext, user_id: int) -> bool:
            # Real Pyramid request with all configured methods
            session = context.request.dbsession
            user = session.query(User).get(user_id)
            return user is not None
    """

    def __init__(self, env: PyramidEnvironment) -> None:
        """Initialize the activity context.

        Args:
            env: PyramidEnvironment instance
        """
        self._env = env
        # Both stay None until create_request() succeeds and are reset to
        # None again by close_request().
        self._request: Optional[Request] = None
        self._request_context: Optional[RequestContext] = None

    @property
    def env(self) -> PyramidEnvironment:
        """Get the Pyramid environment."""
        return self._env

    @property
    def registry(self) -> "Registry":
        """Get the Pyramid registry."""
        return self._env.registry

    @property
    def settings(self) -> dict:
        """Get application settings (shortcut to registry.settings)."""
        return self._env.registry.settings

    @property
    def request(self) -> Request:
        """Get the current activity request.

        This is a real Pyramid Request object with all configured
        request methods (dbsession, tm, etc.) available.

        Note: This should only be accessed during activity execution,
        after create_request() has been called by the interceptor.

        Raises:
            RuntimeError: If accessed outside of activity execution
        """
        if self._request is None:
            raise RuntimeError(
                "ActivityContext.request accessed outside of activity execution. "
                "The request is only available during activity execution."
            )
        return self._request

    def create_request(self) -> Request:
        """Create a new Pyramid Request for an activity execution.

        This is called by the interceptor at the start of each activity.
        It uses Pyramid's request factory to create a real request, applies
        request extensions (add_request_method), and sets up threadlocals.

        Returns:
            A real Pyramid Request instance

        Raises:
            Exception: Whatever apply_request_extensions() raises; the request
                context begun here is ended again before the error propagates.
        """
        registry = self._env.registry
        request_factory = registry.queryUtility(IRequestFactory, default=Request)
        request = request_factory.blank("/")
        request.registry = registry

        # Carry over the environ of the bootstrap request, when there is one.
        base_request = self._env.request
        if base_request is not None:
            request.environ.update(base_request.environ)

        request_context = RequestContext(request)
        request_context.begin()
        try:
            apply_request_extensions(request)
        except BaseException:
            # close_request() is a no-op while self._request is None, so the
            # context begun above has to be ended here or it is never ended.
            request_context.end()
            raise

        self._request_context = request_context
        self._request = request
        logger.debug(
            "Created Pyramid Request for activity (request id: %s)",
            id(self._request),
        )
        return self._request

    def close_request(self) -> None:
        """Close the current activity request and clean up resources.

        This is called by the interceptor after activity execution completes.
        It processes finished callbacks and tears down the threadlocal context.

        Errors raised during cleanup are logged as warnings and not
        propagated. The request context is ended even when a finished
        callback raises.
        """
        request = self._request
        if request is None:
            return

        request_context = self._request_context
        try:
            try:
                if request.finished_callbacks:
                    request._process_finished_callbacks()
            finally:
                # Must run even if a finished callback failed.
                if request_context is not None:
                    request_context.end()
            logger.debug("Closed Pyramid Request context")
        except Exception as e:
            logger.warning("Error closing Pyramid Request context: %s", e, exc_info=True)
        finally:
            self._request_context = None
            self._request = None
|
env
property
Get the Pyramid environment.
registry
property
Get the Pyramid registry.
request
property
Get the current activity request.
This is a real Pyramid Request object with all configured
request methods (dbsession, tm, etc.) available.
Note: This should only be accessed during activity execution,
after create_request() has been called by the interceptor.
| Raises: |
-
RuntimeError
–
If accessed outside of activity execution
|
settings
property
Get application settings (shortcut to registry.settings).
__init__(env)
Initialize the activity context.
Source code in pyramid_temporal/context.py
43
44
45
46
47
48
49
50
def __init__(self, env: PyramidEnvironment) -> None:
    """Store the environment and start with no active request.

    Args:
        env: PyramidEnvironment instance
    """
    # No request or request context exists until create_request() is called.
    self._request_context: Optional[RequestContext] = None
    self._request: Optional[Request] = None
    self._env = env
|
close_request()
Close the current activity request and clean up resources.
This is called by the interceptor after activity execution completes.
It processes finished callbacks and tears down the threadlocal context.
Source code in pyramid_temporal/context.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def close_request(self) -> None:
    """Close the current activity request and clean up resources.

    This is called by the interceptor after activity execution completes.
    It processes finished callbacks and tears down the threadlocal context.

    Errors raised during cleanup are logged as warnings and not propagated.
    The request context is ended even when a finished callback raises.
    """
    request = self._request
    if request is None:
        return

    request_context = self._request_context
    try:
        try:
            if request.finished_callbacks:
                request._process_finished_callbacks()
        finally:
            # Must run even if a finished callback failed.
            if request_context is not None:
                request_context.end()
        logger.debug("Closed Pyramid Request context")
    except Exception as e:
        logger.warning("Error closing Pyramid Request context: %s", e, exc_info=True)
    finally:
        self._request_context = None
        self._request = None
|
create_request()
Create a new Pyramid Request for an activity execution.
This is called by the interceptor at the start of each activity.
It uses Pyramid's request factory to create a real request, applies
request extensions (add_request_method), and sets up threadlocals.
| Returns: |
-
Request
–
A real Pyramid Request instance
|
Source code in pyramid_temporal/context.py
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def create_request(self) -> Request:
    """Create a new Pyramid Request for an activity execution.

    This is called by the interceptor at the start of each activity.
    It uses Pyramid's request factory to create a real request, applies
    request extensions (add_request_method), and sets up threadlocals.

    Returns:
        A real Pyramid Request instance

    Raises:
        Exception: Whatever apply_request_extensions() raises; the request
            context begun here is ended again before the error propagates.
    """
    registry = self._env.registry
    request_factory = registry.queryUtility(IRequestFactory, default=Request)
    request = request_factory.blank("/")
    request.registry = registry

    # Carry over the environ of the bootstrap request, when there is one.
    base_request = self._env.request
    if base_request is not None:
        request.environ.update(base_request.environ)

    request_context = RequestContext(request)
    request_context.begin()
    try:
        apply_request_extensions(request)
    except BaseException:
        # close_request() is a no-op while self._request is None, so the
        # context begun above has to be ended here or it is never ended.
        request_context.end()
        raise

    self._request_context = request_context
    self._request = request
    logger.debug(
        "Created Pyramid Request for activity (request id: %s)",
        id(self._request),
    )
    return self._request
|
PyramidActivity
Wrapper for pyramid-temporal activities.
This class wraps an activity function and provides the ability to
bind it to an ActivityContext for execution.
Source code in pyramid_temporal/activity.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class PyramidActivity:
    """Wrapper around a pyramid-temporal activity function.

    Holds the original function together with its activity name and can
    bind it to an ActivityContext for execution.
    """

    def __init__(
        self,
        fn: Callable[..., Any],
        name: Optional[str] = None,
        no_thread_cancel_default: bool = False,
    ) -> None:
        """Initialize the pyramid activity wrapper.

        Args:
            fn: The original activity function
            name: Optional custom activity name; falls back to fn.__name__
            no_thread_cancel_default: Thread cancellation setting
        """
        self._fn = fn
        self._name = fn.__name__ if not name else name
        self._no_thread_cancel_default = no_thread_cancel_default

        # Mirror the wrapped function's metadata onto this object so it is
        # easier to identify while debugging.
        functools.update_wrapper(self, fn)

        # Flag the instance as a pyramid-temporal activity.
        setattr(self, PYRAMID_ACTIVITY_MARKER, True)

    @property
    def name(self) -> str:
        """Get the activity name."""
        return self._name

    @property
    def fn(self) -> Callable[..., Any]:
        """Get the original function."""
        return self._fn

    def bind(self, context: ActivityContext) -> Callable[..., Any]:
        """Bind this activity to a context, returning a Temporal-compatible activity.

        A small wrapper class is created whose `execute` method passes the
        context as the first argument to the original function.

        Args:
            context: The ActivityContext to bind to

        Returns:
            A bound activity method that can be registered with Temporal Worker
        """
        wrapped = self._fn
        activity_name = self._name

        class BoundActivity:
            """Holder of the context for one bound activity."""

            def __init__(self, ctx: ActivityContext) -> None:
                self.context = ctx

            @temporal_activity.defn(name=activity_name)
            async def execute(self, *args: Any, **kwargs: Any) -> Any:
                """Run the wrapped function with the context prepended."""
                outcome = await wrapped(self.context, *args, **kwargs)
                return outcome

        return BoundActivity(context).execute

    def __repr__(self) -> str:
        """Return a short representation containing the activity name."""
        return f"<PyramidActivity '{self._name}'>"
|
fn
property
Get the original function.
__init__(fn, name=None, no_thread_cancel_default=False)
Initialize the pyramid activity wrapper.
| Parameters: |
-
fn
(Callable[..., Any])
–
The original activity function
-
name
(Optional[str], default:
None
)
–
Optional custom activity name
-
no_thread_cancel_default
(bool, default:
False
)
–
Thread cancellation setting
|
Source code in pyramid_temporal/activity.py
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
def __init__(
    self,
    fn: Callable[..., Any],
    name: Optional[str] = None,
    no_thread_cancel_default: bool = False,
) -> None:
    """Initialize the pyramid activity wrapper.

    Args:
        fn: The original activity function
        name: Optional custom activity name; falls back to fn.__name__
        no_thread_cancel_default: Thread cancellation setting
    """
    self._fn = fn
    self._name = fn.__name__ if not name else name
    self._no_thread_cancel_default = no_thread_cancel_default

    # Mirror the wrapped function's metadata onto this object so it is
    # easier to identify while debugging.
    functools.update_wrapper(self, fn)

    # Flag the instance as a pyramid-temporal activity.
    setattr(self, PYRAMID_ACTIVITY_MARKER, True)
|
bind(context)
Bind this activity to a context, returning a Temporal-compatible activity.
This creates a wrapper class that injects the context as the first
argument when the activity is executed.
| Returns: |
-
Callable[..., Any]
–
A bound activity method that can be registered with Temporal Worker
|
Source code in pyramid_temporal/activity.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def bind(self, context: ActivityContext) -> Callable[..., Any]:
    """Bind this activity to a context, returning a Temporal-compatible activity.

    A small wrapper class is created whose `execute` method passes the
    context as the first argument to the original function.

    Args:
        context: The ActivityContext to bind to

    Returns:
        A bound activity method that can be registered with Temporal Worker
    """
    wrapped = self._fn
    activity_name = self._name

    class BoundActivity:
        """Holder of the context for one bound activity."""

        def __init__(self, ctx: ActivityContext) -> None:
            self.context = ctx

        @temporal_activity.defn(name=activity_name)
        async def execute(self, *args: Any, **kwargs: Any) -> Any:
            """Run the wrapped function with the context prepended."""
            outcome = await wrapped(self.context, *args, **kwargs)
            return outcome

    return BoundActivity(context).execute
|
PyramidEnvironment
Wrapper for Pyramid bootstrap environment.
This class wraps the output of pyramid.paster.bootstrap, providing
structured access to the Pyramid application components.
| Attributes: |
-
registry
(Registry)
–
The Pyramid registry
-
app
(Optional[Any])
–
The WSGI application
-
request
(Optional[Any])
–
The base request object
-
root
(Optional[Any])
–
The root object (for traversal-based applications)
|
Example
from pyramid.paster import bootstrap
from pyramid_temporal import PyramidEnvironment
Create from bootstrap output
env_dict = bootstrap('development.ini')
env = PyramidEnvironment.from_bootstrap(env_dict)
Access components
settings = env.settings
registry = env.registry
Clean up when done
env.close()
Source code in pyramid_temporal/environment.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class PyramidEnvironment:
    """Wrapper for Pyramid bootstrap environment.

    This class wraps the output of pyramid.paster.bootstrap, providing
    structured access to the Pyramid application components.

    Attributes:
        registry: The Pyramid registry
        app: The WSGI application
        request: The base request object
        root: The root object (for traversal-based applications)

    Example:
        from pyramid.paster import bootstrap
        from pyramid_temporal import PyramidEnvironment

        # Create from bootstrap output
        env_dict = bootstrap('development.ini')
        env = PyramidEnvironment.from_bootstrap(env_dict)

        # Access components
        settings = env.settings
        registry = env.registry

        # Clean up when done
        env.close()
    """

    def __init__(
        self,
        registry: "Registry",
        app: Optional[Any] = None,
        request: Optional[Any] = None,
        root: Optional[Any] = None,
        closer: Optional[Callable[[], None]] = None,
    ) -> None:
        """Initialize the Pyramid environment.

        Args:
            registry: Pyramid registry instance (required)
            app: WSGI application instance
            request: Base request object from bootstrap
            root: Root object for traversal-based applications
            closer: Cleanup callable from bootstrap
        """
        self._registry = registry
        self._app = app
        self._request = request
        self._root = root
        self._closer = closer
        logger.debug("Created PyramidEnvironment with registry: %s", registry)

    @classmethod
    def from_bootstrap(cls, env: dict) -> "PyramidEnvironment":
        """Create a PyramidEnvironment from bootstrap output.

        This is the preferred way to create a PyramidEnvironment when
        using pyramid.paster.bootstrap.

        Args:
            env: Dictionary returned by pyramid.paster.bootstrap()

        Returns:
            PyramidEnvironment instance

        Raises:
            KeyError: If env has no "registry" entry

        Example:
            from pyramid.paster import bootstrap
            from pyramid_temporal import PyramidEnvironment

            env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
        """
        return cls(
            registry=env["registry"],
            app=env.get("app"),
            request=env.get("request"),
            root=env.get("root"),
            closer=env.get("closer"),
        )

    @property
    def registry(self) -> "Registry":
        """Get the Pyramid registry."""
        return self._registry

    @property
    def app(self) -> Optional[Any]:
        """Get the WSGI application."""
        return self._app

    @property
    def request(self) -> Optional[Any]:
        """Get the base request object from bootstrap."""
        return self._request

    @property
    def root(self) -> Optional[Any]:
        """Get the root object for traversal-based applications."""
        return self._root

    @property
    def settings(self) -> dict:
        """Get application settings (shortcut to registry.settings)."""
        return self._registry.settings

    def close(self) -> None:
        """Clean up resources.

        This calls the closer function from bootstrap to properly
        clean up the Pyramid application. The closer runs at most once;
        later calls to close() do nothing.
        """
        # Drop the reference before calling so a repeated close() (or a
        # closer that raises) can never trigger the cleanup a second time.
        closer, self._closer = self._closer, None
        if closer is not None:
            logger.debug("Closing PyramidEnvironment")
            closer()

    def __repr__(self) -> str:
        """Return string representation."""
        return f"<PyramidEnvironment registry={self._registry}>"
|
app
property
Get the WSGI application.
registry
property
Get the Pyramid registry.
request
property
Get the base request object from bootstrap.
root
property
Get the root object for traversal-based applications.
settings
property
Get application settings (shortcut to registry.settings).
__init__(registry, app=None, request=None, root=None, closer=None)
Initialize the Pyramid environment.
| Parameters: |
-
registry
(Registry)
–
Pyramid registry instance (required)
-
app
(Optional[Any], default:
None
)
–
WSGI application instance
-
request
(Optional[Any], default:
None
)
–
Base request object from bootstrap
-
root
(Optional[Any], default:
None
)
–
Root object for traversal-based applications
-
closer
(Optional[Callable[[], None]], default:
None
)
–
Cleanup callable from bootstrap
|
Source code in pyramid_temporal/environment.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    registry: "Registry",
    app: Optional[Any] = None,
    request: Optional[Any] = None,
    root: Optional[Any] = None,
    closer: Optional[Callable[[], None]] = None,
) -> None:
    """Store the components of a bootstrapped Pyramid application.

    Args:
        registry: Pyramid registry instance (required)
        app: WSGI application instance
        request: Base request object from bootstrap
        root: Root object for traversal-based applications
        closer: Cleanup callable from bootstrap
    """
    self._registry, self._app = registry, app
    self._request, self._root = request, root
    self._closer = closer
    logger.debug("Created PyramidEnvironment with registry: %s", registry)
|
__repr__()
Return string representation.
Source code in pyramid_temporal/environment.py
def __repr__(self) -> str:
    """Return a short representation that includes the wrapped registry."""
    text = f"<PyramidEnvironment registry={self._registry}>"
    return text
|
close()
Clean up resources.
This calls the closer function from bootstrap to properly
clean up the Pyramid application.
Source code in pyramid_temporal/environment.py
121
122
123
124
125
126
127
128
def close(self) -> None:
    """Clean up resources.

    This calls the closer function from bootstrap to properly
    clean up the Pyramid application. The closer runs at most once;
    later calls to close() do nothing.
    """
    # Drop the reference before calling so a repeated close() (or a
    # closer that raises) can never trigger the cleanup a second time.
    closer, self._closer = self._closer, None
    if closer is not None:
        logger.debug("Closing PyramidEnvironment")
        closer()
|
from_bootstrap(env)
classmethod
Create a PyramidEnvironment from bootstrap output.
This is the preferred way to create a PyramidEnvironment when
using pyramid.paster.bootstrap.
| Parameters: |
-
env
(dict)
–
Dictionary returned by pyramid.paster.bootstrap()
|
Example
from pyramid.paster import bootstrap
from pyramid_temporal import PyramidEnvironment
env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
Source code in pyramid_temporal/environment.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@classmethod
def from_bootstrap(cls, env: dict) -> "PyramidEnvironment":
    """Build a PyramidEnvironment from the dictionary bootstrap returns.

    This is the preferred way to create a PyramidEnvironment when
    using pyramid.paster.bootstrap. Only "registry" is required; "app",
    "request", "root" and "closer" default to None when absent.

    Args:
        env: Dictionary returned by pyramid.paster.bootstrap()

    Returns:
        PyramidEnvironment instance

    Example:
        from pyramid.paster import bootstrap
        from pyramid_temporal import PyramidEnvironment

        env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
    """
    registry = env["registry"]
    optional_parts = {key: env.get(key) for key in ("app", "request", "root", "closer")}
    return cls(registry=registry, **optional_parts)
|
PyramidTemporalInterceptor
Bases: Interceptor
Main interceptor class for pyramid-temporal integration.
This is the main entry point for integrating pyramid-temporal
transaction management with Temporal workers.
Source code in pyramid_temporal/interceptor.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class PyramidTemporalInterceptor(Interceptor):
    """Top-level interceptor for pyramid-temporal integration.

    Registering this interceptor with a Temporal worker is what adds
    pyramid-temporal transaction management to activity execution.
    """

    def __init__(
        self,
        context: Optional["ActivityContext"] = None,
    ) -> None:
        """Initialize the interceptor.

        Args:
            context: ActivityContext for request/session management
        """
        self._context = context
        context_flag = "yes" if context else "no"
        logger.info("Initialized PyramidTemporalInterceptor with context: %s", context_flag)

    def intercept_activity(self, next_interceptor: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        """Wrap the next activity interceptor with transaction management.

        Args:
            next_interceptor: The next interceptor in the chain

        Returns:
            Transactional activity interceptor
        """
        transactional = TransactionalActivityInterceptor(
            next_interceptor,
            context=self._context,
        )
        return transactional
|
__init__(context=None)
Initialize the interceptor.
| Parameters: |
-
context
(Optional[ActivityContext], default:
None
)
–
ActivityContext for request/session management
|
Source code in pyramid_temporal/interceptor.py
123
124
125
126
127
128
129
130
131
132
def __init__(
    self,
    context: Optional["ActivityContext"] = None,
) -> None:
    """Remember the activity context and log whether one was supplied.

    Args:
        context: ActivityContext for request/session management
    """
    self._context = context
    context_flag = "yes" if context else "no"
    logger.info("Initialized PyramidTemporalInterceptor with context: %s", context_flag)
|
intercept_activity(next_interceptor)
Intercept activity execution to add transaction management.
| Parameters: |
-
next_interceptor
(ActivityInboundInterceptor)
–
The next interceptor in the chain
|
| Returns: |
-
ActivityInboundInterceptor
–
Transactional activity interceptor
|
Source code in pyramid_temporal/interceptor.py
135
136
137
138
139
140
141
142
143
144
145
146
def intercept_activity(self, next_interceptor: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
    """Wrap the next activity interceptor with transaction management.

    Args:
        next_interceptor: The next interceptor in the chain

    Returns:
        Transactional activity interceptor
    """
    transactional = TransactionalActivityInterceptor(
        next_interceptor,
        context=self._context,
    )
    return transactional
|
Worker
Pyramid-aware Temporal Worker.
This worker wraps the standard Temporal Worker and provides automatic
context binding for pyramid-temporal activities. It also includes
transaction management via the PyramidTemporalInterceptor.
Example
from pyramid.paster import bootstrap
from pyramid_temporal import Worker, activity, PyramidEnvironment
@activity.defn
async def my_activity(context: ActivityContext, user_id: int) -> bool:
session = context.request.dbsession
# ... do work ...
return True
Create environment from bootstrap
env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))
worker = Worker(
client,
env,
task_queue="my-queue",
activities=[my_activity],
workflows=[MyWorkflow],
)
Run the worker
await worker.run()
Source code in pyramid_temporal/worker.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class Worker:
    """Pyramid-aware Temporal Worker.

    Wraps the standard Temporal Worker, binds pyramid-temporal activities
    to a shared ActivityContext, and installs the
    PyramidTemporalInterceptor for transaction management.

    Example:
        from pyramid.paster import bootstrap
        from pyramid_temporal import Worker, activity, PyramidEnvironment

        @activity.defn
        async def my_activity(context: ActivityContext, user_id: int) -> bool:
            session = context.request.dbsession
            # ... do work ...
            return True

        # Create environment from bootstrap
        env = PyramidEnvironment.from_bootstrap(bootstrap('development.ini'))

        worker = Worker(
            client,
            env,
            task_queue="my-queue",
            activities=[my_activity],
            workflows=[MyWorkflow],
        )

        # Run the worker
        await worker.run()
    """

    def __init__(
        self,
        client: Client,
        env: PyramidEnvironment,
        *,
        task_queue: str,
        activities: Sequence[Any] = (),
        workflows: Sequence[type] = (),
        interceptors: Sequence["Interceptor"] = (),
        **kwargs: Any,
    ) -> None:
        """Initialize the Pyramid-aware worker.

        Args:
            client: Temporal client instance
            env: PyramidEnvironment instance (from bootstrap)
            task_queue: Name of the task queue to poll
            activities: List of activities (both pyramid-temporal and plain Temporal)
            workflows: List of workflow classes
            interceptors: Additional interceptors to include (pyramid-temporal
                interceptor is automatically added)
            **kwargs: Additional arguments passed to Temporal Worker
        """
        self._client, self._env = client, env
        self._task_queue = task_queue
        self._workflows = workflows
        self._extra_kwargs = kwargs

        # One context is created per worker; the bound activities and the
        # interceptor below both receive this same object.
        self._context = ActivityContext(env=env)

        # The following steps build on each other: activities need the
        # context, and the Temporal worker needs activities and interceptors.
        self._bound_activities = self._bind_activities(activities)
        self._interceptors = self._create_interceptors(interceptors)
        self._worker = self._create_worker()

        activity_count = len(self._bound_activities)
        workflow_count = len(workflows)
        logger.info(
            "Created Pyramid Worker for task queue '%s' with %d activities and %d workflows",
            task_queue,
            activity_count,
            workflow_count,
        )

    def _bind_activities(self, activities: Sequence[Any]) -> list:
        """Bind pyramid-temporal activities to context, pass through others.

        Args:
            activities: List of activities (mixed pyramid and plain)

        Returns:
            List of bound/processed activities, in the input order
        """
        processed = []
        for candidate in activities:
            if not is_pyramid_activity(candidate):
                # Plain Temporal activities are registered unchanged.
                logger.debug("Passing through plain activity: %s", getattr(candidate, "__name__", candidate))
                processed.append(candidate)
                continue

            # pyramid-temporal activities get the worker's context bound in.
            pyramid_act: PyramidActivity = candidate
            bound_act = pyramid_act.bind(self._context)
            logger.debug("Bound pyramid activity: %s", pyramid_act.name)
            processed.append(bound_act)
        return processed

    def _create_interceptors(self, extra_interceptors: Sequence["Interceptor"]) -> list["Interceptor"]:
        """Create the interceptors list with pyramid-temporal interceptor.

        Args:
            extra_interceptors: Additional interceptors from user

        Returns:
            List of interceptors with PyramidTemporalInterceptor included
        """
        # The pyramid-temporal interceptor goes first, followed by the
        # caller's interceptors in the order they were given.
        chain = [PyramidTemporalInterceptor(context=self._context)]
        chain.extend(extra_interceptors)
        return chain

    def _create_worker(self) -> TemporalWorker:
        """Create the underlying Temporal worker.

        Returns:
            Configured Temporal Worker instance
        """
        temporal_worker = TemporalWorker(
            self._client,
            task_queue=self._task_queue,
            activities=self._bound_activities,
            workflows=list(self._workflows),
            interceptors=self._interceptors,
            **self._extra_kwargs,
        )
        return temporal_worker

    @property
    def env(self) -> PyramidEnvironment:
        """Get the Pyramid environment."""
        return self._env

    @property
    def context(self) -> ActivityContext:
        """Get the activity context."""
        return self._context

    @property
    def task_queue(self) -> str:
        """Get the task queue name."""
        return self._task_queue

    async def run(self) -> None:
        """Run the worker until shutdown is requested.

        This is the main entry point for running the worker; it logs the
        task queue name and then delegates to the wrapped Temporal
        worker's run().
        """
        logger.info("Starting Pyramid Worker on task queue '%s'", self._task_queue)
        temporal_worker = self._worker
        await temporal_worker.run()

    async def __aenter__(self) -> "Worker":
        """Enter the wrapped Temporal worker's async context; return self."""
        temporal_worker = self._worker
        await temporal_worker.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Exit the wrapped Temporal worker's async context."""
        temporal_worker = self._worker
        await temporal_worker.__aexit__(*args)
|
context
property
Get the activity context.
env
property
Get the Pyramid environment.
__aenter__()
async
Async context manager entry.
Source code in pyramid_temporal/worker.py
async def __aenter__(self) -> "Worker":
    """Enter the wrapped Temporal worker's async context; return self."""
    temporal_worker = self._worker
    await temporal_worker.__aenter__()
    return self
|
__aexit__(*args)
async
Async context manager exit.
Source code in pyramid_temporal/worker.py
async def __aexit__(self, *args: Any) -> None:
    """Exit the wrapped Temporal worker's async context."""
    temporal_worker = self._worker
    await temporal_worker.__aexit__(*args)
|
__init__(client, env, *, task_queue, activities=(), workflows=(), interceptors=(), **kwargs)
Initialize the Pyramid-aware worker.
| Parameters: |
-
client
(Client)
–
Temporal client instance
-
env
(PyramidEnvironment)
–
PyramidEnvironment instance (from bootstrap)
-
task_queue
(str)
–
Name of the task queue to poll
-
activities
(Sequence[Any], default:
()
)
–
List of activities (both pyramid-temporal and plain Temporal)
-
workflows
(Sequence[type], default:
()
)
–
List of workflow classes
-
interceptors
(Sequence[Interceptor], default:
()
)
–
Additional interceptors to include (pyramid-temporal
interceptor is automatically added)
-
**kwargs
(Any, default:
{}
)
–
Additional arguments passed to Temporal Worker
|
Source code in pyramid_temporal/worker.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
def __init__(
    self,
    client: Client,
    env: PyramidEnvironment,
    *,
    task_queue: str,
    activities: Sequence[Any] = (),
    workflows: Sequence[type] = (),
    interceptors: Sequence["Interceptor"] = (),
    **kwargs: Any,
) -> None:
    """Initialize the Pyramid-aware worker.

    Args:
        client: Temporal client instance
        env: PyramidEnvironment instance (from bootstrap)
        task_queue: Name of the task queue to poll
        activities: List of activities (both pyramid-temporal and plain Temporal)
        workflows: List of workflow classes
        interceptors: Additional interceptors to include (pyramid-temporal
            interceptor is automatically added)
        **kwargs: Additional arguments passed to Temporal Worker
    """
    self._client, self._env = client, env
    self._task_queue = task_queue
    self._workflows = workflows
    self._extra_kwargs = kwargs

    # One context is created per worker; the bound activities and the
    # interceptor below both receive this same object.
    self._context = ActivityContext(env=env)

    # The following steps build on each other: activities need the
    # context, and the Temporal worker needs activities and interceptors.
    self._bound_activities = self._bind_activities(activities)
    self._interceptors = self._create_interceptors(interceptors)
    self._worker = self._create_worker()

    activity_count = len(self._bound_activities)
    workflow_count = len(workflows)
    logger.info(
        "Created Pyramid Worker for task queue '%s' with %d activities and %d workflows",
        task_queue,
        activity_count,
        workflow_count,
    )
|
run()
async
Run the worker until shutdown is requested.
This is the main entry point for running the worker.
It will poll the task queue and execute activities/workflows.
Source code in pyramid_temporal/worker.py
174
175
176
177
178
179
180
async def run(self) -> None:
    """Run the worker until shutdown is requested.

    This is the main entry point for running the worker; it logs the
    task queue name and then delegates to the wrapped Temporal
    worker's run().
    """
    logger.info("Starting Pyramid Worker on task queue '%s'", self._task_queue)
    temporal_worker = self._worker
    await temporal_worker.run()
|
defn(fn=None, *, name=None, no_thread_cancel_default=False)
Decorator to define a pyramid-temporal activity.
This decorator marks a function as a pyramid-temporal activity that
will receive an ActivityContext as its first argument. The context
is automatically injected when the activity is executed via the
pyramid-temporal Worker.
The decorated function should have ActivityContext as its first parameter:
@activity.defn
async def my_activity(context: ActivityContext, user_id: int) -> bool:
session = context.request.dbsession
# ... do work ...
return True
| Parameters: |
-
fn
(Optional[F], default:
None
)
–
The activity function (when used without parentheses)
-
name
(Optional[str], default:
None
)
–
Optional custom name for the activity. Defaults to function name.
-
no_thread_cancel_default
(bool, default:
False
)
–
Whether to disable thread cancellation by default.
|
| Returns: |
-
Any
–
A decorated activity that can be bound to a context via the Worker.
|
Example
@activity.defn
async def process_order(context: ActivityContext, order_id: int) -> bool:
session = context.request.dbsession
order = session.query(Order).get(order_id)
# Process the order...
return True
Or with custom name:
@activity.defn(name="custom-activity-name")
async def my_activity(context: ActivityContext, data: str) -> str:
return data.upper()
Source code in pyramid_temporal/activity.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def defn(
    fn: Optional[F] = None,
    *,
    name: Optional[str] = None,
    no_thread_cancel_default: bool = False,
) -> Any:
    """Mark a function as a pyramid-temporal activity.

    The wrapped function is expected to take an ActivityContext as its
    first parameter; the context is injected when the activity runs under
    the pyramid-temporal Worker.

    Works both bare and with arguments:

        @activity.defn
        async def process_order(context: ActivityContext, order_id: int) -> bool:
            session = context.request.dbsession
            order = session.query(Order).get(order_id)
            # Process the order...
            return True

        @activity.defn(name="custom-activity-name")
        async def my_activity(context: ActivityContext, data: str) -> str:
            return data.upper()

    Args:
        fn: The activity function, supplied when the decorator is applied
            without parentheses.
        name: Optional custom name for the activity. Defaults to the
            function name.
        no_thread_cancel_default: Whether to disable thread cancellation
            by default.

    Returns:
        A PyramidActivity wrapping ``fn`` when ``fn`` is given; otherwise a
        one-argument decorator that produces one.
    """

    def wrap(target: F) -> "PyramidActivity":
        # The keyword options are captured from the enclosing call.
        return PyramidActivity(
            target,
            name=name,
            no_thread_cancel_default=no_thread_cancel_default,
        )

    # Bare ``@activity.defn`` hands us the function directly; the
    # parenthesised form ``@activity.defn(...)`` needs the decorator back.
    return wrap if fn is None else wrap(fn)
|
includeme(config)
Pyramid configuration include function.
This function can be called via config.include('pyramid_temporal')
to register pyramid-temporal with a Pyramid application.
Configuration settings:
- pyramid_temporal.temporal_host: Temporal server host (default: localhost:7233)
- pyramid_temporal.log_level: Logging level (default: INFO)
- pyramid_temporal.auto_connect: Auto-connect to Temporal on startup (default: True)
| Parameters: |
-
config
(Configurator)
–
Pyramid configurator instance
|
Example
from pyramid.config import Configurator
def main():
config = Configurator()
# Optional: Configure Temporal connection (set before include; settings are read during include)
config.registry.settings['pyramid_temporal.temporal_host'] = 'localhost:7233'
config.include('pyramid_temporal')
# ... rest of configuration
Source code in pyramid_temporal/__init__.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def includeme(config: "Configurator") -> None:
    """Pyramid configuration include function.

    This function can be called via ``config.include('pyramid_temporal')``
    to register pyramid-temporal with a Pyramid application.

    Configuration settings (all read while the include runs, so set them
    before calling ``config.include``):

    - pyramid_temporal.temporal_host: Temporal server host
      (default: localhost:7233)
    - pyramid_temporal.log_level: Logging level name such as ``"DEBUG"``,
      or an integer ``logging`` level (default: INFO). Unrecognised values
      fall back to INFO.
    - pyramid_temporal.auto_connect: Auto-connect to Temporal on startup.
      Accepts a Python bool or the string ``"true"``/``"false"``
      (case-insensitive); any other value is treated as false
      (default: True).

    Args:
        config: Pyramid configurator instance

    Example:
        ```python
        from pyramid.config import Configurator

        def main():
            config = Configurator(
                settings={'pyramid_temporal.temporal_host': 'localhost:7233'},
            )
            config.include('pyramid_temporal')
            # ... rest of configuration
        ```
    """
    logger.info("Including pyramid-temporal configuration")

    settings = config.get_settings()

    # Fill in defaults only for keys the application did not provide.
    settings.setdefault("pyramid_temporal.log_level", "INFO")
    settings.setdefault("pyramid_temporal.temporal_host", "localhost:7233")
    settings.setdefault("pyramid_temporal.auto_connect", "true")

    # Resolve the log level. Settings built in Python may carry an int level
    # (e.g. logging.DEBUG) rather than a name, so accept both forms.
    raw_level = settings["pyramid_temporal.log_level"]
    if isinstance(raw_level, int) and not isinstance(raw_level, bool):
        level = raw_level
    else:
        level = getattr(logging, str(raw_level).strip().upper(), None)
        # getattr can also hit non-level attributes of the logging module
        # (strings, classes, flags); only a genuine int level is usable.
        if not isinstance(level, int) or isinstance(level, bool):
            level = logging.INFO
    logging.getLogger("pyramid_temporal").setLevel(level)

    # Setup Temporal client if auto_connect is enabled. A real bool is used
    # as-is; anything else is compared textually so INI strings keep working.
    raw_auto_connect = settings["pyramid_temporal.auto_connect"]
    if isinstance(raw_auto_connect, bool):
        auto_connect = raw_auto_connect
    else:
        auto_connect = str(raw_auto_connect).strip().lower() == "true"
    if auto_connect:
        _setup_temporal_client(config, settings)

    # Add request method to get Temporal client
    config.add_request_method(_get_temporal_client, "temporal_client", reify=True)

    logger.info("pyramid-temporal configuration complete")
|
is_pyramid_activity(obj)
Check if an object is a pyramid-temporal activity.
| Returns: |
-
bool
–
True if the object is a pyramid-temporal activity
|
Source code in pyramid_temporal/activity.py
154
155
156
157
158
159
160
161
162
def is_pyramid_activity(obj: Any) -> bool:
    """Tell whether ``obj`` is marked as a pyramid-temporal activity.

    Args:
        obj: Object to check

    Returns:
        The value of the ``PYRAMID_ACTIVITY_MARKER`` attribute on ``obj``,
        or False when the object has no such attribute.
    """
    # A missing marker attribute means the object is not one of ours.
    marker_value = getattr(obj, PYRAMID_ACTIVITY_MARKER, False)
    return marker_value
|