In signal processing, data compression, source coding, or bit-rate reduction involves encoding information using fewer bits than the original representation. Compression can be either lossy or lossless. Lossless compression reduces bits by identifying and eliminating statistical redundancy. No information is lost in lossless compression. Lossy compression reduces bits by removing unnecessary or less important information. [1]
Entropy is a measure of unpredictability, so its value is inversely related to the compression capacity of a chain of symbols (e.g. a file). [2]
# Load Python libraries
import numpy as np
import math
import timeit
from collections import Counter
from scipy.stats import entropy
from functools import reduce
# Load Plot libraries
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# Function - Read file in low level (Bytes)
def get_file_bytes(file_path):
with open(file_path, 'rb') as f:
return bytearray(f.read());
return None
# Loading target file
file_path = "../data/text/book1-en.txt"
text_byte_list = get_file_bytes(file_path)[:40000]
n_bytes = len(text_byte_list)
print('Original file size:', n_bytes, 'Bytes')
# For more information please visit: https://ansegura7.github.io/Algorithms/basics/AlgorithmsBasics.html
def fact_int_2(n):
nn = math.sqrt(n)
m = int(nn)
while m > 1:
if n % m == 0:
return (m, n // m)
m -= 1
return (1, n)
# Find the optimal matrix size
def find_optimal_matrix(n):
return fact_int_2(n);
# Function - Create byte matrix
def create_byte_matrix(byte_list, row_len, col_len):
matrix = np.array(byte_list).reshape(row_len, col_len)
return matrix;
# Function - Plot image in binary file
def plot_byte_matrix(matrix, plt_title):
fig, ax = plt.subplots(figsize = (14, 14))
sns.heatmap(matrix, ax = ax)
ax.set_title(plt_title, fontsize = 16)
ax.set_xlabel('columns', fontsize = 12)
ax.set_ylabel('rows', fontsize = 12)
plt.show()
# Get optimal size
row_len, col_len = find_optimal_matrix(n_bytes)
# Creating the byte matrix (row, col)
bytes_matrix = create_byte_matrix(text_byte_list, row_len, col_len)
bytes_matrix.shape
# Plotting original matrix
plot_byte_matrix(bytes_matrix, 'Original Byte Matrix')
# Function - Return shannon entropy
def entropy_shannon(symbols, base=None):
value, counts = np.unique(symbols, return_counts=True)
return entropy(counts, base=base)
# Function - Calculates the compression percentage (%)
def calc_compression_percentage(curr_size, new_size, precision = 14):
return round((curr_size - new_size) / curr_size * 100, precision)
# Calculate entropy of file
actual_entropy = entropy_shannon(text_byte_list, 2)
actual_entropy
# Theoretical compression percentage (%)
max_entropy = 8
compress_rate = calc_compression_percentage(max_entropy, actual_entropy)
print('Theoretical compression:', compress_rate, '%')
It will be calculated from the difference in size between the original file and the file compressed by Huffman.
# Class HuffmanCode from scratch
class HuffmanCode:
# Return a Huffman code for an ensemble with distribution p
def get_code(self, p_symbols):
# Init validation
n = len(p_symbols)
if n == 0:
return dict()
elif n == 1:
return dict(zip(p_symbols.keys(), ['1']))
# Ensure probabilities sum to 1
self._normalize_weights(p_symbols)
# Returns Huffman code
return self._get_code(p_symbols);
# (Private) Calculate Huffman code
def _get_code(self, p):
# Base case of only two symbols, assign 0 or 1 arbitrarily
if len(p) == 2:
return dict(zip(p.keys(), ['0', '1']))
# Create a new distribution by merging lowest prob pair
p_prime = p.copy()
s1, s2 = self._get_lowest_prob_pair(p)
p1, p2 = p_prime.pop(s1), p_prime.pop(s2)
p_prime[s1 + s2] = p1 + p2
# Recurse and construct code on new distribution
code = self._get_code(p_prime)
symbol = s1 + s2
s1s2 = code.pop(symbol)
code[s1], code[s2] = s1s2 + '0', s1s2 + '1'
return code;
# Return pair of symbols from distribution p with lowest probabilities
def _get_lowest_prob_pair(self, p):
# Ensure there are at least 2 symbols in the dist.
if len(p) >= 2:
sorted_p = sorted(p.items(), key=lambda x: x[1])
return sorted_p[0][0], sorted_p[1][0];
return (None, None);
# Makes sure all weights add up to 1
def _normalize_weights(self, p_symbols, t_weight=1.0):
n = sum(p_symbols.values())
if n != t_weight:
for s in p_symbols:
p_symbols[s] = p_symbols[s] / n;
# Create Huffman Code instance
hc = HuffmanCode()
# Function - Calculate code frequency
def get_term_freq(term_list):
term_freq = {}
terms_count = dict(Counter(term_list))
for key, value in terms_count.items():
if isinstance(key, int):
key = chr(key)
term_freq[key] = value
return term_freq;
# Function - Build the compress file
def create_compress_file(byte_list, code_list):
compress_list = []
for symbol in byte_list:
key = chr(symbol)
new_symbol = code_list[key]
compress_list.append(new_symbol)
# Return compress file
return "".join(compress_list)
# Function - Compressing file
def get_compress_file(byte_list):
# Get symbols frequency
term_freq = get_term_freq(byte_list)
# Normalize term frequency
n = sum(term_freq.values())
for term in term_freq:
term_freq[term] = term_freq[term] / n;
# Get Huffman coding
h_code = hc.get_code(term_freq)
# Compressing file with Huffman code
compress_file = create_compress_file(byte_list, h_code)
return compress_file, h_code;
# Compressing initial text file
compress_file, h_code = get_compress_file(text_byte_list)
# Real compression percentage (%)
curr_size = len(text_byte_list)
new_size = len(compress_file) / 8
compress_rate = calc_compression_percentage(curr_size, new_size)
print('Real compression:', compress_rate, '%')
The assumption is that by lowering (changing) the entropy of a file, it can be compressed more, than with its current entropy.
# Convert a byte list to a bits string
def byte_list_to_bit_string(byte_list):
bit_string = "".join([bin(byte)[2:].zfill(8) for byte in byte_list])
return bit_string
# Convert a bits string to a byte list
def bit_string_to_byte_list(bit_string):
byte_list = []
n = len(bit_string) // 8
for i in range(n):
ix_start = i * 8
ix_end = (i + 1) * 8
byte = bit_string[ix_start:ix_end]
byte_list.append(int(byte, 2))
return byte_list
# Update a string by index
def update_string(s, i, v):
ns = s[:i] + v + s[i+1:]
return ns
# Creates a Reversible Linear Cellular Automata rule
def create_rlca_rule(byte, r = 2):
rule = {}
r_size = 2 * r + 1
bits = bin(byte)[2:].zfill(r_size)
b_size = len(bits)
for i in range(0, b_size):
bin_value = bits[(b_size - i - 1)]
rule[i] = int(bin_value)
return rule
# Get automata cells value
def get_cells_value(automaton, ix, r, n):
value = 0
ix_start = max(ix - r, 0)
ix_end = min(ix + r + 1, n)
for i in range(ix_start, ix_end):
if i != ix:
value += int(automaton[i])
return value
# Apply Linear Cellular Automata rule to current automaton
def apply_lca_rule(automaton, rule, n_cell, r, forward):
start_time = timeit.default_timer()
new_automaton = []
curr_bit_string = (automaton + '.')[:-1]
if forward:
curr_range = range(0, n_cell, 1)
else:
curr_range = range(n_cell - 1, -1, -1)
for i in curr_range:
gen_ix = get_cells_value(curr_bit_string, i, r, n_cell)
cell = rule[gen_ix]
new_cell = str(int(not(int(curr_bit_string[i]) ^ cell)))
if new_cell != curr_bit_string[i]:
curr_bit_string = update_string(curr_bit_string, i, new_cell)
# Code new automaton and return it
new_automaton = bit_string_to_byte_list(curr_bit_string)
return new_automaton
# Restoring entropy process
def restore_entropy(byte_list, key, r, forward=False):
bit_string = byte_list_to_bit_string(byte_list)
byte_list = apply_lca_rule(bit_string, key, len(bit_string), r, forward)
return byte_list
# Step 1 - Create a RLCA rule
r = 3
key = create_rlca_rule(19, r)
key
# Step 2 - Have a byte array
a = list(np.random.randint(0, 256, 50))
print(a)
# Step 3 - Apply rule in forward way over a
b = restore_entropy(a, key, r, True)
print(b)
# Step 4 - Apply rule in backward way over b
c = restore_entropy(b, key, r, False)
print(c)
# Step 5 - Compare initial state vs final state
a == c
A greedy approach with lossless compression reduces bits will be used. Below, the functions that allow you to find the best entropy configuration for a binary file using the RLCA.
# Function - Find the low entropy setup for target file
def find_low_entropy(byte_list, r, verbose = False):
start_time = timeit.default_timer()
# Local variables
B = max(256, pow(2, 2 * r + 1))
bit_string = byte_list_to_bit_string(byte_list)
n_cell = len(bit_string)
best_entropy = 8
best_key = -1
best_byte_list = []
best_bit_string = ''
entropy_list = []
if verbose:
print('key size:', B)
print('neighborhood:', r)
print('file size:', len(byte_list), 'bytes')
print('file size:', n_cell, 'bits')
print('initial entropy:', best_entropy)
for curr_key in range(0, B):
# Create a rule
curr_rule = create_rlca_rule(curr_key, r)
# Apply specific rule
curr_byte_list = apply_lca_rule(bit_string, curr_rule, n_cell, r, True)
# Get current entropy
curr_entropy = entropy_shannon(curr_byte_list, 2)
entropy_list.append(curr_entropy)
# Update best option
if curr_entropy <= best_entropy:
if verbose:
print(' - key:', curr_key, ', new entropy:', curr_entropy, ', diff:', (curr_entropy - best_entropy))
best_entropy = curr_entropy
best_key = curr_key
best_byte_list = curr_byte_list
# Elapsed time
elapsed = timeit.default_timer() - start_time
if verbose:
print('iters:', (curr_key + 1), ', elapsed time', elapsed, 's')
# Save results
result = {}
result['best_option'] = {'byte_list': best_byte_list, 'entropy': best_entropy, 'key': create_rlca_rule(best_key, r)}
result['all_option'] = entropy_list
return result
Now, we try to find the best configuration of entropy applying Reversible Linear Cellular Automata:
# Find file with low entropy
r = 4
verbose = True
result = find_low_entropy(text_byte_list, r, verbose)
best_option = result['best_option']
all_options = result['all_option']
# Show best entropy setup
best_option['entropy'], best_option['key']
# Calculate entropy changes
entropy_delta = []
last_entropy = 8
for v in all_options:
entropy_delta.append(v - last_entropy)
last_entropy = v
# Create pretty x axis labels
def get_x_labels(n):
x_labels = []
for ix in range(n):
if ix % 10 == 0:
x_labels.append(str(ix))
else:
x_labels.append('')
return x_labels
# Function that plots the entropy trend
def plot_entropy_trend():
fig = plt.figure(figsize = (16, 5))
# Create chart
plt.axhline(y = best_option['entropy'], color = "green", linestyle = "-", linewidth=16, alpha=0.3)
plt.plot(all_options)
for i in range(len(all_options)):
if all_options[i] == best_option['entropy']:
plt.scatter(x = i, y = best_option['entropy'], s=50, color='steelblue')
# Additional settings
plt.title('Entropy Trend & Best Options')
plt.xlabel('RLCA Key', fontsize = 10)
plt.ylabel('Entropy', fontsize = 10)
plt.ylim(0, 8)
plt.show()
# Plotting the entropy changes
def plot_entropy_changes():
fig = plt.figure(figsize = (16, 5))
height = entropy_delta
bars = range(0, len(entropy_delta))
y_pos = np.arange(len(entropy_delta))
n = len(entropy_delta)
# Create chart
barlist = plt.bar(y_pos, height)
plt.xticks(y_pos, get_x_labels(n), fontsize = 10, rotation = 50)
for i in range(n):
if entropy_delta[i] > 0:
barlist[i].set_color('salmon')
# Additional settings
plt.title('Entropy Changes')
plt.xlabel('RLCA Key', fontsize = 10)
plt.ylabel('Entropy', fontsize = 10)
plt.show()
# Plotting the entropy trend
plot_entropy_trend()
# Plotting the entropy changes
plot_entropy_changes()
As you can see in the charts above, the entropy behavior is cyclic, however, there are 2 RLCA keys that generate the lowest entropy values for the file.
# Save best byte list
best_byte_list = best_option['byte_list']
# Create and plot the new matrix (with low entropy)
ori_matrix = create_byte_matrix(best_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of New File')
# Save the new file as txt
temp_file_path = file_path.replace('.txt', '-new.txt')
with open(temp_file_path, 'w+b') as f:
binary_format = bytearray(best_byte_list)
f.write(binary_format)
# Calculating new theoretical compression percentage
compress_rate = calc_compression_percentage(actual_entropy, best_option['entropy'])
print(compress_rate, '%')
Below, the process to reverse the entropy of file to the initial value.
# Call the entropy restoration function
rule = best_option['key']
new_byte_list = restore_entropy(best_byte_list, rule, r)
# Comparing size of byte lists
len(text_byte_list) == len(new_byte_list)
# Comparing values of byte lists
sum(text_byte_list) - sum(new_byte_list)
# Comparing entropies of byte lists
entropy_shannon(text_byte_list, 2) == entropy_shannon(new_byte_list, 2)
# Create and plot the new matrix (with low entropy)
ori_matrix = create_byte_matrix(new_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
# Save the original file as txt
temp_file_path = file_path.replace('.txt', '-original.txt')
with open(temp_file_path, 'w+b') as f:
binary_format = bytearray(new_byte_list)
f.write(binary_format)