activity_tools.headers

  1import re
  2import base64
  3import hashlib
  4import json
  5from typing import Tuple
  6from datetime import datetime
  7from urllib.parse import urlparse
  8
  9from cryptography.hazmat.primitives import hashes, serialization
 10from cryptography.hazmat.primitives.asymmetric import padding
 11from cryptography.hazmat.backends import default_backend
 12
 13from cryptography.exceptions import InvalidSignature
 14
 15from .objects import Actor, InboxObject
 16
 17class ContentTypes:
 18
 19    @classmethod
 20    @property
 21    def activity(cls) -> dict:
 22        return {
 23            'Content-Type': "application/activity+json"
 24        }
 25
 26    @classmethod
 27    @property
 28    def jrd(cls) -> dict:
 29        return {
 30            'Content-Type': "application/jrd+json"
 31        }
 32
 33class SignatureHeader:
 34
 35    key_id: str
 36    algorithm: str
 37    headers: list
 38    signature: str
 39
 40    def __init__(self, header) -> None:
 41        for sh in header.split(","):
 42            m = re.match(r'^"?([^"]+)"?="?([^"]+)"?$', sh)
 43            key = m.group(1)
 44            value = m.group(2)
 45
 46            if key.lower() == "keyid":
 47                self.key_id = value
 48            elif key.lower() == "algorithm":
 49                self.algorithm = value
 50            elif key.lower() == "headers":
 51                self.headers = value.split(" ")
 52            elif key.lower() == "signature":
 53                self.signature = value
 54            else:
 55                raise Exception("Unsupported SignatureHeader key")
 56
 57class Header:
 58    """
 59    Representation of a HTTP Header. This class has a bit magic that
 60    splits and parses headers like the signature header for easy use.
 61    """
 62
 63    name: str
 64    """ The headers name """
 65
 66    raw_value: str
 67    """ The unaltered header value """
 68
 69    _parsed_value: list
 70
 71    def __init__(self, header) -> None:
 72        self.name = header[0]
 73        self.raw_value = header[1]
 74        self._parsed_value = []
 75
 76        if self.is_signature():
 77            self._parsed_value = SignatureHeader(self.raw_value)
 78        else:
 79            self._parsed_value = self.raw_value
 80
 81    @property
 82    def value(self):
 83        """ 
 84        Parsed header value. Most headers are just strings, but
 85        special headers like the signature header will return a
 86        SignatureHeader object.
 87        """
 88        return self._parsed_value
 89
 90    def is_signature(self) -> bool:
 91        """
 92        Returns true of this is the signature header
 93        """
 94        return self.name.lower() == "signature"
 95
 96class Headers:
 97
 98    headers: list[Header]
 99
100    def __init__(self, headers) -> None:
101        self.headers = []
102
103        for header in headers:
104            self.headers.append(Header(header))
105
106    def get(self, name):
107        for header in self.headers:
108            if header.name.lower() == name:
109                return header
110
def verify_signature(
        object: dict,
        headers: list[Tuple[str, str]],
        inbox_path: str
    ) -> bool:

    """
    This function verifies the HTTP signature on the specified request. It
    requires the incoming object, the HTTP headers of the request and the
    path of the receiving inbox. The function returns a boolean value:
    True when the signature matches the public key of the object's actor,
    False when it does not match or when the request lacks the information
    needed to check it (no signature header, undecodable signature, a signed
    header that is not present, no actor).

    Errors raised while parsing a malformed signature header or while
    fetching the actor are not handled here and propagate to the caller.

    arguments:
    - object     - A dict representing the object we would like to verify
    - headers    - A list of tuples of strings representing our HTTP headers
    - inbox_path - The path to our inbox, eg. /users/foo/inbox
    """

    # NOTE(review): only the header signature is checked. The keyId inside
    # the signature header is not compared with the actor, and a signed
    # Digest header is not compared with the request body.

    # Analyze headers
    headers_obj = Headers(headers)

    # Extract the value of the signature header
    signature_entry = headers_obj.get('signature')
    if signature_entry is None:
        return False
    signature_header: SignatureHeader = signature_entry.value

    # The attributes only exist when the matching parameter was present in
    # the header, so read them defensively.
    encoded_signature = getattr(signature_header, 'signature', None)
    signed_names = getattr(signature_header, 'headers', None)
    if not encoded_signature or not signed_names:
        return False

    # The signature is, confusingly, a parameter with the same name inside
    # the signature header. Decode it; binascii.Error is a ValueError.
    try:
        signature = base64.b64decode(encoded_signature)
    except ValueError:
        return False

    # Rebuild the string that was signed, one "name: value" line per signed
    # header, before doing any remote work.
    lines = []
    for h in signed_names:
        field = h.lower()
        if field == '(request-target)':
            lines.append(f"(request-target): post {inbox_path}")
            continue

        entry = headers_obj.get(field)
        if entry is None:
            # A header that was signed but is not part of the request
            return False
        lines.append(f"{field}: {entry.raw_value}")

    message = "\n".join(lines).encode("utf-8")

    # Fetch the remote actor that claims to have sent the object
    actor_url = object.get('actor')
    if not actor_url:
        return False
    actor = Actor.fetch(actor_url=actor_url)

    try:
        actor.get_public_key().verify(
            signature, message, padding.PKCS1v15(), hashes.SHA256()
        )
    except InvalidSignature:
        return False

    return True
158
def make_signature(
        remote_inbox: str,
        message: str,
        sender_public_key_url: str,
        private_key_path: str = '/tmp/key.pem'
    ) -> dict:
    """
    This function generates the signed HTTP headers for delivering the
    specified object to a remote inbox.

    The Digest header is the SHA-256 of ``json.dumps(message)`` encoded as
    UTF-8, so it describes exactly that serialization of the message.

    arguments:
    - remote_inbox          - The URL of the inbox the request is sent to
    - message               - The object to deliver; it is passed through
                              json.dumps before hashing
    - sender_public_key_url - The URL used as keyId in the signature header
    - private_key_path      - Path of the unencrypted PEM private key used
                              for signing, defaults to /tmp/key.pem

    Returns a dict with the Date, Host, Digest and Signature headers.
    """
    # Imported locally because the module only imports datetime itself.
    from email.utils import formatdate

    # The following is to sign the HTTP request as defined in HTTP Signatures.
    with open(private_key_path, 'rb') as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=None,
            backend=default_backend()
        )

    # formatdate always produces the English HTTP date format in GMT,
    # unlike strftime whose day and month names follow the process locale.
    current_date = formatdate(usegmt=True)

    recipient = urlparse(remote_inbox)
    recipient_host = recipient.netloc

    # The request target is the path plus query string; a URL without a
    # path is requested as "/".
    request_target = recipient.path or '/'
    if recipient.query:
        request_target = f"{request_target}?{recipient.query}"

    # generating digest
    body = json.dumps(message).encode('utf-8')
    digest = base64.b64encode(hashlib.sha256(body).digest()).decode('ascii')

    # The order of these lines must match the "headers" parameter below.
    signature_text = "\n".join([
        f"(request-target): post {request_target}",
        f"digest: SHA-256={digest}",
        f"host: {recipient_host}",
        f"date: {current_date}",
    ]).encode('utf-8')

    raw_signature = private_key.sign(
        signature_text,
        padding.PKCS1v15(),
        hashes.SHA256()
    )

    signature_header = 'keyId="%s",algorithm="rsa-sha256",headers="(request-target) digest host date",signature="%s"' % (
        sender_public_key_url,
        base64.b64encode(raw_signature).decode('utf-8')
    )

    return {
        'Date': current_date,
        'Host': recipient_host,
        'Digest': "SHA-256=" + digest,
        'Signature': signature_header
    }
class ContentTypes:
18class ContentTypes:
19
20    @classmethod
21    @property
22    def activity(cls) -> dict:
23        return {
24            'Content-Type': "application/activity+json"
25        }
26
27    @classmethod
28    @property
29    def jrd(cls) -> dict:
30        return {
31            'Content-Type': "application/jrd+json"
32        }
ContentTypes()
class SignatureHeader:
34class SignatureHeader:
35
36    key_id: str
37    algorithm: str
38    headers: list
39    signature: str
40
41    def __init__(self, header) -> None:
42        for sh in header.split(","):
43            m = re.match(r'^"?([^"]+)"?="?([^"]+)"?$', sh)
44            key = m.group(1)
45            value = m.group(2)
46
47            if key.lower() == "keyid":
48                self.key_id = value
49            elif key.lower() == "algorithm":
50                self.algorithm = value
51            elif key.lower() == "headers":
52                self.headers = value.split(" ")
53            elif key.lower() == "signature":
54                self.signature = value
55            else:
56                raise Exception("Unsupported SignatureHeader key")
SignatureHeader(header)
41    def __init__(self, header) -> None:
42        for sh in header.split(","):
43            m = re.match(r'^"?([^"]+)"?="?([^"]+)"?$', sh)
44            key = m.group(1)
45            value = m.group(2)
46
47            if key.lower() == "keyid":
48                self.key_id = value
49            elif key.lower() == "algorithm":
50                self.algorithm = value
51            elif key.lower() == "headers":
52                self.headers = value.split(" ")
53            elif key.lower() == "signature":
54                self.signature = value
55            else:
56                raise Exception("Unsupported SignatureHeader key")
class Headers:
 97class Headers:
 98
 99    headers: list[Header]
100
101    def __init__(self, headers) -> None:
102        self.headers = []
103
104        for header in headers:
105            self.headers.append(Header(header))
106
107    def get(self, name):
108        for header in self.headers:
109            if header.name.lower() == name:
110                return header
Headers(headers)
101    def __init__(self, headers) -> None:
102        self.headers = []
103
104        for header in headers:
105            self.headers.append(Header(header))
def get(self, name):
107    def get(self, name):
108        for header in self.headers:
109            if header.name.lower() == name:
110                return header
def verify_signature( object: dict, headers: list[typing.Tuple[str, str]], inbox_path: str) -> bool:
112def verify_signature(
113        object: dict,
114        headers: list[Tuple[str, str]],
115        inbox_path: str
116    ) -> bool:
117
118    """
119    This function verifies the signature on the specified request. This function
120    requires both the incoming object and HTTP headers with the path of the
121    receiving inbox. The function will return a boolean value.
122
123    arguments:
124    - object     - A dict representing the object we like to verify
125    - headers    - A list of tuples of strings representing our HTTP headers
126    - inbox_path - The path to our inbox, eg. /users/foo/inbox
127    """
128
129    # Analyze headers
130    headers_obj = Headers(headers)
131
132    # The extract the value of the signature header
133    signature_header: SignatureHeader = headers_obj.get('signature').value
134
135    # Extract the signature, confusually another header with the same name
136    # inside the signature header. Decode the signature.
137    signature = base64.b64decode(signature_header.signature)
138
139    # Fetch the remote actors, actor
140    actor = Actor.fetch(actor_url=object['actor'])
141
142    message = []
143    for h in signature_header.headers:
144        if h == '(request-target)':
145            message.append(f"(request-target): post {inbox_path}")
146        else:
147            message.append(f"{h}: {headers_obj.get(h).raw_value}")
148
149    message = "\n".join(message).encode("utf-8")
150
151    try:
152        actor.get_public_key().verify(
153            signature, message, padding.PKCS1v15(), hashes.SHA256()
154        )
155    except InvalidSignature:
156        return False
157
158    return True

This function verifies the signature on the specified request. This function requires both the incoming object and HTTP headers with the path of the receiving inbox. The function will return a boolean value.

arguments:

  • object - A dict representing the object we would like to verify
  • headers - A list of tuples of strings representing our HTTP headers
  • inbox_path - The path to our inbox, eg. /users/foo/inbox
def make_signature(remote_inbox: str, message: str, sender_public_key_url: str):
160def make_signature(remote_inbox: str, message: str, sender_public_key_url: str):
161    """
162    This function generates a signature for the specified object.
163    """
164
165    # The following is to sign the HTTP request as defined in HTTP Signatures.
166    private_key_text = open('/tmp/key.pem', 'rb').read() # load from file
167
168    private_key = serialization.load_pem_private_key(
169        private_key_text,
170        password=None,
171        backend=default_backend()
172    )
173
174    current_date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
175
176    recipient_parsed = urlparse(remote_inbox)
177    recipient_host = recipient_parsed.netloc
178    recipient_path = recipient_parsed.path
179
180    # generating digest
181    message_json = json.dumps(message)
182
183    digest = base64.b64encode(
184        hashlib.sha256(
185            message_json.__str__().encode('utf-8')
186        ).digest()
187    )
188
189    signature_text = b'(request-target): post %s\ndigest: SHA-256=%s\nhost: %s\ndate: %s' % (
190        recipient_path.encode('utf-8'),
191        digest,
192        recipient_host.encode('utf-8'),
193        current_date.encode('utf-8')
194    )
195
196    raw_signature = private_key.sign(
197        signature_text,
198        padding.PKCS1v15(),
199        hashes.SHA256()
200    )
201
202    signature_header = 'keyId="%s",algorithm="rsa-sha256",headers="(request-target) digest host date",signature="%s"' % (
203        sender_public_key_url,
204        base64.b64encode(raw_signature).decode('utf-8')
205    )
206
207    headers = {
208        'Date': current_date,
209        'Host': recipient_host,
210        'Digest': "SHA-256="+digest.decode('utf-8'),
211        'Signature': signature_header
212    }
213
214    return headers    

This function generates a signature for the specified object.