Unigram Step-by-Step Implementation#
Unigram Language Model (ULM) is a subword tokenization algorithm widely used in Natural Language Processing (NLP). Unlike language models that predict the next word from the previous ones, a unigram model treats every token as independent: each token is assigned a probability, and a word is tokenized by choosing the sequence of subwords whose combined probability is highest. Here, we will walk you through a detailed implementation of the Unigram algorithm using a dataset of financial news headlines.
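As a brief refresher (notation ours, not taken from the code below): if a word is segmented into tokens $x_1, \dots, x_n$, the unigram model scores that segmentation as

$$
P(x_1, \dots, x_n) = \prod_{i=1}^{n} p(x_i),
$$

so the best tokenization is the segmentation that maximizes this product, or equivalently minimizes $\sum_{i=1}^{n} -\log p(x_i)$. This is the quantity our implementation works with.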
Dataset Preparation#
First, we need to load our dataset. We will use the ashraq/financial-news dataset from the Hugging Face Hub, taking the headline column as our text data and randomly sampling 1000 records for the tokenization process. Here is how we can do this:
from datasets import load_dataset

# Load the dataset and randomly sample 1000 headlines (fixed seed for reproducibility).
dataset = load_dataset("ashraq/financial-news")
texts = dataset["train"].shuffle(seed=1234).select(range(1000))["headline"]
import re
from collections import defaultdict


def pre_tokenize(text, lowercase=True):
    # Lowercase, collapse runs of whitespace, and split on spaces.
    if lowercase:
        text = text.lower()
    text = re.sub(r"\s+", " ", text)
    return text.split(" ")


def initialize_vocab(texts, lowercase=True):
    # Count the frequency of every whitespace-separated word in the corpus.
    vocab = defaultdict(int)
    for text in texts:
        words = pre_tokenize(text, lowercase)
        for word in words:
            vocab[word] += 1
    return vocab
word_freqs = initialize_vocab(texts)
print("Number of words: {}".format(len(word_freqs.keys())))
Number of words: 3636
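Before moving on, here is a quick look at what the pre-tokenizer does (the input string below is made up for illustration): it lowercases the text, collapses repeated whitespace, and splits on spaces.

# Illustrative example with a made-up headline.
print(pre_tokenize("Apple  Reports  Record Q3 Earnings"))
# ['apple', 'reports', 'record', 'q3', 'earnings']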
Vocabulary Initialization#
For Unigram, we need to initialize our vocabulary to something larger than the vocabulary size we will eventually want. We have to include all the basic characters (otherwise we won’t be able to tokenize every word); for the longer substrings, we take the most frequent substrings in the corpus:
character_freqs = defaultdict(int)
subwords_freqs = defaultdict(int)
for word, freq in word_freqs.items():
    for i in range(len(word)):
        character_freqs[word[i]] += freq
        # Loop through the subwords of length at least 2
        for j in range(i + 2, len(word) + 1):
            subwords_freqs[word[i:j]] += freq
Then, we sort subwords by frequency:
sorted_subwords = sorted(subwords_freqs.items(), key=lambda x: x[1], reverse=True)
print(sorted_subwords[:10])
len(sorted_subwords)
[('in', 1092), ('re', 706), ('er', 690), ('st', 675), ('es', 673), ('an', 629), ('ar', 617), ('al', 610), ('on', 539), ('ng', 504)]
33925
token_freqs = (
    list(character_freqs.items()) + sorted_subwords[: 2000 - len(character_freqs)]
)
token_freqs = {token: freq for token, freq in token_freqs}
len(token_freqs)
2000
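A quick optional sanity check (not part of the original walkthrough): by construction, every character seen in the corpus is kept in this initial vocabulary, so any word can always be segmented, in the worst case character by character.

# Every character from the corpus is included in the initial 2000-token vocabulary.
assert all(char in token_freqs for char in character_freqs)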
Model Initialization#
Next, we create a Unigram model, where each token is associated with its negative log probability (the negative log of its relative frequency):
from math import log
total_sum = sum([freq for token, freq in token_freqs.items()])
model = {token: -log(freq / total_sum) for token, freq in token_freqs.items()}
len(model)
2000
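As an illustrative check (again, not part of the original walkthrough), the tokens with the lowest scores are the most probable ones, i.e. frequent single characters and the common subwords we saw above:

# The five most probable tokens, i.e. those with the smallest -log(freq / total_sum).
print(sorted(model.items(), key=lambda kv: kv[1])[:5])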
Word Encoding#
The encode_word function finds the best segmentation of a word into subwords under the Unigram model. It uses dynamic programming (the Viterbi algorithm): for each end position in the word it records the best-scoring start of the last token, then backtracks from the end of the word to recover the tokens:
def encode_word(word, model):
    best_segmentations = [{"start": 0, "score": 1}] + [
        {"start": None, "score": None} for _ in range(len(word))
    ]
    for start_idx in range(len(word)):
        # This should be properly filled by the previous steps of the loop
        best_score_at_start = best_segmentations[start_idx]["score"]
        for end_idx in range(start_idx + 1, len(word) + 1):
            token = word[start_idx:end_idx]
            if token in model and best_score_at_start is not None:
                score = model[token] + best_score_at_start
                # If we have found a better segmentation ending at end_idx, we update
                if (
                    best_segmentations[end_idx]["score"] is None
                    or best_segmentations[end_idx]["score"] > score
                ):
                    best_segmentations[end_idx] = {"start": start_idx, "score": score}

    segmentation = best_segmentations[-1]
    if segmentation["score"] is None:
        # We did not find a tokenization of the word -> unknown
        return ["<unk>"], None

    score = segmentation["score"]
    start = segmentation["start"]
    end = len(word)
    tokens = []
    while start != 0:
        tokens.insert(0, word[start:end])
        next_start = best_segmentations[start]["start"]
        end = start
        start = next_start
    tokens.insert(0, word[start:end])
    return tokens, score
print(encode_word("iphone", model))
print(encode_word("apple", model))
(['i', 'ph', 'one'], 21.168929786103597)
(['app', 'le'], 16.224036483432627)
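If a word cannot be segmented at all, encode_word falls back to the unknown token. A hypothetical example (assuming the character '★' never appears in the sampled headlines and therefore is not in the vocabulary):

# Hypothetical input: a character absent from the corpus makes segmentation impossible.
print(encode_word("iphone★", model))
# expected: (['<unk>'], None)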
Loss Computation#
We then define a function to compute the loss of our model. The loss is the sum, over every word in the corpus, of the negative log-likelihood of that word's best segmentation, weighted by the word's frequency:
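Written out (notation ours), this is

$$
\mathcal{L} = \sum_{w \in \text{corpus}} \text{freq}(w) \, \min_{t_1 \cdots t_k = w} \sum_{i=1}^{k} -\log p(t_i),
$$

where the minimum runs over all segmentations of $w$ into vocabulary tokens, which is exactly the score returned by encode_word.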
def compute_loss(model):
    loss = 0
    for word, freq in word_freqs.items():
        _, word_loss = encode_word(word, model)
        loss += freq * word_loss
    return loss
compute_loss(model)
136832.67116572068
Compute Scores#
For each token, we compute a score: how much the loss increases when that token is removed from the model. Tokens of length 1 are never removed, since they are needed to guarantee that every word can be tokenized:
import copy


def compute_scores(model):
    scores = {}
    model_loss = compute_loss(model)
    for token, score in model.items():
        # We always keep tokens of length 1
        if len(token) == 1:
            continue
        model_without_token = copy.deepcopy(model)
        _ = model_without_token.pop(token)
        scores[token] = compute_loss(model_without_token) - model_loss
    return scores
scores = compute_scores(model)
print(scores["app"])
print(scores["le"])
print(scores["investment"])
print(scores["invest"])
print(scores["ment"])
26.386676873022225
45.03959364909679
48.10544469070737
55.051404965808615
45.268116643244866
Model Optimization#
We now shrink the vocabulary toward the target size by iteratively removing the tokens whose removal increases the loss the least, 10% of the vocabulary at a time, until 1,800 tokens remain:
percent_to_remove = 0.1
while len(model) > 1800:
    scores = compute_scores(model)
    sorted_scores = sorted(scores.items(), key=lambda x: x[1])
    # Remove percent_to_remove tokens with the lowest scores.
    for i in range(int(len(model) * percent_to_remove)):
        print("Removing {}".format(sorted_scores[i][0]))
        _ = token_freqs.pop(sorted_scores[i][0])
    # Rebuild the model from the remaining token frequencies.
    total_sum = sum([freq for token, freq in token_freqs.items()])
    model = {token: -log(freq / total_sum) for token, freq in token_freqs.items()}
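Here the loop runs a single pass: 10% of the 2,000 tokens (200 of them) are removed, leaving exactly the 1,800-token target, so the condition fails and the loop exits. We can confirm the final vocabulary size:

# After one pruning pass, 2000 - 200 = 1800 tokens remain.
print(len(model))
# 1800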
Tokenization#
Finally, we define a tokenize function that splits a given text into subwords according to our trained model:
def tokenize(text, model):
    words = pre_tokenize(text)
    encoded_words = [encode_word(word, model)[0] for word in words]
    return sum(encoded_words, [])
tokenized_text = tokenize("investment opportunities in the company", model)
print(tokenized_text)
['investment', 'op', 'port', 'un', 'ities', 'in', 'the', 'compan', 'y']
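As a final sanity check, the subwords of each word concatenate back to the pre-tokenized words, since encode_word only splits a word and never alters its characters (whitespace is dropped by the pre-tokenizer):

# Concatenating the subwords reproduces the pre-tokenized words.
print("".join(tokenize("investment opportunities", model)))
# investmentopportunities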
This concludes our detailed step-by-step implementation of the Unigram Language Model.