8000 Single value representing either success or failure + dosync short-ci… · alaisi/postgres.async@a204d42 · GitHub
[go: up one dir, main page]

Skip to content

Commit a204d42

Browse files
committed
Single value representing either success or failure + dosync short-circuiting test
1 parent 28b4dfe commit a204d42

File tree

3 files changed

+30
-26
lines changed

3 files changed

+30
-26
lines changed

src/postgres/async.clj

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
(ns postgres.async
2-
(:require [postgres.async.impl :refer [consumer-fn defasync] :as pg])
2+
(:require [postgres.async.impl :refer [consumer-fn defasync] :as pg]
3+
[clojure.core.async :refer [<!]])
34
(:import [com.github.pgasync Db ConnectionPoolBuilder
45
QueryExecutor TransactionExecutor Transaction]
56
[com.github.pgasync.impl.conversion DataConverter]))
@@ -106,11 +107,12 @@
106107
(defasync <rollback! [tx])
107108

108109
(defmacro dosql
109-
"Takes values from channels returned by db functions and returns [nil exception]
110-
on first error. Returns [result-of-body nil] on success."
110+
"Takes values from channels returned by db functions and returns exception
111+
on first error. Returns the result of evaluating the given forms on success"
111112
[bindings & forms]
112-
(let [err (gensym "e")]
113-
`(let [~@(pg/async-sql-bindings bindings err)]
114-
(if ~err
115-
[nil ~err]
116-
[(do ~@forms) nil]))))
113+
(if-let [[l r & bindings] (not-empty bindings)]
114+
`(let [~l (<! ~r)]
115+
(if (instance? Throwable ~l)
116+
~l
117+
(dosql ~bindings ~@forms)))
118+
`(do ~@forms)))

src/postgres/async/impl.clj

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
(ns postgres.async.impl
22
(:require [clojure.string :as string]
3-
[clojure.core.async :refer [chan put! go <!]])
3+
[clojure.core.async :refer [chan put! go]])
44
(:import [java.util.function Consumer]
55
[com.github.pgasync ResultSet]
66
[com.github.pgasync.impl PgRow]))
77

88
(defmacro defasync [name args]
99
`(defn ~name [~@args]
1010
(let [c# (chan 1)]
11-
(~(symbol (subs (str name) 1)) ~@args #(put! c# [%1 %2]))
11+
(~(symbol (subs (str name) 1)) ~@args #(put! c# (or %2 %1)))
1212
c#)))
1313

1414
(defmacro consumer-fn [[param] body]
@@ -60,14 +60,3 @@
6060
" WHERE " (first where)
6161
(when returning
6262
(str " RETURNING " returning))))
63-
64-
(defn async-sql-bindings
65-
"Converts bindings x (f) to [x err] (if [err] [nil err] (<! (f)))"
66-
[bindings err]
67-
(let [vars (map (fn [v]
68-
[v err])
69-
(take-nth 2 bindings))
70-
fs (map (fn [f]
71-
`(if ~err [nil ~err] (<! ~f)))
72-
(take-nth 2 (rest bindings)))]
73-
(list* [err err] [nil nil] (interleave vars fs))))

test/postgres/async_test.clj

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
(def table "clj_pg_test")
88

99
(defn- wait [channel]
10-
(let [[r err] (<!! channel)]
11-
(if err
12-
(throw err)
13-
r)))
10+
(let [r (<!! channel)]
11+
(if (instance? Throwable r)
12+
(throw r))
13+
r))
1414

1515
(defn- create-tables [db]
1616
(wait (<execute! db [(str "drop table if exists " table)]))
@@ -71,4 +71,17 @@
7171
rs (<query! tx ["select 123 as x"])
7272
rs (<query! tx ["select $1::text as t" (:x (first rs))])
7373
_ (<commit! tx)]
74-
(:t (first rs)))))))))
74+
(:t (first rs))))))))
75+
(testing "dosql short-circuits on errors"
76+
(let [e (Exception. "Oops!")
77+
executed (atom 0)]
78+
(is (= (try
79+
(wait (go (dosql
80+
[_ (<query! *db* ["select 123 as t"])
81+
_ (go e)
82+
_ (swap! executed inc)]
83+
"Just hanging out")))
84+
(catch Exception caught
85+
{:caught caught}))
86+
{:caught e}))
87+
(is (= @executed 0)))))

0 commit comments

Comments
 (0)
0