-
Notifications
You must be signed in to change notification settings - Fork 88
Add AI bot classification for event enrichment #155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jaredmixpanel
wants to merge
4
commits into
master
Choose a base branch
from
feature/ai-bot-classification
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
93693ef
test: add AI bot classification tests
jaredmixpanel e42c8a9
feat: implement AI bot classifier
jaredmixpanel 01a3902
feat: add bot classification middleware/integration
jaredmixpanel 0b6e320
fix: add missing category assertions in bot tests
jaredmixpanel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,162 @@ | ||
| # mixpanel/ai_bot_classifier.py | ||
| """AI bot user-agent classification for Mixpanel events.""" | ||
|
|
||
| import re | ||
| from typing import Any, Callable, Dict, List, Optional | ||
|
|
||
| AI_BOT_DATABASE: List[Dict[str, Any]] = [ | ||
| { | ||
| 'pattern': re.compile(r'GPTBot/', re.IGNORECASE), | ||
| 'name': 'GPTBot', | ||
| 'provider': 'OpenAI', | ||
| 'category': 'indexing', | ||
| 'description': 'OpenAI web crawler for model training data', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'ChatGPT-User/', re.IGNORECASE), | ||
| 'name': 'ChatGPT-User', | ||
| 'provider': 'OpenAI', | ||
| 'category': 'retrieval', | ||
| 'description': 'ChatGPT real-time retrieval for user queries (RAG)', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'OAI-SearchBot/', re.IGNORECASE), | ||
| 'name': 'OAI-SearchBot', | ||
| 'provider': 'OpenAI', | ||
| 'category': 'indexing', | ||
| 'description': 'OpenAI search indexing crawler', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'ClaudeBot/', re.IGNORECASE), | ||
| 'name': 'ClaudeBot', | ||
| 'provider': 'Anthropic', | ||
| 'category': 'indexing', | ||
| 'description': 'Anthropic web crawler for model training', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'Claude-User/', re.IGNORECASE), | ||
| 'name': 'Claude-User', | ||
| 'provider': 'Anthropic', | ||
| 'category': 'retrieval', | ||
| 'description': 'Claude real-time retrieval for user queries', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'Google-Extended/', re.IGNORECASE), | ||
| 'name': 'Google-Extended', | ||
| 'provider': 'Google', | ||
| 'category': 'indexing', | ||
| 'description': 'Google AI training data crawler', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'PerplexityBot/', re.IGNORECASE), | ||
| 'name': 'PerplexityBot', | ||
| 'provider': 'Perplexity', | ||
| 'category': 'retrieval', | ||
| 'description': 'Perplexity AI search crawler', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'Bytespider/', re.IGNORECASE), | ||
| 'name': 'Bytespider', | ||
| 'provider': 'ByteDance', | ||
| 'category': 'indexing', | ||
| 'description': 'ByteDance/TikTok AI crawler', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'CCBot/', re.IGNORECASE), | ||
| 'name': 'CCBot', | ||
| 'provider': 'Common Crawl', | ||
| 'category': 'indexing', | ||
| 'description': 'Common Crawl bot', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'Applebot-Extended/', re.IGNORECASE), | ||
| 'name': 'Applebot-Extended', | ||
| 'provider': 'Apple', | ||
| 'category': 'indexing', | ||
| 'description': 'Apple AI/Siri training data crawler', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'Meta-ExternalAgent/', re.IGNORECASE), | ||
| 'name': 'Meta-ExternalAgent', | ||
| 'provider': 'Meta', | ||
| 'category': 'indexing', | ||
| 'description': 'Meta/Facebook AI training data crawler', | ||
| }, | ||
| { | ||
| 'pattern': re.compile(r'cohere-ai/', re.IGNORECASE), | ||
| 'name': 'cohere-ai', | ||
| 'provider': 'Cohere', | ||
| 'category': 'indexing', | ||
| 'description': 'Cohere AI training data crawler', | ||
| }, | ||
| ] | ||
|
|
||
|
|
||
| def classify_user_agent(user_agent: Optional[str]) -> Dict[str, Any]: | ||
| """ | ||
| Classify a user-agent string against the AI bot database. | ||
|
|
||
| Args: | ||
| user_agent: The user-agent string to classify | ||
|
|
||
| Returns: | ||
| Dict with '$is_ai_bot' (always present) and optional | ||
| '$ai_bot_name', '$ai_bot_provider', '$ai_bot_category' | ||
| """ | ||
| if not user_agent or not isinstance(user_agent, str): | ||
| return {'$is_ai_bot': False} | ||
|
|
||
| for bot in AI_BOT_DATABASE: | ||
| if bot['pattern'].search(user_agent): | ||
| return { | ||
| '$is_ai_bot': True, | ||
| '$ai_bot_name': bot['name'], | ||
| '$ai_bot_provider': bot['provider'], | ||
| '$ai_bot_category': bot['category'], | ||
| } | ||
|
|
||
| return {'$is_ai_bot': False} | ||
|
|
||
|
|
||
| def create_classifier( | ||
| additional_bots: Optional[List[Dict[str, Any]]] = None, | ||
| ) -> Callable: | ||
| """ | ||
| Create a classifier with optional additional bot patterns. | ||
|
|
||
| Args: | ||
| additional_bots: Additional bot patterns (checked before built-ins). | ||
| Each entry must have 'pattern' (compiled regex), 'name', 'provider', 'category'. | ||
|
|
||
| Returns: | ||
| A classify_user_agent function. | ||
| """ | ||
| combined = list(additional_bots or []) + AI_BOT_DATABASE | ||
|
|
||
| def classifier(user_agent: Optional[str]) -> Dict[str, Any]: | ||
| if not user_agent or not isinstance(user_agent, str): | ||
| return {'$is_ai_bot': False} | ||
| for bot in combined: | ||
| if bot['pattern'].search(user_agent): | ||
| return { | ||
| '$is_ai_bot': True, | ||
| '$ai_bot_name': bot['name'], | ||
| '$ai_bot_provider': bot['provider'], | ||
| '$ai_bot_category': bot['category'], | ||
| } | ||
| return {'$is_ai_bot': False} | ||
|
|
||
| return classifier | ||
|
|
||
|
|
||
| def get_bot_database() -> List[Dict[str, str]]: | ||
| """Return a copy of the bot database for inspection.""" | ||
| return [ | ||
| { | ||
| 'name': bot['name'], | ||
| 'provider': bot['provider'], | ||
| 'category': bot['category'], | ||
| 'description': bot.get('description', ''), | ||
| } | ||
| for bot in AI_BOT_DATABASE | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| # mixpanel/ai_bot_consumer.py | ||
| """BotClassifyingConsumer wrapper for Mixpanel Python SDK.""" | ||
|
|
||
| import json | ||
| from typing import Any, Dict, List, Optional | ||
|
|
||
| from .ai_bot_classifier import classify_user_agent, create_classifier | ||
|
|
||
|
|
||
| def _json_dumps(data): | ||
| """Serialize data matching the SDK's json_dumps format.""" | ||
| return json.dumps(data, separators=(',', ':')) | ||
|
|
||
|
|
||
| class BotClassifyingConsumer: | ||
| """ | ||
| Consumer wrapper that classifies AI bots in tracked events. | ||
|
|
||
| Wraps any Mixpanel consumer (Consumer, BufferedConsumer, or custom) | ||
| and enriches event data with bot classification properties when | ||
| a user-agent string is present in the event properties. | ||
|
|
||
| Usage: | ||
| from mixpanel import Mixpanel, Consumer | ||
| from mixpanel.ai_bot_consumer import BotClassifyingConsumer | ||
|
|
||
| consumer = BotClassifyingConsumer(Consumer()) | ||
| mp = Mixpanel('YOUR_TOKEN', consumer=consumer) | ||
|
|
||
| mp.track('user_id', 'page_view', { | ||
| '$user_agent': request.headers.get('User-Agent'), | ||
| }) | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| base_consumer: Any, | ||
| user_agent_property: str = '$user_agent', | ||
| additional_bots: Optional[List[Dict[str, Any]]] = None, | ||
| ): | ||
| """ | ||
| Args: | ||
| base_consumer: The consumer to wrap (must have a send() method) | ||
| user_agent_property: Property name containing the user-agent string | ||
| additional_bots: Additional bot patterns (checked before built-ins) | ||
| """ | ||
| self._base = base_consumer | ||
| self._ua_prop = user_agent_property | ||
| self._classify = ( | ||
| create_classifier(additional_bots=additional_bots) | ||
| if additional_bots | ||
| else classify_user_agent | ||
| ) | ||
|
|
||
| def send( | ||
| self, | ||
| endpoint: str, | ||
| json_message: str, | ||
| api_key: Any = None, | ||
| api_secret: Any = None, | ||
| ) -> None: | ||
| """ | ||
| Intercept event messages, classify bot user-agents, and forward. | ||
|
|
||
| Only modifies 'events' endpoint messages. People, groups, and | ||
| imports pass through unmodified. | ||
| """ | ||
| if endpoint == 'events': | ||
| data = json.loads(json_message) | ||
| properties = data.get('properties', {}) | ||
| user_agent = properties.get(self._ua_prop) | ||
|
|
||
| if user_agent: | ||
| classification = self._classify(user_agent) | ||
| properties.update(classification) | ||
| data['properties'] = properties | ||
| json_message = _json_dumps(data) | ||
|
|
||
| self._base.send(endpoint, json_message, api_key, api_secret) | ||
|
|
||
| def flush(self) -> None: | ||
| """Proxy flush to the wrapped consumer if available.""" | ||
| if hasattr(self._base, 'flush'): | ||
| self._base.flush() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| # mixpanel/ai_bot_helpers.py | ||
| """Framework integration helpers for AI bot classification.""" | ||
|
|
||
| from typing import Any, Dict, Optional | ||
|
|
||
|
|
||
| def extract_request_context_django(request: Any) -> Dict[str, str]: | ||
| """ | ||
| Extract user-agent and IP from a Django HttpRequest. | ||
|
|
||
| Usage: | ||
| from mixpanel.ai_bot_helpers import extract_request_context_django | ||
|
|
||
| mp.track('user_id', 'page_view', { | ||
| **extract_request_context_django(request), | ||
| 'page_url': request.path, | ||
| }) | ||
| """ | ||
| ctx = {} | ||
| ua = request.META.get('HTTP_USER_AGENT') | ||
| if ua: | ||
| ctx['$user_agent'] = ua | ||
|
|
||
| # Django's get_host() + REMOTE_ADDR | ||
| ip = ( | ||
| request.META.get('HTTP_X_FORWARDED_FOR', '').split(',')[0].strip() | ||
| or request.META.get('REMOTE_ADDR') | ||
| ) | ||
| if ip: | ||
| ctx['$ip'] = ip | ||
|
|
||
| return ctx | ||
|
|
||
|
|
||
| def extract_request_context_flask(request: Any) -> Dict[str, str]: | ||
| """ | ||
| Extract user-agent and IP from a Flask request. | ||
|
|
||
| Usage: | ||
| from mixpanel.ai_bot_helpers import extract_request_context_flask | ||
|
|
||
| mp.track('user_id', 'page_view', { | ||
| **extract_request_context_flask(request), | ||
| 'page_url': request.path, | ||
| }) | ||
| """ | ||
| ctx = {} | ||
| ua = request.headers.get('User-Agent') | ||
| if ua: | ||
| ctx['$user_agent'] = ua | ||
|
|
||
| ip = request.remote_addr | ||
| if ip: | ||
| ctx['$ip'] = ip | ||
|
|
||
| return ctx | ||
|
|
||
|
|
||
| def extract_request_context_fastapi(request: Any) -> Dict[str, str]: | ||
| """ | ||
| Extract user-agent and IP from a FastAPI/Starlette Request. | ||
|
|
||
| Usage: | ||
| from mixpanel.ai_bot_helpers import extract_request_context_fastapi | ||
|
|
||
| mp.track('user_id', 'page_view', { | ||
| **extract_request_context_fastapi(request), | ||
| 'page_url': str(request.url), | ||
| }) | ||
| """ | ||
| ctx = {} | ||
| ua = request.headers.get('user-agent') | ||
| if ua: | ||
| ctx['$user_agent'] = ua | ||
|
|
||
| if request.client: | ||
| ctx['$ip'] = request.client.host | ||
|
|
||
| return ctx | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.