PLAT-1199 Stop using pycrypto
This commit is contained in:
@@ -103,11 +103,9 @@ class NotificationPrefViewTest(UrlResetMixin, TestCase):
|
||||
self.assertRaises(PermissionDenied, ajax_enable, request)
|
||||
self.assertNotPrefExists(self.user)
|
||||
|
||||
@patch("Crypto.Random.new")
|
||||
def test_ajax_enable_success(self, mock_random_new):
|
||||
mock_stream = Mock()
|
||||
mock_stream.read.return_value = self.INITIALIZATION_VECTOR
|
||||
mock_random_new.return_value = mock_stream
|
||||
@patch("os.urandom")
|
||||
def test_ajax_enable_success(self, mock_urandom):
|
||||
mock_urandom.return_value = self.INITIALIZATION_VECTOR
|
||||
|
||||
def test_user(user):
|
||||
request = self.request_factory.post("dummy")
|
||||
|
||||
@@ -1,9 +1,19 @@
|
||||
"""
|
||||
Views to support notification preferences.
|
||||
"""
|
||||
|
||||
from __future__ import division
|
||||
|
||||
import json
|
||||
import os
|
||||
from base64 import urlsafe_b64decode, urlsafe_b64encode
|
||||
from hashlib import sha256
|
||||
|
||||
from Crypto import Random
|
||||
from Crypto.Cipher import AES
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
from cryptography.hazmat.primitives.ciphers.modes import CBC
|
||||
from cryptography.hazmat.primitives.padding import PKCS7
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.exceptions import PermissionDenied
|
||||
@@ -15,6 +25,8 @@ from notification_prefs import NOTIFICATION_PREF_KEY
|
||||
from openedx.core.djangoapps.user_api.models import UserPreference
|
||||
from openedx.core.djangoapps.user_api.preferences.api import delete_user_preference
|
||||
|
||||
AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8)
|
||||
|
||||
|
||||
class UsernameDecryptionException(Exception):
|
||||
pass
|
||||
@@ -39,35 +51,20 @@ class UsernameCipher(object):
|
||||
decryption cipher
|
||||
5. base64url encode the result
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _get_aes_cipher(initialization_vector):
|
||||
hash_ = sha256()
|
||||
hash_.update(settings.SECRET_KEY)
|
||||
return AES.new(hash_.digest(), AES.MODE_CBC, initialization_vector)
|
||||
|
||||
@staticmethod
|
||||
def _add_padding(input_str):
|
||||
"""Return `input_str` with PKCS#7 padding added to match AES block length"""
|
||||
padding_len = AES.block_size - len(input_str) % AES.block_size
|
||||
return input_str + padding_len * chr(padding_len)
|
||||
|
||||
@staticmethod
|
||||
def _remove_padding(input_str):
|
||||
"""Return `input_str` with PKCS#7 padding trimmed to match AES block length"""
|
||||
num_pad_bytes = ord(input_str[-1])
|
||||
if num_pad_bytes < 1 or num_pad_bytes > AES.block_size or num_pad_bytes >= len(input_str):
|
||||
raise UsernameDecryptionException("padding")
|
||||
return input_str[:-num_pad_bytes]
|
||||
return Cipher(AES(hash_.digest()), CBC(initialization_vector), backend=default_backend())
|
||||
|
||||
@staticmethod
|
||||
def encrypt(username):
|
||||
initialization_vector = Random.new().read(AES.block_size)
|
||||
initialization_vector = os.urandom(AES_BLOCK_SIZE_BYTES)
|
||||
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
|
||||
return urlsafe_b64encode(
|
||||
initialization_vector +
|
||||
aes_cipher.encrypt(UsernameCipher._add_padding(username.encode("utf-8")))
|
||||
)
|
||||
encryptor = aes_cipher.encryptor()
|
||||
padder = PKCS7(AES.block_size).padder()
|
||||
padded = padder.update(username.encode("utf-8")) + padder.finalize()
|
||||
return urlsafe_b64encode(initialization_vector + encryptor.update(padded) + encryptor.finalize())
|
||||
|
||||
@staticmethod
|
||||
def decrypt(token):
|
||||
@@ -76,19 +73,27 @@ class UsernameCipher(object):
|
||||
except TypeError:
|
||||
raise UsernameDecryptionException("base64url")
|
||||
|
||||
if len(base64_decoded) < AES.block_size:
|
||||
if len(base64_decoded) < AES_BLOCK_SIZE_BYTES:
|
||||
raise UsernameDecryptionException("initialization_vector")
|
||||
|
||||
initialization_vector = base64_decoded[:AES.block_size]
|
||||
aes_encrypted = base64_decoded[AES.block_size:]
|
||||
initialization_vector = base64_decoded[:AES_BLOCK_SIZE_BYTES]
|
||||
aes_encrypted = base64_decoded[AES_BLOCK_SIZE_BYTES:]
|
||||
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
|
||||
decryptor = aes_cipher.decryptor()
|
||||
unpadder = PKCS7(AES.block_size).unpadder()
|
||||
|
||||
try:
|
||||
decrypted = aes_cipher.decrypt(aes_encrypted)
|
||||
decrypted = decryptor.update(aes_encrypted) + decryptor.finalize()
|
||||
except ValueError:
|
||||
raise UsernameDecryptionException("aes")
|
||||
|
||||
return UsernameCipher._remove_padding(decrypted)
|
||||
try:
|
||||
unpadded = unpadder.update(decrypted) + unpadder.finalize()
|
||||
if len(unpadded) == 0:
|
||||
raise UsernameDecryptionException("padding")
|
||||
return unpadded
|
||||
except ValueError:
|
||||
raise UsernameDecryptionException("padding")
|
||||
|
||||
|
||||
def enable_notifications(user):
|
||||
|
||||
@@ -608,7 +608,7 @@ class SoftwareSecurePhotoVerification(PhotoVerification):
|
||||
both Software Secure and edx-platform.
|
||||
|
||||
2. The snapshot of a user's photo ID is also encrypted using AES-256, but
|
||||
the key is randomly generated using pycrypto's Random. Every verification
|
||||
the key is randomly generated using os.urandom. Every verification
|
||||
attempt has a new key. The AES key is then encrypted using a public key
|
||||
provided by Software Secure. We store only the RSA-encryped AES key.
|
||||
Since edx-platform does not have Software Secure's private RSA key, it
|
||||
|
||||
@@ -6,8 +6,6 @@ words, passing the `str` value of
|
||||
You want to pass in the result of calling .decode('hex') on that, so this instead:
|
||||
"'2\xfer\xaa\xf2\xab\xb4M\xe9\xe1a\x13\x1bT5\xc8\xd3|\xbd\xb6\xf5\xdf$*\xe8`\xb2\x83\x11_-\xae'"
|
||||
|
||||
The RSA functions take any key format that RSA.importKey() accepts, so...
|
||||
|
||||
An RSA public key can be in any of the following formats:
|
||||
* X.509 subjectPublicKeyInfo DER SEQUENCE (binary or PEM encoding)
|
||||
* PKCS#1 RSAPublicKey DER SEQUENCE (binary or PEM encoding)
|
||||
@@ -16,27 +14,32 @@ An RSA public key can be in any of the following formats:
|
||||
An RSA private key can be in any of the following formats:
|
||||
* PKCS#1 RSAPrivateKey DER SEQUENCE (binary or PEM encoding)
|
||||
* PKCS#8 PrivateKeyInfo DER SEQUENCE (binary or PEM encoding)
|
||||
* OpenSSH (textual public key only)
|
||||
|
||||
In case of PEM encoding, the private key can be encrypted with DES or 3TDES
|
||||
according to a certain pass phrase. Only OpenSSL-compatible pass phrases are
|
||||
supported.
|
||||
"""
|
||||
from __future__ import division
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import hmac
|
||||
import logging
|
||||
import os
|
||||
from hashlib import md5, sha256
|
||||
|
||||
from Crypto import Random
|
||||
from Crypto.Cipher import AES, PKCS1_OAEP
|
||||
from Crypto.PublicKey import RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
from cryptography.hazmat.primitives.ciphers.modes import CBC
|
||||
from cryptography.hazmat.primitives.hashes import SHA1
|
||||
from cryptography.hazmat.primitives.padding import PKCS7
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8)
|
||||
|
||||
|
||||
def encrypt_and_encode(data, key):
|
||||
""" Encrypts and endcodes `data` using `key' """
|
||||
""" Encrypts and encodes `data` using `key' """
|
||||
return base64.urlsafe_b64encode(aes_encrypt(data, key))
|
||||
|
||||
|
||||
@@ -51,7 +54,8 @@ def aes_encrypt(data, key):
|
||||
"""
|
||||
cipher = aes_cipher_from_key(key)
|
||||
padded_data = pad(data)
|
||||
return cipher.encrypt(padded_data)
|
||||
encryptor = cipher.encryptor()
|
||||
return encryptor.update(padded_data) + encryptor.finalize()
|
||||
|
||||
|
||||
def aes_decrypt(encrypted_data, key):
|
||||
@@ -59,17 +63,18 @@ def aes_decrypt(encrypted_data, key):
|
||||
Decrypt `encrypted_data` using `key`
|
||||
"""
|
||||
cipher = aes_cipher_from_key(key)
|
||||
padded_data = cipher.decrypt(encrypted_data)
|
||||
decryptor = cipher.decryptor()
|
||||
padded_data = decryptor.update(encrypted_data) + decryptor.finalize()
|
||||
return unpad(padded_data)
|
||||
|
||||
|
||||
def aes_cipher_from_key(key):
|
||||
"""
|
||||
Given an AES key, return a Cipher object that has `encrypt()` and
|
||||
`decrypt()` methods. It will create the cipher to use CBC mode, and create
|
||||
Given an AES key, return a Cipher object that has `encryptor()` and
|
||||
`decryptor()` methods. It will create the cipher to use CBC mode, and create
|
||||
the initialization vector as Software Secure expects it.
|
||||
"""
|
||||
return AES.new(key, AES.MODE_CBC, generate_aes_iv(key))
|
||||
return Cipher(AES(key), CBC(generate_aes_iv(key)), backend=default_backend())
|
||||
|
||||
|
||||
def generate_aes_iv(key):
|
||||
@@ -77,42 +82,47 @@ def generate_aes_iv(key):
|
||||
Return the initialization vector Software Secure expects for a given AES
|
||||
key (they hash it a couple of times and take a substring).
|
||||
"""
|
||||
return md5(key + md5(key).hexdigest()).hexdigest()[:AES.block_size]
|
||||
return md5(key + md5(key).hexdigest()).hexdigest()[:AES_BLOCK_SIZE_BYTES]
|
||||
|
||||
|
||||
def random_aes_key():
|
||||
return Random.new().read(32)
|
||||
return os.urandom(32)
|
||||
|
||||
|
||||
def pad(data):
|
||||
""" Pad the given `data` such that it fits into the proper AES block size """
|
||||
bytes_to_pad = AES.block_size - len(data) % AES.block_size
|
||||
return data + (bytes_to_pad * chr(bytes_to_pad))
|
||||
padder = PKCS7(AES.block_size).padder()
|
||||
return padder.update(data) + padder.finalize()
|
||||
|
||||
|
||||
def unpad(padded_data):
|
||||
""" remove all padding from `padded_data` """
|
||||
num_padded_bytes = ord(padded_data[-1])
|
||||
return padded_data[:-num_padded_bytes]
|
||||
unpadder = PKCS7(AES.block_size).unpadder()
|
||||
return unpadder.update(padded_data) + unpadder.finalize()
|
||||
|
||||
|
||||
def rsa_encrypt(data, rsa_pub_key_str):
|
||||
def rsa_encrypt(data, rsa_pub_key_bytes):
|
||||
"""
|
||||
`rsa_pub_key` is a string with the public key
|
||||
`rsa_pub_key_bytes` is a byte sequence with the public key
|
||||
"""
|
||||
key = RSA.importKey(rsa_pub_key_str)
|
||||
cipher = PKCS1_OAEP.new(key)
|
||||
encrypted_data = cipher.encrypt(data)
|
||||
return encrypted_data
|
||||
if rsa_pub_key_bytes.startswith(b'-----'):
|
||||
key = serialization.load_pem_public_key(rsa_pub_key_bytes, backend=default_backend())
|
||||
elif rsa_pub_key_bytes.startswith(b'ssh-rsa '):
|
||||
key = serialization.load_ssh_public_key(rsa_pub_key_bytes, backend=default_backend())
|
||||
else:
|
||||
key = serialization.load_der_public_key(rsa_pub_key_bytes, backend=default_backend())
|
||||
return key.encrypt(data, OAEP(MGF1(SHA1()), SHA1(), label=None))
|
||||
|
||||
|
||||
def rsa_decrypt(data, rsa_priv_key_str):
|
||||
def rsa_decrypt(data, rsa_priv_key_bytes):
|
||||
"""
|
||||
When given some `data` and an RSA private key, decrypt the data
|
||||
"""
|
||||
key = RSA.importKey(rsa_priv_key_str)
|
||||
cipher = PKCS1_OAEP.new(key)
|
||||
return cipher.decrypt(data)
|
||||
if rsa_priv_key_bytes.startswith(b'-----'):
|
||||
key = serialization.load_pem_private_key(rsa_priv_key_bytes, password=None, backend=default_backend())
|
||||
else:
|
||||
key = serialization.load_der_private_key(rsa_priv_key_bytes, password=None, backend=default_backend())
|
||||
return key.decrypt(data, OAEP(MGF1(SHA1()), SHA1(), label=None))
|
||||
|
||||
|
||||
def has_valid_signature(method, headers_dict, body_dict, access_key, secret_key):
|
||||
|
||||
@@ -7,7 +7,7 @@ import unittest
|
||||
|
||||
import ddt
|
||||
import httpretty
|
||||
from Crypto.PublicKey import RSA
|
||||
from Cryptodome.PublicKey import RSA
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import RequestFactory, TestCase, override_settings
|
||||
|
||||
@@ -8,7 +8,7 @@ from __future__ import unicode_literals
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
from Cryptodome.PublicKey import RSA
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import JsonResponse
|
||||
|
||||
@@ -47,7 +47,7 @@ edx-lint==0.4.3
|
||||
astroid==1.3.8
|
||||
edx-django-oauth2-provider==1.2.5
|
||||
edx-django-sites-extensions==2.3.0
|
||||
edx-enterprise==0.53.14
|
||||
edx-enterprise==0.53.15
|
||||
edx-oauth2-provider==1.2.2
|
||||
edx-opaque-keys==0.4.0
|
||||
edx-organizations==0.4.7
|
||||
@@ -76,7 +76,7 @@ piexif==1.0.2
|
||||
Pillow==3.4
|
||||
polib==1.0.3
|
||||
psutil==1.2.1
|
||||
pycrypto>=2.6
|
||||
pycryptodomex==3.4.7
|
||||
pygments==2.2.0
|
||||
pygraphviz==1.1
|
||||
pyjwkest==1.3.2
|
||||
|
||||
Reference in New Issue
Block a user