import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from datasets import Dataset
import evaluate
# Label configuration
# BIO scheme over three PII entity types: NAME, EMAIL, PHONE.
label_list = ["O", "B-NAME", "I-NAME", "B-EMAIL", "I-EMAIL", "B-PHONE", "I-PHONE"]
label_to_id = {name: position for position, name in enumerate(label_list)}
id_to_label = dict(enumerate(label_list))
# Index of the outside tag, plus every entity-tag index (used for the
# confidence-threshold decision rule later on).
o_idx = label_to_id["O"]
non_o_indices = [label_to_id[name] for name in label_list if name != "O"]
# Sample data preprocessed into tokens and word-level NER tags.
tokens_data = [
    ["Contact", "John", "Doe", "at", "john.doe@example.com", "or", "123-456-7890", "."],
    ["Send", "an", "email", "to", "jane_smith@mail.com", "."],
    ["Call", "987-654-3210", "for", "support", "."]
]
# Human-readable tag rows, converted to ids through label_to_id so the tag
# sequences stay in sync with the label configuration above.
_tag_name_rows = [
    ["O", "B-NAME", "I-NAME", "O", "B-EMAIL", "O", "B-PHONE", "O"],
    ["O", "O", "O", "O", "B-EMAIL", "O"],
    ["O", "B-PHONE", "O", "O", "O"]
]
ner_tags_data = [[label_to_id[name] for name in row] for row in _tag_name_rows]
data = [
    {"tokens": toks, "ner_tags": tags}
    for toks, tags in zip(tokens_data, ner_tags_data)
]
dataset = Dataset.from_list(data)
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
def tokenize_and_align_labels(examples):
    """Tokenize pre-split words and align word-level NER tags to subwords.

    Only the first subword of each word keeps that word's label;
    continuation subwords and special tokens get -100 so the loss and
    metrics ignore them.

    Args:
        examples: batch dict with "tokens" (lists of words) and
            "ner_tags" (lists of word-level label ids).

    Returns:
        The tokenizer output with an added "labels" key.
    """
    encoded = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True
    )
    all_label_ids = []
    for batch_index, word_labels in enumerate(examples["ner_tags"]):
        aligned = []
        last_word = None
        for word_idx in encoded.word_ids(batch_index=batch_index):
            if word_idx is not None and word_idx != last_word:
                # First subword of a new word: carry the word's label.
                aligned.append(word_labels[word_idx])
            else:
                # Special token (None) or continuation subword: ignore.
                aligned.append(-100)
            last_word = word_idx
        all_label_ids.append(aligned)
    encoded["labels"] = all_label_ids
    return encoded
# Tokenize/align the whole dataset, then carve out a validation split.
tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True)
split_sets = tokenized_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset, val_dataset = split_sets["train"], split_sets["test"]
# Token-classification head on top of BERT, wired to our label vocabulary.
model = AutoModelForTokenClassification.from_pretrained(
    "bert-base-cased",
    num_labels=len(label_list),
    id2label=id_to_label,
    label2id=label_to_id
)
# Softmax function for numpy
def softmax(x, axis=-1):
    """Numerically stable softmax of a numpy array along `axis`."""
    # Shift by the max so np.exp never overflows; result is unchanged.
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exps = np.exp(shifted)
    return exps / exps.sum(axis=axis, keepdims=True)
# Metric with confidence threshold.
# seqeval scores predictions at the entity level (exact BIO span matches),
# not per token, so precision/recall/F1 reflect whole-entity correctness.
metric = evaluate.load("seqeval")
# A token is tagged with its best non-O label whenever that label's
# probability exceeds this value, even if "O" has a higher probability —
# lowering it trades precision for recall.
conf_threshold = 0.3 # Adjustable threshold to boost recall
def compute_metrics(p):
    """Compute seqeval entity metrics using a recall-boosting decision rule.

    Instead of a plain argmax over all classes, each token receives its
    strongest non-O label whenever that label's probability exceeds
    `conf_threshold`; otherwise it falls back to "O". Positions labeled
    -100 (special tokens / continuation subwords) are dropped before
    scoring.

    Args:
        p: EvalPrediction with `.predictions` (logits, shape (B, S, C))
            and `.label_ids` (shape (B, S)).

    Returns:
        Dict with overall precision, recall, f1 and accuracy.
    """
    logits, labels = p.predictions, p.label_ids
    probs = softmax(logits)
    # Vectorized decision rule (replaces a per-token Python loop with the
    # identical computation): best non-O class if it clears the threshold,
    # otherwise O. Tie-breaking matches the original because non_o_indices
    # order is preserved by the fancy-index below.
    non_o = np.asarray(non_o_indices)
    non_o_probs = probs[..., non_o]                      # (B, S, |non-O|)
    best_non_o_prob = non_o_probs.max(axis=-1)           # (B, S)
    best_non_o_id = non_o[non_o_probs.argmax(axis=-1)]   # (B, S)
    predictions = np.where(best_non_o_prob > conf_threshold, best_non_o_id, o_idx)
    # Strip ignored positions and map ids back to label strings for seqeval.
    true_predictions = [
        [id_to_label[int(pred)] for pred, lab in zip(pred_row, lab_row) if lab != -100]
        for pred_row, lab_row in zip(predictions, labels)
    ]
    true_labels = [
        [id_to_label[lab] for lab in lab_row if lab != -100]
        for lab_row in labels
    ]
    results = metric.compute(predictions=true_predictions, references=true_labels, zero_division=0)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }
# Training configuration.
# BUG FIX: load_best_model_at_end=True requires save_strategy to match
# evaluation_strategy. The original omitted save_strategy (default "steps"),
# which makes TrainingArguments raise at construction time.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",  # must match evaluation_strategy for best-model loading
    learning_rate=3e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    save_total_limit=1,
    load_best_model_at_end=True,
    metric_for_best_model="f1",  # the "f1" key returned by compute_metrics
    greater_is_better=True
)
# Assemble the Trainer and run fine-tuning.
trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)
trainer = Trainer(**trainer_kwargs)
trainer.train()
def redact_pii(text):
    """Replace PII spans detected by the fine-tuned model with "[REDACTED]".

    Uses the same thresholded decision rule as evaluation: a subword is
    flagged as PII when its strongest non-O probability exceeds
    `conf_threshold`. Consecutive flagged subwords are collapsed into a
    single "[REDACTED]" marker so multi-subword entities (e-mail
    addresses, phone numbers) don't emit repeated markers.

    Args:
        text: raw input string.

    Returns:
        The text with detected PII tokens replaced.
    """
    # BUG FIX: Trainer.train() leaves the model in train mode, so dropout
    # would otherwise be active at inference time.
    model.eval()
    inputs = tokenizer(text, return_tensors="pt")
    # BUG FIX: move inputs to the model's device (training may have been on GPU).
    inputs = {name: tensor.to(model.device) for name, tensor in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    # Drop [CLS]/[SEP] logits so positions align with tokenizer.tokenize(text).
    logits = outputs.logits[0, 1:-1].cpu().numpy()
    probs = softmax(logits)
    tokens = tokenizer.tokenize(text)
    predicted_labels = []
    for token_probs in probs:
        non_o_probs = token_probs[non_o_indices]
        if np.max(non_o_probs) > conf_threshold:
            pred_id = non_o_indices[int(np.argmax(non_o_probs))]
        else:
            pred_id = o_idx
        predicted_labels.append(id_to_label[pred_id])
    redacted_tokens = []
    for token, label in zip(tokens, predicted_labels):
        if label == "O":
            redacted_tokens.append(token)
        elif redacted_tokens and redacted_tokens[-1] == "[REDACTED]":
            # Collapse runs of PII subwords into one marker.
            continue
        else:
            redacted_tokens.append("[REDACTED]")
    return tokenizer.convert_tokens_to_string(redacted_tokens)
# Quick smoke test of the redaction pipeline.
sample_text = "Contact John Doe at john.doe@example.com or 123-456-7890."
redacted = redact_pii(sample_text)
for heading, value in (("Original", sample_text), ("Redacted", redacted)):
    print(f"{heading}: {value}")