1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# ~*~ coding: utf-8 ~*~
#
from __future__ import unicode_literals
import logging
import os
import re
import uuid
from django.conf import settings
from django.contrib.auth.mixins import UserPassesTestMixin
from django.urls import reverse_lazy
from django.utils.translation import ugettext as _
from paramiko.rsakey import RSAKey
from common.tasks import send_mail_async
from common.utils import reverse, get_object_or_none
from .models import User
try:
import cStringIO as StringIO
except ImportError:
import StringIO
logger = logging.getLogger('jumpserver')
class AdminUserRequiredMixin(UserPassesTestMixin):
login_url = reverse_lazy('users:login')
def test_func(self):
return self.request.user.is_staff
def user_add_success_next(user):
subject = _('Create account successfully')
recipient_list = [user.email]
message = _("""
Hello %(name)s:
</br>
Your account has been created successfully
</br>
<a href="%(rest_password_url)s?token=%(rest_password_token)s">click here to set your password</a>
</br>
This link is valid for 1 hour. After it expires, <a href="%(forget_password_url)s?email=%(email)s">request new one</a>
</br>
---
</br>
<a href="%(login_url)s">Login direct</a>
</br>
""") % {
'name': user.name,
'rest_password_url': reverse('users:reset-password', external=True),
'rest_password_token': user.generate_reset_token(),
'forget_password_url': reverse('users:forgot-password', external=True),
'email': user.email,
'login_url': reverse('users:login', external=True),
}
send_mail_async.delay(subject, message, recipient_list, html_message=message)
def send_reset_password_mail(user):
subject = _('Reset password')
recipient_list = [user.email]
message = _("""
Hello %(name)s:
</br>
Please click the link below to reset your password, if not your request, concern your account security
</br>
<a href="%(rest_password_url)s?token=%(rest_password_token)s">Click here reset password</a>
</br>
This link is valid for 1 hour. After it expires, <a href="%(forget_password_url)s?email=%(email)s">request new one</a>
</br>
---
</br>
<a href="%(login_url)s">Login direct</a>
</br>
""") % {
'name': user.name,
'rest_password_url': reverse('users:reset-password', external=True),
'rest_password_token': user.generate_reset_token(),
'forget_password_url': reverse('users:forgot-password', external=True),
'email': user.email,
'login_url': reverse('users:login', external=True),
}
if settings.DEBUG:
logger.debug(message)
send_mail_async.delay(subject, message, recipient_list, html_message=message)
def send_reset_ssh_key_mail(user):
subject = _('SSH Key Reset')
recipient_list = [user.email]
message = _("""
Hello %(name)s:
</br>
Your ssh public key has been reset by site administrator.
Please login and reset your ssh public key.
</br>
<a href="%(login_url)s">Login direct</a>
</br>
""") % {
'name': user.name,
'login_url': reverse('users:login', external=True),
}
if settings.DEBUG:
logger.debug(message)
send_mail_async.delay(subject, message, recipient_list, html_message=message)
# def validate_ssh_pk(text):
# """
# Expects a SSH private key as string.
# Returns a boolean and a error message.
# If the text is parsed as private key successfully,
# (True,'') is returned. Otherwise,
# (False, <message describing the error>) is returned.
#
# from https://github.com/githubnemo/SSH-private-key-validator/blob/master/validate.py
#
# """
#
# if not text:
# return False, 'No text given'
#
# startPattern = re.compile("^-----BEGIN [A-Z]+ PRIVATE KEY-----")
# optionPattern = re.compile("^.+: .+")
# contentPattern = re.compile("^([a-zA-Z0-9+/]{64}|[a-zA-Z0-9+/]{1,64}[=]{0,2})$")
# endPattern = re.compile("^-----END [A-Z]+ PRIVATE KEY-----")
#
# def contentState(text):
# for i in range(0, len(text)):
# line = text[i]
#
# if endPattern.match(line):
# if i == len(text) - 1 or len(text[i + 1]) == 0:
# return True, ''
# else:
# return False, 'At end but content coming'
#
# elif not contentPattern.match(line):
# return False, 'Wrong string in content section'
#
# return False, 'No content or missing end line'
#
# def optionState(text):
# for i in range(0, len(text)):
# line = text[i]
#
# if line[-1:] == '\\':
# return optionState(text[i + 2:])
#
# if not optionPattern.match(line):
# return contentState(text[i + 1:])
#
# return False, 'Expected option, found nothing'
# def startState(text):
# if len(text) == 0 or not startPattern.match(text[0]):
# return False, 'Header is wrong'
# return optionState(text[1:])
#
# return startState([n.strip() for n in text.splitlines()])
#
def check_user_valid(**kwargs):
password = kwargs.pop('password', None)
public_key = kwargs.pop('public_key', None)
user = get_object_or_none(User, **kwargs)
if user is None or not user.is_valid:
return None
if password and user.check_password(password):
return user
if public_key:
public_key_saved = user.public_key.split()
if len(public_key_saved) == 1:
if public_key == public_key_saved[0]:
return user
elif len(public_key_saved) > 1:
if public_key == public_key_saved[1]:
return user
return None
def token_gen(*args, **kwargs):
return uuid.uuid4().get_hex()