本篇博客使用 Python 实现了一个简易的 SSL,以帮助理解 SSL 的大致实现流程。
SSL(Secure Sockets Layer,安全套接层)是 Netscape 公司率先采用的网络安全协议。它是在传输通信协议(TCP/IP)之上实现的一种安全协议,采用公开密钥技术。
运行环境
- 操作系统:浪潮云启操作系统 InLinux 23.12 LTS SP1
- Python 版本:3.9.9
- 使用的 Python 包:cryptography、ownca、signify
浪潮云启操作系统(InLinux)面向企业级业务,提供自主可控、安全可靠的新一代服务器操作系统,全面支持云计算、大数据、人工智能、物联网等新型场景,具备性能高效、扩展便捷、管理智能、内生安全等特性。
运行前准备
# 安装 Python
yum install -y python
# 安装 Python 包
pip install cryptography ownca signify
程序实现与流程说明
本程序实现了一个简易的 SSL,共分为三个模块:ca.py、server.py、client.py。
ca.py 负责签发证书;server.py 与 client.py 相互通信,通信过程中会完成生成公私钥对、生成会话密钥等步骤。为了简化实现,server 能够从本地直接获取 CA 证书,且只有 server 对 client 检查证书(单向验证)。
程序流程如下:
- CA 生成根证书、公私钥对
- client 生成公私钥对,向 CA 发送 CSR 请求
- CA 收到 CSR 请求,用私钥签名,向 client 发送签名证书,client 拿到证书
- client 第一次向 server 发送数据,并附带证书信息
- server 检验证书信息,并生成 session_key,利用 client 公钥加密 session_key,将密文发回给 client,之后的对话用 session_key 验证。
- client 解密 session_key,利用消息和 session_key 生成 MAC,向 server 发送消息并附带 MAC
- server 收到消息并验证 MAC,对话结束。
运行截图
ca.py

server.py

client.py

证书目录:

代码
ca.py
"""ca.py -- toy certificate authority.

Listens on a TCP port, reads one PEM-encoded CSR per connection, signs it
with the local ownca-managed CA and sends the resulting certificate back.
"""
import socket

import ownca.ownca
from cryptography import x509
from cryptography.x509 import NameOID

# Creates the CA under ./myca on first run, or loads it if it already exists.
ca = ownca.CertificateAuthority(ca_storage='./myca', common_name='myca')
print("myca initialized")

host = "127.0.0.1"  # standard loopback interface address (localhost)
port = 11111  # port to listen on (non-privileged ports are > 1023)

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(5)
print("myca server is running...")

while True:
    conn, addr = server_socket.accept()
    # `with` closes the connection on every path, including the error
    # paths below that skip to the next client.
    with conn:
        print(f"connected by {addr}")
        try:
            # A single recv is enough for a PEM CSR of a 2048-bit RSA key
            # on loopback; larger requests would need a read loop.
            data = conn.recv(2048)
            csr = x509.load_pem_x509_csr(data)
        except (OSError, ValueError) as e:
            # Malformed or missing CSR: drop this client, keep serving.
            print(e)
            continue

        try:
            # Sign the CSR; on success the certificate is generated and
            # stored in the CA storage directory.
            ca.sign_csr(csr, csr.public_key(), maximum_days=825)
        except Exception as e:
            # Best effort: signing may fail (e.g. a certificate for this CN
            # already exists); fall through and try to load the stored one.
            print(e)

        try:
            # Extract the CN from the CSR and load the issued certificate
            # from the CA storage.
            common_name = csr.subject.get_attributes_for_oid(
                NameOID.COMMON_NAME
            )[0].value
            load_cert = ca.load_certificate(common_name)
        except Exception as e:
            # No CN in the CSR, or no certificate stored for it.
            print(e)
            continue

        print('successfully sign for ' + common_name)
        # Send the certificate bytes back to the requester.
        conn.sendall(load_cert.cert_bytes)
client.py
# Standard library
import pickle
import socket

# Third-party (cryptography); class names are case-sensitive: NameOID.
from cryptography import x509
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.x509.oid import NameOID
def generatecsr(common_name, private_key):
    """Build and sign a certificate signing request (CSR).

    Args:
        common_name: Value stored in the subject's Common Name attribute.
        private_key: Private key used to sign the request.

    Returns:
        The signed ``x509.CertificateSigningRequest``.
    """
    # Reach NameOID through the x509 package so this function only needs
    # the `x509` and `hashes` names from the module imports.
    name_oid = x509.oid.NameOID
    # Details about who we are. Locality / organization attributes can be
    # appended here in the same way if needed.
    subject = x509.Name([
        # Country codes are two-letter ISO 3166 codes, written in uppercase.
        x509.NameAttribute(name_oid.COUNTRY_NAME, "CN"),
        x509.NameAttribute(name_oid.STATE_OR_PROVINCE_NAME, "beijing"),
        x509.NameAttribute(name_oid.COMMON_NAME, common_name),
    ])
    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    return builder.sign(private_key, hashes.SHA256())
def rsadecryption(cipher, private_key):
    """Decrypt an RSA-OAEP ciphertext.

    Uses OAEP with MGF1/SHA-256 and no label, mirroring the parameters the
    server uses for encryption.

    Args:
        cipher: Ciphertext bytes.
        private_key: RSA private key matching the encrypting public key.

    Returns:
        The decrypted plaintext bytes.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_key.decrypt(cipher, oaep)
# --- Step 1: generate the RSA key pair and a CSR for the chosen CN --------
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
cn = input("input common name: ")
csr = generatecsr(cn, private_key)
public_key = private_key.public_key()
csr_bytes = csr.public_bytes(serialization.Encoding.PEM)
print(cn + ": key pair and csr generated!")

host = "127.0.0.1"  # the server's hostname or ip address
ca_port = 11111  # the ca server port

# --- Step 2: ask the CA to sign the CSR ------------------------------------
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((host, ca_port))
    s.sendall(csr_bytes)  # send the generated csr
    # The CA closes the connection after sending the certificate, so read
    # until EOF instead of trusting a single recv to return all of it.
    cert_data = b''.join(iter(lambda: s.recv(2048), b''))
print(cn + " signing finished!, part of the cert:")
print(cert_data[:40] + b'...')
print('================================================')

server_port = 22222  # the server port
session_key = b''
msg = b''
# Named `mac`, not `hmac`: rebinding `hmac` would shadow the imported
# cryptography `hmac` module that is needed below to compute the tag.
mac = b''
public_key_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

# --- Step 3: handshake, empty session_key asks the server for a new one ----
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((host, server_port))
    # send all params to the server
    params = [cn, cert_data, public_key_pem, session_key, msg, mac]
    s.sendall(pickle.dumps(params))
    data = s.recv(2048)
print("received from server: ")
try:
    session_key = rsadecryption(data, private_key)
except ValueError:
    # Not a valid ciphertext for our key: the server most likely replied
    # with a plain-text error (e.g. certificate verification failed).
    print(data)
    raise SystemExit(1)
print("session key decrypted:", session_key)

# --- Step 4: send a message authenticated with HMAC-SHA256 -----------------
msg = b'this is messgae from ' + cn.encode('utf-8') + b'.\n'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((host, server_port))
    msg = msg * 10
    h = hmac.HMAC(session_key, hashes.SHA256())
    h.update(msg)
    mac = h.finalize()
    # NOTE(security): the wire format carries session_key in clear text next
    # to the MAC, so anyone observing the traffic can forge messages. Kept
    # unchanged for compatibility with server.py; a real protocol must never
    # transmit the key.
    params = [cn, cert_data, public_key_pem, session_key, msg, mac]
    s.sendall(pickle.dumps(params))
    data = s.recv(2048)
    print(data)
server.py
import os
import pickle
import socket

import ownca
from cryptography import x509
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from signify.x509 import Certificate, CertificateStore, VerificationContext
# Open the CA kept under ./myca; its certificate is the trust anchor used
# to verify client certificates.
ca = ownca.CertificateAuthority(ca_storage='./myca', common_name='myca')
print("successfully load the ca certificate")

host = "127.0.0.1"
port = 22222

# Plain TCP listener; each request is handled on its own connection.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen()
print("server is running...")
def generatesessionkey(byte_num=16):
    """Return *byte_num* random bytes from the OS CSPRNG as a session key.

    Args:
        byte_num: Key length in bytes; must be positive.

    Returns:
        A ``bytes`` object of length *byte_num*.

    Raises:
        ValueError: If *byte_num* is not positive. An empty key would be
            indistinguishable from "no session" in the request format.
    """
    if byte_num <= 0:
        raise ValueError("byte_num must be a positive integer")
    return os.urandom(byte_num)
def rsaencryption(msg, public_key):
    """Encrypt *msg* with RSA-OAEP.

    Uses OAEP with MGF1/SHA-256 and no label.

    Args:
        msg: Plaintext bytes (here: the session key).
        public_key: RSA public key of the recipient.

    Returns:
        The ciphertext bytes.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return public_key.encrypt(msg, oaep)
# Session keys issued by this server, keyed by client common name. The MAC
# is always checked against the key stored here, never against a key that
# arrives in the request.
sessions = {}

while True:
    conn, addr = server_socket.accept()
    # `with` closes the connection on every path out of the iteration.
    with conn:
        print(f"connected by {addr}")
        try:
            # The second request (certificate + public key + message + MAC)
            # is around 2 KB, so a 2048-byte buffer can truncate it.
            data = conn.recv(65536)
            # SECURITY: pickle.loads on data from the network can execute
            # arbitrary code. Acceptable only for this local demo; use a
            # data-only format (e.g. JSON) for anything real.
            params = pickle.loads(data)
            # Request layout: [cn, cert_pem, public_key_pem, session_key,
            # msg, mac]. public_key_pem is accepted for wire compatibility
            # but not trusted (see below).
            (cn, cert_from_cli_pem, public_key_pem,
             session_key, msg, hmac_to_check) = params[:6]
            if not isinstance(cn, str):
                raise TypeError("common name must be a string")
        except Exception as e:
            print("malformed request:", e)
            continue

        # No session built yet: verify the certificate and issue a key.
        if not session_key:
            try:
                # Verify the client certificate against the CA certificate
                # using signify; raises if verification fails.
                trust_store = CertificateStore(
                    [Certificate.from_pem(ca.cert_bytes)], trusted=True
                )
                context = VerificationContext(trust_store)
                Certificate.from_pem(cert_from_cli_pem).verify(context)
                # Take the public key from the verified certificate. Using
                # the separately supplied public_key_pem would let anyone
                # replay a valid certificate with a key of their own.
                public_key = x509.load_pem_x509_certificate(
                    cert_from_cli_pem
                ).public_key()
            except Exception as e:
                print(cn + "verification error: ", e)
                conn.sendall(b"verification error!\n")
                continue

            # Reached only when verification raised nothing.
            print(cn + " verification success!")
            session_key = generatesessionkey()
            sessions[cn] = session_key
            print(cn + " session key generated:", session_key)
            conn.sendall(rsaencryption(session_key, public_key))

        # Session claimed: authenticate the message with the stored key.
        else:
            issued_key = sessions.get(cn)
            if issued_key is None:
                print(cn + " mac verification wrong:", "no session for this client")
                continue
            try:
                h = hmac.HMAC(issued_key, hashes.SHA256())
                h.update(msg)
                # Raises if the tag does not match.
                h.verify(hmac_to_check)
            except Exception as e:
                print(cn + " mac verification wrong:", e)
                continue

            print(cn + " mac verification success!")
            print(msg)
            conn.sendall(b'mac verification success!')
参考资料
到此这篇关于 Python 实现简易 SSL 的项目实践的文章就介绍到这了。更多相关 Python 实现简易 SSL 的内容,请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论