# How to do product mix optimization in real-time

## Linear programming on streaming data within an Apache Beam pipeline

Not everything needs to be machine learning! Classical operations research methods can work well for optimizing processes, product mixes, and logistics, provided you understand the constraints you are operating under. You can even apply them to streaming data within an Apache Beam pipeline to achieve real-time control and agility.

## Problem

Imagine that you have a manufacturing facility that can produce 4 types of products (A, B, C, and D) from 4 ingredients (dye, labor, water, and concentrate). The value of producing one unit (the profit, if you will) is \$50 for A, \$100 for B, \$125 for C, and \$40 for D.

Every hour, you get new supplies:

`{"dye": 5710, "labor": 870, "water": 22350, "concentrate": 5010}`

We want to maximize the total profit:

`50A + 100B + 125C + 40D`

How much of A, B, C, and D should you manufacture?

## Cost function and Constraints

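It depends on how much of each ingredient each product requires. Let’s say that you need 50 mg of dye, 5 person-hours of labor, 300 liters of water, and 30 ml of concentrate to make one unit of product A, and that you similarly know the requirements for the other products:

| Ingredient (per unit) | A | B | C | D |
| --- | --- | --- | --- | --- |
| Dye (mg) | 50 | 60 | 100 | 50 |
| Labor (person-hours) | 5 | 25 | 10 | 5 |
| Water (liters) | 300 | 400 | 800 | 200 |
| Concentrate (ml) | 30 | 75 | 50 | 20 |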

Moreover, you cannot ship more than 25 units of B or more than 10 units of C per hour.

These are the constraints. If you write them out, you get one inequality per ingredient. For dye, it looks like this (since you received 5710 mg of it):

`50A + 60B + 100C + 50D <= 5710`

The shipping limits become bounds on the variables: A and D lie in (0, inf), B lies in (0, 25), and C lies in (0, 10).
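Putting the objective, the four ingredient constraints, and the bounds together gives the full linear program. The formulation below is a sketch that uses the first hour's supplies from above; the per-unit requirements are the ones that appear in the constraint matrix of the solver code.

$$
\begin{aligned}
\text{maximize}\quad & 50A + 100B + 125C + 40D \\
\text{subject to}\quad & 50A + 60B + 100C + 50D \le 5710 && \text{(dye)} \\
& 5A + 25B + 10C + 5D \le 870 && \text{(labor)} \\
& 300A + 400B + 800C + 200D \le 22350 && \text{(water)} \\
& 30A + 75B + 50C + 20D \le 5010 && \text{(concentrate)} \\
& A \ge 0,\quad 0 \le B \le 25,\quad 0 \le C \le 10,\quad D \ge 0
\end{aligned}
$$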

## Linear programming

We can solve a constrained optimization problem like this using linear programming. SciPy has a handy implementation:

```python
def linopt(materials):
    import numpy as np
    from scipy.optimize import linprog

    # coefficients of the objective function to *minimize*
    # (negated, because we want to maximize profit)
    c = -1 * np.array([50, 100, 125, 40])

    # constraints A_ub @ x <= b_ub (could also use A_eq, b_eq, etc.)
    # rows: dye, labor, water, concentrate; columns: A, B, C, D
    A_ub = [
        [50, 60, 100, 50],
        [5, 25, 10, 5],
        [300, 400, 800, 200],
        [30, 75, 50, 20]
    ]
    b_ub = [materials['dye'],
            materials['labor'],
            materials['water'],
            materials['concentrate']]

    # per-product bounds: B and C are capped by the shipping limits
    bounds = [
        (0, np.inf),
        (0, 25),
        (0, 10),
        (0, np.inf)
    ]
    res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds)
    return np.round(res.x)
```
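`linprog` returns an `OptimizeResult`, and its `success`, `message`, `x`, and `fun` fields are worth checking before the mix is used. The following is a minimal sketch rather than the article's code: the helper name `solve_mix` and the module-level constants are hypothetical, and the coefficients are assumed to be the same as in `linopt`. Because the objective is negated to turn maximization into minimization, the profit is `-res.fun`.

```python
import numpy as np
from scipy.optimize import linprog

PROFIT = np.array([50, 100, 125, 40])  # per unit of A, B, C, D
A_UB = np.array([
    [50, 60, 100, 50],     # dye
    [5, 25, 10, 5],        # labor
    [300, 400, 800, 200],  # water
    [30, 75, 50, 20],      # concentrate
])
BOUNDS = [(0, None), (0, 25), (0, 10), (0, None)]  # None means unbounded


def solve_mix(materials):
    """Hypothetical helper: return (unrounded quantities, profit) or raise."""
    b_ub = [materials[k] for k in ("dye", "labor", "water", "concentrate")]
    res = linprog(-PROFIT, A_ub=A_UB, b_ub=b_ub, bounds=BOUNDS)
    if not res.success:
        # e.g. infeasible or unbounded problem; res.message explains why
        raise RuntimeError(f"linprog failed: {res.message}")
    return res.x, -res.fun


supplies = {"dye": 5710, "labor": 870, "water": 22350, "concentrate": 5010}
qty, profit = solve_mix(supplies)
print(qty, profit)
```

Returning the unrounded quantities leaves the rounding policy to the caller.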

Given the input materials:

`{"dye": 5170, "labor": 700, "water": 29940, "concentrate": 5160}`

the optimal product mix is:

`[37. 10. 10. 35.]`
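One thing to watch is that `np.round` can round quantities up, so the rounded mix may need slightly more than was supplied. The sketch below checks a mix against the supplies in plain Python. The names `REQUIREMENTS`, `resource_usage`, and `shortfalls` are hypothetical, and the second mix is an illustrative rounded-down choice, not solver output.

```python
# Per-unit requirements, taken from A_ub in linopt; columns are A, B, C, D.
REQUIREMENTS = {
    "dye": [50, 60, 100, 50],
    "labor": [5, 25, 10, 5],
    "water": [300, 400, 800, 200],
    "concentrate": [30, 75, 50, 20],
}


def resource_usage(qty):
    """Total amount of each ingredient consumed by the mix qty = [A, B, C, D]."""
    return {name: sum(r * q for r, q in zip(row, qty))
            for name, row in REQUIREMENTS.items()}


def shortfalls(qty, materials):
    """Return {ingredient: amount over supply} for every violated constraint."""
    usage = resource_usage(qty)
    return {k: usage[k] - materials[k] for k in usage if usage[k] > materials[k]}


materials = {"dye": 5170, "labor": 700, "water": 29940, "concentrate": 5160}

print(shortfalls([37, 10, 10, 35], materials))
# {'dye': 30, 'labor': 10, 'water': 160}

print(shortfalls([37, 9, 10, 34], materials))  # illustrative rounded-down mix
# {}
```

Since every requirement is non-negative, rounding each quantity down can never increase usage, which makes it a safer default when leftovers are carried forward.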

## Apache Beam pipeline

Wrap this in an Apache Beam pipeline, and you can basically solve the problem for every JSON message you receive in a Pub/Sub topic.

To try it out, let’s use a text file:

```python
p = beam.Pipeline(options=options)
(p
 | 'ingest' >> beam.io.ReadFromText(INPUT)
 | 'optimize' >> beam.Map(lambda x: linopt(json.loads(x)))
 | 'output' >> beam.io.WriteToText(OUTPUT)
)
result = p.run()
```
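To read from Pub/Sub rather than a text file, the source and sink can be swapped. What follows is a sketch under several assumptions: the helper names (`parse_message`, `format_result`, `build_streaming_pipeline`) and the topic names in the usage comment are hypothetical, the Beam GCP extras (`apache-beam[gcp]`) are assumed to be installed, and `optimize` stands for any callable such as `linopt`. Pub/Sub payloads arrive and leave as bytes, hence the explicit decode and encode steps.

```python
import json


def parse_message(data: bytes) -> dict:
    """Decode one Pub/Sub payload (UTF-8 JSON bytes) into a materials dict."""
    return json.loads(data.decode("utf-8"))


def format_result(qty) -> bytes:
    """Encode a product mix [A, B, C, D] as UTF-8 JSON bytes for publishing."""
    return json.dumps(dict(zip("ABCD", (float(q) for q in qty)))).encode("utf-8")


def build_streaming_pipeline(input_topic, output_topic, optimize):
    """Build (but do not run) a streaming pipeline; Beam is imported lazily."""
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True  # unbounded source

    p = beam.Pipeline(options=options)
    (p
     | 'ingest' >> beam.io.ReadFromPubSub(topic=input_topic)
     | 'parse' >> beam.Map(parse_message)
     | 'optimize' >> beam.Map(optimize)
     | 'format' >> beam.Map(format_result)
     | 'publish' >> beam.io.WriteToPubSub(topic=output_topic)
    )
    return p


# Usage (placeholder topic names):
# p = build_streaming_pipeline("projects/my-project/topics/supplies",
#                              "projects/my-project/topics/product-mix",
#                              linopt)
# p.run()
```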

## Carrying inventory

In reality, we’d want to carry some inventory. If we receive 30 mg of dye, and use only 28 mg, we’d keep the remaining 2 mg for the next iteration. We can do that with Apache Beam by maintaining a stateful variable:

```python
class Inventory:
    def __init__(self):
        # assume water and labor cannot be stored in inventory
        self.dye = 0
        self.concentrate = 0

    def update(self, leftover):
        # leftover is ordered like the constraint rows:
        # [dye, labor, water, concentrate]
        self.dye = leftover[0]
        self.concentrate = leftover[3]
```

The optimization method is modified to use and update the inventory:

```python
def linopt(materials, inventory):
    ...
    b_ub = [
        materials['dye'] + inventory.dye,
        materials['labor'],
        materials['water'],
        materials['concentrate'] + inventory.concentrate
    ]
    ...
    leftover = b_ub - np.matmul(A_ub, qty)
    inventory.update(leftover)
    return qty
```

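The pipeline is modified to pass the inventory object into the optimization step. Note that the object is shared through the lambda's closure rather than as a Beam side input, so the carried-over values are only consistent when a single worker processes the messages in order: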

```python
inventory = Inventory()
(p
 | 'ingest' >> beam.io.ReadFromText(INPUT)
 | 'parse' >> beam.Map(lambda x: json.loads(x))
 | 'optimize' >> beam.Map(lambda x: linopt(x, inventory))
 | 'output' >> beam.io.WriteToText(OUTPUT)
)
```

Now, the remaining inventory from the previous timestep is used in addition to materials received:

```
inventory from previous step: [1340.0, 5.0, 140.0, 278895.0]
material received: {'dye': 5000, 'labor': 770, 'water': 20210, 'concentrate': 5300}
total available:[6340.0, 770, 20210, 284195.0]
qty:[ 0. 17.  0. 65.]
inventory for next step:[2070.0, 20.0, 410.0, 281620.0]
```
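The numbers in this log can be reproduced by hand: only dye and concentrate are carried over, and the leftover is the total available minus what the chosen mix consumes. Here is a small plain-Python check, where the function name `step` is hypothetical and the requirement matrix is the `A_ub` from `linopt`:

```python
# Rows: dye, labor, water, concentrate; columns: A, B, C, D (A_ub from linopt).
A_UB = [
    [50, 60, 100, 50],
    [5, 25, 10, 5],
    [300, 400, 800, 200],
    [30, 75, 50, 20],
]


def step(carried, received, qty):
    """Return (total available, leftover) for one hour.

    carried holds the dye and concentrate kept from the previous hour;
    labor and water are assumed not to be storable.
    """
    available = [
        received["dye"] + carried["dye"],
        received["labor"],
        received["water"],
        received["concentrate"] + carried["concentrate"],
    ]
    used = [sum(a * q for a, q in zip(row, qty)) for row in A_UB]
    leftover = [a - u for a, u in zip(available, used)]
    return available, leftover


available, leftover = step(
    carried={"dye": 1340, "concentrate": 278895},
    received={"dye": 5000, "labor": 770, "water": 20210, "concentrate": 5300},
    qty=[0, 17, 0, 65],
)
print(available)  # [6340, 770, 20210, 284195]
print(leftover)   # [2070, 20, 410, 281620]
```

The leftover labor (20) and water (410) show up in the logged list but are not added to the next hour's supply, which is why the total available labor above equals the 770 received.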

The full code is on GitHub. Enjoy!
