Custom Agents
Build sophisticated AI agents that leverage LookC's company research capabilities through custom implementations using Langchain and other agent frameworks.
Overview
Custom agent development with LookC MCP server enables you to create specialized AI workflows tailored to your specific company research and business intelligence needs. Whether you're building prospect research agents, competitive analysis tools, or market intelligence systems, the LookC MCP server provides the foundation for reliable, real-time company data access.
Langchain Integration
Seamless integration with Langchain's agent framework for complex reasoning workflows.
Custom Tool Chains
Combine LookC tools with other data sources for comprehensive business intelligence.
Production Ready
Built-in error handling, rate limiting, and monitoring for enterprise deployments.
Langchain Integration
Langchain provides a robust framework for building agents that can reason about when and how to use LookC's company research tools.
Installation and Setup
Langchain with LookC MCP
pip install langchain
pip install langchain-openai # or your preferred LLM provider
pip install requests
pip install pydantic
Agent Configuration
Langchain agent setup
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.memory import ConversationBufferMemory
class CompanyResearchAgent:
def __init__(self, lookc_api_key: str, openai_api_key: str):
# Initialize LookC tools
self.employee_tool = LookCEmployeeSearchTool(lookc_api_key)
self.org_tool = LookCOrganizationTool(lookc_api_key)
# Initialize LLM
self.llm = OpenAI(
api_key=openai_api_key,
temperature=0.1,
model_name="gpt-4"
)
# Initialize memory
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# Create agent
self.agent = initialize_agent(
tools=[self.employee_tool, self.org_tool],
llm=self.llm,
agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
memory=self.memory,
verbose=True,
handle_parsing_errors=True
)
def research_company(self, query: str) -> str:
"""Research a company based on natural language query."""
return self.agent.run(query)
def analyze_competitors(self, companies: List[str]) -> str:
"""Analyze multiple companies for competitive intelligence."""
analysis_prompt = f"""
Analyze the following companies for competitive intelligence:
{', '.join(companies)}
For each company:
1. Get organization information
2. Find engineering and technical leadership
3. Analyze team structure and size
4. Identify key personnel and their backgrounds
Provide a comprehensive competitive analysis comparing:
- Team sizes and structure
- Technical leadership experience
- Hiring patterns and growth
- Competitive positioning
"""
return self.agent.run(analysis_prompt)
# Usage example
agent = CompanyResearchAgent(
lookc_api_key="your_lookc_api_key",
openai_api_key="your_openai_api_key"
)
# Research a specific company
result = agent.research_company(
"Research the engineering team at Stripe. Focus on their payments infrastructure team and technical leadership."
)
print(result)
Custom Agent Patterns
Common patterns for building specialized agents with LookC integration.
Prospect Research Agent
Prospect research pattern
class ProspectResearchAgent:
def __init__(self, lookc_api_key: str, llm):
self.lookc_tools = [
LookCEmployeeSearchTool(lookc_api_key),
LookCOrganizationTool(lookc_api_key)
]
self.llm = llm
async def research_prospect(self, company_url: str, target_roles: List[str]) -> Dict[str, Any]:
"""Research prospects at a target company."""
# Step 1: Get company information
org_info = await self._get_organization_info(company_url)
# Step 2: Find decision makers
decision_makers = await self._find_decision_makers(
org_info["orgId"],
target_roles
)
# Step 3: Analyze team structure
team_analysis = await self._analyze_team_structure(
org_info["orgId"]
)
# Step 4: Generate insights
insights = await self._generate_insights(
org_info, decision_makers, team_analysis
)
return {
"company": org_info,
"prospects": decision_makers,
"team_analysis": team_analysis,
"insights": insights,
"recommended_approach": await self._recommend_approach(insights)
}
async def _find_decision_makers(self, org_id: str, target_roles: List[str]) -> List[Dict]:
"""Find decision makers based on target roles."""
role_pattern = "|".join(target_roles)
employees = await self.lookc_tools[0]._run(
org_id=org_id,
title_regex=role_pattern,
seniority_levels=["PARTNER_CXO", "VP_DIRECTOR"],
limit=100
)
return json.loads(employees)["employees"]
async def _generate_insights(self, org_info, decision_makers, team_analysis):
"""Generate strategic insights using LLM."""
prompt = f"""
Based on the following company research data, generate strategic insights:
Company: {org_info["name"]}
Decision Makers: {len(decision_makers)} found
Team Structure: {team_analysis}
Provide insights on:
1. Company growth stage and trajectory
2. Decision maker accessibility and influence
3. Technical priorities based on team structure
4. Optimal engagement strategy
"""
return await self.llm.agenerate([prompt])
Market Intelligence Agent
Market intelligence pattern
class MarketIntelligenceAgent:
def __init__(self, lookc_api_key: str, llm):
self.lookc_tools = [
LookCEmployeeSearchTool(lookc_api_key),
LookCOrganizationTool(lookc_api_key)
]
self.llm = llm
async def analyze_market_segment(self, companies: List[str], focus_areas: List[str]) -> Dict[str, Any]:
"""Analyze a market segment across multiple companies."""
company_data = []
for company_url in companies:
data = await self._analyze_single_company(company_url, focus_areas)
company_data.append(data)
market_analysis = await self._synthesize_market_data(company_data, focus_areas)
return {
"companies": company_data,
"market_trends": market_analysis["trends"],
"competitive_landscape": market_analysis["landscape"],
"opportunities": market_analysis["opportunities"],
"recommendations": market_analysis["recommendations"]
}
async def _analyze_single_company(self, company_url: str, focus_areas: List[str]) -> Dict[str, Any]:
"""Analyze a single company for market intelligence."""
# Get organization info
org_info = json.loads(await self.lookc_tools[1]._run(company_url))
# Analyze each focus area
focus_data = {}
for area in focus_areas:
employees = json.loads(await self.lookc_tools[0]._run(
org_id=org_info["orgId"],
title_regex=area.lower(),
limit=200
))
focus_data[area] = {
"team_size": len(employees["employees"]),
"seniority_distribution": self._analyze_seniority(employees["employees"]),
"key_personnel": [emp for emp in employees["employees"]
if emp["seniority_level"] in ["PARTNER_CXO", "VP_DIRECTOR"]]
}
return {
"company": org_info,
"focus_areas": focus_data,
"total_employees": sum(data["team_size"] for data in focus_data.values())
}
def _analyze_seniority(self, employees: List[Dict]) -> Dict[str, int]:
"""Analyze seniority distribution."""
distribution = {}
for emp in employees:
level = emp.get("seniority_level", "unknown")
distribution[level] = distribution.get(level, 0) + 1
return distribution
Sales Intelligence Agent
Sales intelligence pattern
class SalesIntelligenceAgent:
def __init__(self, lookc_api_key: str, llm):
self.lookc_tools = [
LookCEmployeeSearchTool(lookc_api_key),
LookCOrganizationTool(lookc_api_key)
]
self.llm = llm
async def generate_account_plan(self, company_url: str, solution_focus: str) -> Dict[str, Any]:
"""Generate a comprehensive account plan for sales."""
# Research company and key stakeholders
org_info = json.loads(await self.lookc_tools[1]._run(company_url))
# Find relevant stakeholders based on solution focus
stakeholders = await self._find_stakeholders(org_info["orgId"], solution_focus)
# Analyze buying committee
buying_committee = await self._analyze_buying_committee(stakeholders)
# Generate engagement strategy
strategy = await self._generate_engagement_strategy(
org_info, buying_committee, solution_focus
)
return {
"account_overview": org_info,
"stakeholders": stakeholders,
"buying_committee": buying_committee,
"engagement_strategy": strategy,
"next_steps": await self._recommend_next_steps(strategy)
}
async def _find_stakeholders(self, org_id: str, solution_focus: str) -> List[Dict]:
"""Find relevant stakeholders based on solution focus."""
# Map solution focus to relevant roles
role_mapping = {
"security": ["security|ciso|infosec|compliance"],
"infrastructure": ["infrastructure|devops|platform|sre"],
"data": ["data|analytics|ml|ai"],
"development": ["engineer|developer|architect|tech"]
}
role_pattern = role_mapping.get(solution_focus.lower(), solution_focus)
employees = json.loads(await self.lookc_tools[0]._run(
org_id=org_id,
title_regex=role_pattern,
seniority_levels=["PARTNER_CXO", "VP_DIRECTOR", "MANAGER"],
limit=150
))
return employees["employees"]
async def _analyze_buying_committee(self, stakeholders: List[Dict]) -> Dict[str, Any]:
"""Analyze the buying committee structure."""
committee = {
"economic_buyer": [],
"technical_buyer": [],
"user_buyer": [],
"coach": []
}
for stakeholder in stakeholders:
title = stakeholder["title"].lower()
seniority = stakeholder["seniority_level"]
if seniority == "PARTNER_CXO":
committee["economic_buyer"].append(stakeholder)
elif "director" in title or "vp" in title:
committee["technical_buyer"].append(stakeholder)
elif seniority == "manager":
committee["user_buyer"].append(stakeholder)
return committee
Production Deployment
Best practices for deploying custom agents with LookC integration in production environments.
Error Handling and Resilience
Production-ready implementation
import asyncio
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class LookCConfig:
api_key: str
endpoint: str = "https://api.lookc.io/mcp"
timeout: int = 30
max_retries: int = 3
rate_limit_per_minute: int = 100
class ProductionLookCAgent:
def __init__(self, config: LookCConfig, llm):
self.config = config
self.llm = llm
self.request_times = []
self.logger = logging.getLogger(__name__)
async def call_lookc_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Call LookC tool with production-grade error handling."""
# Rate limiting
await self._enforce_rate_limit()
# Retry logic with exponential backoff
for attempt in range(self.config.max_retries):
try:
result = await self._make_request(tool_name, arguments)
self.logger.info(f"Successfully called {tool_name}")
return result
except Exception as e:
wait_time = 2 ** attempt
self.logger.warning(f"Attempt {attempt + 1} failed for {tool_name}: {e}")
if attempt < self.config.max_retries - 1:
await asyncio.sleep(wait_time)
else:
self.logger.error(f"All attempts failed for {tool_name}")
raise
return None
async def _enforce_rate_limit(self):
"""Enforce rate limiting to prevent API throttling."""
now = datetime.now()
minute_ago = now - timedelta(minutes=1)
# Remove old requests
self.request_times = [t for t in self.request_times if t > minute_ago]
# Check if we're at the limit
if len(self.request_times) >= self.config.rate_limit_per_minute:
sleep_time = 60 - (now - self.request_times[0]).seconds
await asyncio.sleep(sleep_time)
self.request_times.append(now)
async def _make_request(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Make the actual HTTP request to LookC MCP server."""
import aiohttp
payload = {
"jsonrpc": "2.0",
"id": int(datetime.now().timestamp()),
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.config.api_key}"
}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.config.timeout)) as session:
async with session.post(self.config.endpoint, json=payload, headers=headers) as response:
response.raise_for_status()
result = await response.json()
if "error" in result:
raise Exception(f"LookC API error: {result['error']}")
return result["result"]
Monitoring and Observability
Monitoring setup
import time
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps
# Metrics
lookc_requests_total = Counter('lookc_requests_total', 'Total LookC API requests', ['tool', 'status'])
lookc_request_duration = Histogram('lookc_request_duration_seconds', 'LookC API request duration')
lookc_rate_limit_gauge = Gauge('lookc_rate_limit_remaining', 'Remaining rate limit')
def monitor_lookc_calls(func):
"""Decorator to monitor LookC API calls."""
@wraps(func)
async def wrapper(self, tool_name: str, *args, **kwargs):
start_time = time.time()
try:
result = await func(self, tool_name, *args, **kwargs)
lookc_requests_total.labels(tool=tool_name, status='success').inc()
return result
except Exception as e:
lookc_requests_total.labels(tool=tool_name, status='error').inc()
raise
finally:
duration = time.time() - start_time
lookc_request_duration.observe(duration)
return wrapper
class MonitoredLookCAgent(ProductionLookCAgent):
@monitor_lookc_calls
async def call_lookc_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Optional[Dict[str, Any]]:
return await super().call_lookc_tool(tool_name, arguments)
Best Practices
Async Operations
Use async/await patterns for better performance when making multiple LookC API calls.
Caching Strategy
Implement intelligent caching for organization data that doesn't change frequently.
For more integration examples, see our LLM Frameworks guide and Development Tools documentation.
