From 92df6f6b4460602543c05bda4e5df72deff81276 Mon Sep 17 00:00:00 2001 From: Paul Aumann Date: Sat, 25 Nov 2023 20:23:04 +0100 Subject: [PATCH] Updated scripts and added new --- .gitignore | 163 +++++++++++++++++++++++++ binary_diff.py | 47 +++++++ challenge_evaluation.py | 263 ++++++++++++++++------------------------ init.py | 47 +++++++ machine_evaluation.py | 153 +++++++++-------------- rc4.py | 108 +++++++++++++++++ util.py | 36 ++++++ 7 files changed, 566 insertions(+), 251 deletions(-) create mode 100644 .gitignore create mode 100644 binary_diff.py create mode 100644 init.py create mode 100644 rc4.py create mode 100644 util.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0f31042 --- /dev/null +++ b/.gitignore @@ -0,0 +1,163 @@ +# Stuff +stuff/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/binary_diff.py b/binary_diff.py new file mode 100644 index 0000000..503b633 --- /dev/null +++ b/binary_diff.py @@ -0,0 +1,47 @@ +from argparse import ArgumentParser +from itertools import zip_longest +from pathlib import Path + +CHUNK_SIZE = 16 + + +def hl(b: str): + return f"\033[31;47;1m{b}\033[0m" + + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument("file1", type=Path, help="First file.") + parser.add_argument("file2", type=Path, help="Second file.") + args = parser.parse_args() + + # Read files + with open(args.file1, "rb") as fd1, open(args.file2, "rb") as fd2: + file1 = fd1.read() + file2 = fd2.read() + + # Find differences + diff = [b1 == b2 for b1, b2 in zip_longest(file1, file2, fillvalue=-1)] + + n1 = len(file1) + n2 = len(file2) + values1 = [f"{b:02X}" for b in file1] + values2 = [f"{b:02X}" for b in file2] + + # Highlight different values + values1 = [b if diff[i] else hl(b) for i, b in enumerate(values1)] + values2 = [b if diff[i] else hl(b) for i, b in enumerate(values2)] + + # Pad to same length + values1 = values1 + [" "] * (max(n1, n2) - n1) + values2 = values2 + [" "] * (max(n1, n2) - n2) + + # Print bytes in chunks + print() + print(args.file1.name, " " * ((3 * CHUNK_SIZE - 1) + 2 - len(args.file1.name)), args.file2.name) + print("-" * (3 * CHUNK_SIZE - 1), " ", "-" * (3 * CHUNK_SIZE - 1)) + for chunk in [slice(i, i + CHUNK_SIZE) for i in range(0, max(n1, n2), CHUNK_SIZE)]: + print(" ".join(values1[chunk]), " ", " ".join(values2[chunk])) + + print() + print(f"Files differ at {diff.count(False)} places.") diff --git a/challenge_evaluation.py b/challenge_evaluation.py index 8e580c7..780bf34 100644 --- a/challenge_evaluation.py +++ b/challenge_evaluation.py @@ -1,31 +1,37 @@ -import argparse -import zipfile import json -import os +import logging +from argparse import ArgumentParser +from pathlib import Path +from zipfile import ZipFile + +from util import is_valid_zip, select_zip # Costs +# fmt: off CONSTANT_COST = 1 REGISTER_COST = 1 OP_COST = { - "SUB": 4, - "ADD": 4, - "INC": 4, - "DEC": 4, - "MUL": 10, - "DIV": 10, - "MOD": 10, - "OR": 8, - "AND": 8, - "XOR": 8, - "INV": 8, - "SL": 5, - "SR": 5, - "SLU": 5, - "SRU": 5, - "ROTL": 7, - "ROTR": 7, + "SUB" : 4, + "ADD" : 4, + "INC" : 4, + "DEC" : 4, + "MUL" : 10, + "DIV" : 10, + "MOD" : 10, + "OR" : 8, + "AND" : 8, + "XOR" : 8, + "INV" : 8, + "SL" : 5, + "SR" : 5, + "SLU" : 5, + "SRU" : 5, + "ROTL": 7, + "ROTR": 7, } ALG_LINE_COST = 0.5 +BASE_OPERATIONS = ("A_ADD_B", "B_SUB_A", "TRANS_A", "TRANS_B") +# fmt: on # ansi escape codes ul = "\033[4m" # underline @@ -33,7 +39,7 @@ end = "\033[0m" # reset ylw = "\033[33m" # yellow -def is_empty_row(row, pedantic=False): +def is_empty_row(row: dict, pedantic: bool = False) -> bool: # Check if signal table is non-empty if "signal" in row and len(row["signal"]) != 0: return False @@ -41,23 +47,15 @@ def is_empty_row(row, pedantic=False): # Only check if keys are set if pedantic is true if not pedantic: return True + # Check if "unconditional-jump", "conditional-jump" or "label" is set keys = ("unconditional-jump", "conditional-jump", "label") return not any(key in row for key in keys) -def evaluate(filepath, verbose=False, pedantic=False): - filename = filepath.split(os.path.sep)[-1] - if not filepath.endswith(".zip"): - print( - f"{filename} :: Supplied file does not have .zip file extension. Skipping .." - ) - return -1 - - with zipfile.ZipFile(filepath, "r") as savefile: - with savefile.open("machine.json", "r") as machinefile, savefile.open( - "signal.json", "r" - ) as signalfile: +def evaluate(file: Path, pedantic: bool = False) -> float: + with ZipFile(file, "r") as savefile: + with savefile.open("machine.json", "r") as machinefile, savefile.open("signal.json", "r") as signalfile: machine = json.load(machinefile) signal = json.load(signalfile) @@ -65,83 +63,41 @@ def evaluate(filepath, verbose=False, pedantic=False): total_rows = signal["signaltable"]["row"] # Remove rows without effect (used for formatting for example) rows = [row for row in total_rows if not is_empty_row(row, pedantic=pedantic)] - if verbose: - print(f"{filename} :: Total number of rows: {len(total_rows)}") - print(f"{filename} :: Number of rows after excluding empty: {len(rows)}") + + logging.debug(f"{file.name} :: Total number of rows: {len(total_rows)}") + logging.debug(f"{file.name} :: Number of rows after excluding empty: {len(rows)}") # Check if IR or PC register was used + pc_used = any((signal["name"] == "PC.W" and signal["value"] == "1" for row in rows for signal in row["signal"])) + ir_used = any((signal["name"] == "IR.W" and signal["value"] == "1" for row in rows for signal in row["signal"])) - pc_used = any( - ( - signal["name"] == "PC.W" and signal["value"] == "1" - for row in rows - for signal in row["signal"] - ) - ) - ir_used = any( - ( - signal["name"] == "IR.W" and signal["value"] == "1" - for row in rows - for signal in row["signal"] - ) - ) - - if verbose: - if pc_used: - print(f"{filename} :: PC Register was used in signal table row.") - if ir_used: - print(f"{filename} :: IR Register was used in signal table row.") + if pc_used: + logging.debug(f"{file.name} :: PC Register was used in signal table row.") + if ir_used: + logging.debug(f"{file.name} :: IR Register was used in signal table row.") # Load used multiplexer constants try: - mux_input_a = next( - filter(lambda mux: mux["muxType"] == "A", machine["machine"]["muxInputs"]) - )["input"] - mux_consts_a = [ - int(mux_input["value"]) - for mux_input in mux_input_a - if mux_input["type"] == "constant" - ] + mux_input_a = next(filter(lambda mux: mux["muxType"] == "A", machine["machine"]["muxInputs"]))["input"] + mux_consts_a = [int(mux_input["value"]) for mux_input in mux_input_a if mux_input["type"] == "constant"] except StopIteration: - print( - f"{filename} :: Couldn't find input for multiplexer A. Is the file corrupted? Skipping file .." - ) - return -1 + logging.error(f"{file.name} :: Couldn't find input for multiplexer A. Is the file corrupted?") + exit(1) try: - mux_input_b = next( - filter(lambda mux: mux["muxType"] == "B", machine["machine"]["muxInputs"]) - )["input"] - mux_consts_b = [ - int(mux_input["value"]) - for mux_input in mux_input_b - if mux_input["type"] == "constant" - ] + mux_input_b = next(filter(lambda mux: mux["muxType"] == "B", machine["machine"]["muxInputs"]))["input"] + mux_consts_b = [int(mux_input["value"]) for mux_input in mux_input_b if mux_input["type"] == "constant"] except StopIteration: - print( - f"{filename} :: Couldn't find input for multiplexer B. Is the file corrupted? Skipping file .." - ) - return -1 + logging.error(f"{file.name} :: Couldn't find input for multiplexer B. Is the file corrupted?") + exit(1) # Base machine has constants 0 and 1 at multiplexer A. All other constants are extensions. - base_muxt_a = (0, 1) - for base_input in base_muxt_a: - try: - mux_consts_a.remove(base_input) - except ValueError: - pass + constants = set(mux_consts_a + mux_consts_b) - set((0, 1)) - constants = set(mux_consts_a + mux_consts_b) - - if verbose: - print( - f"{filename} :: Found {len(mux_consts_a)} constants for multiplexer A: {mux_consts_a}" - ) - print( - f"{filename} :: Found {len(mux_consts_b)} constants for multiplexer B: {mux_consts_b}" - ) - print( - f"{filename} :: Found {len(constants)} total unique constants: [{', '.join([str(c) for c in constants])}]" - ) + logging.debug(f"{file.name} :: Found {len(mux_consts_a)} constants for multiplexer A: {mux_consts_a}") + logging.debug(f"{file.name} :: Found {len(mux_consts_b)} constants for multiplexer B: {mux_consts_b}") + logging.debug( + f"{file.name} :: Found {len(constants)} total unique constants: [{', '.join([str(c) for c in constants])}]" + ) # Load used registers registers = machine["machine"]["registers"]["register"] @@ -152,13 +108,12 @@ def evaluate(filepath, verbose=False, pedantic=False): if ir_used: registers.append("IR_ALT") - if verbose: - print(f"{filename} :: Found {len(registers)} additional registers: {registers}") + logging.debug(f"{file.name} :: Found {len(registers)} additional registers: {registers}") # Load used operations operations = machine["machine"]["alu"]["operation"] - base_operations = ("A_ADD_B", "B_SUB_A", "TRANS_A", "TRANS_B") - for base_op in base_operations: + + for base_op in BASE_OPERATIONS: try: operations.remove(base_op) except ValueError: @@ -170,10 +125,7 @@ def evaluate(filepath, verbose=False, pedantic=False): operations = list(map(get_op, operations)) - if verbose: - print( - f"{filename} :: Found {len(operations)} additional operations: {operations}" - ) + logging.debug(f"{file.name} :: Found {len(operations)} additional operations: {operations}") # Sum points alg_line_costs = len(rows) * ALG_LINE_COST # every line of code @@ -187,39 +139,36 @@ def evaluate(filepath, verbose=False, pedantic=False): # Summarize costs = (alg_line_costs, constant_costs, register_costs, operation_costs, total) - precision = max( - [len(str(float(cost)).split(".")[1].lstrip("0")) for cost in costs] - ) # unreadable but works ¯\_(ツ)_/¯ + precision = max([len(str(float(cost)).split(".")[1].lstrip("0")) for cost in costs]) + # ^ unreadable but works ¯\_(ツ)_/¯ - if verbose: - print("") + logging.debug("") - print(f"{ul}Summary for {filename}:{end}\n") - print(f" {alg_line_costs:5.{min(precision, 2)}f} LINES (excluding empty lines)") - print(f"+ {constant_costs:5.{min(precision, 2)}f} CONSTANTS") - print(f"+ {register_costs:5.{min(precision, 2)}f} REGISTERS") - print(f"+ {operation_costs:5.{min(precision, 2)}f} OPERATIONS") - print(f"-------------") - print(f"= {ylw}{total:5.{min(precision, 2)}f} TOTAL{end}\n\n") + logging.info(f"{ul}Summary for {file.name}:{end}\n") + logging.info(f" {alg_line_costs:5.{min(precision, 2)}f} LINES (excluding empty lines)") + logging.info(f"+ {constant_costs:5.{min(precision, 2)}f} CONSTANTS") + logging.info(f"+ {register_costs:5.{min(precision, 2)}f} REGISTERS") + logging.info(f"+ {operation_costs:5.{min(precision, 2)}f} OPERATIONS") + logging.info(f"-------------") + logging.info(f"= {ylw}{total:5.{min(precision, 2)}f} TOTAL{end}\n\n") return total -# Evaluation if __name__ == "__main__": - parser = argparse.ArgumentParser() + parser = ArgumentParser() parser.add_argument( - "source", - type=str, + "submissions", + type=Path, nargs="+", - help="Either ZIP file(s) generated by simulator or the submission root folder", + help="One or more submission root folder", ) parser.add_argument( "-v", "--verbose", dest="verbose", action="store_true", - help="Prints additional information.", + help="Prints additional information", ) parser.add_argument( "-p", @@ -229,48 +178,46 @@ if __name__ == "__main__": help="Extra pedantic (for example when checking for empty lines)", ) parser.add_argument( - "-t", - "--top", - dest="top", - type=int, - help="Print top n candidates (defaults to 7)", + "-r", + "--rename", + dest="rename", + action="store_true", + help="Rename ZIP files to be the same name as its group directory", ) args = parser.parse_args() - verbose = args.verbose - pedantic = args.pedantic - top_n = args.top if args.top != None else 7 + # Logger setup (DEBUG used for verbose output) + if args.verbose: + logging.basicConfig(format="%(message)s", level=logging.DEBUG) + else: + logging.basicConfig(format="%(message)s", level=logging.INFO) - # output score for each file + # Output score for each file scores = [] - # check if source argument is folder + # Gather simulator files savefiles = [] - for source in args.source: - if os.path.isdir(source): - # add all zip from subdirectories - for d in [ - e for e in os.listdir(source) if os.path.isdir(os.path.join(source, e)) - ]: - savefiles.append(os.path.join(source, d, f"{d}.zip")) - elif source.endswith(".zip"): - savefiles.append(source) - else: - print(f"Source '{source}' is not a ZIP file.") + for submissions in args.submissions: + for group in [f for f in submissions.iterdir() if f.is_dir()]: + # Find submission ZIP file + zips = [file for file in group.glob("*.zip") if is_valid_zip(file)] + if len(zips) == 0: + logging.error(f"Could not find valid ZIP file for {group.name}") + savefile = zips[0] if len(zips) == 1 else select_zip(zips) + # Rename if required + if args.rename and savefile.stem != group.stem: + savefile = savefile.rename((savefile.parent / group.stem).with_suffix(".zip")) + savefiles.append(savefile) + + # Evaluate for savefile in savefiles: - score = evaluate(savefile, verbose=verbose, pedantic=pedantic) - if score == -1: - continue + score = evaluate(savefile, pedantic=args.pedantic) + scores.append((savefile, score)) - scores.append([savefile, score]) - - # if there is more than one file, output top 3 + # Print leaderboard scores.sort(key=lambda x: x[1]) - - n = len(savefiles) - if n > 1: - print(f"{ul}Leaderboard:{end}") - for i in range(min(n, top_n)): - file, score = scores[i] - print(f"#{i + 1} - {ylw}{score:5.2f} TOTAL{end} - {file}") + if len(savefiles) > 1: + logging.info(f"{ul}Leaderboard:{end}") + for index, (file, score) in enumerate(scores, start=1): + logging.info(f"#{index} - {ylw}{score:5.2f} TOTAL{end} - {file.name}") diff --git a/init.py b/init.py new file mode 100644 index 0000000..e34b946 --- /dev/null +++ b/init.py @@ -0,0 +1,47 @@ +import shutil +from argparse import ArgumentParser +from pathlib import Path +from random import randbytes + +from rc4 import rc4 + + +def generate_key(key_length: int) -> bytes: + """ + Generates a random key with a length of key_length bytes + """ + return randbytes(key_length) + + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument("group", type=Path, help="Group root directory") + parser.add_argument("file", type=Path, help="File to decrypt for this group") + key_opts = parser.add_mutually_exclusive_group(required=True) + key_opts.add_argument( + "--key-file", + "-k", + dest="key_file", + type=Path, + help="Key source file for this group", + ) + key_opts.add_argument( + "--key-length", + "-l", + dest="key_length", + type=int, + help="Key length for this group", + ) + args = parser.parse_args() + + # Create required files in group directory + shutil.copy(args.file, args.group / "data_decrypted") + if args.key_file is not None: + try: + shutil.copy(args.key_file, args.group / "key") + except shutil.SameFileError: + pass + else: + with open(args.group / "key", "wb") as key_file: + key_file.write(generate_key(args.key_length)) + rc4(args.group / "data_decrypted", args.group / "data_encrypted", args.group / "key") diff --git a/machine_evaluation.py b/machine_evaluation.py index c9a2770..6c8076f 100644 --- a/machine_evaluation.py +++ b/machine_evaluation.py @@ -1,11 +1,12 @@ import argparse import json import math -import sys import os import subprocess +import sys +from pathlib import Path -from zipfile import ZipFile +from util import is_valid_zip, select_zip """ Checks if all submission output the correct decrypted data. @@ -33,21 +34,31 @@ from zipfile import ZipFile If more than one simulator ZIP file is present, the user will also be prompted to choose. """ -SIMULATOR_PATH = "./minimax_simulator-2.0.0-cli.jar" +SIMULATOR_PATH = Path("./minimax_simulator-2.0.0-cli.jar") # helper strings OK = "\033[32mOK\033[0m" ERROR = "\033[31mERROR\033[0m" -def compare_result(actual_file: str, expected_file: str) -> bool: +def ul(s: str) -> str: + """ + Adds ansi escape sequences to underline string. + """ + return f"\033[4m{s}\033[0m" + + +def compare_result(actual_file: Path, expected_file: Path) -> bool: """ Compares the file at path 'actual_file' with the file at path 'expected_file' bytewise. If the actual file is larger than the expected file, additional bytes will be ignored. """ - with open(actual_file, "rb") as actual, open(expected_file, "rb") as expected: - actual_bytes = actual.read() - expected_bytes = expected.read() + try: + with open(actual_file, "rb") as actual, open(expected_file, "rb") as expected: + actual_bytes = actual.read() + expected_bytes = expected.read() + except FileNotFoundError: + return False return actual_bytes[: len(expected_bytes)] == expected_bytes @@ -73,47 +84,14 @@ def create_mem_layout() -> dict: return mem_layout -def is_valid_zip(file: str) -> bool: - """ - Checks if a zip file is a save file from the minimax simulator, e.g. if the contents - are a 'machine.json' and 'signal.json' file. - """ - if not file.endswith(".zip"): - return False - with ZipFile(file) as machine_zip: - zip_content = machine_zip.namelist() - return set(zip_content) == set(("machine.json", "signal.json")) - - -def select_zip(zips: list) -> str: - """ - Prompts the user to select a single zip file from a list, and returns it. - """ - print("Multiple zip files found. Please select one:") - for index, f in enumerate(zips, start=1): - print(f"[{index}] {f}") - while True: - try: - selection = input("Enter the number of the zip file to select: ") - selection = int(selection) - 1 - if selection <= 0 or selection > len(zips): - print(f"Please select a number between 1 and {len(zips)}.") - else: - return zips[selection] - except ValueError: - print("Please enter a valid integer.") - except KeyboardInterrupt: - sys.exit("Aborted") - - def evaluate( - zip_file: str, - sbox_file: str, - key_file: str, - data_file: str, - result_file: str, + zip_file: Path, + sbox_file: Path, + key_file: Path, + data_file: Path, + result_file: Path, mem_layout: dict, - simulator: str, + simulator: Path, ) -> None: """ Runs the minimax simulator on the given input. The resulting file is saved in 'result_file'. @@ -138,36 +116,38 @@ def evaluate( "--export-file", result_file, "--export-from", - mem_layout["data"], + mem_layout.get("result", mem_layout["data"]), "--export-to", - mem_layout["data"] + math.ceil(os.path.getsize(data_file)), + mem_layout.get("result", mem_layout["data"]) + math.ceil(os.path.getsize(data_file) / 4), ] - args = [ - str(arg) for arg in args - ] # subprocess.run requires all arguments to be strings + if "result" in mem_layout: + print("Decryption was not done in-place.") + + args = [str(arg) for arg in args] + # subprocess.run requires all arguments to be strings print(f"Running simulator, storing result in '{result_file}'") - - result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - print("\033[38;5;245m") - print(result.stdout.decode("utf-8")) - - print("\033[38;5;124") - - print(result.stderr.decode("utf-8")) + subprocess.run(args, stdout=sys.stdout, stderr=sys.stderr) print("\033[0m") if __name__ == "__main__": # only run if executed as script - - parser = argparse.ArgumentParser() - parser.add_argument("submissions", type=str, help="Submissions root directory") + parser = argparse.ArgumentParser( + description="Runs the simulator on all projects found in the submissions directory." + ) parser.add_argument( - "group", type=str, help="Group directory, contains all project files" + "submissions", + type=Path, + help="Submissions root directory", + ) + parser.add_argument( + "group", + type=Path, + help="Group directory, contains all project files", ) parser.add_argument( "-e", @@ -176,24 +156,20 @@ if __name__ == "__main__": # only run if executed as script type=str, help="Result file extension", ) - parser.add_argument("-j", "--jar", dest="jar", type=str, help="Simulator jar file") + parser.add_argument("-j", "--jar", dest="jar", type=Path, help="Simulator jar file") args = parser.parse_args() # Load teams - teams = [ - e - for e in os.listdir(args.submissions) - if os.path.isdir(os.path.join(args.submissions, e)) - ] + teams = [e for e in args.submissions.iterdir() if e.is_dir()] print(f"The following teams were found:") for team in teams: - print(f"* {team}") + print(f"* {team.name}") # Check directory structure - for file in ("sBox", "key", "data_encrypted", "data_decrypted"): - if not os.path.exists(os.path.join(args.group, file)): - sys.exit(f"Group project file '{file}' is missing.") + for filename in ("sBox", "key", "data_encrypted", "data_decrypted"): + if not (args.group / filename).exists(): + sys.exit(f"Group project file '{filename}' is missing.") # Check file extension if args.file_ext is None: @@ -205,31 +181,24 @@ if __name__ == "__main__": # only run if executed as script simulator = SIMULATOR_PATH if args.jar is None else args.jar # Store evaluation results - expected_file = os.path.join(args.group, "data_decrypted") + expected_file = args.group / "data_decrypted" results = [] # Evaluate each team for team in teams: + print(ul(f"Evaluating team '{team.name}'")) # load memory layout file if available (otherwise create and store it) try: - with open( - os.path.join(args.submissions, team, "mem_layout.json"), "r" - ) as mem_layout_file: + with open(team / "mem_layout.json", "r") as mem_layout_file: mem_layout = json.load(mem_layout_file) except FileNotFoundError: mem_layout = create_mem_layout() - with open( - os.path.join(args.submissions, team, "mem_layout.json"), "w" - ) as mem_layout_file: + with open(team / "mem_layout.json", "w") as mem_layout_file: json.dump(mem_layout, mem_layout_file) # Select project file (if more there is more than one zip file) - zip_files = [ - os.path.join(args.submissions, team, f) - for f in os.listdir(os.path.join(args.submissions, team)) - if is_valid_zip(os.path.join(args.submissions, team, f)) - ] + zip_files = [file for file in team.glob("*.zip") if is_valid_zip(file)] zip_file = zip_files[0] if len(zip_files) == 1 else select_zip(zip_files) # check memory layout and convert hexadecimal addresses @@ -246,12 +215,10 @@ if __name__ == "__main__": # only run if executed as script mem_layout[key] = addr # evaluate team - sbox_file = os.path.join(args.group, "sBox") - key_file = os.path.join(args.group, "key") - data_file = os.path.join(args.group, "data_encrypted") - result_file = os.path.join( - args.submissions, team, f"data_decrypted{args.file_ext}" - ) + sbox_file = args.group / "sBox" + key_file = args.group / "key" + data_file = args.group / "data_encrypted" + result_file = team / f"data_decrypted{args.file_ext}" evaluate( zip_file, @@ -270,8 +237,8 @@ if __name__ == "__main__": # only run if executed as script print("Summary:") for team, result in zip(teams, results): if result is True: - print(f"[{OK}] - {team}") + print(f"[{OK}] - {team.name}") else: - print(f"[{ERROR}] - {team}") + print(f"[{ERROR}] - {team.name}") print() diff --git a/rc4.py b/rc4.py new file mode 100644 index 0000000..fab105a --- /dev/null +++ b/rc4.py @@ -0,0 +1,108 @@ +import logging +from argparse import ArgumentParser +from pathlib import Path + +_VERBOSE_SBOX = False +_VERBOSE_PRGA = False +_STEPWISE_PRGA = False + + +def sbox(key: list[int]) -> list[int]: + """ + Creates and permutates the substitution box given a key. + """ + # create sbox + S = [i for i in range(256)] + + # permutate sbox + j = 0 + for i in range(256): + j = (j + S[i] + key[i % len(key)]) % 256 + if _VERBOSE_SBOX: + print(f"Swapping S[{i=:3d}] = {S[i]:3d} and S[{j=:3d}] = {S[j]:3d}.") + S[i], S[j] = S[j], S[i] # swap + + return S + + +def prga(text: list[int], S: list[int]) -> list[int]: + """ + Encrypts/Decrypts text given a substitution box. + """ + text = list(text) + i = 0 + j = 0 + for x in range(len(text)): + i = (i + 1) % 256 + j = (j + S[i]) % 256 + + if _VERBOSE_PRGA: + print(f"Swapping S[{i=:3d}] = {S[i]:3d} and S[{j=:3d}] = {S[j]:3d}.") + + if _STEPWISE_PRGA: + try: + input("Press ENTER to continue ...") # Step through + except KeyboardInterrupt: + print("KeyboardInterrupt.") + exit(0) + + S[i], S[j] = S[j], S[i] # swap + K = S[(S[i] + S[j]) % 256] + text[x] ^= K + return text + + +def convert(text: bytes | list[int]) -> bytes | list[int]: + """ + Converts text from bytes -> list[int] and back. + """ + if isinstance(text, bytes): + return [int(c) for c in text] + elif isinstance(text, list): + return bytes(text) + else: + raise ValueError(f"Unsupported input type '{type(text)}'") + + +def rc4(in_file: str | Path, out_file: str | Path, key_file: str | Path): + """ + Execute the RC4 encryption/decryption algorithm on the input file, + creating the output file using the contents from the key file. + """ + + with open(key_file, "rb") as key_file: + S = sbox(convert(key_file.read())) + + with open("sbox_solution", "wb") as fd: + fd.write(convert(S)) + + # read input + with open(in_file, "rb") as data_in_file: + data_in = data_in_file.read() + + data_in = convert(data_in) # bytes -> list[int] + data_out = prga(data_in, S) + data_out = convert(data_out) # list[int] -> bytes + + # write output + with open(out_file, "wb") as data_out_file: + data_out_file.write(data_out) + + +if __name__ == "__main__": + parser = ArgumentParser( + description="Encrypts plaintext using the RC4 encryption algorithm, or decrypts RC4 ciphertext (based on input)" + ) + parser.add_argument("input", type=str, help="Input file") + parser.add_argument("key", type=str, help="Key file") + parser.add_argument("output", type=str, help="Output file") + parser.add_argument("--verbose-sbox", const=True, default=False, action="store_const", help="Show S-Box swaps") + parser.add_argument("--verbose-prga", const=True, default=False, action="store_const", help="Show PRGA swaps") + parser.add_argument("--step-prga", const=True, default=False, action="store_const", help="Step-by-step PRGA swaps") + args = parser.parse_args() + + _VERBOSE_SBOX = args.verbose_sbox + _VERBOSE_PRGA = args.verbose_prga + _STEPWISE_PRGA = args.step_prga + + rc4(args.input, args.output, args.key) diff --git a/util.py b/util.py new file mode 100644 index 0000000..94ab065 --- /dev/null +++ b/util.py @@ -0,0 +1,36 @@ +import sys +from pathlib import Path +from zipfile import ZipFile + + +def is_valid_zip(file: Path) -> bool: + """ + Checks if a zip file is a save file from the minimax simulator, e.g. if the contents + are a 'machine.json' and 'signal.json' file. + """ + if not file.suffix == ".zip": + return False + with ZipFile(file) as machine_zip: + zip_content = machine_zip.namelist() + return set(zip_content) == set(("machine.json", "signal.json")) + + +def select_zip(zips: list[Path]) -> Path: + """ + Prompts the user to select a single zip file from a list, and returns it. + """ + print("Multiple zip files found. Please select one:") + for index, f in enumerate(zips, start=1): + print(f"[{index}] {f.name}") + while True: + try: + selection = input("Enter the number of the zip file to select: ") + selection = int(selection) - 1 + if selection <= 0 or selection > len(zips): + print(f"Please select a number between 1 and {len(zips)}.") + else: + return zips[selection] + except ValueError: + print("Please enter a valid integer.") + except KeyboardInterrupt: + sys.exit("Aborted")