File tree Expand file tree Collapse file tree 3 files changed +30
-26
lines changed Expand file tree Collapse file tree 3 files changed +30
-26
lines changed Original file line number Diff line number Diff line change 1
1
(ns postgres.async
2
- (:require [postgres.async.impl :refer [consumer-fn defasync ] :as pg])
2
+ (:require [postgres.async.impl :refer [consumer-fn defasync ] :as pg]
3
+ [clojure.core.async :refer [<!]])
3
4
(:import [com.github.pgasync Db ConnectionPoolBuilder
4
5
QueryExecutor TransactionExecutor Transaction]
5
6
[com.github.pgasync.impl.conversion DataConverter]))
106
107
(defasync <rollback! [tx])
107
108
108
109
(defmacro dosql
109
- " Takes values from channels returned by db functions and returns [nil exception]
110
- on first error. Returns [ result-of-body nil] on success. "
110
+ " Takes values from channels returned by db functions and returns exception
111
+ on first error. Returns the result of evaluating the given forms on success"
111
112
[bindings & forms]
112
- (let [err (gensym " e" )]
113
- `(let [~@(pg/async-sql-bindings bindings err)]
114
- (if ~err
115
- [nil ~err]
116
- [(do ~@forms) nil ]))))
113
+ (if-let [[l r & bindings] (not-empty bindings)]
114
+ `(let [~l (<! ~r)]
115
+ (if (instance? Throwable ~l)
116
+ ~l
117
+ (dosql ~bindings ~@forms)))
118
+ `(do ~@forms)))
Original file line number Diff line number Diff line change 1
1
(ns postgres.async.impl
2
2
(:require [clojure.string :as string]
3
- [clojure.core.async :refer [chan put! go <! ]])
3
+ [clojure.core.async :refer [chan put! go]])
4
4
(:import [java.util.function Consumer]
5
5
[com.github.pgasync ResultSet]
6
6
[com.github.pgasync.impl PgRow]))
7
7
8
8
(defmacro defasync [name args]
9
9
`(defn ~name [~@args]
10
10
(let [c# (chan 1 )]
11
- (~(symbol (subs (str name) 1 )) ~@args #(put! c# [% 1 %2 ] ))
11
+ (~(symbol (subs (str name) 1 )) ~@args #(put! c# ( or %2 % 1 ) ))
12
12
c#)))
13
13
14
14
(defmacro consumer-fn [[param] body]
60
60
" WHERE " (first where)
61
61
(when returning
62
62
(str " RETURNING " returning))))
63
-
64
- (defn async-sql-bindings
65
- " Converts bindings x (f) to [x err] (if [err] [nil err] (<! (f)))"
66
- [bindings err]
67
- (let [vars (map (fn [v]
68
- [v err])
69
- (take-nth 2 bindings))
70
- fs (map (fn [f]
71
- `(if ~err [nil ~err] (<! ~f)))
72
- (take-nth 2 (rest bindings)))]
73
- (list* [err err] [nil nil ] (interleave vars fs))))
Original file line number Diff line number Diff line change 7
7
(def table " clj_pg_test" )
8
8
9
9
(defn- wait [channel]
10
- (let [[r err] (<!! channel)]
11
- (if err
12
- (throw err )
13
- r) ))
10
+ (let [r (<!! channel)]
11
+ (if ( instance? Throwable r)
12
+ (throw r) )
13
+ r ))
14
14
15
15
(defn- create-tables [db]
16
16
(wait (<execute! db [(str " drop table if exists " table)]))
71
71
rs (<query! tx [" select 123 as x" ])
72
72
rs (<query! tx [" select $1::text as t" (:x (first rs))])
73
73
_ (<commit! tx)]
74
- (:t (first rs)))))))))
74
+ (:t (first rs))))))))
75
+ (testing " dosql short-circuits on errors"
76
+ (let [e (Exception. " Oops!" )
77
+ executed (atom 0 )]
78
+ (is (= (try
79
+ (wait (go (dosql
80
+ [_ (<query! *db* [" select 123 as t" ])
81
+ _ (go e)
82
+ _ (swap! executed inc)]
83
+ " Just hanging out" )))
84
+ (catch Exception caught
85
+ {:caught caught}))
86
+ {:caught e}))
87
+ (is (= @executed 0 )))))
You can’t perform that action at this time.
0 commit comments