Files
edx-platform/lms/djangoapps/verify_student/ssencrypt.py
Diana Huang 4ee9ef61cf Clean up some old pep8/pylint violations
Also, deletes some unused code.
2013-09-24 17:12:31 -04:00

209 lines
6.8 KiB
Python

"""
NOTE: Anytime a `key` is passed into a function here, we assume it's a raw byte
string. It should *not* be a string representation of a hex value. In other
words, passing the `str` value of
`"32fe72aaf2abb44de9e161131b5435c8d37cbdb6f5df242ae860b283115f2dae"` is bad.
You want to pass in the result of calling .decode('hex') on that, so this instead:
"'2\xfer\xaa\xf2\xab\xb4M\xe9\xe1a\x13\x1bT5\xc8\xd3|\xbd\xb6\xf5\xdf$*\xe8`\xb2\x83\x11_-\xae'"
The RSA functions take any key format that RSA.importKey() accepts, so...
An RSA public key can be in any of the following formats:
* X.509 subjectPublicKeyInfo DER SEQUENCE (binary or PEM encoding)
* PKCS#1 RSAPublicKey DER SEQUENCE (binary or PEM encoding)
* OpenSSH (textual public key only)
An RSA private key can be in any of the following formats:
* PKCS#1 RSAPrivateKey DER SEQUENCE (binary or PEM encoding)
* PKCS#8 PrivateKeyInfo DER SEQUENCE (binary or PEM encoding)
* OpenSSH (textual public key only)
In case of PEM encoding, the private key can be encrypted with DES or 3TDES
according to a certain pass phrase. Only OpenSSL-compatible pass phrases are
supported.
"""
from hashlib import md5, sha256
import base64
import binascii
import hmac
import logging
from Crypto import Random
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
log = logging.getLogger(__name__)
def encrypt_and_encode(data, key):
""" Encrypts and endcodes `data` using `key' """
return base64.urlsafe_b64encode(aes_encrypt(data, key))
def decode_and_decrypt(encoded_data, key):
""" Decrypts and decodes `data` using `key' """
return aes_decrypt(base64.urlsafe_b64decode(encoded_data), key)
def aes_encrypt(data, key):
"""
Return a version of the `data` that has been encrypted to
"""
cipher = aes_cipher_from_key(key)
padded_data = pad(data)
return cipher.encrypt(padded_data)
def aes_decrypt(encrypted_data, key):
"""
Decrypt `encrypted_data` using `key`
"""
cipher = aes_cipher_from_key(key)
padded_data = cipher.decrypt(encrypted_data)
return unpad(padded_data)
def aes_cipher_from_key(key):
"""
Given an AES key, return a Cipher object that has `encrypt()` and
`decrypt()` methods. It will create the cipher to use CBC mode, and create
the initialization vector as Software Secure expects it.
"""
return AES.new(key, AES.MODE_CBC, generate_aes_iv(key))
def generate_aes_iv(key):
"""
Return the initialization vector Software Secure expects for a given AES
key (they hash it a couple of times and take a substring).
"""
return md5(key + md5(key).hexdigest()).hexdigest()[:AES.block_size]
def random_aes_key():
return Random.new().read(32)
def pad(data):
""" Pad the given `data` such that it fits into the proper AES block size """
bytes_to_pad = AES.block_size - len(data) % AES.block_size
return data + (bytes_to_pad * chr(bytes_to_pad))
def unpad(padded_data):
""" remove all padding from `padded_data` """
num_padded_bytes = ord(padded_data[-1])
return padded_data[:-num_padded_bytes]
def rsa_encrypt(data, rsa_pub_key_str):
"""
`rsa_pub_key` is a string with the public key
"""
key = RSA.importKey(rsa_pub_key_str)
cipher = PKCS1_OAEP.new(key)
encrypted_data = cipher.encrypt(data)
return encrypted_data
def rsa_decrypt(data, rsa_priv_key_str):
"""
When given some `data` and an RSA private key, decrypt the data
"""
key = RSA.importKey(rsa_priv_key_str)
cipher = PKCS1_OAEP.new(key)
return cipher.decrypt(data)
def has_valid_signature(method, headers_dict, body_dict, access_key, secret_key):
"""
Given a message (either request or response), say whether it has a valid
signature or not.
"""
_, expected_signature, _ = generate_signed_message(
method, headers_dict, body_dict, access_key, secret_key
)
authorization = headers_dict["Authorization"]
auth_token, post_signature = authorization.split(":")
_, post_access_key = auth_token.split()
if post_access_key != access_key:
log.error("Posted access key does not match ours")
log.debug("Their access: %s; Our access: %s", post_access_key, access_key)
return False
if post_signature != expected_signature:
log.error("Posted signature does not match expected")
log.debug("Their sig: %s; Expected: %s", post_signature, expected_signature)
return False
return True
def generate_signed_message(method, headers_dict, body_dict, access_key, secret_key):
"""
Returns a (message, signature) pair.
"""
message = signing_format_message(method, headers_dict, body_dict)
# hmac needs a byte string for it's starting key, can't be unicode.
hashed = hmac.new(secret_key.encode('utf-8'), message, sha256)
signature = binascii.b2a_base64(hashed.digest()).rstrip('\n')
authorization_header = "SSI {}:{}".format(access_key, signature)
message += '\n'
return message, signature, authorization_header
def signing_format_message(method, headers_dict, body_dict):
"""
Given a dictionary of headers and a dictionary of the JSON for the body,
will return a str that represents the normalized version of this messsage
that will be used to generate a signature.
"""
headers_str = "{}\n\n{}".format(method, header_string(headers_dict))
body_str = body_string(body_dict)
message = headers_str + body_str
return message
def header_string(headers_dict):
"""Given a dictionary of headers, return a canonical string representation."""
header_list = []
if 'Content-Type' in headers_dict:
header_list.append(headers_dict['Content-Type'] + "\n")
if 'Date' in headers_dict:
header_list.append(headers_dict['Date'] + "\n")
if 'Content-MD5' in headers_dict:
header_list.append(headers_dict['Content-MD5'] + "\n")
return "".join(header_list) # Note that trailing \n's are important
def body_string(body_dict, prefix=""):
"""
Return a canonical string representation of the body of a JSON request or
response. This canonical representation will be used as an input to the
hashing used to generate a signature.
"""
body_list = []
for key, value in sorted(body_dict.items()):
if isinstance(value, (list, tuple)):
for i, arr in enumerate(value):
if isinstance(arr, dict):
body_list.append(body_string(arr, u"{}.{}.".format(key, i)))
else:
body_list.append(u"{}.{}:{}\n".format(key, i, arr).encode('utf-8'))
elif isinstance(value, dict):
body_list.append(body_string(value, key + ":"))
else:
if value is None:
value = "null"
body_list.append(u"{}{}:{}\n".format(prefix, key, value).encode('utf-8'))
return "".join(body_list) # Note that trailing \n's are important