activity_tools.headers
1import re 2import base64 3import hashlib 4import json 5from typing import Tuple 6from datetime import datetime 7from urllib.parse import urlparse 8 9from cryptography.hazmat.primitives import hashes, serialization 10from cryptography.hazmat.primitives.asymmetric import padding 11from cryptography.hazmat.backends import default_backend 12 13from cryptography.exceptions import InvalidSignature 14 15from .objects import Actor, InboxObject 16 17class ContentTypes: 18 19 @classmethod 20 @property 21 def activity(cls) -> dict: 22 return { 23 'Content-Type': "application/activity+json" 24 } 25 26 @classmethod 27 @property 28 def jrd(cls) -> dict: 29 return { 30 'Content-Type': "application/jrd+json" 31 } 32 33class SignatureHeader: 34 35 key_id: str 36 algorithm: str 37 headers: list 38 signature: str 39 40 def __init__(self, header) -> None: 41 for sh in header.split(","): 42 m = re.match(r'^"?([^"]+)"?="?([^"]+)"?$', sh) 43 key = m.group(1) 44 value = m.group(2) 45 46 if key.lower() == "keyid": 47 self.key_id = value 48 elif key.lower() == "algorithm": 49 self.algorithm = value 50 elif key.lower() == "headers": 51 self.headers = value.split(" ") 52 elif key.lower() == "signature": 53 self.signature = value 54 else: 55 raise Exception("Unsupported SignatureHeader key") 56 57class Header: 58 """ 59 Representation of a HTTP Header. This class has a bit magic that 60 splits and parses headers like the signature header for easy use. 61 """ 62 63 name: str 64 """ The headers name """ 65 66 raw_value: str 67 """ The unaltered header value """ 68 69 _parsed_value: list 70 71 def __init__(self, header) -> None: 72 self.name = header[0] 73 self.raw_value = header[1] 74 self._parsed_value = [] 75 76 if self.is_signature(): 77 self._parsed_value = SignatureHeader(self.raw_value) 78 else: 79 self._parsed_value = self.raw_value 80 81 @property 82 def value(self): 83 """ 84 Parsed header value. Most headers are just strings, but 85 special headers like the signature header will return a 86 SignatureHeader object. 87 """ 88 return self._parsed_value 89 90 def is_signature(self) -> bool: 91 """ 92 Returns true of this is the signature header 93 """ 94 return self.name.lower() == "signature" 95 96class Headers: 97 98 headers: list[Header] 99 100 def __init__(self, headers) -> None: 101 self.headers = [] 102 103 for header in headers: 104 self.headers.append(Header(header)) 105 106 def get(self, name): 107 for header in self.headers: 108 if header.name.lower() == name: 109 return header 110 111def verify_signature( 112 object: dict, 113 headers: list[Tuple[str, str]], 114 inbox_path: str 115 ) -> bool: 116 117 """ 118 This function verifies the signature on the specified request. This function 119 requires both the incoming object and HTTP headers with the path of the 120 receiving inbox. The function will return a boolean value. 121 122 arguments: 123 - object - A dict representing the object we like to verify 124 - headers - A list of tuples of strings representing our HTTP headers 125 - inbox_path - The path to our inbox, eg. /users/foo/inbox 126 """ 127 128 # Analyze headers 129 headers_obj = Headers(headers) 130 131 # The extract the value of the signature header 132 signature_header: SignatureHeader = headers_obj.get('signature').value 133 134 # Extract the signature, confusually another header with the same name 135 # inside the signature header. Decode the signature. 136 signature = base64.b64decode(signature_header.signature) 137 138 # Fetch the remote actors, actor 139 actor = Actor.fetch(actor_url=object['actor']) 140 141 message = [] 142 for h in signature_header.headers: 143 if h == '(request-target)': 144 message.append(f"(request-target): post {inbox_path}") 145 else: 146 message.append(f"{h}: {headers_obj.get(h).raw_value}") 147 148 message = "\n".join(message).encode("utf-8") 149 150 try: 151 actor.get_public_key().verify( 152 signature, message, padding.PKCS1v15(), hashes.SHA256() 153 ) 154 except InvalidSignature: 155 return False 156 157 return True 158 159def make_signature(remote_inbox: str, message: str, sender_public_key_url: str): 160 """ 161 This function generates a signature for the specified object. 162 """ 163 164 # The following is to sign the HTTP request as defined in HTTP Signatures. 165 private_key_text = open('/tmp/key.pem', 'rb').read() # load from file 166 167 private_key = serialization.load_pem_private_key( 168 private_key_text, 169 password=None, 170 backend=default_backend() 171 ) 172 173 current_date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') 174 175 recipient_parsed = urlparse(remote_inbox) 176 recipient_host = recipient_parsed.netloc 177 recipient_path = recipient_parsed.path 178 179 # generating digest 180 message_json = json.dumps(message) 181 182 digest = base64.b64encode( 183 hashlib.sha256( 184 message_json.__str__().encode('utf-8') 185 ).digest() 186 ) 187 188 signature_text = b'(request-target): post %s\ndigest: SHA-256=%s\nhost: %s\ndate: %s' % ( 189 recipient_path.encode('utf-8'), 190 digest, 191 recipient_host.encode('utf-8'), 192 current_date.encode('utf-8') 193 ) 194 195 raw_signature = private_key.sign( 196 signature_text, 197 padding.PKCS1v15(), 198 hashes.SHA256() 199 ) 200 201 signature_header = 'keyId="%s",algorithm="rsa-sha256",headers="(request-target) digest host date",signature="%s"' % ( 202 sender_public_key_url, 203 base64.b64encode(raw_signature).decode('utf-8') 204 ) 205 206 headers = { 207 'Date': current_date, 208 'Host': recipient_host, 209 'Digest': "SHA-256="+digest.decode('utf-8'), 210 'Signature': signature_header 211 } 212 213 return headers
class
ContentTypes:
class
SignatureHeader:
34class SignatureHeader: 35 36 key_id: str 37 algorithm: str 38 headers: list 39 signature: str 40 41 def __init__(self, header) -> None: 42 for sh in header.split(","): 43 m = re.match(r'^"?([^"]+)"?="?([^"]+)"?$', sh) 44 key = m.group(1) 45 value = m.group(2) 46 47 if key.lower() == "keyid": 48 self.key_id = value 49 elif key.lower() == "algorithm": 50 self.algorithm = value 51 elif key.lower() == "headers": 52 self.headers = value.split(" ") 53 elif key.lower() == "signature": 54 self.signature = value 55 else: 56 raise Exception("Unsupported SignatureHeader key")
SignatureHeader(header)
41 def __init__(self, header) -> None: 42 for sh in header.split(","): 43 m = re.match(r'^"?([^"]+)"?="?([^"]+)"?$', sh) 44 key = m.group(1) 45 value = m.group(2) 46 47 if key.lower() == "keyid": 48 self.key_id = value 49 elif key.lower() == "algorithm": 50 self.algorithm = value 51 elif key.lower() == "headers": 52 self.headers = value.split(" ") 53 elif key.lower() == "signature": 54 self.signature = value 55 else: 56 raise Exception("Unsupported SignatureHeader key")
class
Header:
58class Header: 59 """ 60 Representation of a HTTP Header. This class has a bit magic that 61 splits and parses headers like the signature header for easy use. 62 """ 63 64 name: str 65 """ The headers name """ 66 67 raw_value: str 68 """ The unaltered header value """ 69 70 _parsed_value: list 71 72 def __init__(self, header) -> None: 73 self.name = header[0] 74 self.raw_value = header[1] 75 self._parsed_value = [] 76 77 if self.is_signature(): 78 self._parsed_value = SignatureHeader(self.raw_value) 79 else: 80 self._parsed_value = self.raw_value 81 82 @property 83 def value(self): 84 """ 85 Parsed header value. Most headers are just strings, but 86 special headers like the signature header will return a 87 SignatureHeader object. 88 """ 89 return self._parsed_value 90 91 def is_signature(self) -> bool: 92 """ 93 Returns true of this is the signature header 94 """ 95 return self.name.lower() == "signature"
Representation of a HTTP Header. This class has a bit magic that splits and parses headers like the signature header for easy use.
class
Headers:
97class Headers: 98 99 headers: list[Header] 100 101 def __init__(self, headers) -> None: 102 self.headers = [] 103 104 for header in headers: 105 self.headers.append(Header(header)) 106 107 def get(self, name): 108 for header in self.headers: 109 if header.name.lower() == name: 110 return header
def
verify_signature( object: dict, headers: list[typing.Tuple[str, str]], inbox_path: str) -> bool:
112def verify_signature( 113 object: dict, 114 headers: list[Tuple[str, str]], 115 inbox_path: str 116 ) -> bool: 117 118 """ 119 This function verifies the signature on the specified request. This function 120 requires both the incoming object and HTTP headers with the path of the 121 receiving inbox. The function will return a boolean value. 122 123 arguments: 124 - object - A dict representing the object we like to verify 125 - headers - A list of tuples of strings representing our HTTP headers 126 - inbox_path - The path to our inbox, eg. /users/foo/inbox 127 """ 128 129 # Analyze headers 130 headers_obj = Headers(headers) 131 132 # The extract the value of the signature header 133 signature_header: SignatureHeader = headers_obj.get('signature').value 134 135 # Extract the signature, confusually another header with the same name 136 # inside the signature header. Decode the signature. 137 signature = base64.b64decode(signature_header.signature) 138 139 # Fetch the remote actors, actor 140 actor = Actor.fetch(actor_url=object['actor']) 141 142 message = [] 143 for h in signature_header.headers: 144 if h == '(request-target)': 145 message.append(f"(request-target): post {inbox_path}") 146 else: 147 message.append(f"{h}: {headers_obj.get(h).raw_value}") 148 149 message = "\n".join(message).encode("utf-8") 150 151 try: 152 actor.get_public_key().verify( 153 signature, message, padding.PKCS1v15(), hashes.SHA256() 154 ) 155 except InvalidSignature: 156 return False 157 158 return True
This function verifies the signature on the specified request. This function requires both the incoming object and HTTP headers with the path of the receiving inbox. The function will return a boolean value.
arguments:
- object - A dict representing the object we like to verify
- headers - A list of tuples of strings representing our HTTP headers
- inbox_path - The path to our inbox, eg. /users/foo/inbox
def
make_signature(remote_inbox: str, message: str, sender_public_key_url: str):
160def make_signature(remote_inbox: str, message: str, sender_public_key_url: str): 161 """ 162 This function generates a signature for the specified object. 163 """ 164 165 # The following is to sign the HTTP request as defined in HTTP Signatures. 166 private_key_text = open('/tmp/key.pem', 'rb').read() # load from file 167 168 private_key = serialization.load_pem_private_key( 169 private_key_text, 170 password=None, 171 backend=default_backend() 172 ) 173 174 current_date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') 175 176 recipient_parsed = urlparse(remote_inbox) 177 recipient_host = recipient_parsed.netloc 178 recipient_path = recipient_parsed.path 179 180 # generating digest 181 message_json = json.dumps(message) 182 183 digest = base64.b64encode( 184 hashlib.sha256( 185 message_json.__str__().encode('utf-8') 186 ).digest() 187 ) 188 189 signature_text = b'(request-target): post %s\ndigest: SHA-256=%s\nhost: %s\ndate: %s' % ( 190 recipient_path.encode('utf-8'), 191 digest, 192 recipient_host.encode('utf-8'), 193 current_date.encode('utf-8') 194 ) 195 196 raw_signature = private_key.sign( 197 signature_text, 198 padding.PKCS1v15(), 199 hashes.SHA256() 200 ) 201 202 signature_header = 'keyId="%s",algorithm="rsa-sha256",headers="(request-target) digest host date",signature="%s"' % ( 203 sender_public_key_url, 204 base64.b64encode(raw_signature).decode('utf-8') 205 ) 206 207 headers = { 208 'Date': current_date, 209 'Host': recipient_host, 210 'Digest': "SHA-256="+digest.decode('utf-8'), 211 'Signature': signature_header 212 } 213 214 return headers
This function generates a signature for the specified object.