Updated scripts and added new ones

2023-11-25 20:23:04 +01:00
parent a5153cc997
commit 92df6f6b44
7 changed files with 566 additions and 251 deletions

163
.gitignore vendored Normal file

@ -0,0 +1,163 @@
# Stuff
stuff/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

47
binary_diff.py Normal file

@ -0,0 +1,47 @@
from argparse import ArgumentParser
from itertools import zip_longest
from pathlib import Path

CHUNK_SIZE = 16


def hl(b: str):
    return f"\033[31;47;1m{b}\033[0m"


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("file1", type=Path, help="First file.")
    parser.add_argument("file2", type=Path, help="Second file.")
    args = parser.parse_args()

    # Read files
    with open(args.file1, "rb") as fd1, open(args.file2, "rb") as fd2:
        file1 = fd1.read()
        file2 = fd2.read()

    # Find differences
    diff = [b1 == b2 for b1, b2 in zip_longest(file1, file2, fillvalue=-1)]
    n1 = len(file1)
    n2 = len(file2)
    values1 = [f"{b:02X}" for b in file1]
    values2 = [f"{b:02X}" for b in file2]

    # Highlight different values
    values1 = [b if diff[i] else hl(b) for i, b in enumerate(values1)]
    values2 = [b if diff[i] else hl(b) for i, b in enumerate(values2)]

    # Pad to same length
    values1 = values1 + [" "] * (max(n1, n2) - n1)
    values2 = values2 + [" "] * (max(n1, n2) - n2)

    # Print bytes in chunks
    print()
    print(args.file1.name, " " * ((3 * CHUNK_SIZE - 1) + 2 - len(args.file1.name)), args.file2.name)
    print("-" * (3 * CHUNK_SIZE - 1), " ", "-" * (3 * CHUNK_SIZE - 1))
    for chunk in [slice(i, i + CHUNK_SIZE) for i in range(0, max(n1, n2), CHUNK_SIZE)]:
        print(" ".join(values1[chunk]), " ", " ".join(values2[chunk]))
    print()
    print(f"Files differ at {diff.count(False)} places.")
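For reference, a minimal sketch of the comparison idea used above on in-memory data (the hex strings are made-up values). Length mismatches count as differences because the shorter side is padded with the -1 fill value, which never equals a real byte:

from itertools import zip_longest

a = bytes.fromhex("00112233")
b = bytes.fromhex("001222FF33")
# Pair bytes positionally; -1 marks positions past the end of the shorter input.
diff = [b1 == b2 for b1, b2 in zip_longest(a, b, fillvalue=-1)]
print(f"Files differ at {diff.count(False)} places.")  # -> Files differ at 3 places.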

View File

@ -1,31 +1,37 @@
-import argparse
-import zipfile
 import json
-import os
+import logging
+from argparse import ArgumentParser
+from pathlib import Path
+from zipfile import ZipFile
+
+from util import is_valid_zip, select_zip
 
 # Costs
+# fmt: off
 CONSTANT_COST = 1
 REGISTER_COST = 1
 OP_COST = {
-    "SUB": 4,
-    "ADD": 4,
-    "INC": 4,
-    "DEC": 4,
-    "MUL": 10,
-    "DIV": 10,
-    "MOD": 10,
-    "OR": 8,
-    "AND": 8,
-    "XOR": 8,
-    "INV": 8,
-    "SL": 5,
-    "SR": 5,
-    "SLU": 5,
-    "SRU": 5,
+    "SUB" : 4,
+    "ADD" : 4,
+    "INC" : 4,
+    "DEC" : 4,
+    "MUL" : 10,
+    "DIV" : 10,
+    "MOD" : 10,
+    "OR"  : 8,
+    "AND" : 8,
+    "XOR" : 8,
+    "INV" : 8,
+    "SL"  : 5,
+    "SR"  : 5,
+    "SLU" : 5,
+    "SRU" : 5,
     "ROTL": 7,
     "ROTR": 7,
 }
 ALG_LINE_COST = 0.5
+BASE_OPERATIONS = ("A_ADD_B", "B_SUB_A", "TRANS_A", "TRANS_B")
+# fmt: on
 
 # ansi escape codes
 ul = "\033[4m"  # underline
@ -33,7 +39,7 @@ end = "\033[0m" # reset
 ylw = "\033[33m"  # yellow
 
 
-def is_empty_row(row, pedantic=False):
+def is_empty_row(row: dict, pedantic: bool = False) -> bool:
     # Check if signal table is non-empty
     if "signal" in row and len(row["signal"]) != 0:
         return False
@ -41,23 +47,15 @@ def is_empty_row(row, pedantic=False):
     # Only check if keys are set if pedantic is true
     if not pedantic:
         return True
 
     # Check if "unconditional-jump", "conditional-jump" or "label" is set
     keys = ("unconditional-jump", "conditional-jump", "label")
     return not any(key in row for key in keys)
 
 
-def evaluate(filepath, verbose=False, pedantic=False):
-    filename = filepath.split(os.path.sep)[-1]
-
-    if not filepath.endswith(".zip"):
-        print(
-            f"{filename} :: Supplied file does not have .zip file extension. Skipping .."
-        )
-        return -1
-
-    with zipfile.ZipFile(filepath, "r") as savefile:
-        with savefile.open("machine.json", "r") as machinefile, savefile.open(
-            "signal.json", "r"
-        ) as signalfile:
+def evaluate(file: Path, pedantic: bool = False) -> float:
+    with ZipFile(file, "r") as savefile:
+        with savefile.open("machine.json", "r") as machinefile, savefile.open("signal.json", "r") as signalfile:
             machine = json.load(machinefile)
             signal = json.load(signalfile)
@ -65,83 +63,41 @@ def evaluate(filepath, verbose=False, pedantic=False):
     total_rows = signal["signaltable"]["row"]
     # Remove rows without effect (used for formatting for example)
     rows = [row for row in total_rows if not is_empty_row(row, pedantic=pedantic)]
 
-    if verbose:
-        print(f"{filename} :: Total number of rows: {len(total_rows)}")
-        print(f"{filename} :: Number of rows after excluding empty: {len(rows)}")
+    logging.debug(f"{file.name} :: Total number of rows: {len(total_rows)}")
+    logging.debug(f"{file.name} :: Number of rows after excluding empty: {len(rows)}")
 
     # Check if IR or PC register was used
-    pc_used = any(
-        (
-            signal["name"] == "PC.W" and signal["value"] == "1"
-            for row in rows
-            for signal in row["signal"]
-        )
-    )
-    ir_used = any(
-        (
-            signal["name"] == "IR.W" and signal["value"] == "1"
-            for row in rows
-            for signal in row["signal"]
-        )
-    )
-
-    if verbose:
-        if pc_used:
-            print(f"{filename} :: PC Register was used in signal table row.")
-        if ir_used:
-            print(f"{filename} :: IR Register was used in signal table row.")
+    pc_used = any((signal["name"] == "PC.W" and signal["value"] == "1" for row in rows for signal in row["signal"]))
+    ir_used = any((signal["name"] == "IR.W" and signal["value"] == "1" for row in rows for signal in row["signal"]))
+    if pc_used:
+        logging.debug(f"{file.name} :: PC Register was used in signal table row.")
+    if ir_used:
+        logging.debug(f"{file.name} :: IR Register was used in signal table row.")
 
     # Load used multiplexer constants
     try:
-        mux_input_a = next(
-            filter(lambda mux: mux["muxType"] == "A", machine["machine"]["muxInputs"])
-        )["input"]
-        mux_consts_a = [
-            int(mux_input["value"])
-            for mux_input in mux_input_a
-            if mux_input["type"] == "constant"
-        ]
+        mux_input_a = next(filter(lambda mux: mux["muxType"] == "A", machine["machine"]["muxInputs"]))["input"]
+        mux_consts_a = [int(mux_input["value"]) for mux_input in mux_input_a if mux_input["type"] == "constant"]
     except StopIteration:
-        print(
-            f"{filename} :: Couldn't find input for multiplexer A. Is the file corrupted? Skipping file .."
-        )
-        return -1
+        logging.error(f"{file.name} :: Couldn't find input for multiplexer A. Is the file corrupted?")
+        exit(1)
     try:
-        mux_input_b = next(
-            filter(lambda mux: mux["muxType"] == "B", machine["machine"]["muxInputs"])
-        )["input"]
-        mux_consts_b = [
-            int(mux_input["value"])
-            for mux_input in mux_input_b
-            if mux_input["type"] == "constant"
-        ]
+        mux_input_b = next(filter(lambda mux: mux["muxType"] == "B", machine["machine"]["muxInputs"]))["input"]
+        mux_consts_b = [int(mux_input["value"]) for mux_input in mux_input_b if mux_input["type"] == "constant"]
     except StopIteration:
-        print(
-            f"{filename} :: Couldn't find input for multiplexer B. Is the file corrupted? Skipping file .."
-        )
-        return -1
+        logging.error(f"{file.name} :: Couldn't find input for multiplexer B. Is the file corrupted?")
+        exit(1)
 
     # Base machine has constants 0 and 1 at multiplexer A. All other constants are extensions.
-    base_muxt_a = (0, 1)
-    for base_input in base_muxt_a:
-        try:
-            mux_consts_a.remove(base_input)
-        except ValueError:
-            pass
-    constants = set(mux_consts_a + mux_consts_b)
+    constants = set(mux_consts_a + mux_consts_b) - set((0, 1))
 
-    if verbose:
-        print(
-            f"{filename} :: Found {len(mux_consts_a)} constants for multiplexer A: {mux_consts_a}"
-        )
-        print(
-            f"{filename} :: Found {len(mux_consts_b)} constants for multiplexer B: {mux_consts_b}"
-        )
-        print(
-            f"{filename} :: Found {len(constants)} total unique constants: [{', '.join([str(c) for c in constants])}]"
-        )
+    logging.debug(f"{file.name} :: Found {len(mux_consts_a)} constants for multiplexer A: {mux_consts_a}")
+    logging.debug(f"{file.name} :: Found {len(mux_consts_b)} constants for multiplexer B: {mux_consts_b}")
+    logging.debug(
+        f"{file.name} :: Found {len(constants)} total unique constants: [{', '.join([str(c) for c in constants])}]"
+    )
 
     # Load used registers
     registers = machine["machine"]["registers"]["register"]
@ -152,13 +108,12 @@ def evaluate(filepath, verbose=False, pedantic=False):
     if ir_used:
         registers.append("IR_ALT")
 
-    if verbose:
-        print(f"{filename} :: Found {len(registers)} additional registers: {registers}")
+    logging.debug(f"{file.name} :: Found {len(registers)} additional registers: {registers}")
 
     # Load used operations
     operations = machine["machine"]["alu"]["operation"]
-    base_operations = ("A_ADD_B", "B_SUB_A", "TRANS_A", "TRANS_B")
-    for base_op in base_operations:
+    for base_op in BASE_OPERATIONS:
         try:
             operations.remove(base_op)
         except ValueError:
@ -170,10 +125,7 @@ def evaluate(filepath, verbose=False, pedantic=False):
     operations = list(map(get_op, operations))
 
-    if verbose:
-        print(
-            f"{filename} :: Found {len(operations)} additional operations: {operations}"
-        )
+    logging.debug(f"{file.name} :: Found {len(operations)} additional operations: {operations}")
 
     # Sum points
     alg_line_costs = len(rows) * ALG_LINE_COST  # every line of code
@ -187,39 +139,36 @@ def evaluate(filepath, verbose=False, pedantic=False):
     # Summarize
     costs = (alg_line_costs, constant_costs, register_costs, operation_costs, total)
-    precision = max(
-        [len(str(float(cost)).split(".")[1].lstrip("0")) for cost in costs]
-    )  # unreadable but works ¯\_(ツ)_/¯
+    precision = max([len(str(float(cost)).split(".")[1].lstrip("0")) for cost in costs])
+    # ^ unreadable but works ¯\_(ツ)_/¯
 
-    if verbose:
-        print("")
+    logging.debug("")
 
-    print(f"{ul}Summary for {filename}:{end}\n")
-    print(f"  {alg_line_costs:5.{min(precision, 2)}f} LINES (excluding empty lines)")
-    print(f"+ {constant_costs:5.{min(precision, 2)}f} CONSTANTS")
-    print(f"+ {register_costs:5.{min(precision, 2)}f} REGISTERS")
-    print(f"+ {operation_costs:5.{min(precision, 2)}f} OPERATIONS")
-    print(f"-------------")
-    print(f"= {ylw}{total:5.{min(precision, 2)}f} TOTAL{end}\n\n")
+    logging.info(f"{ul}Summary for {file.name}:{end}\n")
+    logging.info(f"  {alg_line_costs:5.{min(precision, 2)}f} LINES (excluding empty lines)")
+    logging.info(f"+ {constant_costs:5.{min(precision, 2)}f} CONSTANTS")
+    logging.info(f"+ {register_costs:5.{min(precision, 2)}f} REGISTERS")
+    logging.info(f"+ {operation_costs:5.{min(precision, 2)}f} OPERATIONS")
+    logging.info(f"-------------")
+    logging.info(f"= {ylw}{total:5.{min(precision, 2)}f} TOTAL{end}\n\n")
 
     return total
 
 
-# Evaluation
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
+    parser = ArgumentParser()
     parser.add_argument(
-        "source",
-        type=str,
+        "submissions",
+        type=Path,
         nargs="+",
-        help="Either ZIP file(s) generated by simulator or the submission root folder",
+        help="One or more submission root folder",
     )
     parser.add_argument(
         "-v",
         "--verbose",
         dest="verbose",
         action="store_true",
-        help="Prints additional information.",
+        help="Prints additional information",
     )
     parser.add_argument(
         "-p",
@ -229,48 +178,46 @@ if __name__ == "__main__":
         help="Extra pedantic (for example when checking for empty lines)",
     )
     parser.add_argument(
-        "-t",
-        "--top",
-        dest="top",
-        type=int,
-        help="Print top n candidates (defaults to 7)",
+        "-r",
+        "--rename",
+        dest="rename",
+        action="store_true",
+        help="Rename ZIP files to be the same name as its group directory",
    )
     args = parser.parse_args()
 
-    verbose = args.verbose
-    pedantic = args.pedantic
-    top_n = args.top if args.top != None else 7
+    # Logger setup (DEBUG used for verbose output)
+    if args.verbose:
+        logging.basicConfig(format="%(message)s", level=logging.DEBUG)
+    else:
+        logging.basicConfig(format="%(message)s", level=logging.INFO)
 
-    # output score for each file
+    # Output score for each file
     scores = []
 
-    # check if source argument is folder
     savefiles = []
-    for source in args.source:
-        if os.path.isdir(source):
-            # add all zip from subdirectories
-            for d in [
-                e for e in os.listdir(source) if os.path.isdir(os.path.join(source, e))
-            ]:
-                savefiles.append(os.path.join(source, d, f"{d}.zip"))
-        elif source.endswith(".zip"):
-            savefiles.append(source)
-        else:
-            print(f"Source '{source}' is not a ZIP file.")
+    # Gather simulator files
+    for submissions in args.submissions:
+        for group in [f for f in submissions.iterdir() if f.is_dir()]:
+            # Find submission ZIP file
+            zips = [file for file in group.glob("*.zip") if is_valid_zip(file)]
+            if len(zips) == 0:
+                logging.error(f"Could not find valid ZIP file for {group.name}")
+            savefile = zips[0] if len(zips) == 1 else select_zip(zips)
 
+            # Rename if required
+            if args.rename and savefile.stem != group.stem:
+                savefile = savefile.rename((savefile.parent / group.stem).with_suffix(".zip"))
+
+            savefiles.append(savefile)
+
+    # Evaluate
     for savefile in savefiles:
-        score = evaluate(savefile, verbose=verbose, pedantic=pedantic)
-        if score == -1:
-            continue
-        scores.append([savefile, score])
+        score = evaluate(savefile, pedantic=args.pedantic)
+        scores.append((savefile, score))
 
-    # if there is more than one file, output top 3
+    # Print leaderboard
     scores.sort(key=lambda x: x[1])
-    if len(savefiles) > 1:
-        n = len(savefiles)
-        if n > 1:
-            print(f"{ul}Leaderboard:{end}")
-            for i in range(min(n, top_n)):
-                file, score = scores[i]
-                print(f"#{i + 1} - {ylw}{score:5.2f} TOTAL{end} - {file}")
+
+    logging.info(f"{ul}Leaderboard:{end}")
+    for index, (file, score) in enumerate(scores, start=1):
+        logging.info(f"#{index} - {ylw}{score:5.2f} TOTAL{end} - {file.name}")

47
init.py Normal file

@ -0,0 +1,47 @@
import shutil
from argparse import ArgumentParser
from pathlib import Path
from random import randbytes

from rc4 import rc4


def generate_key(key_length: int) -> bytes:
    """
    Generates a random key with a length of key_length bytes
    """
    return randbytes(key_length)


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("group", type=Path, help="Group root directory")
    parser.add_argument("file", type=Path, help="File to decrypt for this group")
    key_opts = parser.add_mutually_exclusive_group(required=True)
    key_opts.add_argument(
        "--key-file",
        "-k",
        dest="key_file",
        type=Path,
        help="Key source file for this group",
    )
    key_opts.add_argument(
        "--key-length",
        "-l",
        dest="key_length",
        type=int,
        help="Key length for this group",
    )
    args = parser.parse_args()

    # Create required files in group directory
    shutil.copy(args.file, args.group / "data_decrypted")

    if args.key_file is not None:
        try:
            shutil.copy(args.key_file, args.group / "key")
        except shutil.SameFileError:
            pass
    else:
        with open(args.group / "key", "wb") as key_file:
            key_file.write(generate_key(args.key_length))

    rc4(args.group / "data_decrypted", args.group / "data_encrypted", args.group / "key")
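A rough Python equivalent of what init.py does for one group, assuming init.py and rc4.py are importable; the group directory and plaintext file below are placeholder names, not part of the commit:

import shutil
from pathlib import Path

from init import generate_key
from rc4 import rc4

group = Path("submissions/group01")                     # placeholder group directory
shutil.copy("plaintext.bin", group / "data_decrypted")  # placeholder plaintext file
(group / "key").write_bytes(generate_key(16))           # same effect as --key-length 16
rc4(group / "data_decrypted", group / "data_encrypted", group / "key")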

View File

@ -1,11 +1,12 @@
 import argparse
 import json
 import math
-import sys
 import os
 import subprocess
+import sys
+from pathlib import Path
 
-from zipfile import ZipFile
+from util import is_valid_zip, select_zip
 
 """
 Checks if all submission output the correct decrypted data.
@ -33,21 +34,31 @@ from zipfile import ZipFile
 If more than one simulator ZIP file is present, the user will also be prompted to choose.
 """
 
-SIMULATOR_PATH = "./minimax_simulator-2.0.0-cli.jar"
+SIMULATOR_PATH = Path("./minimax_simulator-2.0.0-cli.jar")
 
 # helper strings
 OK = "\033[32mOK\033[0m"
 ERROR = "\033[31mERROR\033[0m"
 
 
-def compare_result(actual_file: str, expected_file: str) -> bool:
+def ul(s: str) -> str:
+    """
+    Adds ansi escape sequences to underline string.
+    """
+    return f"\033[4m{s}\033[0m"
+
+
+def compare_result(actual_file: Path, expected_file: Path) -> bool:
     """
     Compares the file at path 'actual_file' with the file at path 'expected_file' bytewise.
     If the actual file is larger than the expected file, additional bytes will be ignored.
     """
-    with open(actual_file, "rb") as actual, open(expected_file, "rb") as expected:
-        actual_bytes = actual.read()
-        expected_bytes = expected.read()
+    try:
+        with open(actual_file, "rb") as actual, open(expected_file, "rb") as expected:
+            actual_bytes = actual.read()
+            expected_bytes = expected.read()
+    except FileNotFoundError:
+        return False
 
     return actual_bytes[: len(expected_bytes)] == expected_bytes
@ -73,47 +84,14 @@ def create_mem_layout() -> dict:
     return mem_layout
 
 
-def is_valid_zip(file: str) -> bool:
-    """
-    Checks if a zip file is a save file from the minimax simulator, e.g. if the contents
-    are a 'machine.json' and 'signal.json' file.
-    """
-    if not file.endswith(".zip"):
-        return False
-
-    with ZipFile(file) as machine_zip:
-        zip_content = machine_zip.namelist()
-        return set(zip_content) == set(("machine.json", "signal.json"))
-
-
-def select_zip(zips: list) -> str:
-    """
-    Prompts the user to select a single zip file from a list, and returns it.
-    """
-    print("Multiple zip files found. Please select one:")
-    for index, f in enumerate(zips, start=1):
-        print(f"[{index}] {f}")
-
-    while True:
-        try:
-            selection = input("Enter the number of the zip file to select: ")
-            selection = int(selection) - 1
-            if selection <= 0 or selection > len(zips):
-                print(f"Please select a number between 1 and {len(zips)}.")
-            else:
-                return zips[selection]
-        except ValueError:
-            print("Please enter a valid integer.")
-        except KeyboardInterrupt:
-            sys.exit("Aborted")
-
-
 def evaluate(
-    zip_file: str,
-    sbox_file: str,
-    key_file: str,
-    data_file: str,
-    result_file: str,
+    zip_file: Path,
+    sbox_file: Path,
+    key_file: Path,
+    data_file: Path,
+    result_file: Path,
     mem_layout: dict,
-    simulator: str,
+    simulator: Path,
 ) -> None:
     """
     Runs the minimax simulator on the given input. The resulting file is saved in 'result_file'.
@ -138,36 +116,38 @@ def evaluate(
         "--export-file",
         result_file,
         "--export-from",
-        mem_layout["data"],
+        mem_layout.get("result", mem_layout["data"]),
         "--export-to",
-        mem_layout["data"] + math.ceil(os.path.getsize(data_file)),
+        mem_layout.get("result", mem_layout["data"]) + math.ceil(os.path.getsize(data_file) / 4),
     ]
-    args = [
-        str(arg) for arg in args
-    ]  # subprocess.run requires all arguments to be strings
+
+    if "result" in mem_layout:
+        print("Decryption was not done in-place.")
+
+    args = [str(arg) for arg in args]
+    # subprocess.run requires all arguments to be strings
 
     print(f"Running simulator, storing result in '{result_file}'")
-    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
     print("\033[38;5;245m")
-    print(result.stdout.decode("utf-8"))
-    print("\033[38;5;124")
-    print(result.stderr.decode("utf-8"))
+    subprocess.run(args, stdout=sys.stdout, stderr=sys.stderr)
     print("\033[0m")
 
 
 if __name__ == "__main__":  # only run if executed as script
-    parser = argparse.ArgumentParser()
-    parser.add_argument("submissions", type=str, help="Submissions root directory")
+    parser = argparse.ArgumentParser(
+        description="Runs the simulator on all projects found in the submissions directory."
+    )
     parser.add_argument(
-        "group", type=str, help="Group directory, contains all project files"
+        "submissions",
+        type=Path,
+        help="Submissions root directory",
+    )
+    parser.add_argument(
+        "group",
+        type=Path,
+        help="Group directory, contains all project files",
     )
     parser.add_argument(
         "-e",
@ -176,24 +156,20 @@ if __name__ == "__main__": # only run if executed as script
         type=str,
         help="Result file extension",
     )
-    parser.add_argument("-j", "--jar", dest="jar", type=str, help="Simulator jar file")
+    parser.add_argument("-j", "--jar", dest="jar", type=Path, help="Simulator jar file")
     args = parser.parse_args()
 
     # Load teams
-    teams = [
-        e
-        for e in os.listdir(args.submissions)
-        if os.path.isdir(os.path.join(args.submissions, e))
-    ]
+    teams = [e for e in args.submissions.iterdir() if e.is_dir()]
 
     print(f"The following teams were found:")
     for team in teams:
-        print(f"* {team}")
+        print(f"* {team.name}")
 
     # Check directory structure
-    for file in ("sBox", "key", "data_encrypted", "data_decrypted"):
-        if not os.path.exists(os.path.join(args.group, file)):
-            sys.exit(f"Group project file '{file}' is missing.")
+    for filename in ("sBox", "key", "data_encrypted", "data_decrypted"):
+        if not (args.group / filename).exists():
+            sys.exit(f"Group project file '{filename}' is missing.")
 
     # Check file extension
     if args.file_ext is None:
@ -205,31 +181,24 @@ if __name__ == "__main__": # only run if executed as script
     simulator = SIMULATOR_PATH if args.jar is None else args.jar
 
     # Store evaluation results
-    expected_file = os.path.join(args.group, "data_decrypted")
+    expected_file = args.group / "data_decrypted"
     results = []
 
     # Evaluate each team
     for team in teams:
+        print(ul(f"Evaluating team '{team.name}'"))
+
         # load memory layout file if available (otherwise create and store it)
        try:
-            with open(
-                os.path.join(args.submissions, team, "mem_layout.json"), "r"
-            ) as mem_layout_file:
+            with open(team / "mem_layout.json", "r") as mem_layout_file:
                 mem_layout = json.load(mem_layout_file)
         except FileNotFoundError:
             mem_layout = create_mem_layout()
-            with open(
-                os.path.join(args.submissions, team, "mem_layout.json"), "w"
-            ) as mem_layout_file:
+            with open(team / "mem_layout.json", "w") as mem_layout_file:
                 json.dump(mem_layout, mem_layout_file)
 
         # Select project file (if more there is more than one zip file)
-        zip_files = [
-            os.path.join(args.submissions, team, f)
-            for f in os.listdir(os.path.join(args.submissions, team))
-            if is_valid_zip(os.path.join(args.submissions, team, f))
-        ]
+        zip_files = [file for file in team.glob("*.zip") if is_valid_zip(file)]
         zip_file = zip_files[0] if len(zip_files) == 1 else select_zip(zip_files)
 
         # check memory layout and convert hexadecimal addresses
@ -246,12 +215,10 @@ if __name__ == "__main__": # only run if executed as script
             mem_layout[key] = addr
 
         # evaluate team
-        sbox_file = os.path.join(args.group, "sBox")
-        key_file = os.path.join(args.group, "key")
-        data_file = os.path.join(args.group, "data_encrypted")
-        result_file = os.path.join(
-            args.submissions, team, f"data_decrypted{args.file_ext}"
-        )
+        sbox_file = args.group / "sBox"
+        key_file = args.group / "key"
+        data_file = args.group / "data_encrypted"
+        result_file = team / f"data_decrypted{args.file_ext}"
 
         evaluate(
             zip_file,
@ -270,8 +237,8 @@ if __name__ == "__main__": # only run if executed as script
     print("Summary:")
     for team, result in zip(teams, results):
         if result is True:
-            print(f"[{OK}] - {team}")
+            print(f"[{OK}] - {team.name}")
         else:
-            print(f"[{ERROR}] - {team}")
+            print(f"[{ERROR}] - {team.name}")
 
     print()
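As the compare_result docstring above states, the check is a prefix comparison; a short illustration with invented byte values:

expected = b"\x01\x02\x03"
actual = b"\x01\x02\x03\x00\x00"  # a simulator export may cover a larger memory range
print(actual[: len(expected)] == expected)  # True, trailing bytes are ignored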

108
rc4.py Normal file

@ -0,0 +1,108 @@
import logging
from argparse import ArgumentParser
from pathlib import Path

_VERBOSE_SBOX = False
_VERBOSE_PRGA = False
_STEPWISE_PRGA = False


def sbox(key: list[int]) -> list[int]:
    """
    Creates and permutates the substitution box given a key.
    """
    # create sbox
    S = [i for i in range(256)]

    # permutate sbox
    j = 0
    for i in range(256):
        j = (j + S[i] + key[i % len(key)]) % 256
        if _VERBOSE_SBOX:
            print(f"Swapping S[{i=:3d}] = {S[i]:3d} and S[{j=:3d}] = {S[j]:3d}.")
        S[i], S[j] = S[j], S[i]  # swap

    return S


def prga(text: list[int], S: list[int]) -> list[int]:
    """
    Encrypts/Decrypts text given a substitution box.
    """
    text = list(text)
    i = 0
    j = 0
    for x in range(len(text)):
        i = (i + 1) % 256
        j = (j + S[i]) % 256
        if _VERBOSE_PRGA:
            print(f"Swapping S[{i=:3d}] = {S[i]:3d} and S[{j=:3d}] = {S[j]:3d}.")
        if _STEPWISE_PRGA:
            try:
                input("Press ENTER to continue ...")  # Step through
            except KeyboardInterrupt:
                print("KeyboardInterrupt.")
                exit(0)
        S[i], S[j] = S[j], S[i]  # swap
        K = S[(S[i] + S[j]) % 256]
        text[x] ^= K

    return text


def convert(text: bytes | list[int]) -> bytes | list[int]:
    """
    Converts text from bytes -> list[int] and back.
    """
    if isinstance(text, bytes):
        return [int(c) for c in text]
    elif isinstance(text, list):
        return bytes(text)
    else:
        raise ValueError(f"Unsupported input type '{type(text)}'")


def rc4(in_file: str | Path, out_file: str | Path, key_file: str | Path):
    """
    Execute the RC4 encryption/decryption algorithm on the input file,
    creating the output file using the contents from the key file.
    """
    with open(key_file, "rb") as key_file:
        S = sbox(convert(key_file.read()))

    with open("sbox_solution", "wb") as fd:
        fd.write(convert(S))

    # read input
    with open(in_file, "rb") as data_in_file:
        data_in = data_in_file.read()

    data_in = convert(data_in)  # bytes -> list[int]
    data_out = prga(data_in, S)
    data_out = convert(data_out)  # list[int] -> bytes

    # write output
    with open(out_file, "wb") as data_out_file:
        data_out_file.write(data_out)


if __name__ == "__main__":
    parser = ArgumentParser(
        description="Encrypts plaintext using the RC4 encryption algorithm, or decrypts RC4 ciphertext (based on input)"
    )
    parser.add_argument("input", type=str, help="Input file")
    parser.add_argument("key", type=str, help="Key file")
    parser.add_argument("output", type=str, help="Output file")
    parser.add_argument("--verbose-sbox", const=True, default=False, action="store_const", help="Show S-Box swaps")
    parser.add_argument("--verbose-prga", const=True, default=False, action="store_const", help="Show PRGA swaps")
    parser.add_argument("--step-prga", const=True, default=False, action="store_const", help="Step-by-step PRGA swaps")
    args = parser.parse_args()

    _VERBOSE_SBOX = args.verbose_sbox
    _VERBOSE_PRGA = args.verbose_prga
    _STEPWISE_PRGA = args.step_prga

    rc4(args.input, args.output, args.key)
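A quick round-trip sketch for the functions above; the key and message are placeholders. Because prga swaps entries of S in place, a fresh S-box is built for each direction, and decryption is the same operation as encryption:

from rc4 import convert, prga, sbox

key = convert(b"testkey")                # bytes -> list[int]
message = convert(b"attack at dawn")

ciphertext = prga(message, sbox(key))    # encrypt with a fresh S-box
recovered = prga(ciphertext, sbox(key))  # decrypt with another fresh S-box

assert convert(recovered) == b"attack at dawn"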

36
util.py Normal file

@ -0,0 +1,36 @@
import sys
from pathlib import Path
from zipfile import ZipFile


def is_valid_zip(file: Path) -> bool:
    """
    Checks if a zip file is a save file from the minimax simulator, e.g. if the contents
    are a 'machine.json' and 'signal.json' file.
    """
    if not file.suffix == ".zip":
        return False

    with ZipFile(file) as machine_zip:
        zip_content = machine_zip.namelist()
        return set(zip_content) == set(("machine.json", "signal.json"))


def select_zip(zips: list[Path]) -> Path:
    """
    Prompts the user to select a single zip file from a list, and returns it.
    """
    print("Multiple zip files found. Please select one:")
    for index, f in enumerate(zips, start=1):
        print(f"[{index}] {f.name}")

    while True:
        try:
            selection = input("Enter the number of the zip file to select: ")
            selection = int(selection) - 1  # convert 1-based user input to 0-based index
            if selection < 0 or selection >= len(zips):
                print(f"Please select a number between 1 and {len(zips)}.")
            else:
                return zips[selection]
        except ValueError:
            print("Please enter a valid integer.")
        except KeyboardInterrupt:
            sys.exit("Aborted")
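Typical use from the other scripts, assuming the group directory (a placeholder path here) contains at least one valid simulator ZIP:

from pathlib import Path

from util import is_valid_zip, select_zip

group = Path("submissions/group01")  # placeholder group directory
zips = [f for f in group.glob("*.zip") if is_valid_zip(f)]
savefile = zips[0] if len(zips) == 1 else select_zip(zips)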