complete blog on strategy pattern

Thomas Bishop 2026-01-25 17:45:00 +00:00

---
tags: ["typescript", "python"]
---
In this post I am going to talk about an effective design pattern I came across
in the course of my work. (I will obscure any sensitive operational details and
focus only on the technical aspects.)
Most of my work involves integrating the various applications that our
stakeholders use to catalog and license broadcast content. For example, when a
user updates the synopsis of an episode of a series in one application (let's
call it 'Alpha'), this should update related records in another application
(let's call it 'Omega').
On the surface, this is a fairly simple workflow managed via a serverless AWS
pipeline. When a record is updated in Alpha, an event is added to an SQS queue
that a Lambda function subscribes to. The Lambda parses the event data,
transforms it into the data structure expected by Omega, and sends it on.
The data contained in the SQS event body is usually minimal. It specifies the
type of event that has occurred in Alpha, along with the category and ID of the
affected record. For example:

```json
{
  "status": "created",
  "category": "show",
  "id": "SHOW-0001"
}
```

Complexity arises from the variation across payloads:

- each record can be one of eight categories and each category has different
  transformational rules ('mappings')
- for certain categories, the record will be a 'child' to another 'parent'
  record, where some of the mappings of the child have to be inherited from the
  parent. In this case, additional API requests must be made to check that the
  parent exists and, if it exists, retrieve that data and append it to the child
- the data types mapped from Alpha to Omega do not always correspond
- not every Alpha category has a corresponding category in Omega. There is at
  least one scenario where one Alpha category can correspond to two Omega
  categories
We face further complexity because the mappings are often subject to change as
the business is still working out the overall schema. In addition, both Alpha
and Omega are themselves incomplete software, also subject to change!
In essence, however, the same core process is being repeated with each
invocation: we call APIs and map data. The variation exists mostly at the type
level.
Accordingly, I needed a solution that would
- isolate the core logic (read from queue, parse, post to API) from the
contextual intricacies of the mappings
- avoid repeating mappings that are common to multiple schemas
- be sufficiently decoupled so as to easily accommodate the frequent schema
revisions and API rebaselines
I subsequently learned that my solution more or less follows a pre-existing
design pattern that is well suited to object-oriented programming: the strategy
pattern.
The name of the game when it comes to the strategy pattern is flexibility and
reuse in the service of reduced repetition. The key characteristic is that the
software decides at _runtime_ which process to run in response to incoming data:
> [For example] a class that performs validation on incoming data may use the
> strategy pattern to select a validation algorithm depending on the type of
> data, the source of the data, user choice, or other discriminating factors.
> These factors are not known until runtime and may require radically different
> validation to be performed. The validation algorithms (strategies),
> encapsulated separately from the validating object, may be used by other
> validating objects in different areas of the system (or even different
> systems) without code duplication.
[Strategy pattern: Wikipedia](https://en.wikipedia.org/wiki/Strategy_pattern)
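To make the runtime selection concrete before diving into the real pipeline, here is a toy sketch (entirely illustrative, not taken from the integration itself) of validators chosen from a lookup at runtime, in the spirit of the quoted example:

```ts
type Validator = (input: string) => boolean

// Each strategy is a standalone function; the lookup happens at runtime
const validators: Record<string, Validator> = {
  email: (input) => input.includes("@"),
  postcode: (input) => /^[A-Z0-9 ]{5,8}$/i.test(input),
}

function validate(kind: string, input: string): boolean {
  const strategy = validators[kind]
  if (!strategy) throw new Error(`No validator for '${kind}'`)
  return strategy(input)
}
```

The caller never branches on the data type itself; supporting a new kind of data means adding one entry to the lookup.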
To achieve this (in TypeScript) I created a factory class that functions as a
sorting station for the incoming data.
The factory and the Lambda handler are both ignorant of the specific mappings
being applied. The handler simply takes the `category` field from the incoming
SQS message and passes off responsibility to the factory:
```ts
const mapper = MapperFactory.create(
  category,
  this.alphaApiService,
  this.omegaApiService
)
```
The factory then instantiates a _strategy_ based on the category it receives:
```ts
export class MapperFactory {
  static create(
    catalogType: CatalogType,
    alphaApiService: AlphaApiService,
    omegaApiService: OmegaApiService
  ): BaseMapper<IAlphaRecord, IOmegaRecord> {
    const mappers: Record<
      CatalogType,
      new (
        a: AlphaApiService,
        o: OmegaApiService
      ) => BaseMapper<IAlphaRecord, IOmegaRecord>
    > = {
      [CatalogType.SHOW]: ShowMapper,
      [CatalogType.EPISODE]: EpisodeMapper,
      // And many more...
    }

    const MapperClass = mappers[catalogType]
    return new MapperClass(alphaApiService, omegaApiService)
  }
}
```
The strategy then uses the ID to send a further API request to Alpha to get the
full record information, and uses this to populate the data that is sent on to
Omega.
Each strategy (i.e. `ShowMapper` and `EpisodeMapper`) is free to contain
arbitrary mappings and methods unique to the given category, but each must
implement the `BaseMapper` interface:
```ts
interface BaseMapper<
  TAlphaRecord extends IAlphaRecord,
  TOmegaRecord extends IOmegaRecord,
> {
  mapCatalogItem(alphaRecord: TAlphaRecord): TOmegaRecord
  fetchAlphaRecord(id: string): Promise<TAlphaRecord>
  updateOmegaRecord(mappedCatalogItem: TOmegaRecord): Promise<void>
  process(alphaId: string): Promise<void>
}
```
The two API methods are common to all strategies and hence do not need to be
defined anywhere other than in `BaseMapper`. In contrast, `mapCatalogItem` is an
`abstract` method that each child must define. It's here that the specific
mappings are applied:
```ts
class ShowMapper implements BaseMapper<IAlphaShowRecord, IOmegaShowRecord> {
  mapCatalogItem(alphaRecord: IAlphaShowRecord): IOmegaShowRecord {
    return {
      omegaTitle: alphaRecord.title,
      omegaRunningTime:
        typeof alphaRecord?.release_duration === "number"
          ? Math.floor(alphaRecord.release_duration / 60).toString()
          : "",
    }
  }
}
```
The `process` method is really just glue; for most strategies it just sends the
mapped payload to Omega:
```ts
class ShowMapper implements BaseMapper<IAlphaShowRecord, IOmegaShowRecord> {
  public async process(alphaId: string): Promise<void> {
    const alphaRecord = await this.fetchAlphaRecord(alphaId)
    return this.updateOmegaRecord(this.mapCatalogItem(alphaRecord))
  }
}
```
You'll notice that the type system and generics are leveraged in the class and
function signatures. Each strategy, depending on its category, will receive and
return a type corresponding to that category. In the previous example these are
`IAlphaShowRecord` and `IOmegaShowRecord`, respectively. Each of these child
types extends a base type (`IAlphaRecord`, `IOmegaRecord`), so common fields
can be passed down without repetition.
```ts
interface IAlphaRecord {
  id: string
}

interface IAlphaShowRecord extends IAlphaRecord {
  customField: string
}
```
The decoupled and extensible nature of the strategy pattern has meant that we
can easily accommodate revisions to the mappings without impacting the core
logic. It can also easily assimilate additional categories when the schema
changes.
In fact, I think there is little that we could not reconcile with this
architecture. For example, I mentioned earlier that for certain categories,
Alpha fields must be combined with parent records, necessitating additional API
fetching and parsing. In these cases we simply redefine `process` on the child
to do the extra lookup.
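As a sketch of what such an override might look like (the `EpisodeMapper` shape, the `parentShowId` field, and the `fetchParentShow` helper are all hypothetical, and the service calls are stubbed for brevity):

```ts
interface IAlphaEpisodeRecord {
  id: string
  title: string
  parentShowId: string // hypothetical field linking child to parent
}

interface IOmegaEpisodeRecord {
  omegaTitle: string
  omegaShowTitle: string // inherited from the parent record
}

class EpisodeMapper {
  // Stubbed; the real versions go through AlphaApiService
  async fetchAlphaRecord(id: string): Promise<IAlphaEpisodeRecord> {
    return { id, title: "Pilot", parentShowId: "SHOW-0001" }
  }

  async fetchParentShow(id: string): Promise<{ title: string }> {
    return { title: "Example Show" }
  }

  // Stubbed; the real version posts to Omega
  async updateOmegaRecord(record: IOmegaEpisodeRecord): Promise<void> {}

  mapCatalogItem(
    alphaRecord: IAlphaEpisodeRecord,
    parentTitle: string
  ): IOmegaEpisodeRecord {
    return {
      omegaTitle: alphaRecord.title,
      omegaShowTitle: parentTitle,
    }
  }

  // The redefined `process`: one extra lookup before mapping
  public async process(alphaId: string): Promise<void> {
    const alphaRecord = await this.fetchAlphaRecord(alphaId)
    const parent = await this.fetchParentShow(alphaRecord.parentShowId)
    return this.updateOmegaRecord(
      this.mapCatalogItem(alphaRecord, parent.title)
    )
  }
}
```

The core logic never learns that episodes need a parent lookup; the extra work stays encapsulated in the one strategy that needs it.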
The integration, thus described, covers "business as usual": the frequent
updates that our users will make via the third-party software in the normal
business case.
During the development of the integration we were also tasked with creating a
program that will seed Omega with its initial base data from Alpha, before users
can start adding their own content. This requires exporting all records from
Alpha and systematically transferring them to Omega. As part of this process we
store the Alpha records in a temporary database, so that we can record
success/failure for each individual upload.
We are again mapping Alpha data types to Omega data types. The difference is
that this program runs only once, at initialisation, and is then never used
again. In this scenario, much of the Lambda's work falls away, since we get our
data directly from an Alpha export rather than via API calls.
I decided to write the program as a simple Python script that receives the
exported Alpha data via export files, maps it, and uploads it to Omega. By this
time, I had read up on the strategy pattern and was able to produce a more
elegant implementation that exploits the excellent `pydantic` validation
library to divorce all the mapping procedures from the ingestion logic
entirely, minus the verbosity and control-freakery of TypeScript!
Again, there is a factory class that matches the Alpha category to a strategy:
```py
class IngestorService:
    """
    Orchestrates ingestion of raw Alpha export data into the
    `upload_tracker` MySQL table
    """

    def __init__(self):
        self._strategies: Dict[Category, BaseIngestionStrategy] = {
            Category.SHOW: ShowIngestionStrategy(),
            Category.EPISODE: EpisodeIngestionStrategy(),
        }

    def ingest(self, export_file_manifest: Dict[Category, str], db_conn):
        for category, file_path in export_file_manifest.items():
            strategy = self._strategies.get(category)
            if strategy is None:
                raise ValueError(f"No strategy registered for {category}")
            try:
                strategy.run(file_path, db_conn)
            except Exception as e:
                raise RuntimeError(f"Ingestion failed for {category}") from e
```
Similar to `BaseMapper` in the TS version, there is an abstract base class that
handles parsing and insertion, leaving each category's strategy to supply its
own model and mappings, but this is now much more concise:
```py
import json
from abc import ABC
from typing import Generic, List, Type, TypeVar

T = TypeVar("T", bound=AlphaBaseRecord)


class BaseIngestionStrategy(ABC, Generic[T]):
    model_class: Type[T]

    def parse(self, export_file_path: str) -> List[T]:
        with open(export_file_path) as f:
            raw_data = json.load(f)
        return [self.model_class(**item) for item in raw_data]

    def insert(self, data: List[T], db_conn):
        # Inserts mapped data into the database...
        pass

    def run(self, file_path, db_conn):
        parsed = self.parse(file_path)
        self.insert(parsed, db_conn)
```
The `parse` method doesn't know or care which kind of record it is parsing. So
long as the `pydantic` validation against the model passes, the record will be
inserted into the database.
Thanks to `pydantic` doing the core mapping work:
```py
class ShowRecord(AlphaBaseRecord):
    custom_field: Optional[str] = None

    @property
    def to_omega(self) -> OmegaPayloadShow:
        return {
            "custom_field_with_diff_name": self.custom_field
        }
```
...the actual strategy is extremely clean and minimal. Show, for example, is
just:
```py
from models.show_fabric import ShowRecord
from modules.base_ingestion_strategy import BaseIngestionStrategy


class ShowIngestionStrategy(BaseIngestionStrategy):
    model_class = ShowRecord
```
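To see the whole path run end to end, here is a self-contained toy version (everything here is illustrative: a plain class stands in for the pydantic model, and a list stands in for the database connection):

```py
import json
import tempfile


class FakeShowRecord:
    """Stand-in for the real pydantic ShowRecord."""

    def __init__(self, **kwargs):
        self.custom_field = kwargs.get("custom_field")


class FakeShowStrategy:
    """Stand-in for ShowIngestionStrategy: parse the export, then insert."""

    model_class = FakeShowRecord

    def parse(self, export_file_path):
        with open(export_file_path) as f:
            return [self.model_class(**item) for item in json.load(f)]

    def insert(self, data, db_conn):
        db_conn.extend(data)  # a list plays the role of the DB connection

    def run(self, file_path, db_conn):
        self.insert(self.parse(file_path), db_conn)


# Write a two-record export file and ingest it
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump([{"custom_field": "a"}, {"custom_field": "b"}], f)
    export_path = f.name

db = []
FakeShowStrategy().run(export_path, db)
print(len(db))  # 2
```

Swapping in a different category means swapping in a different `model_class`; nothing else changes.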
Hopefully the demonstrations in each language underscore the core pattern at
work in both. The strategy pattern has helped me to reduce cognitive overhead
and produce highly maintainable and extensible solutions in two related
programming contexts.