Write a simple AI agent
In 60 lines of code, you can write a simple AI agent that replies to tweets with a poem. It relies on Emp Cloud and two libraries from Empyreal, `emp-agents` and `emp_hooks`. The agent replies to tweets that mention it by name, in this case `@empcloud_demo`.
Start by collecting your Twitter API keys and OpenAI API key, and save them in a `.env` file.
```
TWITTER_BEARER_TOKEN=...
TWITTER_CONSUMER_KEY=...
TWITTER_CONSUMER_SECRET=...
TWITTER_ACCESS_TOKEN=...
TWITTER_ACCESS_TOKEN_SECRET=...
OPENAI_API_KEY=...
```
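The deployed agent reads these values from environment variables, which come from the secrets you upload in the next step. To run the agent locally first, one option (an extra dependency, not something Emp Cloud requires) is to load the same file with `python-dotenv`:

```python
# Optional, for local runs only: load the .env file into os.environ.
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory
```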
Upload the secrets to your project:

```bash
empctl secrets upsert --file .env
```
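The agent code below hardcodes the bot account's numeric Twitter user id so it can ignore its own tweets. If you don't know yours, one way to find it (a sketch using tweepy's `get_me`, not something Emp Cloud requires) is to look it up with the same credentials:

```python
# One-off helper: print your bot's numeric Twitter user id.
import os

from tweepy.client import Client

client = Client(
    consumer_key=os.environ["TWITTER_CONSUMER_KEY"],
    consumer_secret=os.environ["TWITTER_CONSUMER_SECRET"],
    access_token=os.environ["TWITTER_ACCESS_TOKEN"],
    access_token_secret=os.environ["TWITTER_ACCESS_TOKEN_SECRET"],
)
print(client.get_me().data.id)  # use this value for my_agent_twitter_id
```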
Write the code for the agent. Make sure to have fun with the prompt, as your agent's personality is defined by it.
```python
import asyncio
import json
import os
import sys

from tweepy import Tweet
from tweepy.client import Client

from emp_hooks import twitter
from emp_hooks.hook_manager import hooks
from emp_agents import AgentBase
from emp_agents.providers import OpenAIProvider, OpenAIModelType

my_agent_twitter_id = "1902885085294235650"


@twitter.on_tweet("@empcloud_demo")
def on_tweet(tweet: Tweet):
    print("RECEIVED TWEET")
    # the hook payload carries the tweet as a JSON string
    data = json.loads(tweet["data"])
    tweet_id = data["id"]
    author_id = data["author_id"]  # a string, like my_agent_twitter_id above

    # ignore if the post is from the bot itself
    if author_id == my_agent_twitter_id:
        return

    print(f"Received tweet: {tweet_id} from {author_id}")
    sys.stdout.flush()

    client = Client(
        bearer_token=os.environ["TWITTER_BEARER_TOKEN"],
        consumer_key=os.environ["TWITTER_CONSUMER_KEY"],
        consumer_secret=os.environ["TWITTER_CONSUMER_SECRET"],
        access_token=os.environ["TWITTER_ACCESS_TOKEN"],
        access_token_secret=os.environ["TWITTER_ACCESS_TOKEN_SECRET"],
    )

    agent = AgentBase(
        prompt="""
        You are a helpful agent. You will respond in 200 characters to any questions, in a mechanical way.
        You will respond as if you are an extra in a film. Make sure to be kind of depressing and reference
        famous poetry in a really pretentious way when you get the chance.
        """,
        provider=OpenAIProvider(
            api_key=os.environ["OPENAI_API_KEY"],
            default_model=OpenAIModelType.gpt4o_mini,
        ),
    )

    # agent.answer returns a coroutine, so run it to completion here
    reply_text = asyncio.run(agent.answer(data["text"]))

    try:
        response = client.create_tweet(
            text=reply_text,
            in_reply_to_tweet_id=tweet_id,
        )
        print(f"Tweeted: {response.data['text']}")
    except Exception as e:
        print(f"Error Tweeting: {e}")
    sys.stdout.flush()


# keep the process alive so emp_hooks keeps listening for new tweets
hooks.run(keep_alive=True)
```
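Before deploying, you can sanity-check the agent's personality locally. A minimal sketch, assuming your environment variables are set (for example via the `.env` loading shown earlier) and using only the `AgentBase` interface from the code above:

```python
# Quick local check of the agent's personality, without Twitter.
import asyncio
import os

from emp_agents import AgentBase
from emp_agents.providers import OpenAIProvider, OpenAIModelType

agent = AgentBase(
    prompt="You are a helpful agent. Respond in 200 characters and quote famous poetry.",
    provider=OpenAIProvider(
        api_key=os.environ["OPENAI_API_KEY"],
        default_model=OpenAIModelType.gpt4o_mini,
    ),
)

print(asyncio.run(agent.answer("What do you think of the weather today?")))
```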
Lastly, you need a Dockerfile to build and deploy the agent. It assumes the agent code above is saved as `demo.py` and that the dependencies are listed in `requirements.txt`:

```dockerfile
FROM python:3.12-alpine
WORKDIR /app
COPY . .
RUN python -m pip install -r requirements.txt
CMD ["python", "demo.py"]
```
Build and deploy the agent:

```bash
empctl build --push --deploy --service myservice
```
Congratulations! You just deployed your first AI agent.
Wondering how the tweet arrives at the agent?
The flow involves the `emp_hooks` library and custom Kubernetes controllers that watch all the events on your projects and interact with the Twitter API. Tweets are read from the Twitter API stream and pushed to the project's SQS FIFO queue; the `emp_hooks` library reads them from the queue and passes them to the agent.
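For intuition only, here is a rough sketch of that last hop: a consumer polling an SQS FIFO queue and handing each message to a registered handler. This is not the actual `emp_hooks` implementation; the queue URL and message shape are assumptions.

```python
# Conceptual sketch of the queue-to-handler hop (not emp_hooks internals).
import json

import boto3

# Hypothetical queue URL; in practice Emp Cloud provisions this per project.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-project.fifo"


def handle_tweet(event: dict) -> None:
    # Stand-in for a handler registered with @twitter.on_tweet.
    print("got tweet event:", event)


handlers = {"tweet": [handle_tweet]}
sqs = boto3.client("sqs")

while True:
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=20
    )
    for msg in resp.get("Messages", []):
        event = json.loads(msg["Body"])  # message shape is an assumption
        for handler in handlers.get(event.get("type"), []):
            handler(event)
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
```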