Skip to content

lint #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 98 additions & 53 deletions pgml/pgml/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@
from pgml.exceptions import PgMLException
from pgml.sql import q


class Project(object):
"""
Use projects to refine multiple models of a particular dataset on a specific objective.

Attributes:
id (int): a unique identifier
name (str): a human friendly unique identifier
objective (str): the purpose of this project
created_at (Timestamp): when this project was created
updated_at (Timestamp): when this project was last updated
"""

_cache = {}

def __init__(self):
Expand All @@ -36,11 +37,14 @@ def find(cls, id: int):
Returns:
Project or None: instantiated from the database if found
"""
result = plpy.execute(f"""
result = plpy.execute(
f"""
SELECT *
FROM pgml.projects
WHERE id = {q(id)}
""", 1)
""",
1,
)
if len(result) == 0:
return None

Expand All @@ -53,25 +57,28 @@ def find(cls, id: int):
@classmethod
def find_by_name(cls, name: str):
"""
Get a Project from the database by name.
Get a Project from the database by name.

        This is the preferred API to retrieve projects, and they are cached by
        name to avoid needing to go to the database on every usage.

Args:
name (str): the project name
Returns:
Project or None: instantiated from the database if found
"""
if name in cls._cache:
return cls._cache[name]

result = plpy.execute(f"""

result = plpy.execute(
f"""
SELECT *
FROM pgml.projects
WHERE name = {q(name)}
""", 1)
if len(result)== 0:
""",
1,
)
if len(result) == 0:
return None

project = Project()
Expand All @@ -84,7 +91,7 @@ def find_by_name(cls, name: str):
def create(cls, name: str, objective: str):
"""
Create a Project and save it to the database.

Args:
name (str): a human friendly identifier
objective (str): valid values are ["regression", "classification"].
Expand All @@ -93,11 +100,16 @@ def create(cls, name: str, objective: str):
"""

project = Project()
project.__dict__ = dict(plpy.execute(f"""
project.__dict__ = dict(
plpy.execute(
f"""
INSERT INTO pgml.projects (name, objective)
VALUES ({q(name)}, {q(objective)})
RETURNING *
""", 1)[0])
""",
1,
)[0]
)
project.__init__()
cls._cache[name] = project
return project
Expand All @@ -112,10 +124,11 @@ def deployed_model(self):
self._deployed_model = Model.find_deployed(self.id)
return self._deployed_model


class Snapshot(object):
"""
Snapshots capture a set of training & test data for repeatability.

Attributes:
id (int): a unique identifier
relation_name (str): the name of the table or view to snapshot
Expand All @@ -126,11 +139,18 @@ class Snapshot(object):
created_at (Timestamp): when this snapshot was created
updated_at (Timestamp): when this snapshot was last updated
"""

@classmethod
def create(cls, relation_name: str, y_column_name: str, test_size: float or int, test_sampling: str):
def create(
cls,
relation_name: str,
y_column_name: str,
test_size: float or int,
test_sampling: str,
):
"""
Create a Snapshot and save it to the database.
Create a Snapshot and save it to the database.

This creates both a metadata record in the snapshots table, as well as creating a new table
that holds a snapshot of all the data currently present in the relation so that training
runs may be repeated, or further analysis may be conducted against the input.
Expand All @@ -145,32 +165,46 @@ def create(cls, relation_name: str, y_column_name: str, test_size: float or int,
"""

snapshot = Snapshot()
snapshot.__dict__ = dict(plpy.execute(f"""
snapshot.__dict__ = dict(
plpy.execute(
f"""
INSERT INTO pgml.snapshots (relation_name, y_column_name, test_size, test_sampling, status)
VALUES ({q(relation_name)}, {q(y_column_name)}, {q(test_size)}, {q(test_sampling)}, 'new')
RETURNING *
""", 1)[0])
plpy.execute(f"""
""",
1,
)[0]
)
plpy.execute(
f"""
CREATE TABLE pgml."snapshot_{snapshot.id}" AS
SELECT * FROM "{snapshot.relation_name}";
""")
snapshot.__dict__ = dict(plpy.execute(f"""
"""
)
snapshot.__dict__ = dict(
plpy.execute(
f"""
UPDATE pgml.snapshots
SET status = 'created'
WHERE id = {q(snapshot.id)}
RETURNING *
""", 1)[0])
""",
1,
)[0]
)
return snapshot

def data(self):
"""
Returns:
list, list, list, list: All rows from the snapshot split into X_train, X_test, y_train, y_test sets.
"""
data = plpy.execute(f"""
data = plpy.execute(
f"""
SELECT *
FROM pgml."snapshot_{self.id}"
""")
"""
)

print(data)
# Sanity check the data
Expand Down Expand Up @@ -203,10 +237,10 @@ def data(self):
y.append(y_)

# Split into training and test sets
if self.test_sampling == 'random':
if self.test_sampling == "random":
return train_test_split(X, y, test_size=self.test_size, random_state=0)
else:
if self.test_sampling == 'first':
if self.test_sampling == "first":
X.reverse()
y.reverse()
if isinstance(split, float):
Expand All @@ -216,9 +250,9 @@ def data(self):
split = int(self.test_size * X.len())
return X[:split], X[split:], y[:split], y[split:]


# TODO normalize and clean data


class Model(object):
"""Models use an algorithm on a snapshot of data to record the parameters learned.

Expand All @@ -234,23 +268,26 @@ class Model(object):
pickle (bytes): the serialized version of the model parameters
algorithm: the in memory version of the model parameters that can make predictions
"""

@classmethod
def create(cls, project: Project, snapshot: Snapshot, algorithm_name: str):
"""
Create a Model and save it to the database.

Args:
project (str):
snapshot (str):
project (str):
snapshot (str):
algorithm_name (str):
Returns:
Model: instantiated from the database
"""
result = plpy.execute(f"""
result = plpy.execute(
f"""
INSERT INTO pgml.models (project_id, snapshot_id, algorithm_name, status)
VALUES ({q(project.id)}, {q(snapshot.id)}, {q(algorithm_name)}, 'new')
RETURNING *
""")
"""
)
model = Model()
model.__dict__ = dict(result[0])
model.__init__()
Expand All @@ -265,15 +302,17 @@ def find_deployed(cls, project_id: int):
Returns:
Model: that should currently be used for predictions of the project
"""
result = plpy.execute(f"""
result = plpy.execute(
f"""
SELECT models.*
FROM pgml.models
JOIN pgml.deployments
ON deployments.model_id = models.id
AND deployments.project_id = {q(project_id)}
ORDER by deployments.created_at DESC
LIMIT 1
""")
"""
)
if len(result) == 0:
return None

Expand Down Expand Up @@ -303,19 +342,19 @@ def algorithm(self):
self._algorithm = pickle.loads(self.pickle)
else:
self._algorithm = {
'linear_regression': LinearRegression,
'random_forest_regression': RandomForestRegressor,
'random_forest_classification': RandomForestClassifier
}[self.algorithm_name + '_' + self.project.objective]()
"linear_regression": LinearRegression,
"random_forest_regression": RandomForestRegressor,
"random_forest_classification": RandomForestClassifier,
}[self.algorithm_name + "_" + self.project.objective]()

return self._algorithm

def fit(self, snapshot: Snapshot):
"""
Learns the parameters of this model and records them in the database.
Learns the parameters of this model and records them in the database.

Args:
snapshot (Snapshot): dataset used to train this model
Args:
snapshot (Snapshot): dataset used to train this model
"""
X_train, X_test, y_train, y_test = snapshot.data()

Expand All @@ -328,22 +367,28 @@ def fit(self, snapshot: Snapshot):
r2 = r2_score(y_test, y_pred)

# Save the model
self.__dict__ = dict(plpy.execute(f"""
self.__dict__ = dict(
plpy.execute(
f"""
UPDATE pgml.models
SET pickle = '\\x{pickle.dumps(self.algorithm).hex()}',
status = 'successful',
mean_squared_error = {q(msq)},
r2_score = {q(r2)}
WHERE id = {q(self.id)}
RETURNING *
""")[0])
"""
)[0]
)

def deploy(self):
"""Promote this model to the active version for the project that will be used for predictions"""
plpy.execute(f"""
plpy.execute(
f"""
INSERT INTO pgml.deployments (project_id, model_id)
VALUES ({q(self.project_id)}, {q(self.id)})
""")
"""
)

def predict(self, data: list):
"""Use the model for a set of features.
Expand All @@ -358,12 +403,12 @@ def predict(self, data: list):


def train(
project_name: str,
project_name: str,
objective: str,
relation_name: str,
y_column_name: str,
relation_name: str,
y_column_name: str,
test_size: float or int = 0.1,
test_sampling: str = "random"
test_sampling: str = "random",
):
"""Create a regression model from a table or view filled with training data.

Expand All @@ -390,5 +435,5 @@ def train(
model.fit(snapshot)
if best_error is None or model.mean_squared_error < best_error:
best_error = model.mean_squared_error
best_model = model
best_model = model
best_model.deploy()
1 change: 1 addition & 0 deletions pgml/pgml/sql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from plpy import quote_literal


def q(obj):
if type(obj) == str:
return quote_literal(obj)
Expand Down
7 changes: 5 additions & 2 deletions pgml/tests/plpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

execute_results = deque()


def quote_literal(literal):
    """Return *literal* quoted as a SQL string literal.

    Mirrors PostgreSQL's quote_literal(): embedded single quotes are
    doubled so the produced literal is safe to splice into SQL text.
    The previous implementation did not escape quotes, so mock SQL built
    from values containing ' diverged from what plpy would produce.

    Args:
        literal (str): the raw string value
    Returns:
        str: the value wrapped in single quotes, with ' doubled
    """
    return "'" + literal.replace("'", "''") + "'"

def execute(sql, lines=0):
    """Mock of plpy.execute: return the next queued result, if any.

    Args:
        sql (str): the SQL text (ignored by the mock)
        lines (int): row limit (ignored by the mock)
    Returns:
        the next result queued via add_mock_result(), or [] when the
        queue is empty
    """
    if execute_results:
        return execute_results.popleft()
    return []


def add_mock_result(result):
    """Queue *result* so the next call to execute() returns it.

    Results are consumed in FIFO order, one per execute() call.
    """
    execute_results.append(result)
Loading