Skip to content

Commit e947df4

Browse files
authored
feat: Configurable import strict mode and max import message count (#56)
* Configurable import strict mode * Configurable import max message count
1 parent 00f2e90 commit e947df4

File tree

3 files changed

+446
-30
lines changed

3 files changed

+446
-30
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package com.mixpanel.mixpanelapi;
2+
3+
/**
4+
* Options for configuring how messages are delivered to Mixpanel.
5+
* Use the {@link Builder} to create instances.
6+
*
7+
* <p>Different options apply to different message types:
8+
* <ul>
9+
* <li>{@code importStrictMode} - Only applies to import messages</li>
10+
* <li>{@code useIpAddress} - Only applies to events, people, and groups messages (NOT imports)</li>
11+
* </ul>
12+
*
13+
* <p>Example usage:
14+
* <pre>{@code
15+
* DeliveryOptions options = new DeliveryOptions.Builder()
16+
* .importStrictMode(false) // Disable strict validation for imports
17+
* .useIpAddress(true) // Use IP address for geolocation (events/people/groups only)
18+
* .build();
19+
*
20+
* mixpanelApi.deliver(delivery, options);
21+
* }</pre>
22+
*/
23+
public class DeliveryOptions {
24+
25+
private final boolean mImportStrictMode;
26+
private final boolean mUseIpAddress;
27+
28+
private DeliveryOptions(Builder builder) {
29+
mImportStrictMode = builder.importStrictMode;
30+
mUseIpAddress = builder.useIpAddress;
31+
}
32+
33+
/**
34+
* Returns whether strict mode is enabled for import messages.
35+
*
36+
* <p><strong>Note:</strong> This option only applies to import messages (historical events).
37+
* It has no effect on regular events, people, or groups messages.
38+
*
39+
* <p>When strict mode is enabled (default), the /import endpoint validates each event
40+
* and returns a 400 error if any event has issues. Correctly formed events are still
41+
* ingested, and problematic events are returned in the response with error messages.
42+
*
43+
* <p>When strict mode is disabled, validation is bypassed and all events are imported
44+
* regardless of their validity.
45+
*
46+
* @return true if strict mode is enabled for imports, false otherwise
47+
*/
48+
public boolean isImportStrictMode() {
49+
return mImportStrictMode;
50+
}
51+
52+
/**
53+
* Returns whether the IP address should be used for geolocation.
54+
*
55+
* <p><strong>Note:</strong> This option only applies to events, people, and groups messages.
56+
* It does NOT apply to import messages, which use Basic Auth and don't support the ip parameter.
57+
*
58+
* @return true if IP address should be used for geolocation, false otherwise
59+
*/
60+
public boolean useIpAddress() {
61+
return mUseIpAddress;
62+
}
63+
64+
/**
65+
* Builder for creating {@link DeliveryOptions} instances.
66+
*/
67+
public static class Builder {
68+
private boolean importStrictMode = true;
69+
private boolean useIpAddress = false;
70+
71+
/**
72+
* Sets whether to use strict mode for import messages.
73+
*
74+
* will validate the supplied events and return a 400 status code if any of the events fail validation with details of the error
75+
*
76+
* <p>Setting this value to true (default) will validate the supplied events and return
77+
* a 400 status code if any of the events fail validation with details of the error.
78+
* Setting this value to false disables validation.
79+
*
80+
* @param importStrictMode true to enable strict validation (default), false to disable
81+
* @return this Builder instance for method chaining
82+
*/
83+
public Builder importStrictMode(boolean importStrictMode) {
84+
this.importStrictMode = importStrictMode;
85+
return this;
86+
}
87+
88+
/**
89+
* Sets whether to use the IP address for geolocation.
90+
*
91+
* <p><strong>Note:</strong> This option only applies to events, people, and groups messages.
92+
* It does NOT apply to import messages.
93+
*
94+
* <p>When enabled, Mixpanel will use the IP address of the request to set
95+
* geolocation properties on events and profiles.
96+
*
97+
* @param useIpAddress true to use IP address for geolocation, false otherwise (default)
98+
* @return this Builder instance for method chaining
99+
*/
100+
public Builder useIpAddress(boolean useIpAddress) {
101+
this.useIpAddress = useIpAddress;
102+
return this;
103+
}
104+
105+
/**
106+
* Builds and returns a new {@link DeliveryOptions} instance.
107+
*
108+
* @return a new DeliveryOptions with the configured settings
109+
*/
110+
public DeliveryOptions build() {
111+
return new DeliveryOptions(this);
112+
}
113+
}
114+
}

src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java

Lines changed: 94 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class MixpanelAPI implements AutoCloseable {
5353
protected final boolean mUseGzipCompression;
5454
protected final Integer mConnectTimeout;
5555
protected final Integer mReadTimeout;
56+
protected Integer mImportMaxMessageCount;
5657
protected final LocalFlagsProvider mLocalFlags;
5758
protected final RemoteFlagsProvider mRemoteFlags;
5859
protected final JsonSerializer mJsonSerializer;
@@ -71,7 +72,7 @@ public MixpanelAPI() {
7172
* @param useGzipCompression whether to use gzip compression for network requests
7273
*/
7374
public MixpanelAPI(boolean useGzipCompression) {
74-
this(null, null, null, null, useGzipCompression, null, null, null, null, null);
75+
this(null, null, null, null, useGzipCompression, null, null, null, null, null, null);
7576
}
7677

7778
/**
@@ -100,7 +101,7 @@ public MixpanelAPI(RemoteFlagsConfig remoteFlagsConfig) {
100101
* @param remoteFlagsConfig configuration for remote feature flags evaluation (can be null)
101102
*/
102103
private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteFlagsConfig) {
103-
this(null, null, null, null, false, localFlagsConfig, remoteFlagsConfig, null, null, null);
104+
this(null, null, null, null, false, localFlagsConfig, remoteFlagsConfig, null, null, null, null);
104105
}
105106

106107
/**
@@ -113,7 +114,7 @@ private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteF
113114
* @see #MixpanelAPI()
114115
*/
115116
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) {
116-
this(eventsEndpoint, peopleEndpoint, null, null, false, null, null, null, null, null);
117+
this(eventsEndpoint, peopleEndpoint, null, null, false, null, null, null, null, null, null);
117118
}
118119

119120
/**
@@ -127,7 +128,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) {
127128
* @see #MixpanelAPI()
128129
*/
129130
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint) {
130-
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, null, false, null, null, null, null, null);
131+
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, null, false, null, null, null, null, null, null);
131132
}
132133

133134
/**
@@ -142,7 +143,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn
142143
* @see #MixpanelAPI()
143144
*/
144145
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint) {
145-
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, null, null, null, null, null);
146+
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, null, null, null, null, null, null);
146147
}
147148

148149
/**
@@ -158,7 +159,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn
158159
* @see #MixpanelAPI()
159160
*/
160161
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression) {
161-
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, null, null, null, null, null);
162+
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, null, null, null, null, null, null);
162163
}
163164

164165
/**
@@ -168,16 +169,17 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn
168169
*/
169170
private MixpanelAPI(Builder builder) {
170171
this(
171-
builder.eventsEndpoint,
172-
builder.peopleEndpoint,
173-
builder.groupsEndpoint,
174-
builder.importEndpoint,
172+
builder.eventsEndpoint,
173+
builder.peopleEndpoint,
174+
builder.groupsEndpoint,
175+
builder.importEndpoint,
175176
builder.useGzipCompression,
176177
builder.flagsConfig instanceof LocalFlagsConfig ? (LocalFlagsConfig) builder.flagsConfig : null,
177178
builder.flagsConfig instanceof RemoteFlagsConfig ? (RemoteFlagsConfig) builder.flagsConfig : null,
178179
builder.jsonSerializer,
179180
builder.connectTimeout,
180-
builder.readTimeout
181+
builder.readTimeout,
182+
builder.importMaxMessageCount
181183
);
182184
}
183185

@@ -192,18 +194,22 @@ private MixpanelAPI(Builder builder) {
192194
* @param localFlagsConfig configuration for local feature flags
193195
* @param remoteFlagsConfig configuration for remote feature flags
194196
* @param jsonSerializer custom JSON serializer (null uses default)
197+
* @param connectTimeout connection timeout in milliseconds (null uses default)
198+
* @param readTimeout read timeout in milliseconds (null uses default)
199+
* @param importMaxMessageCount maximum messages per import batch (null uses default)
195200
*/
196201
private MixpanelAPI(
197-
String eventsEndpoint,
198-
String peopleEndpoint,
199-
String groupsEndpoint,
200-
String importEndpoint,
201-
boolean useGzipCompression,
202-
LocalFlagsConfig localFlagsConfig,
202+
String eventsEndpoint,
203+
String peopleEndpoint,
204+
String groupsEndpoint,
205+
String importEndpoint,
206+
boolean useGzipCompression,
207+
LocalFlagsConfig localFlagsConfig,
203208
RemoteFlagsConfig remoteFlagsConfig,
204209
JsonSerializer jsonSerializer,
205210
Integer connectTimeout,
206-
Integer readTimeout
211+
Integer readTimeout,
212+
Integer importMaxMessageCount
207213
) {
208214
mEventsEndpoint = eventsEndpoint != null ? eventsEndpoint : Config.BASE_ENDPOINT + "/track";
209215
mPeopleEndpoint = peopleEndpoint != null ? peopleEndpoint : Config.BASE_ENDPOINT + "/engage";
@@ -212,6 +218,8 @@ private MixpanelAPI(
212218
mUseGzipCompression = useGzipCompression;
213219
mConnectTimeout = connectTimeout != null ? connectTimeout : DEFAULT_CONNECT_TIMEOUT_MILLIS;
214220
mReadTimeout = readTimeout != null ? readTimeout : DEFAULT_READ_TIMEOUT_MILLIS;
221+
mImportMaxMessageCount = importMaxMessageCount != null ?
222+
Math.min(importMaxMessageCount, Config.IMPORT_MAX_MESSAGE_SIZE) : Config.IMPORT_MAX_MESSAGE_SIZE;
215223
mDefaultJsonSerializer = new OrgJsonSerializer();
216224
if (jsonSerializer != null) {
217225
logger.log(Level.INFO, "Custom JsonSerializer provided: " + jsonSerializer.getClass().getName());
@@ -269,14 +277,41 @@ public void deliver(ClientDelivery toSend) throws IOException {
269277
* should be called in a separate thread or in a queue consumer.
270278
*
271279
* @param toSend a ClientDelivery containing a number of Mixpanel messages
280+
* @param useIpAddress if true, Mixpanel will use the ip address of the request for geolocation
272281
* @throws IOException
273282
* @see ClientDelivery
274283
*/
275284
public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOException {
276-
String ipParameter = "ip=0";
277-
if (useIpAddress) {
278-
ipParameter = "ip=1";
279-
}
285+
DeliveryOptions options = new DeliveryOptions.Builder()
286+
.useIpAddress(useIpAddress)
287+
.build();
288+
deliver(toSend, options);
289+
}
290+
291+
/**
292+
* Attempts to send a given delivery to the Mixpanel servers with custom options.
293+
* Will block, possibly on multiple server requests. For most applications, this method
294+
* should be called in a separate thread or in a queue consumer.
295+
*
296+
* <p>Example usage:
297+
* <pre>{@code
298+
* DeliveryOptions options = new DeliveryOptions.Builder()
299+
* .importStrictMode(false) // Disable strict validation for imports
300+
* .useIpAddress(true) // Use IP address for geolocation (events/people/groups only)
301+
* .build();
302+
*
303+
* mixpanelApi.deliver(delivery, options);
304+
* }</pre>
305+
*
306+
* @param toSend a ClientDelivery containing a number of Mixpanel messages
307+
* @param options configuration options for delivery
308+
* @throws IOException if there's a network error
309+
* @throws MixpanelServerException if the server rejects the messages
310+
* @see ClientDelivery
311+
* @see DeliveryOptions
312+
*/
313+
public void deliver(ClientDelivery toSend, DeliveryOptions options) throws IOException {
314+
String ipParameter = options.useIpAddress() ? "ip=1" : "ip=0";
280315

281316
String eventsUrl = mEventsEndpoint + "?" + ipParameter;
282317
List<JSONObject> events = toSend.getEventsMessages();
@@ -290,10 +325,10 @@ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOExcept
290325
List<JSONObject> groupMessages = toSend.getGroupMessages();
291326
sendMessages(groupMessages, groupsUrl);
292327

293-
// Handle import messages - use strict mode and extract token for auth
294328
List<JSONObject> importMessages = toSend.getImportMessages();
295329
if (importMessages.size() > 0) {
296-
String importUrl = mImportEndpoint + "?strict=1";
330+
String strictParam = options.isImportStrictMode() ? "1" : "0";
331+
String importUrl = mImportEndpoint + "?strict=" + strictParam;
297332
sendImportMessages(importMessages, importUrl);
298333
}
299334
}
@@ -426,10 +461,10 @@ private void sendImportMessages(List<JSONObject> messages, String endpointUrl) t
426461
}
427462
}
428463

429-
// Send messages in batches (max 2000 per batch for /import)
464+
// Send messages in batches (max 2000 per batch for /import by default)
430465
// If token is empty, the server will reject with 401 Unauthorized
431-
for (int i = 0; i < messages.size(); i += Config.IMPORT_MAX_MESSAGE_SIZE) {
432-
int endIndex = i + Config.IMPORT_MAX_MESSAGE_SIZE;
466+
for (int i = 0; i < messages.size(); i += mImportMaxMessageCount) {
467+
int endIndex = i + mImportMaxMessageCount;
433468
endIndex = Math.min(endIndex, messages.size());
434469
List<JSONObject> batch = messages.subList(i, endIndex);
435470

@@ -534,7 +569,7 @@ private String dataString(List<JSONObject> messages) {
534569
responseStream = conn.getInputStream();
535570
response = slurp(responseStream);
536571
} catch (IOException e) {
537-
// HTTP error codes (401, 400, etc.) throw IOException when calling getInputStream()
572+
// HTTP error codes (401, 400, 413, etc.) throw IOException when calling getInputStream()
538573
// Check if it's an HTTP error and read the error stream for details
539574
InputStream errorStream = conn.getErrorStream();
540575
if (errorStream != null) {
@@ -559,12 +594,24 @@ private String dataString(List<JSONObject> messages) {
559594
}
560595
}
561596

562-
// Import endpoint returns JSON like {"code":200,"status":"OK","num_records_imported":N}
597+
// Import endpoint returns different formats depending on strict mode:
598+
// - strict=1: JSON like {"code":200,"status":"OK","num_records_imported":N}
599+
// - strict=0: Plain text "0" (not imported) or "1" (imported)
563600
if (response == null) {
564601
return false;
565602
}
566603

567-
// Parse JSON response
604+
// First, try to handle strict=0 response format (plain text "0" or "1")
605+
String trimmedResponse = response.trim();
606+
if ("1".equals(trimmedResponse)) {
607+
// strict=0 with successful import
608+
return true;
609+
} else if ("0".equals(trimmedResponse)) {
610+
// strict=0 with failed import (events not imported, reason unknown)
611+
return false;
612+
}
613+
614+
// Try to parse as JSON response (strict=1 format)
568615
try {
569616
JSONObject jsonResponse = new JSONObject(response);
570617

@@ -659,6 +706,7 @@ public static class Builder {
659706
private JsonSerializer jsonSerializer;
660707
private Integer connectTimeout;
661708
private Integer readTimeout;
709+
private Integer importMaxMessageCount;
662710

663711
/**
664712
* Sets the endpoint URL for Mixpanel events messages.
@@ -768,6 +816,22 @@ public Builder readTimeout(int readTimeoutInMillis) {
768816
return this;
769817
}
770818

819+
/**
820+
* Sets the maximum number of messages to include in a single batch for the /import endpoint.
821+
* The default value is 2000 messages per batch.
822+
* The max accepted value is 2000
823+
*
824+
* @param importMaxMessageCount the maximum number of import messages per batch.
825+
* Value must be greater than 0 and less than or equal to 2000.
826+
* @return this Builder instance for method chaining
827+
*/
828+
public Builder importMaxMessageCount(int importMaxMessageCount) {
829+
if (importMaxMessageCount > 0 && importMaxMessageCount <= Config.IMPORT_MAX_MESSAGE_SIZE) {
830+
this.importMaxMessageCount = importMaxMessageCount;
831+
}
832+
return this;
833+
}
834+
771835
/**
772836
* Builds and returns a new MixpanelAPI instance with the configured settings.
773837
*

0 commit comments

Comments
 (0)