Write a simple AI agent

In 60 lines of code, you can write a simple AI agent that can reply to tweets with a poem. It relies on Emp Cloud and two libraries from Empyreal, emp-agents and emp_hooks. The agent replies to tweets that mention it by name, in this case @empcloud_demo.

Start by collecting your Twitter API keys and OpenAI API key and save them in a .env file.

TWITTER_BEARER_TOKEN=...
TWITTER_CONSUMER_KEY=...
TWITTER_CONSUMER_SECRET=...
TWITTER_ACCESS_TOKEN=...
TWITTER_ACCESS_TOKEN_SECRET=...
OPENAI_API_KEY=...

Upload the secrets to your project

empctl secrets upsert --file .env

Write the code for the agent. Make sure to have fun with the prompt, as your agent's personality is defined by it.

import asyncio
import json
import os
import sys

from tweepy import Tweet
from tweepy.client import Client

from emp_hooks import twitter
from emp_hooks.hook_manager import hooks
from emp_agents import AgentBase
from emp_agents.providers import OpenAIProvider, OpenAIModelType

my_agent_twitter_id = "1902885085294235650"

@twitter.on_tweet("@empcloud_demo")
def on_tweet(tweet: Tweet):
    print("RECEIVED TWEET")
    data = json.loads(tweet["data"])

    tweet_id = data["id"]
    author_id = int(data["author_id"])

    # ignore if the post is from the bot itself
    if author_id == my_agent_twitter_id:
        return

    print(f"Received tweet: {tweet_id} from {author_id}")
    sys.stdout.flush()

    client = Client(
        bearer_token=os.environ["TWITTER_BEARER_TOKEN"],
        consumer_key=os.environ["TWITTER_CONSUMER_KEY"],
        consumer_secret=os.environ["TWITTER_CONSUMER_SECRET"],
        access_token=os.environ["TWITTER_ACCESS_TOKEN"],
        access_token_secret=os.environ["TWITTER_ACCESS_TOKEN_SECRET"],
    )

    agent = AgentBase(
        prompt="""
            You are a helpful agent. You will respond in 200 characters to any questions, in a mechanical way.
            You will respond as if you are an extra in a film. Make sure you be kind of depressing and reference
            famous poetry in a really pretentious way when you get the chance.
        """,
        provider=OpenAIProvider(
            api_key=os.environ["OPENAI_API_KEY"],
            default_model=OpenAIModelType.gpt4o_mini,
        )
    )

    agent_response = agent.answer(data["text"])
    response = asyncio.run(agent_response)

    try:
        response = client.create_tweet(
            text=response,
            in_reply_to_tweet_id=tweet_id,
        )
        print(f"Tweeted: {response.data['text']}")
    except Exception as e:
        print(f"Error Tweeting: {e}")
        sys.stdout.flush()


hooks.run(keep_alive=True)
emp-agents
emp-hooks

Lastly, you need a Dockerfile to build and deploy the agent.

FROM python:3.12-alpine

WORKDIR /app

COPY . .
RUN python -m pip install -r requirements.txt

CMD ["python", "demo.py"]

Build and deploy the agent

empctl build --push --deploy --service myservice

Congratulations! You just deployed your first AI agent.

Wondering how the tweet arrives at the agent?

The flow involves the emp_hooks library and custom k8s controllers that are watching all the events on the projects and interact with the Twitter API.

Tweets are being read from the Twitter API Stream and then pushed to the project's SQS.fifo queue, which then is read from the emp_hooks library again and passed to the agent.

Last updated