Skip to content

Commit 4c52bc9

Browse files
authored
SDK - Patch parallel uploads (#1468)
1 parent e79f1ca commit 4c52bc9

File tree

2 files changed

+21
-20
lines changed

2 files changed

+21
-20
lines changed

pgml-sdks/pgml/src/collection.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -533,23 +533,21 @@ impl Collection {
533533

534534
let mut set = JoinSet::new();
535535
for batch in documents.chunks(batch_size as usize) {
536-
if set.len() < parallel_batches {
537-
let local_self = self.clone();
538-
let local_batch = batch.to_owned();
539-
let local_args = args.clone();
540-
let local_pipelines = pipelines.clone();
541-
let local_pool = pool.clone();
542-
set.spawn(async move {
543-
local_self
544-
._upsert_documents(local_batch, local_args, local_pipelines, local_pool)
545-
.await
546-
});
547-
} else {
548-
if let Some(res) = set.join_next().await {
549-
res??;
550-
progress_bar.inc(batch_size);
551-
}
536+
if set.len() >= parallel_batches {
537+
set.join_next().await.unwrap()??;
538+
progress_bar.inc(batch_size);
552539
}
540+
541+
let local_self = self.clone();
542+
let local_batch = batch.to_owned();
543+
let local_args = args.clone();
544+
let local_pipelines = pipelines.clone();
545+
let local_pool = pool.clone();
546+
set.spawn(async move {
547+
local_self
548+
._upsert_documents(local_batch, local_args, local_pipelines, local_pool)
549+
.await
550+
});
553551
}
554552

555553
while let Some(res) = set.join_next().await {

pgml-sdks/pgml/src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,10 @@ mod tests {
442442
json!({
443443
"title": {
444444
"semantic_search": {
445-
"model": "intfloat/e5-small"
445+
"model": "intfloat/e5-small-v2",
446+
"parameters": {
447+
"prompt": "passage: "
448+
}
446449
}
447450
},
448451
"body": {
@@ -454,9 +457,9 @@ mod tests {
454457
}
455458
},
456459
"semantic_search": {
457-
"model": "hkunlp/instructor-base",
460+
"model": "intfloat/e5-small-v2",
458461
"parameters": {
459-
"instruction": "Represent the Wikipedia document for retrieval"
462+
"prompt": "passage: "
460463
}
461464
},
462465
"full_text_search": {
@@ -475,7 +478,7 @@ mod tests {
475478
documents.clone(),
476479
Some(
477480
json!({
478-
"batch_size": 4,
481+
"batch_size": 2,
479482
"parallel_batches": 5
480483
})
481484
.into(),

0 commit comments

Comments
 (0)