Programming Assignment
University of The People
CS 3308-01: Information Retrieval
Instructor: Sharina Babb
16th July 2025
import string
import sys
import os
import re
import math
import sqlite3
import time
from typing import Dict, Set, List
from collections import defaultdict, Counter
from nltk.stem import PorterStemmer
# Words too common to be useful as index terms; they are filtered out
# before indexing.
stop_words = {
    "a", "an", "the", "and", "or", "but", "is", "are",
    "was", "were", "in", "of", "to", "with",
}

# Patterns are compiled once at import time and reused for every line.
chars = re.compile(r'\W+')  # one or more non-word characters
pattid = re.compile(r'(\d{3})/(\d{3})/(\d{3})')  # three 3-digit groups split by '/'

# Running corpus statistics, updated while the corpus is parsed.
tokens = 0
documents = 0
terms = 0
stop_word_count = 0

# In-memory index: term string -> Term record.
database: Dict[str, 'Term'] = {}
class Term:
    """
    Index record for a single term.

    Attributes:
        termid: Numeric identifier of the term.
        termfreq: Number of occurrences of the term over the whole corpus.
        docs: Number of distinct documents containing the term.
        docids: Posting map, document id -> occurrences in that document.
    """

    def __init__(self):
        # A new record starts with no id and no occurrences...
        self.termid = self.termfreq = self.docs = 0
        # ...and an empty posting map of its own.
        self.docids: Dict[int, int] = dict()
def splitchars(line: str) -> List[str]:
    """
    Split input text into tokens on runs of non-word characters.

    ``re.split`` produces an empty string whenever the text starts or ends
    with a separator (and for an empty line). Those empty strings are
    dropped here so callers only ever receive real tokens.

    Args:
        line: Input text string
    Returns:
        List of non-empty tokens, in order of appearance
    """
    return [piece for piece in chars.split(line) if piece]
def remove_stop_words(tokens: List[str]) -> List[str]:
    """
    Drop stop words from a token list and count how many were dropped.

    Matching ignores case: the stop-word list is all lower case, but the
    incoming tokens may still carry their original capitalisation, so
    "The" has to be treated exactly like "the".

    Args:
        tokens: Tokens to filter
    Returns:
        Tokens that are not stop words, with order and casing preserved
    """
    global stop_word_count
    filtered_tokens = [token for token in tokens
                       if token.lower() not in stop_words]
    # Everything that disappeared in the filter was a stop word.
    stop_word_count += len(tokens) - len(filtered_tokens)
    return filtered_tokens
# One shared stemmer instance, reused for every token.
ps = PorterStemmer()


def stem_tokens(tokens: List[str]) -> List[str]:
    """Return the stem of every token, in the same order as the input."""
    stemmed = []
    for word in tokens:
        stemmed.append(ps.stem(word))
    return stemmed
def remove_punctuation_tokens(tokens: List[str]) -> List[str]:
    """
    Keep only non-empty tokens whose first character is not punctuation.

    Only the first character is inspected; punctuation later in a token
    does not cause it to be dropped.
    """
    kept = []
    for candidate in tokens:
        if not candidate:
            continue  # empty strings are never kept
        if candidate[0] in string.punctuation:
            continue
        kept.append(candidate)
    return kept
def parsetoken(line: str) -> List[str]:
    """
    Process a line of text to extract and index terms.

    The line is split into tokens, lower-cased, stripped of stop words,
    stemmed and cleaned of punctuation tokens. Every surviving token is
    then recorded in the in-memory index under the current document id
    (the module-level ``documents`` counter).

    Args:
        line: Input text line
    Returns:
        List of processed tokens
    """
    global documents, tokens, terms
    # Normalize input text
    line = line.replace('\t', ' ').strip()
    # Lower-case before stop-word filtering: the stop-word list is lower
    # case, so a capitalised "The" would otherwise slip through and be
    # indexed as a regular term.
    token_list = [token.lower() for token in splitchars(line)]
    token_list = remove_stop_words(token_list)
    token_list = stem_tokens(token_list)
    token_list = remove_punctuation_tokens(token_list)
    for token in token_list:
        # Clean and normalize token
        lower_token = token.replace('\n', '').lower().strip()
        if not lower_token:  # Skip empty tokens
            continue
        tokens += 1  # Increment total token count
        # Add new term to database if not exists; a fresh Term already
        # starts with an empty posting map and zero counts.
        entry = database.get(lower_token)
        if entry is None:
            terms += 1
            entry = database[lower_token] = Term()
            entry.termid = terms
        # First occurrence in this document: open a posting for it
        if documents not in entry.docids:
            entry.docs += 1
            entry.docids[documents] = 0
        # Update term frequency (per document and corpus-wide)
        entry.docids[documents] += 1
        entry.termfreq += 1
    return token_list
def process(filename: str) -> bool:
    """
    Read one document file as UTF-8 and index it line by line.

    Args:
        filename: Path to document file
    Returns:
        True when the whole file was read and parsed; False when the file
        could not be opened/read or is not valid UTF-8 (a message is
        printed in both cases)
    """
    try:
        with open(filename, 'r', encoding='utf-8') as handle:
            for text_line in handle:
                parsetoken(text_line)
    except UnicodeDecodeError:
        print(f"Unicode decode error in file {filename}")
        return False
    except IOError as err:
        print(f"Error processing file {filename}: {str(err)}")
        return False
    else:
        return True
def walkdir(cur: sqlite3.Cursor, dirname: str) -> bool:
    """
    Recursively walk a directory tree and index every regular file in it.

    Each file gets the next document id (the module-level ``documents``
    counter), is recorded in the DocumentDictionary table and is then
    parsed with process(). Entries that are neither a directory nor a
    regular file are ignored.

    Args:
        cur: Database cursor used to insert DocumentDictionary rows
        dirname: Directory path
    Returns:
        True if this directory and all of its subdirectories were walked
        without error, False otherwise. A file that fails to parse is
        reported but does not make the walk fail.
    """
    global documents
    success = True
    try:
        # os.listdir() returns entries in arbitrary order; sorting keeps
        # the assigned document ids reproducible between runs.
        for item in sorted(os.listdir(dirname)):
            full_path = os.path.join(dirname, item)
            if os.path.isdir(full_path):
                print(f"Entering directory: {full_path}")
                # The recursive call reports its own error; remember the
                # failure so it is reflected in the return value.
                if not walkdir(cur, full_path):
                    success = False
            elif os.path.isfile(full_path):
                documents += 1
                # Add document to dictionary (DocumentName, DocId)
                cur.execute(
                    "INSERT INTO DocumentDictionary VALUES (?, ?)",
                    (full_path, documents))
                if not process(full_path):
                    print(f"Failed to process file: {full_path}")
    except Exception as e:
        print(f"Error walking directory {dirname}: {str(e)}")
        return False
    return success
def setup_database(cursor: sqlite3.Cursor):
    """
    Drop and recreate the index tables and their lookup indexes.

    Any existing DocumentDictionary, TermDictionary and Posting tables are
    dropped first, so every call starts from an empty schema.

    Args:
        cursor: Database cursor
    """
    # Remove tables left over from a previous run
    cursor.execute("DROP TABLE IF EXISTS DocumentDictionary")
    cursor.execute("DROP TABLE IF EXISTS TermDictionary")
    cursor.execute("DROP TABLE IF EXISTS Posting")
    # Create new tables
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS DocumentDictionary (
            DocumentName TEXT,
            DocId INTEGER PRIMARY KEY
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS TermDictionary (
            Term TEXT,
            TermId INTEGER PRIMARY KEY
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS Posting (
            TermId INTEGER,
            DocId INTEGER,
            tfidf REAL,
            docfreq INTEGER,
            termfreq INTEGER,
            FOREIGN KEY(TermId) REFERENCES TermDictionary(TermId),
            FOREIGN KEY(DocId) REFERENCES DocumentDictionary(DocId)
        )
    """)
    # Create indexes for lookups by term text and by posting term/document
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_term ON TermDictionary(Term)")
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_posting_term ON Posting(TermId)")
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_posting_doc ON Posting(DocId)")
def calculate_frequencies():
    """
    Derive per-document term counts and per-term document frequencies
    from the in-memory index.

    The term counts are grouped by document and keyed by term, because
    calculate_tf_idf() looks every key up in the idf table, which is keyed
    by term.

    Returns:
        A tuple ``(term_frequencies, document_frequencies)``.
        ``term_frequencies`` is a list with one Counter per document that
        contains at least one indexed term, ordered by ascending document
        id; each Counter maps term -> occurrences in that document.
        ``document_frequencies`` maps term -> number of documents that
        contain the term.
    """
    per_document = defaultdict(Counter)
    document_frequencies = defaultdict(int)
    for term, term_obj in database.items():
        if not term_obj.docids:
            continue  # no postings: contributes to neither result
        # One posting per document, so the posting count is the
        # document frequency.
        document_frequencies[term] = len(term_obj.docids)
        for doc_id, freq in term_obj.docids.items():
            per_document[doc_id][term] = freq
    term_frequencies = [per_document[doc_id]
                        for doc_id in sorted(per_document)]
    return term_frequencies, document_frequencies
def calculate_idf(document_frequencies, total_docs):
    """
    Compute the inverse document frequency of every term.

    ``idf(term) = ln(total_docs / df(term))`` using the natural logarithm.

    Args:
        document_frequencies: Mapping term -> number of documents
            containing the term
        total_docs: Total number of documents in the corpus
    Returns:
        Dict mapping term -> idf. A term with a non-positive document
        frequency, or any term when ``total_docs`` is not positive, gets
        0.0 instead of raising (division by zero / log of zero).
    """
    idf = {}
    for term, df in document_frequencies.items():
        if df > 0 and total_docs > 0:
            idf[term] = math.log(total_docs / df)
        else:
            # No meaningful ratio exists; 0.0 gives the term no weight.
            idf[term] = 0.0
    return idf
def calculate_tf_idf(term_frequencies, idf):
    """
    Weight every frequency mapping by the idf table.

    Args:
        term_frequencies: Iterable of mappings key -> frequency
        idf: Mapping key -> idf value
    Returns:
        List with one dict per input mapping, each mapping key ->
        frequency * idf; keys missing from ``idf`` are weighted by 0
    """
    return [
        {key: count * idf.get(key, 0) for key, count in counts.items()}
        for counts in term_frequencies
    ]
class InvertedIndex:
    """Maps each term to the list of (doc_id, weight) postings added for it."""

    def __init__(self):
        # A term seen for the first time starts with an empty posting list.
        self.index = defaultdict(list)

    def add_document(self, doc_id, tf_idf):
        """
        Append one (doc_id, weight) posting per term of a document.

        Args:
            doc_id: Identifier stored in each posting
            tf_idf: Mapping term -> weight for this document
        """
        for term, weight in tf_idf.items():
            postings = self.index[term]
            postings.append((doc_id, weight))
def report_statistics():
    """
    Print the corpus statistics gathered during indexing: documents
    processed, tokens parsed, unique terms in the index and stop words
    matched.
    """
    unique_terms = len(database)
    print(f"Number of documents processed: {documents}")
    print(f"Total number of terms parsed from all documents: {tokens}")
    print("Total number of unique terms found and added to the "
          f"index: {unique_terms}")
    print("Total number of terms found that matched one of the stop "
          f"words: {stop_word_count}")
def main():
    """
    Main execution function.

    Recreates the SQLite schema in ``cacm_index.db``, indexes every file
    under the corpus directory, writes the term dictionary and the
    tf-idf weighted postings, then prints the corpus statistics together
    with the start and end times.
    """
    # Record start time
    start_time = time.localtime()
    print(f"Start Time: {start_time.tm_hour:02d}:{start_time.tm_min:02d}")
    # Initialize database
    db_path = "cacm_index.db"
    conn = sqlite3.connect(db_path)
    try:
        conn.isolation_level = None  # Enable autocommit
        cursor = conn.cursor()
        # Setup database tables
        setup_database(cursor)
        # Process corpus
        corpus_path = "./cacm"  # Update this path to match your environment
        if not os.path.exists(corpus_path):
            print(f"Error: Corpus directory not found at {corpus_path}")
            return
        walkdir(cursor, corpus_path)
        # idf depends only on the term, so compute it once per term
        # rather than once per posting.
        _, document_frequencies = calculate_frequencies()
        idf = calculate_idf(document_frequencies, documents)
        # Insert terms into database
        for term, term_obj in database.items():
            cursor.execute(
                "INSERT INTO TermDictionary (Term, TermId) VALUES (?, ?)",
                (term, term_obj.termid))
            term_idf = idf.get(term, 0.0)
            # Insert posting information
            for doc_id, freq in term_obj.docids.items():
                tfidf = freq * term_idf
                cursor.execute("""
                    INSERT INTO Posting
                    (TermId, DocId, tfidf, docfreq, termfreq)
                    VALUES (?, ?, ?, ?, ?)
                    """, (term_obj.termid, doc_id, tfidf,
                          term_obj.docs, freq))
        # Commit changes
        conn.commit()
    finally:
        # Close the connection on every path, including the early return
        # above and any error raised while indexing.
        conn.close()
    # Print statistics
    report_statistics()
    end_time = time.localtime()
    print(f"\nEnd Time: {end_time.tm_hour:02d}:{end_time.tm_min:02d}")
# Run the indexer only when this file is executed as a script.
if __name__ == "__main__":
    main()
statistics
References:
Manning, C.D., Raghavan, P., & Schütze, H. (2009). An Introduction to
Information Retrieval (Online ed.). Cambridge, MA: Cambridge
University Press. Available at
http://nlp.stanford.edu/IR-book/information-retrieval-book.html