# How to do product mix optimization in real-time

## Linear programming on streaming data within an Apache Beam pipeline

Not everything needs to be machine learning! Classical operations research methods work well for optimizing processes, product mixes, and logistics, provided you understand the constraints you are operating under. You can even apply them to streaming data within an Apache Beam pipeline to achieve real-time control and agility.

## Problem

Imagine that you have a manufacturing facility that can produce 4 types of products (A, B, C, and D) from 4 ingredients (dye, labor, water, and concentrate). Each unit of A is worth $50 (its profit, if you will), each unit of B $100, each unit of C $125, and each unit of D $40.

Every hour, you get new supplies:

`{"dye": 5710, "labor": 870, "water": 22350, "concentrate": 5010}`

We want to maximize the total profit:

`50A + 100B + 125C + 40D`

How much of A, B, C, and D should you manufacture?

## Cost function and Constraints

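It depends on how much of each ingredient each product needs. Let’s say that you need 50 mg of dye, 5 person-hours of labor, 300 liters of water, and 30 ml of concentrate to make one unit of product A, and that you know the corresponding needs for the other products:

| Product | Dye (mg) | Labor (person-hours) | Water (liters) | Concentrate (ml) |
|---------|----------|----------------------|----------------|------------------|
| A       | 50       | 5                    | 300            | 30               |
| B       | 60       | 25                   | 400            | 75               |
| C       | 100      | 10                   | 800            | 50               |
| D       | 50       | 5                    | 200            | 20               |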

Moreover, you cannot ship more than 25 units of B or more than 10 units of C per hour.

These are the constraints. Written out, each ingredient gives an inequality of this form; here is the one for dye (since you received 5710 mg of it):

`50A + 60B + 100C + 50D <= 5710`

The bounds for A are (0, inf), whereas for B they are (0, 25) and for C they are (0, 10).
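To make the constraints concrete, here is a minimal sketch in plain Python that checks whether a candidate mix is feasible. The per-unit requirements are the same numbers that appear in the solver's `A_ub` matrix later in this post; the names `NEEDS`, `MAX_PER_HOUR`, and `is_feasible` are made up for illustration.

```python
# Per-unit ingredient needs, in product order (A, B, C, D).
NEEDS = {
    "dye":         [50, 60, 100, 50],
    "labor":       [5, 25, 10, 5],
    "water":       [300, 400, 800, 200],
    "concentrate": [30, 75, 50, 20],
}
# Hourly shipping limits per product; None means no upper limit.
MAX_PER_HOUR = [None, 25, 10, None]


def is_feasible(qty, supplies):
    """Return True if the mix `qty` respects every bound and supply limit."""
    # Check the per-product bounds first.
    for q, limit in zip(qty, MAX_PER_HOUR):
        if q < 0 or (limit is not None and q > limit):
            return False
    # Then check that no ingredient is used beyond what was supplied.
    for ingredient, per_unit in NEEDS.items():
        used = sum(n * q for n, q in zip(per_unit, qty))
        if used > supplies[ingredient]:
            return False
    return True


supplies = {"dye": 5710, "labor": 870, "water": 22350, "concentrate": 5010}
print(is_feasible([10, 10, 5, 10], supplies))  # True: within every limit
print(is_feasible([0, 30, 0, 0], supplies))    # False: more than 25 units of B
```

A feasibility check like this only says whether a mix is allowed. Finding the most profitable allowed mix is the job of the solver.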

## Solving with linear programming

We can solve a constrained optimization problem like this using linear programming. SciPy has a handy implementation, `scipy.optimize.linprog`:

```python
def linopt(materials):
    import numpy as np
    from scipy.optimize import linprog

    # coefficients of the objective; linprog *minimizes*, so negate the profits
    c = -1 * np.array([50, 100, 125, 40])

    # constraints A_ub @ x <= b_ub (could also use A_eq, b_eq, etc.)
    A_ub = [
        [50, 60, 100, 50],      # dye
        [5, 25, 10, 5],         # labor
        [300, 400, 800, 200],   # water
        [30, 75, 50, 20],       # concentrate
    ]
    b_ub = [
        materials['dye'],
        materials['labor'],
        materials['water'],
        materials['concentrate'],
    ]
    # (min, max) quantity for A, B, C, D
    bounds = [
        (0, np.inf),
        (0, 25),
        (0, 10),
        (0, np.inf),
    ]
    res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds)
    return np.round(res.x)
```

Given the input materials:

`{"dye": 5170, "labor": 700, "water": 29940, "concentrate": 5160}`

the optimal product mix is:

`[37. 10. 10. 35.]`
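One caveat with `np.round`: rounding to the nearest integer can push a mix slightly past a supply limit. The sketch below, in plain Python, computes how much of each ingredient is left over (the slack) for the mix quoted above. The dictionary `A_UB` repeats the rows of the solver's `A_ub`, and `slack` is a hypothetical helper name, not part of the original code.

```python
# Per-unit needs (the rows of A_ub), in product order (A, B, C, D).
A_UB = {
    "dye":         [50, 60, 100, 50],
    "labor":       [5, 25, 10, 5],
    "water":       [300, 400, 800, 200],
    "concentrate": [30, 75, 50, 20],
}


def slack(qty, materials):
    """Supply minus usage per ingredient; a negative value means overshoot."""
    return {
        name: materials[name] - sum(n * q for n, q in zip(row, qty))
        for name, row in A_UB.items()
    }


materials = {"dye": 5170, "labor": 700, "water": 29940, "concentrate": 5160}
print(slack([37, 10, 10, 35], materials))
# {'dye': -30, 'labor': -10, 'water': -160, 'concentrate': 2100}
```

The negative entries show that the rounded mix needs slightly more dye, labor, and water than arrived. Rounding down instead (for example with `np.floor(res.x)`) cannot overshoot, because every coefficient in `A_ub` is non-negative, at the cost of a little profit.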

## Apache Beam pipeline

Wrap this in an Apache Beam pipeline, and you can solve the problem for every JSON message that arrives on a Pub/Sub topic.

To try it out, let’s read the messages from a text file instead:

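```python
import json
import apache_beam as beam

# options, INPUT, and OUTPUT are assumed to be defined earlier
p = beam.Pipeline(options=options)
(p
 | 'ingest' >> beam.io.ReadFromText(INPUT)
 | 'optimize' >> beam.Map(lambda x: linopt(json.loads(x)))
 | 'output' >> beam.io.WriteToText(OUTPUT)
)
result = p.run()
```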
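Messages on a real topic may be malformed, and a raw NumPy array is awkward for downstream consumers to read. Here is a sketch of two small helpers that could wrap `linopt` inside the `beam.Map` step. Both `parse_materials` and `format_mix` are hypothetical names, not part of the original code.

```python
import json

INGREDIENTS = ("dye", "labor", "water", "concentrate")
PRODUCTS = ("A", "B", "C", "D")


def parse_materials(line):
    """Parse one JSON message and check that every ingredient is present."""
    materials = json.loads(line)
    missing = [name for name in INGREDIENTS if name not in materials]
    if missing:
        raise ValueError(f"missing ingredients: {missing}")
    return {name: float(materials[name]) for name in INGREDIENTS}


def format_mix(qty):
    """Turn a quantity vector into a JSON string keyed by product name."""
    return json.dumps({p: int(q) for p, q in zip(PRODUCTS, qty)})


line = '{"dye": 5710, "labor": 870, "water": 22350, "concentrate": 5010}'
print(parse_materials(line)["labor"])        # 870.0
print(format_mix([37.0, 10.0, 10.0, 35.0]))  # {"A": 37, "B": 10, "C": 10, "D": 35}
```

In the pipeline, the optimization step would then read `beam.Map(lambda x: format_mix(linopt(parse_materials(x))))`.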

## Carrying inventory

In reality, we’d want to carry some inventory. If we receive 30 mg of dye, and use only 28 mg, we’d keep the remaining 2 mg for the next iteration. We can do that with Apache Beam by maintaining a stateful variable:

```python
class Inventory:
    def __init__(self):
        # assume water and labor can not be stored in inventory
        self.dye = 0
        self.concentrate = 0

    def update(self, leftover):
        self.dye = leftover[0]
        self.concentrate = leftover[3]
```

The optimization method is modified to use and update the inventory:

```python
def linopt(materials, inventory):
    ...
    b_ub = [
        materials['dye'] + inventory.dye,
        materials['labor'],
        materials['water'],
        materials['concentrate'] + inventory.concentrate
    ]
    ...
    leftover = b_ub - np.matmul(A_ub, qty)
    inventory.update(leftover)
    return qty
```

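The pipeline is modified to pass the inventory object to the optimization step. Note that this is a plain Python object captured by the lambda rather than a Beam side input in the strict sense, so it relies on all elements being processed in order by a single worker: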

```python
inventory = Inventory()
(p
 | 'ingest' >> beam.io.ReadFromText(INPUT)
 | 'parse' >> beam.Map(lambda x: json.loads(x))
 | 'optimize' >> beam.Map(lambda x: linopt(x, inventory))
 | 'output' >> beam.io.WriteToText(OUTPUT)
)
```

Now, the remaining inventory from the previous timestep is used in addition to materials received:

```
inventory from previous step:
[1340.0, 5.0, 140.0, 278895.0]
material received:
{'dye': 5000, 'labor': 770, 'water': 20210, 'concentrate': 5300}
total available:
[6340.0, 770, 20210, 284195.0]
qty:
[ 0. 17. 0. 65.]
inventory for next step:
[2070.0, 20.0, 410.0, 281620.0]
```
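The numbers in this log can be reproduced by hand. Below is a minimal sketch in plain Python, assuming the same `A_ub` rows as in the solver and that only dye and concentrate carry over. The helper `step` is hypothetical, and the quantity vector is taken from the log rather than solved for.

```python
# Rows of A_ub: per-unit needs for dye, labor, water, concentrate.
A_UB = [
    [50, 60, 100, 50],
    [5, 25, 10, 5],
    [300, 400, 800, 200],
    [30, 75, 50, 20],
]
ORDER = ("dye", "labor", "water", "concentrate")
CARRIED = {"dye", "concentrate"}  # water and labor cannot be stored


def step(prev_leftover, received, qty):
    """Return (total available, leftover) for one timestep."""
    # Only carried ingredients add the previous leftover to what arrived.
    total = [
        received[name] + (prev_leftover[i] if name in CARRIED else 0)
        for i, name in enumerate(ORDER)
    ]
    # Usage is A_ub @ qty, computed row by row.
    used = [sum(n * q for n, q in zip(row, qty)) for row in A_UB]
    leftover = [t - u for t, u in zip(total, used)]
    return total, leftover


total, leftover = step(
    [1340, 5, 140, 278895],
    {"dye": 5000, "labor": 770, "water": 20210, "concentrate": 5300},
    [0, 17, 0, 65],
)
print(total)     # [6340, 770, 20210, 284195]
print(leftover)  # [2070, 20, 410, 281620]
```

Only the first and last entries of the leftover vector carry forward. The 20 person-hours and 410 liters of water are lost at the end of the hour.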

The full code is on GitHub. Enjoy!