Skip to main content

Observability SDK

This guide shows you how to use the Agenta observability SDK to instrument workflows in your application.

If you're exclusively using a framework like LangChain, you can use the auto-instrumentation packages to automatically instrument your application.

However, if you need more flexibility, you can use the SDK to:

  • Instrument custom functions in your workflow
  • Add additional metadata to the spans
  • Link traces to applications, variants, and environments in Agenta

Setup

1. Install the Agenta SDK

pip install -U agenta

2. Set environment variables

  1. Visit the Agenta API Keys page.
  2. Click on Create New API Key and follow the prompts.
import os

os.environ["AGENTA_API_KEY"] = "YOUR_AGENTA_API_KEY"
os.environ["AGENTA_HOST"] = "https://cloud.agenta.ai"

Instrumenting functions with the decorator

To instrument a function, add the @ag.instrument() decorator. This automatically captures all input and output data.

The decorator has a spankind argument to categorize each span in the UI. Available types are:

agent, chain, workflow, tool, embedding, query, completion, chat, rerank

caution

The instrument decorator should be the top-most decorator on a function (i.e. the last decorator before the function call).

@ag.instrument(spankind="task")
def myllmcall(country:str):
prompt = f"What is the capital of {country}"
response = client.chat.completions.create(
model='gpt-4',
messages=[
{'role': 'user', 'content': prompt},
],
)
return response.choices[0].text

@ag.instrument(spankind="workflow")
def generate(country:str):
return myllmcall(country)

Agenta automatically determines the parent span based on the function call and nests the spans accordingly.

Modify a span's metadata

You can add additional information to a span's metadata using ag.tracing.store_meta(). This function accesses the active span from the context and adds the key-value pairs to the metadata.

@ag.instrument(spankind="task")
def compile_prompt(country:str):
prompt = f"What is the capital of {country}"

ag.tracing.store_meta({"prompt_template": prompt})

formatted_prompt = prompt.format(country=country)
return formatted_prompt

Metadata is displayed in the span's raw data view.

Linking spans to applications, variants, and environments

You can link a span to an application, variant, and environment by calling ag.tracing.store_refs().

Applications, variants, and environments can be referenced by their slugs, versions, and commit IDs (for specific versions). You can link a span to an application and variant like this:


@ag.instrument(spankind="workflow")
def generate(country:str):
prompt = f"What is the capital of {country}"


formatted_prompt = prompt.format(country=country)

completion = client.chat.completions.create(
model='gpt-4',
messages=[
{'role': 'user', 'content': formatted_prompt},
],
)

ag.tracing.store_refs(
{
"application.slug": "capital-app",
"environment.slug": "production",
}
)
return completion.choices[0].message.content

ag.tracing.store_refs() takes a dict with keys from application.slug, application.id, application.version, variant.slug, variant.id, variant.version, environment.slug, environment.id and environment.commit_id, with the values being the slug, id, version or commit id of the application, variant, and environment respectively.

Storing Internals

Internals are additional data stored in the span. Compared to metadata, internals have the following differences:

  • Internals are saved within the span data and are searchable with plain text queries.
  • Internals are shown by default in the span view in a collapsible section, while metadata is only shown as part of the JSON file with the raw data (i.e. better visibility with internals).
  • Internals can be used for evaluation. For instance, you can save the retrieved context in the internals and then use it to evaluate the factuality of the response.

As a rule of thumb, use metadata for additional information that is not used for evaluation and not elementary to understand the span, otherwise use internals.

Internals can be stored similarly to metadata:

@ag.instrument(spankind="workflow")
def rag_workflow(query:str):

context = retrieve_context(query)

ag.tracing.store_internals({"context": context})

prompt = f"Answer the following question {query} based on the context: {context}"

completion = client.chat.completions.create(
model='gpt-4',
messages=[
{'role': 'user', 'content': formatted_prompt},
],
)
return completion.choices[0].message.content

Redacting sensitive data: how to exclude data from capture

In some cases, you may want to exclude parts of the inputs or outputs due to privacy concerns or because the data is too large to be stored in the span.

You can do this by setting the ignore_inputs and/or ignore_outputs arguments to True in the instrument decorator.

@ag.instrument(
spankind="workflow",
ignore_inputs=True,
ignore_outputs=True
)
def rag_workflow(query:str):
...

If you want more control, you can specify which parts of the inputs and outputs to exclude:

@ag.instrument(
spankind="workflow",
ignore_inputs=["user_id"],
ignore_outputs=["pii"],
)
def rag_workflow(query:str, user_id:str):
...
return {
"result": ...,
"pii": ...
}

For even finer control, you can use a custom redact() callback, along with instructions in the case of errors.

def my_redact(name, field, data):
if name == "rag_workflow":
if field == "inputs":
del data["user_id"]
if field == "outputs":
del data["pii"]

return data


@ag.instrument(
spankind="workflow",
redact=my_redact,
redact_on_error=False,
)
def rag_workflow(query:str, user_id:str):
...
return {
"result": ...,
"pii": ...
}

Finally, if you want to set up global rules for redaction, you can provide a global redact() callback that applies everywhere.

def global_redact(
name:str,
field:str,
data: Dict[str, Any]
):
if "pii" in data:
del data["pii"]

return data


ag.init(
redact=global_redact,
redact_on_error=True,
)

def local_redact(
name:str,
field:str,
data: Dict[str, Any]
):
if name == "rag_workflow":
if field == "inputs":
del data["user_id"]

return data


@ag.instrument(
spankind="workflow",
redact=local_redact,
redact_on_error=False,
)
def rag_workflow(query:str, user_id:str):
...
return {
"result": ...,
"pii": ...
}