My recent side-project vindex is a semantic video indexing platform. Users upload videos, which then run through a multi-step processing pipeline that prepares and indexes them. Finally, users can search for specific locations in the video using natural language.

The processing pipeline involves extracting frames, extracting audio, transcribing the audio, and computing various embeddings for the index. I’m using Temporal to orchestrate this pipeline for fun and profit!

What is Temporal and why use it?

Temporal’s website introduces it with: “Code smart. Move fast. Break nothing. Eliminate complex error or retry logic, avoid callbacks, and ensure that every workflow you start, completes. Temporal delivers durable execution for your services and applications.”

Temporal provides a programming framework where business processes (“workflows”) are broken down into small, reusable units of code called activities.

Workflows describe the flow of activities in sequence (or in parallel), including conditions and branching. Workflow code must be deterministic – running the same workflow with the same arguments must produce the same sequence of steps. Temporal records all activity inputs and outputs as part of a workflow run, which lets it replay a workflow and resume from where it left off.
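
In practice, determinism means workflow code can’t call things like datetime.now() or random.random() directly; the Python SDK provides replay-safe equivalents. A minimal sketch:

from temporalio import workflow

@workflow.defn(name="determinism-example")
class DeterminismExample:
    @workflow.run
    async def run(self) -> str:
        # Bad: datetime.now() would differ on every replay.
        # Good: workflow.now() is backed by the recorded history,
        # so replays observe the same value.
        return workflow.now().isoformat()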

Activities are side-effectful actions like RPC calls, running external programs, or reading/writing a database. Activities are annotated with retry policies and should have an idempotent implementation.
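
For example, a retry policy can be attached where the activity is invoked. A minimal sketch using the SDK’s RetryPolicy (the values are illustrative):

from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

# Inside workflow code: retry with exponential backoff, giving up
# after 5 attempts.
result = await workflow.execute_activity(
    "my-activity",
    params,
    start_to_close_timeout=timedelta(seconds=10),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        maximum_attempts=5,
    ),
)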

This gives you some nice properties around errors. Failures like network errors are transient, so Temporal can automatically retry the affected activities. Programmer errors can also be treated as transient: once fixed, Temporal can retry the affected activity or workflow from its last known working state.

Scaling is also handled by Temporal; you can register workers as needed with the Temporal server, which will fairly distribute workflows and activities among them.

Development

For development I use temporalite to run Temporal locally. It’s a Go repository that you can build and run with:

git clone https://github.com/temporalio/temporalite.git
cd temporalite
go build ./cmd/temporalite
./temporalite start --namespace default

The app itself is written in Python 3.10 and I manage Python dependencies using pipenv, which works well although it’s slow.

Writing an activity

I decided to split each activity into two files:

  • the parameters of the activity and the result type
  • the implementation of the activity itself

This prevents callers of the activity from transitively pulling in the dependencies of the activity implementation. The Temporal Python SDK checks that imports in a workflow module are side-effect free; however, many useful Python packages needed in activities perform side effects on import (e.g. opencv), so avoiding the transitive dependency sidesteps this issue.
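
If a side-effectful import is genuinely needed by a workflow module, the SDK also offers an escape hatch; a sketch (cv2 stands in for any such package):

from temporalio import workflow

# Reuse the already-imported module instead of re-importing it (and
# re-running its side effects) inside the workflow sandbox.
with workflow.unsafe.imports_passed_through():
    import cv2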

Example my_activity_params.py

from dataclasses import dataclass

@dataclass
class MyActivityParams:
    username: str
    media: int
    
@dataclass
class MyActivityResult:
    seconds: int

The Temporal Python SDK relies on Python’s dataclasses to model the parameters and return types of workflows and activities. I highly recommend starting with dataclasses from day 1 – this simplifies backwards compatibility. Once deployed to production, changes to these types should be backwards compatible (only add optional fields).
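
For example, a field added later should be optional with a default, so that payloads recorded before the change still deserialize (the quality field here is hypothetical):

from typing import Optional
from dataclasses import dataclass

@dataclass
class MyActivityParams:
    username: str
    media: int
    # Added in a later version: optional with a default, so payloads
    # serialized before this field existed still load.
    quality: Optional[str] = None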

Example my_activity.py

from temporalio import activity
import psycopg_pool

from myapp.activities.my_activity_params import MyActivityParams, MyActivityResult

def make_my_activity(pool: psycopg_pool.AsyncConnectionPool):
    @activity.defn(name="my-activity")
    async def impl(params: MyActivityParams) -> MyActivityResult:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                # ... do something w/ the database ...
                pass

        return MyActivityResult(seconds=5)

    return impl

My activity modules don’t export the activity directly; instead they provide a function that constructs it. This is a useful pattern for injecting common runtime dependencies like database connection pools or configuration.

Writing a workflow

I structure workflows similarly to activities, for the same reasons. Other than that, the workflows follow the Python SDK documentation pretty closely.

Example my_workflow_params.py

from typing import Optional
from dataclasses import dataclass

@dataclass
class MyWorkflowParams:
    workspace: int
    username: str
    s3_location: Optional[str]

Example my_workflow.py

from datetime import timedelta
from temporalio import workflow

from myapp.activities.my_activity_params import MyActivityParams, MyActivityResult
from myapp.workflows.my_workflow_params import MyWorkflowParams

@workflow.defn(name="my-workflow")
class MyWorkflow:
    @workflow.run
    async def run(self, params: MyWorkflowParams) -> None:
        # ...
        activity_result = await workflow.execute_activity(
            "my-activity",
            MyActivityParams(username=params.username, media=1),
            # result_type tells the SDK how to deserialize the result
            # when the activity is referenced by name.
            result_type=MyActivityResult,
            start_to_close_timeout=timedelta(seconds=10),
        )

        workflow.logger.info(f"Took {activity_result.seconds} seconds")

        # ...
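
Because workflows are plain asyncio code, independent pipeline steps (e.g. frame and audio extraction) can fan out in parallel with asyncio.gather. A sketch – the activity names here are hypothetical:

import asyncio
from datetime import timedelta
from temporalio import workflow

@workflow.defn(name="parallel-example")
class ParallelExample:
    @workflow.run
    async def run(self, video_id: int) -> None:
        # Both activities are scheduled at once; the workflow resumes
        # after both complete.
        await asyncio.gather(
            workflow.execute_activity(
                "extract-frames", video_id,
                start_to_close_timeout=timedelta(minutes=5)),
            workflow.execute_activity(
                "extract-audio", video_id,
                start_to_close_timeout=timedelta(minutes=5)),
        )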

Putting it all together

Running workflows requires two main components – a worker process and a component that triggers (or inspects) workflows.

The worker registers all activities and workflows it is responsible for and polls the Temporal server for new tasks (activities or workflows). Workers are horizontally scalable – you can run as many instances as needed.

The trigger will likely integrate into your existing app (e.g. a webserver using FastAPI). Launching a workflow requires a connection to the Temporal server, the workflow’s name, the corresponding parameters, and an ID. The ID is useful for idempotency and for later introspection of the workflow.

You can also isolate work into different task queues and/or namespaces – we’re using the default task queue and namespace here for simplicity.
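
Using a dedicated task queue only requires passing the same name on both sides; a sketch with a hypothetical queue name:

# Worker side: only pick up tasks from the "video-pipeline" queue.
worker = Worker(client, task_queue="video-pipeline",
                workflows=[MyWorkflow], activities=[make_my_activity(pool)])

# Trigger side: route the workflow to the same queue.
handle = await client.start_workflow(
    "my-workflow", params, id="some-instance-id", task_queue="video-pipeline")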

Implementing the worker

import asyncio
import signal
from temporalio.worker import Worker
from temporalio.client import Client
import psycopg_pool

from myapp.workflows.my_workflow import MyWorkflow
from myapp.activities.my_activity import make_my_activity
# ...

async def run_worker(stop_event: asyncio.Event, pool: psycopg_pool.AsyncConnectionPool):
    client = await Client.connect("127.0.0.1:7233", namespace="default")

    print("Worker launching")
    worker = Worker(
        client,
        task_queue="default",
        # Add all your workflows here
        workflows=[MyWorkflow],
        
        # Add all your activity builders here
        activities=[
            make_my_activity(pool),
            # ...
        ],
    )
    async with worker:
        print("Worker running")
        await stop_event.wait()
        print("Worker done")
        
async def main():
    pool = psycopg_pool.AsyncConnectionPool(conninfo="dbname=example")
    stop_worker = asyncio.Event()
    # Stop the worker gracefully on Ctrl-C (Unix). Without this, the
    # worker would wait on the event forever.
    asyncio.get_running_loop().add_signal_handler(signal.SIGINT, stop_worker.set)
    await run_worker(stop_worker, pool)

if __name__ == "__main__":
    asyncio.run(main())

Launching a workflow from your app

import asyncio
from temporalio.client import Client

from myapp.workflows.my_workflow_params import MyWorkflowParams


async def main():
    client = await Client.connect("127.0.0.1:7233", namespace="default")

    params = MyWorkflowParams(
        workspace=1, username="foo", s3_location=None)
    # The ID makes launches idempotent: starting a second workflow with
    # the same ID while one is running is rejected by the server.
    handle = await client.start_workflow(
        "my-workflow", params, id="some-instance-id", task_queue="default")
    await handle.result()

if __name__ == "__main__":
    asyncio.run(main())
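
Because the workflow was started with a known ID, other parts of the app can re-attach to it later; a sketch:

from temporalio.client import Client

async def inspect(client: Client) -> None:
    # Re-attach to the workflow by its ID.
    handle = client.get_workflow_handle("some-instance-id")

    # Inspect its current status...
    description = await handle.describe()
    print(description.status)

    # ...or wait for its result.
    await handle.result()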

Conclusion

So far, using Temporal has been great; I really like the programming model and the observability that comes with it out of the box. I’m looking forward to using it in more projects – in particular, I’m curious to build more parts of the app with Temporal and to experiment with using it to power API endpoints end-to-end.