Compression & Entropy

  • Created by Andrés Segura Tinoco
  • Created on June 28, 2019

In signal processing, data compression, source coding, or bit-rate reduction involves encoding information using fewer bits than the original representation. Compression can be either lossless or lossy. Lossless compression reduces bits by identifying and eliminating statistical redundancy, so no information is lost. Lossy compression reduces bits by removing unnecessary or less important information. [1]
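As a toy illustration of eliminating statistical redundancy (a minimal sketch, not the method used below), a run-length encoder collapses repeated symbols into (symbol, count) pairs:

# Minimal run-length encoding sketch: collapse runs of repeated symbols
def rle_encode(s):
    runs = []
    for ch in s:
        if runs and runs[-1][0] == ch:
            runs[-1][1] += 1
        else:
            runs.append([ch, 1])
    return runs

print(rle_encode('aaaabbbcc'))  # [['a', 4], ['b', 3], ['c', 2]]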

Entropy is a measure of unpredictability, so its value is inversely related to the compression capacity of a string of symbols (e.g., the contents of a file). [2]
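For a source with symbol probabilities p(x), the Shannon entropy is H = -sum(p(x) * log2 p(x)) bits per symbol. A minimal sketch (equivalent in spirit to the scipy-based helper defined later in this notebook):

import math
from collections import Counter

def shannon_entropy(symbols):
    # H = sum of -p * log2(p) over the empirical symbol distribution
    counts = Counter(symbols)
    n = len(symbols)
    return sum(-(c / n) * math.log2(c / n) for c in counts.values())

print(shannon_entropy('aaaa'))  # 0.0 -> fully predictable, maximally compressible
print(shannon_entropy('abcd'))  # 2.0 -> uniform over 4 symbols, incompressible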

A cellular automaton (abbrev. CA) is a discrete model studied in computer science, mathematics, physics, complexity science, theoretical biology and microstructure modeling. [3] [4]

1. Compression with current Entropy

In [1]:
# Load Python libraries
import numpy as np
import math
import timeit
from collections import Counter
from scipy.stats import entropy
In [2]:
# Load Plot libraries
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

Loading and plotting data

In [3]:
# Function - Read a file at low level (bytes)
def get_file_bytes(file_path):
    with open(file_path, 'rb') as f:
        return bytearray(f.read())
In [4]:
# Loading target file
file_path = "../data/text/book1-en.txt"
text_byte_list = get_file_bytes(file_path)[:40000]
n_bytes = len(text_byte_list)
print('Original file size:', n_bytes, 'Bytes')
Original file size: 40000 Bytes
In [5]:
# For more information please visit: https://ansegura7.github.io/Algorithms/basics/AlgorithmsBasics.html
def fact_int_2(n):
    m = int(math.sqrt(n))
    
    while m > 1:
        if n % m == 0:
            return (m, n // m)
        m -= 1
    
    return (1, n)

# Find the optimal matrix size (the most square pair of factors)
def find_optimal_matrix(n):
    return fact_int_2(n)
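
A quick check of the helper with the file size used below:

print(find_optimal_matrix(40000))  # (200, 200)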
In [6]:
# Function - Create byte matrix
def create_byte_matrix(byte_list, row_len, col_len):
    return np.array(byte_list).reshape(row_len, col_len)
In [7]:
# Function - Plot image in binary file
def plot_byte_matrix(matrix, plt_title):
    fig, ax = plt.subplots(figsize = (14, 14))
    sns.heatmap(matrix, ax = ax)
    ax.set_title(plt_title, fontsize = 16)
    ax.set_xlabel('columns', fontsize = 12)
    ax.set_ylabel('rows', fontsize = 12)
    plt.show()
In [8]:
# Get optimal size
row_len, col_len = find_optimal_matrix(n_bytes)

# Creating the byte matrix (row, col)
bytes_matrix = create_byte_matrix(text_byte_list, row_len, col_len)
bytes_matrix.shape
Out[8]:
(200, 200)
In [9]:
# Plotting original matrix
plot_byte_matrix(bytes_matrix, 'Original Byte Matrix')

Theoretical compression percentage

In [10]:
# Function - Return the Shannon entropy of a list of symbols
def entropy_shannon(symbols, base=None):
    _, counts = np.unique(symbols, return_counts=True)
    return entropy(counts, base=base)
In [11]:
# Function - Calculates the compression percentage (%)
def calc_compression_percentage(curr_size, new_size, precision = 14):
    return round((curr_size - new_size) / curr_size * 100, precision)
In [12]:
# Calculate entropy of file
actual_entropy = entropy_shannon(text_byte_list, 2)
actual_entropy
Out[12]:
4.878636811101506
In [13]:
# Theoretical compression percentage (%)
max_entropy = 8
compress_rate = calc_compression_percentage(max_entropy, actual_entropy)
print('Theoretical compression:', compress_rate, '%')
Theoretical compression: 39.01703986123118 %

Real compression percentage

It will be calculated from the difference in size between the original file and the file compressed with Huffman coding.

In [14]:
# Class HuffmanCode from scratch
class HuffmanCode:
    
    # Return a Huffman code for an ensemble with distribution p
    def get_code(self, p_symbols):
        
        # Init validation
        n = len(p_symbols)
        if n == 0:
            return dict()
        elif n == 1:
            return dict(zip(p_symbols.keys(), ['1']))
        
        # Ensure probabilities sum to 1
        self._normalize_weights(p_symbols)
        
        # Returns Huffman code
        return self._get_code(p_symbols)
    
    # (Private) Calculate Huffman code
    def _get_code(self, p):
        
        # Base case of only two symbols, assign 0 or 1 arbitrarily
        if len(p) == 2:
            return dict(zip(p.keys(), ['0', '1']))
        
        # Create a new distribution by merging lowest prob pair
        p_prime = p.copy()
        s1, s2 = self._get_lowest_prob_pair(p)
        p1, p2 = p_prime.pop(s1), p_prime.pop(s2)
        p_prime[s1 + s2] = p1 + p2
        
        # Recurse and construct code on new distribution
        code = self._get_code(p_prime)
        symbol = s1 + s2
        s1s2 = code.pop(symbol)
        code[s1], code[s2] = s1s2 + '0', s1s2 + '1'
        
        return code
    
    # Return pair of symbols from distribution p with lowest probabilities
    def _get_lowest_prob_pair(self, p):
        
        # Ensure there are at least 2 symbols in the dist.
        if len(p) >= 2:
            sorted_p = sorted(p.items(), key=lambda x: x[1])
            return sorted_p[0][0], sorted_p[1][0]
        
        return (None, None)
    
    # Makes sure all weights add up to 1
    def _normalize_weights(self, p_symbols, t_weight=1.0):
        n = sum(p_symbols.values())
        
        if n != t_weight:
            for s in p_symbols:
                p_symbols[s] = p_symbols[s] / n

# Create Huffman Code instance
hc = HuffmanCode()
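
A quick sanity check on a toy distribution (illustrative probabilities; the code assignment within equal-probability ties may vary):

print(hc.get_code({'a': 0.5, 'b': 0.25, 'c': 0.25}))  # e.g. {'a': '0', 'b': '10', 'c': '11'}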
In [15]:
# Function - Calculate code frequency
def get_term_freq(term_list):
    term_freq = {}
    terms_count = dict(Counter(term_list))
    
    for key, value in terms_count.items():
        if isinstance(key, int):
            key = chr(key)
        term_freq[key] = value
    
    return term_freq

# Function - Build the compress file
def create_compress_file(byte_list, code_list):
    compress_list = []
    
    for symbol in byte_list:
        key = chr(symbol)
        new_symbol = code_list[key]
        compress_list.append(new_symbol)
    
    # Return the compressed file as a bit string
    return "".join(compress_list)

# Function - Compressing file
def get_compress_file(byte_list):
    
    # Get symbols frequency
    term_freq = get_term_freq(byte_list)
    
    # Normalize term frequency
    n = sum(term_freq.values())
    for term in term_freq:
        term_freq[term] = term_freq[term] / n
    
    # Get Huffman coding
    h_code = hc.get_code(term_freq)
    
    # Compressing file with Huffman code
    compress_file = create_compress_file(byte_list, h_code)
            
    return compress_file, h_code
In [16]:
# Compressing initial text file
compress_file, h_code = get_compress_file(text_byte_list)
In [17]:
# Real compression percentage (%)
curr_size = len(text_byte_list)
new_size = len(compress_file) / 8  # the compressed file is a bit string, so convert bits to bytes
compress_rate = calc_compression_percentage(curr_size, new_size)
print('Real compression:', compress_rate, '%')
Real compression: 38.4471875 %

2. Changing Entropy for higher Compression

The assumption is that by lowering (changing) the entropy of a file, it can be compressed more than with its current entropy.
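Since the (order-0) Shannon entropy depends only on the symbol distribution, lowering it means skewing the byte frequencies. A toy illustration with the entropy_shannon helper defined above (illustrative byte strings):

print(entropy_shannon(list(b'abababab'), 2))  # 1.0   -> uniform distribution over 2 symbols
print(entropy_shannon(list(b'aaaaaabb'), 2))  # ~0.81 -> skewed distribution, lower entropy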

Util functions: convert a byte list to a bit string and vice versa

In [18]:
# Convert a byte list to a bits string
def byte_list_to_bit_string(byte_list):
    bit_string = "".join([bin(byte)[2:].zfill(8) for byte in byte_list])
    return bit_string
In [19]:
# Convert a bits string to a byte list
def bit_string_to_byte_list(bit_string):
    byte_list = []
    n = len(bit_string) // 8
    for i in range(n):
        ix_start = i * 8
        ix_end = (i + 1) * 8
        byte = bit_string[ix_start:ix_end]
        byte_list.append(int(byte, 2))
    return byte_list
In [20]:
# Update a string by index
def update_string(s, i, v):
    ns = s[:i] + v + s[i+1:]
    return ns
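
A quick round trip with these helpers:

bits = byte_list_to_bit_string([65, 66])
print(bits)                           # '0100000101000010'
print(bit_string_to_byte_list(bits))  # [65, 66]
print(update_string('0000', 1, '1'))  # '0100'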

Reversible Linear Cellular Automata

In [21]:
# Create a Reversible Linear Cellular Automaton (RLCA) rule from a key
def create_rlca_rule(byte, r = 2):
    rule = {}
    r_size = 2 * r + 1
    bits = bin(byte)[2:].zfill(r_size)
    b_size = len(bits)
    
    for i in range(b_size):
        # Map the neighbor sum i to the corresponding bit of the key
        rule[i] = int(bits[b_size - i - 1])
    
    return rule
In [22]:
# Get the sum of the neighbor cells around index ix (the cell itself is excluded)
def get_cells_value(automaton, ix, r, n):
    value = 0
    ix_start = max(ix - r, 0)
    ix_end = min(ix + r + 1, n)
    
    for i in range(ix_start, ix_end):
        if i != ix:
            value += int(automaton[i])
    
    return value
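
For example, with radius 1, the value at index 2 of '01010' is the sum of its two neighbor bits:

print(get_cells_value('01010', 2, 1, 5))  # 1 + 1 = 2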
In [23]:
# Apply a Linear Cellular Automaton rule to the current automaton
def apply_lca_rule(automaton, rule, n_cell, r, forward):
    curr_bit_string = automaton  # strings are immutable, so no explicit copy is needed
    
    if forward:
        curr_range = range(0, n_cell, 1)
    else:
        curr_range = range(n_cell - 1, -1, -1)
    
    for i in curr_range:
        gen_ix = get_cells_value(curr_bit_string, i, r, n_cell)
        cell = rule[gen_ix]
        new_cell = str(int(not(int(curr_bit_string[i]) ^ cell)))
        
        if new_cell != curr_bit_string[i]:
            curr_bit_string = update_string(curr_bit_string, i, new_cell)
    
    # Encode the new automaton as a byte list and return it
    new_automaton = bit_string_to_byte_list(curr_bit_string)
    return new_automaton
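
Each cell is updated as new = NOT(cell XOR rule[neighbor_sum]). Because XNOR with a fixed bit is its own inverse, replaying the pass with the opposite scan direction (as Step 4 below shows) restores the original string. A single-cell illustration:

cell, rule_bit = 1, 0
new_cell = int(not (cell ^ rule_bit))      # 1 XNOR 0 -> 0
restored = int(not (new_cell ^ rule_bit))  # 0 XNOR 0 -> 1, back to the original
print(new_cell, restored)  # 0 1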
In [24]:
# Apply the RLCA to a byte list (forward=True changes the entropy; forward=False restores it)
def restore_entropy(byte_list, key, r, forward=False):
    bit_string = byte_list_to_bit_string(byte_list)
    byte_list = apply_lca_rule(bit_string, key, len(bit_string), r, forward)
    return byte_list

Test Example of RLCA

In [25]:
# Step 1 - Create a RLCA rule
r = 3
key = create_rlca_rule(19, r)
key
Out[25]:
{0: 1, 1: 1, 2: 0, 3: 0, 4: 1, 5: 0, 6: 0}
In [26]:
# Step 2 - Have a byte array
a = list(np.random.randint(0, 256, 50))
print(a)
[44, 213, 105, 61, 18, 249, 101, 28, 80, 223, 114, 167, 54, 165, 192, 37, 161, 143, 176, 228, 204, 234, 207, 44, 197, 223, 77, 238, 211, 70, 168, 241, 131, 171, 56, 66, 222, 107, 161, 137, 88, 46, 24, 142, 223, 80, 136, 57, 138, 120]
In [27]:
# Step 3 - Apply rule in forward way over a
b = restore_entropy(a, key, r, True)
print(b)
[83, 254, 54, 6, 241, 167, 225, 179, 57, 0, 133, 95, 209, 80, 254, 210, 114, 215, 20, 89, 50, 49, 44, 255, 183, 97, 144, 17, 40, 99, 13, 24, 157, 88, 139, 188, 63, 144, 216, 230, 46, 135, 46, 118, 225, 167, 119, 198, 101, 175]
In [28]:
# Step 4 - Apply rule in backward way over b
c = restore_entropy(b, key, r, False)
print(c)
[44, 213, 105, 61, 18, 249, 101, 28, 80, 223, 114, 167, 54, 165, 192, 37, 161, 143, 176, 228, 204, 234, 207, 44, 197, 223, 77, 238, 211, 70, 168, 241, 131, 171, 56, 66, 222, 107, 161, 137, 88, 46, 24, 142, 223, 80, 136, 57, 138, 120]
In [29]:
# Step 5 - Compare initial state vs final state
a == c
Out[29]:
True

Real Example of RLCA

A greedy, lossless approach will be used. Below are the functions that find the lowest-entropy configuration for a binary file using the RLCA.

In [30]:
# Function - Find the low entropy setup for target file
def find_low_entropy(byte_list, r, verbose = False):
    start_time = timeit.default_timer()
    
    # Local variables
    B = max(256, pow(2, 2 * r + 1))
    bit_string = byte_list_to_bit_string(byte_list)
    n_cell = len(bit_string)
    best_entropy = 8  # maximum possible entropy for 8-bit symbols
    best_key = -1
    best_byte_list = []
    entropy_list = []
    
    if verbose:
        print('key size:', B)
        print('neighborhood:', r)
        print('file size:', len(byte_list), 'bytes')
        print('file size:', n_cell, 'bits')
        print('initial entropy:', best_entropy)
    
    for curr_key in range(0, B):
        
        # Create a rule
        curr_rule = create_rlca_rule(curr_key, r)
        
        # Apply specific rule
        curr_byte_list = apply_lca_rule(bit_string, curr_rule, n_cell, r, True)
        
        # Get current entropy
        curr_entropy = entropy_shannon(curr_byte_list, 2)
        entropy_list.append(curr_entropy)
        
        # Update best option
        if curr_entropy <= best_entropy:
            if verbose:
                print(' - key:', curr_key, ', new entropy:', curr_entropy, ', diff:', (curr_entropy - best_entropy))
            best_entropy = curr_entropy
            best_key = curr_key
            best_byte_list = curr_byte_list
    
    # Elapsed time
    elapsed = timeit.default_timer() - start_time
    if verbose:
        print('iters:', (curr_key + 1), ', elapsed time', elapsed, 's')
    
    # Save results
    result = {}
    result['best_option'] = {'byte_list': best_byte_list, 'entropy': best_entropy, 'key': create_rlca_rule(best_key, r)}
    result['all_option'] = entropy_list
    
    return result

Now, we try to find the lowest-entropy configuration by applying the Reversible Linear Cellular Automaton. Note that this is a brute-force search over all 2^(2r+1) rules, so its cost grows exponentially with the neighborhood radius r:

In [31]:
# Find file with low entropy
r = 4
verbose = True
result = find_low_entropy(text_byte_list, r, verbose)
best_option = result['best_option']
all_options = result['all_option']
key size: 512
neighborhood: 4
file size: 40000 bytes
file size: 320000 bits
initial entropy: 8
 - key: 0 , new entropy: 4.878636811101507 , diff: -3.121363188898493
 - key: 511 , new entropy: 4.878636811101506 , diff: -8.881784197001252e-16
iters: 512 , elapsed time 4632.5416168 s
In [32]:
# Show best entropy setup
best_option['entropy'], best_option['key']
Out[32]:
(4.878636811101506, {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1})

Now, let's visualize the trend and the entropy changes of the file when applying the RLCA:

In [33]:
# Calculate entropy changes
entropy_delta = []
last_entropy = 8
for v in all_options:
    entropy_delta.append(v - last_entropy)
    last_entropy = v
In [34]:
# Create pretty x axis labels
def get_x_labels(n):
    x_labels = []
    for ix in range(n):
        if ix % 10 == 0:
            x_labels.append(str(ix))
        else:
            x_labels.append('')
    return x_labels
In [35]:
# Function that plots the entropy trend
def plot_entropy_trend():
    fig = plt.figure(figsize = (16, 5))
    
    # Create chart
    plt.axhline(y = best_option['entropy'], color = "green", linestyle = "-", linewidth=16, alpha=0.3)
    plt.plot(all_options)
    for i in range(len(all_options)):
        if all_options[i] == best_option['entropy']:
            plt.scatter(x = i, y = best_option['entropy'], s=50, color='steelblue')

    # Additional settings
    plt.title('Entropy Trend & Best Options')
    plt.xlabel('RLCA Key', fontsize = 10)
    plt.ylabel('Entropy', fontsize = 10)
    plt.ylim(0, 8)
    plt.show()
In [36]:
# Plotting the entropy changes
def plot_entropy_changes():
    fig = plt.figure(figsize = (16, 5))

    height = entropy_delta
    bars = range(0, len(entropy_delta))
    y_pos = np.arange(len(entropy_delta))
    n = len(entropy_delta)
    
    # Create chart
    barlist = plt.bar(y_pos, height)
    plt.xticks(y_pos, get_x_labels(n), fontsize = 10, rotation = 50)
    for i in range(n):
        if entropy_delta[i] > 0:
            barlist[i].set_color('salmon')
    
    # Additional settings
    plt.title('Entropy Changes')
    plt.xlabel('RLCA Key', fontsize = 10)
    plt.ylabel('Entropy', fontsize = 10)
    plt.show()
In [37]:
# Plotting the entropy trend
plot_entropy_trend()
In [38]:
# Plotting the entropy changes
plot_entropy_changes()

As you can see in the charts above, the entropy behavior is cyclic; however, there are 2 RLCA keys that generate the lowest entropy value for the file.

In [39]:
# Save best byte list
best_byte_list = best_option['byte_list']
In [40]:
# Create and plot the new matrix (with low entropy)
new_matrix = create_byte_matrix(best_byte_list, row_len, col_len)
plot_byte_matrix(new_matrix, 'Matrix of New File')
In [41]:
# Save the new file as txt
temp_file_path = file_path.replace('.txt', '-new.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(best_byte_list)
    f.write(binary_format)

Theoretical compression percentage

In [42]:
# Calculating new theoretical compression percentage
compress_rate = calc_compression_percentage(actual_entropy, best_option['entropy'])
print(compress_rate, '%')
0.0 %

The best key found has essentially the same entropy as the original file, so no additional theoretical compression is gained.

3. Restoring Entropy for Decompression

Below is the process to restore the entropy of the file to its initial value.

In [43]:
# Call the entropy restoration function
rule = best_option['key']
new_byte_list = restore_entropy(best_byte_list, rule, r)

Validation

In [44]:
# Comparing size of byte lists
len(text_byte_list) == len(new_byte_list)
Out[44]:
True
In [45]:
# Comparing values of byte lists
sum(text_byte_list) - sum(new_byte_list)
Out[45]:
0
In [46]:
# Comparing entropies of byte lists
entropy_shannon(text_byte_list, 2) == entropy_shannon(new_byte_list, 2)
Out[46]:
True

Plotting

In [47]:
# Create and plot the restored matrix (with the original entropy)
ori_matrix = create_byte_matrix(new_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
In [48]:
# Save the restored (original) file as txt
temp_file_path = file_path.replace('.txt', '-original.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(new_byte_list)
    f.write(binary_format)

References

[1] Wikipedia - Data Compression.
[2] Wikipedia - Entropy.
[3] Wikipedia - Cellular Automaton.
[4] Github - Algorithms.