Experiment 3
Aim of the Experiment
To implement and analyse the performance of the K-Means and K-Medoids
algorithms for customer segmentation based on annual income and spending
score.
Theory
A) K-Means algorithm
K-Means is an unsupervised clustering algorithm that partitions a dataset into K non-
overlapping groups (clusters) such that:
Points within the same cluster are similar (small intra-cluster distance).
Points from different clusters are dissimilar (large inter-cluster distance).
Key Concepts
Cluster center (centroid): The mean position of all points in the cluster.
Inertia (SSE): Sum of Squared Errors, a measure of how tightly the points are clustered
around their centroids:

    SSE = \sum_{k=1}^{K} \sum_{x_i \in C_k} \lVert x_i - \mu_k \rVert^2

where \mu_k is the centroid (mean) of cluster C_k.
K: The number of clusters, chosen in advance or determined via methods like the Elbow
method or the Silhouette score.
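Before the full pipeline below, the assignment/update cycle and the inertia computation can be sketched in a few lines of NumPy. This is a minimal one-iteration illustration; the toy points and the initial centroids are made up for this example.

import numpy as np

# Toy data: six customers in (income, spend) space; two illustrative centroids
X = np.array([[30., 25.], [35., 30.], [32., 28.],
              [80., 75.], [85., 80.], [82., 78.]])
centroids = np.array([[33., 27.], [82., 77.]])

# Assignment step: each point joins its nearest centroid
d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)  # (n, K)
labels = d2.argmin(axis=1)

# Update step: each centroid moves to the mean of its assigned points
centroids = np.array([X[labels == k].mean(axis=0) for k in range(2)])

# Inertia (SSE): total squared distance of points to their assigned centroid
inertia = ((X - centroids[labels]) ** 2).sum()
print(labels, inertia)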
Customer Segmentation Example
For the Annual Income vs. Spending Score data:
Each customer is a point x_i = [Income, Spending Score].
K-Means groups them into segments such as:
1. Low income, low spend
2. Low income, high spend
3. High income, low spend
4. High income, high spend
Marketing teams can target these clusters differently.
HANDS-ON PYTHON IMPLEMENTATION
# ===========================================
# K-Means Customer Segmentation (2D features)
# Features: Annual Income, Spending Score
# Includes model selection + profiling + plots
# ===========================================
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
RANDOM_STATE = 42
# ---------- 1) Load data ----------
def load_or_generate():
    """
    Try to load Mall_Customers.csv with columns:
      - 'Annual Income (k$)'
      - 'Spending Score (1-100)'
    If not found, generate synthetic-but-realistic data.
    """
    fname = "Mall_Customers.csv"
    if os.path.exists(fname):
        df = pd.read_csv(fname)
        needed = ["Annual Income (k$)", "Spending Score (1-100)"]
        if not all(col in df.columns for col in needed):
            raise ValueError(f"CSV found but missing columns {needed}")
        data = df[needed].copy()
        data.rename(columns={"Annual Income (k$)": "Income_k",
                             "Spending Score (1-100)": "SpendScore"}, inplace=True)
        return data, True
    else:
        # Synthetic clusters: (income_k$, spending_score)
        rng = np.random.default_rng(RANDOM_STATE)
        n_per = [90, 70, 80, 60]
        centers = np.array([
            [35, 30],   # low income, low spend
            [35, 75],   # low income, high spend
            [80, 40],   # high income, low spend
            [80, 80],   # high income, high spend
        ])
        scales = np.array([
            [7, 10],
            [7, 10],
            [10, 12],
            [10, 12],
        ])
        X_parts = []
        for (cx, cy), (sx, sy), n in zip(centers, scales, n_per):
            pts = rng.normal(loc=[cx, cy], scale=[sx, sy], size=(n, 2))
            X_parts.append(pts)
        X = np.vstack(X_parts)
        data = pd.DataFrame(X, columns=["Income_k", "SpendScore"])
        # Clip to realistic bounds
        data["Income_k"] = data["Income_k"].clip(10, 140)
        data["SpendScore"] = data["SpendScore"].clip(1, 100)
        return data, False
data, is_real = load_or_generate()
print(f"Loaded {'REAL' if is_real else 'SYNTHETIC'} data. Shape: {data.shape}")
# ---------- 2) Feature scaling ----------
X = data[["Income_k", "SpendScore"]].values
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
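# Scaling matters here: income (tens of k$) and spend score (1-100) live on
# different ranges, so unscaled Euclidean distance would let one feature
# dominate the cluster assignments.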
# ---------- 3) Model selection (k from 2..10) ----------
k_range = range(2, 11)
inertias, sils, chs, dbs = [], [], [], []
for k in k_range:
    km = KMeans(n_clusters=k, n_init=10, random_state=RANDOM_STATE)
    labels = km.fit_predict(X_scaled)
    inertias.append(km.inertia_)
    sils.append(silhouette_score(X_scaled, labels))
    chs.append(calinski_harabasz_score(X_scaled, labels))
    dbs.append(davies_bouldin_score(X_scaled, labels))
# Heuristic: choose k with the best silhouette (backup: elbow)
best_k_sil = k_range[int(np.argmax(sils))]
# Elbow (look for the largest relative drop in inertia); simple heuristic:
rel_drop = [(inertias[i - 1] - inertias[i]) / inertias[i - 1]
            for i in range(1, len(inertias))]
elbow_k = k_range[1 + int(np.argmax(rel_drop))]
print(f"Silhouette-optimal k: {best_k_sil}")
print(f"Elbow-suggested k : {elbow_k}")
# Pick final k (prefer silhouette)
FINAL_K = best_k_sil
# ---------- 4) Fit final model ----------
final_km = KMeans(n_clusters=FINAL_K, n_init=10, random_state=RANDOM_STATE)
labels = final_km.fit_predict(X_scaled)
# Cluster centers back to original scale for interpretation
centers_scaled = final_km.cluster_centers_
centers_orig = scaler.inverse_transform(centers_scaled)
# Append labels to data
segmented = data.copy()
segmented["Cluster"] = labels
# ---------- 5) Cluster profiling ----------
profile = (
    segmented
    .groupby("Cluster")
    .agg(
        n_customers=("Cluster", "size"),
        income_mean=("Income_k", "mean"),
        income_std=("Income_k", "std"),
        spend_mean=("SpendScore", "mean"),
        spend_std=("SpendScore", "std"),
    )
    .sort_values("spend_mean", ascending=False)
)
print("\n=== Cluster Profile (original units) ===")
print(profile.round(2))
# ---------- 6) Plots ----------
# (A) Elbow
plt.figure()
plt.plot(list(k_range), inertias, marker="o")
plt.xlabel("k (number of clusters)")
plt.ylabel("Inertia (SSE)")
plt.title("Elbow Method")
plt.grid(True)
plt.show()
# (B) Quality metrics vs k
# Note: Calinski–Harabasz is on a much larger scale than the other two,
# so it will dominate this shared axis; plot it separately for a closer look.
plt.figure()
plt.plot(list(k_range), sils, marker="o", label="Silhouette")
plt.plot(list(k_range), chs, marker="o", label="Calinski–Harabasz")
plt.plot(list(k_range), dbs, marker="o", label="Davies–Bouldin (lower better)")
plt.xlabel("k (number of clusters)")
plt.ylabel("Score")
plt.title("Cluster Quality Metrics vs k")
plt.legend()
plt.grid(True)
plt.show()
# (C) Final segmentation plot (original scale)
plt.figure()
for c in range(FINAL_K):
    mask = labels == c
    plt.scatter(
        data.loc[mask, "Income_k"],
        data.loc[mask, "SpendScore"],
        s=40,
        label=f"Cluster {c}",
    )
# plot centers
plt.scatter(centers_orig[:, 0], centers_orig[:, 1], s=200, marker="X", edgecolor="k",
            linewidths=1.5, label="Centers")
plt.xlabel("Annual Income (k$)")
plt.ylabel("Spending Score (1-100)")
plt.title(f"K-Means Segmentation (k={FINAL_K})")
plt.legend()
plt.grid(True)
plt.show()
# ---------- 7) Simple interpretation helper ----------
def label_cluster(row, centers):
    """Optional helper: return the index of the nearest center for one row."""
    diffs = centers - np.array([row["Income_k"], row["SpendScore"]])
    d2 = np.sum(diffs ** 2, axis=1)
    return int(np.argmin(d2))
# Example: top 5 customers by spending score within each cluster
examples = (
    segmented.sort_values(["Cluster", "SpendScore"], ascending=[True, False])
    .groupby("Cluster")
    .head(5)
    .reset_index(drop=True)
)
print("\n=== Sample customers (top spenders) by cluster ===")
print(examples.head(20).to_string(index=False))
B) K-Medoids algorithm
K-Medoids is an unsupervised clustering algorithm closely related to K-Means, but instead
of using the mean of points in a cluster as the cluster center, it uses an actual data point (called
the medoid) as the center.
K-Means: Centroid can be anywhere in feature space (not necessarily a data point).
K-Medoids: Center is a real observation from the dataset.
Because medoids are real points, K-Medoids is more robust to noise and outliers than
K-Means, as the small demonstration below illustrates.
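A minimal sketch of this robustness, using made-up 1-D incomes with a single extreme outlier:

import numpy as np

# Five ordinary incomes (k$) plus one extreme outlier
x = np.array([30., 32., 35., 38., 40., 300.])

mean = x.mean()  # dragged toward the outlier (about 79.2)
# Medoid: the sample point with the smallest total distance to all others
total_dist = np.abs(x[:, None] - x[None, :]).sum(axis=1)
medoid = x[total_dist.argmin()]  # stays at an ordinary value (35.0)
print(mean, medoid)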
Key Concepts
Medoid: The most centrally located point in a cluster, minimizing total dissimilarity to
all other points in that cluster.
Dissimilarity Measure: Usually Euclidean distance, but can be any metric (e.g.,
Manhattan, cosine).
Number of clusters K: Chosen before training, via methods like the Silhouette score
or the elbow method.
Mathematical Objective
K-Medoids minimizes the total dissimilarity of points to their cluster medoids:

    \min_{m_1, \ldots, m_K} \sum_{k=1}^{K} \sum_{x_i \in C_k} d(x_i, m_k)

where each medoid m_k must itself be a data point in cluster C_k.
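In code, a cluster's medoid is simply the member point whose summed dissimilarity to the other members is smallest. A minimal NumPy sketch, assuming Euclidean distance and a made-up cluster:

import numpy as np

def medoid_index(points):
    """Index of the point minimizing total Euclidean distance to the rest."""
    diff = points[:, None, :] - points[None, :, :]   # (n, n, d)
    dist = np.sqrt((diff ** 2).sum(axis=2))          # (n, n) pairwise distances
    return int(dist.sum(axis=1).argmin())

pts = np.array([[30., 25.], [35., 30.], [33., 28.], [60., 26.]])
print(medoid_index(pts), pts[medoid_index(pts)])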
Comparison to K-Means

Aspect                  | K-Means          | K-Medoids
------------------------|------------------|-----------------------------
Center type             | Mean (centroid)  | Actual data point (medoid)
Sensitivity to outliers | High             | Low
Distance metrics        | Mostly Euclidean | Any metric
Computational cost      | Lower            | Higher (due to swap checks)
HANDS-ON PYTHON IMPLEMENTATION
# =========================================================
# K-Medoids (PAM) Customer Segmentation, 2D (Income, Score)
# Model selection + profiling + clean plots
# =========================================================
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Tuple
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (silhouette_score, calinski_harabasz_score,
                             davies_bouldin_score)
from sklearn.utils import check_random_state
RANDOM_STATE = 42
rs = check_random_state(RANDOM_STATE)
# ---------- 0) Optional: use sklearn-extra if available ----------
USE_SKLEARN_EXTRA = False
try:
    from sklearn_extra.cluster import KMedoids as SKX_KMedoids  # type: ignore
    USE_SKLEARN_EXTRA = True
except Exception:
    USE_SKLEARN_EXTRA = False
# ---------- 1) Load or generate data ----------
def load_or_generate() -> Tuple[pd.DataFrame, bool]:
    """
    Try to load Mall_Customers.csv with columns:
      - 'Annual Income (k$)'
      - 'Spending Score (1-100)'
    If not found, generate synthetic but realistic data.
    """
    fname = "Mall_Customers.csv"
    if os.path.exists(fname):
        df = pd.read_csv(fname)
        need = ["Annual Income (k$)", "Spending Score (1-100)"]
        if not all(c in df.columns for c in need):
            raise ValueError(f"CSV found but missing required columns: {need}")
        data = df[need].rename(columns={
            "Annual Income (k$)": "Income_k",
            "Spending Score (1-100)": "SpendScore"
        })
        return data, True
    # Synthetic: four consumer segments
    n_per = [90, 70, 80, 60]
    centers = np.array([
        [35, 30],   # low income, low spend
        [35, 75],   # low income, high spend
        [80, 40],   # high income, low spend
        [80, 80],   # high income, high spend
    ])
    scales = np.array([
        [7, 10],
        [7, 10],
        [10, 12],
        [10, 12],
    ])
    parts = []
    for (cx, cy), (sx, sy), n in zip(centers, scales, n_per):
        pts = rs.normal(loc=[cx, cy], scale=[sx, sy], size=(n, 2))
        parts.append(pts)
    X = np.vstack(parts)
    data = pd.DataFrame(X, columns=["Income_k", "SpendScore"])
    data["Income_k"] = data["Income_k"].clip(10, 140)
    data["SpendScore"] = data["SpendScore"].clip(1, 100)
    return data, False
data, is_real = load_or_generate()
print(f"Loaded {'REAL' if is_real else 'SYNTHETIC'} data. Shape: {data.shape}")
# ---------- 2) Scale features ----------
X_orig = data[["Income_k", "SpendScore"]].values
scaler = StandardScaler()
X = scaler.fit_transform(X_orig) # distances behave better after scaling
# ---------- 3) PAM (K-Medoids) fallback implementation ----------
def pairwise_sqeuclidean(A, B=None):
    """Efficient squared Euclidean distances (n x m)."""
    if B is None:
        B = A
    a2 = (A * A).sum(axis=1)[:, None]
    b2 = (B * B).sum(axis=1)[None, :]
    # Clip tiny negative values caused by floating-point cancellation
    return np.maximum(a2 + b2 - 2 * A @ B.T, 0.0)

@dataclass
class PamResult:
    medoid_indices: np.ndarray
    labels: np.ndarray
    cost: float
def pam_kmedoids(X, n_clusters=4, max_iter=100, random_state=42) -> PamResult:
    """
    Simple PAM (Partitioning Around Medoids) using squared Euclidean distance.
    - Initialize medoids with distinct random points.
    - Alternate assign / greedy improving swaps until no improvement.
    """
    rs = check_random_state(random_state)
    n = X.shape[0]
    # init: distinct random medoids
    medoids = rs.choice(n, size=n_clusters, replace=False)
    D2 = pairwise_sqeuclidean(X)  # precompute all pairwise distances

    def assign(meds):
        dist_to_meds = D2[:, meds]  # n x k
        labels = np.argmin(dist_to_meds, axis=1)
        costs = dist_to_meds[np.arange(n), labels]
        return labels, costs

    labels, costs = assign(medoids)
    best_cost = float(costs.sum())
    for _ in range(max_iter):
        improved = False
        current_set = set(medoids.tolist())
        for m_idx in range(n_clusters):
            for h in range(n):
                if h in current_set:
                    continue
                # try swapping the medoid at position m_idx with non-medoid h
                trial = medoids.copy()
                trial[m_idx] = h
                trial_labels, trial_costs = assign(trial)
                trial_cost = float(trial_costs.sum())
                if trial_cost + 1e-9 < best_cost:  # strict improvement only
                    medoids = trial
                    current_set = set(medoids.tolist())  # keep the set in sync
                    labels, costs = trial_labels, trial_costs
                    best_cost = trial_cost
                    improved = True
        if not improved:
            break
    return PamResult(medoids, labels, best_cost)
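# Complexity note: each sweep of this naive PAM evaluates every
# (medoid, non-medoid) pair, i.e. O(k * n) reassignments over an O(n^2)
# precomputed distance matrix. That is fine for a few hundred customers;
# for larger datasets, prefer the sklearn-extra path in fit_kmedoids below.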
# ---------- 4) Model selection over K ----------
k_range = range(2, 11)
sil_scores, ch_scores, db_scores, total_costs = [], [], [], []
def fit_kmedoids(X, k):
    if USE_SKLEARN_EXTRA:
        km = SKX_KMedoids(n_clusters=k, metric='euclidean', init='k-medoids++',
                          random_state=RANDOM_STATE, method='pam', max_iter=300)
        labels = km.fit_predict(X)
        # cost: sum of Euclidean distances to medoids
        from sklearn.metrics import pairwise_distances
        dist = pairwise_distances(X, km.cluster_centers_, metric='euclidean')
        cost = float(np.min(dist, axis=1).sum())
        # Map centers back to nearest data points (true medoids)
        meds = []
        for c in km.cluster_centers_:
            j = np.argmin(((X - c) ** 2).sum(axis=1))
            meds.append(j)
        medoid_indices = np.array(meds, dtype=int)
        return labels, medoid_indices, cost
    # NB: the fallback PAM minimizes *squared* Euclidean distance, so its
    # cost is on a different scale than the sklearn-extra (Euclidean) path.
    res = pam_kmedoids(X, n_clusters=k, max_iter=100, random_state=RANDOM_STATE)
    return res.labels, res.medoid_indices, res.cost
for k in k_range:
    labels, medoids_idx, cost = fit_kmedoids(X, k)
    total_costs.append(cost)
    sil_scores.append(silhouette_score(X, labels))
    ch_scores.append(calinski_harabasz_score(X, labels))
    db_scores.append(davies_bouldin_score(X, labels))
best_k = k_range[int(np.argmax(sil_scores))]
print(f"Best K by Silhouette: {best_k}")
# ---------- 5) Fit final model ----------
final_labels, final_medoids_idx, final_cost = fit_kmedoids(X, best_k)
# Medoids in original units: they are actual data rows
medoids_orig = X_orig[final_medoids_idx, :]
segmented = data.copy()
segmented["Cluster"] = final_labels
# ---------- 6) Cluster profiling ----------
profile = (
    segmented.groupby("Cluster")
    .agg(
        n_customers=("Cluster", "size"),
        income_mean=("Income_k", "mean"),
        income_std=("Income_k", "std"),
        spend_mean=("SpendScore", "mean"),
        spend_std=("SpendScore", "std"),
    )
    .sort_values("spend_mean", ascending=False)
)
print("\n=== Cluster Profile (original units) ===")
print(profile.round(2))
# ---------- 7) Plots ----------
# (A) Total medoid cost vs K (lower is better)
plt.figure()
plt.plot(list(k_range), total_costs, marker="o")
plt.xlabel("K (number of clusters)")
plt.ylabel("Total medoid cost (sum of distances)")
plt.title("K-Medoids: Cost vs K")
plt.grid(True)
plt.show()
# (B) Quality metrics vs K
# Note: Calinski–Harabasz is on a much larger scale than the other two
# and will dominate this shared axis; plot it separately for a closer look.
plt.figure()
plt.plot(list(k_range), sil_scores, marker="o", label="Silhouette (↑)")
plt.plot(list(k_range), ch_scores, marker="o", label="Calinski–Harabasz (↑)")
plt.plot(list(k_range), db_scores, marker="o", label="Davies–Bouldin (↓)")
plt.xlabel("K (number of clusters)")
plt.ylabel("Score")
plt.title("Cluster Quality Metrics vs K")
plt.legend()
plt.grid(True)
plt.show()
# (C) Final segmentation (original scale)
plt.figure()
for c in range(best_k):
    mask = final_labels == c
    plt.scatter(
        data.loc[mask, "Income_k"],
        data.loc[mask, "SpendScore"],
        s=40,
        label=f"Cluster {c}",
    )
# plot medoids (true customers)
plt.scatter(medoids_orig[:, 0], medoids_orig[:, 1], s=220, marker="X",
            edgecolor="k", linewidths=1.5, label="Medoids")
plt.xlabel("Annual Income (k$)")
plt.ylabel("Spending Score (1-100)")
plt.title(f"K-Medoids Segmentation (K={best_k})")
plt.legend()
plt.grid(True)
plt.show()
# ---------- 8) Peek at typical customers per cluster ----------
examples = (
    segmented.sort_values(["Cluster", "SpendScore"], ascending=[True, False])
    .groupby("Cluster")
    .head(5)
    .reset_index(drop=True)
)
print("\n=== Sample customers (top spenders) by cluster ===")
print(examples.head(20).to_string(index=False))