Skip to content

Commit 08d643e

Browse files
Dan rag page components (#1543)
1 parent 47cfb6e commit 08d643e

File tree

41 files changed

+1761
-46
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1761
-46
lines changed

pgml-dashboard/Cargo.lock

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pgml-dashboard/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ sentry = "0.31"
4343
sentry-log = "0.31"
4444
sentry-anyhow = "0.31"
4545
serde_json = "1"
46+
sqlparser = "0.38"
4647
sqlx = { version = "0.7.3", features = [ "runtime-tokio-rustls", "postgres", "json", "migrate", "time", "uuid", "bigdecimal"] }
4748
tantivy = "0.19"
4849
time = "0.3"

pgml-dashboard/src/api/code_editor.rs

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
use crate::components::code_editor::Editor;
2+
use crate::components::turbo::TurboFrame;
3+
use anyhow::Context;
4+
use once_cell::sync::OnceCell;
5+
use sailfish::TemplateOnce;
6+
use serde::Serialize;
7+
use sqlparser::dialect::PostgreSqlDialect;
8+
use sqlx::{postgres::PgPoolOptions, Executor, PgPool, Row};
9+
10+
use crate::responses::ResponseOk;
11+
12+
use rocket::route::Route;
13+
14+
static READONLY_POOL: OnceCell<PgPool> = OnceCell::new();
15+
static ERROR: &str =
16+
"Thanks for trying PostgresML! If you would like to run more queries, sign up for an account and create a database.";
17+
18+
fn get_readonly_pool() -> PgPool {
19+
READONLY_POOL
20+
.get_or_init(|| {
21+
PgPoolOptions::new()
22+
.max_connections(1)
23+
.idle_timeout(std::time::Duration::from_millis(60_000))
24+
.max_lifetime(std::time::Duration::from_millis(60_000))
25+
.connect_lazy(&std::env::var("CHATBOT_DATABASE_URL").expect("CHATBOT_DATABASE_URL not set"))
26+
.expect("could not build lazy database connection")
27+
})
28+
.clone()
29+
}
30+
31+
fn check_query(query: &str) -> anyhow::Result<()> {
32+
let ast = sqlparser::parser::Parser::parse_sql(&PostgreSqlDialect {}, query)?;
33+
34+
if ast.len() != 1 {
35+
anyhow::bail!(ERROR);
36+
}
37+
38+
let query = ast
39+
.into_iter()
40+
.next()
41+
.with_context(|| "impossible, ast is empty, even though we checked")?;
42+
43+
match query {
44+
sqlparser::ast::Statement::Query(query) => match *query.body {
45+
sqlparser::ast::SetExpr::Select(_) => (),
46+
_ => anyhow::bail!(ERROR),
47+
},
48+
_ => anyhow::bail!(ERROR),
49+
};
50+
51+
Ok(())
52+
}
53+
54+
#[derive(FromForm, Debug)]
55+
pub struct PlayForm {
56+
pub query: String,
57+
}
58+
59+
pub async fn play(sql: &str) -> anyhow::Result<String> {
60+
check_query(sql)?;
61+
let pool = get_readonly_pool();
62+
let row = sqlx::query(sql).fetch_one(&pool).await?;
63+
let transform: serde_json::Value = row.try_get(0)?;
64+
Ok(serde_json::to_string_pretty(&transform)?)
65+
}
66+
67+
/// Response expected by the frontend.
68+
#[derive(Serialize)]
69+
struct StreamResponse {
70+
error: Option<String>,
71+
result: Option<String>,
72+
}
73+
74+
impl StreamResponse {
75+
fn from_error(error: &str) -> Self {
76+
StreamResponse {
77+
error: Some(error.to_string()),
78+
result: None,
79+
}
80+
}
81+
82+
fn from_result(result: &str) -> Self {
83+
StreamResponse {
84+
error: None,
85+
result: Some(result.to_string()),
86+
}
87+
}
88+
}
89+
90+
impl ToString for StreamResponse {
91+
fn to_string(&self) -> String {
92+
serde_json::to_string(self).unwrap()
93+
}
94+
}
95+
96+
/// An async iterator over a PostgreSQL cursor.
97+
#[derive(Debug)]
98+
struct AsyncResult<'a> {
99+
/// Open transaction.
100+
transaction: sqlx::Transaction<'a, sqlx::Postgres>,
101+
cursor_name: String,
102+
}
103+
104+
impl<'a> AsyncResult<'a> {
105+
async fn from_message(message: ws::Message) -> anyhow::Result<Self> {
106+
if let ws::Message::Text(query) = message {
107+
let request = serde_json::from_str::<serde_json::Value>(&query)?;
108+
let query = request["sql"]
109+
.as_str()
110+
.context("Error sql key is required in websocket")?;
111+
Self::new(&query).await
112+
} else {
113+
anyhow::bail!(ERROR)
114+
}
115+
}
116+
117+
/// Create new AsyncResult given a query.
118+
async fn new(query: &str) -> anyhow::Result<Self> {
119+
let cursor_name = format!(r#""{}""#, crate::utils::random_string(12));
120+
121+
// Make sure it's a SELECT. Can't do too much damage there.
122+
check_query(query)?;
123+
124+
let pool = get_readonly_pool();
125+
let mut transaction = pool.begin().await?;
126+
127+
let query = format!("DECLARE {} CURSOR FOR {}", cursor_name, query);
128+
129+
info!(
130+
"[stream] query: {}",
131+
query.trim().split("\n").collect::<Vec<&str>>().join(" ")
132+
);
133+
134+
match transaction.execute(query.as_str()).await {
135+
Ok(_) => (),
136+
Err(err) => {
137+
info!("[stream] query error: {:?}", err);
138+
anyhow::bail!(err);
139+
}
140+
}
141+
142+
Ok(AsyncResult {
143+
transaction,
144+
cursor_name,
145+
})
146+
}
147+
148+
/// Fetch a row from the cursor, get the first column,
149+
/// decode the value and return it as a String.
150+
async fn next(&mut self) -> anyhow::Result<Option<String>> {
151+
use serde_json::Value;
152+
153+
let result = sqlx::query(format!("FETCH 1 FROM {}", self.cursor_name).as_str())
154+
.fetch_optional(&mut *self.transaction)
155+
.await?;
156+
157+
if let Some(row) = result {
158+
let _column = row.columns().get(0).with_context(|| "no columns")?;
159+
160+
// Handle pgml.embed() which returns an array of floating points.
161+
if let Ok(value) = row.try_get::<Vec<f32>, _>(0) {
162+
return Ok(Some(serde_json::to_string(&value)?));
163+
}
164+
165+
// Anything that just returns a String, e.g. pgml.version().
166+
if let Ok(value) = row.try_get::<String, _>(0) {
167+
return Ok(Some(value));
168+
}
169+
170+
// Array of strings.
171+
if let Ok(value) = row.try_get::<Vec<String>, _>(0) {
172+
return Ok(Some(value.join("")));
173+
}
174+
175+
// Integers.
176+
if let Ok(value) = row.try_get::<i64, _>(0) {
177+
return Ok(Some(value.to_string()));
178+
}
179+
180+
if let Ok(value) = row.try_get::<i32, _>(0) {
181+
return Ok(Some(value.to_string()));
182+
}
183+
184+
if let Ok(value) = row.try_get::<f64, _>(0) {
185+
return Ok(Some(value.to_string()));
186+
}
187+
188+
if let Ok(value) = row.try_get::<f32, _>(0) {
189+
return Ok(Some(value.to_string()));
190+
}
191+
192+
// Handle functions that return JSONB,
193+
// e.g. pgml.transform()
194+
if let Ok(value) = row.try_get::<Value, _>(0) {
195+
return Ok(Some(match value {
196+
Value::Array(ref values) => {
197+
let first_value = values.first();
198+
match first_value {
199+
Some(Value::Object(_)) => serde_json::to_string(&value)?,
200+
_ => values
201+
.into_iter()
202+
.map(|v| v.as_str().unwrap_or("").to_string())
203+
.collect::<Vec<String>>()
204+
.join(""),
205+
}
206+
}
207+
208+
value => serde_json::to_string(&value)?,
209+
}));
210+
}
211+
}
212+
213+
Ok(None)
214+
}
215+
216+
async fn close(mut self) -> anyhow::Result<()> {
217+
self.transaction
218+
.execute(format!("CLOSE {}", self.cursor_name).as_str())
219+
.await?;
220+
self.transaction.rollback().await?;
221+
Ok(())
222+
}
223+
}
224+
225+
#[get("/code_editor/play/stream")]
226+
pub async fn play_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
227+
ws::Stream! { ws =>
228+
for await message in ws {
229+
let message = match message {
230+
Ok(message) => message,
231+
Err(_err) => continue,
232+
};
233+
234+
let mut got_something = false;
235+
match AsyncResult::from_message(message).await {
236+
Ok(mut result) => {
237+
loop {
238+
match result.next().await {
239+
Ok(Some(result)) => {
240+
got_something = true;
241+
yield ws::Message::from(StreamResponse::from_result(&result).to_string());
242+
}
243+
244+
Err(err) => {
245+
yield ws::Message::from(StreamResponse::from_error(&err.to_string()).to_string());
246+
break;
247+
}
248+
249+
Ok(None) => {
250+
if !got_something {
251+
yield ws::Message::from(StreamResponse::from_error(ERROR).to_string());
252+
}
253+
break;
254+
}
255+
}
256+
};
257+
258+
match result.close().await {
259+
Ok(_) => (),
260+
Err(err) => {
261+
info!("[stream] error closing: {:?}", err);
262+
}
263+
};
264+
}
265+
266+
Err(err) => {
267+
yield ws::Message::from(StreamResponse::from_error(&err.to_string()).to_string());
268+
}
269+
}
270+
};
271+
}
272+
}
273+
274+
#[get("/code_editor/embed?<id>")]
275+
pub fn embed_editor(id: String) -> ResponseOk {
276+
let comp = Editor::new();
277+
278+
let rsp = TurboFrame::new().set_target_id(&id).set_content(comp.into());
279+
280+
return ResponseOk(rsp.render_once().unwrap());
281+
}
282+
283+
pub fn routes() -> Vec<Route> {
284+
routes![play_stream, embed_editor,]
285+
}

pgml-dashboard/src/api/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ use rocket::route::Route;
22

33
pub mod chatbot;
44
pub mod cms;
5+
pub mod code_editor;
56
pub mod deployment;
67

78
pub fn routes() -> Vec<Route> {
89
let mut routes = Vec::new();
910
routes.extend(cms::routes());
1011
routes.extend(chatbot::routes());
12+
routes.extend(code_editor::routes());
1113
routes
1214
}

pgml-dashboard/src/components/cards/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ pub use newsletter_subscribe::NewsletterSubscribe;
1515
pub mod primary;
1616
pub use primary::Primary;
1717

18+
// src/components/cards/psychedelic
19+
pub mod psychedelic;
20+
pub use psychedelic::Psychedelic;
21+
1822
// src/components/cards/rgb
1923
pub mod rgb;
2024
pub use rgb::Rgb;

0 commit comments

Comments
 (0)