1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import mimetypes
import re
import typing
import urllib.parse
from pathlib import Path
from werkzeug.security import safe_join
from mitmproxy import ctx, exceptions, flowfilter, http, version
from mitmproxy.utils.spec import parse_spec
class MapLocalSpec(typing.NamedTuple):
matches: flowfilter.TFilter
regex: str
local_path: Path
def parse_map_local_spec(option: str) -> MapLocalSpec:
filter, regex, replacement = parse_spec(option)
try:
re.compile(regex)
except re.error as e:
raise ValueError(f"Invalid regular expression {regex!r} ({e})")
try:
path = Path(replacement).expanduser().resolve(strict=True)
except FileNotFoundError as e:
raise ValueError(f"Invalid file path: {replacement} ({e})")
return MapLocalSpec(filter, regex, path)
def _safe_path_join(root: Path, untrusted: str) -> Path:
"""Join a Path element with an untrusted str.
This is a convenience wrapper for werkzeug's safe_join,
raising a ValueError if the path is malformed."""
untrusted_parts = Path(untrusted).parts
joined = safe_join(
root.as_posix(),
*untrusted_parts
)
if joined is None:
raise ValueError("Untrusted paths.")
return Path(joined)
def file_candidates(url: str, spec: MapLocalSpec) -> typing.List[Path]:
"""
Get all potential file candidates given a URL and a mapping spec ordered by preference.
This function already assumes that the spec regex matches the URL.
"""
m = re.search(spec.regex, url)
assert m
if m.groups():
suffix = m.group(1)
else:
suffix = re.split(spec.regex, url, maxsplit=1)[1]
suffix = suffix.split("?")[0] # remove query string
suffix = suffix.strip("/")
if suffix:
decoded_suffix = urllib.parse.unquote(suffix)
suffix_candidates = [decoded_suffix, f"{decoded_suffix}/index.html"]
escaped_suffix = re.sub(r"[^0-9a-zA-Z\-_.=(),/]", "_", decoded_suffix)
if decoded_suffix != escaped_suffix:
suffix_candidates.extend([escaped_suffix, f"{escaped_suffix}/index.html"])
try:
return [
_safe_path_join(spec.local_path, x)
for x in suffix_candidates
]
except ValueError:
return []
else:
return [spec.local_path / "index.html"]
class MapLocal:
def __init__(self):
self.replacements: typing.List[MapLocalSpec] = []
def load(self, loader):
loader.add_option(
"map_local", typing.Sequence[str], [],
"""
Map remote resources to a local file using a pattern of the form
"[/flow-filter]/url-regex/file-or-directory-path", where the
separator can be any character.
"""
)
def configure(self, updated):
if "map_local" in updated:
self.replacements = []
for option in ctx.options.map_local:
try:
spec = parse_map_local_spec(option)
except ValueError as e:
raise exceptions.OptionsError(f"Cannot parse map_local option {option}: {e}") from e
self.replacements.append(spec)
def request(self, flow: http.HTTPFlow) -> None:
if flow.reply and flow.reply.has_message:
return
url = flow.request.pretty_url
all_candidates = []
for spec in self.replacements:
if spec.matches(flow) and re.search(spec.regex, url):
if spec.local_path.is_file():
candidates = [spec.local_path]
else:
candidates = file_candidates(url, spec)
all_candidates.extend(candidates)
local_file = None
for candidate in candidates:
if candidate.is_file():
local_file = candidate
break
if local_file:
headers = {
"Server": version.MITMPROXY
}
mimetype = mimetypes.guess_type(str(local_file))[0]
if mimetype:
headers["Content-Type"] = mimetype
try:
contents = local_file.read_bytes()
except IOError as e:
ctx.log.warn(f"Could not read file: {e}")
continue
flow.response = http.HTTPResponse.make(
200,
contents,
headers
)
# only set flow.response once, for the first matching rule
return
if all_candidates:
flow.response = http.HTTPResponse.make(404)
ctx.log.info(f"None of the local file candidates exist: {', '.join(str(x) for x in all_candidates)}")