signatures.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. """ signs activitypub activities """
  2. import hashlib
  3. from urllib.parse import urlparse
  4. import datetime
  5. from base64 import b64encode, b64decode
  6. from Crypto import Random
  7. from Crypto.PublicKey import RSA
  8. from Crypto.Signature import pkcs1_15 # pylint: disable=no-name-in-module
  9. from Crypto.Hash import SHA256
# Upper bound, in seconds, on the age of a request's "date" header;
# Signature.verify rejects requests whose date is older than this.
MAX_SIGNATURE_AGE = 300
  11. def create_key_pair():
  12. """a new public/private key pair, used for creating new users"""
  13. random_generator = Random.new().read
  14. key = RSA.generate(2048, random_generator)
  15. private_key = key.export_key().decode("utf8")
  16. public_key = key.public_key().export_key().decode("utf8")
  17. return private_key, public_key
  18. def make_signature(method, sender, destination, date, **kwargs):
  19. """uses a private key to sign an outgoing message"""
  20. inbox_parts = urlparse(destination)
  21. signature_headers = [
  22. f"(request-target): {method} {inbox_parts.path}",
  23. f"host: {inbox_parts.netloc}",
  24. f"date: {date}",
  25. ]
  26. headers = "(request-target) host date"
  27. digest = kwargs.get("digest")
  28. if digest is not None:
  29. signature_headers.append(f"digest: {digest}")
  30. headers = "(request-target) host date digest"
  31. message_to_sign = "\n".join(signature_headers)
  32. signer = pkcs1_15.new(RSA.import_key(sender.key_pair.private_key))
  33. signed_message = signer.sign(SHA256.new(message_to_sign.encode("utf8")))
  34. # For legacy reasons we need to use an incorrect keyId for older Bookwyrm versions
  35. key_id = (
  36. f"{sender.remote_id}#main-key"
  37. if kwargs.get("use_legacy_key")
  38. else f"{sender.remote_id}/#main-key"
  39. )
  40. signature = {
  41. "keyId": key_id,
  42. "algorithm": "rsa-sha256",
  43. "headers": headers,
  44. "signature": b64encode(signed_message).decode("utf8"),
  45. }
  46. return ",".join(f'{k}="{v}"' for (k, v) in signature.items())
  47. def make_digest(data):
  48. """creates a message digest for signing"""
  49. return "SHA-256=" + b64encode(hashlib.sha256(data.encode("utf-8")).digest()).decode(
  50. "utf-8"
  51. )
  52. def verify_digest(request):
  53. """checks if a digest is syntactically valid and matches the message"""
  54. algorithm, digest = request.headers["digest"].split("=", 1)
  55. if algorithm == "SHA-256":
  56. hash_function = hashlib.sha256
  57. elif algorithm == "SHA-512":
  58. hash_function = hashlib.sha512
  59. else:
  60. raise ValueError(f"Unsupported hash function: {algorithm}")
  61. expected = hash_function(request.body).digest()
  62. if b64decode(digest) != expected:
  63. raise ValueError("Invalid HTTP Digest header")
  64. class Signature:
  65. """read and validate incoming signatures"""
  66. def __init__(self, key_id, headers, signature):
  67. self.key_id = key_id
  68. self.headers = headers
  69. self.signature = signature
  70. # pylint: disable=invalid-name
  71. @classmethod
  72. def parse(cls, request):
  73. """extract and parse a signature from an http request"""
  74. signature_dict = {}
  75. for pair in request.headers["Signature"].split(","):
  76. k, v = pair.split("=", 1)
  77. v = v.replace('"', "")
  78. signature_dict[k] = v
  79. try:
  80. key_id = signature_dict["keyId"]
  81. headers = signature_dict["headers"]
  82. signature = b64decode(signature_dict["signature"])
  83. except KeyError:
  84. raise ValueError("Invalid auth header")
  85. return cls(key_id, headers, signature)
  86. def verify(self, public_key, request):
  87. """verify rsa signature"""
  88. if http_date_age(request.headers["date"]) > MAX_SIGNATURE_AGE:
  89. raise ValueError(f"Request too old: {request.headers['date']}")
  90. public_key = RSA.import_key(public_key)
  91. comparison_string = []
  92. for signed_header_name in self.headers.split(" "):
  93. if signed_header_name == "(request-target)":
  94. comparison_string.append(f"(request-target): post {request.path}")
  95. else:
  96. if signed_header_name == "digest":
  97. verify_digest(request)
  98. comparison_string.append(
  99. f"{signed_header_name}: {request.headers[signed_header_name]}"
  100. )
  101. comparison_string = "\n".join(comparison_string)
  102. signer = pkcs1_15.new(public_key)
  103. digest = SHA256.new()
  104. digest.update(comparison_string.encode())
  105. # raises a ValueError if it fails
  106. signer.verify(digest, self.signature)
  107. def http_date_age(datestr):
  108. """age of a signature in seconds"""
  109. parsed = datetime.datetime.strptime(datestr, "%a, %d %b %Y %H:%M:%S GMT")
  110. delta = datetime.datetime.utcnow() - parsed
  111. return delta.total_seconds()