1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import urllib.parse
import asgiref.compatibility
import asgiref.wsgi
from mitmproxy import ctx, http
class ASGIApp:
"""
An addon that hosts an ASGI/WSGI HTTP app within mitmproxy, at a specified hostname and port.
Some important caveats:
- This implementation will block and wait until the entire HTTP response is completed before sending out data.
- It currently only implements the HTTP protocol (Lifespan and WebSocket are unimplemented).
"""
def __init__(self, asgi_app, host: str, port: int):
asgi_app = asgiref.compatibility.guarantee_single_callable(asgi_app)
self.asgi_app, self.host, self.port = asgi_app, host, port
@property
def name(self) -> str:
return f"asgiapp:{self.host}:{self.port}"
def request(self, flow: http.HTTPFlow) -> None:
assert flow.reply
if (flow.request.pretty_host, flow.request.port) == (self.host, self.port) and not flow.reply.has_message:
flow.reply.take() # pause hook completion
asyncio.ensure_future(serve(self.asgi_app, flow))
class WSGIApp(ASGIApp):
def __init__(self, wsgi_app, host: str, port: int):
asgi_app = asgiref.wsgi.WsgiToAsgi(wsgi_app)
super().__init__(asgi_app, host, port)
HTTP_VERSION_MAP = {
"HTTP/1.0": "1.0",
"HTTP/1.1": "1.1",
"HTTP/2.0": "2",
}
def make_scope(flow: http.HTTPFlow) -> dict:
# %3F is a quoted question mark
quoted_path = urllib.parse.quote_from_bytes(flow.request.data.path).split("%3F", maxsplit=1)
# (Unicode string) – HTTP request target excluding any query string, with percent-encoded
# sequences and UTF-8 byte sequences decoded into characters.
path = quoted_path[0]
# (byte string) – URL portion after the ?, percent-encoded.
query_string: bytes
if len(quoted_path) > 1:
query_string = quoted_path[1].encode()
else:
query_string = b""
return {
"type": "http",
"asgi": {
"version": "3.0",
"spec_version": "2.1",
},
"http_version": HTTP_VERSION_MAP.get(flow.request.http_version, "1.1"),
"method": flow.request.method,
"scheme": flow.request.scheme,
"path": path,
"raw_path": flow.request.path,
"query_string": query_string,
"headers": list(list(x) for x in flow.request.headers.fields),
"client": flow.client_conn.address,
"extensions": {
"mitmproxy.master": ctx.master,
}
}
async def serve(app, flow: http.HTTPFlow):
"""
Serves app on flow.
"""
assert flow.reply
scope = make_scope(flow)
done = asyncio.Event()
received_body = False
async def receive():
nonlocal received_body
if not received_body:
received_body = True
return {
"type": "http.request",
"body": flow.request.raw_content,
}
else: # pragma: no cover
# We really don't expect this to be called a second time, but what to do?
# We just wait until the request is done before we continue here with sending a disconnect.
await done.wait()
return {
"type": "http.disconnect"
}
async def send(event):
if event["type"] == "http.response.start":
flow.response = http.HTTPResponse.make(event["status"], b"", event.get("headers", []))
flow.response.decode()
elif event["type"] == "http.response.body":
flow.response.content += event.get("body", b"")
if not event.get("more_body", False):
flow.reply.ack()
else:
raise AssertionError(f"Unexpected event: {event['type']}")
try:
await app(scope, receive, send)
if not flow.reply.has_message:
raise RuntimeError(f"no response sent.")
except Exception as e:
ctx.log.error(f"Error in asgi app: {e}")
flow.response = http.HTTPResponse.make(500, b"ASGI Error.")
flow.reply.ack(force=True)
finally:
flow.reply.commit()
done.set()