Updating Helpers

This commit is contained in:
Martin Karkowski 2022-07-21 23:31:19 +02:00
parent 6870b27829
commit 6302f2d2ed
17 changed files with 2522 additions and 48 deletions

View File

@ -0,0 +1,7 @@
set DIR=%~dp0
cd "%DIR%"
if not "%1"=="am_admin" (powershell start -verb runas '%0' am_admin & exit /b)
pip install .

View File

@ -0,0 +1,4 @@
set DIR=%~dp0
cd "%DIR%"
cd ..
nope-py-prepare-code --input lib --output temp --type ts

View File

@ -0,0 +1,5 @@
set DIR=%~dp0
cd "%DIR%"
cd ..
nope-py-prepare-code --input dist-py --output temp --type js

View File

@ -4,6 +4,7 @@ __license__ = 'MIT'
__copyright__ = 'Copyright 2022 M.Karkowski'
__version__ = '0.1.0'
from .parser import get_parser
from .main import main
from .logger import get_logger
from .logger import get_logger
from .js import get_parser as get_parser_js, transform as transform_js
from .ts import get_parser as get_parser_ts, transform as transform_ts
from .main import main

View File

@ -0,0 +1,2 @@
from .parser import get_parser
from .transformer import transform

View File

@ -0,0 +1,61 @@
import os
import multiprocessing as mp
from pathlib import Path
from parser import get_parser
from transformer import transform
MAX_CPU = mp.cpu_count()
if MAX_CPU <= 4:
MAX_CPU = 1
else:
MAX_CPU = MAX_CPU - 2
def parse(parser, path_to_file):
""" Function to parse the corresponding
Args:
parser (_type_): _description_
logger (_type_): _description_
input_path (_type_): _description_
output_path (_type_): _description_
name (_type_): _description_
path_to_file (_type_): _description_
dir_path (_type_): _description_
Returns:
str: _description_
"""
try:
code = transform(
parser.parse(open(path_to_file, encoding="utf-8").read()),
True
)
print(code)
except Exception as err:
raise err
if __name__ == "__main__":
# "C:\Users\m.karkowski\Documents\00-Repos\nope-backend\dist-py\helpers\runtimeMethods.js"
input_path = Path(__file__).parent.joinpath(
'..', "..", "dist-py", "helpers")
path_to_file = os.path.join(input_path, "jsonSchemaMethods.js")
input_path = Path(__file__).parent.joinpath(
'..', "..", "..", "dist-py", "helpers")
path_to_file = os.path.join(input_path, "runtimeMethods.js")
parse(
get_parser(),
path_to_file
)

View File

@ -0,0 +1,431 @@
%import common.WS
%import common.DIGIT
%import common.INT
//--------------------------------------------------------------------------
// JS-Ignorers:
//--------------------------------------------------------------------------
%ignore /\"use strict\";/
%ignore /Object.defineProperty(exports, "__esModule", { value: true });/
%ignore /var __(.|\n)*?};/
%ignore /\w+ = __decorate(.|\n)*?\], \w+\);/
%ignore /exports.\w+ = \w+;/
type_of_function: "void 0"
//--------------------------------------------------------------------------
//--------------------------------------------------------------------------
MULTI_LINE_COMMENT: "/**" /(.|\n)*?/ "*/\n"
COMMENT: "//" /.*/
STR: /(`.*?`)|(".*?")|(\'.*?\')/
%ignore MULTI_LINE_COMMENT
%ignore COMMENT
%ignore WS
// Ignore Exports
%ignore "export" "default" /w+/ ";"
%ignore "export" "*" "from" STR ";"
%ignore "export" "*" "as" /w+/ "from" STR ";"
%ignore "export" /(?<=export)(.|\n)+?(?=})/ "}" "from" STR ";"
%ignore "export" /(?<=export)(.|\n)+?(?=})/ "}" ";"
// --------------------------------------------------------------------------
// Summary:
// --------------------------------------------------------------------------
start: statement+
// we have to define, what is a valid return type
ret_expr: id
| str
| str_multi_line
| num
| bool
| null
| undefined
| instanceof
| typeof
| increment
| decrement
| invert
| list
| dict
| descruct_dict
| reassign
| await_stmt
| delete_stmt
| reg_ex
| throw_statement
| sum
| product
| boolean_operation
| accessor
| function
| arrow_function
| function_call
| inline_if
| new_class
| break_statement
| continue_statement
| "(" ret_expr ")"
| return_statement
// Now we ar able to provide this expressions wiht a terminator.
ret_expr_with_terminator: ret_expr terminator
return_statement: "return" [ret_expr]
statement: ret_expr_with_terminator
| declare
| declare_var
| declare_var_not_initialized
| declare_var_descructed
| import_stmt
| for
| while_statement
| do_while
| if_statement
| switch
| class
| decorated_class
| function
| try_catch
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
// Default Terminator:
terminator: ";"
// Default ID:
id: /[a-zA-Z_$][a-zA-Z0-9_$]*/ -> identifier
// We define valid import statements:
import_stmt: "import" str terminator -> import_stmt_all
| "import" id "from" str terminator -> import_stmt_id
| "import" "*" "as" id "from" str terminator -> import_stmt_as
| "import" "{" import_names "}" "from" str terminator -> import_stmt_from
// we may import multiple items:
import_names: import_name ("," import_name)* [","]
// The import name might include multiple lines
import_name: id
| id "as" id -> import_as_name
| /\n/
// Lets define a string;
str: /(`.*?`)|(".*?")|(\'.*?\')/
str_multi_line: /(`(\\`|.|\n)*?`)/
// Lets define a number;
num: INT ["." INT] | "." INT
// Define a boolean;
bool: "false" | "true"
null: "null"
undefined: "undefined"
increment: accessor "++"
| "++" accessor
decrement: accessor "--"
| "--" accessor
invert: "!" ret_expr
instanceof: id "instanceof" id
typeof: "typeof" accessor
delete_stmt: "delete" accessor
await_stmt: "await" ret_expr
reg_ex: "/" /(?<=\/).+(?=\/\w)/ "/" [/\w/]
?sum: product
| sum "+" product -> add
| sum "-" product -> sub
| accessor "+=" product -> assigned_add
| accessor "-=" product -> assigned_sub
?product: atom
| product "*" atom -> mult
| product "/" atom -> div
| accessor "*=" atom -> assigned_mult
| accessor "/=" atom -> assigned_div
boolean_operation: boolean_input boolean_operator boolean_input
boolean_operator: ">" -> bool_op_gt
| "<" -> bool_op_lt
| "<=" -> bool_op_lte
| ">=" -> bool_op_gte
| "==" -> bool_op_eq
| "===" -> bool_op_eq
| "!=" -> bool_op_not_eq
| "!==" -> bool_op_not_eq
| "&&" -> bool_op_and
| "||" -> bool_op_or
| "in" -> bool_op_in
boolean_input: ret_expr
?atom: ret_expr
| id
| "-" atom -> negative_types_repr
| "(" sum ")"
// Define Lists.
list: "[" [list_items] "]"
list_items: (list_item [","])+
list_item: ret_expr
| "..." ret_expr -> list_item_rest
descruct_list: "[" ((id | (rest_accessor)) [","])* "]" "=" ret_expr
// Define Objects
dict: "{" [dict_items] "}"
dict_items: (dict_item [","] )+
dict_item: (id | num | str) ":" ret_expr -> dict_item_default
| id "(" [func_args] ")" func_body -> dict_item_func
| "..." ret_expr -> dict_item_rest
| id -> dict_item_short
descruct_dict: "{" ((id | (id ":" id) | (rest_accessor)) [","])* "}" "=" ret_expr
// --------------------------------------------------------------------------
// Lets enable defining variables:
// --------------------------------------------------------------------------
export: "export"
declare: "declare" declare_var_type id terminator
declare_var.10: [export] declare_var_type id "=" ret_expr_with_terminator
| [export] declare_var_type id "=" function
declare_var_not_initialized: [export] declare_var_type id terminator
declare_var_descructed: declare_var_type descruct_dict terminator
| declare_var_type descruct_list terminator
// Valid defintions of variables.
declare_var_type: "let"
| "var"
| "const"
// --------------------------------------------------------------------------
// Acess Variables:
// --------------------------------------------------------------------------
// To access some vars
// We may want to convert them to
// specific types.
bracket_accessor: id
| str
| str_multi_line
| num
| bool
| instanceof
| typeof
| "(" await_stmt ")"
| await_stmt
| sum
| product
| boolean_operation
| function_call
| inline_if
| "(" bracket_accessor ")"
accessor: id -> var_based_access
| str
| num
| list
| dict
| await_stmt
| "(" await_stmt ")"
| accessor ["?"] ("." accessor)+ -> access_dot
| accessor ["?"] ("[" bracket_accessor "]")+ -> access_bracket
| function_call -> simple_access
rest_accessor: "..." id
// --------------------------------------------------------------------------
// Reassignment:
// --------------------------------------------------------------------------
// lets define a reassingment:
reassign: accessor "=" ret_expr
// --------------------------------------------------------------------------
// Functions:
// These may contain:
// - Async Functions
// - Sync Functions
// They might be defined as `arrow_function`.
//
// In General, a function is name with an valid id, may be typed,
// may receives a custom amount of arguments and normally provides a
// function body;
// --------------------------------------------------------------------------
function: [export] "function" [id] "(" [func_args] ")" func_body -> function
| [export] "async" "function" [id] "(" [func_args] ")" func_body -> async_function
arrow_function: "(" [func_args] ")" "=>" func_body -> arrow_function
| "async" "(" [func_args] ")" "=>" func_body -> async_arrow_function
// Now we have to define the valid arguments:
// The function may receives multiple arguments
// which are typed or implicit (-> any)
func_args: func_arg ("," func_arg)*
func_arg: id -> default_func_arg
| "..." id -> rest_func_arg
| id "=" ret_expr -> assigend_func_arg
// We dont want to enable list | dict destruction as a function arg, because
// we can not parse it more or less :(
// | dict | list
// Define the Function Body:
// This consists of the brackets and the statements in the function
func_body: "{" [func_statements] "}"
func_statements: func_statement+
func_statement: statement
// And now we define, which elements are allowed to be included
// in a function. in our case these are more or less all statements
// Additionally, we have ot make shure, that we are able to
// "return" something.
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
// The Defintion of how a function could be called.
// In here we consider sync as well as async calls.
function_call: accessor "(" [call_args] ")"
// We define our call args:
call_args: call_arg ("," call_arg)*
call_arg: ret_expr -> call_arg
| "..." ret_expr -> rest_call_arg
// --------------------------------------------------------------------------
// Loops
// --------------------------------------------------------------------------
// Define a For - Statement
for: "for" "(" declare_var_type for_iter_var for_iter_type ret_expr ")" iter_body -> default_for
| "for" "(" declare_var_type "[" (for_iter_var [","])+ "]" for_iter_type ret_expr ")" iter_body -> mutli_for
| "for" "(" declare_var_type id "=" ret_expr ";" ret_expr ";" ret_expr ")" iter_body -> ranged_for
for_iter_type: "in" | "of"
for_iter_var: id | dict | list
while_statement: "while" "(" ret_expr ")" iter_body
do_while: "do" iter_body "while" "(" ret_expr ")" terminator
iter_body: "{" iter_statements "}" | iter_statement
iter_statements: iter_statement*
iter_statement: statement
continue_statement: "continue"
break_statement: "break"
// --------------------------------------------------------------------------
// IF-Statements
// --------------------------------------------------------------------------
// Define a If - Statement
// We have to consider "if" "else if" and "else"
if_statement: "if" "(" ret_expr ")" if_body [else_if_statements] [else_statement]
else_if_statements: else_if_statement+
else_if_statement: "else" "if" "(" ret_expr ")" if_body
else_statement: "else" if_body
if_body: "{" statement* "}"
| ret_expr_with_terminator -> if_body_single
inline_if: ret_expr "?" ret_expr ":" ret_expr
// --------------------------------------------------------------------------
// switch-case
// --------------------------------------------------------------------------
switch: "switch" "(" ret_expr ")" switch_body
switch_body: "{" ((switch_case)* [switch_default])* "}"
switch_case: "case" ret_expr ":" [switch_case_body]
switch_default: "default" ":" [switch_case_body]
switch_case_body: (("{" switch_case_statements "}") | switch_case_statements) break_statement
switch_case_statements: switch_case_statement*
switch_case_statement: statement
// --------------------------------------------------------------------------
// Error Handling
// --------------------------------------------------------------------------
try_catch: "try" try_catch_body "catch" "(" id ")" try_catch_body ["finally" try_catch_body]
try_catch_body: "{" statement* "}"
throw_statement: "throw" ret_expr
throw_error_statement: "throw" "Error" "(" ret_expr ")"
// --------------------------------------------------------------------------
// classes
// --------------------------------------------------------------------------
class: [export] "class" id ["extends" id] class_body
decorated_class: "@" function_call class
| /let \w+ = (?=class)/ class terminator // Version compiled by tsc
class_body: "{" class_declarations* "}"
class_declarations: constructor
| getter
| setter
| method
| async_method
| decorated_method
| decorated_async_method
constructor: "constructor" "(" constructor_args? ")" func_body
constructor_args: constructor_arg ("," constructor_arg)*
constructor_arg: ["@" function_call] func_arg
getter: "get" id "(" ")" func_body
setter: "set" id "(" func_arg ")" func_body
method: id "(" [func_args] ")" func_body
async_method: "async" id "(" [func_args] ")" func_body
decorated_method: ("@" function_call) method
decorated_async_method: ("@" function_call) async_method
// Types for a new Class
new_class: "new" id "(" [call_args] ")"

View File

@ -2,7 +2,7 @@ from lark import Lark
from pathlib import Path
# Define the Grammar File.
grammar_file_path = Path(__file__).parent.joinpath('grammar.lark')
grammar_file_path = Path(__file__).parent.joinpath('grammar.js.lark')
def get_parser():

View File

@ -0,0 +1,918 @@
import _ast
import ast
import astor
import logging
from prepare_code import get_logger
from lark import Transformer
class CodeTransformeJs(Transformer):
def __init__(self, visit_tokens: bool = True, level=logging.INFO) -> None:
super().__init__(visit_tokens)
self.logger = get_logger("DebugWrapper", level)
self._callback_counter = 0
self._anonymous_func_to_ids = dict()
self._ids_to_anonymous_func = dict()
# The Key is allways a ast.Node (the Parent Node.)
# The Value is a Set, containing sub-nodes,
self._anonymous_func_tree = dict()
def _get_func_name(self):
name = f"callback_{self._callback_counter}"
self._callback_counter += 1
return name
def _adapt_items(self, items):
if type(items) is list:
ret = []
for item in items:
if (type(item) is _ast.FunctionDef):
ret.append(self._anonymous_func_to_ids[item])
else:
ret.append(item)
return ret
elif (type(items) is _ast.FunctionDef):
return self._anonymous_func_to_ids[items]
else:
return items
def _add_to_tree(self, parent, items):
""" Create the required Tree.
Parent = Node
Childs =
"""
if parent not in self._anonymous_func_tree:
self._anonymous_func_tree[parent] = set()
if type(items) is list:
for item in items:
self._add_to_tree_single(parent, item)
else:
self._add_to_tree_single(parent, items)
def _add_to_tree_single(self, parent, item):
if item in self._ids_to_anonymous_func:
self._anonymous_func_tree[parent].add(
self._ids_to_anonymous_func[item]
)
elif item in self._anonymous_func_tree:
s = self._anonymous_func_tree[parent]
# Store the contained Ids.
self._anonymous_func_tree[parent] = s.union(
self._anonymous_func_tree[item])
def _adapt_body(self, body):
defintions_to_add = set()
for item in body:
if item in self._anonymous_func_tree:
defintions_to_add = defintions_to_add.union(
self._anonymous_func_tree[item])
self._anonymous_func_tree.pop(item)
return list(defintions_to_add) + body
def log(self, name, items):
try:
self.logger.debug(
f"calling -> '{name}' -> type(items) = {type(items)}; len(items) = {len(items)}")
except:
self.logger.debug(
f"calling -> '{name}' -> type(items) = {type(items)}")
def log_extracted(self, name, **args):
try:
self.logger.debug(f"calling -> '{name}' -> args = {dict(**args)}")
except:
pass
def start(self, items):
self.log("start", items)
body = self._adapt_body(items)
ret = _ast.Module(body=body)
self._add_to_tree(ret, body)
return ret
def ret_expr(self, items):
self.log("ret_expr", items)
return items[0]
def ret_expr_with_terminator(self, items):
self.log("ret_expr_with_terminator", items)
ret_expr, terminator = items
self.log_extracted("ret_expr_with_terminator",
ret_expr=ret_expr, terminator=terminator)
return ret_expr
def return_statement(self, items):
self.log("ret_expr_with_terminator", items)
(ret_expr, ) = items
self.log_extracted("return_statement", ret_expr=ret_expr)
return _ast.Return(value=ret_expr)
def statement(self, items):
return items[0]
def terminator(self, items):
""" We want to skip the terminators.
"""
self.log("terminator", items)
return
def identifier(self, items):
self.log("id", items)
return _ast.Name(id=items[0].value)
def import_stmt_all(self, items):
self.log("import_stmt_all", items)
module, terminator = items
self.log_extracted("import_stmt_all", module=module,
terminator=terminator)
return _ast.Import(name=_ast.alias(name=module.value))
def import_stmt_id(self, items):
self.log("import_stmt_id", items)
return items
def import_stmt_as(self, items):
self.log("import_stmt_as", items)
(identifier, module, __) = items
if module.value == identifier:
names = [_ast.alias(name=module.value, asname=None)]
else:
names = [_ast.alias(name=module.value, asname=identifier)]
self.log_extracted("import_stmt_all",
identifier=identifier, module=module)
return _ast.Import(names=names)
def import_stmt_from(self, items):
self.log("import_stmt_from", items)
(import_names, _, __) = items
self.log_extracted("import_stmt_from", import_names=import_names)
# TODO: determine the level properly
return _ast.ImportFrom(module=_.value.replace('/', '.'), names=import_names, level=0)
def import_names(self, items):
return items
def import_name(self, items):
(items,) = items
return _ast.alias(name=items, asname=None)
def import_as_name(self, items):
(import_name, as_name) = items
return _ast.alias(name=import_name, asname=as_name)
def str(self, items):
self.log("str", items)
(items,) = items
items = items[1:-1]
return _ast.Constant(value=items)
def str_multi_line(self, items):
(items,) = items
items = items[1:-1]
return _ast.Constant(value=items)
def num(self, items):
self.log("num", items)
dig_01 = items[0].value
dig_02 = items[1].value if items[1] is not None else 0
value = float(str(f"{dig_01}.{dig_02}"))
return _ast.Constant(value=value)
def bool(self, items):
if items.value == "false":
return _ast.Constant(value=False)
return _ast.Constant(value=True)
def null(self, items):
return _ast.Constant(value=None)
def undefined(self, items):
return _ast.Constant(value=None)
def increment(self, items):
return self.assigned_add([items[0], _ast.Constant(value=1)])
def decrement(self, items):
return self.assigned_sub([items[0], _ast.Constant(value=1)])
def invert(self, items):
return _ast.UnaryOp(
op=_ast.Not(),
operand=items[0]
)
def istanceof(self, items):
return _ast.Call(
func=_ast.Name(id='isinstance', ctx=ast.Load()),
args=[
items[0],
items[1]
],
keywords=[]
)
def typeof(self, items):
return _ast.Call(
func=_ast.Name(id='type', ctx=ast.Load()),
args=[
items[0]
],
keywords=[]
)
def instanceof(self, items):
return _ast.Call(
func=_ast.Name(id='type', ctx=ast.Load()),
args=[
items[0]
],
keywords=[]
)
def delete_stmt(self, items):
return _ast.Delete(
targets=[
items[0]
]
)
def await_stmt(self, items):
return _ast.Await(
value=items[0]
)
def reg_ex(self, items):
return _ast.Call(
func=_ast.Name(id='re.compile', ctx=ast.Load()),
args=[
items[0]
],
keywords=[]
)
def add(self, items):
return self._op(items, _ast.Add())
def sub(self, items):
return self._op(items, _ast.Sub())
def mult(self, items):
return self._op(items, _ast.Mult())
def div(self, items):
return self._op(items, _ast.Div())
def _op(self, items, op):
self.log("assigned_add", items)
return _ast.BinOp(
left=items[0],
op=op,
right=items[1]
)
def _assigned_op(self, items, op):
self.log("assigned_add", items)
return self.reassign([
items[0],
self._op(items, op)
])
def assigned_add(self, items):
return self._assigned_op(items, _ast.Add())
def assigned_sub(self, items):
return self._assigned_op(items, _ast.Sub())
def assigned_mult(self, items):
return self._assigned_op(items, _ast.Mult())
def assigned_div(self, items):
return self._assigned_op(items, _ast.Div())
def boolean_operation(self, items):
self.log("boolean_operation", items)
return _ast.Compare(
left=items[0],
ops=[
items[1]
],
comparators=[
items[2]
]
)
def bool_op_gt(self, items):
return _ast.Gt()
def bool_op_lt(self, items):
return _ast.Lt()
def bool_op_gte(self, items):
return _ast.GtE()
def bool_op_lte(self, items):
return _ast.LtE()
def bool_op_eq(self, items):
return _ast.Eq()
def bool_op_not_eq(self, items):
return _ast.NotEq()
def bool_op_and(self, items):
return _ast.And()
def bool_op_or(self, items):
return _ast.Or()
def bool_op_in(self, items):
return _ast.In()
def boolean_input(self, items):
return items[0]
def list(self, items):
if items[0] is not None:
_items = self._adapt_items(items[0])
ret = _ast.List(
elts=_items,
ctx=ast.Load()
)
self._add_to_tree(ret, _items)
return ret
return _ast.List(
elts=[],
ctx=ast.Load()
)
def list_items(self, items):
self.log("list_items", items)
return items
def list_item(self, items):
self.log("list_item", items)
return items[0]
def list_item_rest(self, items):
self.log("list_item_rest", items)
return _ast.Starred(
value=items[0]
)
def descruct_list(self, items):
ret = []
for idx, target in enumerate(items[0]):
ret.append(
self.declare_var([
# Use the
target,
_ast.Subscript(
value=items[1],
slice=_ast.Constant(value=idx)
)
])
)
return ret
def dict(self, items):
keys = []
values = []
if type(items[0]) == list:
for item in items[0]:
keys += item["keys"]
values += values["values"]
ret = _ast.Dict(
keys=keys,
values=self._adapt_items(values),
ctx=ast.Load()
)
self._add_to_tree(ret, values)
return ret
def dict_items(self, items):
self.log("dict_items", items)
return items[0]
def dict_item_default(self, items):
return {
"keys": [items[0]],
"values": [items[1]]
}
def dict_item_func(self, items):
# TODO: How to Handle.
return {
"keys": [items[0].value],
"values": []
}
def dict_item_rest(self, items):
return {
"keys": [None],
"values": [items[0]]
}
def dict_item_short(self, items):
print("dict_item_short", items[0])
return {
"keys": [_ast.Constant(value=items[0].value)],
"values": [items[0]]
}
def descruct_dict(self, items):
# TODO: Impement
return _ast.Constant(value=f"DICT_DESCRUTION from dict '{items[1].value}'")
def export(self, items):
return
def declare_var(self, items):
(_, __, id, value) = items
self.log("declare_var", items)
try:
self.logger.debug(
f"declare_var: id={ast.dump(id)}; value={ast.dump(value)}")
except:
pass
_value = self._adapt_items(value)
ret = _ast.Assign(
targets=[id],
value=_value
)
self._add_to_tree(ret, _value)
return ret
def declare_var_not_initialized(self, items):
return _ast.Assign(
targets=[items[0]],
value=_ast.Assign(value=None)
)
def declare_var_descructed(self, items):
# Return only the desturcted stuff
return items[1]
def bracket_accessor(self, items):
return items[0]
def accessor(self, items):
return items[0]
def var_based_access(self, items):
return _ast.Name(id=items[0], ctx=ast.Load())
def simple_access(self, items):
return items[0]
def access_dot(self, items):
if type(items[1]) is _ast.Name:
return _ast.Attribute(
value=items[0],
attr=items[1].id,
ctx=ast.Load()
)
return _ast.Attribute(
value=items[0],
attr=items[1],
ctx=ast.Load()
)
def access_bracket(self, items):
return _ast.Subscript(
value=items[0],
slice=items[1]
)
def rest_accessor(self, items):
return _ast.Call(
func=_ast.Name(id='deepcopy', ctx=ast.Load()),
args=[
items[0],
],
keywords=[]
)
def reassign(self, items):
return self.declare_var([None, None, items[0], items[1]])
def _function(self, items, type=None):
_constructor = _ast.AsyncFunctionDef if type == "async" else _ast.FunctionDef
(export, name, func_args, body) = items
self.logger.debug("_function", name, func_args, body)
kwargs = {
"name": name.id,
"body": self._adapt_body(body),
"args": func_args,
"decorator_list": []
}
ret = _constructor(** kwargs)
# Register the Function.
self._anonymous_func_to_ids[ret] = name
self._ids_to_anonymous_func[name] = ret
# self._anonymous_func_tree[ret] = name
return ret
def function(self, items):
self.log("function", items)
return self._function(items)
def async_function(self, items):
self.log("async_function", items)
return self._function(items, "async")
def _arrow_function(self, items, type=None):
self.log("_arrow_function", items)
print(items[0])
# TODO: Get name for callback
name = _ast.Name(id=self._get_func_name())
return self._function([None, name, *items], type)
def arrow_function(self, items):
self.log("arrow_function", items)
return self._arrow_function(items)
def arrow_function_one_arg(self, items):
self.log("arrow_function", items)
return self._arrow_function(items)
def async_arrow_function(self, items):
self.log("async_arrow_function", items)
return self._arrow_function(items, "async")
def func_args(self, items):
ret = {
"args": [],
"defaults": [],
"vararg": []
}
for item in items:
ret["args"] += item["args"]
ret["defaults"] += item["defaults"]
ret["vararg"] += item["vararg"]
if len(ret["vararg"]) == 0:
ret.pop("vararg")
return _ast.arguments(**ret)
def func_arg(self, items):
return items
def default_func_arg(self, items):
self.log("default_func_arg", items)
# TODO: Implement
return {
"args": [_ast.arg(arg=items[0].id)],
"defaults": [],
"vararg": []
}
def rest_func_arg(self, items):
self.log("rest_func_arg", items)
return {
"vararg": [_ast.arg(arg=items[0].id)],
"args": [],
"defaults": []
}
def assigend_func_arg(self, items):
self.log("assigend_func_arg", items)
return {
"args": [_ast.arg(arg=items[0].id)],
"defaults": [items[1]],
"vararg": []
}
def implicit_or_typed(self, items):
return items[0]
def func_body(self, items):
ret = []
elements = items[0]
self.log("func_body", elements)
for element in elements:
if type(element) is list:
ret += element
else:
ret.append(element)
return ret
def func_statements(self, items):
return items
def func_statement(self, items):
return items
def function_call(self, items):
self.log("function_call", items)
id = items[0]
args = items[1] if items[1] is not None else []
# Adapt the ID:
if type(id) is _ast.Name:
id = id
ret = _ast.Call(
func=id,
args=self._adapt_items(args),
keywords=[]
)
return _ast.Call(
func=id,
args=self._adapt_items(args),
keywords=[]
)
def call_args(self, items):
self.log("call_args", items)
return items
def call_arg(self, items):
self.log("call_arg", items)
return items[0]
def rest_call_arg(self, items):
return _ast.Starred(
value=items[0]
)
def default_for(self, items):
pass
def multi_for(self, items):
pass
def ranged_for(self, items):
pass
def for_iter_type(self, items):
pass
def for_iter_var(self, items):
pass
def while_statement(self, items):
self.log("while_statement", items)
self.logger.debug(items[0])
self.logger.debug(items[1])
return _ast.While(
test=items[0],
body=self._adapt_body(
items[1] if type(items) == list else [items[1]]
),
orelse=[]
)
def iter_body(self, items):
self.log("iter_body", items)
if items[0] is None:
return []
return items[0]
def iter_statements(self, items):
return items
def iter_statement(self, items):
return items[0]
def continue_statement(self, items):
return _ast.Continue()
def break_statement(self, items):
return _ast.Break()
def if_statement(self, items):
test = items[0]
body = items[1]
elifs = items[2]
else_body = items[3] if items[3] is not None else []
inclused_elif = elifs is not None
if inclused_elif:
def _rec(idx):
(_test, _body) = elifs[idx]
if idx < len(elifs) - 1:
return _ast.If(
test=_test,
body=self._adapt_body(_body),
orelse=[
_rec(idx+1)
]
)
# Now we know, that we are working with
# the last item
return _ast.If(
test=_test,
body=self._adapt_body(_body),
orelse=else_body
)
return _ast.If(
test=test,
body=self._adapt_body(body),
orelse=[
_rec(0)
]
)
return _ast.If(
test=test,
body=self._adapt_body(body),
orelse=else_body
)
def else_if_statements(self, items):
return items
def else_if_statement(self, items):
return items
def else_statement(self, items):
return items[0]
def if_body(self, items):
return items
def if_body_single(self, items):
return items
def inline_if(self, items):
_items = self._adapt_body(items)
ret = _ast.IfExp(
test=_items[0],
body=_items[1],
orelse=_items[2]
)
self._add_to_tree(ret, _items)
return ret
def new_class(self, items):
identifier = items[0]
args = items[1] if items[1] is not None else []
return _ast.Call(func=_ast.Name(id=identifier), args=args, keywords=[])
def switch(self, items):
raise Exception("!!SWITCH_CASE!!")
return _ast.Match()
def switch_body(self, items):
return _ast.Break()
def switch_case(self, items):
return _ast.Break()
def switch_default(self, items):
return _ast.Break()
def switch_case_statements(self, items):
return _ast.Break()
def switch_case_statement(self, items):
return _ast.Break()
def try_catch(self, items):
return _ast.Break()
def try_catch_body(self, items):
return
def throw_statement(self, items):
return _ast.Raise(
exc=items[0]
)
def throw_error_statement(self, items):
return _ast.Raise(
exc=items[0]
)
class DebuggedCodeTransformeJs(Transformer):
def __init__(self, visit_tokens: bool = True) -> None:
super().__init__(visit_tokens)
self.logger = get_logger("DebugWrapper")
def __getattribute__(self, __name: str):
logger = object.__getattribute__(self, "logger")
try:
func = object.__getattribute__(self, __name)
if callable(func):
def cb(items):
logger.info(f"Calling function '{__name}'")
try:
logger.info(f"received parameters => {len(items)}")
except:
pass
return func(items)
return cb
return func
except:
pass
logger.warn(f"'{__name}' has not been found!")
raise KeyError(f"'{__name}' has not been found!")
def transform(tree, debug):
transformer = CodeTransformeJs(
True, logging.DEBUG if debug else logging.INFO)
program = transformer.transform(tree)
if debug:
print(ast.dump(program, indent=2))
code = astor.to_source(program)
if debug:
print(code)
return code

View File

@ -0,0 +1 @@
from .get_logger import get_logger

View File

@ -1,15 +1,23 @@
import argparse
from msilib.schema import Error
import os
import re
import multiprocessing as mp
from .parser import get_parser
from .logger import get_logger
from . import get_logger, ts, js
MAX_CPU = mp.cpu_count()
if MAX_CPU <= 4:
MAX_CPU = 2
MAX_CPU = 1
else:
MAX_CPU = MAX_CPU - 2
func = {
"ts": ts,
"js": js
}
def worker(opt):
""" Helper function, which will be called during a multiprocess. Converts the input.
@ -17,9 +25,11 @@ def worker(opt):
Args:
opt: packed options
"""
return parse(get_parser(), *opt)
type = opt[-2]
def parse(parser, logger, input_path, output_path, name, path_to_file, dir_path):
return parse(func[type].get_parser(), *opt)
def parse(parser, logger, input_path, output_path, name, path_to_file, dir_path, type, detailed_logs):
""" Function to parse the corresponding
Args:
@ -35,9 +45,14 @@ def parse(parser, logger, input_path, output_path, name, path_to_file, dir_path)
str: _description_
"""
func = {
"ts": ts,
"js": js
}
try:
python_name = name.replace(".ts", ".py")
python_name = name.replace(".ts", ".py").replace(".js", ".py")
rel_path = dir_path[len(input_path) + 1:]
logger.debug(f"determined the following rel-path = {rel_path}")
@ -45,10 +60,12 @@ def parse(parser, logger, input_path, output_path, name, path_to_file, dir_path)
pytho_path_to_file = os.path.join(output_path,rel_path,python_name)
try:
content = parser.parse(open(path_to_file, encoding="utf-8").read())
content = parser.parse(
parser.parse(open(path_to_file, encoding="utf-8").read())
)
logger.info(f"converted {path_to_file}")
return (
(pytho_path_to_file , content),
(os.path.join(output_path,rel_path), pytho_path_to_file , content),
(False ,False)
)
@ -70,10 +87,14 @@ def parse(parser, logger, input_path, output_path, name, path_to_file, dir_path)
pass
logger.error(f"Failed to convert {ptr_to_err}")
logger.error(str(err).split("\n")[0])
if detailed_logs:
logger.error(err)
else:
logger.error(str(err).split("\n")[0])
return (
(False, False),
(False, False, False),
(str(err), ptr_to_err)
)
@ -81,7 +102,7 @@ def parse(parser, logger, input_path, output_path, name, path_to_file, dir_path)
# An unknown Error has been found
logger.error(err)
return (
(False,False),
(False, False,False),
(str(err), False)
)
@ -102,6 +123,8 @@ def main():
help='Shows debug related output')
parser.add_argument('--cores', type=int, default=MAX_CPU, dest='cores',
help='The Amount of cores, which must be use')
parser.add_argument('--details', dest='details', action='store_true',
help='Shows all erros.')
# Create a Logger:
logger = get_logger("nope-py-prepare")
@ -109,7 +132,7 @@ def main():
args = parser.parse_args()
files_to_ignore ={
"js": lambda name: name.endswith(".js") and not (name.endswith(".spec.js") or "index" in name or "/types/" in name),
"js": lambda name: name.endswith(".js") and not (name.endswith(".spec.js") or "index" in name or "\\types\\" in name or "/types/" in name),
"ts": lambda name: name.endswith(".ts") and not (name.endswith(".spec.ts") or "index" in name),
}
@ -128,19 +151,41 @@ def main():
logger.info(f"Checking dir: '{input_path}'")
# Get all relevant Files.
for dir_path, directories, files in os.walk(input_path):
for file_name in files:
# Generate the Path of files.
path_to_file = os.path.join(dir_path, file_name)
if files_to_ignore[args.type](path_to_file):
# Show a log message
logger.debug(f"Found file: '{file_name}' at '{dir_path}'")
if os.path.isdir(input_path):
# Get all relevant Files.
for dir_path, directories, files in os.walk(input_path):
for file_name in files:
# Generate the Path of files.
path_to_file = os.path.join(dir_path, file_name)
if files_to_ignore[args.type](path_to_file):
# Show a log message
logger.debug(f"Found file: '{file_name}' at '{dir_path}'")
# Add the file-name, path to the file and the dir path
typescript_files.append((file_name, path_to_file, dir_path))
elif os.path.isfile(input_path):
# Generate the Path of files.
path_to_file = input_path
if files_to_ignore[args.type](path_to_file):
dir_path = os.path.dirname(path_to_file)
file_name = path_to_file[len(dir_path) + 1 :]
# Show a log message
logger.debug(f"Found file: '{file_name}' at '{dir_path}'")
# Add the file-name, path to the file and the dir path
typescript_files.append((file_name, path_to_file, dir_path))
else:
raise Error("Failed to load the file")
# Add the file-name, path to the file and the dir path
typescript_files.append((file_name, path_to_file, dir_path))
# Define the Destination
@ -159,7 +204,9 @@ def main():
output_path, # The Output Path
file_name, # Name of the File
path_to_file, # Path to the File
dir_path # Path of the Directory.
dir_path, # Path of the Directory.
args.type,
args.details
) for file_name, path_to_file, dir_path in typescript_files
]
)
@ -170,16 +217,22 @@ def main():
success = []
failed = []
for (py_file_name,content),(err, org_file_name) in results:
for (path, py_file_name,content),(err, org_file_name) in results:
if py_file_name and content:
success.append(py_file_name)
os.makedirs(path, exist_ok=True)
with open(py_file_name, "w") as file:
file.write(content)
else:
failed.append(org_file_name)
logger.warn(f"The following files failed ({len(failed)}):")
for (idx, file_name) in enumerate(failed):
print("\t", idx, ".\t", file_name)
if len(failed):
logger.warn(f"The following files failed ({len(failed)}):")
for (idx, file_name) in enumerate(failed):
print("\t", idx, ".\t", file_name)
if (args.debug):
print("\n"*2)

View File

@ -0,0 +1,2 @@
from .parser import get_parser
from .transformer import transform

View File

@ -0,0 +1,60 @@
import os
import multiprocessing as mp
from pathlib import Path
from parser import get_parser
from transformer import transform
MAX_CPU = mp.cpu_count()
if MAX_CPU <= 4:
MAX_CPU = 1
else:
MAX_CPU = MAX_CPU - 2
def parse(parser, path_to_file):
""" Function to parse the corresponding
Args:
parser (_type_): _description_
logger (_type_): _description_
input_path (_type_): _description_
output_path (_type_): _description_
name (_type_): _description_
path_to_file (_type_): _description_
dir_path (_type_): _description_
Returns:
str: _description_
"""
try:
code = transform(
parser.parse(open(path_to_file, encoding="utf-8").read()),
False
)
print(code)
except Exception as err:
raise err
if __name__ == "__main__":
# "C:\Users\m.karkowski\Documents\00-Repos\nope-backend\dist-py\helpers\runtimeMethods.js"
input_path = Path(__file__).parent.joinpath(
'..', "..", "..", "dist-py", "helpers")
path_to_file = os.path.join(input_path, "jsonSchemaMethods.js")
input_path = Path(__file__).parent.joinpath(
'..', "..", "..", "dist-py", "helpers")
path_to_file = os.path.join(input_path, "runtimeMethods.js")
parse(
get_parser(),
path_to_file
)

View File

@ -196,7 +196,7 @@ descruct_list: "[" ((id | (rest_accessor)) [","])* "]" "=" ret_
dict: "{" [dict_items] "}"
dict_items: (dict_item [","] )+
dict_item: (id | num | str) ":" ret_expr -> dict_item_default
| id "(" func_args? ")" func_body -> dict_item_func
| id "(" [func_args] ")" func_body -> dict_item_func
| "..." ret_expr -> dict_item_rest
| id -> dict_item_short
@ -282,15 +282,16 @@ reassign: accessor "=" ret_expr
// function body;
// --------------------------------------------------------------------------
function_helper: ["<" type ("," type)* ">"] "(" func_args? ")" [":" type] func_body
function_generic_input_type: "<" type ("," type)* ">"
function_return_type: ":" type
function: [export] "function" [id] ["<" type ("," type)* ">"] "(" func_args? ")" [":" type] func_body -> function
| [export] "async" "function" [id] ["<" type ("," type)* ">"] "(" func_args? ")" [":" type] func_body -> async_function
function.10: [export] "function" [id] [function_generic_input_type] "(" [func_args] ")" [function_return_type] func_body -> function
| [export] "async" "function" [id] [function_generic_input_type] "(" [func_args] ")" [function_return_type] func_body -> async_function
arrow_function: ["<" type ("," type)* ">"] (( "(" func_args? ")" [":" type] ) | func_arg ) "=>" func_body -> arrow_function
| "async" ["<" type ("," type)* ">"] (( "(" func_args? ")" [":" type] ) | func_arg ) "=>" func_body -> async_arrow_function
arrow_function.10: [function_generic_input_type] "(" [func_args] ")" [function_return_type] "=>" func_body -> arrow_function
| "async" [function_generic_input_type] "(" [func_args] ")" [function_return_type] "=>" func_body -> async_arrow_function
arrow_function_type: ["<" type ("," type)* ">"] "(" func_args? ")" [":" type] "=>" type
arrow_function_type: [function_generic_input_type] "(" [func_args] ")" [function_return_type] "=>" type
// Now we have to define the valid arguments:
// The function may receives multiple arguments
@ -316,9 +317,6 @@ func_body: "{" [func_statements] "}"
func_statements: func_statement+
func_statement: statement
arrow_func_body: func_body
| ret_expr
// And now we define, which elements are allowed to be included
// in a function. in our case these are more or less all statements
// Additionally, we have ot make shure, that we are able to
@ -331,7 +329,7 @@ arrow_func_body: func_body
// The Defintion of how a function could be called.
// In here we consider sync as well as async calls.
function_call: accessor ["<" type ("," type)* ">"] "(" call_args? ")"
function_call: accessor ["<" type ("," type)* ">"] "(" [call_args] ")"
// We define our call args:
@ -436,6 +434,7 @@ try_catch: "try" try_catch_body "catch" "(" id ")" try_catch_
try_catch_body: "{" statement* "}"
throw_statement: "throw" ret_expr
throw_error_statement: "throw" "Error" "(" ret_expr ")"
// --------------------------------------------------------------------------
// interfaces
@ -452,7 +451,7 @@ interface_declarations: interface_declaration
// Interfaces contains implicit or typed attributes or methods
interface_declaration: [readonly] implicit_or_typed terminator
| id ["<" type ("," type)* ">"] "(" func_args? ")" [":" type ] terminator
| id ["<" type ("," type)* ">"] "(" [func_args] ")" [":" type ] terminator
// --------------------------------------------------------------------------
// classes
@ -497,10 +496,10 @@ decorated_property_assigned: ("@" function_call) property_assigned
getter: [visibility] "get" id "("")" [":" type] func_body
setter: [visibility] "set" id "(" func_arg ")" [":" "void"] func_body
method: [visibility] id ["<" type ("," type)* ">"] "(" func_args? ")" [":" type] func_body
async_method: [visibility] "async" id ["<" type ("," type)* ">"] "(" func_args? ")" [":" type] func_body
method: [visibility] id ["<" type ("," type)* ">"] "(" [func_args] ")" [":" type] func_body
async_method: [visibility] "async" id ["<" type ("," type)* ">"] "(" [func_args] ")" [":" type] func_body
decorated_method: ("@" function_call) method
decorated_async_method: ("@" function_call) async_method
// Types for a new Class
new_class: "new" id_typed "(" call_args? ")"
new_class: "new" id_typed "(" [call_args] ")"

View File

@ -0,0 +1,14 @@
from lark import Lark
from pathlib import Path
# Define the Grammar File.
grammar_file_path = Path(__file__).parent.joinpath('grammar.ts.lark')
def get_parser():
""" Helper, to generate a parser.
Returns:
A lark parser
"""
return Lark.open(grammar_file_path, debug = True, maybe_placeholders=True)

View File

@ -0,0 +1,916 @@
import _ast
import ast
import astor
import logging
from prepare_code import get_logger
from lark import Transformer
class CodeTransformeTs(Transformer):
def __init__(self, visit_tokens: bool = True, level=logging.INFO) -> None:
super().__init__(visit_tokens)
self.logger = get_logger("DebugWrapper", level)
self._callback_counter = 0
self._anonymous_func_to_ids = dict()
self._ids_to_anonymous_func = dict()
# The Key is allways a ast.Node (the Parent Node.)
# The Value is a Set, containing sub-nodes,
self._anonymous_func_tree = dict()
def _get_func_name(self):
name = f"callback_{self._callback_counter}"
self._callback_counter += 1
return name
def _adapt_items(self, items):
if type(items) is list:
ret = []
for item in items:
if (type(item) is _ast.FunctionDef):
ret.append(self._anonymous_func_to_ids[item])
else:
ret.append(item)
return ret
elif (type(items) is _ast.FunctionDef):
return self._anonymous_func_to_ids[items]
else:
return items
def _add_to_tree(self, parent, items):
""" Create the required Tree.
Parent = Node
Childs =
"""
if parent not in self._anonymous_func_tree:
self._anonymous_func_tree[parent] = set()
if type(items) is list:
for item in items:
self._add_to_tree_single(parent, item)
else:
self._add_to_tree_single(parent, items)
def _add_to_tree_single(self, parent, item):
if item in self._ids_to_anonymous_func:
self._anonymous_func_tree[parent].add(
self._ids_to_anonymous_func[item]
)
elif item in self._anonymous_func_tree:
s = self._anonymous_func_tree[parent]
# Store the contained Ids.
self._anonymous_func_tree[parent] = s.union(
self._anonymous_func_tree[item])
def _adapt_body(self, body):
defintions_to_add = set()
for item in body:
if item in self._anonymous_func_tree:
defintions_to_add = defintions_to_add.union(
self._anonymous_func_tree[item])
self._anonymous_func_tree.pop(item)
return list(defintions_to_add) + body
def log(self, name, items):
try:
self.logger.debug(
f"calling -> '{name}' -> type(items) = {type(items)}; len(items) = {len(items)}")
except:
self.logger.debug(
f"calling -> '{name}' -> type(items) = {type(items)}")
def log_extracted(self, name, **args):
try:
self.logger.debug(f"calling -> '{name}' -> args = {dict(**args)}")
except:
pass
def start(self, items):
self.log("start", items)
body = self._adapt_body(items)
ret = _ast.Module(body=body)
self._add_to_tree(ret, body)
return ret
def ret_expr(self, items):
self.log("ret_expr", items)
return items[0]
def ret_expr_with_terminator(self, items):
self.log("ret_expr_with_terminator", items)
ret_expr, terminator = items
self.log_extracted("ret_expr_with_terminator",
ret_expr=ret_expr, terminator=terminator)
return ret_expr
def return_statement(self, items):
self.log("ret_expr_with_terminator", items)
(ret_expr, ) = items
self.log_extracted("return_statement", ret_expr=ret_expr)
return _ast.Return(value=ret_expr)
def statement(self, items):
return items[0]
def terminator(self, items):
""" We want to skip the terminators.
"""
self.log("terminator", items)
return
def identifier(self, items):
self.log("id", items)
return _ast.Name(id=items[0].value)
def import_stmt_all(self, items):
self.log("import_stmt_all", items)
module, terminator = items
self.log_extracted("import_stmt_all", module=module,
terminator=terminator)
return _ast.Import(name=_ast.alias(name=module.value))
def import_stmt_id(self, items):
self.log("import_stmt_id", items)
return items
def import_stmt_as(self, items):
self.log("import_stmt_as", items)
(identifier, module, __) = items
if module.value == identifier:
names = [_ast.alias(name=module.value, asname=None)]
else:
names = [_ast.alias(name=module.value, asname=identifier)]
self.log_extracted("import_stmt_all",
identifier=identifier, module=module)
return _ast.Import(names=names)
def import_stmt_from(self, items):
self.log("import_stmt_from", items)
(import_names, _, __) = items
self.log_extracted("import_stmt_from", import_names=import_names)
# TODO: determine the level properly
return _ast.ImportFrom(module=_.value.replace('/', '.'), names=import_names, level=0)
def import_names(self, items):
return items
def import_name(self, items):
(items,) = items
return _ast.alias(name=items, asname=None)
def import_as_name(self, items):
(import_name, as_name) = items
return _ast.alias(name=import_name, asname=as_name)
def str(self, items):
self.log("str", items)
(items,) = items
items = items[1:-1]
return _ast.Constant(value=items)
def str_multi_line(self, items):
(items,) = items
items = items[1:-1]
return _ast.Constant(value=items)
def num(self, items):
self.log("num", items)
dig_01 = items[0].value
dig_02 = items[1].value if items[1] is not None else 0
value = float(str(f"{dig_01}.{dig_02}"))
return _ast.Constant(value=value)
def bool(self, items):
if items.value == "false":
return _ast.Constant(value=False)
return _ast.Constant(value=True)
def null(self, items):
return _ast.Constant(value=None)
def undefined(self, items):
return _ast.Constant(value=None)
def increment(self, items):
return self.assigned_add([items[0], _ast.Constant(value=1)])
def decrement(self, items):
return self.assigned_sub([items[0], _ast.Constant(value=1)])
def invert(self, items):
return _ast.UnaryOp(
op=_ast.Not(),
operand=items[0]
)
def istanceof(self, items):
return _ast.Call(
func=_ast.Name(id='isinstance', ctx=ast.Load()),
args=[
items[0],
items[1]
],
keywords=[]
)
def typeof(self, items):
return _ast.Call(
func=_ast.Name(id='type', ctx=ast.Load()),
args=[
items[0]
],
keywords=[]
)
def instanceof(self, items):
return _ast.Call(
func=_ast.Name(id='type', ctx=ast.Load()),
args=[
items[0]
],
keywords=[]
)
def delete_stmt(self, items):
return _ast.Delete(
targets=[
items[0]
]
)
def await_stmt(self, items):
return _ast.Await(
value=items[0]
)
def reg_ex(self, items):
return _ast.Call(
func=_ast.Name(id='re.compile', ctx=ast.Load()),
args=[
items[0]
],
keywords=[]
)
def add(self, items):
return self._op(items, _ast.Add())
def sub(self, items):
return self._op(items, _ast.Sub())
def mult(self, items):
return self._op(items, _ast.Mult())
def div(self, items):
return self._op(items, _ast.Div())
def _op(self, items, op):
self.log("assigned_add", items)
return _ast.BinOp(
left=items[0],
op=op,
right=items[1]
)
def _assigned_op(self, items, op):
self.log("assigned_add", items)
return self.reassign([
items[0],
self._op(items, op)
])
def assigned_add(self, items):
return self._assigned_op(items, _ast.Add())
def assigned_sub(self, items):
return self._assigned_op(items, _ast.Sub())
def assigned_mult(self, items):
return self._assigned_op(items, _ast.Mult())
def assigned_div(self, items):
return self._assigned_op(items, _ast.Div())
def boolean_operation(self, items):
self.log("boolean_operation", items)
return _ast.Compare(
left=items[0],
ops=[
items[1]
],
comparators=[
items[2]
]
)
def bool_op_gt(self, items):
return _ast.Gt()
def bool_op_lt(self, items):
return _ast.Lt()
def bool_op_gte(self, items):
return _ast.GtE()
def bool_op_lte(self, items):
return _ast.LtE()
def bool_op_eq(self, items):
return _ast.Eq()
def bool_op_not_eq(self, items):
return _ast.NotEq()
def bool_op_and(self, items):
return _ast.And()
def bool_op_or(self, items):
return _ast.Or()
def bool_op_in(self, items):
return _ast.In()
def boolean_input(self, items):
return items[0]
def list(self, items):
if items[0] is not None:
_items = self._adapt_items(items[0])
ret = _ast.List(
elts=_items,
ctx=ast.Load()
)
self._add_to_tree(ret, _items)
return ret
return _ast.List(
elts=[],
ctx=ast.Load()
)
def list_items(self, items):
self.log("list_items", items)
return items
def list_item(self, items):
self.log("list_item", items)
return items[0]
def list_item_rest(self, items):
self.log("list_item_rest", items)
return _ast.Starred(
value=items[0]
)
def descruct_list(self, items):
ret = []
for idx, target in enumerate(items[0]):
ret.append(
self.declare_var([
# Use the
target,
_ast.Subscript(
value=items[1],
slice=_ast.Constant(value=idx)
)
])
)
return ret
def dict(self, items):
keys = []
values = []
if type(items[0]) == list:
for item in items[0]:
keys += item["keys"]
values += values["values"]
ret = _ast.Dict(
keys=keys,
values=self._adapt_items(values),
ctx=ast.Load()
)
self._add_to_tree(ret, values)
return ret
def dict_items(self, items):
self.log("dict_items", items)
return items[0]
def dict_item_default(self, items):
return {
"keys": [items[0]],
"values": [items[1]]
}
def dict_item_func(self, items):
# TODO: How to Handle.
return {
"keys": [items[0].value],
"values": []
}
def dict_item_rest(self, items):
return {
"keys": [None],
"values": [items[0]]
}
def dict_item_short(self, items):
return {
"keys": [_ast.Constant(value=items[0].value)],
"values": [items[0]]
}
def descruct_dict(self, items):
# TODO: Impement
return _ast.Constant(value=f"DICT_DESCRUTION from dict '{items[1].value}'")
def export(self, items):
return
def declare_var(self, items):
(_, __, id, value) = items
self.log("declare_var", items)
try:
self.logger.debug(
f"declare_var: id={ast.dump(id)}; value={ast.dump(value)}")
except:
pass
_value = self._adapt_items(value)
ret = _ast.Assign(
targets=[id],
value=_value
)
self._add_to_tree(ret, _value)
return ret
def declare_var_not_initialized(self, items):
return _ast.Assign(
targets=[items[0]],
value=_ast.Assign(value=None)
)
def declare_var_descructed(self, items):
# Return only the desturcted stuff
return items[1]
def bracket_accessor(self, items):
return items[0]
def accessor(self, items):
return items[0]
def var_based_access(self, items):
return _ast.Name(id=items[0], ctx=ast.Load())
def simple_access(self, items):
return items[0]
def access_dot(self, items):
if type(items[1]) is _ast.Name:
return _ast.Attribute(
value=items[0],
attr=items[1].id,
ctx=ast.Load()
)
return _ast.Attribute(
value=items[0],
attr=items[1],
ctx=ast.Load()
)
def access_bracket(self, items):
return _ast.Subscript(
value=items[0],
slice=items[1]
)
def rest_accessor(self, items):
return _ast.Call(
func=_ast.Name(id='deepcopy', ctx=ast.Load()),
args=[
items[0],
],
keywords=[]
)
def reassign(self, items):
return self.declare_var([None, None, items[0], items[1]])
def _function(self, items, type=None):
_constructor = _ast.AsyncFunctionDef if type == "async" else _ast.FunctionDef
(export, name, generic_type, func_args, ret_type, body) = items
self.logger.debug("_function", name, func_args, body)
kwargs = {
"name": name.id,
"body": self._adapt_body(body),
"args": func_args,
"decorator_list": []
}
ret = _constructor(** kwargs)
# Register the Function.
self._anonymous_func_to_ids[ret] = name
self._ids_to_anonymous_func[name] = ret
# self._anonymous_func_tree[ret] = name
return ret
def function(self, items):
self.log("function", items)
return self._function(items)
def async_function(self, items):
self.log("async_function", items)
return self._function(items, "async")
def _arrow_function(self, items, type=None):
self.log("_arrow_function", items)
print(items)
# TODO: Get name for callback
name = _ast.Name(id=self._get_func_name())
return self._function([None, name, *items], type)
def arrow_function(self, items):
self.log("arrow_function", items)
return self._arrow_function(items)
def arrow_function_one_arg(self, items):
self.log("arrow_function", items)
return self._arrow_function(items)
def async_arrow_function(self, items):
self.log("async_arrow_function", items)
return self._arrow_function(items, "async")
def func_args(self, items):
ret = {
"args": [],
"defaults": [],
"vararg": []
}
for item in items:
ret["args"] += item["args"]
ret["defaults"] += item["defaults"]
ret["vararg"] += item["vararg"]
if len(ret["vararg"]) == 0:
ret.pop("vararg")
return _ast.arguments(**ret)
def func_arg(self, items):
return items
def default_func_arg(self, items):
self.log("default_func_arg", items)
# TODO: Implement
return {
"args": [_ast.arg(arg=items[0].id)],
"defaults": [],
"vararg": []
}
def rest_func_arg(self, items):
self.log("rest_func_arg", items)
return {
"vararg": [_ast.arg(arg=items[0].id)],
"args": [],
"defaults": []
}
def assigend_func_arg(self, items):
self.log("assigend_func_arg", items)
return {
"args": [_ast.arg(arg=items[0].id)],
"defaults": [items[1]],
"vararg": []
}
def implicit_or_typed(self, items):
return items[0]
def func_body(self, items):
ret = []
elements = items[0]
self.log("func_body", elements)
for element in elements:
if type(element) is list:
ret += element
else:
ret.append(element)
return ret
def func_statements(self, items):
return items
def func_statement(self, items):
return items
def function_call(self, items):
self.log("function_call", items)
id = items[0]
args = items[-1] if items[-1] is not None else []
# Adapt the ID:
if type(id) is _ast.Name:
id = id
ret = _ast.Call(
func=id,
args=self._adapt_items(args),
keywords=[]
)
return _ast.Call(
func=id,
args=self._adapt_items(args),
keywords=[]
)
def call_args(self, items):
self.log("call_args", items)
return items
def call_arg(self, items):
self.log("call_arg", items)
return items[0]
def rest_call_arg(self, items):
return _ast.Starred(
value=items[0]
)
def default_for(self, items):
pass
def multi_for(self, items):
pass
def ranged_for(self, items):
pass
def for_iter_type(self, items):
pass
def for_iter_var(self, items):
pass
def while_statement(self, items):
self.log("while_statement", items)
self.logger.debug(items[0])
self.logger.debug(items[1])
return _ast.While(
test=items[0],
body=self._adapt_body(
items[1] if type(items) == list else [items[1]]
),
orelse=[]
)
def iter_body(self, items):
self.log("iter_body", items)
if items[0] is None:
return []
return items[0]
def iter_statements(self, items):
return items
def iter_statement(self, items):
return items[0]
def continue_statement(self, items):
return _ast.Continue()
def break_statement(self, items):
return _ast.Break()
def if_statement(self, items):
test = items[0]
body = items[1]
elifs = items[2]
else_body = items[3] if items[3] is not None else []
inclused_elif = elifs is not None
if inclused_elif:
def _rec(idx):
(_test, _body) = elifs[idx]
if idx < len(elifs) - 1:
return _ast.If(
test=_test,
body=self._adapt_body(_body),
orelse=[
_rec(idx+1)
]
)
# Now we know, that we are working with
# the last item
return _ast.If(
test=_test,
body=self._adapt_body(_body),
orelse=else_body
)
return _ast.If(
test=test,
body=self._adapt_body(body),
orelse=[
_rec(0)
]
)
return _ast.If(
test=test,
body=self._adapt_body(body),
orelse=else_body
)
def else_if_statements(self, items):
return items
def else_if_statement(self, items):
return items
def else_statement(self, items):
return items[0]
def if_body(self, items):
return items
def if_body_single(self, items):
return items
def inline_if(self, items):
_items = self._adapt_body(items)
ret = _ast.IfExp(
test=_items[0],
body=_items[1],
orelse=_items[2]
)
self._add_to_tree(ret, _items)
return ret
def new_class(self, items):
identifier = items[0]
args = items[1] if items[1] is not None else []
return _ast.Call(func=_ast.Name(id=identifier), args=args, keywords=[])
def switch(self, items):
raise Exception("!!SWITCH_CASE!!")
return _ast.Constant(value="SWITCH")
def switch_body(self, items):
return _ast.Break()
def switch_case(self, items):
return _ast.Break()
def switch_default(self, items):
return _ast.Break()
def switch_case_statements(self, items):
return _ast.Break()
def switch_case_statement(self, items):
return _ast.Break()
def try_catch(self, items):
return _ast.Break()
def try_catch_body(self, items):
return
def throw_statement(self, items):
return _ast.Raise(
exc=items[0]
)
def throw_error_statement(self, items):
return _ast.Raise(
exc=items[0]
)
class DebuggedCodeTransformeTs(Transformer):
def __init__(self, visit_tokens: bool = True) -> None:
super().__init__(visit_tokens)
self.logger = get_logger("DebugWrapper")
def __getattribute__(self, __name: str):
logger = object.__getattribute__(self, "logger")
try:
func = object.__getattribute__(self, __name)
if callable(func):
def cb(items):
logger.info(f"Calling function '{__name}'")
try:
logger.info(f"received parameters => {len(items)}")
except:
pass
return func(items)
return cb
return func
except:
pass
logger.warn(f"'{__name}' has not been found!")
raise KeyError(f"'{__name}' has not been found!")
def transform(tree, debug):
transformer = CodeTransformeTs(
True, logging.DEBUG if debug else logging.INFO)
program = transformer.transform(tree)
if debug:
print(ast.dump(program, indent=2))
code = astor.to_source(program)
if debug:
print(code)
return code