Skip to content

Commit

Permalink
[pinpoint-apm#11953] Update io.asyncer:r2dbc-mysql of spring r2dbc pl…
Browse files Browse the repository at this point in the history
…ugin
  • Loading branch information
jaehong-kim committed Jan 15, 2025
1 parent 6bc7753 commit 7f175ab
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@
</dependency>

<dependency>
<groupId>dev.miku</groupId>
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
<version>1.3.1</version>
</dependency>


<dependency>
<groupId>com.github.jasync-sql</groupId>
<artifactId>jasync-r2dbc-mysql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,32 @@

package com.pinpoint.test.plugin;

import dev.miku.r2dbc.mysql.MySqlConnectionConfiguration;
import dev.miku.r2dbc.mysql.MySqlConnectionFactory;
import dev.miku.r2dbc.mysql.constant.SslMode;
import io.asyncer.r2dbc.mysql.MySqlConnectionConfiguration;
import io.asyncer.r2dbc.mysql.MySqlConnectionFactory;
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.r2dbc.spi.ConnectionFactory;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.time.Duration;

@Component
@Qualifier("mysql")
public class MysqlR2dbcDatabase implements R2dbcDatabase {
private ConnectionFactory connectionFactory;

@PostConstruct
public void init() throws Exception {
public MysqlR2dbcDatabase() {
System.out.println("INIT");
MySqlConnectionConfiguration connectionConfiguration = MySqlConnectionConfiguration.builder()
.host("localhost")
.port(49178)
.port(32789)
.database("test")
.user("root")
.password("")
.connectTimeout(Duration.ofSeconds(5 * 60))
.sslMode(SslMode.DISABLED)
.build();

connectionFactory = MySqlConnectionFactory.from(connectionConfiguration);
}

@PreDestroy
public void destroy() {
}

@Override
public ConnectionFactory getConnectionFactory() {
return this.connectionFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -42,11 +42,12 @@
public class SpringDataR2dbcPluginController {
private final Logger logger = LogManager.getLogger(this.getClass());

R2dbcDatabase r2dbcDatabase;
@Autowired
MysqlR2dbcDatabase r2dbcDatabase;

public SpringDataR2dbcPluginController(@Qualifier("mysql") R2dbcDatabase r2dbcDatabase) {
this.r2dbcDatabase = r2dbcDatabase;
}
// public SpringDataR2dbcPluginController(@Qualifier("mysql") R2dbcDatabase r2dbcDatabase) {
// this.r2dbcDatabase = r2dbcDatabase;
// }

@GetMapping("/template/insert")
public Mono<Map<String, Object>> insert() throws SQLException {
Expand Down Expand Up @@ -86,11 +87,12 @@ public List<String> connectionSelect() throws Throwable {
Publisher<? extends Connection> conn = r2dbcDatabase.getConnectionFactory().create();
final ObservableSubscriber<String> subscriber = new ObservableSubscriber();
Mono.from(conn)
.flatMapMany(
c -> Flux.from(c.createStatement("SELECT * FROM persons")
.execute())
.flatMapMany(connection -> {
System.out.println("## Before");
return connection.createStatement("SELECT first_name FROM persons").execute();
}
).flatMap(result -> result.map(((row, rowMetadata) -> row.get("first_name", String.class)))
).subscribe(subscriber);
).subscribe(subscriber);
subscriber.await();
return subscriber.getReceived();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.mysql.MySqlConnectionConfigurationInterceptor;
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.mysql.MySqlConnectionConstructorInterceptor;
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.mysql.MySqlConnectionFactoryFromInterceptor;
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.mysql.QueryFlowLoginInterceptor;
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.mysql.ReactorNettyClientConstructorInterceptor;
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.oracle.OracleConnectionFactoryImplConstructorInterceptor;
import com.navercorp.pinpoint.plugin.spring.r2dbc.interceptor.oracle.OracleConnectionFactoryImplLambdaCreateInterceptor;
Expand Down Expand Up @@ -115,17 +114,16 @@ public void setup(ProfilerPluginSetupContext context) {
}
if (config.getMysqlConfig().isPluginEnable()) {
// MySQL
transformTemplate.transform("dev.miku.r2dbc.mysql.MySqlConnectionConfiguration", MySqlConnectionConfigurationTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.MySqlConnectionFactory", MySqlConnectionFactoryTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.MySqlConnection", MySqlConnectionTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.PrepareSimpleStatement", MySqlStatementTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.TextSimpleStatement", MySqlStatementTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.PrepareParametrizedStatement", MySqlStatementTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.TextParametrizedStatement", MySqlStatementTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.SimpleStatementSupport", MySqlStatementTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.ParametrizedStatementSupport", MySqlStatementTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.client.ReactorNettyClient", ReactorNettyClientTransform.class);
transformTemplate.transform("dev.miku.r2dbc.mysql.QueryFlow", QueryFlowTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.MySqlConnectionConfiguration", MySqlConnectionConfigurationTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.MySqlConnectionFactory", MySqlConnectionFactoryTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.MySqlSimpleConnection", MySqlConnectionTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.PrepareSimpleStatement", MySqlStatementTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.TextSimpleStatement", MySqlStatementTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.PrepareParameterizedStatement", MySqlStatementTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.TextParameterizedStatement", MySqlStatementTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.SimpleStatementSupport", MySqlStatementTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.ParameterizedStatementSupport", MySqlStatementTransform.class);
transformTemplate.transform("io.asyncer.r2dbc.mysql.client.ReactorNettyClient", ReactorNettyClientTransform.class);
// MySQL - Jasync
transformTemplate.transform("com.github.jasync.sql.db.Configuration", JasyncConfigurationTransform.class);
transformTemplate.transform("com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory", JasyncMySQLConnectionFactoryTransform.class);
Expand Down Expand Up @@ -409,7 +407,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(DatabaseInfoAccessor.class);

final InstrumentMethod fromMethod = target.getDeclaredMethod("from", "dev.miku.r2dbc.mysql.MySqlConnectionConfiguration");
final InstrumentMethod fromMethod = target.getDeclaredMethod("from", "io.asyncer.r2dbc.mysql.MySqlConnectionConfiguration");
if (fromMethod != null) {
fromMethod.addInterceptor(MySqlConnectionFactoryFromInterceptor.class);
}
Expand All @@ -430,7 +428,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(DatabaseInfoAccessor.class);

final InstrumentMethod constructorMethod = target.getConstructor("dev.miku.r2dbc.mysql.client.Client", "dev.miku.r2dbc.mysql.ConnectionContext", "dev.miku.r2dbc.mysql.codec.Codecs", "io.r2dbc.spi.IsolationLevel", "java.lang.String", "java.util.function.Predicate");
final InstrumentMethod constructorMethod = target.getConstructor("io.asyncer.r2dbc.mysql.client.Client", "io.asyncer.r2dbc.mysql.codec.Codecs", "io.asyncer.r2dbc.mysql.cache.QueryCache", "java.util.function.Predicate");
if (constructorMethod != null) {
constructorMethod.addInterceptor(MySqlConnectionConstructorInterceptor.class);
}
Expand Down Expand Up @@ -483,7 +481,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(DatabaseInfoAccessor.class);

final InstrumentMethod constructorMethod = target.getConstructor("reactor.netty.Connection", "dev.miku.r2dbc.mysql.MySqlSslConfiguration", "dev.miku.r2dbc.mysql.ConnectionContext");
final InstrumentMethod constructorMethod = target.getConstructor("reactor.netty.Connection", "io.asyncer.r2dbc.mysql.MySqlSslConfiguration", "io.asyncer.r2dbc.mysql.ConnectionContext");
if (constructorMethod != null) {
constructorMethod.addInterceptor(ReactorNettyClientConstructorInterceptor.class);
}
Expand All @@ -492,22 +490,6 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
}
}

public static class QueryFlowTransform implements TransformCallback {

@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(DatabaseInfoAccessor.class);

final InstrumentMethod loginMethod = target.getDeclaredMethod("login", "dev.miku.r2dbc.mysql.client.Client", "dev.miku.r2dbc.mysql.constant.SslMode", "java.lang.String", "java.lang.String", "java.lang.CharSequence", "dev.miku.r2dbc.mysql.ConnectionContext");
if (loginMethod != null) {
loginMethod.addInterceptor(QueryFlowLoginInterceptor.class);
}

return target.toBytecode();
}
}

// jasync-sql
public static class JasyncConfigurationTransform implements TransformCallback {

Expand Down

0 comments on commit 7f175ab

Please sign in to comment.